ESJavaClient的歷史
JavaAPI Client
-
優勢:基於transport進行數據訪問,能夠使用ES集群內部的性能特性,性能相對好
-
劣勢:client版本需要和es集群版本一致,數據序列化通過java實現,es集群或jdk升級,客戶端需要伴隨升級。
ES官網最早提供的Client,spring-data-elasticsearch也基於該client開發,使用transport接口進行通信,其工作方式是將webserver當做集群中的節點,獲取集群數據節點(DataNode)並將請求路由到Node獲取數據,將返回結果在webserver內部進行匯總。client需要與es集群版本保持相對一致,否則會出現各種『奇怪』的異常。由於ES集群升級很快,集群升級時客戶端伴隨升級的成本高。
官網已聲明es7.0將不再支持transport client(API Client),8.0正式移除。
REST Client
-
優勢:REST風格交互,符合ES設計初衷;兼容性強;
-
劣勢:性能相對API較低
ES REST基於http協議,客戶端發送請求到es的任何節點,節點會通過transport接口將請求路由到其他節點,完成數據處理後匯總並返回客戶端,客戶端負載低。
RESTful是ES特性之一,但是直到5.0才有了自己的客戶端,6.0才有了相對好用的客戶端。在此之前JestClient作為第三方客戶端,使用非常廣泛。
本文將對Java Rest Client、Java High Level REST Client、Jest三種RESTful風格的客戶端做簡單的介紹,個人推薦使用JestClient,詳見後文。
Java Rest Client
三種客戶端中最『原始』的客戶端,感覺像是在直接使用HttpClient:請求參數繁多,幾乎沒有提供Mapping能力,請求結束後需要拿著json一層層解析。對於開發來說,大部分情況下不需要關心"took"、"_shards"、"timed_out"這些屬性,這些更多依賴集群的穩定性,如何方便快捷地拿到_source中的數據才是首要編碼內容。
{ "took": 9, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 2, "max_score": 0.2876821, "hits": [ { "_index": "pangu", "_type": "normal", "_id": "2088201805281551", "_score": 0.2876821, "_source": { "age": 8, "country": "UK", "id": "2088201805281551", "name": "baby" } }, { "_index": "pangu", "_type": "normal", "_id": "2088201805281552", "_score": 0.2876821, "_source": { "age": 8, "country": "UK", "id": "2088201805281552", "name": "baby" } } ] } }
Demo
import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.HttpStatus; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.entity.ContentType; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.nio.entity.NStringEntity; import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author yanlei * @version $Id: RestClientDemo.java, v 0.1 2018年05月26日 下午12:27 yanlei Exp $ */ public class RestClientDemo { private static final Logger LOGGER = LoggerFactory.getLogger(RestClientDemo.class); /** * index 名稱 */ private static final String INDEX_NAME = "pangu"; /** * type 名稱 */ private static final String TYPE_NAME = "normal"; private static RestClient restClient; static { restClient = RestClient.builder(new HttpHost("search.alipay.com", 9999, "http")) .setFailureListener(new RestClient.FailureListener() { // 連接失敗策略 @Override public void onFailure(HttpHost host) { LOGGER.error("init client error, host:{}", host); } }) .setMaxRetryTimeoutMillis(10000) // 超時時間 .setHttpClientConfigCallback(new HttpClientConfigCallback() { // 認證 @Override public HttpAsyncClientBuilder 
customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("example", "WnhmUwjU")); return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } }) .build(); } public boolean index(String id, String jsonString) { String method = "PUT"; String endpoint = new StringBuilder().append("/").append(INDEX_NAME).append("/").append(TYPE_NAME).append("/").append(id) .toString(); LOGGER.info("method={}, endpoint={}", method, endpoint); HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON); Response response = null; try { response = restClient.performRequest(method, endpoint, Collections.emptyMap(), entity); if (response.getStatusLine().getStatusCode() == HttpStatus.SC_CREATED || response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { LOGGER.info("index success"); return true; } else { LOGGER.error("index error : {}", response.toString()); return false; } } catch (IOException e) { LOGGER.error("system error, id={}, jsonString={}", id, jsonString, e); return false; } } public <T> T search(String searchString, CallbackSearch<T> callbackSearch) { String method = "GET"; String endpoint = new StringBuilder().append("/").append(INDEX_NAME).append("/").append(TYPE_NAME).append("/").append("_search") .toString(); LOGGER.info("method={}, endpoint={}", method, endpoint); HttpEntity entity = new NStringEntity(searchString, ContentType.APPLICATION_JSON); Response response = null; try { response = restClient.performRequest(method, endpoint, Collections.emptyMap(), entity); if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { // 提取數據 String resultString = EntityUtils.toString(response.getEntity(), "UTF-8"); return callbackSearch.get(resultString); } else { LOGGER.error("index false, error : {}", response); return null; } } catch (IOException e) { 
LOGGER.error("system error, searchString={}", searchString, e); return null; } } public interface CallbackSearch<T> { T get(String responseString); } public static void main(String[] args) { RestClientDemo restClientDemo = new RestClientDemo(); // 1. 索引數據 User user = new User(); user.setId("2088201805281345"); user.setName("nick"); user.setAge(16); user.setCountry("USA"); boolean indexOK = restClientDemo.index(user.getId(), JSON.toJSONString(user)); LOGGER.info("index param={} result={}", user, indexOK); // 2. 數據檢索 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder() .query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("name", "nick"))) .sort("id", SortOrder.DESC) .from(0).size(10); List<User> searchResult = restClientDemo.search(sourceBuilder.toString(), new CallbackSearch<List<User>>() { @Override public List<User> get(String responseString) { LOGGER.info("responseString={}", responseString); List<User> result = new ArrayList<>(); JSONObject responseObj = JSON.parseObject(responseString); JSONObject hits = responseObj.getJSONObject("hits"); if (hits.getIntValue("total") != 0) { JSONArray innerHits = hits.getJSONArray("hits"); for (int i = 0; i < innerHits.size(); i++) { JSONObject innerhit = innerHits.getJSONObject(i); User user = innerhit.getObject("_source", User.class); result.add(user); } } return result; } }); LOGGER.info("search param={} result={}", sourceBuilder, searchResult); // 3. 
聚合查詢 SearchSourceBuilder aggSearchSourceBuilder = new SearchSourceBuilder() .query(QueryBuilders.matchAllQuery()) .aggregation(AggregationBuilders.avg("age_avg").field("age")); Double aggResult = restClientDemo.search(aggSearchSourceBuilder.toString(), new CallbackSearch<Double>() { @Override public Double get(String responseString) { LOGGER.info("responseString={}", responseString); JSONObject responseObj = JSON.parseObject(responseString); JSONObject aggregations = responseObj.getJSONObject("aggregations"); Double result = aggregations.getJSONObject("age_avg").getDouble("value"); return result; } }); LOGGER.info("aggregation param={} result={}", aggSearchSourceBuilder, aggResult); } }
Java High Level REST Client
5.0的RestClient很難用,官網似乎也發現了這個問題。從6.0開始推出的Java High Level REST Client明顯在交互易用性方面提升了很多,提供了諸如getHits等方法獲取結果,開發者可以更關注於核心數據的操作。
但是Java High Level REST Client似乎犯了和Java API Client一樣的問題,底層設置了很多6.0才有的特性,導致該版本無法用於低版本ES(包含ZSearch,目前ZSearch是5.3版本)。
Demo
import java.io.IOException; import java.util.ArrayList; import java.util.List; import com.alibaba.fastjson.JSON; import org.apache.http.HttpHost; import org.apache.http.HttpStatus; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author yanlei * @version $Id: RestClientDemo.java, v 0.1 2018年05月26日 下午12:27 yanlei Exp $ */ public class HighLevelRestClientDemo { private static final Logger LOGGER = LoggerFactory.getLogger(HighLevelRestClientDemo.class); private static final String INDEX_NAME = "pangu"; private static final String TYPE_NAME = "normal"; private static RestHighLevelClient client; static { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("example", "WnhmUwjU")); RestClientBuilder builder = RestClient.builder( new HttpHost("127.0.0.1", 9200, "http")) .setFailureListener(new RestClient.FailureListener() { // 連接失敗策略 @Override public void onFailure(HttpHost host) { LOGGER.error("init client error, host:{}", host); } }) .setMaxRetryTimeoutMillis(10000) // 超時時間 
.setHttpClientConfigCallback(new HttpClientConfigCallback() { // 認證 @Override public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } }); client = new RestHighLevelClient(builder); } public boolean index(String id, String jsonString) { IndexRequest request = new IndexRequest( INDEX_NAME, TYPE_NAME, id); request.source(jsonString, XContentType.JSON); IndexResponse response = null; try { response = client.index(request); if(response.status().getStatus() == HttpStatus.SC_CREATED || response.status().getStatus() == HttpStatus.SC_OK) { LOGGER.info("index success"); return true; }else { LOGGER.error("index error : {}", response.toString()); return false; } } catch (IOException e) { LOGGER.error("系統異常", e); return false; } } public <T> T search(SearchSourceBuilder sourceBuilder, CallbackSearch<T> callbackSearch) { SearchRequest searchRequest = new SearchRequest() .indices(INDEX_NAME) .types(TYPE_NAME) .source(sourceBuilder); SearchResponse response = null; try { response = client.search(searchRequest); if(response.status().getStatus() == HttpStatus.SC_OK) { // 提取數據 return callbackSearch.get(response); }else { LOGGER.error("index false, error : {}", response); return null; } } catch (IOException e) { LOGGER.error("系統異常", e); return null; } } public interface CallbackSearch<T> { T get(SearchResponse response); } public static void main(String[] args) { HighLevelRestClientDemo restClientDemo = new HighLevelRestClientDemo(); // 1. 索引數據 User user = new User(); user.setId("2088201805281552"); user.setName("baby"); user.setAge(8); user.setCountry("UK"); boolean indexOK = restClientDemo.index(user.getId(), JSON.toJSONString(user)); LOGGER.info("index param={} result={}", user, indexOK); // 2. 
數據檢索 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder() .query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("name", "baby"))) //.sort("age", SortOrder.DESC) .from(0).size(10); List<User> searchResult = restClientDemo.search(sourceBuilder, new CallbackSearch<List<User>>() { @Override public List<User> get(SearchResponse response) { List<User> result = new ArrayList<>(); response.getHits().forEach(hit -> { User user = JSON.parseObject(hit.getSourceAsString(), User.class); result.add(user); return result; } }); LOGGER.info("search param={} result={}", sourceBuilder, searchResult); } }
JestClient
伴隨1.*ES誕生,底層基於HttpClient+GSON提供集群訪問與數據映射,Java High Level REST Client的很多設計也借鑒了Jest。
Jest在ES操作易用性方面與Java High Level REST Client不分伯仲,但是其多版本兼容性比後者強很多。雖然使用有龜速之稱的GSON且不能替換,但是其性能應該能滿足大部分業務場景。
在18台服務器(4核+64G內存)、13億條數據的集群上做性能測試,QPS達到200+時,集群出現異常,也就是說瓶頸首先出現在集群而不是GSON,所以GSON不是首要考慮的因素。PS:200+QPS是在集群沒有做冷熱數據切分與routing的情況下的結果。
DEMO
import java.io.IOException; import java.util.ArrayList; import java.util.List; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import io.searchbox.client.JestClient; import io.searchbox.client.JestClientFactory; import io.searchbox.client.JestResult; import io.searchbox.client.config.HttpClientConfig; import io.searchbox.core.Index; import io.searchbox.core.Search; import io.searchbox.core.SearchResult; import io.searchbox.core.SearchResult.Hit; import io.searchbox.core.search.aggregation.MetricAggregation; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author yanlei * @version $Id: JestClientDemo.java, v 0.1 2018年05月23日 下午5:45 yanlei Exp $ */ public class JestClientDemo { private static final Logger LOGGER = LoggerFactory.getLogger(JestClientDemo.class); private static final String INDEX_NAME = "pangu"; private static final String TYPE_NAME = "normal"; private static JestClient client; static { JestClientFactory factory = new JestClientFactory(); // 定制化gson配置 Gson gson = new GsonBuilder().setDateFormat("yyyyMMdd hh:mm:ss").create(); factory.setHttpClientConfig(new HttpClientConfig .Builder("http://search.alipay.com:9999") // 集群地址 .defaultCredentials("example", "WnhmUwjU") // 認證信息 .gson(gson) // 啟用定制化gson,可使用默認 .multiThreaded(true) .defaultMaxTotalConnectionPerRoute(2) .maxTotalConnection(10) .build()); client = factory.getObject(); } public boolean index(String id, Object obj) { Index index = new Index.Builder(obj).index(INDEX_NAME).type(TYPE_NAME).id(id).build(); try { JestResult result = client.execute(index); if(result.isSucceeded()) { LOGGER.info("index success"); return true; }else { LOGGER.error("index error : {}", result.getErrorMessage()); return false; } } catch (IOException e) { 
LOGGER.error("系統異常", e); return false; } } public boolean index(User user) { Index index = new Index.Builder(user).index(INDEX_NAME).type(TYPE_NAME).build(); try { JestResult result = client.execute(index); if(result.isSucceeded()) { LOGGER.info("index success"); return true; }else { LOGGER.error("index error : {}", result.getErrorMessage()); return false; } } catch (IOException e) { LOGGER.error("系統異常", e); return false; } } public <T> List<T> search(SearchSourceBuilder sourceBuilder, CallbackSearch<T> callbackSearch, Class<T> response) { Search search = new Search.Builder(sourceBuilder.toString()) .addIndex(INDEX_NAME) .addType(TYPE_NAME) .build(); SearchResult result = null; try { result = client.execute(search); if(result.isSucceeded()) { // 提取數據 List<Hit<T, Void>> hits = result.getHits(response); return callbackSearch.getHits(hits); }else { LOGGER.error("index false, error : {}", result.getErrorMessage()); return null; } } catch (IOException e) { LOGGER.error("系統異常", e); return null; } } public <T> T aggregation(SearchSourceBuilder sourceBuilder, CallbackAggregation<T> callbackAggregation) { Search search = new Search.Builder(sourceBuilder.toString()) .addIndex(INDEX_NAME) .addType(TYPE_NAME) .build(); SearchResult result = null; try { result = client.execute(search); if(result.isSucceeded()) { return callbackAggregation.getAgg(result.getAggregations()); }else { LOGGER.error("index false, error : {}", result.getErrorMessage()); return null; } } catch (IOException e) { LOGGER.error("系統異常", e); return null; } } public interface CallbackSearch<T> { List<T> getHits(List<Hit<T, Void>> hits); } public interface CallbackAggregation<T> { T getAgg(MetricAggregation metricAggregation); } public static void main(String[] args) { JestClientDemo demo = new JestClientDemo(); // 1. 
索引數據 User user = new User(); user.setId("2088201805281205"); user.setName("nick"); user.setAge(18); user.setCountry("CHINA"); boolean indexOK = demo.index(user); LOGGER.info("index param={} result={}", user, indexOK); // 2. 數據檢索 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder() .query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("name", "nick"))) .sort("id", SortOrder.DESC) .from(0).size(10); List<User> searchResult = demo.search(sourceBuilder, new CallbackSearch<User>() { @Override public List<User> getHits(List<Hit<User, Void>> hits) { List<User> userList = new ArrayList<>(); hits.forEach(hit -> { userList.add(hit.source); }); return userList; } }, User.class); LOGGER.info("search param={} result={}", sourceBuilder, searchResult); // 3. 聚合查詢 SearchSourceBuilder aggSearchSourceBuilder = new SearchSourceBuilder() .query(QueryBuilders.matchAllQuery()) .aggregation(AggregationBuilders.avg("age_avg").field("age")); double aggResult = demo.aggregation(aggSearchSourceBuilder, metricAggregation -> { double avgAge = metricAggregation.getAvgAggregation("age_avg").getAvg(); return avgAge; }); LOGGER.info("aggregation param={} result={}", aggSearchSourceBuilder, aggResult); } }
小結
JestClient兼容性優於其他兩者;Java High Level REST Client設置了默認調優參數,若版本匹配,其性能會更加優秀。Java Rest Client隨Java High Level REST Client推出,更名為Java Low Level REST Client了,官網也不推薦使用了。
最後補充下,RestClient雖然基於RESTful風格的請求,但是構建QueryDSL還是一件成本比較高的事情,所以Java High Level REST Client和Jest都使用ElasticSearch的API client構建QueryDSL,其中的Builder模式在兩個客戶端中都有比較好的實現,值得學習研究。
