Java實現對ES數據的新增,刪除,修改,及合並


Java實現對ES數據的新增,刪除,修改,及合並

新增數據

代碼:

   @Autowired
    private RestHighLevelClient client;
/**
     * @description ES寫入數據
     * @author zae
     * @date 2022/1/13 14:40
     * @param index 索引庫
     * @param dataList 數據集合(size為插入數據的條數)
     */
    public void insertEsData(String index,List<Map<String,Object>> dataList) {
        BulkProcessor bulkProcessor = null;
        try {
            bulkProcessor = getBulkProcessor(client);
            for(Map<String,Object> dataMap:dataList){
                bulkProcessor.add(new IndexRequest(index).source(dataMap));
            }
            // 將數據刷新到ES中
            bulkProcessor.flush();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                boolean terminatedFlag = bulkProcessor.awaitClose(150L,
                        TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

 private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
        BulkProcessor bulkProcessor = null;
        try {
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    logger.info("Try to insert data number : "
                            + request.numberOfActions());
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request,
                                      BulkResponse response) {
                    logger.info("************** Success insert data number : "
                            + request.numberOfActions() + " , id: " +
                            executionId);
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request,
                                      Throwable failure) {
                    logger.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId);
                }
            };
            BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                    (request, bulkListener) -> client
                            .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
            BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer,
                    listener);
            builder.setBulkActions(5000);//刷新條數
            builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));// 刷新大小
            builder.setConcurrentRequests(10);//並發線程數
            builder.setFlushInterval(TimeValue.timeValueSeconds(100L));// 時間頻率
            builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));//重試補償策略
            bulkProcessor = builder.build();
        } catch (Exception e) {
            e.printStackTrace();
            try {
                bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);
            } catch (Exception e1) {
                logger.error(e1.getMessage());
            }
        }
        return bulkProcessor;
    }

注意點:

  • 傳入數據中的List中的每個Map為一條數據,Map中的key為字段Field,value為值。

  • 假如在插入數據前index庫中沒有定義Field及映射關系,在插入數據時會自動新增字段Field,字段的類型會默認映射為value值的類型。

  • 假如新插入的數據和之前第一次插入的數據類型不一致,(相同的key但是value的類型不一樣),這樣在執行代碼時可能不會報錯失敗,但是實際上並沒有插入數據成功。

刪除數據

代碼

/**
     * @description ES數據刪除
     * @author zae
     * @date 2022/1/13 17:14
     * @param index 索引庫
     * @param id 數據id
     */
    public void delete(String index,String id){
        DeleteRequest deleteRequest = new DeleteRequest(index,id);
        DeleteResponse response = null;
        try {
            response = client.delete(deleteRequest,RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println(response);
    }

刪除的話直接指定索引庫和要刪除的數據的id進行刪除,至於數據的id怎么獲取,參考下面更新數據中的代碼。

更新數據

直接修改

 /**
     * @description ES數據更新
     * @author zae
     * @date 2022/1/13 16:10
     * @param index 索引庫
     * @param key 字段名
     * @param value 更新后的值
     * @param id  需要修改的那條數據的id
     */
    public void update(String index,String key,Object value,String id){
        try {
            UpdateRequest request = new UpdateRequest();
            request.index(index) //索引名
                    .id(id)//id
                    .doc(
                            XContentFactory.jsonBuilder()
                                    .startObject()
                                    .field(key, value)//要修改的字段 及字段值
                                    .endObject()
                    );
            UpdateResponse response= client.update(request,RequestOptions.DEFAULT);
            System.out.println(response.status());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
  • 更新數據需要傳入指定的索引庫,需要修改的字段名稱,修改后的value值,以及代表那條數據的唯一id

  • 關於唯一id怎么獲得,可以參考以下代碼。

     public void selectAndUpdate(){
            int count = 0;
           //step1:根據條件獲取需要修改的數據的id
            SearchHit[] searchHits = esDeal.selectIdByKey(PRODUCT_DEV_INDEX, "name");
           
         // step2:遍歷更新查詢出來的數據
         for (SearchHit searchHit : searchHits) {
               // 獲取到單條記錄的id
                String id = searchHit.getId();
                //調用更新數據的核心方法
                esDeal.update(PRODUCT_DEV_INDEX,"name","張三",id);
                count++;
            }
            System.out.println("一共更新了"+count+"條數據");
        } 
    
    public SearchHit[] selectIdByKey(String index,String key){
            BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
            boolQueryBuilder.must(QueryBuilders.existsQuery(key));
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.query(boolQueryBuilder)
                    .trackTotalHits(true)
                    .size(22);
    
            SearchRequest searchRequest = new SearchRequest()
                    .indices(index)
                    .source(sourceBuilder);
            try {
                SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
                SearchHit[] hits = searchResponse.getHits().getHits();
                return hits;
            } catch (IOException e) {
                e.printStackTrace();
            }
            return null;
        }
    

數據合並

數據合並其實也是更新的一種,關鍵在於怎么提取及封裝需要更新的數據,根據業務場景的不同達到合並的效果,以上面的圖片為例,我需要將以上數據的20004.value里面的值合並上新的數據卻也保留着原有的數據。

    @Test
    public void selectAndUpdate(){
        int count = 0;
       //step1:根據條件獲取需要修改的數據的id
        SearchHit[] searchHits = esDeal.selectIdByKey(PRODUCT_DEV_INDEX, "20004.value");
        for (SearchHit searchHit : searchHits) {
            String id = searchHit.getId();
            // step2:組裝數據
            Map<String, Object> sourceAsMap = searchHit.getSourceAsMap();
            List<Map<String,Object>> mapList = (List<Map<String, Object>>) sourceAsMap.get("20004");
            Map<String, Object> mapData = mapList.get(0);
            // 獲取之前20004.value的數據,並添加上新的數據
            List value = (List) mapData.get("value");
            value.add("120021");
            value.add("3990993");
            mapData.put("value",value);
            // step3:根據id更新數據
            esDeal.update(PRODUCT_DEV_INDEX,"20004",mapList,id);
            count++;
        }
        System.out.println("一共更新了"+count+"條數據");
    }

備注:

  • 代碼中調用的selectIdByKey()以及update()在更新數據的第一部分都有代碼,直接使用就好。根據以上代碼更新后的數據為:

  • 在更新20004.value的值時,不需要保持和20004.value的原有數據類型一致,更新成其他類型也是可以的,這點和插入key:value的數據是有區別的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM