人肉翻譯,非谷歌機翻,部分地方添加了個人的理解,並做了分割,如有錯誤請在評論指出。轉載請指明原鏈接,尊重個人勞動成果。
High-Level-Rest-Client基於Low-Level-Rest-Client封裝,Client配置方面基於Low-Level,而API使用上基於High-Level。
翻譯的版本為6.5.4版本的Elasticsearch,部分不重要的內容(如Maven/Gradle坐標、License等不在本文出現)。
在實際的配置過程中,查看源碼發現LowLevelClient完全基於HttpAsyncClient來實現的,內部具體組裝Client的細節,如果有時間會寫另外一篇博文分享出來。
低級別rest-client包含了如下特性:
- 最小的依賴
- 在集群所有結點間負載均衡
- 結點失敗后,根據具體的響應碼(response code)進行失效轉移
- 連接失敗懲罰機制(是否重連一個失敗的結點取決於它失敗的次數;嘗試重新連接失敗的次數越多,客戶端下一次嘗試重新連接該結點的等待時間也越長)
- 持久化連接
- 請求和響應的日志追蹤
- (可選功能)集群結點自動發現
1、初始化客戶端
1.1.基本設置
RestClient實例只需要通過RestClientBuilder類進行構建就行,如下所示,可以提供多個連接實例的HttpHost(每一個HttpHost都是一個結點Node):
1 RestClient restClient = RestClient.builder( 2 new HttpHost("localhost", 9200, "http"), 3 new HttpHost("localhost", 9201, "http")).build();
RestClient是線程安全的,並且和它依附的應用有相同的生命周期。需要注意的是,當不需要Client的時候,需要關閉它以便正確的釋放其持有的資源(這些資源有底層的http client實例與它的線程):
1 restClient.close();
1.2.參數化設置
RestClientBuilder同樣允許參數化的設置如下參數來用於構建RestClient實例:
- 設置默認的請求頭,該請求頭會應用到每個request上去。不允許單獨的在每次請求時進行請求頭設置。
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http")); 2 Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")}; 3 builder.setDefaultHeaders(defaultHeaders);
- 設置同一個請求的多次嘗試時的最大超時時間,這個設置默認的時間是30秒(同socket timeout的默認超時時間一樣)。如果你修改了socket timeout,你也應該修改這個值來適應socket timeout。
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http")); 2 builder.setMaxRetryTimeoutMillis(10000);
- 設置監聽器,當一個結點故障時收到通知,以采取措施。嗅探失敗節點功能會隱式的使用一個這樣的監聽器。
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http")); 2 builder.setFailureListener(new RestClient.FailureListener() { 3 @Override 4 public void onFailure(Node node) { 5 6 } 7 });
- 設置結點選擇器,用於過濾某些es的結點(這些結點在初始化時設置到Client中)。當結點嗅探開啟時,如果你不想將請求發送至dedicated master結點上,該功能很好用。Client默認會將請求發送到所有配置的節點上。
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http")); 2 builder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
- 通過一個回調函數修改默認的請求配置(如超時時間、驗證信息等其他apache httpclient包中RequestConfig.Builder允許的配置)。
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http")); 2 builder.setRequestConfigCallback( 3 new RestClientBuilder.RequestConfigCallback() { 4 @Override 5 public RequestConfig.Builder customizeRequestConfig( 6 RequestConfig.Builder requestConfigBuilder) { 7 return requestConfigBuilder.setSocketTimeout(10000); 8 } 9 });
- 通過一個回調函數修改默認的HttpClient配置(如ssl配置等其他apache httpclient包中HttpAsyncClientBuilder允許的配置)。
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http")); 2 builder.setHttpClientConfigCallback(new HttpClientConfigCallback() { 3 @Override 4 public HttpAsyncClientBuilder customizeHttpClient( 5 HttpAsyncClientBuilder httpClientBuilder) { 6 return httpClientBuilder.setProxy( 7 new HttpHost("proxy", 9000, "http")); 8 } 9 });
2、執行請求
2.1.同步與異步API
一旦Client創建,就可以通過performRequest或performRequestAsync來執行請求:
(1)performRequest是同步請求,它會阻塞調用線程,在請求成功或者請求失敗拋出異常之后,返回Response;
1 Request request = new Request( 2 "GET", //HTTP方法,支持GET、POST、HEAD等 3 "/"); //es服務端點endpoint 4 Response response = restClient.performRequest(request);
(2)performRequestAsync是異步請求,它接收一個ResponseListener參數,當請求成功或者請求失敗拋出異常之后,該Listener會被調起並傳入Response或Exception到相應方法;
1 Request request = new Request( 2 "GET", //HTTP方法,支持GET、POST、HEAD等 3 "/"); //es服務端點endpoint 4 restClient.performRequestAsync(request, new ResponseListener() { 5 @Override 6 public void onSuccess(Response response) { 7 //成功回調 8 } 9 10 @Override 11 public void onFailure(Exception exception) { 12 //失敗回調 13 } 14 });
另外,你可以向Request對象中添加請求參數,如下:
1 request.addParameter("pretty", "true");
如果將HttpEntity設置為String,那么ContentType會被默認設置為application/json:
2.2.請求可選項
RequestOptions這個類用於設置請求可選項,我們建議在同一個應用中,對於使用同樣的請求可選項的Request共享一個RequestOption實例,你可以通過單例來實現:
1 private static final RequestOptions COMMON_OPTIONS; 2 static { 3 RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); 4 builder.addHeader("Authorization", "Bearer " + TOKEN); //設置請求頭的校驗信息 5 builder.setHttpAsyncResponseConsumerFactory( //設置Response的緩沖池大小 6 new HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024)); 7 COMMON_OPTIONS = builder.build(); 8 }
- 這里有一個addHeader方法,需要注意的是,在這里不需要設置ContentType,因為在設置HttpEntity時就會自動關聯ContentType;
- 你可以在這里設置NodeSelector。NodeSelector.NOT_MASTER_ONLY是一個比較好的選擇;
- 你可以在這里定制用於緩沖異步響應的Response Consumer。默認的ResponseConsumer會在JVM堆上緩存100M的響應數據。如果響應數據比緩沖區過大,當次請求會失敗。如果你的應用跑在一個JVM堆比較小的環境中,你需要降低緩沖區的最大值。
配置好RequestOptions之后,我們在每一個共享該配置的Request中,簡單設置一下該對象即可:
1 request.setOptions(COMMON_OPTIONS);
如果有某些請求需要對OPTIONS做一定程度的變更,可以通過如下方式增量處理而不是重新寫一遍設置代碼:
1 RequestOptions.Builder options = COMMON_OPTIONS.toBuilder(); 2 options.addHeader("cats", "knock things off of other things"); 3 request.setOptions(options);
2.3.批量並行異步請求
下面是一個使用閉鎖並行執行異步請求的例子。在實際的編程中,你一般會傾向於使用_bulk這個API來替代,當然了,下面的例子依然具有參考性:
1 final CountDownLatch latch = new CountDownLatch(documents.length); 2 for (int i = 0; i < documents.length; i++) { 3 Request request = new Request("PUT", "/posts/doc/" + i); 4 //let's assume that the documents are stored in an HttpEntity array 5 request.setEntity(documents[i]); 6 restClient.performRequestAsync( 7 request, 8 new ResponseListener() { 9 @Override 10 public void onSuccess(Response response) { 11 12 latch.countDown(); 13 } 14 15 @Override 16 public void onFailure(Exception exception) { 17 18 latch.countDown(); 19 } 20 } 21 ); 22 } 23 latch.await();
3、讀取響應
3.1.獲取響應對象
響應對象,無論是通過同步還是異步獲取到的(同步通過API返回值獲取,異步通過回調方法中的參數獲取),都是同一個類的對象。該對象包裝了http client原始的響應對象以及一些附加信息:
1 Response response = restClient.performRequest(new Request("GET", "/")); 2 RequestLine requestLine = response.getRequestLine(); 3 HttpHost host = response.getHost(); 4 int statusCode = response.getStatusLine().getStatusCode(); 5 Header[] headers = response.getHeaders(); 6 String responseBody = EntityUtils.toString(response.getEntity());
如果執行請求失敗的話,你將會得到一個異常,異常有以下幾種:
(1)IOException:通訊問題,例如SocketTimeoutException;
(2)ResponseException:響應已經返回,但是其狀態碼不為2XX時將拋出這個異常。ResponseException源自於一個有效的http Response,因此該異常會將源Response暴露出來,並允許你獲取它。
注意:HEAD請求如果返回了404,ResponseException是不會被拋出的,因為對於HEAD來說資源未找到也是該方法期望的返回值之一。所有其他的HTTP方法對於404返回碼都會拋出一個ResponseException,除非在ignore參數中包含了404。ignore是一個特殊的參數,該參數不會發送給es,它包含了一個分割的error status code列表。當Response的狀態碼在這個列表中時,它會被當成Response而不是作為Exception。對於GET方法相關的API,這是一個相當有用的特性,比如GET查詢一個文檔返回404,我們期望其不返回一個異常,而是一個沒有document的response對象。
注意:low-level-client並不會給出json相關的編組與解組API。你可以自由選擇相關的庫來進行json的編組與解組。
底層的Apache Async Http Client庫附帶了不同的org.apache.http.HttpEntity實現,這些實現允許以不同的格式(流,字節數組,字符串等)提供請求主體。至於讀取響應體,HttpEntity#getContent方法很方便,它返回從先前緩沖的響應體讀取的InputStream。作為替代方案,可以提供一個自定義的org.apache.http.nio.protocol.HttpAsyncResponseConsumer來控制字節的讀取和緩沖方式。
4、通用配置
在初始化客戶端中,我們看到了RestClientBuilder支持提供一個RequestConfigCallback與一個能夠用於個性化異步請求的HttpClientConfigCallback。這些回調函數能夠讓你在不覆蓋每一個RestClient初始化默認值的情況下修改某些指定行為。本小節我們描述一些需要為low-level-client添加額外配置的公共場景。
4.1.Timeouts
在初始化客戶端的參數化設置就提過了,可以通過如下形式進行設置:
1 RestClientBuilder builder = RestClient.builder( 2 new HttpHost("localhost", 9200)) 3 .setRequestConfigCallback( 4 new RestClientBuilder.RequestConfigCallback() { 5 @Override 6 public RequestConfig.Builder customizeRequestConfig( 7 RequestConfig.Builder requestConfigBuilder) { 8 return requestConfigBuilder 9 .setConnectTimeout(5000) //默認值1S 10 .setSocketTimeout(60000); //默認值30S 11 } 12 }) 13 .setMaxRetryTimeoutMillis(60000); //設置完SocketTimeout后,一般需要聯動修改這個值
4.2.線程數
ApacheHttpAsyncClient基於IOReactor模式,在啟動時會創建一個分發線程和一組工作線程,它們由Connection Manager使用,工作線程的默認數量與能夠檢測到的CPU核心數一致(Runtime.getRuntime().availableProcessors()的返回結果)。可以通過如下方法修改工作線程數:
1 RestClientBuilder builder = RestClient.builder( 2 new HttpHost("localhost", 9200)) 3 .setHttpClientConfigCallback(new HttpClientConfigCallback() { 4 @Override 5 public HttpAsyncClientBuilder customizeHttpClient( 6 HttpAsyncClientBuilder httpClientBuilder) { 7 return httpClientBuilder.setDefaultIOReactorConfig( 8 IOReactorConfig.custom() 9 .setIoThreadCount(1) 10 .build()); 11 } 12 });
4.3.身份驗證
使用如下的方式設置身份驗證相關的請求參數:
1 final CredentialsProvider credentialsProvider = 2 new BasicCredentialsProvider(); 3 credentialsProvider.setCredentials(AuthScope.ANY, 4 new UsernamePasswordCredentials("user", "password")); 5 6 RestClientBuilder builder = RestClient.builder( 7 new HttpHost("localhost", 9200)) 8 .setHttpClientConfigCallback(new HttpClientConfigCallback() { 9 @Override 10 public HttpAsyncClientBuilder customizeHttpClient( 11 HttpAsyncClientBuilder httpClientBuilder) { 12 return httpClientBuilder 13 .setDefaultCredentialsProvider(credentialsProvider); 14 } 15 });
搶占式(這里翻譯不太確定,結合上下文,應該指的是在第一次請求就發送身份驗證信息)身份驗證可以被禁用,這意味着每個HTTP請求都會在沒有身份驗證請求頭的情況下發送,以查看它是否被接受,並且在收到HTTP 401響應后,它將使用含有身份驗證請求頭的報文重新發送完全相同的請求。 如果您希望這樣做,那么您可以通過HttpAsyncClientBuilder禁用它:
1 final CredentialsProvider credentialsProvider = 2 new BasicCredentialsProvider(); 3 credentialsProvider.setCredentials(AuthScope.ANY, 4 new UsernamePasswordCredentials("user", "password")); 5 6 RestClientBuilder builder = RestClient.builder( 7 new HttpHost("localhost", 9200)) 8 .setHttpClientConfigCallback(new HttpClientConfigCallback() { 9 @Override 10 public HttpAsyncClientBuilder customizeHttpClient( 11 HttpAsyncClientBuilder httpClientBuilder) { 12 httpClientBuilder.disableAuthCaching(); /* 禁用搶占式身份驗證 */ 13 return httpClientBuilder 14 .setDefaultCredentialsProvider(credentialsProvider); 15 } 16 });
4.4.SSL通信
加密通信同樣可以通過HttpClientConfigCallback進行配置。HttpAsyncClientBuilder通過3個暴露的API:setSSLContext、setSSLSessionStrategy、setConnectionManager進行加密通信的設置。下面是一個配置示例(如果沒有顯式配置,則會使用系統的默認配置):
1 KeyStore truststore = KeyStore.getInstance("jks"); 2 try (InputStream is = Files.newInputStream(keyStorePath)) { 3 truststore.load(is, keyStorePass.toCharArray()); 4 } 5 SSLContextBuilder sslBuilder = SSLContexts.custom() 6 .loadTrustMaterial(truststore, null); 7 final SSLContext sslContext = sslBuilder.build(); 8 RestClientBuilder builder = RestClient.builder( 9 new HttpHost("localhost", 9200, "https")) 10 .setHttpClientConfigCallback(new HttpClientConfigCallback() { 11 @Override 12 public HttpAsyncClientBuilder customizeHttpClient( 13 HttpAsyncClientBuilder httpClientBuilder) { 14 return httpClientBuilder.setSSLContext(sslContext); 15 } 16 });
4.5.節點選擇器
客戶端默認情況下會以輪詢的形式將請求發送到每一個初始化時配置的節點上。在初始化時,我們可以提供一個節點選擇器NodeSelector。當嗅探開啟時這個功能會很好用,以防只有HTTP請求才能訪問專用主節點。 對於每個請求,客戶端將運行最終配置的節點選擇器以過濾候選節點,然后從列表中選擇下一個節點選擇器。下面是一個基於機架進行節點選擇的例子:
1 RestClientBuilder builder = RestClient.builder( 2 new HttpHost("localhost", 9200, "http")); 3 builder.setNodeSelector(new NodeSelector() { 4 @Override 5 public void select(Iterable<Node> nodes) { 6 /* 7 * Prefer any node that belongs to rack_one. If none is around 8 * we will go to another rack till it's time to try and revive 9 * some of the nodes that belong to rack_one. 10 */ 11 boolean foundOne = false; 12 for (Node node : nodes) { 13 String rackId = node.getAttributes().get("rack_id").get(0); 14 if ("rack_one".equals(rackId)) { 15 foundOne = true; 16 break; 17 } 18 } 19 if (foundOne) { 20 Iterator<Node> nodesIt = nodes.iterator(); 21 while (nodesIt.hasNext()) { 22 Node node = nodesIt.next(); 23 String rackId = node.getAttributes().get("rack_id").get(0); 24 if ("rack_one".equals(rackId) == false) { 25 nodesIt.remove(); 26 } 27 } 28 } 29 } 30 });
上面的例子中,優先選擇一號機架上的節點。當一號機架沒有任何節點時,才會選擇其他機架節點。
需要注意的是,如果一個NodeSelector無法提供一個始終如一的(或者說穩定的)節點列表,那么輪詢發送請求至不同node的行為將會變得不可預測或失效。在上面的例子中,輪詢行為會很好的工作,因為這個例子中的代碼可以很好的(或比較穩定的)獲取到哪些結點可用(這些獲取到的可用結點能夠影響輪詢行為的可預測性)。因此,這也提醒我們NodeSelector不應該依賴於任何外部因素,否則輪詢行為將會變的不可預測。
5、嗅探Sniffer
允許從一個運行的ES集群中自動發現結點,並且將發現的結點設置入一個已存在的RestClient。默認的,它使用節點信息api檢索屬於集群的節點,並使用jackson解析獲得的json響應。
要使用該功能,必須引入依賴,該依賴版本同rest-client版本一樣,跟着es的發布版本走的:
1 <dependency> 2 <groupId>org.elasticsearch.client</groupId> 3 <artifactId>elasticsearch-rest-client-sniffer</artifactId> 4 <version>6.5.4</version> 5 </dependency>
5.1.基本設置
在初始化的RestClient的時候將一個Sniffer與其進行關聯。Sniffer嗅探器會為RestClient提供一個周期性的拉取es所有結點列表的功能(默認5分鍾拉取一次),拉取到節點列表后,會調用RestClient.setNodes的API將其更新入RestClient。
1 RestClient restClient = RestClient.builder( 2 new HttpHost("localhost", 9200, "http")) 3 .build(); 4 Sniffer sniffer = Sniffer.builder(restClient).build();
Sniffer應該具有同RestClient一樣的生命周期,也就是說,在關閉RestClient時,也需要關閉Sniffer以釋放其占用的資源:
1 sniffer.close(); 2 restClient.close();
5.2.定制刷新間隔
上面我們提到了,Sniffer默認5分鍾刷新一次,我們可以在初始化Sniffer的時候改變這個時間(單為:毫秒):
1 RestClient restClient = RestClient.builder( 2 new HttpHost("localhost", 9200, "http")) 3 .build(); 4 Sniffer sniffer = Sniffer.builder(restClient) 5 .setSniffIntervalMillis(60000).build(); //設置Sniffer刷新間隔
另外,我們可以在請求失敗上啟用Sniffer,即在每次請求節點失敗后,可以讓Sniffer執行一次節點更新(區別於Sniffer固定間隔的刷新)。我們需要為RestClient提供一個SnifferOnFailureListener,並在Client初始化的時候設置這個監聽器,同時在創建Sniffer之后也需要將其設置給這個監聽器。這樣一來,每次失敗之后,監聽器監聽到失敗就會委托Sniffer去執行節點刷新:
1 SniffOnFailureListener sniffOnFailureListener = 2 new SniffOnFailureListener(); //創建Sniffer失敗監聽器 3 RestClient restClient = RestClient.builder( 4 new HttpHost("localhost", 9200)) 5 .setFailureListener(sniffOnFailureListener) //為RestClient設置監聽器 6 .build(); 7 Sniffer sniffer = Sniffer.builder(restClient) 8 .setSniffAfterFailureDelayMillis(30000) //設置失敗之后的額外調度延遲 9 .build(); 10 sniffOnFailureListener.setSniffer(sniffer); //為監聽器設置Sniffer
- 關於setSniffAfterFailureDelayMillis:當我們在失敗上設置了Sniffer監聽后,每次失敗不僅會通過監聽器勾起一次Sniffer的節點刷新行為,一次額外的Sniffer節點刷新行為也會比平常的間隔調度更快的到來(這是Sniffer的默認特征,且默認時間為1分鍾)。假設失敗的節點能夠很快的恢復,我們的應用需要盡快知道這個情況,那么我們可以通過setSniffAfterFailureDelayMillis來調整這個額外Sniffer行為的延遲時間(這是什么意思呢?舉個例子,當請求失敗后,通過監聽器觸發的Sniffer行為獲取到的節點列表可能是剔除了有問題節點的列表,如果此時問題節點未恢復的話;可能這些問題節點在一陣子后自行恢復了,但是Sniffer還沒有到下次觸發時間,因此我們需要調整一下這個失敗后Sniffer額外調度的延遲時間,縮短該時間以盡快拉取到恢復后的節點列表)。
5.3.設置節點通信協議
Sniffer的節點列表刷新行為使用的是NodeInfo API,這個API只能返回節點的IP和PORT信息,對於連接節點使用到的協議無法返回,因此默認認為節點通信使用的是HTTP協議。在使用HTTPS協議的情況下,我們需要手工的設置一個ElasticsearchNodesSniffer實例用來告知Sniffer節點通信的協議是HTTPS:
通過同樣的方式,我們可以設置SnifferRequestTimeout這個參數的值(默認1S)。當調用NodeInfoAPI時,這個超時參數會作為查詢字符串中的參數,它告訴ES服務器查詢節點列表的超時時間。因此當服務器側出現超時的時候仍然會返回一個有效的響應結果(服務器側查詢節點列表的超時,不代表Sniffer請求自身的超時),這個響應結果可能只包含節點列表的一個子集,即在超時時間內能獲取到的節點列表數據:
1 RestClient restClient = RestClient.builder( 2 new HttpHost("localhost", 9200, "http")) 3 .build(); 4 NodesSniffer nodesSniffer = new ElasticsearchNodesSniffer( 5 restClient, 6 TimeUnit.SECONDS.toMillis(5), 7 ElasticsearchNodesSniffer.Scheme.HTTP); 8 Sniffer sniffer = Sniffer.builder(restClient) 9 .setNodesSniffer(nodesSniffer).build();
5.4.自定義NodesSniffer
有時候,我們需要自定義一個NodesSniffer完成一些個性化的特殊功能,比如從一個內部資源中獲取節點列表(如文件)而不是從ES服務器集群中獲取節點列表:
1 RestClient restClient = RestClient.builder( 2 new HttpHost("localhost", 9200, "http")) 3 .build(); 4 NodesSniffer nodesSniffer = new NodesSniffer() { 5 @Override 6 public List<Node> sniff() throws IOException { 7 return null; 8 } 9 }; 10 Sniffer sniffer = Sniffer.builder(restClient) 11 .setNodesSniffer(nodesSniffer).build();