es通用工具類ElasticSearchUtil


最近也是搭了一套elk+rabbitmq的日志系統,搬過來大哥編寫的工具類

public class ElasticSearchUtil {
    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtil.class);
    /**
     * 創建索引
     *
     * @param index
     * @return
     */
    public static boolean createIndex(String index) {
        if (!isIndexExist(index)) {
            logger.info("Index is not exits!");
        }
        CreateIndexResponse indexresponse = null;
        try {
            indexresponse = EsClient.getTransportClient().admin().indices().prepareCreate(index).execute()
                    .actionGet();
        } catch (Exception e) {
            e.printStackTrace();
        }
        logger.info("執行建立成功?" + indexresponse.isAcknowledged());

        return indexresponse.isAcknowledged();
    }

    /**
     * 刪除索引
     *
     * @param index
     * @return
     */
    public static boolean deleteIndex(String index) {
        if (!isIndexExist(index)) {
            logger.info("Index is not exits!");
        }
        DeleteIndexResponse dResponse = EsClient.getTransportClient().admin().indices().prepareDelete(index).execute().actionGet();
        if (dResponse.isAcknowledged()) {
            logger.info("delete index " + index + "  successfully!");
        } else {
            logger.info("Fail to delete index " + index);
        }
        return dResponse.isAcknowledged();
    }

    /**
     * 判斷索引是否存在
     *
     * @param index
     * @return
     */
    public static boolean isIndexExist(String index) {
        IndicesExistsResponse inExistsResponse = EsClient.getTransportClient().admin().indices().exists(new IndicesExistsRequest
                (index)).actionGet();
        if (inExistsResponse.isExists()) {
            logger.info("Index [" + index + "] is exist!");
        } else {
            logger.info("Index [" + index + "] is not exist!");
        }
        return inExistsResponse.isExists();
    }

    /**
     * 數據添加,正定ID
     *
     * @param jsonObject 要增加的數據
     * @param index      索引,類似數據庫
     * @param type       類型,類似表
     * @param id         數據ID
     * @return
     */
    public static String addData(JSONObject jsonObject, String index, String type, String id) {

        IndexResponse response = EsClient.getTransportClient().prepareIndex(index, type, id).setSource(jsonObject).get();

        logger.info("addData response status:{},id:{}", response.status().getStatus(), response.getId());

        return response.getId();
    }

    /**
     * 數據添加
     *
     * @param jsonObject 要增加的數據
     * @param index      索引,類似數據庫
     * @param type       類型,類似表
     * @return
     */
    public static String addData(JSONObject jsonObject, String index, String type) {
        return addData(jsonObject, index, type, UUID.randomUUID().toString().replaceAll("-", "").toUpperCase());
    }

    /**
     * 通過ID刪除數據
     *
     * @param index 索引,類似數據庫
     * @param type  類型,類似表
     * @param id    數據ID
     */
    public static void deleteDataById(String index, String type, String id) {

        DeleteResponse response = EsClient.getTransportClient().prepareDelete(index, type, id).execute().actionGet();

        logger.info("deleteDataById response status:{},id:{}", response.status().getStatus(), response.getId());
    }

    /**
     * 通過ID 更新數據
     *
     * @param jsonObject 要增加的數據
     * @param index      索引,類似數據庫
     * @param type       類型,類似表
     * @param id         數據ID
     * @return
     */
    public static void updateDataById(JSONObject jsonObject, String index, String type, String id) {

        UpdateRequest updateRequest = new UpdateRequest();

        updateRequest.index(index).type(type).id(id).doc(jsonObject);

        EsClient.getTransportClient().update(updateRequest);

    }

    /**
     * 通過ID獲取數據
     *
     * @param index  索引,類似數據庫
     * @param type   類型,類似表
     * @param id     數據ID
     * @param fields 需要顯示的字段,逗號分隔(缺省為全部字段)
     * @return
     */
    public static Map<String, Object> searchDataById(String index, String type, String id, String fields) {

        GetRequestBuilder getRequestBuilder = EsClient.getTransportClient().prepareGet(index, type, id);

        if (StringUtils.isNotEmpty(fields)) {
            getRequestBuilder.setFetchSource(fields.split(","), null);
        }

        GetResponse getResponse = getRequestBuilder.execute().actionGet();

        return getResponse.getSource();
    }

    /**
     * 使用分詞查詢不分頁
     *
     * @param index          索引名稱
     * @param type           類型名稱,可傳入多個type逗號分隔
     * @param startTime      開始時間
     * @param endTime        結束時間
     * @param size           文檔大小限制
     * @param fields         需要顯示的字段,逗號分隔(缺省為全部字段)
     * @param sortField      排序字段
     * @param highlightField 高亮字段
     * @param operatorTag    查詢條件最外層的關系(true與,false或)
     * @param parmStr        過濾條件
     * @return
     */
    public static List<Map<String, Object>> searchListData(String index, String type, Integer size,long startTime,
            long endTime, String fields, String logType, String sortField, String highlightField, boolean
            operatorTag, String... parmStr) {
        SearchRequestBuilder searchRequestBuilder = EsClient.getTransportClient().prepareSearch(index);
        if (StringUtils.isNotEmpty(type)) {
            searchRequestBuilder.setTypes(type.split(","));
        }
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        if (startTime > 0 && endTime > 0) {
            boolQuery.must(QueryBuilders.rangeQuery("createDate").format("epoch_millis").from(startTime).to(endTime)
                    .includeLower(true).includeUpper(false));
        }
        //設置查詢日志類型
        boolQuery.must(QueryBuilders.matchQuery("logType", logType));
        // 查詢字段 與關系
        for (int i = 0, len = parmStr.length; i < len; i++) {
            BoolQueryBuilder tempBoolQuery = QueryBuilders.boolQuery();
            if (parmStr[i].contains("&&")){
                String[] tempStr =parmStr[i].split("&&");
                for(int j=0,len1=tempStr.length;j<len1;j++)
                {
                    String[] ss = tempStr[j].split("=");
                    //分詞查詢
                    tempBoolQuery.must(QueryBuilders.matchPhraseQuery(ss[0], ss[1]));
                }
                if (operatorTag) {
                    //如果為真則最外層是與關系
                    boolQuery.must(tempBoolQuery);
                } else {
                    boolQuery.should(tempBoolQuery);
                }
            } else {
                if (parmStr[i].contains(",,")) {
                    String[] tempStr =parmStr[i].split(",,");
                    for(int j=0,len1=tempStr.length;j<len1;j++)
                    {
                        String[] ss = tempStr[j].split("=");
                        //分詞查詢
                        tempBoolQuery.should(QueryBuilders.matchPhraseQuery(ss[0], ss[1]));
                    }
                    if (operatorTag) {
                        //如果為真則最外層是與關系
                        boolQuery.must(tempBoolQuery);
                    } else {
                        boolQuery.should(tempBoolQuery);
                    }
                } else {
                    if(ToolUtil.isNotEmpty(parmStr[i])) {
                        String[] ss = parmStr[i].split("=");
                        if (operatorTag) {
                            //如果為真則最外層是與關系
                            boolQuery.must(QueryBuilders.matchPhraseQuery(ss[0], ss[1]));
                        } else {
                            boolQuery.should(QueryBuilders.matchPhraseQuery(ss[0], ss[1]));
                        }
                    }
                }
            }
        }
        if (StringUtils.isNotEmpty(highlightField)) {
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            // 設置高亮字段
            highlightBuilder.field(highlightField);
            searchRequestBuilder.highlighter(highlightBuilder);
        }
        searchRequestBuilder.setQuery(boolQuery);
        if (StringUtils.isNotEmpty(fields)) {
            searchRequestBuilder.setFetchSource(fields.split(","), null);
        }
        searchRequestBuilder.setFetchSource(true);

        if (StringUtils.isNotEmpty(sortField)) {
            searchRequestBuilder.addSort(sortField, SortOrder.DESC);
        }
        if (size != null && size > 0) {
            searchRequestBuilder.setSize(size);
        }
        //打印的內容 可以在 Elasticsearch head 和 Kibana  上執行查詢
        logger.info("\n{}", searchRequestBuilder);
        SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();

        long totalHits = searchResponse.getHits().totalHits;
        long length = searchResponse.getHits().getHits().length;
        logger.info("共查詢到[{}]條數據,處理數據條數[{}]", totalHits, length);
        if (searchResponse.status().getStatus() == 200) {
            // 解析對象
            return setSearchResponse(searchResponse, highlightField);
        }
        return null;
    }

    /**
     * 使用分詞查詢,並分頁
     *
     * @param index          索引名稱
     * @param type           類型名稱,可傳入多個type逗號分隔
     * @param currentPage    當前頁
     * @param pageSize       每頁顯示條數
     * @param startTime      開始時間
     * @param endTime        結束時間
     * @param fields         需要顯示的字段,逗號分隔(缺省為全部字段)
     * @param sortField      排序字段
     * @param highlightField 高亮字段
     * @param operatorTag    外層邏輯與true 或false
     * @param parmStr        內層邏輯與&& 或||
     * @return
     */
    public static EsPage searchDataPage(String index, String type, int currentPage, int pageSize, long startTime,
                                        long endTime, String fields, String logType, String sortField, String highlightField, boolean
            operatorTag, String... parmStr) {
        SearchRequestBuilder searchRequestBuilder = EsClient.getTransportClient().prepareSearch(index);
        if (StringUtils.isNotEmpty(type)) {
            searchRequestBuilder.setTypes(type.split(","));
        }
        searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
        // 需要顯示的字段,逗號分隔(缺省為全部字段)
        if (StringUtils.isNotEmpty(fields)) {
            searchRequestBuilder.setFetchSource(fields.split(","), null);
        }
        //排序字段
        if (StringUtils.isNotEmpty(sortField)) {
            searchRequestBuilder.addSort(sortField, SortOrder.DESC);
        }
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        if (startTime > 0 && endTime > 0) {
            boolQuery.must(QueryBuilders.rangeQuery("createDate").format("epoch_millis").from(startTime).to(endTime)
                    .includeLower(true).includeUpper(true));
        }
        //設置查詢日志類型
        boolQuery.must(QueryBuilders.matchQuery("logType", logType));
        // 查詢字段 與關系
        for (int i = 0, len = parmStr.length; i < len; i++) {
            BoolQueryBuilder tempBoolQuery = QueryBuilders.boolQuery();
            if (parmStr[i].contains("&&")) {
                String[] tempStr =parmStr[i].split("&&");
                for(int j=0,len1=tempStr.length;j<len1;j++)
                {
                    String[] ss = tempStr[j].split("=");
                    //分詞查詢
                    tempBoolQuery.must(QueryBuilders.matchPhraseQuery(ss[0], ss[1]));
                }
                if (operatorTag) {
                    //如果為真則最外層是與關系
                    boolQuery.must(tempBoolQuery);
                } else {
                    boolQuery.should(tempBoolQuery);
                }
            } else {
                if (parmStr[i].contains(",,")) {
                    String[] tempStr =parmStr[i].split(",,");
                    for(int j=0,len1=tempStr.length;j<len1;j++)
                    {
                        String[] ss = tempStr[j].split("=");
                        //分詞查詢
                        tempBoolQuery.should(QueryBuilders.matchPhraseQuery(ss[0], ss[1]));
                    }
                    if (operatorTag) {
                        //如果為真則最外層是與關系
                        boolQuery.must(tempBoolQuery);
                    } else {
                        boolQuery.should(tempBoolQuery);
                    }
                } else {
                    if(ToolUtil.isNotEmpty(parmStr[i])) {
                        String[] ss = parmStr[i].split("=");
                        if (operatorTag) {
                            //如果為真則最外層是與關系
                            boolQuery.must(QueryBuilders.matchPhraseQuery(ss[0], ss[1]));
                        } else {
                            boolQuery.should(QueryBuilders.matchPhraseQuery(ss[0], ss[1]));
                        }
                    }
                }
            }
        }
        if (StringUtils.isNotEmpty(highlightField)) {
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            //highlightBuilder.preTags("<span style='color:red' >");//設置前綴
            //highlightBuilder.postTags("</span>");//設置后綴
            // 設置高亮字段
            highlightBuilder.field(highlightField);
            searchRequestBuilder.highlighter(highlightBuilder);
        }
        searchRequestBuilder.setQuery(QueryBuilders.matchAllQuery());
        searchRequestBuilder.setQuery(boolQuery);
        // 分頁應用
        int num = (currentPage - 1) * pageSize;
        searchRequestBuilder.setFrom(num).setSize(pageSize);
        // 設置是否按查詢匹配度排序
        searchRequestBuilder.setExplain(true);
        //打印的內容 可以在 Elasticsearch head 和 Kibana  上執行查詢
        logger.info("\n{}", searchRequestBuilder);
        // 執行搜索,返回搜索響應信息
        SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
        long totalHits = searchResponse.getHits().totalHits;
        long length = searchResponse.getHits().getHits().length;
        logger.debug("共查詢到[{}]條數據,處理數據條數[{}]", totalHits, length);
        if (searchResponse.status().getStatus() == 200) {
            // 解析對象
            List<Map<String, Object>> sourceList = setSearchResponse(searchResponse, highlightField);

            return new EsPage(currentPage, pageSize, (int) totalHits, sourceList);
        }
        return null;
    }

    /**
     * 高亮結果集 特殊處理
     *
     * @param searchResponse
     * @param highlightField
     */
    public static List<Map<String, Object>> setSearchResponse(SearchResponse searchResponse, String highlightField) {
        List<Map<String, Object>> sourceList = new ArrayList<Map<String, Object>>();
        StringBuffer stringBuffer = new StringBuffer();

        for (SearchHit searchHit : searchResponse.getHits().getHits()) {
            searchHit.getSourceAsMap().put("_id", searchHit.getId());

            if (StringUtils.isNotEmpty(highlightField)) {

                System.out.println("遍歷 高亮結果集,覆蓋 正常結果集" + searchHit.getSourceAsMap());
                Text[] text = searchHit.getHighlightFields().get(highlightField).getFragments();

                if (text != null) {
                    for (Text str : text) {
                        stringBuffer.append(str.string());
                    }
                    //遍歷 高亮結果集,覆蓋 正常結果集
                    searchHit.getSourceAsMap().put(highlightField, stringBuffer.toString());
                }
            }
            sourceList.add(searchHit.getSourceAsMap());
        }

        return sourceList;
    }
    /**
     * 查詢所有數據
     *
     * @param index 索引名稱
     * @param type type 6.0后不推薦使用
     * @param fields 需要顯示的字段
     * @param sortField 需要進行排序的字段
     * @param highlightField 需要高亮的字段
     * @param queryBuilder 查詢條件
     * @return
     */
    public static List<Map<String, Object>> searchAllData(String index, String type, String fields, String sortField, String highlightField, QueryBuilder queryBuilder ) {
        //指定一個index和type
        EsClient esClient=new EsClient();
        SearchRequestBuilder searchRequestBuilder = esClient.getESClient().prepareSearch(index);
        // 高亮(xxx=111,aaa=222)
        if (StringUtils.isNotEmpty(highlightField)) {
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            //設置前綴
            highlightBuilder.preTags("<span style='color:red;font-weight:bold'>");
            //設置后綴
            highlightBuilder.postTags("</span>");
            // 設置高亮字段
            highlightBuilder.field(highlightField);
            searchRequestBuilder.highlighter(highlightBuilder);
        }
        // 需要顯示的字段,逗號分隔(缺省為全部字段)
        if (StringUtils.isNotEmpty(fields)) {
            searchRequestBuilder.setFetchSource(fields.split(","), null);
        }
        searchRequestBuilder.setFetchSource(true);
        if (StringUtils.isNotEmpty(sortField)) {
            searchRequestBuilder.addSort(sortField, SortOrder.ASC);
        }
        //設置每批讀取的數據量
        searchRequestBuilder.setSize(100);
        //查詢條件
        searchRequestBuilder.setQuery(queryBuilder);
        //設置 search context 維護1分鍾的有效期
        searchRequestBuilder.setScroll(TimeValue.timeValueMinutes(1));
        //獲得首次的查詢結果
        SearchResponse scrollResp=searchRequestBuilder.get();
        //打印的內容 可以在 Elasticsearch head 和 Kibana  上執行查詢
        logger.info("\n{}", searchRequestBuilder);
        //打印命中數量
        logger.info("命中總數量:{}", scrollResp.getHits().getTotalHits());
        List<Map<String, Object>> sourceList = new ArrayList<Map<String, Object>>();
        StringBuffer stringBuffer = new StringBuffer();
        do {
            //將scorllId循環傳遞
            scrollResp = EsClient.getTransportClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).execute().actionGet();
            for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                searchHit.getSourceAsMap().put("id", searchHit.getId());
                if (StringUtils.isNotEmpty(highlightField)) {
                    if (!ToolUtil.isEmpty(searchHit.getHighlightFields().get(highlightField))) {
                        Text[] text = searchHit.getHighlightFields().get(highlightField).getFragments();
                        if (text != null) {
                            for (Text str : text) {
                                stringBuffer.append(str.string());
                            }
                            //遍歷 高亮結果集,覆蓋 正常結果集
                            searchHit.getSourceAsMap().put(highlightField, stringBuffer.toString());
                        }
                    }
                }
                sourceList.add(searchHit.getSourceAsMap());
            }
            //當searchHits的數組為空的時候結束循環,至此數據全部讀取完畢
        } while(scrollResp.getHits().getHits().length != 0);
        //刪除scroll
        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
        clearScrollRequest.addScrollId(scrollResp.getScrollId());
        EsClient.getTransportClient().clearScroll(clearScrollRequest).actionGet();
        return sourceList;
    }
    /**
     * 批量新增數據
     *
     * @param index    索引名稱
     * @param type     索引類型
     * @param dataList 需要新增的數據
     */
    public static void insertBatch(String index, String type, List<Map<String, Object>> dataList,int batchSize) {
        BulkRequestBuilder bulkRequest = EsClient.getTransportClient().prepareBulk();
        for (int i = 0; i < dataList.size(); i++) {
            bulkRequest.add(EsClient.getTransportClient().prepareIndex(index, type, Convert.toStr(dataList.get(i).get("id"))).setSource(dataList.get(i)));
            // 每5000條提交一次
            if ((i + 1) % batchSize == 0) {
                BulkResponse bulkItemResponses = bulkRequest.execute().actionGet();
                bulkRequest = EsClient.getTransportClient().prepareBulk();
                logger.info("已保存: {} 條,執行時間:{} ", batchSize, bulkItemResponses.getTook());
            }
        }
        if(dataList.size()%batchSize !=0){
            BulkResponse bulkItemResponses = bulkRequest.execute().actionGet();
            logger.info("保存: {}條,執行時間:{} ", dataList.size() % batchSize, bulkItemResponses.getTook());
        }
    }
}

