ElasticSearch7.3學習(四)----結合Spring boot進行增刪改查和批量(bulk)詳解


1、前置

java api 文檔 https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.3/java-rest-overview.html。low : 偏向底層。high:高級封裝。

導入相關maven依賴

    <!--es客戶端-->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.3.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.3.0</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>2.0.6.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <version>2.0.6.RELEASE</version>
    </dependency>

2、配置application.yml

spring:
  application:
    name: search-service
config:
  elasticsearch:
    hostlist: 127.0.0.1:9200 #多個節點用逗號分隔

3、配置類

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfig {
   @Value(
"${config.elasticsearch.hostlist}") private String hostlist; @Bean(destroyMethod = "close") public RestHighLevelClient restHighLevelClient(){ String[] split = hostlist.split(","); HttpHost[] httpHostsArray = new HttpHost[split.length]; for (int i = 0; i < split.length; i++) { String item=split[i]; httpHostsArray[i]=new HttpHost(item.split(":")[0],Integer.parseInt(item.split(":")[1]),"http"); } return new RestHighLevelClient(RestClient.builder(httpHostsArray)); } }

 4、查詢測試方法

@SpringBootTest
@RunWith(SpringRunner.class)
//查詢文檔
       @Test
    public void testGet() throws IOException {
        //構建請求
        GetRequest getRequest = new GetRequest("test_post", "1");

        //========================可選參數 start======================
        //為特定字段配置_source_include
//        String[] includes = new String[]{"user", "message"};
//        String[] excludes = Strings.EMPTY_ARRAY;
//        FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
//        getRequest.fetchSourceContext(fetchSourceContext);

        //為特定字段配置_source_excludes
//        String[] includes1 = new String[]{"user", "message"};
//        String[] excludes1 = Strings.EMPTY_ARRAY;
//        FetchSourceContext fetchSourceContext1 = new FetchSourceContext(true, includes1, excludes1);
//        getRequest.fetchSourceContext(fetchSourceContext1);

        //設置路由
//        getRequest.routing("routing");

        // ========================可選參數 end=====================

        //查詢 同步查詢
      GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);

        //異步查詢
//        ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
//            //查詢成功時的立馬執行的方法
//            @Override
//            public void onResponse(GetResponse getResponse) {
//                long version = getResponse.getVersion();
//                String sourceAsString = getResponse.getSourceAsString();//檢索文檔(String形式)
//                System.out.println(sourceAsString);
//            }
//
//            //查詢失敗時的立馬執行的方法
//            @Override
//            public void onFailure(Exception e) {
//                e.printStackTrace();
//            }
//        };
//        //執行異步請求
//        client.getAsync(getRequest, RequestOptions.DEFAULT, listener);
//        try {
//            Thread.sleep(5000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }

        // 獲取結果
        if (getResponse.isExists()) {
            long version = getResponse.getVersion();

            String sourceAsString = getResponse.getSourceAsString();//檢索文檔(String形式)
            System.out.println(sourceAsString);
            byte[] sourceAsBytes = getResponse.getSourceAsBytes();//以字節形式返回
            Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
            System.out.println(sourceAsMap);
        }
    }

5、新增測試方法

