【ES】【Java High Level REST Client】官方索引和文檔操作指導


索引操作和文檔基本操作


import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;

import com.alibaba.fastjson.JSON;

/**
 * Elasticsearch 7.6.x high-level REST client API walkthrough, written as Spring Boot tests.
 *
 * <p>Each test sends one kind of request through the injected {@link RestHighLevelClient}
 * and prints the response; no assertions are made. Most tests target the
 * {@code kuang_index} index and the document with id {@code "1"}.
 *
 * <p>NOTE(review): several tests depend on state created by others (the index must exist,
 * document "1" must have been added), but JUnit 5 does not run test methods in declaration
 * order by default — run them individually, in a sensible order.
 */
@SpringBootTest
public class ElasticsearchJdApplicationTests {

    /** Client bean named "restHighLevelClient", injected by Spring. */
    @Autowired
    @Qualifier("restHighLevelClient")
    private RestHighLevelClient client;

    /** Creates the index {@code kuang_index} (equivalent to {@code PUT kuang_index}). */
    @Test
    void testCreateIndex() throws IOException {
        CreateIndexRequest request = new CreateIndexRequest("kuang_index");
        // Index-level operations go through the IndicesClient returned by client.indices().
        CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
        System.out.println(createIndexResponse);
    }

    /** Prints whether the index {@code kuang_index2} exists. */
    @Test
    void testExistIndex() throws IOException {
        // NOTE(review): checks "kuang_index2", not the "kuang_index" used by the other tests.
        GetIndexRequest request = new GetIndexRequest("kuang_index2");
        boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
        System.out.println(exists);
    }

    /** Deletes the index {@code kuang_index} and prints whether the deletion was acknowledged. */
    @Test
    void testDeleteIndex() throws IOException {
        DeleteIndexRequest request = new DeleteIndexRequest("kuang_index");
        AcknowledgedResponse delete = client.indices().delete(request, RequestOptions.DEFAULT);
        System.out.println(delete.isAcknowledged());
    }

    /**
     * Indexes a single user as document "1" (equivalent to {@code PUT /kuang_index/_doc/1}).
     * The user object is serialized to a JSON string with fastjson.
     */
    @Test
    void testAddDocument() throws IOException {
        User user = new User("狂神說", 3);
        IndexRequest request = new IndexRequest("kuang_index");
        request.id("1");
        // One-second timeout (the String form is equivalent to TimeValue.timeValueSeconds(1)).
        request.timeout("1s");
        request.source(JSON.toJSONString(user), XContentType.JSON);
        IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
        System.out.println(indexResponse.toString());
        // Same status the REST command reports, e.g. CREATED.
        System.out.println(indexResponse.status());
    }

    /** Prints whether document "1" exists (equivalent to {@code GET /kuang_index/_doc/1}). */
    @Test
    void testIsExists() throws IOException {
        GetRequest getRequest = new GetRequest("kuang_index", "1");
        // Only existence matters: skip fetching _source and all stored fields.
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        getRequest.storedFields("_none_");
        boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
        System.out.println(exists);
    }

    /** Fetches document "1" and prints its source, then the whole response. */
    @Test
    void testGetDocument() throws IOException {
        GetRequest getRequest = new GetRequest("kuang_index", "1");
        GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
        // Document body only.
        System.out.println(getResponse.getSourceAsString());
        // Full response, same content as the REST GET.
        System.out.println(getResponse);
    }

    /** Partially updates document "1": the user's JSON fields are merged into the existing source. */
    @Test
    void testUpdateRequest() throws IOException {
        UpdateRequest updateRequest = new UpdateRequest("kuang_index", "1");
        updateRequest.timeout("1s");
        User user = new User("狂神說Java", 18);
        updateRequest.doc(JSON.toJSONString(user), XContentType.JSON);
        UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
        System.out.println(updateResponse.status());
    }

    /** Deletes document "1" and prints the response status. */
    @Test
    void testDeleteRequest() throws IOException {
        DeleteRequest request = new DeleteRequest("kuang_index", "1");
        request.timeout("1s");
        DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
        System.out.println(deleteResponse.status());
    }

    /**
     * Indexes several users in one bulk request, assigning ids "1".."n" in list order.
     * Real projects usually load data this way; bulk updates or deletes work the same,
     * just add UpdateRequest/DeleteRequest instances instead of IndexRequest.
     */
    @Test
    void testBulkRequest() throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.timeout("10s");
        ArrayList<User> userList = new ArrayList<>();
        userList.add(new User("kuangshen1", 3));
        userList.add(new User("kuangshen2", 3));
        userList.add(new User("kuangshen3", 3));
        // NOTE(review): the next three users share the name "qinjiang1", so testSearch's term
        // query on that name can match up to three documents — confirm this is intended.
        userList.add(new User("qinjiang1", 3));
        userList.add(new User("qinjiang1", 3));
        userList.add(new User("qinjiang1", 3));
        for (int i = 0; i < userList.size(); i++) {
            bulkRequest.add(new IndexRequest("kuang_index")
                    .id(String.valueOf(i + 1))
                    .source(JSON.toJSONString(userList.get(i)), XContentType.JSON));
        }
        BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        // false means every item in the bulk request succeeded.
        System.out.println(bulkResponse.hasFailures());
        if (bulkResponse.hasFailures()) {
            // hasFailures() alone does not say which items failed or why.
            System.out.println(bulkResponse.buildFailureMessage());
        }
    }

    /**
     * Runs a term query (name == "qinjiang1") against {@code kuang_index} and prints the hits.
     *
     * <p>SearchRequest names the target index; SearchSourceBuilder holds the query and timeout
     * (and optionally a HighlightBuilder). QueryBuilders offers one factory per query type of
     * the REST DSL, e.g. termQuery (exact, unanalyzed term) and matchAllQuery (every document).
     */
    @Test
    void testSearch() throws IOException {
        SearchRequest searchRequest = new SearchRequest("kuang_index");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("name", "qinjiang1");
        // To match every document instead: sourceBuilder.query(QueryBuilders.matchAllQuery());
        sourceBuilder.query(termQueryBuilder);
        sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
        searchRequest.source(sourceBuilder);
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println(JSON.toJSONString(searchResponse.getHits()));
        System.out.println("=================================");
        for (SearchHit hit : searchResponse.getHits().getHits()) {
            System.out.println(hit.getSourceAsMap());
        }
    }
}

REST high level client Javadoc(7.8)

文檔接口Document API

Index API 增加文檔

Elasticsearch 可以直接新增數據,只要指定了 index(索引名稱)即可。新增時可以自行指定文檔 ID,也可以不指定,由 Elasticsearch 自動生成。Elasticsearch Java High Level REST Client 為新增數據提供了四種構造文檔內容的方式。

方式一:jsonString

使用 IndexRequest 設置 JSON 格式的字符串作為文檔內容;可以借助第三方庫(如 fastjson、Jackson)將對象直接轉換為 JSON 字符串。

// 指定索引
IndexRequest request = new IndexRequest("posts"); 
// 設置Document id
request.id("1"); 
// 構造JSON字符串,可以使用三方件如fastjson、jackson構造,如JSON.toJSONString(user)
String jsonString = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
        "}";
request.source(jsonString, XContentType.JSON);

方式二:Map

通過map創建,會自動轉換成JSON的數據

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
// Document source provided as a Map which gets automatically converted to JSON format
IndexRequest indexRequest = new IndexRequest("posts")
    .id("1").source(jsonMap);

方式三:XContentBuilder

可以借助XContentBuilder創建對象,會自動轉換為JSON格式

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.field("user", "kimchy");
    builder.timeField("postDate", new Date());
    builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
// Document source provided as an XContentBuilder object, the Elasticsearch built-in helpers to generate JSON content
IndexRequest indexRequest = new IndexRequest("posts")
    .id("1").source(builder); 

方式四:key-pairs形式

直接以鍵值對(key-value pairs)形式依次傳入字段名和字段值,會自動轉換為 JSON 格式。

// Document source provided as Object key-pairs, which gets converted to JSON format
IndexRequest indexRequest = new IndexRequest("posts")
    .id("1")
    .source("user", "kimchy",
        "postDate", new Date(),
        "message", "trying out Elasticsearch");

推薦使用第2、3種方式,代碼更易讀。

可選參數

IndexRequest提供了以下可選參數:

// 路由參數
request.routing("routing"); 

// 以TimeValue形式設置主分片超時時間
request.timeout(TimeValue.timeValueSeconds(1)); 
// 以String形式設置主分片超時時間
request.timeout("1s");

