zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Elasticsearch Java REST Client 批量操作(Bulk API)

JAVA批量elasticsearchAPI 操作 Client rest bulk
2023-09-11 14:19:18 时间

上一篇:Elasticsearch Java REST Client Term Vectors API
下一篇:Elasticsearch Java REST Client Search APIs 查询

BulkRequest

BulkRequest可用于使用单个请求执行多个索引、更新和/或删除操作。
它需要至少一个操作添加到 Bulk 请求中:

# 方式一:
/**
 * Indexes three sample documents into {@code edu-app-user} with a single synchronous bulk call.
 *
 * @return whatever {@code response(...)} produces for the bulk result
 * @throws IOException propagated from the client's bulk call
 */
@GetMapping("test")
public String test() throws IOException {
    // Each row is {document id, value stored under the "name" field}.
    String[][] idAndName = {
            {"1", "foo"},
            {"2", "elastic"},
            {"3", "wdz"},
    };

    BulkRequest bulkRequest = new BulkRequest();
    for (String[] row : idAndName) {
        IndexRequest indexRequest = new IndexRequest("edu-app-user", "doc", row[0]);
        indexRequest.source(XContentType.JSON, "name", row[1]);
        bulkRequest.add(indexRequest);
    }

    return response(restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT));
}
# 混合操作
/**
 * Sends one bulk request that mixes a delete, a partial update and an index operation
 * against {@code edu-app-user}.
 *
 * @return whatever {@code response(...)} produces for the bulk result
 * @throws IOException propagated from the client's bulk call
 */
@GetMapping("test1")
public String test1() throws IOException {
    DeleteRequest removeThird = new DeleteRequest("edu-app-user", "doc", "3");

    UpdateRequest renameSecond = new UpdateRequest("edu-app-user", "doc", "2");
    renameSecond.doc(XContentType.JSON, "name", "update");

    IndexRequest addFourth = new IndexRequest("edu-app-user", "doc", "4");
    addFourth.source(XContentType.JSON, "name", "baz");

    // Operations are added in the same order as before: delete, update, index.
    BulkRequest mixed = new BulkRequest();
    mixed.add(removeThird);
    mixed.add(renameSecond);
    mixed.add(addFourth);

    BulkResponse outcome = restHighLevelClient.bulk(mixed, RequestOptions.DEFAULT);
    return response(outcome);
}
/**
 * Prints the outcome of every item in a bulk response and returns a description of the
 * last index/delete/update item seen.
 *
 * <p>A failed item carries no response object, so it is reported through its failure
 * message instead of being dereferenced (which previously threw a NullPointerException).
 *
 * @param bulk the bulk response to walk through
 * @return {@code toString()} of the last index/delete/update response, the failure message
 *         if the last such item failed, or an empty string when no item set a value
 */
private String response(BulkResponse bulk){
    String str = "";
    for (BulkItemResponse item : bulk) {
        System.out.println(item.toString());

        if (item.isFailed()) {
            // getResponse() is null for failed items; report the failure and move on.
            str = item.getFailureMessage();
            System.out.println("操作失败-----------------" + str);
            continue;
        }

        DocWriteResponse response = item.getResponse();
        switch (item.getOpType()) {
            case CREATE:
                System.out.println("创建数据-----------------");
                break;
            case INDEX:
                str = response.toString();
                System.out.println("操作索引数据-----------------" + str);
                break;
            case DELETE:
                str = response.toString();
                System.out.println("操作删除数据-----------------" + str);
                break;
            case UPDATE:
                str = response.toString();
                System.out.println("操作更新数据-----------------" + str);
                break;
        }
    }
    return str;
}
# 异步处理
/**
 * Submits a mixed bulk request (delete, update, index) asynchronously; the outcome is
 * delivered to a new {@code BulkListen} instance rather than returned.
 *
 * @throws IOException declared by the signature
 */
@GetMapping("test3")
public void test3() throws IOException {
    DeleteRequest removal = new DeleteRequest("edu-app-user", "doc", "3");

    UpdateRequest rename = new UpdateRequest("edu-app-user", "doc", "100");
    rename.doc(XContentType.JSON, "name", "update");

    IndexRequest insertion = new IndexRequest("edu-app-user", "doc", "1");
    insertion.source(XContentType.JSON, "name", "baz");

    // Same operation order as before: delete, update, index.
    BulkRequest asyncBulk = new BulkRequest();
    asyncBulk.add(removal);
    asyncBulk.add(rename);
    asyncBulk.add(insertion);

    BulkListen callback = new BulkListen();
    restHighLevelClient.bulkAsync(asyncBulk, RequestOptions.DEFAULT, callback);
}
# 监听
package com.wdz.es.config.es;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;

