客戶端連接
public void start() {
try {
restHighLevelClient = new RestHighLevelClient(
RestClient.builder(new HttpHost(esHost,esPort,"http")).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
public Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
// 設置相關連接配置
requestConfigBuilder.setConnectTimeout(connectTimeOut);
requestConfigBuilder.setSocketTimeout(socketTimeOut);
requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
return requestConfigBuilder;
}
}
));
} catch (Exception e) {
LogBackUtils.error(e.getMessage());
}
}
索引創建
public void createIndex(String indexName, String settings, String mappings) {
try{
CreateIndexRequest request = new CreateIndexRequest(indexName);
buildSetting(request, settings);
buildIndexMapping(request, mappings);
client.indices().create(request, RequestOptions.DEFAULT);
LogBackUtils.info("索引創建成功");
}catch (Exception e){
LogBackUtils.error("索引創建失敗:{}", e);
}
}
/**
* 設置分片
* @param request
*/
private void buildSetting(CreateIndexRequest request, String settings) {
request.settings(settings, XContentType.JSON);
}
/**
* 設置索引的mapping
* @param request
*/
private void buildIndexMapping(CreateIndexRequest request, String mappings) {
request.mapping(mappings, XContentType.JSON);
}
判斷索引是否存在
public boolean existsIndex(String indexName) {
try{
GetIndexRequest request = new GetIndexRequest(indexName);
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
return exists;
}catch (Exception e){
LogBackUtils.error("未知錯誤:{}", e);
}
return false;
}
單條數據插入
public IndexResponse sourceIndex(String indexName, Bulk bulk) {
try{
IndexRequest request = new IndexRequest("post");
request.index(indexName).id(String.valueOf(bulk.getId())).source(JSON.parseObject(bulk.toElasticString()), XContentType.JSON);
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
LogBackUtils.info("elastic 索引新增成功");
return response;
}catch (Exception e){
LogBackUtils.error("索引數據變更失敗:{}", e);
}
return null;
}
批量數據插入
public BulkResponse bulkIndex(String indexName, List<Bulk> bulks) {
try{
BulkRequest bulkRequest = new BulkRequest();
IndexRequest request = null;
for(Bulk bulk: bulks) {
request = new IndexRequest("post");
request.index(indexName).id(String.valueOf(bulk.getId())).source(bulk.toString(), XContentType.JSON);
bulkRequest.add(request);
}
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
return response;
}catch (Exception e){
LogBackUtils.error("批量插入索引失敗:{}", e);
}
return null;
}
批量數據processor方式插入
public bulkProcessor init() {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
LogBackUtils.info("---嘗試插入{}條數據---", request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
LogBackUtils.info("---嘗試插入{}條數據成功---", request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
LogBackUtils.error("---嘗試插入數據失敗---", failure);
}
};
return BulkProcessor.builder((request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener)
.setBulkActions(10000)
.setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(2)
.build();
}
public void bulkIndex(String indexName, List<Bulk> bulks) {
BulkProcessor bulkProcessor = init();
IndexRequest request = null;
for(Bulk bulk: bulks) {
request = new IndexRequest("post");
request.index(indexName).id(String.valueOf(bulk.getId())).source(bulk.toElasticString(), XContentType.JSON);
bulkProcessor.add(request);
}
}
數據獲取的方式暫時未加上,主要通過postman查看數據。后續有時間再補上