// 使用WriteRequest.RefreshPolicy實例設置刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
// 使用String設置刷新策略
request.setRefreshPolicy("wait_for"); 

// 設置version
request.version(2); 
// 設置version type
request.versionType(VersionType.EXTERNAL); 

// 使用DocWriteRequest.OpType值設置操作類型
request.opType(DocWriteRequest.OpType.CREATE); 
// 使用String設置操作類型
request.opType("create"); 

// 請求執行前需要執行的 ingest pipeline
request.setPipeline("pipeline"); 

執行操作

分為同步和異步兩種方式。異步執行需要傳入一個 ActionListener,注意其泛型為 IndexResponse;listener 的詳細說明可參考官方文檔 Search API 的“執行查詢”小節。

// 同步
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
// 異步
client.indexAsync(request, RequestOptions.DEFAULT, listener); 

IndexResponse結果

String index = indexResponse.getIndex();
String id = indexResponse.getId();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // 首次創建文檔的處理
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // 已經存在的文檔的更新
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // 執行成功的分片數少於總分片數時,在此處處理
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure :
            shardInfo.getFailures()) {
        // 失敗處理
        String reason = failure.reason(); 
    }
}

如果發生版本衝突(例如 setIfSeqNo/setIfPrimaryTerm 指定的值與現有文檔不一致),會拋出 ElasticsearchException,其狀態為 RestStatus.CONFLICT:

IndexRequest request = new IndexRequest("posts")
    .id("1")
    .source("field", "value")
    .setIfSeqNo(10L)
    .setIfPrimaryTerm(20);
try {
    IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        // 此處說明拋出了版本沖突異常
    }
}

如果opType被設置為create,但是要新增的數據已經在索引中存在相同id的文檔,也會拋出上述異常。

IndexRequest request = new IndexRequest("posts")
    .id("1")
    .source("field", "value")
    .opType(DocWriteRequest.OpType.CREATE);
try {
    IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
                // 此處說明拋出了版本沖突異常
    }
}

Get API 文檔查詢

使用GetRequest,可以使用SearchRequest取代該API。具體參考官方文檔。

Get Source API 文檔source字段查詢

使用GetSourceRequest,可以使用SearchRequest中的SearchSourceBuilder的fetchSource方法取代。具體用法參考官方文檔。

Exists API 文檔是否存在查詢

使用方式同Get API,也是用GetRequest。查詢的文檔存在,返回true,否則返回false。

因為 exists() 方法只返回 boolean 類型,所以推薦關閉 _source 字段及所有 stored fields 的獲取,使請求更輕量。

// 設置請求的索引和文檔ID
GetRequest getRequest = new GetRequest(
    "posts", 
    "1");    
// 關閉獲取_source字段
getRequest.fetchSourceContext(new FetchSourceContext(false)); 
// 關閉獲取stored fields
getRequest.storedFields("_none_");     

執行操作

// 同步
boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
// 異步,listener泛型為Boolean
client.existsAsync(getRequest, RequestOptions.DEFAULT, listener); 

Delete API 刪除文檔

Delete Request

文檔刪除請求使用DeleteRequest,包括索引和文檔ID兩個參數

DeleteRequest request = new DeleteRequest(
        "posts",    // 索引
        "1");  // 文檔ID

可選參數

// 路由參數
request.routing("routing"); 

// 以TimeValue形式設置主分片超時時間
request.timeout(TimeValue.timeValueMinutes(2)); 
// 以String形式設置主分片超時時間
request.timeout("2m");

// 使用WriteRequest.RefreshPolicy實例設置刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
// 使用String設置刷新策略
request.setRefreshPolicy("wait_for"); 

// 設置version
request.version(2); 
// 設置version type
request.versionType(VersionType.EXTERNAL); 

執行操作

// 同步
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
// 異步, ActionListener 泛型為DeleteResponse
client.deleteAsync(request, RequestOptions.DEFAULT, listener); 

DeleteResponse刪除結果

返回執行刪除操作的基本信息

String index = deleteResponse.getIndex();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
     // 執行成功的分片數少於總分片數時,在此處處理
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure :
            shardInfo.getFailures()) {
        // 失敗處理
        String reason = failure.reason(); 
    }
}

可以從結果中獲取是否找到文檔

DeleteRequest request = new DeleteRequest("posts", "does_not_exist");
DeleteResponse deleteResponse = client.delete(
        request, RequestOptions.DEFAULT);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
    // 沒找到文檔,執行相應處理
}

