Bulk API


承接上文,使用Java High Level REST Client操作elasticsearch

Bulk API

高級客戶端提供了批量處理器以協助批量請求

Bulk Request

BulkRequest可以在一次請求中執行多個索引,更新或者刪除操作。一次請求至少需要一個操作。

        //創建BulkRequest實例
        BulkRequest request = new BulkRequest();
        //使用IndexRequest添加三個文檔,不清楚用法可以參考Index API
        request.add(new IndexRequest("posts", "doc", "1")
                .source(XContentType.JSON,"field", "foo"));
        request.add(new IndexRequest("posts", "doc", "2")
                .source(XContentType.JSON,"field", "bar"));
        request.add(new IndexRequest("posts", "doc", "3")
                .source(XContentType.JSON,"field", "baz"));

Bulk API僅支持以JSON或SMILE編碼的文檔。 提供任何其他格式的文檔將導致錯誤。

同一個BulkRequest可以添加不同類型的操作。

      // 添加 DeleteRequest到BulkRequest,不清楚用法可以參考Delete API
        request.add(new DeleteRequest("posts", "doc", "3"));
        // 添加 UpdateRequest到BulkRequest,不清楚用法可以參考Update API
        request.add(new UpdateRequest("posts", "doc", "2")
                .doc(XContentType.JSON, "other", "test"));
        // 添加 一個使用SMILE格式的IndexRequest
        request.add(new IndexRequest("posts", "doc", "4")
                .source(XContentType.SMILE, "field", "baz"));

可選參數

//設置超時,等待批處理被執行的超時時間(使用TimeValue形式)
request.timeout(TimeValue.timeValueMinutes(2)); 
//設置超時,等待批處理被執行的超時時間(字符串形式)
request.timeout("2m"); 
//刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);//WriteRequest.RefreshPolicy實例方式
request.setRefreshPolicy("wait_for");//字符串方式
//設置在執行索引/更新/刪除操作之前必須處於活動狀態的分片副本數。
request.waitForActiveShards(2);
//使用ActiveShardCount方式來提供分片副本數:可以是ActiveShardCount.ALL,ActiveShardCount.ONE或ActiveShardCount.DEFAULT(默認)
request.waitForActiveShards(ActiveShardCount.ALL);

同步執行

BulkResponse bulkResponse = client.bulk(request);

異步執行

 批量請求的異步執行需要將BulkRequest實例和ActionListener實例傳遞給異步方法:

//當BulkRequest執行完成時,ActionListener會被調用
client.bulkAsync(request, listener);

異步方法不會阻塞並會立即返回。完成后,如果執行成功完成,則使用onResponse方法回調ActionListener,如果失敗則使用onFailure方法。
BulkResponse 的典型監聽器如下所示:

ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        //執行成功完成時調用。 response作為參數提供,並包含已執行的每個操作的單個結果列表。 請注意,一個或多個操作可能已失敗,然而其他操作已成功執行。
    }

    @Override
    public void onFailure(Exception e) {
        //在整個BulkRequest失敗時調用。 在這種情況下,exception作為參數提供,並且沒有執行任何操作。
    }
};

Bulk Response

返回的BulkResponse包含有關已執行操作的信息,並允許迭代每個結果,如下所示:

    //遍歷所有操作結果
        for (BulkItemResponse bulkItemResponse : bulkResponse) {
            //獲取操作的響應,可以是IndexResponse,UpdateResponse或DeleteResponse,
            // 它們都可以被視為DocWriteResponse實例
            DocWriteResponse itemResponse = bulkItemResponse.getResponse();

            if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                    || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
                //處理index操作
                IndexResponse indexResponse = (IndexResponse) itemResponse;

            } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
                //處理update操作
                UpdateResponse updateResponse = (UpdateResponse) itemResponse;

            } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
                //處理delete操作
                DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
            }
        }

批量響應提供了用於快速檢查一個或多個操作是否失敗的方法:

if (bulkResponse.hasFailures()) { 
    //該方法只要有一個操作失敗都會返回true
}

如果想要查看操作失敗的原因,則需要遍歷所有操作結果:

        for (BulkItemResponse bulkItemResponse : bulkResponse) {
            if (bulkItemResponse.isFailed()) {//判斷當前操作是否失敗
                //獲取失敗對象,拿到了failure對象,想怎么玩都行
                BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
            }
        }

Bulk Processor

BulkProcessor通過提供一個工具類來簡化Bulk API的使用,允許索引/更新/刪除操作在添加到處理器后透明地執行。

為了執行請求,BulkProcessor需要以下組件:

RestHighLevelClient
此客戶端用於執行BulkRequest並獲取BulkResponse
BulkProcessor.Listener
在每次BulkRequest執行之前和之后或BulkRequest失敗時調用此監聽器
然后BulkProcessor.builder方法可用於構建新的BulkProcessor:

        //創建BulkProcessor.Listener
        BulkProcessor.Listener listener1 = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                //在每次執行BulkRequest之前調用此方法
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request,BulkResponse response) {
                //在每次執行BulkRequest之后調用此方法
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                //執行BulkRequest失敗時調用此方法
            }
        };
     //通過從BulkProcessor.Builder調用build()方法來創建BulkProcessor。
     //RestHighLevelClient.bulkAsync()方法將用來執行BulkRequest。
        BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync, listener1).build();

BulkProcessor.Builder提供了配置BulkProcessor應如何處理請求執行的方法:

        BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener1);
        //設置何時刷新新的批量請求,根據當前已添加的操作數量(默認為1000,使用-1禁用它)
        builder.setBulkActions(500);//操作數為500時就刷新請求
        //設置何時刷新新的批量請求,根據當前已添加的操作大小(默認為5Mb,使用-1禁用它)
        builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));//操作大小為1M時就刷新請求
        //設置允許執行的並發請求數(默認為1,使用0只允許執行單個請求)
        builder.setConcurrentRequests(0);//不並發執行
        //設置刷新間隔時間,如果超過了間隔時間,則隨便刷新一個BulkRequest掛起(默認為未設置)
        builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
        //設置一個最初等待1秒,最多重試3次的常量退避策略。
        // 有關更多選項,請參閱BackoffPolicy.noBackoff(),BackoffPolicy.constantBackoff()和BackoffPolicy.exponentialBackoff()。
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));

創建BulkProcessor后,就可以向其添加請求了:

IndexRequest one = new IndexRequest("posts", "doc", "1").
        source(XContentType.JSON, "title",
                "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
        .source(XContentType.JSON, "title",
                "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
        .source(XContentType.JSON, "title",
                "The Future of Federated Search in Elasticsearch");

bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

這些請求將由BulkProcessor執行,BulkProcessor負責為每個批量請求調用BulkProcessor.Listener。
偵聽器提供訪問BulkRequest和BulkResponse的方法:

       BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                //在每次執行BulkRequest之前調用,通過此方法可以獲取將在BulkRequest中執行的操作數
                int numberOfActions = request.numberOfActions();
                logger.debug("Executing bulk [{}] with {} requests",
                        executionId, numberOfActions);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                //在每次執行BulkRequest后調用,通過此方法可以獲取BulkResponse是否包含錯誤
                if (response.hasFailures()) {
                    logger.warn("Bulk [{}] executed with failures", executionId);
                } else {
                    logger.debug("Bulk [{}] completed in {} milliseconds",
                            executionId, response.getTook().getMillis());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                //如果BulkRequest失敗,通過調用此方法可以獲取失敗
                logger.error("Failed to execute bulk", failure);
            }
        };

將所有請求添加到BulkProcessor后,需要使用兩種可用的關閉方法之一關閉其實例。

awaitClose()方法可用於等待所有請求都已處理或過了指定的等待時間:

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS); 

如果所有批量請求都已完成,則該方法返回true;如果在所有批量請求完成之前等待時間已過,則返回false

close()方法可用於立即關閉BulkProcessor:

bulkProcessor.close();

兩種方法在關閉處理器之前會刷新已添加到處理器的請求,並且還會禁止將任何新請求添加到處理器。

 

官方文檔:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-document-bulk.html#_optional_arguments_4


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM