When a large batch of documents needs to be inserted, issuing one insert request per document is clearly inefficient.
The command-line bulk approach we covered earlier:
POST /_bulk
{ "delete": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "create": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "title": "My first blog post" }
{ "index": { "_index": "website", "_type": "blog" }}
{ "title": "My second blog post" }
{ "update": { "_index": "website", "_type": "blog", "_id": "123", "_retry_on_conflict" : 3} }
{ "doc" : {"title" : "My updated blog post"} }
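The bulk body is newline-delimited JSON (NDJSON): each action metadata line is followed by an optional source line, and the body must end with a newline. As a rough illustration of that format only (the class and method names `BulkBodyDemo` and `actionLine` are hypothetical, not part of any Elasticsearch client), assembling such a payload in plain Java might look like:

```java
// Illustrative sketch: builds a _bulk request body as newline-delimited JSON.
// Each action line ("delete", "create", "index", ...) is followed by an
// optional source line; the whole body must end with a trailing newline.
public class BulkBodyDemo {

    // Builds one action metadata line, e.g.
    // { "delete": { "_index": "website", "_type": "blog", "_id": "123" }}
    static String actionLine(String action, String index, String type, String id) {
        return "{ \"" + action + "\": { \"_index\": \"" + index
                + "\", \"_type\": \"" + type + "\", \"_id\": \"" + id + "\" }}";
    }

    public static void main(String[] args) {
        StringBuilder body = new StringBuilder();
        body.append(actionLine("delete", "website", "blog", "123")).append('\n');
        body.append(actionLine("create", "website", "blog", "123")).append('\n');
        body.append("{ \"title\": \"My first blog post\" }").append('\n');
        System.out.print(body);
    }
}
```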
1: Bulk execution via the Java API

@Test
public void BulkInsertDocument() throws IOException {
    // Requires: import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    // Either use client#prepare, or use Requests# to directly build index/delete requests
    bulkRequest.add(client.prepareIndex("weibo", "article", "1")
            .setSource(jsonBuilder()
                    .startObject()
                    .field("user", "張三")
                    .field("postDate", new Date())
                    .field("message", "近段時間關於馬蓉的消息傳得到處都是,而且我們也發現她這一段時間似乎小動作不斷。對於她近期的所做所為,王寶強方面則顯得冷靜一些,要求法庭二審選擇公開審理。")
                    .endObject()
            )
    );
    bulkRequest.add(client.prepareIndex("weibo", "article", "2")
            .setSource(jsonBuilder()
                    .startObject()
                    .field("user", "王二")
                    .field("postDate", new Date())
                    .field("message", "新浪2016里約奧運站為您全程報道第31屆里約夏季奧林匹克運動會,金牌榜,賽程賽果,賽事直播,高清圖片,中國隊比賽信息第一時間推送,與奧運人物微博互動")
                    .endObject()
            )
    );
    BulkResponse bulkResponse = bulkRequest.get();
    if (bulkResponse.hasFailures()) {
        // Process failures by iterating through each bulk response item
        for (BulkItemResponse item : bulkResponse) {
            if (item.isFailed()) {
                System.out.println(item.getId() + ": " + item.getFailureMessage());
            }
        }
    }
}
This approach still has a shortcoming: there is no built-in control over when a batch is flushed, whether after a certain number of documents, a certain amount of data, or a fixed time interval. Next, let's look at the optimized way to do bulk inserts.
2: The optimized bulk approach (BulkProcessor)
@Test
public void BulkInsertDocument2() throws Exception {
    BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
        // beforeBulk is called just before each bulk request is executed
        public void beforeBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest bulkRequest) {
            System.out.println("--- about to execute " + bulkRequest.numberOfActions() + " actions ---");
        }

        // This afterBulk overload is called after a bulk request succeeds;
        // together with beforeBulk it can be used to time each batch
        public void afterBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest bulkRequest, BulkResponse bulkResponse) {
            System.out.println("--- executed " + bulkRequest.numberOfActions() + " actions successfully ---");
        }

        // This afterBulk overload is called when a bulk request fails
        public void afterBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest bulkRequest, Throwable throwable) {
            System.out.println("--- failed to execute " + bulkRequest.numberOfActions() + " actions ---");
        }
    })
    // flush a bulk every 10,000 requests
    .setBulkActions(10000)
    // flush a bulk once 1 GB of data has accumulated
    .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
    // flush at least once every 5 seconds regardless of size
    .setFlushInterval(TimeValue.timeValueSeconds(5))
    // concurrent requests: 0 executes bulks synchronously,
    // 1 allows one bulk to execute while new requests accumulate
    .setConcurrentRequests(1)
    // backoff policy: retry with exponential backoff, initial delay 100 ms, at most 3 retries
    .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
    .build();

    // add request data
    Map<String, Object> m = new HashMap<String, Object>();
    m.put("document", "這是異步批量插入測試");
    bulkProcessor.add(new IndexRequest("testblog", "test", "1").source(m));
    bulkProcessor.add(new IndexRequest("testblog", "test", "2").source(m));
    bulkProcessor.flush();
    // bulkProcessor.add(new DeleteRequest("testblog", "test", "2"));
    // close: wait up to 10 minutes for in-flight bulks to complete
    bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
}
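The three thresholds configured above (action count, byte size, flush interval) are OR-ed together: whichever is reached first triggers a flush. A minimal sketch of that trigger logic in plain Java (purely illustrative of the policy; the `FlushPolicy` class is a hypothetical name, not BulkProcessor's actual implementation):

```java
// Hypothetical helper illustrating BulkProcessor-style flush triggers:
// flush when pending actions >= maxActions, OR pending bytes >= maxBytes,
// OR the flush interval has elapsed since the last flush.
public class FlushPolicy {
    final int maxActions;
    final long maxBytes;
    final long intervalMillis;

    FlushPolicy(int maxActions, long maxBytes, long intervalMillis) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
        this.intervalMillis = intervalMillis;
    }

    // Returns true when any one of the three thresholds has been reached.
    boolean shouldFlush(int pendingActions, long pendingBytes, long millisSinceLastFlush) {
        return pendingActions >= maxActions
                || pendingBytes >= maxBytes
                || millisSinceLastFlush >= intervalMillis;
    }
}
```

With the settings from the example (10,000 actions, 1 GB, 5 s), a batch of only two small documents would normally be flushed by the 5-second timer rather than by count or size, which is why the test calls `bulkProcessor.flush()` explicitly.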