Index operations and basic document operations
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import com.alibaba.fastjson.JSON;
/**
 * Elasticsearch 7.6.x High Level REST Client test APIs
 */
@SpringBootTest
public class ElasticsearchJdApplicationTests {
// Operate through the injected client instance
@Autowired
@Qualifier("restHighLevelClient")
private RestHighLevelClient client;
// Test index creation; equivalent to: PUT kuang_index
@Test
void testCreateIndex() throws IOException {
// 1. Build the create-index request
CreateIndexRequest request = new CreateIndexRequest("kuang_index");
// 2. Execute the request through the IndicesClient and get the response
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
System.out.println(createIndexResponse);
}
// Test getting an index to check whether it exists
@Test
void testExistIndex() throws IOException {
GetIndexRequest request = new GetIndexRequest("kuang_index2");
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
System.out.println(exists);
}
// Test deleting an index
@Test
void testDeleteIndex() throws IOException {
DeleteIndexRequest request = new DeleteIndexRequest("kuang_index");
// Delete the index
AcknowledgedResponse delete = client.indices().delete(request, RequestOptions.DEFAULT);
System.out.println(delete.isAcknowledged());
}
// Test adding a document
@Test
void testAddDocument() throws IOException {
// Create the object
User user = new User("狂神說", 3);
// Create the request
IndexRequest request = new IndexRequest("kuang_index");
// Equivalent to: PUT /kuang_index/_doc/1
request.id("1");
// Set the timeout (the TimeValue and String forms are equivalent; the second call overrides the first)
request.timeout(TimeValue.timeValueSeconds(1));
request.timeout("1s");
// Put our data into the request as JSON
request.source(JSON.toJSONString(user), XContentType.JSON);
// Send the request through the client and get the response
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
System.out.println(indexResponse.toString());
System.out.println(indexResponse.status()); // Matches the status returned by the REST command, e.g. CREATED
}
// Get a document to check whether it exists; equivalent to: GET /kuang_index/_doc/1
@Test
void testIsExists() throws IOException {
GetRequest getRequest = new GetRequest("kuang_index", "1");
// Do not fetch the _source context in the response
getRequest.fetchSourceContext(new FetchSourceContext(false));
getRequest.storedFields("_none_");
boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
System.out.println(exists);
}
// Get the document's contents
@Test
void testGetDocument() throws IOException {
GetRequest getRequest = new GetRequest("kuang_index", "1");
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
System.out.println(getResponse.getSourceAsString()); // Print the document source
System.out.println(getResponse); // The full response is the same as what the REST command returns
}
// Update a document
@Test
void testUpdateRequest() throws IOException {
UpdateRequest updateRequest = new UpdateRequest("kuang_index", "1");
updateRequest.timeout("1s");
User user = new User("狂神說Java", 18);
updateRequest.doc(JSON.toJSONString(user), XContentType.JSON);
UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
System.out.println(updateResponse.status());
}
// Delete a document
@Test
void testDeleteRequest() throws IOException {
DeleteRequest request = new DeleteRequest("kuang_index", "1");
request.timeout("1s");
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
System.out.println(deleteResponse.status());
}
// Note: real projects almost always insert data in bulk!
@Test
void testBulkRequest() throws IOException {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.timeout("10s");
ArrayList<User> userList = new ArrayList<>();
userList.add(new User("kuangshen1", 3));
userList.add(new User("kuangshen2", 3));
userList.add(new User("kuangshen3", 3));
userList.add(new User("qinjiang1", 3));
userList.add(new User("qinjiang1", 3));
userList.add(new User("qinjiang1", 3));
// Build the bulk request
for (int i = 0; i < userList.size(); i++) {
// For bulk update or bulk delete, just swap in the corresponding request type here
bulkRequest.add(new IndexRequest("kuang_index").id("" + (i + 1))
.source(JSON.toJSONString(userList.get(i)), XContentType.JSON));
}
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
System.out.println(bulkResponse.hasFailures()); // Whether any operation failed; false means everything succeeded
}
// Search
// SearchRequest        the search request
// SearchSourceBuilder  builds the search conditions
// HighlightBuilder     builds highlighting
// TermQueryBuilder     exact (term) query
// MatchAllQueryBuilder matches all documents
// Each xxxQueryBuilder corresponds to a query we saw in the REST commands
@Test
void testSearch() throws IOException {
SearchRequest searchRequest = new SearchRequest("kuang_index");
// Build the search conditions
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// Highlighting could be configured here via sourceBuilder.highlighter(new HighlightBuilder())
// Query conditions can be built with the QueryBuilders helper class
// QueryBuilders.termQuery       exact (term) query
// QueryBuilders.matchAllQuery() matches all documents
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("name", "qinjiang1");
// MatchAllQueryBuilder matchAllQueryBuilder = QueryBuilders.matchAllQuery();
sourceBuilder.query(termQueryBuilder);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println(JSON.toJSONString(searchResponse.getHits()));
System.out.println("=================================");
for (SearchHit documentFields : searchResponse.getHits().getHits()) {
System.out.println(documentFields.getSourceAsMap());
}
}
}
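The tests above reference a User POJO and a Spring bean named restHighLevelClient that are not shown here. Below is a minimal sketch of both, assuming the field names used in the tutorial and a single Elasticsearch node on 127.0.0.1:9200 (the usual Spring and Elasticsearch client imports are assumed):
// Assumed User POJO serialized by fastjson
public class User {
    private String name;
    private int age;
    public User() {}
    public User(String name, int age) { this.name = name; this.age = age; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}
// Assumed configuration class exposing the "restHighLevelClient" bean
@Configuration
public class ElasticSearchClientConfig {
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(
                RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));
    }
}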
REST High Level Client Javadoc (7.8)
Document APIs
Index API: adding documents
Elasticsearch can index data directly as long as you specify the index name. When indexing, you can supply the document id yourself or leave it out and let Elasticsearch generate one.
The Elasticsearch Java High Level REST Client provides four ways to supply the new document.
Method 1: JSON string
Set a JSON-formatted string on the IndexRequest; a third-party library can be used to convert an object directly to JSON.
// Specify the index
IndexRequest request = new IndexRequest("posts");
// Set the document id
request.id("1");
// Build the JSON string; third-party libraries such as fastjson or jackson can be used, e.g. JSON.toJSONString(user)
String jsonString = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
request.source(jsonString, XContentType.JSON);
Method 2: Map
Build the document from a Map; it is automatically converted to JSON.
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
// Document source provided as a Map which gets automatically converted to JSON format
IndexRequest indexRequest = new IndexRequest("posts")
.id("1").source(jsonMap);
Method 3: XContentBuilder
Build the document with an XContentBuilder; it is automatically converted to JSON.
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("user", "kimchy");
builder.timeField("postDate", new Date());
builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
// Document source provided as an XContentBuilder object, the Elasticsearch built-in helpers to generate JSON content
IndexRequest indexRequest = new IndexRequest("posts")
.id("1").source(builder);
Method 4: key-value pairs
Build the document directly from object key-value pairs; they are automatically converted to JSON.
// Document source provided as Object key-pairs, which gets converted to JSON format
IndexRequest indexRequest = new IndexRequest("posts")
.id("1")
.source("user", "kimchy",
"postDate", new Date(),
"message", "trying out Elasticsearch");
Methods 2 and 3 are recommended, as the code is easier to read.
Optional parameters
IndexRequest provides the following optional parameters:
// Routing value
request.routing("routing");
// Timeout to wait for primary shard to become available, as a TimeValue
request.timeout(TimeValue.timeValueSeconds(1));
// Timeout to wait for primary shard to become available, as a String
request.timeout("1s");
// Refresh policy as a WriteRequest.RefreshPolicy instance
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
// Refresh policy as a String
request.setRefreshPolicy("wait_for");
// Set the version
request.version(2);
// Set the version type
request.versionType(VersionType.EXTERNAL);
// Operation type as a DocWriteRequest.OpType value
request.opType(DocWriteRequest.OpType.CREATE);
// Operation type as a String
request.opType("create");
// Name of the ingest pipeline to execute before indexing
request.setPipeline("pipeline");
Execution
Requests can be executed synchronously or asynchronously; the listener works as described in the "Search API -> execution" section. Note that the listener's generic type is IndexResponse.
// Synchronous
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
// Asynchronous
client.indexAsync(request, RequestOptions.DEFAULT, listener);
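The listener variable is not defined in this section; a minimal sketch of such an ActionListener, assuming you only want to print the outcome:
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
    @Override
    public void onResponse(IndexResponse indexResponse) {
        // Called when the request completes successfully
        System.out.println("indexed document " + indexResponse.getId());
    }
    @Override
    public void onFailure(Exception e) {
        // Called when the request fails
        e.printStackTrace();
    }
};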
IndexResponse result
String index = indexResponse.getIndex();
String id = indexResponse.getId();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
// Handle the case where the document was created for the first time
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
// Handle the case where an existing document was updated
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
// Handle the situation where fewer shards succeeded than the total
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
// Handle the failure
String reason = failure.reason();
}
}
If there is a version conflict, an ElasticsearchException is thrown:
IndexRequest request = new IndexRequest("posts")
.id("1")
.source("field", "value")
.setIfSeqNo(10L)
.setIfPrimaryTerm(20);
try {
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
// A version conflict exception was thrown
}
}
If opType is set to create but a document with the same id already exists in the index, the same exception is thrown as well.
IndexRequest request = new IndexRequest("posts")
.id("1")
.source("field", "value")
.opType(DocWriteRequest.OpType.CREATE);
try {
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
// A document with the same id already exists, so a conflict exception was thrown
}
}
Get API: retrieving documents
Uses GetRequest; a SearchRequest can be used instead of this API. See the official documentation for details.
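A minimal sketch of a get request, assuming the "posts" index and document id "1" used in the examples above:
GetRequest getRequest = new GetRequest("posts", "1");
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
if (getResponse.isExists()) {
    // Source of the document as a String or a Map
    String sourceAsString = getResponse.getSourceAsString();
    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
}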
Get Source API: retrieving only the _source field
Uses GetSourceRequest; the fetchSource method of a SearchRequest's SearchSourceBuilder can be used instead. See the official documentation for usage details.
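A minimal sketch, assuming the same index and id, and a client version that exposes getSource (org.elasticsearch.client.core.GetSourceRequest, available in recent 7.x releases):
GetSourceRequest getSourceRequest = new GetSourceRequest("posts", "1");
GetSourceResponse getSourceResponse = client.getSource(getSourceRequest, RequestOptions.DEFAULT);
// Only the _source of the document is returned
Map<String, Object> source = getSourceResponse.getSource();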
Exists API: checking whether a document exists
Used in the same way as the Get API, also with a GetRequest. Returns true if the document exists, false otherwise.
Since exists() only returns a boolean, it is recommended to turn off fetching the _source field and all stored fields to keep the request lightweight.
// Set the index and document id of the request
GetRequest getRequest = new GetRequest(
"posts",
"1");
// Disable fetching the _source field
getRequest.fetchSourceContext(new FetchSourceContext(false));
// Disable fetching stored fields
getRequest.storedFields("_none_");
Execution
// Synchronous
boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
// Asynchronous; the listener's generic type is Boolean
client.existsAsync(getRequest, RequestOptions.DEFAULT, listener);
Delete API: deleting documents
Delete Request
Document deletion uses a DeleteRequest, which takes two parameters: the index and the document id.
DeleteRequest request = new DeleteRequest(
"posts", // index
"1"); // document id
Optional parameters
// Routing value
request.routing("routing");
// Timeout to wait for primary shard to become available, as a TimeValue
request.timeout(TimeValue.timeValueMinutes(2));
// Timeout to wait for primary shard to become available, as a String
request.timeout("2m");
// Refresh policy as a WriteRequest.RefreshPolicy instance
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
// Refresh policy as a String
request.setRefreshPolicy("wait_for");
// Set the version
request.version(2);
// Set the version type
request.versionType(VersionType.EXTERNAL);
Execution
// Synchronous
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
// Asynchronous; the listener's generic type is DeleteResponse
client.deleteAsync(request, RequestOptions.DEFAULT, listener);
DeleteResponse result
The response returns basic information about the delete operation:
String index = deleteResponse.getIndex();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
// Handle the situation where fewer shards succeeded than the total
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
// Handle the failure
String reason = failure.reason();
}
}
You can check from the result whether the document was found:
DeleteRequest request = new DeleteRequest("posts", "does_not_exist");
DeleteResponse deleteResponse = client.delete(
request, RequestOptions.DEFAULT);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
// The document was not found; handle accordingly
}
If there is a version conflict, an ElasticsearchException is thrown:
try {
DeleteResponse deleteResponse = client.delete(
new DeleteRequest("posts", "1").setIfSeqNo(100).setIfPrimaryTerm(2),
RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.CONFLICT) {
// Handle the version conflict
}
}
Update API: updating documents
Update Request
Document updates use an UpdateRequest, which takes two parameters: the index and the document id.
UpdateRequest request = new UpdateRequest(
"posts", // index
"1"); // document id
The Update API allows a document to be updated with a script or with a partial document.
Update with a script
// Script parameters provided as a Map of objects
Map<String, Object> parameters = singletonMap("count", 4);
// Create an inline script using the painless language and the previous parameters
Script inline = new Script(ScriptType.INLINE, "painless",
"ctx._source.field += params.count", parameters);
// Sets the script to the update request
request.script(inline);
Or use a stored script:
// Reference to a script stored under the name increment-field in the painless language
Script stored = new Script(
ScriptType.STORED, null, "increment-field", parameters);
// Sets the script in the update request
request.script(stored);
Updates with a partial document
With this approach, the partial document is merged with the existing document.
Method 1: partial document as a JSON string
UpdateRequest request = new UpdateRequest("posts", "1");
String jsonString = "{" +
"\"updated\":\"2017-01-01\"," +
"\"reason\":\"daily update\"" +
"}";
// Partial document source provided as a String in JSON format
request.doc(jsonString, XContentType.JSON);
Method 2: partial document as a Map
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
// Partial document source provided as a Map which gets automatically converted to JSON format
UpdateRequest request = new UpdateRequest("posts", "1")
.doc(jsonMap);
Method 3: partial document as an XContentBuilder
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.timeField("updated", new Date());
builder.field("reason", "daily update");
}
builder.endObject();
// Partial document source provided as an XContentBuilder object, the Elasticsearch built-in helpers to generate JSON content
UpdateRequest request = new UpdateRequest("posts", "1")
.doc(builder);
Method 4: partial document as key-value pairs
// Partial document source provided as Object key-pairs, which gets converted to JSON format
UpdateRequest request = new UpdateRequest("posts", "1")
.doc("updated", new Date(),
"reason", "daily update");
Upserts
If the document to update does not exist, the upsert method can be used to insert it as a new document.
// Upsert document source provided as a String
String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);
As with the partial-document updates above, the upsert method also accepts the String, Map, XContentBuilder, and Object key-value pair forms.
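For example, a minimal sketch of an upsert supplied as a Map (the field name is illustrative):
Map<String, Object> upsertMap = new HashMap<>();
upsertMap.put("created", new Date());
request.upsert(upsertMap);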
Optional parameters
// Routing value
request.routing("routing");
// Timeout to wait for primary shard to become available, as a TimeValue
request.timeout(TimeValue.timeValueMinutes(2));
// Timeout to wait for primary shard to become available, as a String
request.timeout("2m");
// Refresh policy as a WriteRequest.RefreshPolicy instance
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
// Refresh policy as a String
request.setRefreshPolicy("wait_for");
// Set the version
request.version(2);
// Set the version type
request.versionType(VersionType.EXTERNAL);
// Number of retries if the document was modified by another operation while the update was running
request.retryOnConflict(3);
// Enable fetching the _source field (disabled by default)
request.fetchSource(true);
// Configure which fields of the source to include
String[] includes = new String[]{"updated", "r*"};
String[] excludes = Strings.EMPTY_ARRAY;
request.fetchSource(
new FetchSourceContext(true, includes, excludes));
// Configure which fields of the source to exclude
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"updated"};
request.fetchSource(
new FetchSourceContext(true, includes, excludes));
// ifSeqNo
request.setIfSeqNo(2L);
// ifPrimaryTerm
request.setIfPrimaryTerm(1L);
// Disable noop detection
request.detectNoop(false);
// Run the script whether or not the document exists
request.scriptedUpsert(true);
// If the document to update does not exist, use the doc as a new (upsert) document
request.docAsUpsert(true);
// Number of shard copies that must be active before the update proceeds
request.waitForActiveShards(2);
// The number of active shard copies as an ActiveShardCount: ActiveShardCount.ALL, ActiveShardCount.ONE, or ActiveShardCount.DEFAULT (the default)
request.waitForActiveShards(ActiveShardCount.ALL);
Execution
// Synchronous
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
// Asynchronous; the listener's generic type is UpdateResponse
client.updateAsync(request, RequestOptions.DEFAULT, listener);
UpdateResponse result
The UpdateResponse provides the outcome of the update operation:
String index = updateResponse.getIndex();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
// The document was created for the first time (upsert)
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
// The document was updated
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
// The document was deleted
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
// No operation: the existing document was not affected
}
If fetching the source was enabled on the UpdateRequest via the fetchSource method, the UpdateResponse contains the source of the updated document:
// Get the updated document as a GetResult object
GetResult result = updateResponse.getGetResult();
if (result.isExists()) {
// Get the updated document's source as a String
String sourceAsString = result.sourceAsString();
// Get the updated document's source as a Map<String, Object>
Map<String, Object> sourceAsMap = result.sourceAsMap();
// Get the updated document's source as a byte[]
byte[] sourceAsBytes = result.source();
} else {
// Handle the case where the source is not present in the response (the default behavior)
}
Shard failures can also be checked in the response:
ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
// Handle the situation where fewer shards succeeded than the total
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
// Handle the failure
String reason = failure.reason();
}
}
If the UpdateRequest targets a document that does not exist, the response is a 404 and an ElasticsearchException is thrown:
UpdateRequest request = new UpdateRequest("posts", "does_not_exist")
.doc("field", "value");
try {
UpdateResponse updateResponse = client.update(
request, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.NOT_FOUND) {
// Handle the exception caused by the document not existing
}
}
If there is a version conflict, an ElasticsearchException is thrown:
UpdateRequest request = new UpdateRequest("posts", "1")
.doc("field", "value")
.setIfSeqNo(101L)
.setIfPrimaryTerm(200L);
try {
UpdateResponse updateResponse = client.update(
request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
// The version conflict caused the exception
}
}
Bulk API: batch operations
In real projects, bulk operations are the more common case.
The Java High Level REST Client also provides a BulkProcessor to use together with BulkRequest.
Bulk Request
A single BulkRequest can execute multiple index/update/delete operations.
The request must contain at least one operation.
// Create the BulkRequest
BulkRequest request = new BulkRequest();
// Add the first IndexRequest
request.add(new IndexRequest("posts").id("1")
.source(XContentType.JSON,"field", "foo"));
// Add the second IndexRequest
request.add(new IndexRequest("posts").id("2")
.source(XContentType.JSON,"field", "bar"));
// Add the third IndexRequest
request.add(new IndexRequest("posts").id("3")
.source(XContentType.JSON,"field", "baz"));
Note that the Bulk API only supports documents in JSON or SMILE encoding; other formats will cause an error.
Different kinds of operations can be added to the same BulkRequest:
BulkRequest request = new BulkRequest();
// Add a DeleteRequest
request.add(new DeleteRequest("posts", "3"));
// Add an UpdateRequest
request.add(new UpdateRequest("posts", "2")
.doc(XContentType.JSON,"other", "test"));
// Add another IndexRequest (JSON format)
request.add(new IndexRequest("posts").id("4")
.source(XContentType.JSON,"field", "baz"));
Optional parameters
// Timeout to wait for primary shards to become available, as a TimeValue
request.timeout(TimeValue.timeValueSeconds(1));
// Timeout to wait for primary shards to become available, as a String
request.timeout("1s");
// Refresh policy as a WriteRequest.RefreshPolicy instance
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
// Refresh policy as a String
request.setRefreshPolicy("wait_for");
// Global pipeline applied to all sub-requests, unless a sub-request overrides it
request.pipeline("pipelineId");
// Number of shard copies that must be active before the index/update/delete operations proceed
request.waitForActiveShards(2);
// The number of active shard copies as an ActiveShardCount: ActiveShardCount.ALL, ActiveShardCount.ONE, or ActiveShardCount.DEFAULT (the default)
request.waitForActiveShards(ActiveShardCount.ALL);
// Global routing applied to all sub-requests
request.routing("routingId");
// Global index applied to all sub-requests unless a sub-request sets its own index; this parameter is @Nullable and can only be set when the BulkRequest is created
BulkRequest defaulted = new BulkRequest("posts");
Execution
// Synchronous
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
// Asynchronous; the listener's generic type is BulkResponse
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
BulkResponse result
The BulkResponse contains information about every executed operation and can be iterated over:
// Iterate over the results of all operations
for (BulkItemResponse bulkItemResponse : bulkResponse) {
// Retrieve the response of the operation (successful or not), can be IndexResponse, UpdateResponse or DeleteResponse which can all be seen as DocWriteResponse instances
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
switch (bulkItemResponse.getOpType()) {
case INDEX: // Handle the response of an index operation
case CREATE:
IndexResponse indexResponse = (IndexResponse) itemResponse;
break;
case UPDATE: // Handle the response of a update operation
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
break;
case DELETE: // Handle the response of a delete operation
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}
The bulk response also provides a quick way to check whether any operation failed:
// Returns true if at least one operation failed
if (bulkResponse.hasFailures()) {
}
If there are failures, iterate over the items to retrieve and handle the errors:
for (BulkItemResponse bulkItemResponse : bulkResponse) {
// Check whether the operation failed
if (bulkItemResponse.isFailed()) {
// If it failed, retrieve the failure details
BulkItemResponse.Failure failure =
bulkItemResponse.getFailure();
}
}
Bulk Processor
The BulkProcessor simplifies the use of the Bulk API: it is a utility class that lets index/update/delete operations be added to the processor and executed transparently.
To execute these requests, the BulkProcessor needs the following components:
RestHighLevelClient: used to execute the BulkRequest and to get the BulkResponse
BulkProcessor.Listener: called after each BulkRequest completes or when it fails
The BulkProcessor.builder method can then be used to create a new BulkProcessor:
// Create the BulkProcessor.Listener
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
// This method is called before each execution of a BulkRequest
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
// This method is called after each execution of a BulkRequest
}
@Override
public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {
// This method is called when a BulkRequest failed
}
};
// Create the BulkProcessor by calling the build() method from the BulkProcessor.Builder. The RestHighLevelClient.bulkAsync() method will be used to execute the BulkRequest under the hood.
BulkProcessor bulkProcessor = BulkProcessor.builder(
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
listener).build();
BulkProcessor.Builder provides methods to configure how the BulkProcessor handles requests:
BulkProcessor.Builder builder = BulkProcessor.builder(
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
listener);
// Set when to flush a new bulk request based on the number of actions currently added (defaults to 1000, use -1 to disable it)
builder.setBulkActions(500);
// Set when to flush a new bulk request based on the size of actions currently added (defaults to 5Mb, use -1 to disable it)
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
// Set the number of concurrent requests allowed to be executed (default to 1, use 0 to only allow the execution of a single request)
builder.setConcurrentRequests(0);
// Set a flush interval flushing any BulkRequest pending if the interval passes (defaults to not set)
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
// Set a constant back off policy that initially waits for 1 second and retries up to 3 times. See BackoffPolicy.noBackoff(), BackoffPolicy.constantBackoff() and BackoffPolicy.exponentialBackoff() for more options.
builder.setBackoffPolicy(BackoffPolicy
.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
Once the BulkProcessor is created, operation requests can be added to it:
IndexRequest one = new IndexRequest("posts").id("1")
.source(XContentType.JSON, "title",
"In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts").id("2")
.source(XContentType.JSON, "title",
"Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts").id("3")
.source(XContentType.JSON, "title",
"The Future of Federated Search in Elasticsearch");
bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);
These requests are executed by the BulkProcessor, and the BulkProcessor.Listener is called for every bulk request.
The listener provides access to the BulkRequest and the BulkResponse:
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
// Called before each execution of a BulkRequest, this method allows to know the number of operations that are going to be executed within the BulkRequest
int numberOfActions = request.numberOfActions();
logger.debug("Executing bulk [{}] with {} requests",
executionId, numberOfActions);
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
// Called after each execution of a BulkRequest, this method allows to know if the BulkResponse contains errors
if (response.hasFailures()) {
logger.warn("Bulk [{}] executed with failures", executionId);
} else {
logger.debug("Bulk [{}] completed in {} milliseconds",
executionId, response.getTook().getMillis());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {
// Called if the BulkRequest failed, this method allows to know the failure
logger.error("Failed to execute bulk", failure);
}
};
Once all requests have been added to the BulkProcessor, its instance needs to be closed using one of the two available close methods:
// Returns true if all bulk requests completed, or false if the wait timed out before completion
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
The close() method can be used to close the BulkProcessor immediately:
bulkProcessor.close();
Both close methods flush the requests that were added to the processor before closing, and prevent any new requests from being added to it.
Multi-Get API: retrieving multiple documents
The multiGet API can execute multiple Get requests in parallel within a single call.
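A minimal sketch, assuming the "posts" index used above:
MultiGetRequest multiGetRequest = new MultiGetRequest();
multiGetRequest.add(new MultiGetRequest.Item("posts", "1"));
multiGetRequest.add(new MultiGetRequest.Item("posts", "2"));
MultiGetResponse multiGetResponse = client.mget(multiGetRequest, RequestOptions.DEFAULT);
for (MultiGetItemResponse itemResponse : multiGetResponse.getResponses()) {
    GetResponse getResponse = itemResponse.getResponse();
    if (getResponse != null && getResponse.isExists()) {
        System.out.println(getResponse.getSourceAsString());
    }
}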
Reindex API: copying documents
ReindexRequest is used to copy documents from one or more indices into a target index.
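A minimal sketch, with illustrative index names:
ReindexRequest reindexRequest = new ReindexRequest();
// Index (or indices) to copy documents from
reindexRequest.setSourceIndices("source_index");
// Target index to copy documents into
reindexRequest.setDestIndex("dest_index");
BulkByScrollResponse reindexResponse = client.reindex(reindexRequest, RequestOptions.DEFAULT);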
Update By Query API: updating documents in bulk
UpdateByQueryRequest
UpdateByQueryRequest is used to update multiple documents in an index.
In its simplest form, an UpdateByQueryRequest looks like this:
// Create an UpdateByQueryRequest on a set of indices
UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2");
By default, a version conflict aborts the UpdateByQueryRequest; the following setting makes it just count the conflicts and proceed:
request.setConflicts("proceed");
The affected documents can be limited by adding a query:
// Only process documents whose user field is kimchy
request.setQuery(new TermQueryBuilder("user", "kimchy"));
The maximum number of processed documents can be limited with maxDocs:
request.setMaxDocs(10);
By default, UpdateByQueryRequest processes documents in batches of 1000; this can be changed with setBatchSize:
request.setBatchSize(100);
An ingest pipeline can be specified to take advantage of the ingest feature:
request.setPipeline("my_pipeline");
UpdateByQueryRequest supports modifying documents with a script.
// setScript increments the likes field of all documents whose user is kimchy
request.setScript(
new Script(
ScriptType.INLINE, "painless",
"if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
Collections.emptyMap()));
UpdateByQueryRequest can be parallelized using sliced scrolls via setSlices:
request.setSlices(2);
UpdateByQueryRequest uses the scroll parameter to control the lifetime of the search context:
request.setScroll(TimeValue.timeValueMinutes(10));
If routing is provided, it is copied to the scroll query, limiting the operation to the shards that match that routing value:
request.setRouting("=cat");
Optional parameters
In addition to the options above, there are a few more configuration parameters:
// Timeout for the update-by-query operation
request.setTimeout(TimeValue.timeValueMinutes(2));
// Refresh the index after the operation completes
request.setRefresh(true);
// Set the indices options
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
Execution
// Synchronous
BulkByScrollResponse bulkResponse =
client.updateByQuery(request, RequestOptions.DEFAULT);
// Asynchronous; the listener's generic type is BulkByScrollResponse
client.updateByQueryAsync(request, RequestOptions.DEFAULT, listener);
Update-by-query results
The returned BulkByScrollResponse provides basic information about the update-by-query execution:
// Get total time taken
TimeValue timeTaken = bulkResponse.getTook();
// Check if the request timed out
boolean timedOut = bulkResponse.isTimedOut();
// Get total number of docs processed
long totalDocs = bulkResponse.getTotal();
// Number of docs that were updated
long updatedDocs = bulkResponse.getUpdated();
// Number of docs that were deleted
long deletedDocs = bulkResponse.getDeleted();
// Number of batches that were executed
long batches = bulkResponse.getBatches();
// Number of skipped docs
long noops = bulkResponse.getNoops();
// Number of version conflicts
long versionConflicts = bulkResponse.getVersionConflicts();
// Number of times request had to retry bulk index operations
long bulkRetries = bulkResponse.getBulkRetries();
// Number of times request had to retry search operations
long searchRetries = bulkResponse.getSearchRetries();
// The total time this request has throttled itself not including the current throttle time if it is currently sleeping
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled();
// Remaining delay of any current throttle sleep or 0 if not sleeping
TimeValue throttledUntilMillis =
bulkResponse.getStatus().getThrottledUntil();
// Failures during search phase
List<ScrollableHitSource.SearchFailure> searchFailures =
bulkResponse.getSearchFailures();
// Failures during bulk index operation
List<BulkItemResponse.Failure> bulkFailures =
bulkResponse.getBulkFailures();
Delete By Query API: deleting documents in bulk
DeleteByQueryRequest
DeleteByQueryRequest is used to delete multiple documents from an index; one or more existing indices are required.
In its simplest form, a DeleteByQueryRequest is created on one or more indices:
DeleteByQueryRequest request =
new DeleteByQueryRequest("source1", "source2");
By default, a version conflict aborts the DeleteByQueryRequest; the following setting makes it just count the conflicts and proceed:
request.setConflicts("proceed");
The affected documents can be limited by adding a query:
// Only process documents whose user field is kimchy
request.setQuery(new TermQueryBuilder("user", "kimchy"));
The maximum number of processed documents can be limited with maxDocs:
request.setMaxDocs(10);
By default, DeleteByQueryRequest processes documents in batches of 1000; this can be changed with setBatchSize:
request.setBatchSize(100);
DeleteByQueryRequest can be parallelized using sliced scrolls via setSlices:
request.setSlices(2);
DeleteByQueryRequest uses the scroll parameter to control the lifetime of the search context:
request.setScroll(TimeValue.timeValueMinutes(10));
If routing is provided, it is copied to the scroll query, limiting the operation to the shards that match that routing value:
request.setRouting("=cat");
Optional parameters
In addition to the options above, there are a few more configuration parameters:
// Timeout for the delete-by-query operation
request.setTimeout(TimeValue.timeValueMinutes(2));
// Refresh the index after the operation completes
request.setRefresh(true);
// Set the indices options
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
Execution
// Synchronous
BulkByScrollResponse bulkResponse =
client.deleteByQuery(request, RequestOptions.DEFAULT);
// Asynchronous; the listener's generic type is BulkByScrollResponse
client.deleteByQueryAsync(request, RequestOptions.DEFAULT, listener);
Delete-by-query results
The returned BulkByScrollResponse contains information about the delete-by-query execution and can be inspected as follows:
// Get total time taken
TimeValue timeTaken = bulkResponse.getTook();
// Check if the request timed out
boolean timedOut = bulkResponse.isTimedOut();
// Get total number of docs processed
long totalDocs = bulkResponse.getTotal();
// Number of docs that were deleted
long deletedDocs = bulkResponse.getDeleted();
// Number of batches that were executed
long batches = bulkResponse.getBatches();
// Number of skipped docs
long noops = bulkResponse.getNoops();
// Number of version conflicts
long versionConflicts = bulkResponse.getVersionConflicts();
// Number of times request had to retry bulk index operations
long bulkRetries = bulkResponse.getBulkRetries();
// Number of times request had to retry search operations
long searchRetries = bulkResponse.getSearchRetries();
// The total time this request has throttled itself not including the current throttle time if it is currently sleeping
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled();
// Remaining delay of any current throttle sleep or 0 if not sleeping
TimeValue throttledUntilMillis =
bulkResponse.getStatus().getThrottledUntil();
// Failures during search phase
List<ScrollableHitSource.SearchFailure> searchFailures =
bulkResponse.getSearchFailures();
// Failures during bulk index operation
List<BulkItemResponse.Failure> bulkFailures =
bulkResponse.getBulkFailures();