/**
 * Listener for asynchronous bulk requests that prints the per-item outcome to standard output.
 */
public class BulkListen implements ActionListener<BulkResponse> {
    /**
     * Called when the bulk request completed; prints every item and a final summary line.
     *
     * @param bulkItemResponses the bulk response delivered by the client
     */
    @Override
    public void onResponse(BulkResponse bulkItemResponses) {
        String response = response(bulkItemResponses);
        System.out.println("异步成功: "+response);
    }

    /**
     * Called when the bulk request as a whole failed; prints the exception message.
     *
     * @param e the failure reported by the client
     */
    @Override
    public void onFailure(Exception e) {
        System.out.println("异步处理失败:"+e.getMessage());
    }

    /**
     * Prints the outcome of every item in a bulk response and returns a description of the
     * last index/delete/update item seen.
     *
     * <p>An item has either a response or a failure, never both. The failure is therefore
     * checked first, and only the object that is present gets dereferenced. Previously the
     * failure was printed unconditionally, which threw a NullPointerException for every
     * successful item.
     *
     * @param bulk the bulk response to walk through
     * @return {@code toString()} of the last index/delete/update response, the failure
     *         message if the last such item failed, or an empty string when no item set a value
     */
    public static String response(BulkResponse bulk) {
        String str = "";
        for (BulkItemResponse item : bulk) {
            System.out.println(item.toString());

            // Failure handling: getFailure() is non-null only for failed items.
            BulkItemResponse.Failure failure = item.getFailure();
            if (failure != null) {
                System.out.println(failure.toString());
                str = item.getFailureMessage();
                continue;
            }

            DocWriteResponse response = item.getResponse();
            switch (item.getOpType()) {
                case CREATE:
                    System.out.println("创建数据-----------------");
                    break;
                case INDEX:
                    str = response.toString();
                    System.out.println("操作索引数据-----------------" + str);
                    break;
                case DELETE:
                    str = response.toString();
                    System.out.println("操作删除数据-----------------" + str);
                    break;
                case UPDATE:
                    str = response.toString();
                    System.out.println("操作更新数据-----------------" + str);
                    break;
            }
        }
        return str;
    }
}

# 结果
org.elasticsearch.action.bulk.BulkItemResponse@6ca820cd
操作删除数据-----------------DeleteResponse[index=edu-app-user,type=doc,id=3,version=4,result=deleted,shards=ShardInfo{total=2, successful=1, failures=[]}]
org.elasticsearch.action.bulk.BulkItemResponse@25ae673
操作更新数据-----------------UpdateResponse[index=edu-app-user,type=doc,id=2,version=4,seqNo=5,primaryTerm=1,result=updated,shards=ShardInfo{total=2, successful=1, failures=[]}]
org.elasticsearch.action.bulk.BulkItemResponse@5e6d6373
操作索引数据-----------------IndexResponse[index=edu-app-user,type=doc,id=4,version=1,result=created,seqNo=6,primaryTerm=1,shards={"total":2,"successful":1,"failed":0}]

可选操作

# 设置超时时间(两种方式)
// Timeout for the bulk request, given as a TimeValue of two minutes.
request.timeout(TimeValue.timeValueMinutes(2)); 
// Alternative form of the same setting, given as the string "2m".
request.timeout("2m"); 
# 设置在继续执行索引/更新/删除操作之前必须处于活动状态的分片副本数。
// Required number of active shard copies, given as a plain int.
request.waitForActiveShards(2); 
# 提供的分片副本数ActiveShardCount: 可以是 ActiveShardCount.ALL,ActiveShardCount.ONE或 ActiveShardCount.DEFAULT(默认)
// Alternative form of the same setting, given as an ActiveShardCount constant.
request.waitForActiveShards(ActiveShardCount.ALL); 	

批处理 BulkProcessor

/**
 * Demonstrates {@code BulkProcessor}: registers a listener that logs each batch, queues an
 * index, a delete and an update request, then waits for the processor to flush and close.
 *
 * <p>The value printed between the consumer and the processor is the object returned by the
 * first {@code add(...)} call. The original code referenced an undeclared variable there and
 * did not compile.
 *
 * @throws IOException declared by the signature
 */
