Java實現對ES數據的新增,刪除,修改,及合並
新增數據
代碼:
@Autowired
private RestHighLevelClient client;
/**
* @description ES寫入數據
* @author zae
* @date 2022/1/13 14:40
* @param index 索引庫
* @param dataList 數據集合(size為插入數據的條數)
*/
public void insertEsData(String index,List<Map<String,Object>> dataList) {
BulkProcessor bulkProcessor = null;
try {
bulkProcessor = getBulkProcessor(client);
for(Map<String,Object> dataMap:dataList){
bulkProcessor.add(new IndexRequest(index).source(dataMap));
}
// 將數據刷新到ES中
bulkProcessor.flush();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
boolean terminatedFlag = bulkProcessor.awaitClose(150L,
TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
BulkProcessor bulkProcessor = null;
try {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
logger.info("Try to insert data number : "
+ request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
logger.info("************** Success insert data number : "
+ request.numberOfActions() + " , id: " +
executionId);
}
@Override
public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {
logger.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId);
}
};
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
(request, bulkListener) -> client
.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer,
listener);
builder.setBulkActions(5000);//刷新條數
builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));// 刷新大小
builder.setConcurrentRequests(10);//並發線程數
builder.setFlushInterval(TimeValue.timeValueSeconds(100L));// 時間頻率
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));//重試補償策略
bulkProcessor = builder.build();
} catch (Exception e) {
e.printStackTrace();
try {
bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);
} catch (Exception e1) {
logger.error(e1.getMessage());
}
}
return bulkProcessor;
}
注意點:
-
傳入數據中的List中的每個Map為一條數據,Map中的key為字段Field,value為值。
-
假如在插入數據前index庫中沒有定義Field及映射關系,在插入數據時會自動新增字段Field,字段的類型會默認映射為value值的類型。
-
假如新插入的數據和之前第一次插入的數據類型不一致,(相同的key但是value的類型不一樣),這樣在執行代碼時可能不會報錯失敗,但是實際上並沒有插入數據成功。
刪除數據
代碼
/**
* @description ES數據刪除
* @author zae
* @date 2022/1/13 17:14
* @param index 索引庫
* @param id 數據id
*/
public void delete(String index,String id){
DeleteRequest deleteRequest = new DeleteRequest(index,id);
DeleteResponse response = null;
try {
response = client.delete(deleteRequest,RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println(response);
}
刪除的話直接指定索引庫和要刪除的數據的id進行刪除,至於數據的id怎么獲取,參考下面更新數據中的代碼。
更新數據
直接修改
/**
* @description ES數據更新
* @author zae
* @date 2022/1/13 16:10
* @param index 索引庫
* @param key 字段名
* @param value 更新后的值
* @param id 需要修改的那條數據的id
*/
public void update(String index,String key,Object value,String id){
try {
UpdateRequest request = new UpdateRequest();
request.index(index) //索引名
.id(id)//id
.doc(
XContentFactory.jsonBuilder()
.startObject()
.field(key, value)//要修改的字段 及字段值
.endObject()
);
UpdateResponse response= client.update(request,RequestOptions.DEFAULT);
System.out.println(response.status());
} catch (Exception e) {
e.printStackTrace();
}
}
-
更新數據需要傳入指定的索引庫,需要修改的字段名稱,修改后的value值,以及代表那條數據的唯一id
-
關於唯一id怎么獲得,可以參考以下代碼。
public void selectAndUpdate(){ int count = 0; //step1:根據條件獲取需要修改的數據的id SearchHit[] searchHits = esDeal.selectIdByKey(PRODUCT_DEV_INDEX, "name"); // step2:遍歷更新查詢出來的數據 for (SearchHit searchHit : searchHits) { // 獲取到單條記錄的id String id = searchHit.getId(); //調用更新數據的核心方法 esDeal.update(PRODUCT_DEV_INDEX,"name","張三",id); count++; } System.out.println("一共更新了"+count+"條數據"); } public SearchHit[] selectIdByKey(String index,String key){ BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); boolQueryBuilder.must(QueryBuilders.existsQuery(key)); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(boolQueryBuilder) .trackTotalHits(true) .size(22); SearchRequest searchRequest = new SearchRequest() .indices(index) .source(sourceBuilder); try { SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); SearchHit[] hits = searchResponse.getHits().getHits(); return hits; } catch (IOException e) { e.printStackTrace(); } return null; }
數據合並
數據合並其實也是更新的一種,關鍵在於怎么提取及封裝需要更新的數據,根據業務場景的不同達到合並的效果,以上面的圖片為例,我需要將以上數據的20004.value里面的值合並上新的數據卻也保留着原有的數據。
@Test
public void selectAndUpdate(){
int count = 0;
//step1:根據條件獲取需要修改的數據的id
SearchHit[] searchHits = esDeal.selectIdByKey(PRODUCT_DEV_INDEX, "20004.value");
for (SearchHit searchHit : searchHits) {
String id = searchHit.getId();
// step2:組裝數據
Map<String, Object> sourceAsMap = searchHit.getSourceAsMap();
List<Map<String,Object>> mapList = (List<Map<String, Object>>) sourceAsMap.get("20004");
Map<String, Object> mapData = mapList.get(0);
// 獲取之前20004.value的數據,並添加上新的數據
List value = (List) mapData.get("value");
value.add("120021");
value.add("3990993");
mapData.put("value",value);
// step3:根據id更新數據
esDeal.update(PRODUCT_DEV_INDEX,"20004",mapList,id);
count++;
}
System.out.println("一共更新了"+count+"條數據");
}
備注:
- 代碼中調用的selectIdByKey()以及update()在更新數據的第一部分都有代碼,直接使用就好。根據以上代碼更新后的數據為:
- 在更新20004.value的值時,不需要保持和20004.value的原有數據類型一致,更新成其他類型也是可以的,這點和插入key:value的數據是有區別的。