ElasticSearch與Java集成-RestHighLevelClient


1.  簡介

  RestHighLevelClient是官方指定的Java連接ElasticSearch的API。

  需要使用maven引用以下依賴:

<dependency>
  <groupId>org.elasticsearch.client</groupId>
   <artifactId>elasticsearch-rest-high-level-client</artifactId>
   <version>6.5.2</version>
</dependency>

  注意:以上的依賴版本可以根據你使用的ES的版本來定,向下兼容,但是無法向上兼容

 

2. 創建客戶端

  創建客戶端在網上有很多種配置,下面是最簡單的一種,需要其他中詳情配置的可以自己查。

    static String ip = "localhost";
    static int port = 9200;
    static RestHighLevelClient restHighLevelClient = null;
    static TransportClient client = null;
    
    static  RestHighLevelClient initClient(){
        //這里的RestClient.builder(new HttpHost(ip,port),.........)支持多個httphost連接,也就是支持連接多個elasticsearch
        restHighLevelClient = new RestHighLevelClient(RestClient.builder(new HttpHost(ip,port)));
    }

    static Connection getInstance(){
        synchronized (ElasticSearchFactory.class){
            if (restHighLevelClient == null){
             restHighLevelClient  =   initClient();
            }
        }
        return connection;
    }

 

注意:以下操作都是個使用了RestHighLevelClient的API對ElasticSearch進行操作,各版本之間的語法可能略有區別。

3. 索引操作

3.1 創建索引 

  創建索引操作,最簡單的操作就是只傳一個索引名稱,CreateIndexRequest request = new CreateIndexRequest(index); 至於下面組織setting和mappings的XContentBuilder對象,是可以不傳,也可以按照自己的業務自行定義。

@Autowired
private RestHighLevelClient client;

  下面出現的變量client,都是引用的RestHighLevelClient

    /**
     * 創建索引
     * @param index  索引名稱
     * @throws IOException e
     */
    public void createIndex(String index) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest(index);
        XContentBuilder builder = JsonXContent.contentBuilder()
                .startObject()
                    .startObject("settings").field("number_of_shards", 3).field("number_of_replicas", 1).endObject()
                    .startObject("mappings")
                        .startObject("doc")
                            .startObject("properties")
                                .startObject("title").field("type", "text").endObject()
                                .startObject("content").field("type", "text").field("index", true).endObject()
                                .startObject("uniqueId").field("type", "keyword").field("index", false).endObject()
                                .startObject("created").field("type", "date").field("format", "strict_date_optional_time||epoch_millis").endObject()
                            .endObject()
                        .endObject()
                    .endObject()
                .endObject();
        request.source(builder);
        CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
        System.out.println("createIndex: " + JSONUtil.toJsonStr(createIndexResponse));
    }

 

3.2 查詢索引列表

    /**
     * 查詢索引列表
     * @return Set<String>
     * @throws IOException e
     */
    public Set<String> indices() throws IOException {
        GetAliasesRequest request = new GetAliasesRequest();
        GetAliasesResponse getAliasesResponse =  client.indices().getAlias(request,RequestOptions.DEFAULT);
        Map<String, Set<AliasMetaData>> aliases = getAliasesResponse.getAliases();
        return aliases.keySet();
    }

 

3.3 判斷索引存在

    /**
     * 判斷索引是否存在,在創建索引之前使用
     * @param index 索引名稱
     * @return boolean
     * @throws IOException e
     */
    public boolean existsIndex(String index) throws IOException {
        GetIndexRequest request = new GetIndexRequest();
        request.indices(index);
        return client.indices().exists(request, RequestOptions.DEFAULT);
    }

 

4. 數據操作

4.1 新增數據

    /**
     * 新增數據
     * @param index 索引
     * @param type  索引類型
     * @param id    數據ID
     * @param object 數據對象
     */public void addData(String index, String type, String id, JSONObject object) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest();
        client.indices().create(request, RequestOptions.DEFAULT);

        IndexRequest indexRequest = new IndexRequest(index, type, id);
        indexRequest.source(JSONUtil.toJsonStr(object), XContentType.JSON);
        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
        log.info("新增結果: " + JSONUtil.toJsonStr(indexResponse));
    }

 

4.2 判斷數據是否存在

    /**
     * 判斷數據是否存在
     * @param index 索引
     * @param type  索引類型
     * @param id    數據ID
     * @return boolean
     */
    public boolean exists(String index, String type, String id) throws IOException {
        GetRequest getRequest = new GetRequest(index, type, id);
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        getRequest.storedFields("_none_");
        boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
        log.info("判斷數據是否存在:" + exists);
        return exists;
    }

 

4.3 修改數據

    /**
     * 修改數據
     * @param index  索引
     * @param type   索引類型
     * @param id     數據ID
     * @param object 修改的數據
     */
    public void updateData(String index, String type, String id, JSONObject object) throws IOException {
        UpdateRequest request = new UpdateRequest(index, type, id);
        request.doc(JSONUtil.toJsonStr(object), XContentType.JSON);
        UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
        log.info("修改數據結果: " + JSONUtil.toJsonStr(updateResponse));
    }

 

4.4 刪除數據

    /**
     * 刪除數據
     * @param index 索引
     * @param type  索引類型
     * @param id    數據ID
     */
    public void deleteData(String index, String type, String id) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(index, type, id);
        DeleteResponse response = client.delete(deleteRequest, RequestOptions.DEFAULT);
        System.out.println("刪除結果: " + JSONUtil.toJsonStr(response));
    }

 

4.5 批量操作數據

  下面的方法是模擬了批量新增、修改、刪除,各位可以按照自己的需求修改。

public void bulk(List<UserBean> userList) throws IOException {
        // 批量增加
        BulkRequest bulkAddRequest = new BulkRequest();
        for (UserBean user : userList) {
            IndexRequest indexRequest = new IndexRequest(INDEX_TEST, TYPE_TEST, user.getId().toString());
            indexRequest.source(JSONUtil.toJsonStr(user), XContentType.JSON);
            bulkAddRequest.add(indexRequest);
        }
        
        BulkResponse bulkAddResponse = client.bulk(bulkAddRequest, RequestOptions.DEFAULT);
        System.out.println("bulkAdd: " + JSONUtil.toJsonStr(bulkAddResponse));

        // 批量更新
        BulkRequest bulkUpdateRequest = new BulkRequest();
        for (UserBean user : userList) {
            user.setName(user.getName() + " updated");
            UpdateRequest updateRequest = new UpdateRequest(INDEX_TEST, TYPE_TEST, user.getId().toString());
            updateRequest.doc(JSONUtil.toJsonStr(user), XContentType.JSON);
            bulkUpdateRequest.add(updateRequest);
        }
        BulkResponse bulkUpdateResponse = client.bulk(bulkUpdateRequest, RequestOptions.DEFAULT);
        System.out.println("bulkUpdate: " + JSONUtil.toJsonStr(bulkUpdateResponse));
        
        // 批量刪除
        BulkRequest bulkDeleteRequest = new BulkRequest();
        for (UserBean user : userList) {
            DeleteRequest deleteRequest = new DeleteRequest(INDEX_TEST, TYPE_TEST, user.getId().toString());
            bulkDeleteRequest.add(deleteRequest);
        }
        BulkResponse bulkDeleteResponse = client.bulk(bulkDeleteRequest, RequestOptions.DEFAULT);
        System.out.println("bulkDelete: " + JSONUtil.toJsonStr(bulkDeleteResponse));
    }

 

5. 數據查詢

5.1  關鍵字查詢,匹配單個字段

  查詢條件都是通過QueryBuilders對象進行創建。

  SearchSourceBuilder是用來拼接其他查詢參數,比如分頁參數,返回的字段值,排序等。

  SearchRequest用來指定查詢的索引index和type

  SearchResponse 是用來接收查詢結果,response.getHits().getHits()是查詢的數據結果集。

    /**
     * 關鍵字查詢,匹配單個字段
     * @param index  索引
     * @param type   索引類型
     * @param field  查詢字段
     * @param value  查詢參數
     * @return String
     */
    public String searchMatch(String index, String type, String field, String value) throws IOException {
        MatchQueryBuilder matchQueryBuilder = QueryBuilders.matchQuery(field, value);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(matchQueryBuilder);
        sourceBuilder.from(0);   // 查詢起始行,分頁參數
        sourceBuilder.size(100); // 獲取記錄數,默認10,分頁參數
        sourceBuilder.fetchSource(new String[] { "id", "name" }, new String[] {}); // 第一個參數是要獲取字段,第二個是要過濾的字段,默認獲取返回全部字段
        sourceBuilder.sort("id", SortOrder.DESC); // 排序

        SearchRequest searchRequest = new SearchRequest(index).types(type).source(sourceBuilder);
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
        SearchHit[] searchHits = hits.getHits();
        for (SearchHit hit : searchHits) {
            log.info("查詢結果:" + hit.getSourceAsString());
        }
        return JSONUtil.toJsonStr(searchHits);
    }

 

5.2 關鍵字查詢,匹配多個字段

    /**
     * 關鍵字查詢,匹配多個字段
     * @param index  索引
     * @param type   索引類型
     * @param fields  查詢多個字段
     * @param value  查詢參數
     * @return String
     */public String searchMultiMatch(String index, String type, List<String> fields, String value) throws IOException {
        MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery(value, fields.toArray(new String[]{}));
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(multiMatchQueryBuilder).from(0).size(100);
        SearchRequest searchRequest = new SearchRequest(index).types(type).source(sourceBuilder);
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
        ......
    }

 

5.3 關鍵字查詢,范圍查詢

    /**
     * 關鍵字查詢,范圍查詢(適用於數字類型、日期類型)
     * @param index  索引
     * @param type   索引類型
     * @param field  查詢字段
     * @param start  開始參數
     * @param end    結束參數
     */
    @Override
    public String searchRange(String index, String type, String field, int start, int end) throws IOException {
        RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(field).from(start).to(end);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(rangeQueryBuilder).from(0).size(100);
        SearchRequest searchRequest = new SearchRequest(index).types(type).source(sourceBuilder);
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
        ......
    }

 

5.4 關鍵字精確查詢

    /**
     * 關鍵字精確查詢(精確值可能是數字、時間、布爾或者not_analyzed的字符串)
     * @param index  索引
     * @param type   索引類型
     * @param field  查詢字段
     * @param value  查詢值
     */public String searchTerm(String index, String type, String field, String value) throws IOException {
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(field, value);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(termQueryBuilder).from(0).size(100);
        SearchRequest searchRequest = new SearchRequest(index).types(type).source(sourceBuilder);
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
        ......
    }

 

5.5 組合查詢

    /**
     * 組合查詢
     * must 文檔 必須 匹配這些條件才能被包含進來。
     * must_not 文檔 必須不 匹配這些條件才能被包含進來。
     * should 如果滿足這些語句中的任意語句,將增加 _score ,否則,無任何影響。它們主要用於修正每個文檔的相關性得分。
     * filter 必須 匹配,但它以不評分、過濾模式來進行。這些語句對評分沒有貢獻,只是根據過濾標准來排除或包含文檔。
     * @param index  索引
     * @param type   索引類型
     * @param field  查詢字段
     * @param value  查詢值
     */
    public String searchBool(String index, String type, String field, String value) throws IOException {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.must(QueryBuilders.matchQuery("title", "我是誰"));
        builder.mustNot(QueryBuilders.matchQuery("title", "你"));
        builder.should(QueryBuilders.matchQuery("desc", "啊啊啊"));
        builder.filter(QueryBuilders.termsQuery("author", "老王"));
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(builder).from(0).size(100);
        SearchRequest searchRequest = new SearchRequest(index).types(type).source(sourceBuilder);
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
        ......
    }

 

6. 聚合

6.1 基礎聚合

        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        TermsAggregationBuilder categoryAggregationBuilder = AggregationBuilders.terms("byCategory").field("category");
        TermsAggregationBuilder sourceAggregationBuilder = AggregationBuilders.terms("bySource").field("source");
        sourceBuilder.aggregation(categoryAggregationBuilder).aggregation(sourceAggregationBuilder);

        SearchRequest searchRequest = new SearchRequest(INDEX_TEST).types(TYPE_TEST).source(sourceBuilder);
        SearchResponse response = rhlClient.search(searchRequest, RequestOptions.DEFAULT);
        if (response != null) {
            ParsedLongTerms resourceTerms = response.getAggregations().get("byCategory");
            for (Terms.Bucket entry : resourceTerms.getBuckets()) {
                System.out.println("統計key=" + entry.getKeyAsString() + ", 統計數量:"+  entry.getDocCount());
            }
        } else {
            log.error("資源狀態聚合查詢失敗:" + searchRequest.toString());
        }

   代碼解釋:

  1. 使用AggregationBuilders創建agg對象,terms的值是此次聚合的的別名,field是要聚合的字段

  2. sourceBuilder.aggregation(****) 用來傳入agg對象,可以同時接收多個agg對象,表示同時對多個維度進行統計。

  3. response.getAggregations().get("byCategory"); 結果集中,getAggregations獲取聚合對象,get("byCategory")獲取byCategory別名的統計結果,結果集要的數據類型要區別清除

6.2 聚合過濾

  在執行聚合操作之前,先用過濾條件把需要聚合的數據過濾出來。

  注意:聚合、過濾可以嵌套,並且隨意組合,造成返回的SearchResponse對象,組裝的數據格式無法統一,所以只能按照自己的業務提取最終數據。

        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.aggregation(AggregationBuilders
                        .filter("categoryFilter", QueryBuilders.termQuery("status", 3))
                        .subAggregation(AggregationBuilders.terms("byCategory").field("category")))
                .aggregation(AggregationBuilders.terms("bySource").field("source"))
                .size(0);
        // 設置索引,類型
        SearchRequest searchRequest = new SearchRequest(properties.getEsIndex()).types(properties.getEsType()).source(sourceBuilder);
        SearchResponse response;
        try {
            response = rhlClient.search(searchRequest, RequestOptions.DEFAULT);
            if (response != null) {
                // 分享狀態結果
                ParsedFilter filter = response.getAggregations().get("categoryFilter");
                ParsedStringTerms sharedTerms = filter.getAggregations().get("byCategory");
                for (Terms.Bucket entry : sharedTerms.getBuckets()) {
                    System.out.println("統計key=" + entry.getKeyAsString() + ", 統計數量:"+  entry.getDocCount());
                }
                // 資源狀態結果
                ParsedLongTerms resourceTerms = response.getAggregations().get("bySource");
                for (Terms.Bucket entry : resourceTerms.getBuckets()) {
                    System.out.println("統計key=" + entry.getKeyAsString() + ", 統計數量:"+  entry.getDocCount());

                }
            }
        } catch (Exception e) {
            ......
        }

 

 

相同的主題的其他文章:

  https://mp.weixin.qq.com/s/94j352V5xt9Tpqe00-eiQA

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM