最近搭了一套 ELK + RabbitMQ 的日志系統,順便把大哥編寫的 Elasticsearch 工具類搬過來:
public class ElasticSearchUtil { private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtil.class); /** * 創建索引 * * @param index * @return */ public static boolean createIndex(String index) { if (!isIndexExist(index)) { logger.info("Index is not exits!"); } CreateIndexResponse indexresponse = null; try { indexresponse = EsClient.getTransportClient().admin().indices().prepareCreate(index).execute() .actionGet(); } catch (Exception e) { e.printStackTrace(); } logger.info("執行建立成功?" + indexresponse.isAcknowledged()); return indexresponse.isAcknowledged(); } /** * 刪除索引 * * @param index * @return */ public static boolean deleteIndex(String index) { if (!isIndexExist(index)) { logger.info("Index is not exits!"); } DeleteIndexResponse dResponse = EsClient.getTransportClient().admin().indices().prepareDelete(index).execute().actionGet(); if (dResponse.isAcknowledged()) { logger.info("delete index " + index + " successfully!"); } else { logger.info("Fail to delete index " + index); } return dResponse.isAcknowledged(); } /** * 判斷索引是否存在 * * @param index * @return */ public static boolean isIndexExist(String index) { IndicesExistsResponse inExistsResponse = EsClient.getTransportClient().admin().indices().exists(new IndicesExistsRequest (index)).actionGet(); if (inExistsResponse.isExists()) { logger.info("Index [" + index + "] is exist!"); } else { logger.info("Index [" + index + "] is not exist!"); } return inExistsResponse.isExists(); } /** * 數據添加,正定ID * * @param jsonObject 要增加的數據 * @param index 索引,類似數據庫 * @param type 類型,類似表 * @param id 數據ID * @return */ public static String addData(JSONObject jsonObject, String index, String type, String id) { IndexResponse response = EsClient.getTransportClient().prepareIndex(index, type, id).setSource(jsonObject).get(); logger.info("addData response status:{},id:{}", response.status().getStatus(), response.getId()); return response.getId(); } /** * 數據添加 * * @param jsonObject 要增加的數據 * @param index 索引,類似數據庫 * @param type 
類型,類似表 * @return */ public static String addData(JSONObject jsonObject, String index, String type) { return addData(jsonObject, index, type, UUID.randomUUID().toString().replaceAll("-", "").toUpperCase()); } /** * 通過ID刪除數據 * * @param index 索引,類似數據庫 * @param type 類型,類似表 * @param id 數據ID */ public static void deleteDataById(String index, String type, String id) { DeleteResponse response = EsClient.getTransportClient().prepareDelete(index, type, id).execute().actionGet(); logger.info("deleteDataById response status:{},id:{}", response.status().getStatus(), response.getId()); } /** * 通過ID 更新數據 * * @param jsonObject 要增加的數據 * @param index 索引,類似數據庫 * @param type 類型,類似表 * @param id 數據ID * @return */ public static void updateDataById(JSONObject jsonObject, String index, String type, String id) { UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index(index).type(type).id(id).doc(jsonObject); EsClient.getTransportClient().update(updateRequest); } /** * 通過ID獲取數據 * * @param index 索引,類似數據庫 * @param type 類型,類似表 * @param id 數據ID * @param fields 需要顯示的字段,逗號分隔(缺省為全部字段) * @return */ public static Map<String, Object> searchDataById(String index, String type, String id, String fields) { GetRequestBuilder getRequestBuilder = EsClient.getTransportClient().prepareGet(index, type, id); if (StringUtils.isNotEmpty(fields)) { getRequestBuilder.setFetchSource(fields.split(","), null); } GetResponse getResponse = getRequestBuilder.execute().actionGet(); return getResponse.getSource(); } /** * 使用分詞查詢不分頁 * * @param index 索引名稱 * @param type 類型名稱,可傳入多個type逗號分隔 * @param startTime 開始時間 * @param endTime 結束時間 * @param size 文檔大小限制 * @param fields 需要顯示的字段,逗號分隔(缺省為全部字段) * @param sortField 排序字段 * @param highlightField 高亮字段 * @param operatorTag 查詢條件最外層的關系(true與,false或) * @param parmStr 過濾條件 * @return */ public static List<Map<String, Object>> searchListData(String index, String type, Integer size,long startTime, long endTime, String fields, String logType, String sortField, String 
highlightField, boolean operatorTag, String... parmStr) { SearchRequestBuilder searchRequestBuilder = EsClient.getTransportClient().prepareSearch(index); if (StringUtils.isNotEmpty(type)) { searchRequestBuilder.setTypes(type.split(",")); } BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); if (startTime > 0 && endTime > 0) { boolQuery.must(QueryBuilders.rangeQuery("createDate").format("epoch_millis").from(startTime).to(endTime) .includeLower(true).includeUpper(false)); } //設置查詢日志類型 boolQuery.must(QueryBuilders.matchQuery("logType", logType)); // 查詢字段 與關系 for (int i = 0, len = parmStr.length; i < len; i++) { BoolQueryBuilder tempBoolQuery = QueryBuilders.boolQuery(); if (parmStr[i].contains("&&")){ String[] tempStr =parmStr[i].split("&&"); for(int j=0,len1=tempStr.length;j<len1;j++) { String[] ss = tempStr[j].split("="); //分詞查詢 tempBoolQuery.must(QueryBuilders.matchPhraseQuery(ss[0], ss[1])); } if (operatorTag) { //如果為真則最外層是與關系 boolQuery.must(tempBoolQuery); } else { boolQuery.should(tempBoolQuery); } } else { if (parmStr[i].contains(",,")) { String[] tempStr =parmStr[i].split(",,"); for(int j=0,len1=tempStr.length;j<len1;j++) { String[] ss = tempStr[j].split("="); //分詞查詢 tempBoolQuery.should(QueryBuilders.matchPhraseQuery(ss[0], ss[1])); } if (operatorTag) { //如果為真則最外層是與關系 boolQuery.must(tempBoolQuery); } else { boolQuery.should(tempBoolQuery); } } else { if(ToolUtil.isNotEmpty(parmStr[i])) { String[] ss = parmStr[i].split("="); if (operatorTag) { //如果為真則最外層是與關系 boolQuery.must(QueryBuilders.matchPhraseQuery(ss[0], ss[1])); } else { boolQuery.should(QueryBuilders.matchPhraseQuery(ss[0], ss[1])); } } } } } if (StringUtils.isNotEmpty(highlightField)) { HighlightBuilder highlightBuilder = new HighlightBuilder(); // 設置高亮字段 highlightBuilder.field(highlightField); searchRequestBuilder.highlighter(highlightBuilder); } searchRequestBuilder.setQuery(boolQuery); if (StringUtils.isNotEmpty(fields)) { searchRequestBuilder.setFetchSource(fields.split(","), null); } 
searchRequestBuilder.setFetchSource(true); if (StringUtils.isNotEmpty(sortField)) { searchRequestBuilder.addSort(sortField, SortOrder.DESC); } if (size != null && size > 0) { searchRequestBuilder.setSize(size); } //打印的內容 可以在 Elasticsearch head 和 Kibana 上執行查詢 logger.info("\n{}", searchRequestBuilder); SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); long totalHits = searchResponse.getHits().totalHits; long length = searchResponse.getHits().getHits().length; logger.info("共查詢到[{}]條數據,處理數據條數[{}]", totalHits, length); if (searchResponse.status().getStatus() == 200) { // 解析對象 return setSearchResponse(searchResponse, highlightField); } return null; } /** * 使用分詞查詢,並分頁 * * @param index 索引名稱 * @param type 類型名稱,可傳入多個type逗號分隔 * @param currentPage 當前頁 * @param pageSize 每頁顯示條數 * @param startTime 開始時間 * @param endTime 結束時間 * @param fields 需要顯示的字段,逗號分隔(缺省為全部字段) * @param sortField 排序字段 * @param highlightField 高亮字段 * @param operatorTag 外層邏輯與true 或false * @param parmStr 內層邏輯與&& 或|| * @return */ public static EsPage searchDataPage(String index, String type, int currentPage, int pageSize, long startTime, long endTime, String fields, String logType, String sortField, String highlightField, boolean operatorTag, String... 
parmStr) { SearchRequestBuilder searchRequestBuilder = EsClient.getTransportClient().prepareSearch(index); if (StringUtils.isNotEmpty(type)) { searchRequestBuilder.setTypes(type.split(",")); } searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH); // 需要顯示的字段,逗號分隔(缺省為全部字段) if (StringUtils.isNotEmpty(fields)) { searchRequestBuilder.setFetchSource(fields.split(","), null); } //排序字段 if (StringUtils.isNotEmpty(sortField)) { searchRequestBuilder.addSort(sortField, SortOrder.DESC); } BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); if (startTime > 0 && endTime > 0) { boolQuery.must(QueryBuilders.rangeQuery("createDate").format("epoch_millis").from(startTime).to(endTime) .includeLower(true).includeUpper(true)); } //設置查詢日志類型 boolQuery.must(QueryBuilders.matchQuery("logType", logType)); // 查詢字段 與關系 for (int i = 0, len = parmStr.length; i < len; i++) { BoolQueryBuilder tempBoolQuery = QueryBuilders.boolQuery(); if (parmStr[i].contains("&&")) { String[] tempStr =parmStr[i].split("&&"); for(int j=0,len1=tempStr.length;j<len1;j++) { String[] ss = tempStr[j].split("="); //分詞查詢 tempBoolQuery.must(QueryBuilders.matchPhraseQuery(ss[0], ss[1])); } if (operatorTag) { //如果為真則最外層是與關系 boolQuery.must(tempBoolQuery); } else { boolQuery.should(tempBoolQuery); } } else { if (parmStr[i].contains(",,")) { String[] tempStr =parmStr[i].split(",,"); for(int j=0,len1=tempStr.length;j<len1;j++) { String[] ss = tempStr[j].split("="); //分詞查詢 tempBoolQuery.should(QueryBuilders.matchPhraseQuery(ss[0], ss[1])); } if (operatorTag) { //如果為真則最外層是與關系 boolQuery.must(tempBoolQuery); } else { boolQuery.should(tempBoolQuery); } } else { if(ToolUtil.isNotEmpty(parmStr[i])) { String[] ss = parmStr[i].split("="); if (operatorTag) { //如果為真則最外層是與關系 boolQuery.must(QueryBuilders.matchPhraseQuery(ss[0], ss[1])); } else { boolQuery.should(QueryBuilders.matchPhraseQuery(ss[0], ss[1])); } } } } } if (StringUtils.isNotEmpty(highlightField)) { HighlightBuilder highlightBuilder = new HighlightBuilder(); 
//highlightBuilder.preTags("<span style='color:red' >");//設置前綴 //highlightBuilder.postTags("</span>");//設置后綴 // 設置高亮字段 highlightBuilder.field(highlightField); searchRequestBuilder.highlighter(highlightBuilder); } searchRequestBuilder.setQuery(QueryBuilders.matchAllQuery()); searchRequestBuilder.setQuery(boolQuery); // 分頁應用 int num = (currentPage - 1) * pageSize; searchRequestBuilder.setFrom(num).setSize(pageSize); // 設置是否按查詢匹配度排序 searchRequestBuilder.setExplain(true); //打印的內容 可以在 Elasticsearch head 和 Kibana 上執行查詢 logger.info("\n{}", searchRequestBuilder); // 執行搜索,返回搜索響應信息 SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); long totalHits = searchResponse.getHits().totalHits; long length = searchResponse.getHits().getHits().length; logger.debug("共查詢到[{}]條數據,處理數據條數[{}]", totalHits, length); if (searchResponse.status().getStatus() == 200) { // 解析對象 List<Map<String, Object>> sourceList = setSearchResponse(searchResponse, highlightField); return new EsPage(currentPage, pageSize, (int) totalHits, sourceList); } return null; } /** * 高亮結果集 特殊處理 * * @param searchResponse * @param highlightField */ public static List<Map<String, Object>> setSearchResponse(SearchResponse searchResponse, String highlightField) { List<Map<String, Object>> sourceList = new ArrayList<Map<String, Object>>(); StringBuffer stringBuffer = new StringBuffer(); for (SearchHit searchHit : searchResponse.getHits().getHits()) { searchHit.getSourceAsMap().put("_id", searchHit.getId()); if (StringUtils.isNotEmpty(highlightField)) { System.out.println("遍歷 高亮結果集,覆蓋 正常結果集" + searchHit.getSourceAsMap()); Text[] text = searchHit.getHighlightFields().get(highlightField).getFragments(); if (text != null) { for (Text str : text) { stringBuffer.append(str.string()); } //遍歷 高亮結果集,覆蓋 正常結果集 searchHit.getSourceAsMap().put(highlightField, stringBuffer.toString()); } } sourceList.add(searchHit.getSourceAsMap()); } return sourceList; } /** * 查詢所有數據 * * @param index 索引名稱 * @param type type 6.0后不推薦使用 * 
@param fields 需要顯示的字段 * @param sortField 需要進行排序的字段 * @param highlightField 需要高亮的字段 * @param queryBuilder 查詢條件 * @return */ public static List<Map<String, Object>> searchAllData(String index, String type, String fields, String sortField, String highlightField, QueryBuilder queryBuilder ) { //指定一個index和type EsClient esClient=new EsClient(); SearchRequestBuilder searchRequestBuilder = esClient.getESClient().prepareSearch(index); // 高亮(xxx=111,aaa=222) if (StringUtils.isNotEmpty(highlightField)) { HighlightBuilder highlightBuilder = new HighlightBuilder(); //設置前綴 highlightBuilder.preTags("<span style='color:red;font-weight:bold'>"); //設置后綴 highlightBuilder.postTags("</span>"); // 設置高亮字段 highlightBuilder.field(highlightField); searchRequestBuilder.highlighter(highlightBuilder); } // 需要顯示的字段,逗號分隔(缺省為全部字段) if (StringUtils.isNotEmpty(fields)) { searchRequestBuilder.setFetchSource(fields.split(","), null); } searchRequestBuilder.setFetchSource(true); if (StringUtils.isNotEmpty(sortField)) { searchRequestBuilder.addSort(sortField, SortOrder.ASC); } //設置每批讀取的數據量 searchRequestBuilder.setSize(100); //查詢條件 searchRequestBuilder.setQuery(queryBuilder); //設置 search context 維護1分鍾的有效期 searchRequestBuilder.setScroll(TimeValue.timeValueMinutes(1)); //獲得首次的查詢結果 SearchResponse scrollResp=searchRequestBuilder.get(); //打印的內容 可以在 Elasticsearch head 和 Kibana 上執行查詢 logger.info("\n{}", searchRequestBuilder); //打印命中數量 logger.info("命中總數量:{}", scrollResp.getHits().getTotalHits()); List<Map<String, Object>> sourceList = new ArrayList<Map<String, Object>>(); StringBuffer stringBuffer = new StringBuffer(); do { //將scorllId循環傳遞 scrollResp = EsClient.getTransportClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).execute().actionGet(); for (SearchHit searchHit : scrollResp.getHits().getHits()) { searchHit.getSourceAsMap().put("id", searchHit.getId()); if (StringUtils.isNotEmpty(highlightField)) { if 
(!ToolUtil.isEmpty(searchHit.getHighlightFields().get(highlightField))) { Text[] text = searchHit.getHighlightFields().get(highlightField).getFragments(); if (text != null) { for (Text str : text) { stringBuffer.append(str.string()); } //遍歷 高亮結果集,覆蓋 正常結果集 searchHit.getSourceAsMap().put(highlightField, stringBuffer.toString()); } } } sourceList.add(searchHit.getSourceAsMap()); } //當searchHits的數組為空的時候結束循環,至此數據全部讀取完畢 } while(scrollResp.getHits().getHits().length != 0); //刪除scroll ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); clearScrollRequest.addScrollId(scrollResp.getScrollId()); EsClient.getTransportClient().clearScroll(clearScrollRequest).actionGet(); return sourceList; } /** * 批量新增數據 * * @param index 索引名稱 * @param type 索引類型 * @param dataList 需要新增的數據 */ public static void insertBatch(String index, String type, List<Map<String, Object>> dataList,int batchSize) { BulkRequestBuilder bulkRequest = EsClient.getTransportClient().prepareBulk(); for (int i = 0; i < dataList.size(); i++) { bulkRequest.add(EsClient.getTransportClient().prepareIndex(index, type, Convert.toStr(dataList.get(i).get("id"))).setSource(dataList.get(i))); // 每5000條提交一次 if ((i + 1) % batchSize == 0) { BulkResponse bulkItemResponses = bulkRequest.execute().actionGet(); bulkRequest = EsClient.getTransportClient().prepareBulk(); logger.info("已保存: {} 條,執行時間:{} ", batchSize, bulkItemResponses.getTook()); } } if(dataList.size()%batchSize !=0){ BulkResponse bulkItemResponses = bulkRequest.execute().actionGet(); logger.info("保存: {}條,執行時間:{} ", dataList.size() % batchSize, bulkItemResponses.getTook()); } } }
有需要的自己直接搬走,不謝
之前忘記貼上 EsClient 這個類,好久沒注意到,現在重新補上:
@Component public class EsClient { private static final Logger logger = LoggerFactory.getLogger(EsClient.class); private static TransportClient transportClient; public static TransportClient getTransportClient() { return transportClient; } @Value("${elasticSearchPort}") private String elasticSearchPort; @Value("${elasticSearchClusterName}") private String elasticSearchClusterName; @Value("${elasticSearchPoolSize}") private String elasticSearchPoolSize; @Value("${elasticSearchIps}") public void connectionInit(String elasticSearchIps) { try { //設置netty內存管理非池化 String currentValue = System.getProperty("io.netty.allocator.type"); if (currentValue == null) { System.setProperty("io.netty.allocator.type","unpooled"); } // 配置信息 Settings esSetting = Settings.builder().put("cluster.name", elasticSearchClusterName).put("thread_pool" + ".search.size", Integer.parseInt(elasticSearchPoolSize))//增加線程池個數,暫時設為5 .build(); //配置信息Settings自定義,下面設置為EMPTY transportClient = new PreBuiltTransportClient(esSetting); String[] elasticSearchIpArray = elasticSearchIps.split(","); for (String elasticSearchIp : elasticSearchIpArray) { TransportAddress transportAddress = new TransportAddress(InetAddress.getByName(elasticSearchIp), Integer.valueOf(elasticSearchPort)); transportClient.addTransportAddresses(transportAddress); } } catch (Exception e) { logger.error("elasticsearch TransportClient create error!!!", e); } } }
抱歉抱歉~~~~~~~
