/**
* 初始化一個連接ElasticSearch的客戶端
*
* @param addresses ES服務器的Transport地址和端口的列表,多個服務器用逗號分隔,例如 localhost:9300,localhost:9300,...
* @param clusterName 集群名稱
* @param index 索引名稱,這里應該使用項目名稱
* @param username 用戶名稱
* @param password 用戶密碼
* @param type 索引類型
* @param clazz 存儲類
*/
public ESClient(String addresses, String clusterName, String index,
String username, String password, String type, Class<T> clazz) {
if (StringUtils.isBlank(addresses)) {
throw new RuntimeException("沒有給定的ES服務器地址。");
}
this.index = index;
this.type = type;
this.clazz = clazz;
// 獲得鏈接地址對象列表
List<InetSocketTransportAddress> addressList = Lists.transform(
Splitter.on(",").trimResults().omitEmptyStrings().splitToList(addresses),
new Function<String, InetSocketTransportAddress>() {
@Override
public InetSocketTransportAddress apply(String input) {
String[] addressPort = input.split(":");
String address = addressPort[0];
Integer port = Integer.parseInt(addressPort[1]);
serverHttpAddressList.add(address + ":" + 9200);
return new InetSocketTransportAddress(address, port);
}
}
);
// 建立關於ES的配置
ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder()
.put("cluster.name", clusterName)
.put("client.transport.sniff", false);
if (StringUtils.isNotBlank(username)) {
builder.put("shield.user", username + ":" + password);
}
Settings settings = builder.build();
// 生成原生客戶端
TransportClient transportClient = new TransportClient(settings);
for (InetSocketTransportAddress address : addressList) {
transportClient.addTransportAddress(address);
}
client = transportClient;
bulkProcessor = BulkProcessor.builder(
client, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
throw new RuntimeException(failure);
}
}).build();
}
/**
* 初始化連接ElasticSearch的客戶端
*
* @param client 原生客戶端
* @param index 索引名稱
* @param type 類型
* @param clazz 存儲類
*/
public ESClient(Client client, String index, String type, Class<T> clazz) {
this.client = client;
this.index = index;
this.type = type;
this.clazz = clazz;
}
/**
* 向ES發送存儲請求,將一個對象存儲到服務器。
*
* @param id 該對象的id
* @param t 存儲實例
* @return 是否存儲成功
*/
public boolean indexDocument(String id, T t) {
return indexDocument(id, type, t);
}
/**
* 向ES發送存儲請求,將一個對象存儲到服務器。
*
* @param t 存儲實例
* @return 返回存儲之后在ES服務器內生成的隨機ID
*/
public String indexDocument(T t) {
IndexResponse indexResponse = client.prepareIndex(index, type)
.setSource(toJSONString(t))
.execute()
.actionGet();
return indexResponse.getId();
}
/**
* 向ES發送存儲請求,將一個對象存儲到服務器,這個方法允許用戶手動指定該對象的存儲類型名稱
*
* @param id 對象id
* @param type 存儲類型
* @param t 存儲實例
* @return 是否存儲成功
*/
public boolean indexDocument(String id, String type, T t) {
IndexResponse indexResponse = client.prepareIndex(index, type, id)
.setSource(toJSONString(t))
.execute()
.actionGet();
return true;
}
/**
* 向ES發送批量儲存請求, 請求不會馬上提交,而是會等待到達bulk設置的閾值后進行提交.<br/>
* 最后客戶端需要調用{@link #flushBulk()}方法.
*
* @param id 對象id
* @param t 存儲實例
* @return 成功表示放入到bulk成功, 可能會拋出runtimeException
*/
public boolean indexDocumentBulk(String id, T t) {
return indexDocumentBulk(id, type, t);
}
/**
* 向ES發送批量存儲請求,將一個對象存儲到服務器,這個方法允許用戶手動指定該對象的存儲類型名稱
*
* @param id 對象id
* @param type 存儲類型
* @param t 存儲實例
* @return 成功表示放入到bulk成功, 可能會拋出runtimeException
* @see #indexDocument(String, Object)
*/
public boolean indexDocumentBulk(String id, String type, T t) {
IndexRequest indexRequest = new IndexRequest(index, type, id).source(toJSONString(t));
bulkProcessor.add(indexRequest);
return true;
}
/**
* 向ES發送批量存儲請求, 請求不會馬上提交,而是會等待到達bulk設置的閾值后進行提交.<br/>
* 最后客戶端需要調用{@link #flushBulk()}方法.
*
* @param t 存儲實例
* @return 成功表示放入到bulk成功, 可能會拋出runtimeException
*/
public boolean indexDocumentBulk(T t) {
IndexRequest indexRequest = new IndexRequest(index, type).source(toJSONString(t));
bulkProcessor.add(indexRequest);
return true;
}
public boolean indexDocumentBulk(List<T> list) {
for (T t : list) {
indexDocumentBulk(t);
}
return true;
}
/**
* 向ES發送批量存儲請求, 允許傳入一個Function, 用來從對象中獲取ID.
*
* @param list 對象列表
* @param idFunction 獲取ID
* @return 成功表示放入到bulk成功, 可能會拋出runtimeException
*/
public boolean indexDocumentBulk(List<T> list, Function<T, String> idFunction) {
for (T t : list) {
indexDocumentBulk(idFunction.apply(t), t);
}
return true;
}
/**
* 向ES發送更新文檔請求,將一個對象更新到服務器,會替換原有對應ID的數據。
*
* @param id id
* @param t 存儲對象
* @return 是否更新成功
*/
public boolean updateDocument(String id, T t) {
return updateDocument(id, type, t);
}
/**
* 向ES發送更新文檔請求,將一個對象更新到服務器,會替換原有對應ID的數據。
*
* @param id id
* @param type 存儲類型
* @param t 存儲對象
* @return 是否更新成功
*/
public boolean updateDocument(String id, String type, T t) {
client.prepareUpdate(index, type, id).setDoc(toJSONString(t))
.execute().actionGet();
return true;
}
/**
* 向ES發送批量更新請求
*
* @param id 索引ID
* @param t 存儲對象
* @return 成功表示放入到bulk成功, 可能會拋出runtimeException
*/
public boolean updateDocumentBulk(String id, T t) {
UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(toJSONString(t));
bulkProcessor.add(updateRequest);
return true;
}
/**
* 向ES發送upsert請求, 如果該document不存在將會新建這個document, 如果存在則更新.
*
* @param id id
* @param t 存儲對象
* @return 是否執行成功
*/
public boolean upsertDocument(String id, T t) {
return upsertDocument(id, type, t);
}
/**
* 向ES發送upsert請求, 如果該document不存在將會新建這個document, 如果存在則更新.
*
* @param id id
* @param type 存儲類型
* @param t 存儲對象
* @return 是否執行成功
*/
public boolean upsertDocument(String id, String type, T t) {
client.prepareUpdate(index, type, id).setDocAsUpsert(true).setDoc(toJSONString(t))
.execute().actionGet();
return true;
}
/**
* 向ES發送批量upsert的請求.
*
* @param id id
* @param t 儲存對象
* @return 是否執行成功
*/
public boolean upsertDocumentBulk(String id, T t) {
UpdateRequest updateRequest = new UpdateRequest(index, type, id)
.doc(toJSONString(t));
updateRequest.docAsUpsert(true);
bulkProcessor.add(updateRequest);
return true;
}
/**
* 向ES發送獲取指定ID文檔的請求
*
* @param id id
* @return 搜索引擎實例
* @throws Exception
*/
public T getDocument(String id) throws Exception {
try {
GetResponse getResponse = client.prepareGet(index, type, id)
.execute().actionGet();
if (getResponse.getSource() == null) {
return null;
}
JSONObject jsonObject = new JSONObject(getResponse.getSource());
T t = clazz.newInstance();
toObject(t, jsonObject);
return t;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 向ES發送刪除指定ID文檔的請求
*
* @param id id
* @return 是否刪除成功
* @throws Exception
*/
public boolean deleteDocument(String id) throws Exception {
return deleteDocument(id, type);
}
/**
* 向ES發送刪除指定ID文檔的請求
*
* @param id id
* @param type 存儲類型
* @return 是否刪除成功
* @throws Exception
*/
public boolean deleteDocument(String id, String type) throws Exception {
DeleteResponse deleteResponse = client.prepareDelete(index, type, id)
.execute().actionGet();
return deleteResponse.isFound();
}
/**
* 向ES發送搜索文檔的請求,返回分頁結果
*
* @param searchText 搜索內容
* @return 分頁結果
* @throws Exception
*/
public PageResponse<T> searchDocument(String searchText) throws Exception {
PageRequest pageRequest = WebContext.get().page();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index)
.setTypes(type)
.setQuery(QueryBuilders.matchQuery("_all", searchText))
.setFrom(pageRequest.getOffset())
.setSize(pageRequest.getLimit())
.setFetchSource(true);
return searchDocument(searchRequestBuilder);
}
/**
* 向ES發送搜索文檔的請求,返回列表結果
*
* @param searchText 搜索內容
* @param start 起始位置
* @param size 獲取數據大小
* @return 返回數據列表
* @throws Exception
*/
public List<T> searchDocument(String searchText, int start, int size) throws Exception {
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index)
.setTypes(type)
.setQuery(QueryBuilders.matchQuery("_all", searchText))
.setFrom(start)
.setSize(size)
.setFetchSource(true);
PageResponse<T> pageResponse = searchDocument(searchRequestBuilder);
return pageResponse.getItemList();
}
/**
* 向ES發送搜索文檔的請求,返回列表結果
*
* @param searchText 搜索內容
* @param type 類型
* @param start 起始位置
* @param size 數據大小
* @return 返回數據列表
* @throws Exception
*/
public List<T> searchDocument(String searchText, String type, int start, int size) throws Exception {
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index)
.setTypes(type)
.setQuery(QueryBuilders.matchQuery("_all", searchText))
.setFrom(start)
.setSize(size)
.setFetchSource(true);
PageResponse<T> pageResponse = searchDocument(searchRequestBuilder);
return pageResponse.getItemList();
}
/**
* 向ES發送搜索文檔的請求,返回分頁結果
*
* @param searchRequestBuilder 搜索構造器
* @return 分頁結果
* @throws Exception
*/
public PageResponse<T> searchDocument(SearchRequestBuilder searchRequestBuilder) throws Exception {
SearchResponse searchResponse = search(searchRequestBuilder);
return searchResponseToPageResponse(searchResponse);
}
/**
* 獲得scrollId對應的數據. 請查看{@link #getScrollId(SearchRequestBuilder, int, int)}.<br/>
* 可以反復調用該方法, 直到返回數據為0.
*
* @param scrollId 給定的scrollId
* @param keepSeconds scroll數據保留時間
* @return 分頁結果
* @throws Exception
*/
public PageResponse<T> scrollSearchDocument(String scrollId, int keepSeconds) throws Exception {
return searchResponseToPageResponse(scrollSearch(scrollId, keepSeconds));
}
/**
* 向ES發送搜索請求,然后直接返回原始結果。
*
* @param searchRequestBuilder 搜索構造器
* @return 返回結果
*/
public SearchResponse search(SearchRequestBuilder searchRequestBuilder) {
return searchRequestBuilder.setTypes(type).execute().actionGet();
}
/**
* 向ES發送搜索請求,然后直接返回原始結果。
*
* @param searchRequestBuilder 搜索構造器
* @param type 類型
* @return 返回結果
*/
@Deprecated
public SearchResponse search(SearchRequestBuilder searchRequestBuilder, String type) {
return searchRequestBuilder.setTypes(type).execute().actionGet();
}
/**
* 通過scrollId獲得數據.請查看{@link #getScrollId(SearchRequestBuilder, int, int)}.<br/>
* 可以反復調用該方法, 直到返回數據為0.
*
* @param scrollId 給定的scrollId
* @param keepSeconds scroll繼續保留的時間, 建議60秒
* @return 返回獲取的數據
*/
public SearchResponse scrollSearch(String scrollId, int keepSeconds) {
return client.prepareSearchScroll(scrollId).setScroll(new TimeValue(keepSeconds * 1000))
.execute().actionGet();
}
/**
* 提供搜索構造器來獲得搜索scrollId, 這個scrollId用作{@link #scrollSearch(String, int)}
* 和{@link #scrollSearchDocument(String, int)}的參數. <br/>
* 當需要獲取大量數據的時候, 請使用scrollSearch來進行.
*
* @param searchRequestBuilder 搜索構造器
* @param keepSeconds scroll搜索保留時間, 建議60秒
* @param sizePerShard 每次每個分片獲取的尺寸
* @return 返回scrollId, 用於scrollSearch方法.
*/
public String getScrollId(SearchRequestBuilder searchRequestBuilder, int keepSeconds, int sizePerShard) {
SearchResponse searchResponse = searchRequestBuilder.setSearchType(SearchType.SCAN)
.setScroll(new TimeValue(keepSeconds * 1000))
.setSize(sizePerShard).execute().actionGet();
return searchResponse.getScrollId();
}
/**
* 返回搜索指定內容后,總共ES找到匹配的數據量。
*
* @param searchText 搜索內容
* @return 搜索結果數據量
*/
@Deprecated
public long countSearchResult(String searchText) {
CountRequestBuilder countRequestBuilder = client.prepareCount(index)
.setTypes(type)
.setQuery(QueryBuilders.matchQuery("_all", searchText));
return countSearchResult(countRequestBuilder);
}
/**
* 返回搜索指定內容后,總共ES找到匹配的數據量。
*
* @param searchText 搜索內容
* @param type 類型
* @return 搜索結果數據量
*/
@Deprecated
public long countSearchResult(String searchText, String type) {
CountRequestBuilder countRequestBuilder = client.prepareCount(index)
.setTypes(type)
.setQuery(QueryBuilders.matchQuery("_all", searchText));
return countSearchResult(countRequestBuilder);
}
/**
* 返回搜索指定內容后,總共ES找到匹配的數據量。
*
* @param countRequestBuilder 計數請求構造器實例
* @return 搜索結果數據量
* @see #prepareCount()
*/
@Deprecated
public long countSearchResult(CountRequestBuilder countRequestBuilder) {
return countRequestBuilder.execute().actionGet().getCount();
}
/**
* 用默認的分詞器進行文本分詞。
*
* @param docText 給定的文本
* @param order 是否使用排序,如果使用排序,則相同分詞會被合並,並且出現次數最高的排在返回列表最頭部。
* @return 分詞器將文本分詞之后的詞語列表
*/
public List<String> analyzeDocument(String docText, boolean order) {
List<AnalyzeToken> tokenList = analyzeDocument(docText, DEFAULT_ANALYZER);
if (order) {
// 如果是使用排序,按照分詞出現次數進行排序,並且會合並相同的分詞。
// 構造分詞Map,key為分詞,value為出現次數。
Map<String, Integer> tokenMap = Maps.newHashMap();
for (AnalyzeToken token : tokenList) {
if (tokenMap.get(token.getTerm()) == null) {
tokenMap.put(token.getTerm(), 1);
} else {
tokenMap.put(token.getTerm(), tokenMap.get(token.getTerm()) + 1);
}
}
// 將分詞Map進行排序
List<Map.Entry<String, Integer>> tokenSortList = Ordering.from(new Comparator<Map.Entry<String, Integer>>() {
@Override
public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
return o2.getValue().compareTo(o1.getValue());
}
}).sortedCopy(tokenMap.entrySet());
// 返回分詞列表。
return Lists.transform(tokenSortList, new Function<Map.Entry<String, Integer>, String>() {
@Override
public String apply(Map.Entry<String, Integer> input) {
return input.getKey();
}
});
} else {
// 返回所有分詞結果
return Lists.transform(tokenList, new Function<AnalyzeToken, String>() {
@Override
public String apply(AnalyzeToken input) {
return input.getTerm();
}
});
}
}
/**
* 用指定分詞器來分析給定的文本
*
* @param docText 給定的文本
* @param analyzer 指定的分析器
* @return 分詞器將文本分詞之后的詞語列表
*/
public List<AnalyzeToken> analyzeDocument(String docText, String analyzer) {
AnalyzeResponse analyzeResponse = client.admin().indices().prepareAnalyze(index, docText)
.setAnalyzer(analyzer)
.execute().actionGet();
return analyzeResponse.getTokens();
}
/**
* 獲得一個搜索請求構造器的實例,通過這個實例,可以進行查詢相關操作。<br/>
* 使用這個方法{@link ESClient#searchDocument(SearchRequestBuilder)}進行查詢。
* <pre>
* prepareSearch("telepathy")
* .setTypes("article")
* .setSearchType(SearchType.QUERY_THEN_FETCH)
* .setQuery(QueryBuilders.matchQuery("_all", searchText))
* .setFrom(pageRequest.getLimit() * (pageRequest.getPage() - 1))
* .setSize(pageRequest.getLimit())
* .setExplain(true)
* .addHighlightedField("title", 100, 1)
* .setFetchSource(new String[]{}, new String[]{});
* </pre>
*
* @return 搜索請求構造器實例
*/
public SearchRequestBuilder prepareSearch() {
return client.prepareSearch(index);
}
/**
* 獲得一個計數請求構造器的實例,通過這個實例可以進行查詢選項的構造。
*
* @return 計數請求構造器實例
* @see #prepareSearch()
*/
@Deprecated
public CountRequestBuilder prepareCount() {
return client.prepareCount(index);
}
/**
* 獲得一個Document的term vector (doc frequency, positions, offsets)
*
* @return TermVectorResponse
* @see #termVector()
*/
public ActionFuture<TermVectorResponse> termVector(TermVectorRequest request) {
return client.termVector(request);
}
/**
* 將SQL轉換成ES的JSON查詢對象.
*
* @param sql 給定的SQL
* @return JSON對象
*/
public JSONObject convertSqlToJSON(String sql) {
if (sqlJsonMap.get(sql) != null) {
return sqlJsonMap.get(sql);
}
List<String> addresses = Lists.newArrayList(serverHttpAddressList);
while (addresses.size() > 0) {
String sqlPluginUrl = "http://" + addresses.remove(RandomUtils.nextInt(0, addresses.size())) + "/_sql/_explain";
try {
JSONObject json = JSONObject.parseObject(
MucangHttpClient.getDefault().httpPostBody(sqlPluginUrl, sql, "text/plain")
);
sqlJsonMap.put(sql, json);
return json;
} catch (Exception e) {
LOG.error("調用elasticsearch-sql插件時遇到錯誤, 原因:{}", e);
}
}
throw new RuntimeException("調用elasticsearch-sql插件多次失敗, 請檢查服務器或者插件功能是否正常.");
}
/**
* 用SQL語句進行搜索. 使用${keyName}的方式代表需要替換的字符串(需要替換的字符串請用雙引號或者單引號引起來, 否則插件不能解析)<br/>
* 例如: select * from table where mediaId="${mediaId}"<br/>
*
* @param sql 指定的SQL
* @param kvPairs 替換鍵值對
* @return 搜索結果
* @throws Exception
*/
public SearchResponse searchSql(String sql, final Map<String, String> kvPairs) throws Exception {
JSONObject jsonQuery = convertSqlToJSON(sql);
PropertyPlaceholderHelper propertyPlaceholderHelper = new PropertyPlaceholderHelper("${", "}");
String queryString = propertyPlaceholderHelper.replacePlaceholders(
jsonQuery.toJSONString(),
new PropertyPlaceholderHelper.PlaceholderResolver() {
@Override
public String resolvePlaceholder(String placeholderName) {
if (StringUtils.isBlank(kvPairs.get(placeholderName))) {
return "";
} else {
return kvPairs.get(placeholderName);
}
}
});
SearchRequestBuilder searchRequestBuilder = prepareSearch()
.setSource(XContentFactory.jsonBuilder().value(JSONObject.parseObject(queryString)));
return search(searchRequestBuilder);
}
/**
* 將給定的對象轉換成JSON字符串,如果有特殊需求,可以覆蓋該方法。
*
* @param t 給定的對象
* @return JSON字符串
*/
public String toJSONString(T t) {
return JSON.toJSONString(t);
}
/**
* 將給定的Map里面的值注入到目標對象。如果有特殊需求,可以覆蓋該方法。
*
* @param t 目標對象
* @param map 給定的map
* @throws Exception
*/
public void toObject(T t, Map<String, ?> map) throws Exception {
dozerBeanMapper.map(map, t);
}
/**
* 將BulkProcessor的緩沖內容進行立即提交.
*/
public void flushBulk() {
this.bulkProcessor.flush();
}
public BulkProcessor getBulkProcessor() {
return bulkProcessor;
}
public void setBulkProcessor(BulkProcessor bulkProcessor) {
this.bulkProcessor = bulkProcessor;
}
public PageResponse<T> searchResponseToPageResponse(SearchResponse searchResponse) throws Exception {
PageResponse<T> pageResponse = new PageResponse<>();
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
// 將結果實例化成對應的類型實例
T t = this.clazz.newInstance();
Map<String, Object> hitMap;
if (searchHit.getSource() != null) {
hitMap = searchHit.getSource();
} else {
hitMap = Maps.newHashMap(Maps.transformValues(searchHit.getFields(),
new Function<SearchHitField, Object>() {
@Override
public Object apply(SearchHitField input) {
return input.getValues();
}
}
));
}
for (HighlightField highlightField : searchHit.getHighlightFields().values()) {
hitMap.put(highlightField.getName(),
StringUtils.join(highlightField.getFragments(), "..."));
}
// 將數據轉換成對應的實例
toObject(t, hitMap);
pageResponse.getItemList().add(t);
}
pageResponse.setTotal(searchResponse.getHits().getTotalHits());
return pageResponse;
}
/**
* 關閉native的鏈接.
*/
public void close() {
IOUtils.closeQuietly(bulkProcessor);
this.client.close();
}
}