ElasticSearch中bulkProcesser使用


初次接觸es,可能對增刪改查很熟悉,以為能為得心應手,本次應用場景為 數據庫變更一條記錄,會觸發更新es中的數據,每秒並發大概30條左右,測試環境一切工作正常(數據量較少),上線后發現日志中很多類似於下面的錯誤:

隊列滿了

org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 200) on org.elasticsearch.search.action.SearchServiceTransportAction$23@5f804c60
    at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:62)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at org.elasticsearch.search.action.SearchServiceTransportAction.execute(SearchServiceTransportAction.java:509)
    at org.elasticsearch.search.action.SearchServiceTransportAction.sendExecuteScan(SearchServiceTransportAction.java:441)
    at org.elasticsearch.action.search.type.TransportSearchScanAction$AsyncAction.sendExecuteFirstPhase(TransportSearchScanAction.java:68)
    at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.performFirstPhase(TransportSearchTypeAction.java:171)
    at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.start(TransportSearchTypeAction.java:153)
    at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:52)
    at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:42)
  ...

更新版本錯誤:

Caused by: org.elasticsearch.index.engine.VersionConflictEngineException: [kpi][4] [opportunity][1442415600000]: version conflict, current [5933], provided [5932]
        at org.elasticsearch.index.engine.internal.InternalEngine.innerIndex(InternalEngine.java:582) [elasticsearch-1.4.4.jar:]
        at org.elasticsearch.index.engine.internal.InternalEngine.index(InternalEngine.java:522) [elasticsearch-1.4.4.jar:]
        at org.elasticsearch.index.shard.service.InternalIndexShard.index(InternalIndexShard.java:425) [elasticsearch-1.4.4.jar:]
        at org.elasticsearch.action.index.TransportIndexAction.shardOperationOnPrimary(TransportIndexAction.java:193) [elasticsearch-1.4.4.jar:]

經過高手指點,從單次的實時操作改為批量操作,這樣做的好處有,減少網路開銷,從消息大小,時間,消息數量三個維度來衡量 批量操作的維度,如果數據不是要求非常實時的操作(非常實時的存儲應該也不會選擇es),改為批量操作后,錯誤均修復,大概配置如下。

 private BulkProcessor bulkProcessor;

    @PostConstruct
    public void init() {
        this.bulkProcessor = BulkProcessor.builder(
                esTransportMainClient.getClient(),
                new BulkProcessor.Listener() {

                    @Override
                    public void beforeBulk(long executionId, BulkRequest request) {
                        logger.info("---嘗試插入{}條數據---", request.numberOfActions());
                    }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request, BulkResponse response) {
                        logger.info("---嘗試插入{}條數據成功---", request.numberOfActions());
                    }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request, Throwable failure) {
                        logger.error("[es錯誤]---嘗試插入數據失敗---", failure);
                    }

                })
                .setBulkActions(1000)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(2)
                .build();
    }

我這里全局保持一個bulkProcesser就可以維持正常業務了。

每次使用的方法:

bulkProcessor.add(updateRequestBuilder.request());

此processer的含義為如果消息數量到達1000 或者消息大小到大5M 或者時間達到5s 任意條件滿足,客戶端就會把當前的數據提交到服務端處理。效率很高。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM