客户端连接
public void start() { try { restHighLevelClient = new RestHighLevelClient( RestClient.builder(new HttpHost(esHost,esPort,"http")).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() { @Override public Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
// 设置相关连接配置 requestConfigBuilder.setConnectTimeout(connectTimeOut); requestConfigBuilder.setSocketTimeout(socketTimeOut); requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut); return requestConfigBuilder; } } )); } catch (Exception e) { LogBackUtils.error(e.getMessage()); } }
索引创建
/**
 * Creates an index with the given settings and mappings.
 *
 * <p>Failures are logged and not rethrown.
 *
 * @param indexName name of the index to create
 * @param settings  index settings as a JSON string
 * @param mappings  index mappings as a JSON string
 */
public void createIndex(String indexName, String settings, String mappings) {
    try {
        final CreateIndexRequest createRequest = new CreateIndexRequest(indexName);
        buildSetting(createRequest, settings);
        buildIndexMapping(createRequest, mappings);

        client.indices().create(createRequest, RequestOptions.DEFAULT);
        LogBackUtils.info("索引创建成功");
    } catch (Exception e) {
        LogBackUtils.error("索引创建失败:{}", e);
    }
}

/**
 * Applies the index settings to the create-index request.
 *
 * @param request  the create-index request to configure
 * @param settings index settings as a JSON string
 */
private void buildSetting(CreateIndexRequest request, String settings) {
    request.settings(settings, XContentType.JSON);
}

/**
 * Applies the index mapping to the create-index request.
 *
 * @param request  the create-index request to configure
 * @param mappings index mappings as a JSON string
 */
private void buildIndexMapping(CreateIndexRequest request, String mappings) {
    request.mapping(mappings, XContentType.JSON);
}
判断索引是否存在
/**
 * Checks whether an index exists.
 *
 * @param indexName name of the index to look up
 * @return the result reported by the client; {@code false} if the lookup threw
 *         (the exception is logged, not rethrown)
 */
public boolean existsIndex(String indexName) {
    try {
        return client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
    } catch (Exception e) {
        LogBackUtils.error("未知错误:{}", e);
        return false;
    }
}
单条数据插入
/**
 * Indexes a single document.
 *
 * <p>The document id is {@code String.valueOf(bulk.getId())} and the source is the parsed
 * form of {@code bulk.toElasticString()}.
 *
 * @param indexName name of the target index
 * @param bulk      the document to index
 * @return the client's response, or {@code null} if indexing threw (the exception is
 *         logged, not rethrown)
 */
public IndexResponse sourceIndex(String indexName, Bulk bulk) {
    try {
        // Build the request for the target index directly; the former placeholder index
        // was always overwritten by index(indexName) and only obscured the real target.
        IndexRequest request = new IndexRequest(indexName)
                .id(String.valueOf(bulk.getId()))
                .source(JSON.parseObject(bulk.toElasticString()), XContentType.JSON);

        IndexResponse response = client.index(request, RequestOptions.DEFAULT);
        LogBackUtils.info("elastic 索引新增成功");
        return response;
    } catch (Exception e) {
        LogBackUtils.error("索引数据变更失败:{}", e);
        return null;
    }
}
批量数据插入
/**
 * Indexes a list of documents with one synchronous bulk request.
 *
 * <p>Each document uses {@code String.valueOf(bulk.getId())} as its id and
 * {@code bulk.toElasticString()} as its JSON source.
 *
 * @param indexName name of the target index
 * @param bulks     documents to index; {@code null} or empty means nothing is sent
 * @return the bulk response, or {@code null} if there was nothing to send or the request
 *         threw (the exception is logged, not rethrown)
 */
public BulkResponse bulkIndex(String indexName, List<Bulk> bulks) {
    // Nothing to send: skip the round trip instead of relying on the client to reject
    // an empty bulk request with an exception.
    if (bulks == null || bulks.isEmpty()) {
        return null;
    }
    try {
        BulkRequest bulkRequest = new BulkRequest();
        for (Bulk bulk : bulks) {
            // NOTE(review): the source now comes from toElasticString(), matching the
            // single-document and BulkProcessor variants; confirm toString() was not
            // intentionally used here.
            bulkRequest.add(new IndexRequest(indexName)
                    .id(String.valueOf(bulk.getId()))
                    .source(bulk.toElasticString(), XContentType.JSON));
        }

        BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        // A bulk call can succeed as a whole while individual items fail; surface that.
        if (response.hasFailures()) {
            LogBackUtils.error("批量插入索引失败:{}", response.buildFailureMessage());
        }
        return response;
    } catch (Exception e) {
        LogBackUtils.error("批量插入索引失败:{}", e);
        return null;
    }
}
使用 BulkProcessor 方式批量插入数据
/**
 * Builds a {@link BulkProcessor} that sends its batches through {@code client.bulkAsync}.
 *
 * <p>Configured limits: 10000 actions or 10 MB per batch, a 5 second flush interval and
 * 2 concurrent requests. The listener logs each attempt and its outcome.
 *
 * @return a new bulk processor; the caller is responsible for closing it
 */
public BulkProcessor init() {
    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            LogBackUtils.info("---尝试插入{}条数据---", request.numberOfActions());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            // The request completing does not mean every item succeeded.
            if (response.hasFailures()) {
                LogBackUtils.error("---尝试插入数据失败---", response.buildFailureMessage());
            } else {
                LogBackUtils.info("---尝试插入{}条数据成功---", request.numberOfActions());
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            LogBackUtils.error("---尝试插入数据失败---", failure);
        }
    };

    return BulkProcessor.builder(
                    (request, bulkListener) ->
                            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                    listener)
            .setBulkActions(10000)
            .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
            .setFlushInterval(TimeValue.timeValueSeconds(5))
            .setConcurrentRequests(2)
            .build();
}

/**
 * Indexes a list of documents through a {@link BulkProcessor} created by {@link #init()}.
 *
 * <p>Each document uses {@code String.valueOf(bulk.getId())} as its id and
 * {@code bulk.toElasticString()} as its JSON source. The processor is closed before this
 * method returns, waiting up to 30 seconds for buffered and in-flight batches.
 *
 * @param indexName name of the target index
 * @param bulks     documents to index; {@code null} or empty means nothing is done
 */
public void bulkIndex(String indexName, List<Bulk> bulks) {
    if (bulks == null || bulks.isEmpty()) {
        return;
    }

    final long closeTimeoutSeconds = 30;
    BulkProcessor bulkProcessor = init();
    try {
        for (Bulk bulk : bulks) {
            bulkProcessor.add(new IndexRequest(indexName)
                    .id(String.valueOf(bulk.getId()))
                    .source(bulk.toElasticString(), XContentType.JSON));
        }
    } finally {
        // Always close the processor: otherwise buffered documents stay pending and the
        // per-call flush scheduler is never released.
        try {
            boolean finished = bulkProcessor.awaitClose(
                    closeTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS);
            if (!finished) {
                LogBackUtils.error("---等待批量插入完成超时:{}秒---", closeTimeoutSeconds);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            LogBackUtils.error("---尝试插入数据失败---", e);
        }
    }
}
数据获取的方式暂时未加上,目前主要通过 Postman 查看数据,后续有时间再补上。