@Test
    public void testAdd() throws IOException {
//        1構建請求
        IndexRequest request=new IndexRequest("test_posts");
        request.id("3");
//        =======================構建文檔============================
//        構建方法1
        String jsonString="{\n" +
                "  \"user\":\"tomas J\",\n" +
                "  \"postDate\":\"2019-07-18\",\n" +
                "  \"message\":\"trying out es3\"\n" +
                "}";
        request.source(jsonString, XContentType.JSON);

//        構建方法2
//        Map<String,Object> jsonMap=new HashMap<>();
//        jsonMap.put("user", "tomas");
//        jsonMap.put("postDate", "2019-07-18");
//        jsonMap.put("message", "trying out es2");
//        request.source(jsonMap);

//        構建方法3
//        XContentBuilder builder= XContentFactory.jsonBuilder();
//        builder.startObject();
//        {
//            builder.field("user", "tomas");
//            builder.timeField("postDate", new Date());
//            builder.field("message", "trying out es2");
//        }
//        builder.endObject();
//        request.source(builder);
//        構建方法4
//        request.source("user","tomas",
//                    "postDate",new Date(),
//                "message","trying out es2");
//
//        ========================可選參數===================================
        //設置超時時間
        request.timeout(TimeValue.timeValueSeconds(1));
        request.timeout("1s");

        //自己維護版本號
//        request.version(2);
//        request.versionType(VersionType.EXTERNAL);

//        2執行
        //同步
        IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
        //異步
//        ActionListener<IndexResponse> listener=new ActionListener<IndexResponse>() {
//            @Override
//            public void onResponse(IndexResponse indexResponse) {
//
//            }
//
//            @Override
//            public void onFailure(Exception e) {
//
//            }
//        };
//        client.indexAsync(request,RequestOptions.DEFAULT, listener ); 
//        try {
//            Thread.sleep(5000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }


//        3獲取結果
        String index = indexResponse.getIndex();
        String id = indexResponse.getId();
        //獲取插入的類型
        if(indexResponse.getResult()== DocWriteResponse.Result.CREATED){
            DocWriteResponse.Result result=indexResponse.getResult();
            System.out.println("CREATED:"+result);
        }else if(indexResponse.getResult()== DocWriteResponse.Result.UPDATED){
            DocWriteResponse.Result result=indexResponse.getResult();
            System.out.println("UPDATED:"+result);
        }

        ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
        if(shardInfo.getTotal()!=shardInfo.getSuccessful()){
            System.out.println("處理成功的分片數少於總分片!");
        }
        if(shardInfo.getFailed()>0){
           for (ReplicationResponse.ShardInfo.Failure failure:shardInfo.getFailures()) {
               String reason = failure.reason();//處理潛在的失敗原因
               System.out.println(reason);
           }
        }
    }

6、修改測試方法

  @Test
    public void testUpdate() throws IOException {
//        1構建請求
        UpdateRequest request = new UpdateRequest("test_posts", "3");
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("user", "tomas JJ");
        request.doc(jsonMap);
//===============================可選參數==========================================
        request.timeout("1s");//超時時間

        //重試次數
        request.retryOnConflict(3);

        //設置在繼續更新之前,必須激活的分片數
//        request.waitForActiveShards(2);
        //所有分片都是active狀態,才更新
//        request.waitForActiveShards(ActiveShardCount.ALL);

//        2執行
//        同步
        UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
//        異步

//        3獲取數據
        updateResponse.getId();
        updateResponse.getIndex();

        //判斷結果
        if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
            DocWriteResponse.Result result = updateResponse.getResult();
            System.out.println("CREATED:" + result);
        } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            DocWriteResponse.Result result = updateResponse.getResult();
            System.out.println("UPDATED:" + result);
        }else if(updateResponse.getResult() == DocWriteResponse.Result.DELETED){
            DocWriteResponse.Result result = updateResponse.getResult();
            System.out.println("DELETED:" + result);
        }else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP){
            //沒有操作
            DocWriteResponse.Result result = updateResponse.getResult();
            System.out.println("NOOP:" + result);
        }
    }

7、刪除測試方法

 @Test
    public void testDelete() throws IOException {
//        1構建請求
        DeleteRequest request =new DeleteRequest("test_posts","3");
        //可選參數

//        2執行
        DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);

//        3獲取數據
        deleteResponse.getId();
        deleteResponse.getIndex();

        DocWriteResponse.Result result = deleteResponse.getResult();
        System.out.println(result);
}

8、批量(bulk)測試方法

@Test
    public void testBulk() throws IOException {
//        1創建請求
        BulkRequest request = new BulkRequest();
//        request.add(new IndexRequest("post").id("1").source(XContentType.JSON, "field", "1"));
//        request.add(new IndexRequest("post").id("2").source(XContentType.JSON, "field", "2"));

        request.add(new UpdateRequest("post","2").doc(XContentType.JSON, "field", "3"));
        request.add(new DeleteRequest("post").id("1"));

//        2執行
        BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

        for (BulkItemResponse itemResponse : bulkResponse) {
            DocWriteResponse itemResponseResponse = itemResponse.getResponse();

            switch (itemResponse.getOpType()) {
                case INDEX:
                case CREATE:
                    IndexResponse indexResponse = (IndexResponse) itemResponseResponse;
                    indexResponse.getId();
                    System.out.println(indexResponse.getResult());
                    break;
                case UPDATE:
                    UpdateResponse updateResponse = (UpdateResponse) itemResponseResponse;
                    updateResponse.getIndex();
                    System.out.println(updateResponse.getResult());
                    break;
                case DELETE:
                    DeleteResponse deleteResponse = (DeleteResponse) itemResponseResponse;
                    System.out.println(deleteResponse.getResult());
                    break;
            }
        }
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM