客戶端連接
public void start() { try { restHighLevelClient = new RestHighLevelClient( RestClient.builder(new HttpHost(esHost,esPort,"http")).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() { @Override public Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
// 設置相關連接配置 requestConfigBuilder.setConnectTimeout(connectTimeOut); requestConfigBuilder.setSocketTimeout(socketTimeOut); requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut); return requestConfigBuilder; } } )); } catch (Exception e) { LogBackUtils.error(e.getMessage()); } }
索引創建
/**
 * Creates the index {@code indexName} using the given settings and mappings.
 *
 * <p>Any exception thrown while building or sending the request is logged and
 * not rethrown.
 *
 * @param indexName name of the index to create
 * @param settings  index settings, passed to the request as JSON
 * @param mappings  index mappings, passed to the request as JSON
 */
public void createIndex(String indexName, String settings, String mappings) {
    try {
        final CreateIndexRequest createRequest = new CreateIndexRequest(indexName);

        buildSetting(createRequest, settings);
        buildIndexMapping(createRequest, mappings);

        client.indices().create(createRequest, RequestOptions.DEFAULT);
        LogBackUtils.info("索引創建成功");
    } catch (Exception ex) {
        LogBackUtils.error("索引創建失敗:{}", ex);
    }
}

/**
 * Attaches the index settings to the create-index request.
 *
 * @param request  the request being built
 * @param settings settings source, declared as JSON
 */
private void buildSetting(CreateIndexRequest request, String settings) {
    request.settings(settings, XContentType.JSON);
}

/**
 * Attaches the index mapping to the create-index request.
 *
 * @param request  the request being built
 * @param mappings mapping source, declared as JSON
 */
private void buildIndexMapping(CreateIndexRequest request, String mappings) {
    request.mapping(mappings, XContentType.JSON);
}
判斷索引是否存在
/**
 * Checks whether the index {@code indexName} exists.
 *
 * @param indexName name of the index to look up
 * @return the result reported by the client; {@code false} when the lookup
 *         throws (the exception is logged, not rethrown)
 */
public boolean existsIndex(String indexName) {
    boolean found = false;
    try {
        found = client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
    } catch (Exception ex) {
        LogBackUtils.error("未知錯誤:{}", ex);
    }
    return found;
}
單條數據插入
/**
 * Indexes a single document into {@code indexName}.
 *
 * <p>The document id is {@code String.valueOf(bulk.getId())} and the source is the
 * object parsed from {@code bulk.toElasticString()}.
 *
 * @param indexName target index
 * @param bulk      the document to index
 * @return the client's response, or {@code null} when anything throws
 *         (the exception is logged, not rethrown)
 */
public IndexResponse sourceIndex(String indexName, Bulk bulk) {
    try {
        IndexRequest indexRequest = new IndexRequest("post");
        indexRequest.index(indexName);
        indexRequest.id(String.valueOf(bulk.getId()));
        indexRequest.source(JSON.parseObject(bulk.toElasticString()), XContentType.JSON);

        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
        LogBackUtils.info("elastic 索引新增成功");
        return indexResponse;
    } catch (Exception ex) {
        LogBackUtils.error("索引數據變更失敗:{}", ex);
        return null;
    }
}
批量數據插入
/**
 * Indexes all {@code bulks} into {@code indexName} with one synchronous bulk request.
 *
 * <p>Each document uses {@code String.valueOf(bulk.getId())} as its id and
 * {@code bulk.toElasticString()} as its JSON source, matching the single-document
 * and processor-based insert paths.
 *
 * @param indexName target index
 * @param bulks     documents to index
 * @return the bulk response (which may still contain per-item failures; these are
 *         logged), or {@code null} when the request throws (the exception is
 *         logged, not rethrown)
 */
public BulkResponse bulkIndex(String indexName, List<Bulk> bulks) {
    try {
        BulkRequest bulkRequest = new BulkRequest();
        for (Bulk bulk : bulks) {
            // NOTE(review): the "post" constructor argument looks superseded by the
            // index(indexName) call that follows - confirm before simplifying.
            IndexRequest request = new IndexRequest("post");
            request.index(indexName)
                    .id(String.valueOf(bulk.getId()))
                    // Use the Elasticsearch JSON form, as the other insert methods do,
                    // rather than the generic toString().
                    .source(bulk.toElasticString(), XContentType.JSON);
            bulkRequest.add(request);
        }

        BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        // A bulk call can succeed as a whole while individual items fail, so
        // surface those failures instead of returning silently.
        if (response.hasFailures()) {
            LogBackUtils.error("批量插入索引部分失敗:" + response.buildFailureMessage());
        }
        return response;
    } catch (Exception e) {
        LogBackUtils.error("批量插入索引失敗:{}", e);
    }
    return null;
}
批量數據processor方式插入
/**
 * Builds a {@code BulkProcessor} that sends its batches through
 * {@code client.bulkAsync} and logs the outcome of every batch.
 *
 * <p>Configured limits: 10000 actions or 10 MB per batch, a 5 second flush
 * interval and 2 concurrent requests.
 *
 * @return a new processor; the caller is responsible for closing it
 */
public BulkProcessor init() {
    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            LogBackUtils.info("---嘗試插入{}條數據---", request.numberOfActions());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            // This callback also fires when individual items failed, so only
            // report success when the response carries no failures.
            if (response.hasFailures()) {
                LogBackUtils.error("---嘗試插入數據部分失敗---" + response.buildFailureMessage());
            } else {
                LogBackUtils.info("---嘗試插入{}條數據成功---", request.numberOfActions());
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            LogBackUtils.error("---嘗試插入數據失敗---", failure);
        }
    };

    return BulkProcessor.builder(
                    (request, bulkListener) ->
                            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                    listener)
            .setBulkActions(10000)
            .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
            .setFlushInterval(TimeValue.timeValueSeconds(5))
            .setConcurrentRequests(2)
            .build();
}

/**
 * Indexes all {@code bulks} into {@code indexName} through a {@code BulkProcessor}.
 *
 * <p>A processor is created for this call and closed before returning, waiting up
 * to 30 seconds for outstanding batches. Batch results are reported only through
 * the listener's log output.
 *
 * @param indexName target index
 * @param bulks     documents to index
 */
public void bulkIndex(String indexName, List<Bulk> bulks) {
    BulkProcessor bulkProcessor = init();
    try {
        for (Bulk bulk : bulks) {
            IndexRequest request = new IndexRequest("post");
            request.index(indexName)
                    .id(String.valueOf(bulk.getId()))
                    .source(bulk.toElasticString(), XContentType.JSON);
            bulkProcessor.add(request);
        }
    } finally {
        // Close the processor so the remaining actions are flushed and its
        // resources are released instead of leaking one processor per call.
        try {
            if (!bulkProcessor.awaitClose(30, java.util.concurrent.TimeUnit.SECONDS)) {
                LogBackUtils.error("---批量處理器關閉超時,仍有請求未完成---");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            LogBackUtils.error("---等待批量處理器關閉時被中斷---", e);
        }
    }
}
數據獲取的方式暫時未加上,主要通過 Postman 查看數據。後續有時間再補上。