A Brief Overview of Elasticsearch Java REST Clients


History of the ES Java Clients

Java API Client

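  • Advantage: data access goes over the transport protocol, so the client can use the cluster's internal performance features and performs relatively well.
  • Disadvantage: the client version must match the ES cluster version, and data serialization is implemented in Java, so whenever the ES cluster or the JDK is upgraded the client has to be upgraded with it.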
 
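This is the earliest client offered on the ES website, and spring-data-elasticsearch is also built on it. It communicates over the transport interface and works by treating the web server as a node in the cluster: it looks up the cluster's data nodes (DataNode), routes requests to those nodes to fetch data, and then aggregates the returned results inside the web server. The client has to stay closely aligned with the ES cluster version, otherwise all sorts of "strange" exceptions appear. Because ES releases move quickly, upgrading every client alongside the cluster is expensive.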
 
The official site has announced that the transport client (API Client) will be deprecated in ES 7.0 and formally removed in 8.0.
 

REST Client

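  • Advantage: REST-style interaction, in line with ES's original design; strong compatibility.
  • Disadvantage: somewhat lower performance than the API client.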
 
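The ES REST interface is based on HTTP. The client sends a request to any node in the cluster; that node routes the request to other nodes over the transport interface, aggregates the results once processing is done, and returns them to the client. The load on the client is therefore low.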
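The request flow described above can be sketched with nothing but the JDK's own HTTP client (Java 11+). This is a minimal illustration, not code from any ES client library: the class name `RestSearchRequestSketch`, the node address `http://localhost:9200`, and the match_all query body are assumptions chosen for the example. The request is only built, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: shows that an ES search is just an HTTP call to any one node.
final class RestSearchRequestSketch {

    /** Builds (but does not send) a _search request against a single node's HTTP endpoint. */
    static HttpRequest buildSearchRequest(String nodeBaseUrl, String index, String queryJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(nodeBaseUrl + "/" + index + "/_search"))
                .header("Content-Type", "application/json")
                // The query body is sent with POST here; _search accepts GET or POST.
                .POST(HttpRequest.BodyPublishers.ofString(queryJson, StandardCharsets.UTF_8))
                .build();
    }

    public static void main(String[] args) {
        // Assumed local node address; "pangu" is the index name used in the demos.
        HttpRequest request = buildSearchRequest(
                "http://localhost:9200", "pangu", "{\"query\":{\"match_all\":{}}}");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Whichever node receives this request acts as the coordinator, so the client needs no knowledge of how the data is distributed across the cluster.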
 
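RESTful access is one of ES's core features, yet ES only got its own REST client in 5.0, and a reasonably usable one in 6.0. Before that, JestClient, a third-party client, was very widely used.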
 
This article briefly introduces three RESTful clients: Java Rest Client, Java High Level REST Client, and Jest. I personally recommend JestClient; see below for details.
 

Java Rest Client

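Of the three clients, this one feels the most "primitive"; using it is much like using HttpClient directly. Request parameters are numerous, it offers almost no mapping capability, and once a request completes you have to parse the JSON layer by layer. In most cases developers do not need to care about attributes such as "took", "_shards", and "timed_out", which mostly reflect the health of the cluster; the primary coding concern is getting at the data in _source quickly and conveniently. A typical search response looks like this: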
{
    "took": 9,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 2,
        "max_score": 0.2876821,
        "hits": [
            {
                "_index": "pangu",
                "_type": "normal",
                "_id": "2088201805281551",
                "_score": 0.2876821,
                "_source": {
                    "age": 8,
                    "country": "UK",
                    "id": "2088201805281551",
                    "name": "baby"
                }
            },
            {
                "_index": "pangu",
                "_type": "normal",
                "_id": "2088201805281552",
                "_score": 0.2876821,
                "_source": {
                    "age": 8,
                    "country": "UK",
                    "id": "2088201805281552",
                    "name": "baby"
                }
            }
        ]
    }
}

 

 

Demo

 
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author yanlei
 * @version $Id: RestClientDemo.java, v 0.1 2018-05-26 12:27 PM yanlei Exp $
 */
public class RestClientDemo {

    private static final Logger LOGGER = LoggerFactory.getLogger(RestClientDemo.class);

    /**
     * index name
     */
    private static final String INDEX_NAME = "pangu";

    /**
     * type name
     */
    private static final String TYPE_NAME = "normal";

    private static RestClient restClient;

    static {
        restClient = RestClient.builder(new HttpHost("search.alipay.com", 9999, "http"))
                .setFailureListener(new RestClient.FailureListener() { // called when a node fails
                    @Override
                    public void onFailure(HttpHost host) {
                        LOGGER.error("init client error, host:{}", host);
                    }
                })
                .setMaxRetryTimeoutMillis(10000) // max retry timeout, in milliseconds
                .setHttpClientConfigCallback(new HttpClientConfigCallback() { // authentication
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("example", "WnhmUwjU"));
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                })
                .build();
    }

    public boolean index(String id, String jsonString) {
        String method = "PUT";
        String endpoint = new StringBuilder().append("/").append(INDEX_NAME).append("/").append(TYPE_NAME).append("/").append(id)
                .toString();
        LOGGER.info("method={}, endpoint={}", method, endpoint);

        HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
        Response response = null;
        try {
            response = restClient.performRequest(method, endpoint, Collections.emptyMap(), entity);
            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_CREATED
                    || response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                LOGGER.info("index success");
                return true;
            } else {
                LOGGER.error("index error : {}", response.toString());
                return false;
            }
        } catch (IOException e) {
            LOGGER.error("system error, id={}, jsonString={}", id, jsonString, e);
            return false;
        }
    }

    public <T> T search(String searchString, CallbackSearch<T> callbackSearch) {
        String method = "GET";
        String endpoint = new StringBuilder().append("/").append(INDEX_NAME).append("/").append(TYPE_NAME).append("/").append("_search")
                .toString();
        LOGGER.info("method={}, endpoint={}", method, endpoint);

        HttpEntity entity = new NStringEntity(searchString, ContentType.APPLICATION_JSON);
        Response response = null;
        try {
            response = restClient.performRequest(method, endpoint, Collections.emptyMap(), entity);
            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                // Extract the response body
                String resultString = EntityUtils.toString(response.getEntity(), "UTF-8");
                return callbackSearch.get(resultString);
            } else {
                LOGGER.error("search failed, error : {}", response);
                return null;
            }
        } catch (IOException e) {
            LOGGER.error("system error, searchString={}", searchString, e);
            return null;
        }
    }


    public interface CallbackSearch<T> {
        T get(String responseString);
    }

    public static void main(String[] args) {
        RestClientDemo restClientDemo = new RestClientDemo();

        // 1. Index a document
        User user = new User();
        user.setId("2088201805281345");
        user.setName("nick");
        user.setAge(16);
        user.setCountry("USA");
        boolean indexOK = restClientDemo.index(user.getId(), JSON.toJSONString(user));
        LOGGER.info("index param={} result={}", user, indexOK);

        // 2. Search
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
                .query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("name", "nick")))
                .sort("id", SortOrder.DESC)
                .from(0).size(10);

        List<User> searchResult = restClientDemo.search(sourceBuilder.toString(), new CallbackSearch<List<User>>() {
            @Override
            public List<User> get(String responseString) {
                LOGGER.info("responseString={}", responseString);
                List<User> result = new ArrayList<>();
                JSONObject responseObj = JSON.parseObject(responseString);
                JSONObject hits = responseObj.getJSONObject("hits");
                if (hits.getIntValue("total") != 0) {
                    JSONArray innerHits = hits.getJSONArray("hits");
                    for (int i = 0; i < innerHits.size(); i++) {
                        JSONObject innerhit = innerHits.getJSONObject(i);
                        User user = innerhit.getObject("_source", User.class);
                        result.add(user);
                    }
                }
                return result;
            }
        });
        LOGGER.info("search param={} result={}", sourceBuilder, searchResult);

        // 3. Aggregation query
        SearchSourceBuilder aggSearchSourceBuilder = new SearchSourceBuilder()
                .query(QueryBuilders.matchAllQuery())
                .aggregation(AggregationBuilders.avg("age_avg").field("age"));

        Double aggResult = restClientDemo.search(aggSearchSourceBuilder.toString(), new CallbackSearch<Double>() {

            @Override
            public Double get(String responseString) {
                LOGGER.info("responseString={}", responseString);
                JSONObject responseObj = JSON.parseObject(responseString);
                JSONObject aggregations = responseObj.getJSONObject("aggregations");
                Double result = aggregations.getJSONObject("age_avg").getDouble("value");

                return result;
            }
        });

        LOGGER.info("aggregation param={} result={}", aggSearchSourceBuilder, aggResult);
    }
}

 

Java High Level REST Client

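The 5.0 RestClient is hard to use, and the ES team seems to have noticed. Java High Level REST Client, introduced from 6.0 onward, is clearly much easier to work with: it provides methods such as getHits for reading results, so developers can concentrate on the core data.
However, Java High Level REST Client seems to repeat the Java API Client's mistake: it relies on many features that only exist from 6.0 onward, so this version cannot be used against older ES versions (including ZSearch, which is currently at version 5.3).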
 
 

Demo

 
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.alibaba.fastjson.JSON;

import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author yanlei
 * @version $Id: HighLevelRestClientDemo.java, v 0.1 2018-05-26 12:27 PM yanlei Exp $
 */
public class HighLevelRestClientDemo {

    private static final Logger LOGGER = LoggerFactory.getLogger(HighLevelRestClientDemo.class);

    private static final String INDEX_NAME = "pangu";

    private static final String TYPE_NAME = "normal";

    private static RestHighLevelClient client;

    static {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials("example", "WnhmUwjU"));

        RestClientBuilder builder = RestClient.builder(
                new HttpHost("127.0.0.1", 9200, "http"))
                .setFailureListener(new RestClient.FailureListener() { // called when a node fails
                    @Override
                    public void onFailure(HttpHost host) {
                        LOGGER.error("init client error, host:{}", host);
                    }
                })
                .setMaxRetryTimeoutMillis(10000) // max retry timeout, in milliseconds
                .setHttpClientConfigCallback(new HttpClientConfigCallback() { // authentication
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });

        client = new RestHighLevelClient(builder);
    }

    public boolean index(String id, String jsonString) {
        IndexRequest request = new IndexRequest(
                INDEX_NAME,
                TYPE_NAME,
                id);
        request.source(jsonString, XContentType.JSON);

        IndexResponse response = null;
        try {
            response = client.index(request);
            if(response.status().getStatus() == HttpStatus.SC_CREATED || response.status().getStatus() == HttpStatus.SC_OK) {
                LOGGER.info("index success");
                return true;
            }else {
                LOGGER.error("index error : {}", response.toString());
                return false;
            }
        } catch (IOException e) {
            LOGGER.error("system error", e);
            return false;
        }
    }

    public <T> T search(SearchSourceBuilder sourceBuilder, CallbackSearch<T> callbackSearch) {
        SearchRequest searchRequest = new SearchRequest()
                .indices(INDEX_NAME)
                .types(TYPE_NAME)
                .source(sourceBuilder);

        SearchResponse response = null;
        try {
            response = client.search(searchRequest);
            if(response.status().getStatus() == HttpStatus.SC_OK) {
                // Extract the data
                return callbackSearch.get(response);
            }else {
                LOGGER.error("search failed, error : {}", response);
                return null;
            }
        } catch (IOException e) {
            LOGGER.error("system error", e);
            return null;
        }
    }

    public interface CallbackSearch<T> {
        T get(SearchResponse response);
    }

    public static void main(String[] args) {
        HighLevelRestClientDemo restClientDemo = new HighLevelRestClientDemo();

        // 1. Index a document
        User user = new User();
        user.setId("2088201805281552");
        user.setName("baby");
        user.setAge(8);
        user.setCountry("UK");
        boolean indexOK = restClientDemo.index(user.getId(), JSON.toJSONString(user));
        LOGGER.info("index param={} result={}", user, indexOK);

        // 2. Search
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
                .query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("name", "baby")))
                //.sort("age", SortOrder.DESC)
                .from(0).size(10);

        List<User> searchResult = restClientDemo.search(sourceBuilder, new CallbackSearch<List<User>>() {
            @Override
            public List<User> get(SearchResponse response) {
                List<User> result = new ArrayList<>();
                response.getHits().forEach(hit -> {
                    User user = JSON.parseObject(hit.getSourceAsString(), User.class);
                    result.add(user);
                });
                return result;
            }
        });
        LOGGER.info("search param={} result={}", sourceBuilder, searchResult);

    }
}

 

JestClient

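Jest appeared alongside ES 1.x. It is built on HttpClient + GSON, which provide cluster access and data mapping, and many design choices in Java High Level REST Client also draw on Jest.
In ease of use for ES operations, Jest and Java High Level REST Client are evenly matched, but Jest's compatibility across ES versions is much stronger. It does use GSON, which has a reputation for being slow and cannot be swapped out, but its performance should still satisfy most business scenarios.
In a performance test on 18 servers (4 cores + 64 GB RAM) holding 1.3 billion documents, the cluster started to fail once QPS went above 200, so GSON is not the first thing to worry about. PS: the 200+ QPS figure was measured on a cluster with no hot/cold data separation and no routing.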
 

DEMO

 
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.JestResult;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.core.Index;
import io.searchbox.core.Search;
import io.searchbox.core.SearchResult;
import io.searchbox.core.SearchResult.Hit;
import io.searchbox.core.search.aggregation.MetricAggregation;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author yanlei
 * @version $Id: JestClientDemo.java, v 0.1 2018-05-23 5:45 PM yanlei Exp $
 */
public class JestClientDemo {

    private static final Logger LOGGER = LoggerFactory.getLogger(JestClientDemo.class);

    private static final String INDEX_NAME = "pangu";

    private static final String TYPE_NAME = "normal";

    private static JestClient client;

    static {
        JestClientFactory factory = new JestClientFactory();

        // Custom Gson configuration (HH = 24-hour clock)
        Gson gson = new GsonBuilder().setDateFormat("yyyyMMdd HH:mm:ss").create();

        factory.setHttpClientConfig(new HttpClientConfig
                .Builder("http://search.alipay.com:9999") // cluster address
                .defaultCredentials("example", "WnhmUwjU") // credentials
                .gson(gson) // use the custom Gson; the default can be used instead
                .multiThreaded(true)
                .defaultMaxTotalConnectionPerRoute(2)
                .maxTotalConnection(10)
                .build());


        client = factory.getObject();
    }

    public boolean index(String id, Object obj) {
        Index index = new Index.Builder(obj).index(INDEX_NAME).type(TYPE_NAME).id(id).build();
        try {
            JestResult result = client.execute(index);
            if(result.isSucceeded()) {
                LOGGER.info("index success");
                return true;
            }else {
                LOGGER.error("index error : {}", result.getErrorMessage());
                return false;
            }
        } catch (IOException e) {
            LOGGER.error("system error", e);
            return false;
        }
    }

    public boolean index(User user) {
        Index index = new Index.Builder(user).index(INDEX_NAME).type(TYPE_NAME).build();
        try {
            JestResult result = client.execute(index);
            if(result.isSucceeded()) {
                LOGGER.info("index success");
                return true;
            }else {
                LOGGER.error("index error : {}", result.getErrorMessage());
                return false;
            }
        } catch (IOException e) {
            LOGGER.error("system error", e);
            return false;
        }
    }

    public <T> List<T> search(SearchSourceBuilder sourceBuilder, CallbackSearch<T> callbackSearch, Class<T> response) {
        Search search = new Search.Builder(sourceBuilder.toString())
                .addIndex(INDEX_NAME)
                .addType(TYPE_NAME)
                .build();

        SearchResult result = null;
        try {
            result = client.execute(search);
            if(result.isSucceeded()) {
                // Extract the data
                List<Hit<T, Void>> hits = result.getHits(response);
                return callbackSearch.getHits(hits);
            }else {
                LOGGER.error("search failed, error : {}", result.getErrorMessage());
                return null;
            }
        } catch (IOException e) {
            LOGGER.error("system error", e);
            return null;
        }
    }

    public <T> T aggregation(SearchSourceBuilder sourceBuilder, CallbackAggregation<T> callbackAggregation) {
        Search search = new Search.Builder(sourceBuilder.toString())
                .addIndex(INDEX_NAME)
                .addType(TYPE_NAME)
                .build();

        SearchResult result = null;
        try {
            result = client.execute(search);
            if(result.isSucceeded()) {
                return callbackAggregation.getAgg(result.getAggregations());
            }else {
                LOGGER.error("aggregation failed, error : {}", result.getErrorMessage());
                return null;
            }
        } catch (IOException e) {
            LOGGER.error("system error", e);
            return null;
        }
    }

    public interface CallbackSearch<T> {
        List<T> getHits(List<Hit<T, Void>> hits);
    }

    public interface CallbackAggregation<T> {
        T getAgg(MetricAggregation metricAggregation);
    }

    public static void main(String[] args) {
        JestClientDemo demo = new JestClientDemo();

        // 1. Index a document
        User user = new User();
        user.setId("2088201805281205");
        user.setName("nick");
        user.setAge(18);
        user.setCountry("CHINA");
        boolean indexOK = demo.index(user);
        LOGGER.info("index param={} result={}", user, indexOK);

        // 2. Search
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
                .query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("name", "nick")))
                .sort("id", SortOrder.DESC)
                .from(0).size(10);
        List<User> searchResult = demo.search(sourceBuilder, new CallbackSearch<User>() {
            @Override
            public List<User> getHits(List<Hit<User, Void>> hits) {
                List<User> userList = new ArrayList<>();
                hits.forEach(hit -> {
                    userList.add(hit.source);
                });
                return userList;
            }
        }, User.class);
        LOGGER.info("search param={} result={}", sourceBuilder, searchResult);


        // 3. Aggregation query
        SearchSourceBuilder aggSearchSourceBuilder = new SearchSourceBuilder()
                .query(QueryBuilders.matchAllQuery())
                .aggregation(AggregationBuilders.avg("age_avg").field("age"));
        double aggResult = demo.aggregation(aggSearchSourceBuilder, metricAggregation -> {
            double avgAge = metricAggregation.getAvgAggregation("age_avg").getAvg();
            return avgAge;
        });
        LOGGER.info("aggregation param={} result={}", aggSearchSourceBuilder, aggResult);
    }
}

 

Summary

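JestClient has better compatibility than the other two. Java High Level REST Client ships with default tuning parameters, so when the versions match its performance should be better. With the release of Java High Level REST Client, Java Rest Client was renamed Java Low Level REST Client, and the official site no longer recommends it.
One last note: although RestClient issues RESTful requests, building the Query DSL by hand is still fairly costly, so both Java High Level REST Client and Jest use the Elasticsearch API client to build the Query DSL. The Builder pattern is implemented well in both clients and is worth studying.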
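To illustrate why a Builder helps when assembling Query DSL, here is a minimal sketch under stated assumptions. `TermSearchBuilderSketch` is a hypothetical class written for this example; it is not part of Elasticsearch or Jest and only covers a single term query with paging. The real SearchSourceBuilder is far more complete.

```java
// Hypothetical, simplified builder: assembles a term-query search body as a JSON string.
final class TermSearchBuilderSketch {
    private String field;
    private String value;
    private int from = 0;
    private int size = 10;

    TermSearchBuilderSketch term(String field, String value) {
        this.field = field;
        this.value = value;
        return this; // returning this is what enables method chaining
    }

    TermSearchBuilderSketch from(int from) {
        this.from = from;
        return this;
    }

    TermSearchBuilderSketch size(int size) {
        this.size = size;
        return this;
    }

    String build() {
        if (field == null || value == null) {
            throw new IllegalStateException("term(field, value) must be set before build()");
        }
        return "{\"from\":" + from + ",\"size\":" + size
                + ",\"query\":{\"term\":{\"" + escape(field) + "\":\"" + escape(value) + "\"}}}";
    }

    // Minimal escaping for backslashes and double quotes only; control characters are not handled.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        String dsl = new TermSearchBuilderSketch().term("name", "nick").from(0).size(10).build();
        System.out.println(dsl);
    }
}
```

The chained calls mirror the shape of the `SearchSourceBuilder` usage in the demos: each setter returns the builder itself, and the JSON is only produced at the end, so callers never concatenate DSL strings themselves.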

