Elasticsearch 的API 分為 REST Client API(http請求形式)以及 transportClient API兩種。相比來說transportClient API效率更高,transportClient 是通過Elasticsearch內部RPC的形式進行請求的,連接可以是一個長連接,相當於是把客戶端的請求當成
Elasticsearch 集群的一個節點,當然 REST Client API 也支持http keepAlive形式的長連接,只是非內部RPC形式。但是從Elasticsearch 7 后就會移除transportClient 。主要原因是transportClient 難以向下兼容版本。
本文中所有的講解和操作都是基於jdk 1.8 和elasticsearch 6.2.4版本。
備注:本文參考了很多Elasticsearch 的官方文檔以及部l網絡資料做的綜合整理。 本人github 參考代碼:https://github.com/597365581/bigdata_tools/tree/master/yongqing-bigdata-tools/yongqing-elasticsearch-tool
一、High REST Client
High Client 基於 Low Client, 主要目的是暴露一些 API,這些 API 可以接受請求對象為參數,返回響應對象,而對請求和響應細節的處理都是由 client 自動完成的。
API 在調用時都可以是同步或者異步兩種形式
同步 API 會導致阻塞,一直等待數據返回
異步 API 在命名上會加上 async 后綴,需要有一個 listener 作為參數,等這個請求返回結果或者發生錯誤時,這個 listener 就會被調用,listener主要是解決自動回調的問題,有點像安卓 開發里面的listener監聽回調。
Elasticsearch REST APi 官方 地址:https://www.elastic.co/guide/en/elasticsearch/reference/6.2/index.html
Maven 依賴
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.2.4</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.2.4</version>
</dependency>
client初始化:
RestHighLevelClient 實例依賴 REST low-level client builder
public class ElasticSearchClient {
private String[] hostsAndPorts;
public ElasticSearchClient(String[] hostsAndPorts) {
this.hostsAndPorts = hostsAndPorts;
}
public RestHighLevelClient getClient() { RestHighLevelClient client = null; List<HttpHost> httpHosts = new ArrayList<HttpHost>(); if (hostsAndPorts.length > 0) { for (String hostsAndPort : hostsAndPorts) { String[] hp = hostsAndPort.split(":"); httpHosts.add(new HttpHost(hp[0], Integer.valueOf(hp[1]), "http")); } client = new RestHighLevelClient( RestClient.builder(httpHosts.toArray(new HttpHost[0]))); } else { client = new RestHighLevelClient( RestClient.builder(new HttpHost("127.0.0.1", 9200, "http"))); } return client; }
}
文檔 API(High level rest 客戶端支持下面的 文檔(Document) API):
- 單文檔 API:
- index API
- Get API
- Delete API
- Update API
- 多文檔 API:
- Bulk API
- Multi-Get API
1、Index API:
IndexRequest:
封裝好的參考方法:
private IndexRequest getIndexRequest(String index, String indexType, String docId, Map<String, Object> dataMap) { IndexRequest indexRequest = null; if (null == index || null == indexType) { throw new ElasticsearchException("index or indexType must not be null"); } if (null == docId) { indexRequest = new IndexRequest(index, indexType); } else { indexRequest = new IndexRequest(index, indexType, docId); } return indexRequest; } /** * 同步執行索引 * * @param index * @param indexType * @param docId * @param dataMap * @throws IOException */ public IndexResponse execIndex(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException { return getClient().index(getIndexRequest(index, indexType, docId, dataMap).source(dataMap)); } /** * 異步執行 * * @param index * @param indexType * @param docId * @param dataMap * @param indexResponseActionListener * @throws IOException */ public void asyncExecIndex(String index, String indexType, String docId, Map<String, Object> dataMap, ActionListener<IndexResponse> indexResponseActionListener) throws IOException { getClient().indexAsync(getIndexRequest(index, indexType, docId, dataMap).source(dataMap), indexResponseActionListener); }
API解釋:
IndexRequest request = new IndexRequest( "posts", // 索引 Index "doc", // Type "1"); // 文檔 Document Id String jsonString = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}"; request.source(jsonString, XContentType.JSON); // 文檔源格式為 json string
Document Source
document source 可以是下面的格式
本文作者:張永清,轉載請注明出處:Elasticsearch Java API 很全的整理
Map類型的輸入:
Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("user", "kimchy"); jsonMap.put("postDate", new Date()); jsonMap.put("message", "trying out Elasticsearch"); IndexRequest indexRequest = new IndexRequest("posts", "doc", "1") .source(jsonMap); // 會自動將 Map 轉換為 JSON 格式
XContentBuilder : 這是 Document Source 提供的幫助類,專門用來產生 json 格式的數據:
XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); { builder.field("user", "kimchy"); builder.timeField("postDate", new Date()); builder.field("message", "trying out Elasticsearch"); } builder.endObject(); IndexRequest indexRequest = new IndexRequest("posts", "doc", "1") .source(builder);
Object 鍵對:
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1") .source("user", "kimchy", "postDate", new Date(), "message", "trying out Elasticsearch");
同步索引:
IndexResponse indexResponse = client.index(request);
異步索引:異步執行函數需要添加 listener, 而對於 index 而言,這個 listener 的類型就是 ActionListener
client.indexAsync(request, listener);
異步方法執行后會立刻返回,在索引操作執行完成后,ActionListener 就會被回調:
執行成功,調用 onResponse 函數
執行失敗,調用 onFailure 函數
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() { @Override public void onResponse(IndexResponse indexResponse) { } @Override public void onFailure(Exception e) { } };
IndexResponse:
不管是同步回調還是異步回調,如果調用成功,都會返回 IndexRespose 對象。
String index = indexResponse.getIndex(); String type = indexResponse.getType(); String id = indexResponse.getId(); long version = indexResponse.getVersion(); if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) { // 文檔第一次創建 } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { // 文檔之前已存在,當前是重寫 } ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { // 成功的分片數量少於總分片數量 } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { String reason = failure.reason(); // 處理潛在的失敗信息 } }
在索引時有版本沖突的話,會拋出 ElasticsearchException
IndexRequest request = new IndexRequest("posts", "doc", "1") .source("field", "value") .version(1); // 這里是文檔版本號 try { IndexResponse response = client.index(request); } catch(ElasticsearchException e) { if (e.status() == RestStatus.CONFLICT) { // 沖突了 } }
如果將 opType 設置為 create, 而且如果索引的文檔與已存在的文檔在 index, type 和 id 上均相同,也會拋出沖突異常。
IndexRequest request = new IndexRequest("posts", "doc", "1") .source("field", "value") .opType(DocWriteRequest.OpType.CREATE); try { IndexResponse response = client.index(request); } catch(ElasticsearchException e) { if (e.status() == RestStatus.CONFLICT) { } }
2、GET API
GET 請求
每個 GET 請求都必須需傳入下面 3 個參數:
- Index
- Type
- Document id
GetRequest getRequest = new GetRequest( "posts", "doc", "1");
可選參數
下面的參數都是可選的, 里面的選項並不完整,如要獲取完整的屬性,請參考 官方文檔
不獲取源數據,默認是獲取的
request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
配置返回數據中包含指定字段
String[] includes = new String[]{"message", "*Date"}; String[] excludes = Strings.EMPTY_ARRAY; FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes); request.fetchSourceContext(fetchSourceContext);
配置返回數據中排除指定字段
String[] includes = Strings.EMPTY_ARRAY; String[] excludes = new String[]{"message"}; FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes); request.fetchSourceContext(fetchSourceContext);
實時 默認為 true
request.realtime(false);
版本
request.version(2);
版本類型
request.versionType(VersionType.EXTERNAL);
同步執行
GetResponse getResponse = client.get(getRequest);
異步執行
此部分與 index 相似, 只有一點不同, 返回類型為 GetResponse
Get Response
返回的 GetResponse 對象包含要請求的文檔數據(包含元數據和字段)
String index = getResponse.getIndex(); String type = getResponse.getType(); String id = getResponse.getId(); if (getResponse.isExists()) { long version = getResponse.getVersion(); String sourceAsString = getResponse.getSourceAsString(); // string 形式 Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // map byte[] sourceAsBytes = getResponse.getSourceAsBytes(); // 字節形式 } else { // 沒有發現請求的文檔 }
在請求中如果包含特定的文檔版本,如果與已存在的文檔版本不匹配, 就會出現沖突
try { GetRequest request = new GetRequest("posts", "doc", "1").version(2); GetResponse getResponse = client.get(request); } catch (ElasticsearchException exception) { if (exception.status() == RestStatus.CONFLICT) { // 版本沖突 } }
封裝好的參考方法: /** * @param index * @param indexType * @param docId * @param includes 返回需要包含的字段,可以傳入空 * @param excludes 返回需要不包含的字段,可以傳入為空 * @param excludes version * @param excludes versionType * @return * @throws IOException */ public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes, Integer version, VersionType versionType) throws IOException { if (null == includes || includes.length == 0) { includes = Strings.EMPTY_ARRAY; } if (null == excludes || excludes.length == 0) { excludes = Strings.EMPTY_ARRAY; } GetRequest getRequest = new GetRequest(index, indexType, docId); FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes); getRequest.realtime(true); if (null != version) { getRequest.version(version); } if (null != versionType) { getRequest.versionType(versionType); } return getClient().get(getRequest.fetchSourceContext(fetchSourceContext)); } /** * @param index * @param indexType * @param docId * @param includes * @param excludes * @return * @throws IOException */ public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes) throws IOException { return getRequest(index, indexType, docId, includes, excludes, null, null); } /** * @param index * @param indexType * @param docId * @return * @throws IOException */ public GetResponse getRequest(String index, String indexType, String docId) throws IOException { GetRequest getRequest = new GetRequest(index, indexType, docId); return getClient().get(getRequest); }
3、Exists API
如果文檔存在 Exists API 返回 true, 否則返回 fasle。
Exists Request
GetRequest 用法和 Get API 差不多,兩個對象的可選參數是相同的。由於 exists() 方法只返回 true 或者 false, 建議將獲取 _source 以及任何存儲字段的值關閉,盡量使請求輕量級。
GetRequest getRequest = new GetRequest( "posts", // Index "doc", // Type "1"); // Document id getRequest.fetchSourceContext(new FetchSourceContext(false)); // 禁用 _source 字段 getRequest.storedFields("_none_"); // 禁止存儲任何字段
同步請求
boolean exists = client.exists(getRequest);
異步請求
異步請求與 Index API 相似,此處不贅述,只粘貼代碼。如需詳細了解,請參閱官方地址
ActionListener<Boolean> listener = new ActionListener<Boolean>() { @Override public void onResponse(Boolean exists) { } @Override public void onFailure(Exception e) { } }; client.existsAsync(getRequest, listener);
封裝的參考方法:
/** * @param index * @param indexType * @param docId * @return * @throws IOException */ public Boolean existDoc(String index, String indexType, String docId) throws IOException { GetRequest getRequest = new GetRequest(index, indexType, docId); getRequest.fetchSourceContext(new FetchSourceContext(false)); getRequest.storedFields("_none_"); return getClient().exists(getRequest); }
4、Delete API
Delete Request
DeleteRequest 必須傳入下面參數
DeleteRequest request = new DeleteRequest( "posts", // index "doc", // doc "1"); // document id
可選參數
超時時間
request.timeout(TimeValue.timeValueMinutes(2));
request.timeout("2m");
刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
版本
request.version(2);
版本類型
request.versionType(VersionType.EXTERNAL);
同步執行
DeleteResponse deleteResponse = client.delete(request);
異步執行
ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() { @Override public void onResponse(DeleteResponse deleteResponse) { } @Override public void onFailure(Exception e) { } };
client.deleteAsync(request, listener);
Delete Response
DeleteResponse 可以檢索執行操作的信息
String index = deleteResponse.getIndex(); String type = deleteResponse.getType(); String id = deleteResponse.getId(); long version = deleteResponse.getVersion(); ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { // 成功分片數目小於總分片 } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { String reason = failure.reason(); // 處理潛在失敗 } }
也可以來檢查文檔是否存在
DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist"); DeleteResponse deleteResponse = client.delete(request); if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { // 文檔不存在 } 版本沖突時也會拋出 `ElasticsearchException try { DeleteRequest request = new DeleteRequest("posts", "doc", "1").version(2); DeleteResponse deleteResponse = client.delete(request); } catch (ElasticsearchException exception) { if (exception.status() == RestStatus.CONFLICT) { // 版本沖突 } }
封裝好的參考方法:
本文作者:張永清,轉載請注明出處:Elasticsearch Java API 很全的整理
/** * @param index * @param indexType * @param docId * @param timeValue * @param refreshPolicy * @param version * @param versionType * @return * @throws IOException */ public DeleteResponse deleteDoc(String index, String indexType, String docId, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy, Integer version, VersionType versionType) throws IOException { DeleteRequest deleteRequest = new DeleteRequest(index, indexType, docId); if (null != timeValue) { deleteRequest.timeout(timeValue); } if (null != refreshPolicy) { deleteRequest.setRefreshPolicy(refreshPolicy); } if (null != version) { deleteRequest.version(version); } if (null != versionType) { deleteRequest.versionType(versionType); } return getClient().delete(deleteRequest); } /** * @param index * @param indexType * @param docId * @return * @throws IOException */ public DeleteResponse deleteDoc(String index, String indexType, String docId) throws IOException { return deleteDoc(index, indexType, docId, null, null, null, null); }
5、Update API
Update Request
UpdateRequest 的必需參數如下
UpdateRequest request = new UpdateRequest( "posts", // Index "doc", // 類型 "1"); // 文檔 Id
使用腳本更新
部分文檔更新:
在更新部分文檔時,已存在文檔與部分文檔會合並。
部分文檔可以有以下形式:
JSON 格式:
UpdateRequest request = new UpdateRequest("posts", "doc", "1"); String jsonString = "{" + "\"updated\":\"2017-01-01\"," + "\"reason\":\"daily update\"" + "}"; request.doc(jsonString, XContentType.JSON);
Map 格式:
Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("updated", new Date()); jsonMap.put("reason", "daily update"); UpdateRequest request = new UpdateRequest("posts", "doc", "1") .doc(jsonMap);
XContentBuilder 對象:
XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); { builder.timeField("updated", new Date()); builder.field("reason", "daily update"); } builder.endObject(); UpdateRequest request = new UpdateRequest("posts", "doc", "1") .doc(builder); Object key-pairs UpdateRequest request = new UpdateRequest("posts", "doc", "1") .doc("updated", new Date(), "reason", "daily update");
Upserts:如果文檔不存在,可以使用 upserts 方法將文檔以新文檔的方式創建。
UpdateRequest request = new UpdateRequest("posts", "doc", "1") .doc("updated", new Date(), "reason", "daily update");
upserts 方法支持的文檔格式與 update 方法相同。
可選參數:
超時時間
request.timeout(TimeValue.timeValueSeconds(1));
request.timeout("1s");
刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
沖突后重試次數
request.retryOnConflict(3);
獲取數據源,默認是開啟的
request.fetchSource(true);
包括特定字段
String[] includes = new String[]{"updated", "r*"}; String[] excludes = Strings.EMPTY_ARRAY; request.fetchSource(new FetchSourceContext(true, includes, excludes));
排除特定字段
String[] includes = Strings.EMPTY_ARRAY; String[] excludes = new String[]{"updated"}; request.fetchSource(new FetchSourceContext(true, includes, excludes));
指定版本
request.version(2);
禁用 noop detection
request.scriptedUpsert(true);
設置如果更新的文檔不存在,就必須要創建一個
request.docAsUpsert(true);
同步執行
UpdateResponse updateResponse = client.update(request);
異步執行
ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() { @Override public void onResponse(UpdateResponse updateResponse) { } @Override public void onFailure(Exception e) { } }; client.updateAsync(request, listener);
Update Response
String index = updateResponse.getIndex(); String type = updateResponse.getType(); String id = updateResponse.getId(); long version = updateResponse.getVersion(); if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) { // 文檔已創建 } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) { // 文檔已更新 } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) { // 文檔已刪除 } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) { // 文檔不受更新的影響 }
如果在 UpdateRequest 中使能了獲取源數據,響應中則包含了更新后的源文檔信息。
GetResult result = updateResponse.getGetResult(); if (result.isExists()) { String sourceAsString = result.sourceAsString(); // 將獲取的文檔以 string 格式輸出 Map<String, Object> sourceAsMap = result.sourceAsMap(); // 以 Map 格式輸出 byte[] sourceAsBytes = result.source(); // 字節形式 } else { // 默認情況下,不會返回文檔源數據 }
也可以檢測是否分片失敗
ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { // 成功的分片數量小於總分片數量 } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { String reason = failure.reason(); // 得到分片失敗的原因 } }
如果在執行 UpdateRequest 時,文檔不存在,響應中會包含 404 狀態碼,而且會拋出 ElasticsearchException 。
UpdateRequest request = new UpdateRequest("posts", "type", "does_not_exist") .doc("field", "value"); try { UpdateResponse updateResponse = client.update(request); } catch (ElasticsearchException e) { if (e.status() == RestStatus.NOT_FOUND) { // 處理文檔不存在的情況 } }
如果版本沖突,也會拋出 ElasticsearchException
UpdateRequest request = new UpdateRequest("posts", "doc", "1") .doc("field", "value") .version(1); try { UpdateResponse updateResponse = client.update(request); } catch(ElasticsearchException e) { if (e.status() == RestStatus.CONFLICT) { // 處理版本沖突的情況 } }
封裝好的參考方法:
/** * @param index * @param indexType * @param docId * @param dataMap * @param timeValue * @param refreshPolicy * @param version * @param versionType * @param docAsUpsert * @param includes * @param excludes * @return * @throws IOException */ public UpdateResponse updateDoc(String index, String indexType, String docId, Map<String, Object> dataMap, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy, Integer version, VersionType versionType, Boolean docAsUpsert, String[] includes, String[] excludes) throws IOException { UpdateRequest updateRequest = new UpdateRequest(index, indexType, docId); updateRequest.doc(dataMap); if (null != timeValue) { updateRequest.timeout(timeValue); } if (null != refreshPolicy) { updateRequest.setRefreshPolicy(refreshPolicy); } if (null != version) { updateRequest.version(version); } if (null != versionType) { updateRequest.versionType(versionType); } updateRequest.docAsUpsert(docAsUpsert); //沖突時重試的次數 updateRequest.retryOnConflict(3); if (null == includes && null == excludes) { return getClient().update(updateRequest); } else { if (null == includes || includes.length == 0) { includes = Strings.EMPTY_ARRAY; } if (null == excludes || excludes.length == 0) { excludes = Strings.EMPTY_ARRAY; } return getClient().update(updateRequest.fetchSource(new FetchSourceContext(true, includes, excludes))); } } /** * 更新時不存在就插入 * * @param index * @param indexType * @param docId * @param dataMap * @return * @throws IOException */ public UpdateResponse upDdateocAsUpsert(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException { return updateDoc(index, indexType, docId, dataMap, null, null, null, null, true, null, null); } /** * 存在才更新 * * @param index * @param indexType * @param docId * @param dataMap * @return * @throws IOException */ public UpdateResponse updateDoc(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException { return updateDoc(index, indexType, docId, dataMap, null, null, null, null, false, null, null); }
6、Bulk API 批量處理
批量請求
使用 BulkRequest 可以在一次請求中執行多個索引,更新和刪除的操作。
BulkRequest request = new BulkRequest(); request.add(new IndexRequest("posts", "doc", "1") .source(XContentType.JSON,"field", "foo")); // 將第一個 IndexRequest 添加到批量請求中 request.add(new IndexRequest("posts", "doc", "2") .source(XContentType.JSON,"field", "bar")); // 第二個 request.add(new IndexRequest("posts", "doc", "3") .source(XContentType.JSON,"field", "baz")); // 第三個
在同一個 BulkRequest 也可以添加不同的操作類型
BulkRequest request = new BulkRequest(); request.add(new DeleteRequest("posts", "doc", "3")); request.add(new UpdateRequest("posts", "doc", "2") .doc(XContentType.JSON,"other", "test")); request.add(new IndexRequest("posts", "doc", "4") .source(XContentType.JSON,"field", "baz"));
可選參數
超時時間
request.timeout(TimeValue.timeValueMinutes(2));
request.timeout("2m");
刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
設置在批量操作前必須有幾個分片處於激活狀態
request.waitForActiveShards(2); request.waitForActiveShards(ActiveShardCount.ALL); // 全部分片都處於激活狀態 request.waitForActiveShards(ActiveShardCount.DEFAULT); // 默認 request.waitForActiveShards(ActiveShardCount.ONE); // 一個
同步請求
BulkResponse bulkResponse = client.bulk(request);
異步請求
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() { @Override public void onResponse(BulkResponse bulkResponse) { } @Override public void onFailure(Exception e) { } }; client.bulkAsync(request, listener);
Bulk Response
BulkResponse 中包含執行操作后的信息,並允許對每個操作結果迭代。
for (BulkItemResponse bulkItemResponse : bulkResponse) { // 遍歷所有的操作結果 DocWriteResponse itemResponse = bulkItemResponse.getResponse(); // 獲取操作結果的響應,可以是 IndexResponse, UpdateResponse or DeleteResponse, 它們都可以慚怍是 DocWriteResponse 實例 if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { IndexResponse indexResponse = (IndexResponse) itemResponse; // index 操作后的響應結果 } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { UpdateResponse updateResponse = (UpdateResponse) itemResponse; // update 操作后的響應結果 } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { DeleteResponse deleteResponse = (DeleteResponse) itemResponse; // delete 操作后的響應結果 } }
此外,批量響應還有一個非常便捷的方法來檢測是否有一個或多個操作失敗
if (bulkResponse.hasFailures()) { // 表示至少有一個操作失敗 }
在這種情況下,我們要遍歷所有的操作結果,檢查是否是失敗的操作,並獲取對應的失敗信息
for (BulkItemResponse bulkItemResponse : bulkResponse) { if (bulkItemResponse.isFailed()) { // 檢測給定的操作是否失敗 BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); // 獲取失敗信息 } }
Bulk Processor
BulkProcessor 是為了簡化 Bulk API 的操作提供的一個工具類,要執行操作,就需要下面組件
RestHighLevelClient 用來執行 BulkRequest 並獲取 BulkResponse`
BulkProcessor.Listener 對 BulkRequest 執行前后以及失敗時監聽
BulkProcessor.builder 方法用來構建一個新的BulkProcessor
BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { // 在每個 BulkRequest 執行前調用 } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { // 在每個 BulkRequest 執行后調用 } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { // 失敗時調用 } };
BulkProcessor.Builder 提供了多個方法來配置 BulkProcessor
如何來處理請求的執行。
BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener); builder.setBulkActions(500); // 指定多少操作時,就會刷新一次 builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); builder.setConcurrentRequests(0); // 指定多大容量,就會刷新一次 builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // 允許並發執行的數量 builder.setBackoffPolicy(BackoffPolicy .constantBackoff(TimeValue.timeValueSeconds(1L), 3)); BulkProcessor 創建后,各種請求就可以添加進去: IndexRequest one = new IndexRequest("posts", "doc", "1"). source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?"); IndexRequest two = new IndexRequest("posts", "doc", "2") .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch"); IndexRequest three = new IndexRequest("posts", "doc", "3") .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch"); bulkProcessor.add(one); bulkProcessor.add(two); bulkProcessor.add(three);
BulkProcessor 執行時,會對每個 bulk request調用 BulkProcessor.Listener , listener 提供了下面方法來訪問 BulkRequest 和 BulkResponse:
BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { int numberOfActions = request.numberOfActions(); // 在執行前獲取操作的數量 logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { if (response.hasFailures()) { // 執行后查看響應中是否包含失敗的操作 logger.warn("Bulk [{}] executed with failures", executionId); } else { logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis()); } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { logger.error("Failed to execute bulk", failure); // 請求失敗時打印信息 } };
請求添加到 BulkProcessor , 它的實例可以使用下面兩種方法關閉請求。
awaitClose() 在請求返回后或等待一定時間關閉
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
close() 立刻關閉
bulkProcessor.close();
兩個方法都會在關閉前對處理器中的請求進行刷新,並避免新的請求添加進去。
封裝好的參考方法:
/** * 批量操作 * * @param indexBeanList * @param timeValue * @param refreshPolicy * @return * @throws IOException */ public BulkResponse bulkRequest(List<IndexBean> indexBeanList, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy) throws IOException { BulkRequest bulkRequest = getBulkRequest(indexBeanList); if (null != timeValue) { bulkRequest.timeout(timeValue); } if (null != refreshPolicy) { bulkRequest.setRefreshPolicy(refreshPolicy); } return getClient().bulk(bulkRequest); } private BulkRequest getBulkRequest(List<IndexBean> indexBeanList) { BulkRequest bulkRequest = new BulkRequest(); indexBeanList.forEach(indexBean -> { if ("1".equals(indexBean.getOperateType())) { bulkRequest.add(null != indexBean.getDocId() ? new IndexRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()) : new IndexRequest(indexBean.getIndex(), indexBean.getIndexType())); } else if ("2".equals(indexBean.getOperateType())) { if ((null != indexBean.getDocId())) { throw new ElasticsearchException("update action docId must not be null"); } bulkRequest.add(new UpdateRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId())); } else if ("3".equals(indexBean.getOperateType())) { if ((null != indexBean.getDocId())) { throw new ElasticsearchException("delete action docId must not be null"); } bulkRequest.add(new DeleteRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId())); } else { throw new ElasticsearchException("OperateType" + indexBean.getOperateType() + "is not support"); } }); return bulkRequest; } /** * 批量操作 * * @param indexBeanList * @return */ public BulkResponse bulkRequest(List<IndexBean> indexBeanList) throws IOException { return bulkRequest(indexBeanList, null, null); } /** * 批量異步操作 * * @param indexBeanList * @param bulkResponseActionListener */ public void AsyncBulkRequest(List<IndexBean> indexBeanList, ActionListener<BulkResponse> bulkResponseActionListener) { getClient().bulkAsync(getBulkRequest(indexBeanList), bulkResponseActionListener); }
7、Search APIs:
Java High Level REST Client 支持下面的 Search API:
- Search API
- Search Scroll API
- Clear Scroll API
- Multi-Search API
- Ranking Evaluation API
Search API
Search Request
searchRequest 用來完成和搜索文檔,聚合,建議等相關的任何操作同時也提供了各種方式來完成對查詢結果的高亮操作。
最基本的查詢操作如下
SearchRequest searchRequest = new SearchRequest(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchAllQuery()); // 添加 match_all 查詢 searchRequest.source(searchSourceBuilder); // 將 SearchSourceBuilder 添加到 SeachRequest 中
可選參數
SearchRequest searchRequest = new SearchRequest("posts"); // 設置搜索的 index searchRequest.types("doc"); // 設置搜索的 type
除了配置 index 和 type 外,還有一些其他的可選參數
searchRequest.routing("routing"); // 設置 routing 參數 searchRequest.preference("_local"); // 配置搜索時偏愛使用本地分片,默認是使用隨機分片
什么是 routing 參數?
當索引一個文檔的時候,文檔會被存儲在一個主分片上。在存儲時一般都會有多個主分片。Elasticsearch 如何知道一個文檔應該放置在哪個分片呢?這個過程是根據下面的這個公式來決定的:
shard = hash(routing) % number_of_primary_shards
routing 是一個可變值,默認是文檔的 _id ,也可以設置成一個自定義的值
number_of_primary_shards 是主分片數量
所有的文檔 API 都接受一個叫做 routing 的路由參數,通過這個參數我們可以自定義文檔到分片的映射。一個自定義的路由參數可以用來確保所有相關的文檔——例如所有屬於同一個用戶的文檔——都被存儲到同一個分片中。
使用 SearchSourceBuilder
對搜索行為的配置可以使用 SearchSourceBuilder 來完成,來看一個實例
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // 默認配置 sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy")); // 設置搜索,可以是任何類型的 QueryBuilder sourceBuilder.from(0); // 起始 index sourceBuilder.size(5); // 大小 size sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); // 設置搜索的超時時間
設置完成后,就可以添加到 SearchRequest 中。
SearchRequest searchRequest = new SearchRequest(); searchRequest.source(sourceBuilder);
構建查詢條件
查詢請求是通過使用 QueryBuilder 對象來完成的,並且支持 Query DSL。
DSL (domain-specific language) 領域特定語言,是指專注於某個應用程序領域的計算機語言。
可以使用構造函數來創建 QueryBuilder
MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("user", "kimchy");
QueryBuilder 創建后,就可以調用方法來配置它的查詢選項:
matchQueryBuilder.fuzziness(Fuzziness.AUTO); // 模糊查詢 matchQueryBuilder.prefixLength(3); // 前綴查詢的長度 matchQueryBuilder.maxExpansions(10); // max expansion 選項,用來控制模糊查詢
也可以使用QueryBuilders 工具類來創建 QueryBuilder 對象。這個類提供了函數式編程風格的各種方法用來快速創建 QueryBuilder 對象。
QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("user", "kimchy") .fuzziness(Fuzziness.AUTO) .prefixLength(3) .maxExpansions(10);
fuzzy-matching 拼寫錯誤時的匹配:
好的全文檢索不應該是完全相同的限定邏輯,相反,可以擴大范圍來包括可能的匹配,從而根據相關性得分將更好的匹配放在前面。
例如,搜索 quick brown fox 時會匹配一個包含 fast brown foxes 的文檔
不論什么方式創建的 QueryBuilder ,最后都需要添加到 `SearchSourceBuilder 中
searchSourceBuilder.query(matchQueryBuilder);
構建查詢 文檔中提供了一個豐富的查詢列表,里面包含各種查詢對應的QueryBuilder 對象以及QueryBuilder helper 方法,大家可以去參考。
關於構建查詢的內容會在下篇文章中講解,敬請期待。
指定排序
SearchSourceBuilder 允許添加一個或多個SortBuilder 實例。這里包含 4 種特殊的實現, (Field-, Score-, GeoDistance- 和 ScriptSortBuilder)
sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); // 根據分數 _score 降序排列 (默認行為) sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC)); // 根據 id 降序排列
過濾數據源
默認情況下,查詢請求會返回文檔的內容 _source ,當然我們也可以配置它。例如,禁止對 _source 的獲取
sourceBuilder.fetchSource(false);
也可以使用通配符模式以更細的粒度包含或排除特定的字段:
String[] includeFields = new String[] {"title", "user", "innerObject.*"}; String[] excludeFields = new String[] {"_type"}; sourceBuilder.fetchSource(includeFields, excludeFields);
高亮請求
可以通過在 SearchSourceBuilder 上設置 HighlightBuilder 完成對結果的高亮,而且可以配置不同的字段具有不同的高亮行為。
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); HighlightBuilder highlightBuilder = new HighlightBuilder(); HighlightBuilder.Field highlightTitle = new HighlightBuilder.Field("title"); // title 字段高亮 highlightTitle.highlighterType("unified"); // 配置高亮類型 highlightBuilder.field(highlightTitle); // 添加到 builder HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user"); highlightBuilder.field(highlightUser); searchSourceBuilder.highlighter(highlightBuilder);
聚合請求
要實現聚合請求分兩步
創建合適的 `AggregationBuilder
作為參數配置在 `SearchSourceBuilder 上
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company") .field("company.keyword"); aggregation.subAggregation(AggregationBuilders.avg("average_age") .field("age")); searchSourceBuilder.aggregation(aggregation);
建議請求 Requesting Suggestions
SuggestionBuilder 實現類是由 SuggestBuilders 工廠類來創建的。
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); SuggestionBuilder termSuggestionBuilder = SuggestBuilders.termSuggestion("user").text("kmichy"); SuggestBuilder suggestBuilder = new SuggestBuilder(); suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder); searchSourceBuilder.suggest(suggestBuilder);
對請求和聚合分析
分析 API 可用來對一個特定的查詢操作中的請求和聚合進行分析,此時要將SearchSourceBuilder 的 profile標志位設置為 true
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.profile(true);
只要 SearchRequest 執行完成,對應的 SearchResponse 響應中就會包含 分析結果
同步執行
同步執行是阻塞式的,只有結果返回后才能繼續執行。
SearchResponse searchResponse = client.search(searchRequest);
異步執行
異步執行使用的是 listener 對結果進行處理。
ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() { @Override public void onResponse(SearchResponse searchResponse) { // 查詢成功 } @Override public void onFailure(Exception e) { // 查詢失敗 } };
SearchResponse
查詢執行完成后,會返回 SearchResponse 對象,並在對象中包含查詢執行的細節和符合條件的文檔集合。
歸納一下, SerchResponse 包含的信息如下
請求本身的信息,如 HTTP 狀態碼,執行時間,或者請求是否超時
RestStatus status = searchResponse.status(); // HTTP 狀態碼 TimeValue took = searchResponse.getTook(); // 查詢占用的時間 Boolean terminatedEarly = searchResponse.isTerminatedEarly(); // 是否由於 SearchSourceBuilder 中設置 terminateAfter 而過早終止 boolean timedOut = searchResponse.isTimedOut(); // 是否超時
查詢影響的分片數量的統計信息,成功和失敗的分片
int totalShards = searchResponse.getTotalShards(); int successfulShards = searchResponse.getSuccessfulShards(); int failedShards = searchResponse.getFailedShards(); for (ShardSearchFailure failure : searchResponse.getShardFailures()) { // failures should be handled here }
檢索 SearchHits
要訪問返回的文檔,首先要在響應中獲取其中的 SearchHits
SearchHits hits = searchResponse.getHits();
SearchHits 中包含了所有命中的全局信息,如查詢命中的數量或者最大分值:
long totalHits = hits.getTotalHits(); float maxScore = hits.getMaxScore();
查詢的結果嵌套在 SearchHits 中,可以通過遍歷循環獲取
SearchHit[] searchHits = hits.getHits(); for (SearchHit hit : searchHits) { // do something with the SearchHit }
SearchHit 提供了如 index , type, docId 和每個命中查詢的分數
String index = hit.getIndex(); String type = hit.getType(); String id = hit.getId(); float score = hit.getScore();
而且,還可以獲取到文檔的源數據,以 JSON-String 形式或者 key-value map 對的形式。在 map 中,字段可以是普通類型,或者是列表類型,嵌套對象。
String sourceAsString = hit.getSourceAsString(); Map<String, Object> sourceAsMap = hit.getSourceAsMap(); String documentTitle = (String) sourceAsMap.get("title"); List<Object> users = (List<Object>) sourceAsMap.get("user"); Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");
Search API 查詢關系
上面的 QueryBuilder , SearchSourceBuilder 和 SearchRequest 之間都是嵌套關系, 可以參考下圖:
8、全文查詢 Full Text Queries
什么是全文查詢?
像使用 match 或者 query_string 這樣的高層查詢都屬於全文查詢,
查詢 日期(date) 或整數(integer) 字段,會將查詢字符串分別作為日期或整數對待。
查詢一個( not_analyzed )未分析的精確值字符串字段,會將整個查詢字符串作為單個詞項對待。
查詢一個( analyzed )已分析的全文字段,會先將查詢字符串傳遞到一個合適的分析器,然后生成一個供查詢的詞項列表
組成了詞項列表,后面就會對每個詞項逐一執行底層查詢,將查詢結果合並,並且為每個文檔生成最終的相關度評分。
Match
match 查詢的單個詞的步驟是什么?
檢查字段類型,查看字段是 analyzed, not_analyzed
分析查詢字符串,如果只有一個單詞項, match 查詢在執行時就會是單個底層的 term 查詢
查找匹配的文檔,會在倒排索引中查找匹配文檔,然后獲取一組包含該項的文檔
為每個文檔評分
構建 Match 查詢
match 查詢可以接受 text/numeric/dates 格式的參數,分析,並構建一個查詢。
GET /_search { "query": { "match" : { "message" : "this is a test" } } }
上面的實例中 message 是一個字段名。
對應的 QueryBuilder class : MatchQueryBuilder
具體方法 : QueryBuilders.matchQuery()
全文查詢 API 列表
基於詞項的查詢
這種類型的查詢不需要分析,它們是對單個詞項操作,只是在倒排索引中查找准確的詞項(精確匹配)並且使用 TF/IDF 算法為每個包含詞項的文檔計算相關度評分 _score。
Term
term 查詢可用作精確值匹配,精確值的類型則可以是數字,時間,布爾類型,或者是那些 not_analyzed 的字符串。
對應的 QueryBuilder class 是TermQueryBuilder
具體方法是 QueryBuilders.termQuery()
Terms
terms 查詢允許指定多個值進行匹配。如果這個字段包含了指定值中的任何一個值,就表示該文檔滿足條件。
對應的 QueryBuilder class 是 TermsQueryBuilder
具體方法是 QueryBuilders.termsQuery()
Wildcard
wildcard 通配符查詢是一種底層基於詞的查詢,它允許指定匹配的正則表達式。而且它使用的是標准的 shell 通配符查詢:
? 匹配任意字符
* 匹配 0 個或多個字符
wildcard 需要掃描倒排索引中的詞列表才能找到所有匹配的詞,然后依次獲取每個詞相關的文檔 ID。
由於通配符和正則表達式只能在查詢時才能完成,因此查詢效率會比較低,在需要高性能的場合,應當謹慎使用。
對應的 QueryBuilder class 是 WildcardQueryBuilder
具體方法是 QueryBuilders.wildcardQuery()
基於詞項 API 列表
復合查詢
什么是復合查詢?
復合查詢會將其他的復合查詢或者葉查詢包裹起來,以嵌套的形式展示和執行,得到的結果也是對各個子查詢結果和分數的合並。可以分為下面幾種:
constant_score query
經常用在使用 filter 的場合,所有匹配的文檔分數都是一個不變的常量
bool query
可以將多個葉查詢和組合查詢再組合起來,可接受的參數如下
must : 文檔必須匹配這些條件才能被包含進來
must_not 文檔必須不匹配才能被包含進來
should 如果滿足其中的任何語句,都會增加分數;即使不滿足,也沒有影響
filter 以過濾模式進行,不評分,但是必須匹配
dis_max query
叫做分離最大化查詢,它會將任何與查詢匹配的文檔都作為結果返回,但是只是將其中最佳匹配的評分作為最終的評分返回。
function_score query
允許為每個與主查詢匹配的文檔應用一個函數,可用來改變甚至替換原始的評分
boosting query
用來控制(提高或降低)復合查詢中子查詢的權重。
特殊查詢
Wrapper Query
這里比較重要的一個是 Wrapper Query,是說可以接受任何其他 base64 編碼的字符串作為子查詢。
主要應用場合就是在 Rest High-Level REST client 中接受 json 字符串作為參數。比如使用 gson 等 json 庫將要查詢的語句拼接好,直接塞到 Wrapper Query 中查詢就可以了,非常方便。
Wrapper Query 對應的 QueryBuilder class 是WrapperQueryBuilder
具體方法是 QueryBuilders.wrapperQuery()
9、關於 REST Client的完整工具類代碼
本文作者:張永清,轉載請注明出處:Elasticsearch Java API 很全的整理
public class IndexBean { //index name private String index; //index type private String indexType; //index doc id private String docId; // 1 IndexRequest 2 UpdateRequest 3 DeleteRequest private String operateType; public String getOperateType() { return operateType; } public void setOperateType(String operateType) { this.operateType = operateType; } public String getIndex() { return index; } public void setIndex(String index) { this.index = index; } public String getIndexType() { return indexType; } public void setIndexType(String indexType) { this.indexType = indexType; } public String getDocId() { return docId; } public void setDocId(String docId) { this.docId = docId; } }
/** * 自定義的es異常類 */ public class ElasticsearchException extends RuntimeException { public ElasticsearchException(String s, Exception e) { super(s, e); } public ElasticsearchException(String s){ super(s); } }
import org.apache.http.HttpHost; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.query.MatchQueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.sort.SortBuilder; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; /** * es操作 * */ public class ElasticSearchClient { private String[] hostsAndPorts; public ElasticSearchClient(String[] hostsAndPorts) { this.hostsAndPorts = hostsAndPorts; } public RestHighLevelClient getClient() { RestHighLevelClient client = null; List<HttpHost> httpHosts = new ArrayList<HttpHost>(); if (hostsAndPorts.length > 0) { for (String hostsAndPort : hostsAndPorts) { String[] hp = hostsAndPort.split(":"); httpHosts.add(new HttpHost(hp[0], Integer.valueOf(hp[1]), "http")); } client = new RestHighLevelClient( RestClient.builder(httpHosts.toArray(new HttpHost[0]))); } else { client = new RestHighLevelClient( RestClient.builder(new HttpHost("127.0.0.1", 9200, "http"))); } return client; } private IndexRequest getIndexRequest(String index, String indexType, String docId, Map<String, Object> dataMap) { IndexRequest indexRequest = null; if (null == index || null == indexType) { throw new ElasticsearchException("index or indexType must not be null"); } if (null == docId) { indexRequest = new IndexRequest(index, indexType); } else { indexRequest = new IndexRequest(index, indexType, docId); } return indexRequest; } /** * 同步執行索引 * * @param index * @param indexType * @param docId * @param dataMap * @throws IOException */ public IndexResponse execIndex(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException { return getClient().index(getIndexRequest(index, indexType, docId, dataMap).source(dataMap)); } /** * 異步執行 * * @param index * @param indexType * @param docId * @param dataMap * @param indexResponseActionListener * @throws IOException */ public void asyncExecIndex(String index, String indexType, String docId, Map<String, Object> dataMap, ActionListener<IndexResponse> indexResponseActionListener) throws IOException { getClient().indexAsync(getIndexRequest(index, indexType, docId, dataMap).source(dataMap), indexResponseActionListener); } /** * @param index * @param indexType * @param docId * @param includes 返回需要包含的字段,可以傳入空 * @param excludes 返回需要不包含的字段,可以傳入為空 * @param excludes version * @param excludes versionType * @return * @throws IOException */ public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes, Integer version, VersionType versionType) throws IOException { if (null == includes || includes.length == 0) { includes = Strings.EMPTY_ARRAY; } if (null == excludes || excludes.length == 0) { excludes = Strings.EMPTY_ARRAY; } GetRequest getRequest = new GetRequest(index, indexType, docId); FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes); getRequest.realtime(true); if (null != version) { getRequest.version(version); } if (null != versionType) { getRequest.versionType(versionType); } return getClient().get(getRequest.fetchSourceContext(fetchSourceContext)); } /** * @param index * @param indexType * @param docId * @param includes * @param excludes * @return * @throws IOException */ public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes) throws IOException { return getRequest(index, indexType, docId, includes, excludes, null, null); } /** * @param index * @param indexType * @param docId * @return * @throws IOException */ public GetResponse getRequest(String index, String indexType, String docId) throws IOException { GetRequest getRequest = new GetRequest(index, indexType, docId); return getClient().get(getRequest); } /** * @param index * @param indexType * @param docId * @return * @throws IOException */ public Boolean existDoc(String index, String indexType, String docId) throws IOException { GetRequest getRequest = new GetRequest(index, indexType, docId); getRequest.fetchSourceContext(new FetchSourceContext(false)); getRequest.storedFields("_none_"); return getClient().exists(getRequest); } /** * @param index * @param indexType * @param docId * @param timeValue * @param refreshPolicy * @param version * @param versionType * @return * @throws IOException */ public DeleteResponse deleteDoc(String index, String indexType, String docId, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy, Integer version, VersionType versionType) throws IOException { DeleteRequest deleteRequest = new DeleteRequest(index, indexType, docId); if (null != timeValue) { deleteRequest.timeout(timeValue); } if (null != refreshPolicy) { deleteRequest.setRefreshPolicy(refreshPolicy); } if (null != version) { deleteRequest.version(version); } if (null != versionType) { deleteRequest.versionType(versionType); } return getClient().delete(deleteRequest); } /** * @param index * @param indexType * @param docId * @return * @throws IOException */ public DeleteResponse deleteDoc(String index, String indexType, String docId) throws IOException { return deleteDoc(index, indexType, docId, null, null, null, null); } /** * @param index * @param indexType * @param docId * @param dataMap * @param timeValue * @param refreshPolicy * @param version * @param versionType * @param docAsUpsert * @param includes * @param excludes * @return * @throws IOException */ public UpdateResponse updateDoc(String index, String indexType, String docId, Map<String, Object> dataMap, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy, Integer version, VersionType versionType, Boolean docAsUpsert, String[] includes, String[] excludes) throws IOException { UpdateRequest updateRequest = new UpdateRequest(index, indexType, docId); updateRequest.doc(dataMap); if (null != timeValue) { updateRequest.timeout(timeValue); } if (null != refreshPolicy) { updateRequest.setRefreshPolicy(refreshPolicy); } if (null != version) { updateRequest.version(version); } if (null != versionType) { updateRequest.versionType(versionType); } updateRequest.docAsUpsert(docAsUpsert); //沖突時重試的次數 updateRequest.retryOnConflict(3); if (null == includes && null == excludes) { return getClient().update(updateRequest); } else { if (null == includes || includes.length == 0) { includes = Strings.EMPTY_ARRAY; } if (null == excludes || excludes.length == 0) { excludes = Strings.EMPTY_ARRAY; } return getClient().update(updateRequest.fetchSource(new FetchSourceContext(true, includes, excludes))); } } /** * 更新時不存在就插入 * * @param index * @param indexType * @param docId * @param dataMap * @return * @throws IOException */ public UpdateResponse upDdateocAsUpsert(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException { return updateDoc(index, indexType, docId, dataMap, null, null, null, null, true, null, null); } /** * 存在才更新 * * @param index * @param indexType * @param docId * @param dataMap * @return * @throws IOException */ public UpdateResponse updateDoc(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException { return updateDoc(index, indexType, docId, dataMap, null, null, null, null, false, null, null); } /** * 批量操作 * * @param indexBeanList * @param timeValue * @param refreshPolicy * @return * @throws IOException */ public BulkResponse bulkRequest(List<IndexBean> indexBeanList, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy) throws IOException { BulkRequest bulkRequest = getBulkRequest(indexBeanList); if (null != timeValue) { bulkRequest.timeout(timeValue); } if (null != refreshPolicy) { bulkRequest.setRefreshPolicy(refreshPolicy); } return getClient().bulk(bulkRequest); } private BulkRequest getBulkRequest(List<IndexBean> indexBeanList) { BulkRequest bulkRequest = new BulkRequest(); indexBeanList.forEach(indexBean -> { if ("1".equals(indexBean.getOperateType())) { bulkRequest.add(null != indexBean.getDocId() ? new IndexRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()) : new IndexRequest(indexBean.getIndex(), indexBean.getIndexType())); } else if ("2".equals(indexBean.getOperateType())) { if ((null != indexBean.getDocId())) { throw new ElasticsearchException("update action docId must not be null"); } bulkRequest.add(new UpdateRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId())); } else if ("3".equals(indexBean.getOperateType())) { if ((null != indexBean.getDocId())) { throw new ElasticsearchException("delete action docId must not be null"); } bulkRequest.add(new DeleteRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId())); } else { throw new ElasticsearchException("OperateType" + indexBean.getOperateType() + "is not support"); } }); return bulkRequest; } /** * 批量操作 * * @param indexBeanList * @return */ public BulkResponse bulkRequest(List<IndexBean> indexBeanList) throws IOException { return bulkRequest(indexBeanList, null, null); } /** * 批量異步操作 * * @param indexBeanList * @param bulkResponseActionListener */ public void AsyncBulkRequest(List<IndexBean> indexBeanList, ActionListener<BulkResponse> bulkResponseActionListener) { getClient().bulkAsync(getBulkRequest(indexBeanList), bulkResponseActionListener); } private SearchRequest getSearchRequest(String index, String indexType) { SearchRequest searchRequest; if (null == index) { throw new ElasticsearchException("index name must not be null"); } if (null != indexType) { searchRequest = new SearchRequest(index, indexType); } else { searchRequest = new SearchRequest(index); } return searchRequest; } /** * @param index * @param indexType * @return * @throws IOException */ public SearchResponse searchRequest(String index, String indexType) throws IOException { return getClient().search(getSearchRequest(index, indexType)); } /** * @param index * @param indexType * @param from * @param size * @param termQueryBuilder * @return * @throws IOException */ public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder) throws IOException { return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, null, null, null))); } private SearchSourceBuilder getSearchSourceBuilder(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, String sortField, SortBuilder sortBuilder, Boolean fetchSource) { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); if (null != termQueryBuilder) { searchSourceBuilder.query(termQueryBuilder); } searchSourceBuilder.from(from); searchSourceBuilder.size(size); if (null != sortField) { searchSourceBuilder.sort(sortField); } if (null != sortBuilder) { searchSourceBuilder.sort(sortBuilder); } //設置超時時間 searchSourceBuilder.timeout(new TimeValue(120, TimeUnit.SECONDS)); if (null != fetchSource) { searchSourceBuilder.fetchSource(fetchSource); } return searchSourceBuilder; } /** * @param index * @param indexType * @param from * @param size * @param termQueryBuilder * @param matchQueryBuilder * @return * @throws IOException */ public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder) throws IOException { if (null == matchQueryBuilder) { throw new ElasticsearchException("matchQueryBuilder is null"); } return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, null, null, null).query(matchQueryBuilder))); } /** * @param index * @param indexType * @param from * @param size * @param matchQueryBuilder * @return * @throws IOException */ public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, MatchQueryBuilder matchQueryBuilder) throws IOException { if (null == matchQueryBuilder) { throw new ElasticsearchException("matchQueryBuilder is null"); } return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, null, null, null, null).query(matchQueryBuilder))); } /** * @param index * @param indexType * @param from * @param size * @param matchQueryBuilder * @param sortField * @return * @throws IOException */ public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, MatchQueryBuilder matchQueryBuilder, String sortField) throws IOException { if (null == matchQueryBuilder) { throw new ElasticsearchException("matchQueryBuilder is null"); } return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, null, sortField, null, null).query(matchQueryBuilder))); } /** * @param index * @param indexType * @param from * @param size * @param matchQueryBuilder * @param sortField * @param fetchSource * @return * @throws IOException */ public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, MatchQueryBuilder matchQueryBuilder, String sortField, Boolean fetchSource) throws IOException { if (null == matchQueryBuilder) { throw new ElasticsearchException("matchQueryBuilder is null"); } return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, null, sortField, null, fetchSource).query(matchQueryBuilder))); } /** * @param index * @param indexType * @param from * @param size * @param matchQueryBuilder * @param sortBuilder * @return * @throws IOException */ public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, MatchQueryBuilder matchQueryBuilder, SortBuilder sortBuilder) throws IOException { if (null == matchQueryBuilder) { throw new ElasticsearchException("matchQueryBuilder is null"); } return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, null, null, sortBuilder, null).query(matchQueryBuilder))); } /** * 支持排序 * * @param index * @param indexType * @param from * @param size * @param termQueryBuilder * @param matchQueryBuilder * @param sortField * @return * @throws IOException */ public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder, String sortField) throws IOException { if (null == matchQueryBuilder) { throw new ElasticsearchException("matchQueryBuilder is null"); } return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, sortField, null, null).query(matchQueryBuilder))); } /** * @param index * @param indexType * @param from * @param size * @param termQueryBuilder * @param matchQueryBuilder * @param sortBuilder * @param fetchSource 開關 * @return * @throws IOException */ public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder, SortBuilder sortBuilder, Boolean fetchSource) throws IOException { if (null == matchQueryBuilder) { throw new ElasticsearchException("matchQueryBuilder is null"); } return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, null, sortBuilder, fetchSource).query(matchQueryBuilder))); } /** * @param index * @param indexType * @param from * @param size * @param termQueryBuilder * @param matchQueryBuilder * @param sortBuilder * @return * @throws IOException */ public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder, SortBuilder sortBuilder) throws IOException { if (null == matchQueryBuilder) { throw new ElasticsearchException("matchQueryBuilder is null"); } return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, null, sortBuilder, null).query(matchQueryBuilder))); } /** * @param index * @param indexType * @param from * @param size * @param termQueryBuilder * @param matchQueryBuilder * @param sortField * @param fetchSource * @return * @throws IOException */ public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder, String sortField, Boolean fetchSource) throws IOException { if (null == matchQueryBuilder) { throw new ElasticsearchException("matchQueryBuilder is null"); } return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, sortField, null, fetchSource).query(matchQueryBuilder))); } /** * 異步操作 * @param index * @param indexType * @param from * @param size * @param termQueryBuilder * @param matchQueryBuilder * @param sortBuilder * @param listener * @throws IOException */ public void asyncSearchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder, SortBuilder sortBuilder,ActionListener<SearchResponse> listener) throws IOException { if (null == matchQueryBuilder) { throw new ElasticsearchException("matchQueryBuilder is null"); } getClient().searchAsync(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, null, sortBuilder, null).query(matchQueryBuilder)),listener); } /** * 異步操作 * @param index * @param indexType * @param from * @param size * @param termQueryBuilder * @param matchQueryBuilder * @param sortField * @param listener * @throws IOException */ public void asyncSearchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder, String sortField,ActionListener<SearchResponse> listener) throws IOException { if (null == matchQueryBuilder) { throw new ElasticsearchException("matchQueryBuilder is null"); } getClient().searchAsync(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, sortField, null, null).query(matchQueryBuilder)),listener); } }
二、transportClient API
未完待續,近期繼續整理
三、Elasticsearch 架構
1、基礎概念:
1)、集群(Cluster): 包含一個或多個具有相同 cluster.name 的節點.
- 集群內節點協同工作,共享數據,並共同分擔工作負荷。
- 由於節點是從屬集群的,集群會自我重組來均勻地分發數據.
- cluster Name是很重要的,因為每個節點只能是群集的一部分,當該節點被設置為相同的名稱時,就會自動加入群集。
- 集群中通過選舉產生一個mater節點,它將負責管理集群范疇的變更,例如創建或刪除索引,添加節點到集群或從集群刪除節點。master 節點無需參與文檔層面的變更和搜索,這意味着僅有一個 master 節點並不會因流量增長而成為瓶頸。任意一個節點都可以成為 master 節點。我們例舉的集群只有一個節點,因此它會扮演 master 節點的角色。
- 作為用戶,我們可以訪問包括 master 節點在內的集群中的任一節點。每個節點都知道各個文檔的位置,並能夠將我們的請求直接轉發到擁有我們想要的數據的節點。無論我們訪問的是哪個節點,它都會控制從擁有數據的節點收集響應的過程,並返回給客戶端最終的結果。這一切都是由 Elasticsearch 透明管理的
2)、節點(node): 一個節點是一個邏輯上獨立的服務,可以存儲數據,並參與集群的索引和搜索功能, 一個節點也有唯一的名字,群集通過節點名稱進行管理和通信.
3)、索引(Index): 索引與關系型數據庫實例(Database)相當。索引只是一個 邏輯命名空間,它指向一個或多個分片(shards),內部用Apache Lucene實現索引中數據的讀寫
4)、文檔類型(Type):相當於數據庫中的table概念。每個文檔在ElasticSearch中都必須設定它的類型。文檔類型使得同一個索引中在存儲結構不同文檔時,只需要依據文檔類型就可以找到對應的參數映射(Mapping)信息,方便文檔的存取
5)、文檔(Document) :相當於數據庫中的row, 是可以被索引的基本單位。例如,你可以有一個的客戶文檔,有一個產品文檔,還有一個訂單的文檔。文檔是以JSON格式存儲的。在一個索引中,您可以存儲多個的文檔。請注意,雖然在一個索引中有多分文檔,但這些文檔的結構是一致的,並在第一次存儲的時候指定, 文檔屬於一種 類型(type),各種各樣的類型存在於一個 索引 中。你也可以通過類比傳統的關系數據庫得到一些大致的相似之處:
6)、Mapping: 相當於數據庫中的schema,用來約束字段的類型,不過 Elasticsearch 的 mapping 可以自動根據數據創建
7)、分片(shard) :是 工作單元(worker unit) 底層的一員,用來分配集群中的數據,它只負責保存索引中所有數據的一小片。
- 分片是一個獨立的Lucene實例,並且它自身也是一個完整的搜索引擎。
- 文檔存儲並且被索引在分片中,但是我們的程序並不會直接與它們通信。取而代之,它們直接與索引進行通信的
- 把分片想象成一個數據的容器。數據被存儲在分片中,然后分片又被分配在集群的節點上。當你的集群擴展或者縮小時,elasticsearch 會自動的在節點之間遷移分配分片,以便集群保持均衡
- 分片分為 主分片(primary shard) 以及 從分片(replica shard) 兩種。在你的索引中,每一個文檔都屬於一個主分片
- 從分片只是主分片的一個副本,它用於提供數據的冗余副本,在硬件故障時提供數據保護,同時服務於搜索和檢索這種只讀請求
- 索引中的主分片的數量在索引創建后就固定下來了,但是從分片的數量可以隨時改變。
- 一個索引默認設置了5個主分片,每個主分片有一個從分片對應
2、ES模塊
1)、 Gateway: 代表ES的持久化存儲方式,包含索引信息,ClusterState(集群信息),mapping,索引碎片信息,以及transaction log等
- 對於分布式集群來說,當一個或多個節點down掉了,能夠保證我們的數據不能丟,最通用的解放方案就是對失敗節點的數據進行復制,通過控制復制的份數可以保證集群有很高的可用性,復制這個方案的精髓主要是保證操作的時候沒有單點,對一個節點的操作會同步到其他的復制節點上去。
- ES一個索引會拆分成多個碎片,每個碎片可以擁有一個或多個副本(創建索引的時候可以配置),這里有個例子,每個索引分成3個碎片,每個碎片有2個副本,如下:
$ curl -XPUT http://localhost:9200/twitter/ -d ' index : number_of_shards : 3 number_of_replicas : 2
- 每個操作會自動路由主碎片所在的節點,在上面執行操作,並且同步到其他復制節點,通過使用“non blocking IO”模式所有復制的操作都是並行執行的,也就是說如果你的節點的副本越多,你網絡上的流量消耗也會越大。復制節點同樣接受來自外面的讀操作,意義就是你的復制節點越多,你的索引的可用性就越強,對搜索的可伸縮行就更好,能夠承載更多的操作
- 第一次啟動的時候,它會去持久化設備讀取集群的狀態信息(創建的索引,配置等)然后執行應用它們(創建索引,創建mapping映射等),每一次shard節點第一次實例化加入復制組,它都會從長持久化存儲里面恢復它的狀態信息
2)、 Lucence Directory: 是lucene的框架服務發現以及選主 ZenDiscovery: 用來實現節點自動發現,還有Master節點選取,假如Master出現故障,其它的這個節點會自動選舉,產生一個新的Master
它是Lucene存儲的一個抽象,由此派生了兩個類:FSDirectory和RAMDirectory,用於控制索引文件的存儲位置。使用FSDirectory類,就是存儲到硬盤;使用RAMDirectory類,則是存儲到內存
一個Directory對象是一份文件的清單。文件可能只在被創建的時候寫一次。一旦文件被創建,它將只被讀取或者刪除。在讀取的時候進行寫入操作是允許的
3)、Discovery
- 節點啟動后先ping(這里的ping是 Elasticsearch 的一個RPC命令。如果 discovery.zen.ping.unicast.hosts 有設置,則ping設置中的host,否則嘗試ping localhost 的幾個端口, Elasticsearch 支持同一個主機啟動多個節點)
- Ping的response會包含該節點的基本信息以及該節點認為的master節點
- 選舉開始,先從各節點認為的master中選,規則很簡單,按照id的字典序排序,取第一個
- 如果各節點都沒有認為的master,則從所有節點中選擇,規則同上。這里有個限制條件就是 discovery.zen.minimum_master_nodes,如果節點數達不到最小值的限制,則循環上述過程,直到節點數足夠可以開始選舉
- 最后選舉結果是肯定能選舉出一個master,如果只有一個local節點那就選出的是自己
- 如果當前節點是master,則開始等待節點數達到 minimum_master_nodes,然后提供服務, 如果當前節點不是master,則嘗試加入master.
- ES支持任意數目的集群(1-N),所以不能像 Zookeeper/Etcd 那樣限制節點必須是奇數,也就無法用投票的機制來選主,而是通過一個規則,只要所有的節點都遵循同樣的規則,得到的信息都是對等的,選出來的主節點肯定是一致的. 但分布式系統的問題就出在信息不對等的情況,這時候很容易出現腦裂(Split-Brain)的問題,大多數解決方案就是設置一個quorum值,要求可用節點必須大於quorum(一般是超過半數節點),才能對外提供服務。而 Elasticsearch 中,這個quorum的配置就是 discovery.zen.minimum_master_nodes 。
4)、memcached
- 通過memecached協議來訪問ES的接口,支持二進制和文本兩種協議.通過一個名為transport-memcached插件提供
- Memcached命令會被映射到REST接口,並且會被同樣的REST層處理,memcached命令列表包括:get/set/delete/quit
5)、River :代表es的一個數據源,也是其它存儲方式(如:數據庫)同步數據到es的一個方法。它是以插件方式存在的一個es服務,通過讀取river中的數據並把它索引到es中,官方的river有couchDB的,RabbitMQ的,Twitter的,Wikipedia的。
本文作者:張永清,轉載請注明出處:Elasticsearch Java API 很全的整理 最近發現很多轉載了本博客的文章不注明出處,作者將追究法律責任