如果版本沖突,則拋出異常ElasticsearchException

try {
    DeleteResponse deleteResponse = client.delete(
        new DeleteRequest("posts", "1").setIfSeqNo(100).setIfPrimaryTerm(2),
            RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        // 版本沖突異常處理
    }
}

Update API 更新文檔

Update Request

更新文檔使用UpdateRequest,包括索引和文檔ID兩個參數。

UpdateRequest request = new UpdateRequest(
        "posts",  // 索引
        "1"); // 文檔ID

Update API 允許使用腳本更新或者更新部分文檔信息。

Update with a script

// Script parameters provided as a Map of objects
Map<String, Object> parameters = singletonMap("count", 4); 
// Create an inline script using the painless language and the previous parameters
Script inline = new Script(ScriptType.INLINE, "painless",
        "ctx._source.field += params.count", parameters);  
// Sets the script to the update request
request.script(inline);  

或者使用stored script

// Reference to a script stored under the name increment-field in the painless language
Script stored = new Script(
        ScriptType.STORED, null, "increment-field", parameters);  
// Sets the script in the update request
request.script(stored);  

Updates with a partial document

使用該方式時,會將需要更新的部分文檔與現有文檔合並

方式一 JSONString形式的部分文檔更新

UpdateRequest request = new UpdateRequest("posts", "1");
String jsonString = "{" +
        "\"updated\":\"2017-01-01\"," +
        "\"reason\":\"daily update\"" +
        "}";
// Partial document source provided as a String in JSON format
request.doc(jsonString, XContentType.JSON);

方式二 map形式的部分文檔更新

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
// Partial document source provided as a Map which gets automatically converted to JSON format
UpdateRequest request = new UpdateRequest("posts", "1")
        .doc(jsonMap); 

方式三 XContentBuilder形式的部分文檔更新

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.timeField("updated", new Date());
    builder.field("reason", "daily update");
}
builder.endObject();
// Partial document source provided as an XContentBuilder object, the Elasticsearch built-in helpers to generate JSON content
UpdateRequest request = new UpdateRequest("posts", "1")
        .doc(builder);  

方式四 key-pairs形式的部分文檔更新

// Partial document source provided as Object key-pairs, which gets converted to JSON format
UpdateRequest request = new UpdateRequest("posts", "1")
        .doc("updated", new Date(),
             "reason", "daily update");

Upserts

如果需要更新的文檔不存在,可以使用upsert方法插入新文檔。

// Upsert document source provided as a String
String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);

與上面的partial document更新一樣,upsert方法也可以采用String, Map, XContentBuilder 或者Object key-pairs方式。

可選參數

// 路由參數
request.routing("routing"); 

// 以TimeValue形式設置主分片超時時間
request.timeout(TimeValue.timeValueMinutes(2)); 
// 以String形式設置主分片超時時間
request.timeout("2m");

// 使用WriteRequest.RefreshPolicy實例設置刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
// 使用String設置刷新策略
request.setRefreshPolicy("wait_for"); 

// 設置version
request.version(2); 
// 設置version type
request.versionType(VersionType.EXTERNAL); 

// 如果在執行更新時已經被其他操作修改,重新嘗試的次數設置
request.retryOnConflict(3); 

// 開啟獲取_source字段,默認關閉
request.fetchSource(true); 

// 配置source包含的具體字段
String[] includes = new String[]{"updated", "r*"};
String[] excludes = Strings.EMPTY_ARRAY;
request.fetchSource(
        new FetchSourceContext(true, includes, excludes));

// 配置source排除的具體字段
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"updated"};
request.fetchSource(
        new FetchSourceContext(true, includes, excludes));

// ifSeqNo
request.setIfSeqNo(2L); 
// ifPrimaryTerm
request.setIfPrimaryTerm(1L); 

// 關閉noop 探測
request.detectNoop(false);

// 設置不管文檔是否存在,腳本都被執行
request.scriptedUpsert(true); 

// 設置如果要更新的文檔不存在,則文檔變為upsert文檔
request.docAsUpsert(true); 

// 設置更新操作執行前活動的分片副本數量
request.waitForActiveShards(2); 
// 可以作為活躍分區副本的數量ActiveShardCount:取值為 ActiveShardCount.ALL, ActiveShardCount.ONE 或者  ActiveShardCount.DEFAULT (默認值)
request.waitForActiveShards(ActiveShardCount.ALL);

