Elasticsearch Java API: Using Bulk Processor
The BulkProcessor class offers a simple interface to flush bulk operations automatically based on the number or size of requests, or after a given period.

To use it, first create a BulkProcessor instance:
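import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // Called just before the bulk is executed
                System.out.println("Executing bulk with " + request.numberOfActions() + " actions");
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // Called after the bulk has executed; individual items may still have failed
                if (response.hasFailures()) {
                    System.err.println(response.buildFailureMessage());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // Called when the whole bulk request failed with a Throwable
                failure.printStackTrace();
            }
        })
        .setBulkActions(10000)
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        .setConcurrentRequests(1)
        .setBackoffPolicy(
                BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
        .build();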
- client: add your Elasticsearch client.
- beforeBulk: called just before the bulk is executed. You can, for example, check the request's numberOfActions.
- afterBulk (with a BulkResponse): called after the bulk has executed. You can, for example, check whether some requests failed.
- afterBulk (with a Throwable): called when the bulk failed and raised a Throwable.
- setBulkActions(10000): execute the bulk every 10,000 requests.
- setBulkSize: flush the bulk every 5 MB.
- setFlushInterval: flush the bulk every 5 seconds, whatever the number of requests.
- setConcurrentRequests(1): set the number of concurrent requests. A value of 0 means that only a single request is allowed to execute. A value of 1 means that 1 concurrent request is allowed to execute while new bulk requests are being accumulated.
- setBackoffPolicy: set a custom backoff policy that initially waits 100 ms, increases exponentially, and retries up to three times. A retry is attempted whenever one or more bulk item requests have failed with an …
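To make the interaction between the three flush triggers above concrete, here is a minimal, stdlib-only model. It is a sketch under stated assumptions, not Elasticsearch code: BulkBufferModel, its methods, and the byte accounting (UTF-8 length of each action string) are hypothetical, and the real BulkProcessor drives the interval flush from a scheduler rather than through an explicit tick call.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the documented flush triggers (NOT Elasticsearch code):
// flush when the action count or buffered byte size reaches its limit, or when
// the flush interval has elapsed since the last flush.
class BulkBufferModel {
    private final int bulkActions;       // analogous to setBulkActions(...)
    private final long bulkSizeBytes;    // analogous to setBulkSize(...)
    private final long flushIntervalMs;  // analogous to setFlushInterval(...); <= 0 disables it
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushedBatches = new ArrayList<>();
    private long pendingBytes = 0;
    private long lastFlushMs;

    BulkBufferModel(int bulkActions, long bulkSizeBytes, long flushIntervalMs, long nowMs) {
        this.bulkActions = bulkActions;
        this.bulkSizeBytes = bulkSizeBytes;
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlushMs = nowMs;
    }

    // Buffers one action; flushes immediately if a count or size limit is reached.
    void add(String action, long nowMs) {
        pending.add(action);
        pendingBytes += action.getBytes(StandardCharsets.UTF_8).length;
        if (pending.size() >= bulkActions || pendingBytes >= bulkSizeBytes) {
            flush(nowMs);
        }
    }

    // Stands in for the scheduler: flushes pending actions once the interval has elapsed.
    void tick(long nowMs) {
        if (flushIntervalMs > 0 && !pending.isEmpty() && nowMs - lastFlushMs >= flushIntervalMs) {
            flush(nowMs);
        }
    }

    private void flush(long nowMs) {
        flushedBatches.add(new ArrayList<>(pending)); // "send" a copy of the batch
        pending.clear();
        pendingBytes = 0;
        lastFlushMs = nowMs;
    }

    List<List<String>> flushedBatches() { return flushedBatches; }
    int pendingCount() { return pending.size(); }
}

class BulkBufferDemo {
    public static void main(String[] args) {
        // Tiny limits for illustration: 3 actions, 1 KiB, 5 s.
        BulkBufferModel buf = new BulkBufferModel(3, 1024, 5_000, 0);
        buf.add("a", 0);
        buf.add("b", 10);
        buf.add("c", 20);   // count limit reached: [a, b, c] is flushed
        buf.add("d", 30);   // stays pending
        buf.tick(4_000);    // under 5 s since the flush at t=20: nothing happens
        buf.tick(5_100);    // 5 s have elapsed: [d] is flushed
        System.out.println(buf.flushedBatches()); // [[a, b, c], [d]]
    }
}
```

Whichever limit is reached first wins, which is why the example configuration sets all three.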
By default, BulkProcessor:
- sets bulkActions to 1000
- sets bulkSize to 5mb
- does not set flushInterval
- sets concurrentRequests to 1, which means an asynchronous execution of the flush operation.
- sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds.
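Both the custom policy in the first example and the default one are exponential backoffs. The sketch below shows the general idea with the simplest possible curve, doubling the delay on every retry. SimpleBackoff and its methods are hypothetical names, and Elasticsearch's BackoffPolicy.exponentialBackoff uses its own, gentler growth formula: doubling from 50 ms over 8 retries would wait 12.75 seconds in total, well above the roughly 5.1 seconds quoted for the default policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical helper showing a plain doubling backoff. Elasticsearch's
// BackoffPolicy.exponentialBackoff grows its delays with a different formula.
class SimpleBackoff {
    // Wait before each retry, in ms: initial, 2x initial, 4x initial, ...
    static List<Long> delays(long initialDelayMs, int maxRetries) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelayMs;
        for (int i = 0; i < maxRetries; i++) {
            result.add(delay);
            delay *= 2;
        }
        return result;
    }

    // One initial attempt, then up to maxRetries retries, sleeping before each retry.
    static boolean runWithRetries(BooleanSupplier attempt, long initialDelayMs, int maxRetries) {
        if (attempt.getAsBoolean()) {
            return true;
        }
        for (long delay : delays(initialDelayMs, maxRetries)) {
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag and give up
                return false;
            }
            if (attempt.getAsBoolean()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Three retries starting at 100 ms, as in the custom policy above.
        System.out.println(delays(100, 3)); // [100, 200, 400]
        // Total wait for plain doubling from 50 ms over 8 retries.
        long total = delays(50, 8).stream().mapToLong(Long::longValue).sum();
        System.out.println(total);          // 12750
    }
}
```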
Then you can simply add your requests to the BulkProcessor:
bulkProcessor.add(new IndexRequest("twitter", "_doc", "1")
        .source(XContentType.JSON, "user", "some user")); // your document here
bulkProcessor.add(new DeleteRequest("twitter", "_doc", "2"));
Closing the Bulk Processor
When all documents have been added to the BulkProcessor, it can be closed with the awaitClose or close method:
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
or
bulkProcessor.close();
Both methods flush any remaining documents and disable all other scheduled flushes, if they were scheduled by setting flushInterval. If concurrent requests were enabled, the awaitClose method waits up to the specified timeout for all bulk requests to complete and then returns true; if the specified waiting time elapses before all bulk requests complete, it returns false. The close method does not wait for any remaining bulk requests to complete and exits immediately.
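The awaitClose contract (wait up to a timeout, then report whether everything finished) matches that of ExecutorService.awaitTermination in java.util.concurrent. The sketch below uses that JDK API purely as an analogy: AwaitCloseAnalogy and closeAndWait are hypothetical names, and the executor stands in for the processor's in-flight bulk requests.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Analogy only: ExecutorService stands in for the processor's in-flight bulk requests.
class AwaitCloseAnalogy {
    // Stop accepting work, then wait up to the timeout; true if everything finished in time.
    static boolean closeAndWait(ExecutorService inFlight, long timeout, TimeUnit unit) {
        inFlight.shutdown(); // shutdown() alone, without waiting, is closer to close()
        try {
            return inFlight.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService fast = Executors.newSingleThreadExecutor();
        fast.submit(() -> { /* a bulk request that completes quickly */ });
        System.out.println(closeAndWait(fast, 1, TimeUnit.SECONDS)); // true

        ExecutorService slow = Executors.newSingleThreadExecutor();
        slow.submit(() -> {
            try {
                Thread.sleep(500); // a bulk request that outlives the timeout
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println(closeAndWait(slow, 50, TimeUnit.MILLISECONDS)); // false
        slow.shutdownNow(); // interrupt the straggler so the JVM can exit
    }
}
```

As with awaitClose, a false result only means the timeout elapsed first; it says nothing about whether the pending work eventually succeeds.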
If you are running tests with Elasticsearch and are using the BulkProcessor to populate your dataset, you should set the number of concurrent requests to 0 so that the flush operation of the bulk is executed synchronously:
// listener: a BulkProcessor.Listener such as the one in the first example
BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener)
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(new IndexRequest("twitter", "_doc", "1")
        .source(XContentType.JSON, "user", "some user"));

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();
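Why does setConcurrentRequests(0) make test data visible right after flush()? The stdlib-only model below, a hypothetical FlushExecutorModel that is not part of Elasticsearch, captures the documented semantics under a simplifying assumption: with 0 the flush runs on the caller's thread and has completed when flush() returns, while with n > 0 up to n flushes run on background threads and the caller blocks only once n are already in flight.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical model of concurrentRequests (NOT Elasticsearch code).
class FlushExecutorModel {
    private final int concurrentRequests;
    private final Semaphore inFlight;   // limits how many flushes run at once
    private final ExecutorService pool; // null when flushing synchronously
    final List<String> sent = Collections.synchronizedList(new ArrayList<>());

    FlushExecutorModel(int concurrentRequests) {
        this.concurrentRequests = concurrentRequests;
        this.inFlight = new Semaphore(Math.max(concurrentRequests, 1));
        this.pool = concurrentRequests > 0 ? Executors.newFixedThreadPool(concurrentRequests) : null;
    }

    void flush(String batch) {
        if (concurrentRequests == 0) {
            sent.add(batch); // synchronous: done before flush() returns
            return;
        }
        inFlight.acquireUninterruptibly(); // block only if n flushes are already running
        pool.execute(() -> {
            try {
                sent.add(batch); // stands in for sending the bulk request
            } finally {
                inFlight.release();
            }
        });
    }

    // Waits (bounded) for background flushes to finish, similar to awaitClose.
    boolean close(long timeout, TimeUnit unit) {
        if (pool == null) {
            return true;
        }
        pool.shutdown();
        try {
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        FlushExecutorModel sync = new FlushExecutorModel(0);
        sync.flush("batch-1");
        System.out.println(sync.sent); // [batch-1], visible immediately

        FlushExecutorModel async = new FlushExecutorModel(1);
        async.flush("batch-1");
        async.flush("batch-2");
        async.close(1, TimeUnit.SECONDS); // only now is everything guaranteed to be sent
        System.out.println(async.sent); // [batch-1, batch-2]
    }
}
```

In the asynchronous case a test that searched right after flush() could race the background thread, which is the problem the synchronous setting avoids.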
Global parameters can be specified on the BulkRequest as well as on the BulkProcessor, similar to the REST API. These global parameters serve as defaults and can be overridden by local parameters specified on each sub request. Some parameters (index, type) have to be set before any sub request is added, so you have to specify them when creating the BulkRequest or BulkProcessor. Others (pipeline, routing) are optional and can be specified at any point before the bulk is sent.
// initBulkProcessorBuilder(listener): a helper returning a BulkProcessor.Builder (not shown here)
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
        .setGlobalIndex("tweets")
        .setGlobalType("_doc")
        .setGlobalRouting("routing")
        .setGlobalPipeline("pipeline_id")
        .build()) {
    processor.add(new IndexRequest()
            .source(XContentType.JSON, "user", "some user"));
    processor.add(new IndexRequest("blogs").id("1")
            .source(XContentType.JSON, "title", "some title"));
}
- The first IndexRequest sets no parameters of its own, so the global parameters from the BulkProcessor are applied to it.
- The second IndexRequest sets its own index ("blogs"), and this local parameter overrides the global index.
BulkRequest request = new BulkRequest();
request.pipeline("globalId");
request.add(new IndexRequest("test").id("1")
        .source(XContentType.JSON, "field", "bulk1")
        .setPipeline("perIndexId"));
request.add(new IndexRequest("test").id("2")
        .source(XContentType.JSON, "field", "bulk2"));
- The local pipeline parameter on the first sub request ("perIndexId") overrides the global pipeline from the BulkRequest.
- The second sub request sets no pipeline, so the global pipeline ("globalId") from the BulkRequest is applied to it.
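The precedence rule shared by both examples (use the local value if set, otherwise the global default) can be sketched field by field. BulkParams is a hypothetical value holder, not an Elasticsearch type; the global values simply reuse those from the BulkProcessor example.

```java
// Hypothetical value holder for per-request parameters (not an Elasticsearch type).
final class BulkParams {
    final String index;
    final String routing;
    final String pipeline;

    BulkParams(String index, String routing, String pipeline) {
        this.index = index;
        this.routing = routing;
        this.pipeline = pipeline;
    }

    // Field by field: a local value wins; a missing (null) one falls back to the global default.
    BulkParams withDefaults(BulkParams global) {
        return new BulkParams(
                pick(index, global.index),
                pick(routing, global.routing),
                pick(pipeline, global.pipeline));
    }

    private static String pick(String local, String global) {
        return local != null ? local : global;
    }

    @Override
    public String toString() {
        return "index=" + index + ", routing=" + routing + ", pipeline=" + pipeline;
    }

    public static void main(String[] args) {
        BulkParams global = new BulkParams("tweets", "routing", "pipeline_id");
        // Like the first IndexRequest: nothing set locally, so every global value applies.
        System.out.println(new BulkParams(null, null, null).withDefaults(global));
        // index=tweets, routing=routing, pipeline=pipeline_id
        // Like new IndexRequest("blogs"): the local index overrides the global one.
        System.out.println(new BulkParams("blogs", null, null).withDefaults(global));
        // index=blogs, routing=routing, pipeline=pipeline_id
    }
}
```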