Elasticsearch的Bulk API允許批量提交index和delete請求,有如下兩種用法:
用法1
BulkRequestBuilder requestBuilder = client.prepareBulk();
for(Person person : personList){
String obj = getIndexDataFromHotspotData(person);
if(obj != null){
requestBuilder.add(client.prepareIndex("test_index","test",String.valueOf(person.getId())).setRefresh(true).setSource(obj));
}
}
用法2
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
}
}).setBulkActions(10000).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).setFlushInterval(TimeValue.timeValueSeconds(5)).build();
for(Person person : personList){
String obj = getIndexDataFromHotspotData(person);
if(obj != null){
bulkProcessor.add(new IndexRequest("test_index","test",String.valueOf(person.getId())).source(obj));
}
}
- beforeBulk會在批量提交之前執行,可以從BulkRequest中獲取請求信息request.requests()或者請求數量request.numberOfActions()
- 第一個afterBulk會在批量成功后執行,可以跟beforeBulk配合計算批量所需時間
- 第二個afterBulk會在批量失敗后執行
- 在例子中,當請求超過10000個(default=1000)或者總大小超過1GB(default=5MB)時,觸發批量提交動作