執行操作

// 同步
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);

// 異步。listener的泛型為UpdateResponse
client.updateAsync(request, RequestOptions.DEFAULT, listener); 

UpdateResponse更新結果

UpdateResponse獲取更新操作的執行情況:

String index = updateResponse.getIndex();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // 首次創建或upsert
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // 文檔被更新
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
    // 文檔被刪除
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
    // 未對已有文檔造成影響
}

如果UpdateRequest允許通過fetchSource方法獲取source,則UpdateResponse會返回更新文檔的source信息:

// 以GetResult對象獲取被更新的文檔
GetResult result = updateResponse.getGetResult(); 
if (result.isExists()) {
    // 以String形式獲取被更新文檔的source
    String sourceAsString = result.sourceAsString(); 
    // 以Map<String, Object>形式獲取被更新文檔的source
    Map<String, Object> sourceAsMap = result.sourceAsMap(); 
    // 以byte[]形式獲取被更新文檔的source
    byte[] sourceAsBytes = result.source(); 
} else {
    // 處理響應中沒有source的情形(默認行為)
}

還可以在響應中檢查分片失敗信息:

ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // 執行成功的分片數少於總分片數時,在此處處理
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure :
            shardInfo.getFailures()) {
        // 處理失敗信息
        String reason = failure.reason(); 
    }
}

如果 UpdateRequest 要更新的文檔不存在,服務端會返回 404,客戶端會拋出 ElasticsearchException:

UpdateRequest request = new UpdateRequest("posts", "does_not_exist")
        .doc("field", "value");
try {
    UpdateResponse updateResponse = client.update(
            request, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.NOT_FOUND) {
        // 處理由於文檔不存在導致的異常
    }
}

如果版本沖突,會拋出ElasticsearchException:

UpdateRequest request = new UpdateRequest("posts", "1")
        .doc("field", "value")
        .setIfSeqNo(101L)
        .setIfPrimaryTerm(200L);
try {
    UpdateResponse updateResponse = client.update(
            request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        // 版本沖突導致異常
    }
}

Bulk API 批量操作

實際項目中,批量操作更常用。

Java High Level REST Client提供了Bulk Processor結合BulkRequest使用。

Bulk Request

一個BulkRequest能夠執行多個index/update/delete操作。

該請求至少需要有一個操作。

// 創建BulkRequest
BulkRequest request = new BulkRequest(); 
// 添加創建文檔操作請求 IndexRequest
request.add(new IndexRequest("posts").id("1")  
        .source(XContentType.JSON,"field", "foo"));
// 添加第2個創建文檔操作請求 IndexRequest
request.add(new IndexRequest("posts").id("2")  
        .source(XContentType.JSON,"field", "bar"));
// 添加第3個創建文檔操作請求 IndexRequest
request.add(new IndexRequest("posts").id("3")  
        .source(XContentType.JSON,"field", "baz"));

注意,Bulk API 只支持JSON或SMILE編碼格式,使用其他格式的文檔會報錯。

不同的操作可以添加到同一個BulkRequest。

BulkRequest request = new BulkRequest();
// 添加刪除文檔操作請求DeleteRequest
request.add(new DeleteRequest("posts", "3")); 
// 添加更新文檔操作請求UpdateRequest
request.add(new UpdateRequest("posts", "2") 
        .doc(XContentType.JSON,"other", "test"));
// 添加創建文檔操作請求IndexRequest,使用JSON格式
request.add(new IndexRequest("posts").id("4")  
        .source(XContentType.JSON,"field", "baz"));

可選參數


// 以TimeValue形式設置主分片超時時間
request.timeout(TimeValue.timeValueSeconds(1)); 
// 以String形式設置主分片超時時間
request.timeout("1s");

// 使用WriteRequest.RefreshPolicy實例設置刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
// 使用String設置刷新策略
request.setRefreshPolicy("wait_for"); 

// 設置version
request.version(2); 
// 設置version type
request.versionType(VersionType.EXTERNAL); 

// 全局pipeline,適用於所有子請求,除非子請求覆寫了pipeline
request.setPipeline("pipelineId"); 