@GetMapping("test2")
public void test2() throws IOException {
   BulkProcessor.Listener listener = new BulkProcessor.Listener() {
       /** Logs the batch right before it is sent. */
       @Override
       public void beforeBulk(long executionId, BulkRequest request) {
           System.out.println(executionId+"批处理之前request:"+JSONObject.toJSONString(request));
       }
       /** Logs the batch and its response after the bulk call returned a response. */
       @Override
       public void afterBulk(long executionId, BulkRequest request,
                             BulkResponse response) {
           System.out.println(executionId+"批处理之后request:"+JSONObject.toJSONString(request));
           System.out.println(executionId+"批处理之后response:"+JSONObject.toJSONString(response));
       }
       /** Logs the batch and the error when the bulk call itself failed. */
       @Override
       public void afterBulk(long executionId, BulkRequest request,
                             Throwable failure) {
           System.out.println(executionId+"批处理异常request:"+JSONObject.toJSONString(request));
           System.out.println(executionId+"批处理异常failure:"+failure.getMessage());
       }
   };

   // The processor hands every assembled batch to this consumer, which sends it asynchronously.
   BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
           (request, bulkListener) ->
                   restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
   BulkProcessor bulkProcessor =
            BulkProcessor.builder(bulkConsumer, listener).build();

   // Keep what add(...) returns so it can be printed below.
   BulkProcessor add = bulkProcessor.add(
           new IndexRequest("edu-app-user","doc","1002").source(XContentType.JSON,"name","BulkProcessor"));
   bulkProcessor.add(new DeleteRequest("edu-app-user","doc","2"));
   bulkProcessor.add(new UpdateRequest("edu-app-user","doc","28").doc(XContentType.JSON,"name","更新测试"));

   // awaitClose() and close() both flush the queued requests before closing the processor
   // and reject any new ones. awaitClose() additionally waits until all requests have
   // been processed or the given timeout has elapsed.
   try {
       bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
   } catch (InterruptedException e) {
       e.printStackTrace();
       // Restore the interrupt flag so callers further up can still observe the interruption.
       Thread.currentThread().interrupt();
   } finally {
       // close() shuts the processor down immediately; calling it here guarantees the
       // processor is closed even when the wait above was interrupted.
       bulkProcessor.close();
   }
   System.out.println(JSONObject.toJSONString(bulkConsumer));
   System.out.println(JSONObject.toJSONString(add));
   System.out.println(JSONObject.toJSONString(bulkProcessor));
}
# 异常结果
1批处理之前request:{"description":"requests[1], indices[bulk-test]","parentTask":{"id":-1,"nodeId":"","set":false},"refreshPolicy":"NONE","shouldStoreResult":false}
1批处理异常request:{"description":"requests[1], indices[bulk-test]","parentTask":{"id":-1,"nodeId":"","set":false},"refreshPolicy":"NONE","shouldStoreResult":false}
1批处理异常failure:Validation Failed: 1: source is missing;2: content type is missing;
{}
{}
{}
# 成功结果
1批处理之前request:{"description":"requests[1], indices[edu-app-user]","parentTask":{"id":-1,"nodeId":"","set":false},"refreshPolicy":"NONE","shouldStoreResult":false}
{}
{}
{}
1批处理之后request:{"description":"requests[1], indices[edu-app-user]","parentTask":{"id":-1,"nodeId":"","set":false},"refreshPolicy":"NONE","shouldStoreResult":false}
1批处理之后response:{"fragment":false,"ingestTook":{"days":0,"daysFrac":-1.1574074074074074E-8,"hours":0,"hoursFrac":-2.7777777777777776E-7,"micros":-1000,"microsFrac":-1000.0,"millis":-1,"millisFrac":-1.0,"minutes":0,"minutesFrac":-1.6666666666666667E-5,"nanos":-1000000,"seconds":0,"secondsFrac":-0.001,"stringRep":"-1"},"ingestTookInMillis":-1,"items":[{"failed":false,"fragment":false,"id":"100","index":"edu-app-user","itemId":0,"opType":"INDEX","response":{"fragment":false,"id":"100","index":"edu-app-user","primaryTerm":1,"result":"CREATED","seqNo":1,"shardId":{"fragment":true,"id":-1,"index":{"fragment":false,"name":"edu-app-user","uUID":"_na_"},"indexName":"edu-app-user"},"shardInfo":{"failed":0,"failures":[],"fragment":false,"successful":1,"total":2},"type":"doc","version":1},"type":"doc","version":1}],"took":{"days":0,"daysFrac":1.0300925925925926E-6,"hours":0,"hoursFrac":2.4722222222222223E-5,"micros":89000,"microsFrac":89000.0,"millis":89,"millisFrac":89.0,"minutes":0,"minutesFrac":0.0014833333333333332,"nanos":89000000,"seconds":0,"secondsFrac":0.089,"stringRep":"89ms"}}

# 混合批处理结果
1批处理之前request:{"description":"requests[3], indices[edu-app-user]","parentTask":{"id":-1,"nodeId":"","set":false},"refreshPolicy":"NONE","shouldStoreResult":false}
{}
{}
{}
1批处理之后request:{"description":"requests[3], indices[edu-app-user]","parentTask":{"id":-1,"nodeId":"","set":false},"refreshPolicy":"NONE","shouldStoreResult":false}
1批处理之后response:{"fragment":false,"ingestTook":{"days":0,"daysFrac":-1.1574074074074074E-8,"hours":0,"hoursFrac":-2.7777777777777776E-7,"micros":-1000,"microsFrac":-1000.0,"millis":-1,"millisFrac":-1.0,"minutes":0,"minutesFrac":-1.6666666666666667E-5,"nanos":-1000000,"seconds":0,"secondsFrac":-0.001,"stringRep":"-1"},"ingestTookInMillis":-1,"items":[{"failed":false,"fragment":false,"id":"1002","index":"edu-app-user","itemId":0,"opType":"INDEX","response":{"fragment":false,"id":"1002","index":"edu-app-user","primaryTerm":1,"result":"CREATED","seqNo":16,"shardId":{"fragment":true,"id":-1,"index":{"fragment":false,"name":"edu-app-user","uUID":"_na_"},"indexName":"edu-app-user"},"shardInfo":{"failed":0,"failures":[],"fragment":false,"successful":1,"total":2},"type":"doc","version":1},"type":"doc","version":1},{"failed":false,"fragment":false,"id":"2","index":"edu-app-user","itemId":1,"opType":"DELETE","response":{"fragment":false,"id":"2","index":"edu-app-user","primaryTerm":1,"result":"DELETED","seqNo":7,"shardId":{"fragment":true,"id":-1,"index":{"fragment":false,"name":"edu-app-user","uUID":"_na_"},"indexName":"edu-app-user"},"shardInfo":{"failed":0,"failures":[],"fragment":false,"successful":1,"total":2},"type":"doc","version":5},"type":"doc","version":5},{"failed":false,"fragment":false,"id":"28","index":"edu-app-user","itemId":2,"opType":"UPDATE","response":{"fragment":false,"id":"28","index":"edu-app-user","primaryTerm":1,"result":"UPDATED","seqNo":17,"shardId":{"fragment":true,"id":-1,"index":{"fragment":false,"name":"edu-app-user","uUID":"_na_"},"indexName":"edu-app-user"},"shardInfo":{"failed":0,"failures":[],"fragment":false,"successful":1,"total":2},"type":"doc","version":8},"type":"doc","version":8}],"took":{"days":0,"daysFrac":1.0069444444444445E-6,"hours":0,"hoursFrac":2.4166666666666667E-5,"micros":87000,"microsFrac":87000.0,"millis":87,"millisFrac":87.0,"minutes":0,"minutesFrac":0.00145,"nanos":87000000,"seconds":0,"secondsFrac":0.087,"stringRep":"87ms"}
}

multi Get API 多获取

Multi Get API 可以在单个 HTTP 请求中并行执行多个 get 请求。
先创建 MultiGetRequest,再向其中添加 `MultiGetRequest.Item` 来配置要获取的内容:

/**
 * Fetches the documents with ids 28, 1 and 3 in a single multi-get call.
 *
 * @return the multi-get response, or {@code null} when the call threw an IOException
 *         (the stack trace is printed in that case)
 */
@GetMapping("get")
public MultiGetResponse get(){
    MultiGetRequest multiGet = new MultiGetRequest();
    for (String documentId : new String[] {"28", "1", "3"}) {
        multiGet.add(new MultiGetRequest.Item(index, type, documentId));
    }

    try {
        return restHighLevelClient.mget(multiGet, RequestOptions.DEFAULT);
    } catch (IOException e) {
        e.printStackTrace();
        return null;
    }
}

可选参数:

# 禁用抓取_source
// Builds a multi-get item whose fetch-source context is set to DO_NOT_FETCH_SOURCE.
new MultiGetRequest.Item("index", "type", "example_id").fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE)

# 过滤数据
// Source filtering: the item is added with a FetchSourceContext built from an include
// list ("name", "age") and an empty exclude list.
final String[] includedFields = {"name", "age"};
final String[] excludedFields = Strings.EMPTY_ARRAY;
request.add(item.fetchSourceContext(new FetchSourceContext(true, includedFields, excludedFields)));
    

多获取(Multi Get)的异步处理方式与其他异步操作一致,只需把监听器的泛型改为 MultiGetResponse 即可。

上一篇:Elasticsearch Java REST Client Term Vectors API
下一篇:Elasticsearch Java REST Client Search APIs 查询