@Autowired
private ConfigVo configVo;
@Autowired
private EsClient client;
private static BulkProcessor bulkProcessor;
@PostConstruct
public void init() {
//異步批量請求
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
(request, bulkListener) ->
client.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
bulkProcessor = BulkProcessor.builder(bulkConsumer,
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
logger.info("序號:{} ,開始執行 {} 條數據批量操作。", executionId, request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
// 在每次執行BulkRequest后調用,通過此方法可以獲取BulkResponse是否包含錯誤
if (response.hasFailures()) {
logger.error("Bulk {} executed with failures", executionId);
} else {
logger.info("序號:{} ,執行 {} 條數據批量操作成功,共耗費{}毫秒。", executionId, request.numberOfActions(), response.getTook().getMillis());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("序號:{} 批量操作失敗,總記錄數:{} ,報錯信息為:{}", executionId, request.numberOfActions(), failure.getMessage());
}
})
//每添加n個request,執行一次bulk操作
.setBulkActions(Integer.valueOf(configVo.getBulkActions()).intValue())
//每達到一定大小的請求時,執行一次bulk操作
.setBulkSize(new ByteSizeValue(Integer.valueOf(configVo.getBulkSizeMb()).intValue(), ByteSizeUnit.MB))
//每n秒執行一次bulk操作
.setFlushInterval(TimeValue.timeValueSeconds(Integer.valueOf(configVo.getFlushIntervalSeconds()).intValue()))
//設置並發請求數,默認是1,表示允許執行1個並發請求,積累bulk requests和發送bulk是異步的
.setConcurrentRequests(Integer.valueOf(configVo.getConcurrentRes()).intValue())
// .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
}