// 設置index/update/delete操作執行前活動的分片副本數量
request.waitForActiveShards(2); 
// 可以作為活躍分區副本的數量ActiveShardCount:取值為 ActiveShardCount.ALL, ActiveShardCount.ONE 或者  ActiveShardCount.DEFAULT (默認值)
request.waitForActiveShards(ActiveShardCount.ALL);

// 設置全局路由,適用於所有子請求
request.routing("routingId"); 

// 全局索引,適用於所有子請求,除非子請求單獨設置了索引。該參數是@Nullable,且只有在BulkRequest創建時設定。
BulkRequest defaulted = new BulkRequest("posts"); 

執行操作

// 同步
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

// 異步。listener泛型BulkResponse
client.bulkAsync(request, RequestOptions.DEFAULT, listener); 

BulkResponse批量執行結果

BulkResponse包含執行操作的信息,可以迭代獲取:

// Iterate over the results of all operations
for (BulkItemResponse bulkItemResponse : bulkResponse) { 
    // Retrieve the response of the operation (successful or not), can be IndexResponse, UpdateResponse or DeleteResponse which can all be seen as DocWriteResponse instances
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

    switch (bulkItemResponse.getOpType()) {
    case INDEX:    // Handle the response of an index operation
    case CREATE:   
        IndexResponse indexResponse = (IndexResponse) itemResponse;
        break;
    case UPDATE:   // Handle the response of a update operation
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
        break;
    case DELETE:   // Handle the response of a delete operation
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}

Bulk response提供了一個快捷檢查是否有操作執行失敗的方法:

// 至少有一個執行失敗時,返回true
if (bulkResponse.hasFailures()) { 

}

如果有執行失敗的,則需要迭代獲取錯誤,並處理:

for (BulkItemResponse bulkItemResponse : bulkResponse) {
    // 判斷操作是否失敗
    if (bulkItemResponse.isFailed()) { 
        // 如果失敗,則獲取失敗信息
        BulkItemResponse.Failure failure =
                bulkItemResponse.getFailure(); 
    }
}

Bulk Processor

BulkProcessor 是簡化 Bulk API 使用的工具類:將 index/update/delete 操作添加到 processor 後,它會透明地把這些操作分批執行。

為了執行這些請求,BulkProcessor需要如下部分:

RestHighLevelClient:用於執行BulkRequest和獲取結果BulkResponse

BulkProcessor.Listener:在每個 BulkRequest 執行之前、執行之後,以及 BulkRequest 執行失敗時被調用

之后,BulkProcessor.builder可以用來創建一個新的BulkProcessor。

// Create the BulkProcessor.Listener
BulkProcessor.Listener listener = new BulkProcessor.Listener() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // This method is called before each execution of a BulkRequest
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        // This method is called after each execution of a BulkRequest
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        // This method is called when a BulkRequest failed
    }
};

// Create the BulkProcessor by calling the build() method from the BulkProcessor.Builder. The RestHighLevelClient.bulkAsync() method will be used to execute the BulkRequest under the hood.
BulkProcessor bulkProcessor = BulkProcessor.builder(
        (request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener).build();

BulkProcessor.Builder提供了配置BulkProcessor如何處理請求的方法:

BulkProcessor.Builder builder = BulkProcessor.builder(
        (request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener);
// Set when to flush a new bulk request based on the number of actions currently added (defaults to 1000, use -1 to disable it)
builder.setBulkActions(500); 
// Set when to flush a new bulk request based on the size of actions currently added (defaults to 5Mb, use -1 to disable it)
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); 
// Set the number of concurrent requests allowed to be executed (default to 1, use 0 to only allow the execution of a single request)
builder.setConcurrentRequests(0); 
// Set a flush interval flushing any BulkRequest pending if the interval passes (defaults to not set)
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); 
// Set a constant back off policy that initially waits for 1 second and retries up to 3 times. See BackoffPolicy.noBackoff(), BackoffPolicy.constantBackoff() and BackoffPolicy.exponentialBackoff() for more options.
builder.setBackoffPolicy(BackoffPolicy
        .constantBackoff(TimeValue.timeValueSeconds(1L), 3)); 

創建完BulkProcessor后,可以向其中添加操作請求:

IndexRequest one = new IndexRequest("posts").id("1")
        .source(XContentType.JSON, "title",
                "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts").id("2")
        .source(XContentType.JSON, "title",
                "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts").id("3")
        .source(XContentType.JSON, "title",
                "The Future of Federated Search in Elasticsearch");

bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

這些請求會被BulkProcessor執行,且每個bulk 請求后會調用 BulkProcessor.Listener。

該listener提供了處理BulkRequest 和BulkResponse的途徑:

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // Called before each execution of a BulkRequest, this method allows to know the number of operations that are going to be executed within the BulkRequest
        int numberOfActions = request.numberOfActions(); 
        logger.debug("Executing bulk [{}] with {} requests",
                executionId, numberOfActions);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        // Called after each execution of a BulkRequest, this method allows to know if the BulkResponse contains errors
        if (response.hasFailures()) { 
            logger.warn("Bulk [{}] executed with failures", executionId);
        } else {
            logger.debug("Bulk [{}] completed in {} milliseconds",
                    executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        // Called if the BulkRequest failed, this method allows to know the failure
        logger.error("Failed to execute bulk", failure); 
    }
};

所有請求添加到 BulkProcessor 後,需要使用以下兩種關閉方法之一關閉該實例:

// 如果所有請求執行完成返回true,如果請求執行超時,則返回false
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS); 

close() 方法可以用來立即關閉 BulkProcessor:

bulkProcessor.close();

上述兩種關閉方法都會在關閉前刷新(flush)processor 中已經添加的請求,且關閉後無法再向 processor 中添加新的請求。

Multi-Get API 批量獲取

Multi-Get API 可以在一次 HTTP 請求中並行執行多個 get 請求。

Reindex API 文檔復制

ReindexRequest用於從一個或多個索引中復制文檔到新的目標索引中。

Update By Query API批量更新文檔

UpdateByQueryRequest

UpdateByQueryRequest用於更新一個索引中的多個文檔。

一個最簡單的UpdateByQueryRequest如下:

// 在一組索引上創建UpdateByQueryRequest
UpdateByQueryRequest request =  new UpdateByQueryRequest("source1", "source2"); 

默認情況下,版本衝突會中止 UpdateByQueryRequest 的執行;可以使用下面的設置讓其繼續執行,只對衝突進行計數:

request.setConflicts("proceed");

可以通過添加一個query限制這些文檔

// 只處理user字段值為kimchy的文檔
request.setQuery(new TermQueryBuilder("user", "kimchy")); 

可以通過設置maxDocs限制處理文檔的最大數量

request.setMaxDocs(10); 

默認情況下,UpdateByQueryRequest一批處理1000條文檔,可以通過setBatchSize修改。

request.setBatchSize(100); 

可以利用ingest 特性,指定一個pipeline

request.setPipeline("my_pipeline"); 

UpdateByQueryRequest支持使用腳本修改文檔。

// setScript增加用戶為kimchy的所有文檔的likes字段值
request.setScript(
    new Script(
        ScriptType.INLINE, "painless",
        "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
        Collections.emptyMap()));

UpdateByQueryRequest可以通過setSlices使用sliced-scroll實現並行化。

request.setSlices(2); 

UpdateByQueryRequest使用scroll參數來控制search context的生命周期。

request.setScroll(TimeValue.timeValueMinutes(10)); 

如果提供了路由,則路由會復制到scroll query中,用以限制匹配該路由值的分片。

request.setRouting("=cat"); 

可選參數

除了上面的配置,還有一些配置參數。

// 批量更新超時時間
request.setTimeout(TimeValue.timeValueMinutes(2)); 
// 調用更新操作后刷新索引
request.setRefresh(true); 
// 設置索引選項
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); 

執行操作

// 同步
BulkByScrollResponse bulkResponse =
        client.updateByQuery(request, RequestOptions.DEFAULT);
// 異步,listener泛型為BulkByScrollResponse
client.updateByQueryAsync(request, RequestOptions.DEFAULT, listener); 

UpdateByQuery 批量更新結果

返回的 BulkByScrollResponse 提供了批量更新的執行信息,可以通過以下方法獲取:

// Get total time taken
TimeValue timeTaken = bulkResponse.getTook(); 
// Check if the request timed out
boolean timedOut = bulkResponse.isTimedOut(); 
// Get total number of docs processed
long totalDocs = bulkResponse.getTotal(); 
// Number of docs that were updated
long updatedDocs = bulkResponse.getUpdated(); 
// Number of docs that were deleted
long deletedDocs = bulkResponse.getDeleted(); 
// Number of batches that were executed
long batches = bulkResponse.getBatches(); 
// Number of skipped docs
long noops = bulkResponse.getNoops(); 
// Number of version conflicts
long versionConflicts = bulkResponse.getVersionConflicts(); 
// Number of times request had to retry bulk index operations
long bulkRetries = bulkResponse.getBulkRetries(); 
// Number of times request had to retry search operations
long searchRetries = bulkResponse.getSearchRetries(); 
// The total time this request has throttled itself not including the current throttle time if it is currently sleeping
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); 
// Remaining delay of any current throttle sleep or 0 if not sleeping
TimeValue throttledUntilMillis =
        bulkResponse.getStatus().getThrottledUntil(); 
// Failures during search phase
List<ScrollableHitSource.SearchFailure> searchFailures =
        bulkResponse.getSearchFailures(); 
// Failures during bulk index operation
List<BulkItemResponse.Failure> bulkFailures =
        bulkResponse.getBulkFailures();

Delete By Query Request批量刪除文檔

DeleteByQueryRequest

DeleteByQueryRequest用於刪除一個索引中的多個文檔。需要存在一個或多個索引。

最簡單的DeleteByQueryRequest如下,刪除一個索引中的所有文檔。

DeleteByQueryRequest request =
        new DeleteByQueryRequest("source1", "source2"); 

默認情況下,版本衝突會中止 DeleteByQueryRequest 的執行;可以使用下面的設置讓其繼續執行,只對衝突進行計數:

request.setConflicts("proceed");

可以通過添加一個query限制這些文檔

// 只處理user字段值為kimchy的文檔
request.setQuery(new TermQueryBuilder("user", "kimchy")); 

可以通過設置maxDocs限制處理文檔的最大數量

request.setMaxDocs(10); 

默認情況下,DeleteByQueryRequest一批處理1000條文檔,可以通過setBatchSize修改。

request.setBatchSize(100); 

DeleteByQueryRequest可以通過setSlices使用sliced-scroll實現並行化。

request.setSlices(2); 

DeleteByQueryRequest使用scroll參數來控制search context的生命周期。

request.setScroll(TimeValue.timeValueMinutes(10)); 

如果提供了路由,則路由會復制到scroll query中,用以限制匹配該路由值的分片。

request.setRouting("=cat"); 

可選參數

除了上面的配置,還有一些配置參數。

// 批量刪除超時時間
request.setTimeout(TimeValue.timeValueMinutes(2)); 
// 調用刪除操作後刷新索引
request.setRefresh(true); 
// 設置索引選項
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); 

執行操作

// 同步
BulkByScrollResponse bulkResponse =
        client.deleteByQuery(request, RequestOptions.DEFAULT);
// 異步, listener泛型BulkByScrollResponse
client.deleteByQueryAsync(request, RequestOptions.DEFAULT, listener); 

DeleteByQuery 批量刪除結果

返回的 BulkByScrollResponse 包含了批量刪除的執行信息,可以通過以下方法獲取:

// Get total time taken
TimeValue timeTaken = bulkResponse.getTook(); 
// Check if the request timed out
boolean timedOut = bulkResponse.isTimedOut(); 
// Get total number of docs processed
long totalDocs = bulkResponse.getTotal(); 
// Number of docs that were deleted
long deletedDocs = bulkResponse.getDeleted(); 
// Number of batches that were executed
long batches = bulkResponse.getBatches(); 
// Number of skipped docs
long noops = bulkResponse.getNoops(); 
// Number of version conflicts
long versionConflicts = bulkResponse.getVersionConflicts(); 
// Number of times request had to retry bulk index operations
long bulkRetries = bulkResponse.getBulkRetries(); 
// Number of times request had to retry search operations
long searchRetries = bulkResponse.getSearchRetries(); 
// The total time this request has throttled itself not including the current throttle time if it is currently sleeping
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); 
// Remaining delay of any current throttle sleep or 0 if not sleeping
TimeValue throttledUntilMillis =
        bulkResponse.getStatus().getThrottledUntil(); 
// Failures during search phase
List<ScrollableHitSource.SearchFailure> searchFailures =
        bulkResponse.getSearchFailures(); 
// Failures during bulk index operation
List<BulkItemResponse.Failure> bulkFailures =
        bulkResponse.getBulkFailures(); 

Rethrottle API

Multi Term Vectors API


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM