- Elasticsearch連接方式有兩種;分別為TCP協議與HTTP協議
最近使用es比較多,之前使用一直是使用spring封裝的spring-data-elasticsearch;關於spring-data-elasticsearch有以下幾點比較難受:
-
基於TCP協議的使用(不確定是否支持http, 公司XX雲大佬推薦使用HTTP協議,好像是官方推薦?)
-
版本對應比較惡心人
-
不好用
-
基於以上幾點,索性拋棄spring-data-elasticsearch,自己造輪子;
-
根據 官方文檔 描述,我們選擇使用RestHighLevelClient來實現es基礎查詢;
官方描述:
The Java REST Client comes in 2 flavors:
Java Low Level REST Client: the official low-level client for Elasticsearch. It allows to communicate with an Elasticsearch cluster through http. Leaves requests marshalling and responses un-marshalling to users. It is compatible with all Elasticsearch versions.
Java High Level REST Client: the official high-level client for Elasticsearch. Based on the low-level client, it exposes API specific methods and takes care of requests marshalling and responses un-marshalling.
-
提供Java Low Level REST Client 版本和 Java High Level REST Client 版本:
- Java Low Level REST Client 與所有Elasticsearch版本兼容(版本問題舒服)
- 通過HTTP協議與Elasticsearch集群進行通信(大佬推薦)
- Java High Level REST Client 是基於Java Low Level REST Client 版本實現更多高級API
-
很顯然我們選擇RestHighLevelClient
Spring整合RestHighLevelClient
-
構建ElasticsearchClient
- 查看RestHighLevelClient構造器可以發現可以使用RestClientBuilder來構建,簡單demo如下
/**
* 連接超時時間
*/
private final static int CONNECT_TIMEOUT = 5000;
/**
* 連接超時時間
*/
private final static int SOCKET_TIMEOUT = 40000;
/**
* 獲取連接的超時時間
*/
private final static int CONNECTION_REQUEST_TIMEOUT = 1000;
/**
* 最大連接數
*/
private final static int MAX_CONNECT_NUM = 100;
/**
* 最大路由連接數
*/
private final static int MAX_CONNECT_ROUTE = 100;
@Bean(name = "elasticsearchClient", destroyMethod = "close")
public RestHighLevelClient client() {
RestClientBuilder builder = RestClient.builder(new HttpHost("host", "port", "http"));
// 配置一些請求配置的參數
builder.setRequestConfigCallback(requestConfigBuilder -> {
requestConfigBuilder.setConnectTimeout(CONNECT_TIMEOUT);
requestConfigBuilder.setSocketTimeout(SOCKET_TIMEOUT);
requestConfigBuilder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT);
return requestConfigBuilder;
});
// 配置一些httpClient的參數
builder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setMaxConnTotal(MAX_CONNECT_NUM);
httpClientBuilder.setMaxConnPerRoute(MAX_CONNECT_ROUTE);
return httpClientBuilder;
});
builder.setFailureListener(new RestClient.FailureListener(){
@Override
public void onFailure(HttpHost host) {
// TODO do something when failed
super.onFailure(host);
}
});
return new RestHighLevelClient(builder);
}
- 支持一些回調與參數的配置,具體的API可自行查看RestClientBuilder的源碼
- 配置完client后我們可以使用client造一些簡單的輪子, 如es默認查詢只可以查詢1000條數據,我們可以封裝查詢所有數據
public List<SearchHit> searchAll(SearchRequest searchRequest) {
try {
List<SearchHit> hits = new ArrayList<>(16);
int maxNum = searchRequest.source().size();
searchRequest.scroll(TimeValue.timeValueMinutes(10));
SearchResponse search = client.search(searchRequest);
hits.addAll(Arrays.asList(search.getHits().getHits()));
while (search.getHits().getHits().length == maxNum) {
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(search.getScrollId());
searchScrollRequest.scroll(TimeValue.timeValueMinutes(10));
search = client.searchScroll(searchScrollRequest);
hits.addAll(Arrays.asList(search.getHits().getHits()));
}
return hits;
} catch (IOException e) {
log.error("Get message error.", e);
return null;
}
}
- 有了以上接口,我們可以查詢一些常用數據,如以下為查詢數據的簡單使用:
BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
boolBuilder.filter(QueryBuilders.termQuery("type", 0));
boolBuilder.filter(QueryBuilders.termsQuery("id.keyword", id));
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("createTime");
rangeQueryBuilder.gte(startTime);
rangeQueryBuilder.lte(endTime);
boolBuilder.filter(rangeQueryBuilder);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.size(9999);
sourceBuilder.fetchSource(new String[]{"field1", "field2", "field3"}, new String[]{});
sourceBuilder.query(boolBuilder);
SearchRequest searchRequest = new SearchRequest("index");
searchRequest.source(sourceBuilder);
List<SearchHit> searchHits = repository.searchAll(searchRequest);
- 具體API使用可查看官方文檔
更新於2019-10-28
IndexRequest indexRequest = new IndexRequest(index, type, id);
indexRequest.source(entityMapper.mapToString(map), Requests.INDEX_CONTENT_TYPE);
return client.index(indexRequest);
-
官方API中IndexRequest提供以下幾種source方法:
- 值得注意的是source(Map source) 與 source(Map source, XContentType contentType) 方法,對於Map的傳參,會進行類型校驗;
- 源碼如下:
public IndexRequest source(Map source, XContentType contentType) throws ElasticsearchGenerationException { try { XContentBuilder builder = XContentFactory.contentBuilder(contentType); builder.map(source); return source(builder); } catch (IOException e) { throw new ElasticsearchGenerationException("Failed to generate [" + source + "]", e); } }
-
其中builder.map中的unknownValue方法會遍歷參數進行逐一校驗:
-
源碼如下:
private XContentBuilder map(Map<String, ?> values, boolean ensureNoSelfReferences) throws IOException {
if (values == null) {
return this.nullValue();
} else {
if (ensureNoSelfReferences) {
ensureNoSelfReferences(values);
}
this.startObject();
Iterator var3 = values.entrySet().iterator();
while(var3.hasNext()) {
Entry<String, ?> value = (Entry)var3.next();
this.field((String)value.getKey());
this.unknownValue(value.getValue(), false);
}
this.endObject();
return this;
}
}
- 檢驗方法源碼
private void unknownValue(Object value, boolean ensureNoSelfReferences) throws IOException {
if (value == null) {
this.nullValue();
} else {
XContentBuilder.Writer writer = (XContentBuilder.Writer)WRITERS.get(value.getClass());
if (writer != null) {
writer.write(this, value);
} else if (value instanceof Path) {
this.value((Path)value);
} else if (value instanceof Map) {
Map<String, ?> valueMap = (Map)value;
this.map(valueMap, ensureNoSelfReferences);
} else if (value instanceof Iterable) {
this.value((Iterable)value, ensureNoSelfReferences);
} else if (value instanceof Object[]) {
this.values((Object[])value, ensureNoSelfReferences);
} else if (value instanceof ToXContent) {
this.value((ToXContent)value);
} else {
if (!(value instanceof Enum)) {
throw new IllegalArgumentException("cannot write xcontent for unknown value of type " + value.getClass());
}
this.value(Objects.toString(value));
}
}
}
- 為了避免這個坑,可以使用jsonString來規避,具體使用如下:
IndexRequest indexRequest = new IndexRequest(index, type, id);
indexRequest.source(JSON.toJSONString(map), Requests.INDEX_CONTENT_TYPE);
client.index(indexRequest);