A Thorough Guide to the Elasticsearch Java API



Overview

Now that the basic concepts and usage of Elasticsearch (ES) are clear, let's learn how to use its Java API.
This article assumes you already have a fairly complete understanding of the basic concepts of ES.

Clients

You can do many things with the Java client:

  • Perform standard index, get, delete, update, and search operations.
  • Perform administrative tasks on a running cluster.

However, the official documentation shows that at least three Java clients exist:

  1. Transport Client
  2. Java High Level REST Client
  3. Java Low Level REST Client

The reasons for this confusion:

  • For a long time, ES had no official REST-style Java client; since Java code could easily hook into ES's own API, the TransportClient was built first. Its drawback is obvious: it does not use a RESTful interface, but instead transfers data over ES's binary transport protocol.

  • Later, ES released the official Java Low Level REST Client. It is RESTful and works well, but its drawback is also clear: migrating TransportClient code to the Low Level REST Client takes considerable effort, so much so that the official documentation provides a whole set of migration guides for reference.

  • ES then released the Java High Level REST Client. It wraps the Java Low Level REST Client, and its methods accept the same arguments and return the same types as the TransportClient. This makes code migration easy while keeping the RESTful style, combining the advantages of both earlier clients. It does have a drawback: versioning. ES releases minor versions frequently, and ideally the client version should match the ES version (at least the major version). With a mismatched minor version, basic operations will usually still work, but newer APIs may not be supported.

  • For ES 5 and later, the Java High Level REST Client is strongly recommended. This article uses ES 5.6.3, and the examples below are based on JDK 1.8 + Spring Boot + the ES 5.6.3 Java High Level REST Client + Maven.

A related Q&A on Stack Overflow:
https://stackoverflow.com/questions/47031840/elasticsearchhow-to-choose-java-client/47036028#47036028

Detailed announcement:

https://www.elastic.co/blog/the-elasticsearch-java-high-level-rest-client-is-out

Reference:

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/java-rest-high.html

Introduction to the Java High Level REST Client

The Java High Level REST Client is built on top of the Java Low Level REST Client. Every method has a synchronous and an asynchronous variant. The synchronous methods return a response object, while the asynchronous methods, whose names end with "Async", take a listener argument that is notified when the request completes or an error occurs.

The Java High Level REST Client requires Java 1.8, and its version should match the version of the ES cluster it talks to. It accepts the same request arguments and returns the same response objects as the TransportClient.

The examples below were all run against an ES 5.6.3 cluster with the matching version of the Java High Level REST Client.

Maven Dependency

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>5.6.3</version>
</dependency>

Initialization

// Low Level Client init
RestClient lowLevelRestClient = RestClient.builder(
        new HttpHost("localhost", 9200, "http")).build();
// High Level Client init
RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClient);

The High Level REST Client is initialized on top of a Low Level client instance.

Index API

Much like an HTTP exchange, the Index API consists of an index request and an index response.

Building an index request

An example of building an index request:

IndexRequest request = new IndexRequest(
        "posts", // index name
        "doc",   // type
        "1");    // doc id
String jsonString = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
        "}";
request.source(jsonString, XContentType.JSON);

Note that the document source here is provided as a String.
Another way to build the request:

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source(jsonMap); // the Map is converted to JSON automatically

Besides String and Map, an XContentBuilder also works:

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.field("user", "kimchy");
    builder.field("postDate", new Date());
    builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source(builder);

More directly, the key-value pairs can be passed when instantiating the index request:

IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source("user", "kimchy",
                "postDate", new Date(),
                "message", "trying out Elasticsearch");

Getting the index response

Synchronous execution:

IndexResponse indexResponse = client.index(request);

Asynchronous execution:

client.indexAsync(request, new ActionListener<IndexResponse>() {
    @Override
    public void onResponse(IndexResponse indexResponse) {
    }

    @Override
    public void onFailure(Exception e) {
    }
});

Note that the asynchronous method name ends with "Async", that it takes an extra listener argument, and that the listener's callback methods must be overridden.
Querying from the Kibana console returns:

{
  "_index": "posts",
  "_type": "doc",
  "_id": "1",
  "_version": 1,
  "found": true,
  "_source": {
    "user": "kimchy",
    "postDate": "2017-11-01T05:48:26.648Z",
    "message": "trying out Elasticsearch"
  }
}

The data from the index request has been stored successfully.

Working with the index response

client.index() returns an IndexResponse, which can be used as follows:

String index = indexResponse.getIndex(); // index name, type, and other metadata
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
}
ShardInfo shardInfo = indexResponse.getShardInfo(); // checking shard usage
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason();
    }
}

Detecting a version conflict:

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .version(1);
try {
    IndexResponse response = client.index(request);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
    }
}

Controlling the index operation type:

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .opType(DocWriteRequest.OpType.CREATE); // create or update
try {
    IndexResponse response = client.index(request);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
    }
}

GET API

GET request

GetRequest getRequest = new GetRequest(
        "posts", // index name
        "doc",   // type
        "1");    // id

GET response

Synchronous:

GetResponse getResponse = client.get(getRequest);

Asynchronous:

client.getAsync(request, new ActionListener<GetResponse>() {
    @Override
    public void onResponse(GetResponse getResponse) {
    }

    @Override
    public void onFailure(Exception e) {
    }
});

Working with the response object:

String index = getResponse.getIndex();
String type = getResponse.getType();
String id = getResponse.getId();
if (getResponse.isExists()) {
    long version = getResponse.getVersion();
    String sourceAsString = getResponse.getSourceAsString();
    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
    byte[] sourceAsBytes = getResponse.getSourceAsBytes();
} else {
    // TODO
}

Exception handling:

GetRequest request = new GetRequest("does_not_exist", "doc", "1");
try {
    GetResponse getResponse = client.get(request);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.NOT_FOUND) {
    }
    if (e.status() == RestStatus.CONFLICT) {
    }
}

DELETE API

Very similar to the Index API and the GET API.

DELETE request

DeleteRequest request = new DeleteRequest(
        "posts",
        "doc",
        "1");

DELETE response

Synchronous:

DeleteResponse deleteResponse = client.delete(request);

Asynchronous:

client.deleteAsync(request, new ActionListener<DeleteResponse>() {
    @Override
    public void onResponse(DeleteResponse deleteResponse) {
    }

    @Override
    public void onFailure(Exception e) {
    }
});

Update API

Update request

UpdateRequest updateRequest = new UpdateRequest(
        "posts",
        "doc",
        "1");

Update scripts:
Earlier we covered how to update a document with a simple script:

POST /posts/doc/1/_update?pretty
{
  "script" : "ctx._source.age += 5"
}

which can also be written as:

POST /posts/doc/1/_update?pretty
{
  "script" : {
    "lang": "painless",
    "source": "ctx._source.age += 5"
  }
}

The corresponding Java code:

UpdateRequest updateRequest = new UpdateRequest("posts", "doc", "1");
Map<String, Object> parameters = new HashMap<>();
parameters.put("age", 4);
Script inline = new Script(ScriptType.INLINE, "painless",
        "ctx._source.age += params.age", parameters);
updateRequest.script(inline);
try {
    UpdateResponse updateResponse = client.update(updateRequest);
} catch (IOException e) {
    e.printStackTrace();
}

Updating with a partial document

1. String

String jsonString = "{" +
        "\"updated\":\"2017-01-02\"," +
        "\"reason\":\"easy update\"" +
        "}";
updateRequest.doc(jsonString, XContentType.JSON);
try {
    client.update(updateRequest);
} catch (IOException e) {
    e.printStackTrace();
}

2. Map

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest updateRequest = new UpdateRequest("posts", "doc", "1").doc(jsonMap);
try {
    client.update(updateRequest);
} catch (IOException e) {
    e.printStackTrace();
}

3. XContentBuilder

try {
    XContentBuilder builder = XContentFactory.jsonBuilder();
    builder.startObject();
    {
        builder.field("updated", new Date());
        builder.field("reason", "daily update");
    }
    builder.endObject();
    UpdateRequest request = new UpdateRequest("posts", "doc", "1")
            .doc(builder);
    client.update(request);
} catch (IOException e) {
    e.printStackTrace();
}

4. Key-value pairs

try {
    UpdateRequest request = new UpdateRequest("posts", "doc", "1")
            .doc("updated", new Date(), "reason", "daily update");
    client.update(request);
} catch (IOException e) {
    e.printStackTrace();
}

upsert

If the document does not exist yet, upsert can be used to create it:

String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);

As before, upsert also accepts a Map, an XContentBuilder, or key-value pairs.
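As a minimal sketch of the combination (the field names are illustrative, and `client` is the RestHighLevelClient from the initialization section), a partial document and an upsert document can be set on the same request:

```java
// If posts/doc/1 exists, only the partial doc below is applied;
// otherwise the upsert doc is indexed as a brand-new document.
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("reason", "daily update")                       // partial update
        .upsert("created", "2017-01-01", "reason", "init");  // first-time insert
// request.docAsUpsert(true); // alternative: reuse the partial doc as the upsert doc
UpdateResponse updateResponse = client.update(request);
```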

Update response

Like the others, the update request can be executed synchronously or asynchronously.

Synchronous execution:

UpdateResponse updateResponse = client.update(request);

Asynchronous execution:

client.updateAsync(request, new ActionListener<UpdateResponse>() {
    @Override
    public void onResponse(UpdateResponse updateResponse) {
    }

    @Override
    public void onFailure(Exception e) {
    }
});

As with the other response types, the update response object supports various checks, which are not repeated here.
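For illustration, a sketch of the usual checks on the `updateResponse` obtained above, mirroring the IndexResponse handling shown earlier (NOOP is specific to updates: it means the partial document changed nothing):

```java
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // the upsert path created a new document
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // an existing document was modified
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
    // the partial document matched the existing source; nothing changed
}
GetResult result = updateResponse.getGetResult(); // non-null only if source fetching was requested
if (result != null && result.isExists()) {
    String updatedSource = result.sourceAsString();
}
```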

Bulk API

Bulk request

As covered in an earlier article, the bulk endpoint batches index/update/delete operations.
In the Java API, a single bulk request carries a whole batch of requests:

BulkRequest request = new BulkRequest();
request.add(new IndexRequest("posts", "doc", "1")
        .source(XContentType.JSON, "field", "foo"));
request.add(new IndexRequest("posts", "doc", "2")
        .source(XContentType.JSON, "field", "bar"));
request.add(new IndexRequest("posts", "doc", "3")
        .source(XContentType.JSON, "field", "baz"));

  • Note that the Bulk API only accepts documents in JSON and SMILE format; other formats cause an error.
  • Requests of different types can be mixed in the same bulk request:

BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "doc", "3"));
request.add(new UpdateRequest("posts", "doc", "2")
        .doc(XContentType.JSON, "other", "test"));
request.add(new IndexRequest("posts", "doc", "4")
        .source(XContentType.JSON, "field", "baz"));

Bulk response

Synchronous execution:

BulkResponse bulkResponse = client.bulk(request);

Asynchronous execution:

client.bulkAsync(request, new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
    }

    @Override
    public void onFailure(Exception e) {
    }
});

Handling the response is very similar to the other response types and is not repeated here.
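One bulk-specific point deserves a sketch: a bulk request can partially fail, so each item of the `bulkResponse` above should be inspected individually:

```java
if (bulkResponse.hasFailures()) { // true if at least one operation failed
    for (BulkItemResponse itemResponse : bulkResponse) {
        if (itemResponse.isFailed()) {
            BulkItemResponse.Failure failure = itemResponse.getFailure();
            String failedId = failure.getId();    // id of the failed document
            String reason = failure.getMessage(); // failure reason
        } else {
            // an Index/Update/DeleteResponse, depending on the operation
            DocWriteResponse itemResult = itemResponse.getResponse();
        }
    }
}
```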

Bulk processor

BulkProcessor simplifies the use of the Bulk API and makes batching transparent to the caller.
Running a BulkProcessor takes three parts:

  1. RestHighLevelClient: executes the bulk requests and retrieves the responses.
  2. BulkProcessor.Listener: called before and after each bulk request, and when a bulk request fails.
  3. ThreadPool: the bulk requests execute on this thread pool, so adding new requests does not block while other requests are in flight.

Example code:

Settings settings = Settings.EMPTY;
ThreadPool threadPool = new ThreadPool(settings); // build a new thread pool
BulkProcessor.Listener listener = new BulkProcessor.Listener() { // build the bulk listener
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // called before each bulk request is sent; here we can see
        // how many operations this batch carries
        int numberOfActions = request.numberOfActions();
        logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        // called after each bulk request completes; here we can check for failures
        if (response.hasFailures()) {
            logger.warn("Bulk [{}] executed with failures", executionId);
        } else {
            logger.debug("Bulk [{}] completed in {} milliseconds",
                    executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        // called when a bulk request could not be executed at all
        logger.error("Failed to execute bulk", failure);
    }
};
// the builder controls the batching behavior; all options must be set before build()
BulkProcessor.Builder builder = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool);
builder.setBulkActions(500); // flush after this many operations; default 1000, -1 disables
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // flush once the batch exceeds this size; default 5MB, -1 disables
builder.setConcurrentRequests(0); // how many bulk requests may execute concurrently; 0 means fully synchronous execution, the default of 1 lets one bulk request execute while new operations accumulate
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // flush at this interval regardless of size; no time-based flushing by default
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // retry policy for bulk requests rejected due to resource pressure (e.g. a full thread pool)
BulkProcessor bulkProcessor = builder.build(); // build the processor; under the hood it uses the asynchronous bulk method

// create three index requests
IndexRequest one = new IndexRequest("posts", "doc", "1")
        .source(XContentType.JSON, "title",
                "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
        .source(XContentType.JSON, "title",
                "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
        .source(XContentType.JSON, "title",
                "The Future of Federated Search in Elasticsearch");
// add the three index requests to the configured bulkProcessor
bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);
// add many requests here...
// the bulkProcessor must be closed for the operations added above to take effect.
// There are two ways to close it:
// 1. close() shuts it down immediately:
// bulkProcessor.close();
// 2. awaitClose() waits up to the given timeout for all in-flight bulk
//    requests to complete, returning true if everything finished in time:
try {
    boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    e.printStackTrace();
}

Search API

Search request

The Search API supports both document queries and aggregation queries.
Its basic form:

SearchRequest searchRequest = new SearchRequest(); // a search request with no arguments queries all indices
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); // most query parameters go into a SearchSourceBuilder
searchSourceBuilder.query(QueryBuilders.matchAllQuery()); // add a match_all condition

SearchRequest searchRequest = new SearchRequest("posts"); // restrict to the posts index
searchRequest.types("doc"); // restrict to the doc type

Using SearchSourceBuilder

Most search options are controlled through the SearchSourceBuilder.
A simple example:

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // an object with default settings
sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy")); // set the query
sourceBuilder.from(0); // offset to start from
sourceBuilder.size(5); // 5 results per page
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); // set a timeout

Once configured, the SearchSourceBuilder is passed into the search request:

SearchRequest searchRequest = new SearchRequest();
searchRequest.source(sourceBuilder);

Building queries

In the example above, note that the sourceBuilder builds its query condition with a QueryBuilder object.
A QueryBuilder exists for every query type that ES supports.
One can be created with its constructor:

MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("user", "kimchy");

This code is equivalent to:

"query": {
  "match": {
    "user": "kimchy"
  }
}

Related settings:

matchQueryBuilder.fuzziness(Fuzziness.AUTO); // enable fuzzy matching
matchQueryBuilder.prefixLength(3); // length of the initial prefix that must match exactly
matchQueryBuilder.maxExpansions(10); // maximum number of terms the fuzzy query may expand to

A QueryBuilder can also be created with the QueryBuilders utility class, whose fluent style is pleasant to program against:

QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("user", "kimchy")
        .fuzziness(Fuzziness.AUTO)
        .prefixLength(3)
        .maxExpansions(10);

However the QueryBuilder object is created, it must be passed into the SearchSourceBuilder:

searchSourceBuilder.query(matchQueryBuilder);

A match query against the account data imported earlier:

GET /bank/_search?pretty
{
  "query": {
    "match": {
      "firstname": "Virginia"
    }
  }
}

The same query in Java:

@Test
public void test2() {
    RestClient lowLevelRestClient = RestClient.builder(
            new HttpHost("172.16.73.50", 9200, "http")).build();
    RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClient);
    SearchRequest searchRequest = new SearchRequest("bank");
    searchRequest.types("account");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    MatchQueryBuilder mqb = QueryBuilders.matchQuery("firstname", "Virginia");
    searchSourceBuilder.query(mqb);
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest);
        System.out.println(searchResponse.toString());
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Sorting

One or more SortBuilders can be added to the SearchSourceBuilder.
There are four specialized sort implementations:

  • FieldSortBuilder
  • ScoreSortBuilder
  • GeoDistanceSortBuilder
  • ScriptSortBuilder

sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); // sort by score, descending
sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC)); // then by id, ascending
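The remaining two implementations follow the same pattern. A sketch, with one caveat: the "location" geo_point field here is hypothetical (the imported account data carries no geo field), while "balance" is one of its real numeric fields:

```java
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// sort by distance from a reference point; assumes a geo_point field named "location"
sourceBuilder.sort(SortBuilders.geoDistanceSort("location", 40.7143528, -74.0059731)
        .order(SortOrder.ASC)
        .unit(DistanceUnit.KILOMETERS));
// sort by a script-computed numeric value
sourceBuilder.sort(SortBuilders.scriptSort(
        new Script("doc['balance'].value"),
        ScriptSortBuilder.ScriptSortType.NUMBER));
```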

Source filtering

By default, the search request returns the document _source. As with the REST API, this behavior can be overridden; for example, _source retrieval can be turned off entirely:

sourceBuilder.fetchSource(false);

The method also accepts one or more arrays of wildcard patterns, for finer-grained control over which fields are included or excluded:

String[] includeFields = new String[] {"title", "user", "innerObject.*"};
String[] excludeFields = new String[] {"_type"};
sourceBuilder.fetchSource(includeFields, excludeFields);

Aggregation requests

An aggregation request is made by configuring the appropriate AggregationBuilder and passing it into the SearchSourceBuilder.
In an earlier article, we imported one thousand account records with this command:

curl -H "Content-Type: application/json" -XPOST 'localhost:9200/bank/account/_bulk?pretty&refresh' --data-binary "@accounts.json"

and then grouped them with an aggregation request:

GET /bank/_search?pretty
{
  "size": 0,
  "aggs": {
    "group_by_state": {
      "terms": {
        "field": "state.keyword"
      }
    }
  }
}

Grouping the records by the state field gives this response:

{
  "took": 2,
  "timed_out": false,
  "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 },
  "hits": { "total": 999, "max_score": 0, "hits": [] },
  "aggregations": {
    "group_by_state": {
      "doc_count_error_upper_bound": 20,
      "sum_other_doc_count": 770,
      "buckets": [
        { "key": "ID", "doc_count": 27 },
        { "key": "TX", "doc_count": 27 },
        { "key": "AL", "doc_count": 25 },
        { "key": "MD", "doc_count": 25 },
        { "key": "TN", "doc_count": 23 },
        { "key": "MA", "doc_count": 21 },
        { "key": "NC", "doc_count": 21 },
        { "key": "ND", "doc_count": 21 },
        { "key": "MO", "doc_count": 20 },
        { "key": "AK", "doc_count": 19 }
      ]
    }
  }
}

The Java implementation:

@Test
public void test2() {
    RestClient lowLevelRestClient = RestClient.builder(
            new HttpHost("172.16.73.50", 9200, "http")).build();
    RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClient);
    SearchRequest searchRequest = new SearchRequest("bank");
    searchRequest.types("account");
    TermsAggregationBuilder aggregation = AggregationBuilders.terms("group_by_state")
            .field("state.keyword");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.aggregation(aggregation);
    searchSourceBuilder.size(0);
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest);
        System.out.println(searchResponse.toString());
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Output:

{"took":4,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":999,"max_score":0.0,"hits":[]},"aggregations":{"sterms#group_by_state":{"doc_count_error_upper_bound":20,"sum_other_doc_count":770,"buckets":[{"key":"ID","doc_count":27},{"key":"TX","doc_count":27},{"key":"AL","doc_count":25},{"key":"MD","doc_count":25},{"key":"TN","doc_count":23},{"key":"MA","doc_count":21},{"key":"NC","doc_count":21},{"key":"ND","doc_count":21},{"key":"MO","doc_count":20},{"key":"AK","doc_count":19}]}}} 

Synchronous execution:

SearchResponse searchResponse = client.search(searchRequest);

Asynchronous execution:

client.searchAsync(searchRequest, new ActionListener<SearchResponse>() {
    @Override
    public void onResponse(SearchResponse searchResponse) {
    }

    @Override
    public void onFailure(Exception e) {
    }
});

Search response

The search response object mirrors the response of the REST API, carrying both metadata and document data.
The metadata matters: it includes the HTTP status, timing, and shard usage of the query:

RestStatus status = searchResponse.status();
TimeValue took = searchResponse.getTook();
Boolean terminatedEarly = searchResponse.isTerminatedEarly();
boolean timedOut = searchResponse.isTimedOut();

The response also contains shard statistics and allows shard failures to be handled:

int totalShards = searchResponse.getTotalShards();
int successfulShards = searchResponse.getSuccessfulShards();
int failedShards = searchResponse.getFailedShards();
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
    // failures should be handled here
}

Retrieving SearchHits

To get at the document data, we first take the SearchHits object from the search response:

SearchHits hits = searchResponse.getHits();

Retrieving the documents:

@Test
public void test2() {
    RestClient lowLevelRestClient = RestClient.builder(
            new HttpHost("172.16.73.50", 9200, "http")).build();
    RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClient);
    SearchRequest searchRequest = new SearchRequest("bank");
    searchRequest.types("account");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest);
        SearchHits searchHits = searchResponse.getHits();
        SearchHit[] searchHit = searchHits.getHits();
        for (SearchHit hit : searchHit) {
            System.out.println(hit.getSourceAsString());
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

The source of each hit can also be converted to other types as needed:

String sourceAsString = hit.getSourceAsString();
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
String documentTitle = (String) sourceAsMap.get("title");
List<Object> users = (List<Object>) sourceAsMap.get("user");
Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");

Retrieving aggregation data

Aggregation data is retrieved from the SearchResponse by first getting the root Aggregations object and then looking up each aggregation by name. Using the same group_by_state query and response shown in the previous section, the Java implementation is:

@Test
public void test2() {
    RestClient lowLevelRestClient = RestClient.builder(
            new HttpHost("172.16.73.50", 9200, "http")).build();
    RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClient);
    SearchRequest searchRequest = new SearchRequest("bank");
    searchRequest.types("account");
    TermsAggregationBuilder aggregation = AggregationBuilders.terms("group_by_state")
            .field("state.keyword");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.aggregation(aggregation);
    searchSourceBuilder.size(0);
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest);
        Aggregations aggs = searchResponse.getAggregations();
        Terms byStateAggs = aggs.get("group_by_state");
        Terms.Bucket b = byStateAggs.getBucketByKey("ID"); // fetch only the bucket whose key is "ID"
        System.out.println(b.getKeyAsString() + "," + b.getDocCount());
        List<? extends Bucket> aggList = byStateAggs.getBuckets(); // fetch all buckets
        for (Bucket bucket : aggList) {
            System.out.println("key:" + bucket.getKeyAsString() + ",docCount:" + bucket.getDocCount());
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Search Scroll API

The Search Scroll API is for working through large result sets of a search request.

  • There are two ways to paginate in ES: setting the from and size parameters on the search request, or using the Scroll API. For large result sets, the Scroll API is recommended because it is much more efficient than deep from/size pagination.

To use scroll, follow the steps below.

Initializing the scroll context

A search request with the scroll parameter set must be executed first to open the scroll session. ES detects the presence of the scroll parameter and keeps the search context alive for the given interval.

SearchRequest searchRequest = new SearchRequest("bank"); // query the bank index
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchQuery("firstname", "Virginia")); // match condition
searchSourceBuilder.size(10); // how many documents to fetch per batch
searchRequest.source(searchSourceBuilder);
searchRequest.scroll(TimeValue.timeValueMinutes(1L)); // set the scroll keep-alive
SearchResponse searchResponse = client.search(searchRequest);
String scrollId = searchResponse.getScrollId(); // keep the scroll id for the follow-up scroll calls
SearchHit[] hits = searchResponse.getHits().getHits(); // the first batch of documents

Retrieving all matching documents

Next, the scroll id and a fresh scroll interval are set on a SearchScrollRequest, which is executed with the searchScroll method.
ES returns another batch of results together with a new scroll id, which is used for the next call, and so on. This should run in a loop until a batch comes back empty, which means the scroll is exhausted and all matching documents have been retrieved.

SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); // pass the scroll id and set the interval
scrollRequest.scroll(TimeValue.timeValueSeconds(30));
SearchResponse searchScrollResponse = client.searchScroll(scrollRequest); // execute the scroll search
scrollId = searchScrollResponse.getScrollId(); // the scroll id for the next call
hits = searchScrollResponse.getHits().getHits();

Clearing the scroll context

The Clear Scroll API, given the last scroll id, releases the scroll context. Although the context is eventually released automatically when the scroll expires, best practice is to release it explicitly as soon as the scroll session is done.
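In code this is the same pattern used in the complete example further down:

```java
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId); // the last scroll id that was returned
ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest);
boolean succeeded = clearScrollResponse.isSucceeded(); // true if the context was released
```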

Optional arguments

scrollRequest.scroll(TimeValue.timeValueSeconds(60L)); // keep the scroll alive for 60 seconds
scrollRequest.scroll("60s"); // the same, as a string

If no keep-alive is set on the scrollRequest, the value set by searchRequest.scroll() applies.

Synchronous execution:

SearchResponse searchResponse = client.searchScroll(scrollRequest);

Asynchronous execution:

client.searchScrollAsync(scrollRequest, new ActionListener<SearchResponse>() {
    @Override
    public void onResponse(SearchResponse searchResponse) {
    }

    @Override
    public void onFailure(Exception e) {
    }
});

  • Note that the Search Scroll API also returns a SearchResponse object.

Complete example

@Test
public void test3() {
    RestClient lowLevelRestClient = RestClient.builder(
            new HttpHost("172.16.73.50", 9200, "http")).build();
    RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClient);
    SearchRequest searchRequest = new SearchRequest("bank");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    MatchAllQueryBuilder mqb = QueryBuilders.matchAllQuery();
    searchSourceBuilder.query(mqb);
    searchSourceBuilder.size(10);
    searchRequest.source(searchSourceBuilder);
    searchRequest.scroll(TimeValue.timeValueMinutes(1L));
    try {
        SearchResponse searchResponse = client.search(searchRequest);
        String scrollId = searchResponse.getScrollId();
        SearchHit[] hits = searchResponse.getHits().getHits();
        System.out.println("first scroll:");
        for (SearchHit searchHit : hits) {
            System.out.println(searchHit.getSourceAsString());
        }
        Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
        System.out.println("loop scroll:");
        while (hits != null && hits.length > 0) {
            SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
            scrollRequest.scroll(scroll);
            searchResponse = client.searchScroll(scrollRequest);
            scrollId = searchResponse.getScrollId();
            hits = searchResponse.getHits().getHits();
            for (SearchHit searchHit : hits) {
                System.out.println(searchHit.getSourceAsString());
            }
        }
        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
        clearScrollRequest.addScrollId(scrollId);
        ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest);
        boolean succeeded = clearScrollResponse.isSucceeded();
        System.out.println("cleared:" + succeeded);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Info API

The Info API provides information about the cluster and its nodes.

Request

MainResponse response = client.info();

Response

ClusterName clusterName = response.getClusterName();
String clusterUuid = response.getClusterUuid();
String nodeName = response.getNodeName();
Version version = response.getVersion();
Build build = response.getBuild();

A full example:

@Test
public void test4() {
    RestClient lowLevelRestClient = RestClient.builder(
            new HttpHost("172.16.73.50", 9200, "http")).build();
    RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClient);
    try {
        MainResponse response = client.info();
        ClusterName clusterName = response.getClusterName();
        String clusterUuid = response.getClusterUuid();
        String nodeName = response.getNodeName();
        Version version = response.getVersion();
        Build build = response.getBuild();
        System.out.println("cluster name:" + clusterName);
        System.out.println("cluster uuid:" + clusterUuid);
        System.out.println("node name:" + nodeName);
        System.out.println("node version:" + version);
        System.out.println("build info:" + build);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Summary

That covers the basic usage of the Elasticsearch Java High Level REST Client API. For more advanced techniques and concepts, consult the official documentation.

Link:

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/java-rest-high.html



Author: epicGeek
Link: https://www.jianshu.com/p/5cb91ed22956
Source: Jianshu (簡書)
Copyright belongs to the author; for any form of reproduction, please contact the author for authorization and credit the source.

