1. 簡介
RestHighLevelClient是官方指定的Java連接ElasticSearch的API。
需要使用maven引用以下依賴:
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>6.5.2</version> </dependency>
注意:以上的依賴版本可以根據你使用的ES的版本來定,向下兼容,但是無法向上兼容
2. 創建客戶端
創建客戶端在網上有很多種配置,下面是最簡單的一種,需要其他中詳情配置的可以自己查。
static String ip = "localhost"; static int port = 9200; static RestHighLevelClient restHighLevelClient = null; static TransportClient client = null; static RestHighLevelClient initClient(){ //這里的RestClient.builder(new HttpHost(ip,port),.........)支持多個httphost連接,也就是支持連接多個elasticsearch restHighLevelClient = new RestHighLevelClient(RestClient.builder(new HttpHost(ip,port))); } static Connection getInstance(){ synchronized (ElasticSearchFactory.class){ if (restHighLevelClient == null){ restHighLevelClient = initClient(); } } return connection; }
注意:以下操作都是個使用了RestHighLevelClient的API對ElasticSearch進行操作,各版本之間的語法可能略有區別。
3. 索引操作
3.1 創建索引
創建索引操作,最簡單的操作就是只傳一個索引名稱,CreateIndexRequest request = new CreateIndexRequest(index); 至於下面組織setting和mappings的XContentBuilder對象,是可以不傳,也可以按照自己的業務自行定義。
@Autowired private RestHighLevelClient client;
下面出現的變量client,都是引用的RestHighLevelClient
/** * 創建索引 * @param index 索引名稱 * @throws IOException e */ public void createIndex(String index) throws IOException { CreateIndexRequest request = new CreateIndexRequest(index); XContentBuilder builder = JsonXContent.contentBuilder() .startObject() .startObject("settings").field("number_of_shards", 3).field("number_of_replicas", 1).endObject() .startObject("mappings") .startObject("doc") .startObject("properties") .startObject("title").field("type", "text").endObject() .startObject("content").field("type", "text").field("index", true).endObject() .startObject("uniqueId").field("type", "keyword").field("index", false).endObject() .startObject("created").field("type", "date").field("format", "strict_date_optional_time||epoch_millis").endObject() .endObject() .endObject() .endObject() .endObject(); request.source(builder); CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT); System.out.println("createIndex: " + JSONUtil.toJsonStr(createIndexResponse)); }
3.2 查詢索引列表
/** * 查詢索引列表 * @return Set<String> * @throws IOException e */ public Set<String> indices() throws IOException { GetAliasesRequest request = new GetAliasesRequest(); GetAliasesResponse getAliasesResponse = client.indices().getAlias(request,RequestOptions.DEFAULT); Map<String, Set<AliasMetaData>> aliases = getAliasesResponse.getAliases(); return aliases.keySet(); }
3.3 判斷索引存在
/** * 判斷索引是否存在,在創建索引之前使用 * @param index 索引名稱 * @return boolean * @throws IOException e */ public boolean existsIndex(String index) throws IOException { GetIndexRequest request = new GetIndexRequest(); request.indices(index); return client.indices().exists(request, RequestOptions.DEFAULT); }
4. 數據操作
4.1 新增數據
/** * 新增數據 * @param index 索引 * @param type 索引類型 * @param id 數據ID * @param object 數據對象 */public void addData(String index, String type, String id, JSONObject object) throws IOException { CreateIndexRequest request = new CreateIndexRequest(); client.indices().create(request, RequestOptions.DEFAULT); IndexRequest indexRequest = new IndexRequest(index, type, id); indexRequest.source(JSONUtil.toJsonStr(object), XContentType.JSON); IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT); log.info("新增結果: " + JSONUtil.toJsonStr(indexResponse)); }
4.2 判斷數據是否存在
/** * 判斷數據是否存在 * @param index 索引 * @param type 索引類型 * @param id 數據ID * @return boolean */ public boolean exists(String index, String type, String id) throws IOException { GetRequest getRequest = new GetRequest(index, type, id); getRequest.fetchSourceContext(new FetchSourceContext(false)); getRequest.storedFields("_none_"); boolean exists = client.exists(getRequest, RequestOptions.DEFAULT); log.info("判斷數據是否存在:" + exists); return exists; }
4.3 修改數據
/** * 修改數據 * @param index 索引 * @param type 索引類型 * @param id 數據ID * @param object 修改的數據 */ public void updateData(String index, String type, String id, JSONObject object) throws IOException { UpdateRequest request = new UpdateRequest(index, type, id); request.doc(JSONUtil.toJsonStr(object), XContentType.JSON); UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT); log.info("修改數據結果: " + JSONUtil.toJsonStr(updateResponse)); }
4.4 刪除數據
/** * 刪除數據 * @param index 索引 * @param type 索引類型 * @param id 數據ID */ public void deleteData(String index, String type, String id) throws IOException { DeleteRequest deleteRequest = new DeleteRequest(index, type, id); DeleteResponse response = client.delete(deleteRequest, RequestOptions.DEFAULT); System.out.println("刪除結果: " + JSONUtil.toJsonStr(response)); }
4.5 批量操作數據
下面的方法是模擬了批量新增、修改、刪除,各位可以按照自己的需求修改。
public void bulk(List<UserBean> userList) throws IOException { // 批量增加 BulkRequest bulkAddRequest = new BulkRequest(); for (UserBean user : userList) { IndexRequest indexRequest = new IndexRequest(INDEX_TEST, TYPE_TEST, user.getId().toString()); indexRequest.source(JSONUtil.toJsonStr(user), XContentType.JSON); bulkAddRequest.add(indexRequest); } BulkResponse bulkAddResponse = client.bulk(bulkAddRequest, RequestOptions.DEFAULT); System.out.println("bulkAdd: " + JSONUtil.toJsonStr(bulkAddResponse)); // 批量更新 BulkRequest bulkUpdateRequest = new BulkRequest(); for (UserBean user : userList) { user.setName(user.getName() + " updated"); UpdateRequest updateRequest = new UpdateRequest(INDEX_TEST, TYPE_TEST, user.getId().toString()); updateRequest.doc(JSONUtil.toJsonStr(user), XContentType.JSON); bulkUpdateRequest.add(updateRequest); } BulkResponse bulkUpdateResponse = client.bulk(bulkUpdateRequest, RequestOptions.DEFAULT); System.out.println("bulkUpdate: " + JSONUtil.toJsonStr(bulkUpdateResponse)); // 批量刪除 BulkRequest bulkDeleteRequest = new BulkRequest(); for (UserBean user : userList) { DeleteRequest deleteRequest = new DeleteRequest(INDEX_TEST, TYPE_TEST, user.getId().toString()); bulkDeleteRequest.add(deleteRequest); } BulkResponse bulkDeleteResponse = client.bulk(bulkDeleteRequest, RequestOptions.DEFAULT); System.out.println("bulkDelete: " + JSONUtil.toJsonStr(bulkDeleteResponse)); }
5. 數據查詢
5.1 關鍵字查詢,匹配單個字段
查詢條件都是通過QueryBuilders對象進行創建。
SearchSourceBuilder是用來拼接其他查詢參數,比如分頁參數,返回的字段值,排序等。
SearchRequest用來指定查詢的索引index和type
SearchResponse 是用來接收查詢結果,response.getHits().getHits()是查詢的數據結果集。
/** * 關鍵字查詢,匹配單個字段 * @param index 索引 * @param type 索引類型 * @param field 查詢字段 * @param value 查詢參數 * @return String */ public String searchMatch(String index, String type, String field, String value) throws IOException { MatchQueryBuilder matchQueryBuilder = QueryBuilders.matchQuery(field, value); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(matchQueryBuilder); sourceBuilder.from(0); // 查詢起始行,分頁參數 sourceBuilder.size(100); // 獲取記錄數,默認10,分頁參數 sourceBuilder.fetchSource(new String[] { "id", "name" }, new String[] {}); // 第一個參數是要獲取字段,第二個是要過濾的字段,默認獲取返回全部字段 sourceBuilder.sort("id", SortOrder.DESC); // 排序 SearchRequest searchRequest = new SearchRequest(index).types(type).source(sourceBuilder); SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); SearchHits hits = response.getHits(); SearchHit[] searchHits = hits.getHits(); for (SearchHit hit : searchHits) { log.info("查詢結果:" + hit.getSourceAsString()); } return JSONUtil.toJsonStr(searchHits); }
5.2 關鍵字查詢,匹配多個字段
/** * 關鍵字查詢,匹配多個字段 * @param index 索引 * @param type 索引類型 * @param fields 查詢多個字段 * @param value 查詢參數 * @return String */public String searchMultiMatch(String index, String type, List<String> fields, String value) throws IOException { MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery(value, fields.toArray(new String[]{})); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(multiMatchQueryBuilder).from(0).size(100); SearchRequest searchRequest = new SearchRequest(index).types(type).source(sourceBuilder); SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); ...... }
5.3 關鍵字查詢,范圍查詢
/** * 關鍵字查詢,范圍查詢(適用於數字類型、日期類型) * @param index 索引 * @param type 索引類型 * @param field 查詢字段 * @param start 開始參數 * @param end 結束參數 */ @Override public String searchRange(String index, String type, String field, int start, int end) throws IOException { RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(field).from(start).to(end); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(rangeQueryBuilder).from(0).size(100); SearchRequest searchRequest = new SearchRequest(index).types(type).source(sourceBuilder); SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); ...... }
5.4 關鍵字精確查詢
/** * 關鍵字精確查詢(精確值可能是數字、時間、布爾或者not_analyzed的字符串) * @param index 索引 * @param type 索引類型 * @param field 查詢字段 * @param value 查詢值 */public String searchTerm(String index, String type, String field, String value) throws IOException { TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(field, value); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(termQueryBuilder).from(0).size(100); SearchRequest searchRequest = new SearchRequest(index).types(type).source(sourceBuilder); SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); ...... }
5.5 組合查詢
/** * 組合查詢 * must 文檔 必須 匹配這些條件才能被包含進來。 * must_not 文檔 必須不 匹配這些條件才能被包含進來。 * should 如果滿足這些語句中的任意語句,將增加 _score ,否則,無任何影響。它們主要用於修正每個文檔的相關性得分。 * filter 必須 匹配,但它以不評分、過濾模式來進行。這些語句對評分沒有貢獻,只是根據過濾標准來排除或包含文檔。 * @param index 索引 * @param type 索引類型 * @param field 查詢字段 * @param value 查詢值 */ public String searchBool(String index, String type, String field, String value) throws IOException { BoolQueryBuilder builder = QueryBuilders.boolQuery(); builder.must(QueryBuilders.matchQuery("title", "我是誰")); builder.mustNot(QueryBuilders.matchQuery("title", "你")); builder.should(QueryBuilders.matchQuery("desc", "啊啊啊")); builder.filter(QueryBuilders.termsQuery("author", "老王")); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(builder).from(0).size(100); SearchRequest searchRequest = new SearchRequest(index).types(type).source(sourceBuilder); SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); ...... }
6. 聚合
6.1 基礎聚合
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); TermsAggregationBuilder categoryAggregationBuilder = AggregationBuilders.terms("byCategory").field("category"); TermsAggregationBuilder sourceAggregationBuilder = AggregationBuilders.terms("bySource").field("source"); sourceBuilder.aggregation(categoryAggregationBuilder).aggregation(sourceAggregationBuilder); SearchRequest searchRequest = new SearchRequest(INDEX_TEST).types(TYPE_TEST).source(sourceBuilder); SearchResponse response = rhlClient.search(searchRequest, RequestOptions.DEFAULT); if (response != null) { ParsedLongTerms resourceTerms = response.getAggregations().get("byCategory"); for (Terms.Bucket entry : resourceTerms.getBuckets()) { System.out.println("統計key=" + entry.getKeyAsString() + ", 統計數量:"+ entry.getDocCount()); } } else { log.error("資源狀態聚合查詢失敗:" + searchRequest.toString()); }
代碼解釋:
1. 使用AggregationBuilders創建agg對象,terms的值是此次聚合的的別名,field是要聚合的字段
2. sourceBuilder.aggregation(****) 用來傳入agg對象,可以同時接收多個agg對象,表示同時對多個維度進行統計。
3. response.getAggregations().get("byCategory"); 結果集中,getAggregations獲取聚合對象,get("byCategory")獲取byCategory別名的統計結果,結果集要的數據類型要區別清除
6.2 聚合過濾
在執行聚合操作之前,先用過濾條件把需要聚合的數據過濾出來。
注意:聚合、過濾可以嵌套,並且隨意組合,造成返回的SearchResponse對象,組裝的數據格式無法統一,所以只能按照自己的業務提取最終數據。
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.aggregation(AggregationBuilders .filter("categoryFilter", QueryBuilders.termQuery("status", 3)) .subAggregation(AggregationBuilders.terms("byCategory").field("category"))) .aggregation(AggregationBuilders.terms("bySource").field("source")) .size(0); // 設置索引,類型 SearchRequest searchRequest = new SearchRequest(properties.getEsIndex()).types(properties.getEsType()).source(sourceBuilder); SearchResponse response; try { response = rhlClient.search(searchRequest, RequestOptions.DEFAULT); if (response != null) { // 分享狀態結果 ParsedFilter filter = response.getAggregations().get("categoryFilter"); ParsedStringTerms sharedTerms = filter.getAggregations().get("byCategory"); for (Terms.Bucket entry : sharedTerms.getBuckets()) { System.out.println("統計key=" + entry.getKeyAsString() + ", 統計數量:"+ entry.getDocCount()); } // 資源狀態結果 ParsedLongTerms resourceTerms = response.getAggregations().get("bySource"); for (Terms.Bucket entry : resourceTerms.getBuckets()) { System.out.println("統計key=" + entry.getKeyAsString() + ", 統計數量:"+ entry.getDocCount()); } } } catch (Exception e) { ...... }
相同的主題的其他文章:
https://mp.weixin.qq.com/s/94j352V5xt9Tpqe00-eiQA