es之java插入優化(批量插入)


插入文檔操作的一種優化,因為每次插入單條文檔,都會向es中發送請求。然后es執行在返回結果;

如果有大批量的文檔數據需要插入,這個時候單挑插入操作顯然是不合理的;

之前學習的命令行批量執行方式:

POST /_bulk
{ "delete": { "_index": "website", "_type": "blog", "_id": "123" }} 
{ "create": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "title":    "My first blog post" }
{ "index":  { "_index": "website", "_type": "blog" }}
{ "title":    "My second blog post" }
{ "update": { "_index": "website", "_type": "blog", "_id": "123", "_retry_on_conflict" : 3} }
{ "doc" : {"title" : "My updated blog post"} }

1:普通的批量插入方式

 @Test
    public void BulkInsertDocument() throws IOException {
        BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
        bulkRequest.add(client.prepareIndex("weibo", "article", "1")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("user", "張三")
                        .field("postDate", new Date())
                        .field("message", "近段時間關於馬蓉的消息傳得到處都是,而且我們也發現她這一段時間似乎小動作不斷。對於她近期的所做所為,王寶強方面則顯得冷靜一些,要求法庭二審選擇公開審理。")
                        .endObject()
                )
        );

        bulkRequest.add(client.prepareIndex("weibo", "article", "2")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("user", "王二")
                        .field("postDate", new Date())
                        .field("message", "新浪2016里約奧運站為您全程報道第31屆里約夏季奧林匹克運動會,金牌榜,賽程賽果,賽事直播,高清圖片,中國隊比賽信息第一時間推送,與奧運人物微博互動")
                        .endObject()
                )
        );

        BulkResponse bulkResponse = bulkRequest.get();
        if (bulkResponse.hasFailures()) {
            // process failures by iterating through each bulk response item
        }
    }

這種方式也會有問題,比如在批量插入的時候,多大數據量插入一次,多少條插入一次,多少秒插入一次這樣的定時定量的優化都是沒有的,那么接下來讓我們在看一下批量插入的優化操作

2:優化后的批量執行方式

@Test
    public void BulkInsertDocumen2t() throws Exception {
        BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            //todo beforeBulk會在批量提交之前執行
            public void beforeBulk(long l, org.elasticsearch.action.bulk.BulkRequest bulkRequest) {
                System.out.println("---嘗試操作" + bulkRequest.numberOfActions() + "條數據---");
            }
            //TODO 第一個afterBulk會在批量成功后執行,可以跟beforeBulk配合計算批量所需時間
            public void afterBulk(long l, org.elasticsearch.action.bulk.BulkRequest bulkRequest, BulkResponse bulkResponse) {
                System.out.println("---嘗試操作" + bulkRequest.numberOfActions() + "條數據成功---");
            }
            //TODO 第二個afterBulk會在批量失敗后執行
            public void afterBulk(long l, org.elasticsearch.action.bulk.BulkRequest bulkRequest, Throwable throwable) {
                System.out.println("---嘗試操作" + bulkRequest.numberOfActions() + "條數據失敗---");
            }

        })
                // 1w次請求執行一次bulk
                .setBulkActions(10000)
                // 1gb的數據刷新一次bulk
                .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
                // 固定5s必須刷新一次
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                // 並發請求數量, 0不並發, 1並發允許執行
                .setConcurrentRequests(1)
                // 設置退避, 100ms后執行, 最大請求3次
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();

        // 添加請求數據
        Map<String, Object> m = new HashMap<String, Object>();
        m.put("document", "這是異步批量插入測試");
        bulkProcessor.add(new IndexRequest("testblog", "test", "1").source(m));
        bulkProcessor.add(new IndexRequest("testblog", "test", "2").source(m));
bulkProcessor.flush();
//        bulkProcessor.add(new DeleteRequest("testblog", "test", "2"));

        // 關閉
        bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM