Working with Elasticsearch via the Java High Level REST Client


Overview

Having covered Elasticsearch's basic concepts and usage, we now turn to its Java API.
This article assumes you already have a reasonably complete picture of the ES fundamentals.

Clients

The Java client lets you do many things:

  • Perform standard index, get, delete, update, and search operations.
  • Run administrative tasks against a running cluster.

However, the official documentation reveals that at least three Java clients currently exist:

  1. Transport Client
  2. Java High Level REST Client
  3. Java Low Level REST Client

The reason for this confusion:

  • For a long time ES had no official Java client. Since ES is itself written in Java, exposing its internal Java API was the easy path, so the TransportClient came first. Its drawback is obvious: rather than a RESTful interface, it transfers data over a binary protocol.

  • The ES team later released the Java Low Level REST Client, which speaks REST and works well. Its downside is equally clear: migrating existing TransportClient code to it takes considerable effort, and the official documentation ships a dedicated set of migration guides for exactly that purpose.

  • Most recently the ES team released the Java High Level REST Client. It is a wrapper over the Java Low Level REST Client whose request parameters and return values match the TransportClient's, so migration is easy and the RESTful style is preserved, combining the strengths of both earlier clients. Its drawback is versioning: ES releases minor versions frequently, and ideally the client version should match the ES version (at minimum the major version). With mismatched minor versions, basic operations usually still work, but newer APIs may not be supported.

  • The Java High Level REST Client is strongly recommended for ES 5.x and later. The examples below are based on JDK 1.8 + Spring Boot + Elasticsearch 6.3.2 + Maven.

Prerequisites:

  • JDK 1.8
  • Elasticsearch 6.3.2 (other versions untested; full compatibility is not guaranteed)
  • Maven
  • Spring Boot
  • 1. Maven dependencies:
        <!--elasticsearch base-->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.3.2</version>
        </dependency>
        <!-- Java Low Level REST Client -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>6.3.2</version>
        </dependency>
        <!-- Java High Level REST Client -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.3.2</version>
        </dependency>
  • 2. Setting up the RestHighLevelClient
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfiguration implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfiguration.class);

    @Value("${spring.data.elasticsearch.host}")
    private String host;
    @Value("${spring.data.elasticsearch.port}")
    private int port;
    @Value("${spring.data.elasticsearch.username}")
    private String username;
    @Value("${spring.data.elasticsearch.password}")
    private String password;

    private RestHighLevelClient restHighLevelClient;

    @Override
    public void destroy() throws Exception {
        try {
            LOGGER.info("Closing elasticSearch client");
            if (restHighLevelClient != null) {
                restHighLevelClient.close();
            }
        } catch (final Exception e) {
            LOGGER.error("Error closing ElasticSearch client: ", e);
        }
    }

    @Override
    public RestHighLevelClient getObject() throws Exception {
        return restHighLevelClient;
    }

    @Override
    public Class<RestHighLevelClient> getObjectType() {
        return RestHighLevelClient.class;
    }

    @Override
    public boolean isSingleton() {
        return false;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        buildClient();
    }

    protected void buildClient() {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });

        restHighLevelClient = new RestHighLevelClient(builder);
    }

}
  • 3. Index API
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "laimailai");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonMap);
IndexResponse indexResponse = client.index(indexRequest);
  • 4. Get API
GetRequest getRequest = new GetRequest(
        "index",
        "type",
        "1");
GetResponse getResponse = client.get(getRequest);
  • 5. Update API
UpdateRequest request = new UpdateRequest(
        "index",
        "type",
        "1").doc(XContentType.JSON, "field", "value"); // an update needs a partial doc (or a script)
UpdateResponse updateResponse = client.update(request);
  • 6. Delete API
DeleteRequest request = new DeleteRequest(
        "index",
        "type",
        "1");
DeleteResponse deleteResponse = client.delete(request);
  • 7. Bulk API

As noted in an earlier article, the bulk interface batches index/update/delete operations:
in this API, a single bulk request carries a whole batch of operations.

// build a bulk request containing several index operations
BulkRequest request = new BulkRequest();
request.add(new IndexRequest("index", "type", "1")
        .source(XContentType.JSON, "field", "foo"));
request.add(new IndexRequest("index", "type", "2")
        .source(XContentType.JSON, "field", "bar"));
request.add(new IndexRequest("index", "type", "3")
        .source(XContentType.JSON, "field", "baz"));

// synchronous execution
BulkResponse bulkResponse = client.bulk(request);

// asynchronous execution
client.bulkAsync(request, new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {

    }

    @Override
    public void onFailure(Exception e) {

    }
});
  • 8. BulkProcessor (important!)

BulkProcessor simplifies use of the bulk API and makes batch operations transparent.
A BulkProcessor is built from three parts:

  1. RestHighLevelClient: executes the bulk requests and obtains the responses.
  2. BulkProcessor.Listener: called before and after each bulk request, and when a bulk request fails.
  3. ThreadPool: bulk requests run on this pool, so callers are not blocked and new requests are accepted while others are still executing.
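The flush-by-count contract described above can be sketched in a few lines of plain Java. `SimpleBatcher` below is a hypothetical illustration, not an Elasticsearch class: add() buffers operations and invokes a flush callback once the batch is full, and close() flushes the remainder, mirroring what setBulkActions() and close() do on the real BulkProcessor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch (not an Elasticsearch class) of BulkProcessor's
// flush-by-count behavior: add() buffers requests and triggers the flush
// callback once `bulkActions` entries have accumulated; close() flushes
// whatever is left, like BulkProcessor.close().
class SimpleBatcher<T> {
    private final int bulkActions;
    private final Consumer<List<T>> flushCallback;
    private final List<T> buffer = new ArrayList<>();

    SimpleBatcher(int bulkActions, Consumer<List<T>> flushCallback) {
        this.bulkActions = bulkActions;
        this.flushCallback = flushCallback;
    }

    void add(T request) {
        buffer.add(request);
        if (buffer.size() >= bulkActions) {
            flush();
        }
    }

    void close() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        flushCallback.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}

public class SimpleBatcherDemo {
    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        SimpleBatcher<String> batcher = new SimpleBatcher<>(3, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 7; i++) {
            batcher.add("request-" + i);
        }
        batcher.close();
        System.out.println(batchSizes); // prints [3, 3, 1]
    }
}
```

The real BulkProcessor adds size-based and time-based flushing plus concurrency on top of this same idea, which is what the builder settings in the sample code below configure.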

Sample code:

@Service
public class ElasticSearchUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchUtil.class);

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    private BulkProcessor bulkProcessor;

    @PostConstruct
    public void init() {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // called before each bulk request is sent; numberOfActions() reports
                // how many operations this batch contains
                int numberOfActions = request.numberOfActions();
                LOGGER.info("Executing bulk [{}] with {} requests", executionId, numberOfActions);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // called after each bulk request completes; check here whether any item failed
                if (response.hasFailures()) {
                    LOGGER.error("Bulk [{}] executed with failures, response = {}", executionId, response.buildFailureMessage());
                } else {
                    LOGGER.info("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
                }
                BulkItemResponse[] responses = response.getItems();
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // called when the bulk request itself fails
                LOGGER.error("Failed to execute bulk", failure);
            }
        };

        // build() constructs the BulkProcessor; under the hood it uses the asynchronous bulk API
        this.bulkProcessor = BulkProcessor.builder(restHighLevelClient::bulkAsync, listener)
                // flush after 1000 actions
                .setBulkActions(1000)
                // flush once 5 MB of data has accumulated
                .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
                // number of concurrent in-flight bulk requests (0 = execute synchronously)
                .setConcurrentRequests(0)
                // flush at least once every second
                .setFlushInterval(TimeValue.timeValueSeconds(1L))
                // retry up to 5 times with a constant 1 s backoff
                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 5))
                .build();
    }

    @PreDestroy
    public void destroy() {
        try {
            bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOGGER.error("Failed to close bulkProcessor", e);
        }
        LOGGER.info("bulkProcessor closed!");
    }

    /**
     * Update a document through the processor.
     *
     * @param request the update request to buffer
     */
    public void update(UpdateRequest request) {
        this.bulkProcessor.add(request);
    }

    /**
     * Index a new document through the processor.
     *
     * @param request the index request to buffer
     */
    public void insert(IndexRequest request) {
        this.bulkProcessor.add(request);
    }
}

BulkProcessor usage example:

// create three index requests
IndexRequest one = new IndexRequest("posts", "doc", "1")
        .source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
        .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
        .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");
// hand them to the bulkProcessor configured above
bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);
// add many requests here...

// the processor must be shut down for buffered operations to take effect;
// use one of the two methods:
// 1. close() shuts down immediately:
//    bulkProcessor.close();
// 2. awaitClose() waits up to the given time for all pending bulk operations
//    to complete, returning true if everything finished and false otherwise:
try {
    boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    e.printStackTrace();
}
  • 9. Upsert API

update: throws an exception when the document id does not exist:

UpdateRequest request = new UpdateRequest(index, type, "1").doc(jsonMap);
UpdateResponse response = restHighLevelClient.update(request);

upsert: inserts the document when the id does not exist:

UpdateRequest request = new UpdateRequest(index, type, "1").doc(jsonMap).upsert(jsonMap);
UpdateResponse response = restHighLevelClient.update(request);
  • 10. Search API

The Search API covers both document queries and aggregation queries.
Its basic form:

SearchRequest searchRequest = new SearchRequest(); // no arguments: search across all indices
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); // most query parameters live here
searchSourceBuilder.query(QueryBuilders.matchAllQuery()); // add a match_all query

SearchRequest searchRequest = new SearchRequest("posts"); // restrict the search to the posts index
searchRequest.types("doc"); // restrict it to the doc type

Using SearchSourceBuilder

Most search options are controlled through SearchSourceBuilder.
A simple example:

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // created with default settings
sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy")); // set the query
sourceBuilder.from(0); // start from the first result
sourceBuilder.size(5); // return 5 results per page
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); // set a timeout

Once the SearchSourceBuilder is configured, pass it to the SearchRequest:

SearchRequest searchRequest = new SearchRequest();
searchRequest.source(sourceBuilder);

// match-all search over an index
SearchRequest searchRequest = new SearchRequest("index");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
searchRequest.source(searchSourceBuilder);
// search on several ids with a bool query of should clauses
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
for (String id : ids) {
    TermQueryBuilder termQueryBuilder = new TermQueryBuilder("id", id);
    boolQueryBuilder.should(termQueryBuilder);
}
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(boolQueryBuilder);
searchRequest.source(searchSourceBuilder);
SearchResponse response = restHighLevelClient.search(searchRequest);
return response;
  • 11. Search Scroll API

// paging through results with scroll
final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
SearchRequest searchRequest = new SearchRequest("posts");
searchRequest.scroll(scroll);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchQuery("title", "Elasticsearch"));
searchRequest.source(searchSourceBuilder);

SearchResponse searchResponse = client.search(searchRequest);
String scrollId = searchResponse.getScrollId();
SearchHit[] searchHits = searchResponse.getHits().getHits();

while (searchHits != null && searchHits.length > 0) {
    // process the current page of hits, then fetch the next one
    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
    scrollRequest.scroll(scroll);
    searchResponse = client.searchScroll(scrollRequest);
    scrollId = searchResponse.getScrollId();
    searchHits = searchResponse.getHits().getHits();
}

// release the scroll context when done
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest);
boolean succeeded = clearScrollResponse.isSucceeded();
  • 12. Sorting

SearchSourceBuilder accepts one or more SortBuilders.
Four sort implementations exist:

    • FieldSortBuilder
    • ScoreSortBuilder
    • GeoDistanceSortBuilder
    • ScriptSortBuilder

sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); // sort by score, descending
sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC)); // then by id, ascending
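Chaining two sort builders this way means the second key only breaks ties on the first. The same semantics can be illustrated with plain Java comparators (a standalone sketch, not Elasticsearch code; the `Hit` class is hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Illustration of multi-key sort semantics (primary key: score descending,
// ties broken by id ascending), analogous to chaining SortBuilders.
public class MultiKeySortDemo {
    static final class Hit {
        final String id;
        final double score;
        Hit(String id, double score) { this.id = id; this.score = score; }
    }

    public static void main(String[] args) {
        List<Hit> hits = new ArrayList<>(Arrays.asList(
                new Hit("3", 1.0), new Hit("1", 2.0), new Hit("2", 1.0)));
        hits.sort(Comparator.comparingDouble((Hit h) -> h.score).reversed()
                .thenComparing(h -> h.id));
        for (Hit h : hits) {
            System.out.println(h.id + " score=" + h.score);
        }
        // id "1" (score 2.0) comes first; ids "2" and "3" tie on score 1.0
        // and are ordered by id ascending
    }
}
```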
  • 13. Source filtering

By default the search response returns the document _source, just as the REST API does, and this behavior can be overridden. For example, _source retrieval can be turned off entirely:

sourceBuilder.fetchSource(false);

The method also accepts one or more arrays of wildcard patterns for finer-grained control over which fields are included or excluded:

String[] includeFields = new String[] {"title", "user", "innerObject.*"};
String[] excludeFields = new String[] {"_type"};
sourceBuilder.fetchSource(includeFields, excludeFields);
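How the include and exclude patterns interact can be sketched in plain Java. This mirrors the intent of fetchSource(includes, excludes), not Elasticsearch's actual implementation: a field is kept if it matches at least one include pattern and no exclude pattern, with `*` matching any sequence of characters.

```java
import java.util.Arrays;

// Plain-Java sketch of include/exclude wildcard filtering over field names.
public class SourceFilterDemo {
    static boolean wildcardMatch(String pattern, String field) {
        // translate the wildcard pattern into a regex: quote literals, '*' -> '.*'
        String regex = "\\Q" + pattern.replace("*", "\\E.*\\Q") + "\\E";
        return field.matches(regex);
    }

    static boolean keep(String field, String[] includes, String[] excludes) {
        boolean included = Arrays.stream(includes).anyMatch(p -> wildcardMatch(p, field));
        boolean excluded = Arrays.stream(excludes).anyMatch(p -> wildcardMatch(p, field));
        return included && !excluded;
    }

    public static void main(String[] args) {
        String[] includes = {"title", "user", "innerObject.*"};
        String[] excludes = {"innerObject.secret"};
        for (String f : new String[]{"title", "innerObject.child", "innerObject.secret", "postDate"}) {
            System.out.println(f + " -> " + keep(f, includes, excludes));
        }
        // title and innerObject.child are kept; innerObject.secret is excluded,
        // and postDate matches no include pattern
    }
}
```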
  • 14. Aggregations

An aggregation request is made by configuring an appropriate AggregationBuilder and passing it to the SearchSourceBuilder.
In an earlier article, we imported a thousand account documents with:

curl -H "Content-Type: application/json" -XPOST 'localhost:9200/bank/account/_bulk?pretty&refresh' --data-binary "@accounts.json"

and then grouped them with this aggregation request:

GET /bank/_search?pretty
{
  "size": 0,
  "aggs": {
    "group_by_state": {
      "terms": {
        "field": "state.keyword"
      }
    }
  }
}

Grouping the documents by the state field produces the response:

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 999,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "group_by_state": {
      "doc_count_error_upper_bound": 20,
      "sum_other_doc_count": 770,
      "buckets": [
        {
          "key": "ID",
          "doc_count": 27
        },
        {
          "key": "TX",
          "doc_count": 27
        },
        {
          "key": "AL",
          "doc_count": 25
        },
        {
          "key": "MD",
          "doc_count": 25
        },
        {
          "key": "TN",
          "doc_count": 23
        },
        {
          "key": "MA",
          "doc_count": 21
        },
        {
          "key": "NC",
          "doc_count": 21
        },
        {
          "key": "ND",
          "doc_count": 21
        },
        {
          "key": "MO",
          "doc_count": 20
        },
        {
          "key": "AK",
          "doc_count": 19
        }
      ]
    }
  }
}

Java implementation:

@Test
public void test2() {
    // note: in 6.x the RestHighLevelClient is constructed from a RestClientBuilder
    RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("172.16.73.50", 9200, "http")));
    SearchRequest searchRequest = new SearchRequest("bank");
    searchRequest.types("account");
    TermsAggregationBuilder aggregation = AggregationBuilders.terms("group_by_state")
            .field("state.keyword");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.aggregation(aggregation);
    searchSourceBuilder.size(0);
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest);
        System.out.println(searchResponse.toString());
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Search response

The search response object mirrors the one returned by the REST API, carrying metadata alongside the document data.
First, the metadata matters: it covers the query's result status, timing, and timeout/early-termination flags:

RestStatus status = searchResponse.status();
TimeValue took = searchResponse.getTook();
Boolean terminatedEarly = searchResponse.isTerminatedEarly();
boolean timedOut = searchResponse.isTimedOut();

Second, the response contains shard-level information and lets you handle shard failures:

int totalShards = searchResponse.getTotalShards();
int successfulShards = searchResponse.getSuccessfulShards();
int failedShards = searchResponse.getFailedShards();
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
    // failures should be handled here
}

Retrieving SearchHits

To get at the document data, first obtain the SearchHits object from the search response:

SearchHits hits = searchResponse.getHits();

Then iterate over the hits to read the documents:

@Test
public void test2() {
    RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("172.16.73.50", 9200, "http")));
    SearchRequest searchRequest = new SearchRequest("bank");
    searchRequest.types("account");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest);
        SearchHits searchHits = searchResponse.getHits();
        SearchHit[] searchHit = searchHits.getHits();
        for (SearchHit hit : searchHit) {
            System.out.println(hit.getSourceAsString());
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Each hit's _source can be converted into other representations as needed:

String sourceAsString = hit.getSourceAsString();
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
String documentTitle = (String) sourceAsMap.get("title");
List<Object> users = (List<Object>) sourceAsMap.get("user");
Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");

Retrieving aggregation results

Aggregation results are read from the SearchResponse by first getting the aggregations root object, then looking up each aggregation by name.

The request and response are the same group_by_state terms aggregation shown in section 14 above.

Java implementation:

@Test
public void test2() {
    RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("172.16.73.50", 9200, "http")));
    SearchRequest searchRequest = new SearchRequest("bank");
    searchRequest.types("account");
    TermsAggregationBuilder aggregation = AggregationBuilders.terms("group_by_state")
            .field("state.keyword");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.aggregation(aggregation);
    searchSourceBuilder.size(0);
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest);
        Aggregations aggs = searchResponse.getAggregations();
        Terms byStateAggs = aggs.get("group_by_state");
        Terms.Bucket b = byStateAggs.getBucketByKey("ID"); // fetch only the bucket whose key is "ID"
        System.out.println(b.getKeyAsString() + "," + b.getDocCount());
        List<? extends Bucket> aggList = byStateAggs.getBuckets(); // iterate over every bucket
        for (Bucket bucket : aggList) {
            System.out.println("key:" + bucket.getKeyAsString() + ",docCount:" + bucket.getDocCount());
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

 

Reference: https://www.jianshu.com/p/5cb91ed22956

Reference: https://my.oschina.net/u/3795437/blog/2253366