有需要的自己直接搬走,不謝

 

之前忘記了EsClient 這個類,好久沒注意,重新補上

@Component
public class EsClient {

    private static final Logger logger = LoggerFactory.getLogger(EsClient.class);

    private static TransportClient transportClient;

    public static TransportClient getTransportClient() {
        return transportClient;
    }

    @Value("${elasticSearchPort}")
    private String elasticSearchPort;
    @Value("${elasticSearchClusterName}")
    private String elasticSearchClusterName;
    @Value("${elasticSearchPoolSize}")
    private String elasticSearchPoolSize;

    @Value("${elasticSearchIps}")
    public void connectionInit(String elasticSearchIps) {
        try {
            //設置netty內存管理非池化
            String currentValue = System.getProperty("io.netty.allocator.type");
            if (currentValue == null) {
                System.setProperty("io.netty.allocator.type","unpooled");
            }
            // 配置信息
            Settings esSetting = Settings.builder().put("cluster.name", elasticSearchClusterName).put("thread_pool" +
                    ".search.size", Integer.parseInt(elasticSearchPoolSize))//增加線程池個數,暫時設為5
                    .build();
            //配置信息Settings自定義,下面設置為EMPTY
            transportClient = new PreBuiltTransportClient(esSetting);
            String[] elasticSearchIpArray = elasticSearchIps.split(",");
            for (String elasticSearchIp : elasticSearchIpArray) {
                TransportAddress transportAddress = new TransportAddress(InetAddress.getByName(elasticSearchIp),
                        Integer.valueOf(elasticSearchPort));
                transportClient.addTransportAddresses(transportAddress);
            }
        }
        catch (Exception e) {
            logger.error("elasticsearch TransportClient create error!!!", e);
        }
    }
}

抱歉抱歉~~~~~~~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM