ES transport client底層是netty實現,netty本質上是異步方式,但是netty自身可以使用sync或者await(future超時機制)來實現類似同步調用!因此,ES transport client可以同步調用也可以異步(不過底層的socket必然是異步實現)


ES transport client底層是netty實現,netty本質上是異步方式,但是netty自身可以使用sync或者await(future超時機制)來實現類似同步調用!

因此,ES transport client可以同步調用也可以異步(不過底層的socket必然是異步實現)。

發送端例子

對於java client的數據發送(這里以bulk為例),寫過的人都知道,其實是很簡單的,因為大部分事情都已經被client做掉了,那么我們先給出例子感知一下:

client初始化

Settings settings = Settings.settingsBuilder() .put("cluster.name", "myClusterName") .put("client.transport.sniff", true).build(); client=new TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress("host1",9300)) .addTransportAddress(new InetSocketTransportAddress("host2",9300)); 

bulk數據發送

對於數據的發送ES提供了兩種方式:

第一種bulk api:

import static org.elasticsearch.common.xcontent.XContentFactory.*; BulkRequestBuilder bulkRequest = client.prepareBulk(); // either use client#prepare, or use Requests# to directly build index/delete requests bulkRequest.add(client.prepareIndex("twitter", "tweet", "1") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) ); bulkRequest.add(client.prepareIndex("twitter", "tweet", "2") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "another post") .endObject() ) ); BulkResponse bulkResponse = bulkRequest.get(); if (bulkResponse.hasFailures()) { // process failures by iterating through each bulk response item } 

可以看到這種方式是由client端自己添加數據,然后調用BulkResponse bulkResponse = bulkRequest.get();來完成數據的發送。

第二種叫做Bulk Processor:

import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; BulkProcessor bulkProcessor = BulkProcessor.builder( client, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { ... } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { ... } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { ... } }) .setBulkActions(10000) .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) .setFlushInterval(TimeValue.timeValueSeconds(5)) .setConcurrentRequests(1) .build(); 

初始化bulk processor之后,客戶端只需要往bulkProcessor添加數據即可bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));,你可以先配置好bulk的size、interval等,其他的事情就交給processor自己去做吧。

兩種方式各有利弊,第一種要自己控制bulk size和interval,但是有利於對發送失敗的處理;而第二種簡單易用,用戶只管add數據就好,但是對於使用回調函數來處理異常會不那么方便,如何選擇就看使用場景的了。

 

部分內容摘自:http://www.opscoder.info/es_javaclient.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM