RestHighLevelClient 客戶端相關 CRUD 操作


  客戶端連接

public void start() {
    try {
        restHighLevelClient = new RestHighLevelClient(
                RestClient.builder(new HttpHost(esHost,esPort,"http")).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
                            @Override
                            public Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                   // 設置相關連接配置 requestConfigBuilder.setConnectTimeout(connectTimeOut); requestConfigBuilder.setSocketTimeout(socketTimeOut); requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut); return requestConfigBuilder; } } )); } catch (Exception e) { LogBackUtils.error(e.getMessage()); } }

  索引創建

/**
 * Creates an index with the given settings and mappings.
 *
 * <p>Failures are logged and not rethrown.
 *
 * @param indexName name of the index to create
 * @param settings  index settings, passed on as a JSON string
 * @param mappings  index mappings, passed on as a JSON string
 */
public void createIndex(String indexName, String settings, String mappings) {
    try {
        final CreateIndexRequest createRequest = new CreateIndexRequest(indexName);

        // Attach settings first, then mappings, before sending the request.
        buildSetting(createRequest, settings);
        buildIndexMapping(createRequest, mappings);

        client.indices().create(createRequest, RequestOptions.DEFAULT);
        LogBackUtils.info("索引創建成功");
    } catch (Exception e) {
        LogBackUtils.error("索引創建失敗:{}", e);
    }
}
/**
 * Attaches the index settings to a create-index request.
 *
 * @param request  the create-index request to modify
 * @param settings settings content, handed to the request as JSON
 */
private void buildSetting(CreateIndexRequest request, String settings) {
    request.settings(settings, XContentType.JSON);
}

/**
 * Attaches the index mapping to a create-index request.
 *
 * @param request  the create-index request to modify
 * @param mappings mapping content, handed to the request as JSON
 */
private void buildIndexMapping(CreateIndexRequest request, String mappings) {
    request.mapping(mappings, XContentType.JSON);
}

  判斷索引是否存在

/**
 * Checks whether an index exists.
 *
 * @param indexName name of the index to look up
 * @return the result of the exists call; {@code false} as well when the call throws
 *         (the exception is logged, not rethrown)
 */
public boolean existsIndex(String indexName) {
    try {
        return client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
    } catch (Exception e) {
        LogBackUtils.error("未知錯誤:{}", e);
        return false;
    }
}

  單條數據插入

/**
 * Indexes a single document.
 *
 * <p>The document id is {@code bulk.getId()} rendered as a string, and the source is the
 * parsed form of {@code bulk.toElasticString()}.
 *
 * @param indexName target index
 * @param bulk      the document to index
 * @return the index response, or {@code null} when the call fails (the exception is logged)
 */
public IndexResponse sourceIndex(String indexName, Bulk bulk) {
    try {
        // Target the real index directly; the former "post" placeholder was
        // overwritten by index(indexName) before the request was ever sent.
        IndexRequest request = new IndexRequest(indexName)
                .id(String.valueOf(bulk.getId()))
                .source(JSON.parseObject(bulk.toElasticString()), XContentType.JSON);
        IndexResponse response = client.index(request, RequestOptions.DEFAULT);
        LogBackUtils.info("elastic 索引新增成功");
        return response;
    } catch (Exception e) {
        LogBackUtils.error("索引數據變更失敗:{}", e);
    }
    return null;
}

  批量數據插入

/**
 * Indexes a list of documents with one synchronous bulk request.
 *
 * <p>A bulk call can return normally while individual items were rejected, so the response
 * is checked with {@code hasFailures()} and the failure message is logged when set.
 *
 * @param indexName target index
 * @param bulks     documents to index; each id is {@code bulk.getId()} rendered as a string
 * @return the bulk response (which may still report item failures), or {@code null} when
 *         the call itself throws (the exception is logged)
 */
public BulkResponse bulkIndex(String indexName, List<Bulk> bulks) {
    try {
        BulkRequest bulkRequest = new BulkRequest();
        for (Bulk bulk : bulks) {
            // NOTE(review): the source is now bulk.toElasticString(), matching the other
            // insert paths in this file; it was bulk.toString() before - confirm that
            // toString() was not intentionally producing the JSON document.
            bulkRequest.add(new IndexRequest(indexName)
                    .id(String.valueOf(bulk.getId()))
                    .source(bulk.toElasticString(), XContentType.JSON));
        }
        BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        if (response.hasFailures()) {
            LogBackUtils.error("批量插入索引存在失敗項:" + response.buildFailureMessage());
        }
        return response;
    } catch (Exception e) {
        LogBackUtils.error("批量插入索引失敗:{}", e);
    }
    return null;
}

  批量數據processor方式插入

/**
 * Builds a {@code BulkProcessor} that sends its batches through {@code client.bulkAsync}.
 *
 * <p>Configured below: 10000 actions per batch, a 10 MB batch size, a 5 second flush
 * interval, and 2 concurrent requests. The listener logs each batch before it is sent and
 * logs the outcome afterwards.
 *
 * @return a newly built processor; the caller is responsible for closing it
 */
public BulkProcessor init() {
    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            LogBackUtils.info("---嘗試插入{}條數據---", request.numberOfActions());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            // This callback also runs when individual items failed, so check before
            // reporting success.
            if (response.hasFailures()) {
                LogBackUtils.error("---嘗試插入數據存在失敗項---" + response.buildFailureMessage());
            } else {
                LogBackUtils.info("---嘗試插入{}條數據成功---", request.numberOfActions());
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            LogBackUtils.error("---嘗試插入數據失敗---", failure);
        }
    };

    return BulkProcessor.builder((request, bulkListener) ->
                    client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener)
            .setBulkActions(10000)
            .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
            .setFlushInterval(TimeValue.timeValueSeconds(5))
            .setConcurrentRequests(2)
            .build();
}

/**
 * Indexes a list of documents through a {@code BulkProcessor} obtained from {@code init()}.
 *
 * <p>The processor is created for this call and closed before returning, so the method
 * waits up to 30 seconds for pending batches to finish. Without the close, every call
 * would leave a processor open.
 *
 * <p>NOTE(review): this method has the same name and parameter list as the synchronous
 * {@code bulkIndex} that returns {@code BulkResponse}; the two cannot be declared in the
 * same class - presumably they are alternative implementations.
 *
 * @param indexName target index
 * @param bulks     documents to index; each id is {@code bulk.getId()} rendered as a string
 */
public void bulkIndex(String indexName, List<Bulk> bulks) {
    BulkProcessor bulkProcessor = init();
    try {
        for (Bulk bulk : bulks) {
            // Target the real index directly; the former "post" placeholder was
            // overwritten by index(indexName) anyway.
            bulkProcessor.add(new IndexRequest(indexName)
                    .id(String.valueOf(bulk.getId()))
                    .source(bulk.toElasticString(), XContentType.JSON));
        }
    } finally {
        try {
            // Flush what is still buffered and wait for in-flight batches to complete.
            if (!bulkProcessor.awaitClose(30, java.util.concurrent.TimeUnit.SECONDS)) {
                LogBackUtils.error("批量處理器關閉超時,部分數據可能尚未寫入");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the caller.
            Thread.currentThread().interrupt();
            LogBackUtils.error("批量處理器關閉被中斷", e);
        }
    }
}

 

數據獲取的方式暫時未加上,目前主要通過 Postman 查看數據,後續有時間再補上。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM