Spring and RestHighLevelClient


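  • There are two ways to connect to Elasticsearch: the TCP (transport) protocol and the HTTP protocol.

I have been using es a lot lately, and until now always through spring-data-elasticsearch, Spring's wrapper. A few things about spring-data-elasticsearch are painful:

  • It is built on the TCP protocol (I am not sure whether it supports HTTP; a senior engineer on our company's XX cloud team recommends HTTP, which also seems to be the official recommendation?)

  • Matching its version to the Elasticsearch version is a nuisance

  • It is awkward to use

  • Given the points above, I decided to drop spring-data-elasticsearch and build my own wheel;

  • Following the official documentation, we chose RestHighLevelClient to implement basic es queries;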


The official description:

The Java REST Client comes in 2 flavors:

Java Low Level REST Client: the official low-level client for Elasticsearch. It allows to communicate with an Elasticsearch cluster through http. Leaves requests marshalling and responses un-marshalling to users. It is compatible with all Elasticsearch versions.
Java High Level REST Client: the official high-level client for Elasticsearch. Based on the low-level client, it exposes API specific methods and takes care of requests marshalling and responses un-marshalling.

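  • Two flavors are provided, the Java Low Level REST Client and the Java High Level REST Client:

    • The Java Low Level REST Client is compatible with all Elasticsearch versions (no more version headaches)
    • It communicates with the Elasticsearch cluster over HTTP (as recommended by our senior engineer)
    • The Java High Level REST Client is built on the Java Low Level REST Client and exposes higher-level, API-specific methods
  • So the obvious choice is RestHighLevelClient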


Integrating RestHighLevelClient with Spring

  1. Build the Elasticsearch client
  • The constructors of RestHighLevelClient show that it can be built from a RestClientBuilder; a simple demo follows
    /**
     * Connect timeout, in milliseconds
     */
    private static final int CONNECT_TIMEOUT = 5000;
    /**
     * Socket (read) timeout, in milliseconds
     */
    private static final int SOCKET_TIMEOUT = 40000;
    /**
     * Timeout for obtaining a connection from the pool, in milliseconds
     */
    private static final int CONNECTION_REQUEST_TIMEOUT = 1000;
    /**
     * Maximum total number of connections
     */
    private static final int MAX_CONNECT_NUM = 100;
    /**
     * Maximum number of connections per route
     */
    private static final int MAX_CONNECT_ROUTE = 100;

    @Bean(name = "elasticsearchClient", destroyMethod = "close")
    public RestHighLevelClient client() {
        // "host" and 9200 are placeholders; HttpHost takes the port as an int
        RestClientBuilder builder = RestClient.builder(new HttpHost("host", 9200, "http"));
        // Configure request-level parameters (timeouts)
        builder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectTimeout(CONNECT_TIMEOUT);
            requestConfigBuilder.setSocketTimeout(SOCKET_TIMEOUT);
            requestConfigBuilder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT);
            return requestConfigBuilder;
        });
        // Configure httpClient parameters (connection pool limits)
        builder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setMaxConnTotal(MAX_CONNECT_NUM);
            httpClientBuilder.setMaxConnPerRoute(MAX_CONNECT_ROUTE);
            return httpClientBuilder;
        });
        builder.setFailureListener(new RestClient.FailureListener(){
            @Override
            public void onFailure(HttpHost host) {
                // TODO do something when a node fails
                super.onFailure(host);
            }
        });
        return new RestHighLevelClient(builder);
    }
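  • Several callbacks and parameters can be configured; see the RestClientBuilder source for the exact API
  • With the client configured, we can build a few simple wheels on top of it. For example, by default an es search cannot page past 10,000 hits (from + size is limited by index.max_result_window), so we can wrap a method that fetches all matching data through the scroll API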
    public List<SearchHit> searchAll(SearchRequest searchRequest) {
        try {
            List<SearchHit> hits = new ArrayList<>(16);
            // Batch size per scroll request; the caller is expected to set size() on the source
            int maxNum = searchRequest.source().size();
            // Keep the scroll context alive for 10 minutes between batches
            searchRequest.scroll(TimeValue.timeValueMinutes(10));
            SearchResponse search = client.search(searchRequest);
            hits.addAll(Arrays.asList(search.getHits().getHits()));
            // A full batch means there may be more data; a short batch means we are done
            while (search.getHits().getHits().length == maxNum) {
                SearchScrollRequest searchScrollRequest = new SearchScrollRequest(search.getScrollId());
                searchScrollRequest.scroll(TimeValue.timeValueMinutes(10));
                search = client.searchScroll(searchScrollRequest);
                hits.addAll(Arrays.asList(search.getHits().getHits()));
            }
            return hits;
        } catch (IOException e) {
            log.error("Get message error.", e);
            // Callers must handle null, which signals that the query failed
            return null;
        }
    }
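
The loop in searchAll keeps requesting batches until one comes back shorter than the page size. Below is a minimal, library-free sketch of just that termination logic. It is not Elasticsearch code: `fetchPage` is a hypothetical stand-in for `client.search` / `client.searchScroll`, and `fakeSource` is a made-up in-memory data source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

final class ScrollLoopSketch {

    /**
     * Collects every item by fetching fixed-size pages until a page comes back
     * shorter than pageSize, mirroring the while-condition in searchAll.
     * fetchPage is a hypothetical stand-in for the Elasticsearch calls.
     */
    static <T> List<T> fetchAll(int pageSize, IntFunction<List<T>> fetchPage) {
        List<T> all = new ArrayList<>();
        int pageIndex = 0;
        List<T> page = fetchPage.apply(pageIndex++);
        all.addAll(page);
        while (page.size() == pageSize) {
            page = fetchPage.apply(pageIndex++);
            all.addAll(page);
        }
        return all;
    }

    /** Fake data source: serves the numbers 0..total-1 in pages of pageSize. */
    static IntFunction<List<Integer>> fakeSource(int total, int pageSize) {
        return pageIndex -> {
            List<Integer> page = new ArrayList<>();
            int start = pageIndex * pageSize;
            for (int i = start; i < Math.min(start + pageSize, total); i++) {
                page.add(i);
            }
            return page;
        };
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(10, fakeSource(25, 10)).size()); // prints 25
        System.out.println(fetchAll(10, fakeSource(20, 10)).size()); // prints 20
    }
}
```

When the total is an exact multiple of the page size (the second call), the loop issues one extra request that returns an empty page before it stops; this is the price of using the batch length as the only stop signal.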
  • With the method above we can query commonly needed data; a simple usage example follows:
        BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
        boolBuilder.filter(QueryBuilders.termQuery("type", 0));
        boolBuilder.filter(QueryBuilders.termsQuery("id.keyword", id));

        RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("createTime");
        rangeQueryBuilder.gte(startTime);
        rangeQueryBuilder.lte(endTime);
        boolBuilder.filter(rangeQueryBuilder);

        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.size(9999);
        sourceBuilder.fetchSource(new String[]{"field1", "field2", "field3"}, new String[]{});
        sourceBuilder.query(boolBuilder);

        SearchRequest searchRequest = new SearchRequest("index");
        searchRequest.source(sourceBuilder);
        List<SearchHit> searchHits = repository.searchAll(searchRequest);
  • See the official documentation for detailed API usage
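
For orientation, the builder calls in the usage example correspond roughly to the JSON request body sketched below. This is an illustration under assumptions: `buildBody` is a hypothetical helper, the ids and timestamps are made-up values (the original does not show the types of `id`, `startTime` or `endTime`; epoch-style longs are assumed here), and the real builders also emit default fields that are omitted.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class SearchBodySketch {

    /**
     * Hypothetical helper: hand-builds a request body equivalent in meaning to
     * the bool/filter query above. No JSON escaping is done, so ids are assumed
     * to be plain strings without quotes or backslashes.
     */
    static String buildBody(List<String> ids, long startTime, long endTime) {
        String idArray = ids.stream()
                .map(id -> "\"" + id + "\"")
                .collect(Collectors.joining(",", "[", "]"));
        return "{"
                + "\"size\":9999,"
                + "\"_source\":{\"includes\":[\"field1\",\"field2\",\"field3\"],\"excludes\":[]},"
                + "\"query\":{\"bool\":{\"filter\":["
                + "{\"term\":{\"type\":0}},"
                + "{\"terms\":{\"id.keyword\":" + idArray + "}},"
                + "{\"range\":{\"createTime\":{\"gte\":" + startTime + ",\"lte\":" + endTime + "}}}"
                + "]}}"
                + "}";
    }

    public static void main(String[] args) {
        // Made-up sample values, for illustration only
        System.out.println(buildBody(Arrays.asList("a1", "b2"), 1000L, 2000L));
    }
}
```

All three clauses sit under filter, so they restrict the result set without contributing to the relevance score.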

Updated 2019-10-28

IndexRequest indexRequest = new IndexRequest(index, type, id);
indexRequest.source(entityMapper.mapToString(map), Requests.INDEX_CONTENT_TYPE);
return client.index(indexRequest);
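  • In the official API, IndexRequest provides several overloads of the source method:

    • Note source(Map source) and source(Map source, XContentType contentType): the values in the Map argument are type-checked;
    • The source code is as follows: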
     public IndexRequest source(Map source, XContentType contentType) throws ElasticsearchGenerationException {
         try {
             XContentBuilder builder = XContentFactory.contentBuilder(contentType);
             builder.map(source);
             return source(builder);
         } catch (IOException e) {
             throw new ElasticsearchGenerationException("Failed to generate [" + source + "]", e);
         }
     }
    
  • Inside builder.map, every entry's value is passed to unknownValue, which checks the values one by one:

  • The source code is as follows:

   private XContentBuilder map(Map<String, ?> values, boolean ensureNoSelfReferences) throws IOException {
       if (values == null) {
           return this.nullValue();
       } else {
           if (ensureNoSelfReferences) {
               ensureNoSelfReferences(values);
           }

           this.startObject();
           Iterator var3 = values.entrySet().iterator();

           while(var3.hasNext()) {
               Entry<String, ?> value = (Entry)var3.next();
               this.field((String)value.getKey());
               this.unknownValue(value.getValue(), false);
           }

           this.endObject();
           return this;
       }
   }
  • Source of the checking method:
    private void unknownValue(Object value, boolean ensureNoSelfReferences) throws IOException {
        if (value == null) {
            this.nullValue();
        } else {
            XContentBuilder.Writer writer = (XContentBuilder.Writer)WRITERS.get(value.getClass());
            if (writer != null) {
                writer.write(this, value);
            } else if (value instanceof Path) {
                this.value((Path)value);
            } else if (value instanceof Map) {
                Map<String, ?> valueMap = (Map)value;
                this.map(valueMap, ensureNoSelfReferences);
            } else if (value instanceof Iterable) {
                this.value((Iterable)value, ensureNoSelfReferences);
            } else if (value instanceof Object[]) {
                this.values((Object[])value, ensureNoSelfReferences);
            } else if (value instanceof ToXContent) {
                this.value((ToXContent)value);
            } else {
                if (!(value instanceof Enum)) {
                    throw new IllegalArgumentException("cannot write xcontent for unknown value of type " + value.getClass());
                }

                this.value(Objects.toString(value));
            }

        }
    }
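
To make the consequence concrete, here is a simplified, library-free imitation of the dispatch above. It is a sketch, not Elasticsearch code: the `SUPPORTED` set is an illustrative subset chosen for this example rather than the real contents of `WRITERS`, the Path and ToXContent branches are left out, and `check` and `Order` are hypothetical names. It shows that a Map holding an arbitrary POJO is rejected with the same kind of IllegalArgumentException.

```java
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class UnknownValueSketch {

    // Illustrative subset only; the real WRITERS table lives inside XContentBuilder.
    private static final Set<Class<?>> SUPPORTED = new HashSet<>(Arrays.asList(
            String.class, Integer.class, Long.class, Double.class, Boolean.class, Date.class));

    /** Hypothetical POJO standing in for any application class. */
    static final class Order { }

    /** Walks a value the way unknownValue does and throws on the first unsupported type. */
    static void check(Object value) {
        if (value == null || SUPPORTED.contains(value.getClass()) || value instanceof Enum) {
            return;
        }
        if (value instanceof Map) {
            for (Object v : ((Map<?, ?>) value).values()) check(v);
        } else if (value instanceof Iterable) {
            for (Object v : (Iterable<?>) value) check(v);
        } else if (value instanceof Object[]) {
            for (Object v : (Object[]) value) check(v);
        } else {
            throw new IllegalArgumentException(
                    "cannot write xcontent for unknown value of type " + value.getClass());
        }
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("name", "es");
        doc.put("count", 3);
        check(doc); // passes: every value has a supported type

        doc.put("order", new Order());
        try {
            check(doc); // rejected: Order has no writer and matches no other branch
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```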
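  • To avoid this pitfall (a value of an unsupported type makes unknownValue throw IllegalArgumentException), serialize the Map to a JSON string first and pass the string instead:
        IndexRequest indexRequest = new IndexRequest(index, type, id);
        indexRequest.source(JSON.toJSONString(map), Requests.INDEX_CONTENT_TYPE);
        client.index(indexRequest);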

