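Option 1: using TransportClient (the transport-protocol client on port 9300; it is deprecated as of Elasticsearch 7.0 and removed in 8.0):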
// Imports used by this snippet (Elasticsearch 5.x transport client)
import java.net.InetAddress;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress; // 6.x replaced this class with TransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient;

// Fields assumed by this snippet: String hostName, hostName2, hostName3, port, clusterName;
// String poolSize (e.g. "5"); TransportClient client; and a logger.
public ESConfiguration() {
    if (EnvUtils.isOnlineEnv()) {
        // online (production) cluster
        hostName = "xxxxx1";
        hostName2 = "xxxx2";
        hostName3 = "xxxx3";
        port = "9300";
        clusterName = "yyyy";
    } else {
        // other environments
        hostName = "vvvvv1";
        hostName2 = "vvvv2";
        hostName3 = "vvvv3";
        port = "9300";
        clusterName = "zzzz";
    }
    createTransportClient();
}

public void createTransportClient() {
    try {
        // Settings: cluster name plus client-side thread pool size
        Settings esSetting = Settings.builder()
                .put("cluster.name", clusterName)                           // name of the cluster to connect to
                .put("client.transport.sniff", false)                       // sniffing off: only the addresses below are used (true discovers other nodes)
                .put("thread_pool.search.size", Integer.parseInt(poolSize)) // search thread pool size, 5 for now
                .build();
        client = new PreBuiltTransportClient(esSetting);
        // Register host and port of each node
        int p = Integer.parseInt(port);
        client.addTransportAddresses(
                new InetSocketTransportAddress(InetAddress.getByName(hostName), p),
                new InetSocketTransportAddress(InetAddress.getByName(hostName2), p),
                new InetSocketTransportAddress(InetAddress.getByName(hostName3), p));
    } catch (Exception e) {
        logger.error("elasticsearch TransportClient create error!!!", e);
    }
}

public TransportClient getInstance() {
    return client;
}
Option 2: using RestHighLevelClient over HTTP:
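// Imports used by this snippet
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;

/** Elasticsearch cluster addresses, comma-separated */
private String servers = "xxxx1,xxxx2,xxxx3";
/** HTTP port (Elasticsearch's default HTTP port is 9200; 9300 is the transport port) */
private int port = 9301;
private String scheme = "http";
private RestHighLevelClient restHighLevelClient;

@PostConstruct
public void init() {
    logger.info("init Es Client...");
    restHighLevelClient = new RestHighLevelClient(getRestClientBuilder());
    logger.info("init Es Client complete...");
}

public RestClientBuilder getRestClientBuilder() {
    String[] address = StringUtils.split(servers, ",");
    if (address == null || address.length == 0) {
        // Fail fast: returning null here would make new RestHighLevelClient(null) throw an NPE
        throw new IllegalStateException("no Elasticsearch servers configured");
    }
    // One HttpHost per configured server, so the list is not limited to exactly three
    HttpHost[] hosts = new HttpHost[address.length];
    for (int i = 0; i < address.length; i++) {
        hosts[i] = new HttpHost(address[i].trim(), port, scheme);
    }
    return RestClient.builder(hosts);
}

public RestHighLevelClient getInstance() {
    if (restHighLevelClient == null) {
        init();
    }
    return restHighLevelClient;
}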
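getInstance() above calls init() whenever the field is still null, so two threads arriving before @PostConstruct has run could each build a client. A minimal sketch of a thread-safe alternative is double-checked locking on a volatile field. LazyHolder is a hypothetical helper, not part of the original code or of the Elasticsearch client; in the configuration class its factory would be something like () -> new RestHighLevelClient(getRestClientBuilder()).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper (not part of the Elasticsearch client):
// creates the wrapped object at most once, even under concurrent get() calls.
public class LazyHolder<T> {
    private final Supplier<T> factory;
    private volatile T instance; // volatile so a fully built instance is visible to all threads

    public LazyHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    public T get() {
        T local = instance;              // first check without locking (fast path)
        if (local == null) {
            synchronized (this) {
                local = instance;        // second check while holding the lock
                if (local == null) {
                    local = factory.get();
                    instance = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger created = new AtomicInteger();
        LazyHolder<Object> holder = new LazyHolder<>(() -> {
            created.incrementAndGet();
            return new Object();
        });
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(holder::get);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(created.get()); // 1: the factory ran only once
    }
}
```

An eagerly initialized final field (or Spring's singleton bean lifecycle) avoids the question entirely; the holder is only useful if lazy creation is really needed.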
Bulk-inserting documents with RestHighLevelClient:
/**
 * Bulk-indexes the given documents.
 * @return null on success, or an error message if the request itself failed
 */
public String executeBulkDocInsert(List<Map<String, Object>> jsonList) {
    try {
        BulkRequest request = new BulkRequest();
        for (Map<String, Object> crashInfo : jsonList) {
            // No id is passed, so Elasticsearch generates one per document
            // (a fixed id such as "11" would make every document overwrite the previous one).
            request.add(new IndexRequest("crash_bulk_index_2020-01-01", "crash").source(crashInfo));
            // Updates can be mixed into the same request, e.g.:
            // request.add(new UpdateRequest(index, type, id).doc(crashInfo));
        }
        request.timeout(TimeValue.timeValueMinutes(2));
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        BulkResponse bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);

        for (BulkItemResponse item : bulkResponse) {
            if (item.isFailed()) {
                // Failed items carry no DocWriteResponse, so handle them before any cast
                BulkItemResponse.Failure failure = item.getFailure();
                if (failure.getStatus() == RestStatus.CONFLICT) {
                    System.out.println("id=" + item.getId() + " conflicts with an existing document");
                } else if (failure.getStatus() == RestStatus.BAD_REQUEST) {
                    System.out.println("id=" + item.getId() + " is an invalid request!");
                } else {
                    System.out.println("id=" + item.getId() + " failed: " + failure.getCause());
                }
                continue;
            }
            DocWriteResponse itemResponse = item.getResponse();
            switch (item.getOpType()) {
                case INDEX:
                case CREATE:
                    IndexResponse indexResponse = (IndexResponse) itemResponse;
                    System.out.println("id=" + indexResponse.getId() + " indexed, result: " + indexResponse.getResult());
                    break;
                case UPDATE:
                    UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                    System.out.println("id=" + updateResponse.getId() + " updated");
                    // getGetResult() is only populated when the UpdateRequest asked for the source back
                    if (updateResponse.getGetResult() != null) {
                        System.out.println("id=" + updateResponse.getId() + " source: " + updateResponse.getGetResult().sourceAsString());
                    }
                    break;
                case DELETE:
                    DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                    if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
                        System.out.println("id=" + deleteResponse.getId() + " not found, nothing deleted!");
                    } else {
                        System.out.println("id=" + deleteResponse.getId() + " deleted");
                    }
                    break;
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
        return "bulk insert into index failed";
    }
    return null;
}
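On the wire, a BulkRequest goes to the _bulk endpoint as newline-delimited JSON: for each index operation, an action line followed by the document source, with every line (including the last) ending in a newline. The sketch below builds such a body by hand purely to show the format; the class name and its tiny serializer (strings, numbers and booleans only, minimal escaping) are assumptions for illustration. It uses the typed _index/_type action form matching the type argument in the code above, and because there is no _id, Elasticsearch generates the ids.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: builds a _bulk request body by hand (assumed minimal JSON serializer).
public class BulkBodySketch {

    // Escapes backslashes and double quotes only; control characters are out of scope here.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Serializes a flat map whose values are String, Number or Boolean.
    static String toJson(Map<String, ?> map) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, ?> e : map.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            Object v = e.getValue();
            sb.append(quote(e.getKey())).append(':')
              .append(v instanceof String ? quote((String) v) : String.valueOf(v));
        }
        return sb.append('}').toString();
    }

    // One action line plus one source line per document, each terminated by "\n".
    static String bulkBody(String index, String type, List<Map<String, Object>> docs) {
        StringBuilder sb = new StringBuilder();
        for (Map<String, Object> doc : docs) {
            sb.append("{\"index\":{\"_index\":").append(quote(index))
              .append(",\"_type\":").append(quote(type)).append("}}\n");
            sb.append(toJson(doc)).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("app", "demo");
        doc.put("count", 3);
        System.out.print(bulkBody("crash_bulk_index_2020-01-01", "crash", Collections.singletonList(doc)));
    }
}
```

Running main prints the action line {"index":{"_index":"crash_bulk_index_2020-01-01","_type":"crash"}} followed by {"app":"demo","count":3}.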
************************************************************ matchQuery, termQuery and matchPhraseQuery: differences and implementations ************************************************************
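Difference 1: matchPhraseQuery vs. matchQuery. With matchQuery (and similar queries) the search text is run through the field's analyzer and split into terms. matchPhraseQuery also analyzes the search text, but then treats the resulting terms as a phrase: a document matches only if the field contains all of those terms, in the same order and adjacent to each other (with the default slop of 0). If no indexed value contains such a phrase, the query returns no results.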
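Difference 2:
matchQuery: analyzes the search text into terms and matches the target field if any one of those terms matches (the default operator is OR).
termQuery: does not analyze the search text; the whole input is looked up as a single term and must exactly equal one of the terms indexed for that field.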
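To make the two differences concrete, here is a toy in-memory model, not Elasticsearch code. The hypothetical analyze() only lowercases and splits on non-alphanumeric characters, roughly what the standard analyzer does for English text, and each method mimics how one query type compares its input with a field's indexed terms.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

// Toy model (not Elasticsearch code) of match, term and match_phrase on an analyzed text field.
public class ToyTextQueries {

    // Hypothetical simplified analyzer: lowercase, split on anything that is not a letter or digit.
    static List<String> analyze(String text) {
        List<String> tokens = new ArrayList<>();
        for (String t : text.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{N}]+")) {
            if (!t.isEmpty()) {
                tokens.add(t);
            }
        }
        return tokens;
    }

    // match: analyze the query; a hit if ANY query term is among the field's terms (operator OR).
    static boolean match(String fieldValue, String query) {
        List<String> fieldTerms = analyze(fieldValue);
        for (String q : analyze(query)) {
            if (fieldTerms.contains(q)) {
                return true;
            }
        }
        return false;
    }

    // term: the query is NOT analyzed; it must equal one indexed term exactly.
    static boolean term(String fieldValue, String query) {
        return analyze(fieldValue).contains(query);
    }

    // match_phrase with slop 0: analyze the query, then all terms must appear adjacent and in order.
    static boolean matchPhrase(String fieldValue, String query) {
        List<String> queryTerms = analyze(query);
        if (queryTerms.isEmpty()) {
            return false;
        }
        return Collections.indexOfSubList(analyze(fieldValue), queryTerms) >= 0;
    }

    public static void main(String[] args) {
        String doc = "Quick Brown Fox";
        System.out.println(match(doc, "brown dog"));       // true: "brown" matches
        System.out.println(term(doc, "Brown"));            // false: the indexed term is "brown"
        System.out.println(term(doc, "brown"));            // true
        System.out.println(matchPhrase(doc, "brown fox")); // true: adjacent and in order
        System.out.println(matchPhrase(doc, "fox brown")); // false: wrong order
    }
}
```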
matchQuery search template (one field and keyword, with optional de-duplication and a distinct count):
public BootstrapTablePaginationVo<String> searchMsgByParam(BasicCrashInfoSearchParam param) throws Exception {
    /** Read and validate input parameters **/
    String index = param.getIndex();
    String type = param.getType();
    String field = param.getField();
    String keyWord = param.getKeyWord();
    if (index == null || field == null || keyWord == null) {
        LOG.info("index, field or keyword is null; cannot run the query!");
        return null;
    }
    /** Check the client and the index before querying **/
    if (client == null) {
        LOG.info("client is null (initialization failed); cannot run the query!");
        return null;
    }
    // Make sure the index exists
    if (!isIndexExist(index)) {
        return null;
    }
    BootstrapTablePaginationVo<String> vo = new BootstrapTablePaginationVo<String>();
    List<String> responseStrList = new ArrayList<String>();

    SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource()
            .query(QueryBuilders.matchQuery(field, keyWord))
            .from(param.getOffset())
            .size(param.getLimit());
    String distinctField = param.getDistictField();
    if (distinctField != null) {
        // Collapse hits so only one document per distinct value is returned
        sourceBuilder.collapse(new CollapseBuilder(distinctField));
        // Approximate number of distinct values (cardinality aggregation); a null field would throw
        sourceBuilder.aggregation(AggregationBuilders.cardinality("count_id").field(distinctField));
    }
    SearchRequest searchRequest = new SearchRequest(index).source(sourceBuilder);
    if (StringUtils.isNotBlank(type)) {
        searchRequest.types(type);
    }
    // Result rows
    SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
    for (SearchHit searchHit : response.getHits()) {
        responseStrList.add(searchHit.getSourceAsString());
    }
    vo.setRows(responseStrList);
    // Statistics: getAggregations() is null when no aggregation was requested
    int count = 0;
    if (response.getAggregations() != null) {
        NumericMetricsAggregation.SingleValue responseAgg = response.getAggregations().get("count_id");
        if (responseAgg != null) {
            count = (int) responseAgg.value();
        }
    }
    vo.setTotal(count);
    return vo;
}
termQuery multi-condition template (bool query with term, range and terms filters, plus aggregations):
// Uses static imports from QueryBuilders: termQuery, rangeQuery, termsQuery.
// groupByExternalTag, DAY, appId and esClientService are members of the enclosing class.
private Map<Long, Long> getExternalTagCountByRiskType(String id, long startTime, long endTime, List<Long> tagIds, UserStatEnum field) {
    // Query conditions: all three must match (AND)
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
            .must(termQuery("id", StringUtils.lowerCase(id)))
            .must(rangeQuery("time").gte(startTime).lte(endTime))
            .must(termsQuery("type", tagIds));
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
            .query(boolQueryBuilder)
            .size(0); // only aggregations are needed, no hits
    // Group by tag type, most frequent first (Terms.Order is the 5.x API; 6.x+ uses BucketOrder.count(false))
    AggregationBuilder dateAggBuilder = AggregationBuilders.terms(groupByExternalTag)
            .field("type").order(Terms.Order.count(false)).size(1000)
            .minDocCount(0);
    boolean withinOneDay = endTime - startTime <= DAY;
    if (!withinOneDay) {
        // Longer ranges: approximate distinct count of the chosen field per tag
        dateAggBuilder.subAggregation(AggregationBuilders.cardinality("total")
                .field(field.getDesc() + ".keyword").precisionThreshold(10000));
    }
    sourceBuilder.aggregation(dateAggBuilder);

    String date = LocalDate.fromDateFields(new Date(startTime)).toString();
    SearchResponse response = esClientService.getAbnormalUserSearchResponse(sourceBuilder, field, UserStatEnum.DAILY, date, appId);
    Terms agg = response.getAggregations().get(groupByExternalTag);
    Map<Long, Long> result = Maps.newHashMap();
    for (Terms.Bucket entry : agg.getBuckets()) {
        if (withinOneDay) {
            // Within one day: exact document count per tag
            result.put((long) entry.getKey(), entry.getDocCount());
        } else {
            Cardinality cardinality = entry.getAggregations().get("total");
            result.put((long) entry.getKey(), cardinality.getValue());
        }
    }
    return result;
}
matchPhraseQuery multi-condition template:
/**
 * Search indexed documents by several conditions
 * @param param query parameters
 * @return one page of matching documents, plus a distinct count when a distinct field is given
 * @throws Exception
 */
public BootstrapTablePaginationVo<String> searchMsgByMultiParam(BasicCrashInfoSearchParam param) throws Exception {
    List<String> responseStrList = new ArrayList<String>();
    /** Read and validate input parameters **/
    String index = param.getIndex();
    String type = param.getType();
    Map<String, String> map = param.getMultkeyWord(); // field -> phrase; all must match
    String startTime = param.getStartTime();          // start of the time range
    String endTime = param.getEndTime();              // end of the time range
    String sortWord = param.getSortWord();            // sort field
    if (index == null || map == null) {
        LOG.info("index or map is null; cannot run the query!");
        return null;
    }
    /** Check the client and the index before querying **/
    if (client == null) {
        LOG.info("client is null (initialization failed); cannot run the query!");
        return null;
    }
    // Make sure the index (or alias) exists
    if (!isIndexExist(index)) {
        return null;
    }
    /** Build the query: phrase conditions first, then the time range, then sorting **/
    BootstrapTablePaginationVo<String> vo = new BootstrapTablePaginationVo<String>();
    BoolQueryBuilder qb = QueryBuilders.boolQuery();
    for (Map.Entry<String, String> entry : map.entrySet()) {
        // must = AND, should = OR
        qb.must(QueryBuilders.matchPhraseQuery(entry.getKey(), entry.getValue()));
    }
    if (startTime != null && endTime != null) {
        qb.must(QueryBuilders.rangeQuery("xxxxxx").from(startTime).to(endTime));
    }
    SearchRequest searchRequest = new SearchRequest(index);
    if (StringUtils.isNotBlank(type)) {
        searchRequest.types(type);
    }
    SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource()
            .query(qb)
            .from(param.getOffset())
            .size(param.getLimit())
            .explain(true); // per-hit scoring explanation: useful for debugging, adds overhead
    String distinctField = param.getDistictField();
    if (distinctField != null) {
        sourceBuilder.aggregation(AggregationBuilders.cardinality("count_id").field(distinctField).precisionThreshold(10000));
    }
    if (sortWord != null) {
        sourceBuilder.sort(sortWord, SortOrder.DESC); // SortOrder.ASC for ascending
    }
    SearchResponse response = client.search(searchRequest.source(sourceBuilder), RequestOptions.DEFAULT);
    // Result rows
    for (SearchHit searchHit : response.getHits()) {
        responseStrList.add(searchHit.getSourceAsString());
    }
    vo.setRows(responseStrList);
    // Statistics (distinct count)
    if (distinctField != null) {
        NumericMetricsAggregation.SingleValue responseAgg = response.getAggregations().get("count_id");
        vo.setTotal(responseAgg != null ? (int) responseAgg.value() : 0);
    }
    return vo;
}
GET queries: what is the difference between querying a field with .keyword and without it, and why does a query sometimes return no results?
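1. Elasticsearch 5.0 and later removed the string type and split it into two types, text and keyword. The difference is that text fields are analyzed (split into terms) while keyword fields are not.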
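2. If you have not defined a mapping for your index fields in advance (for example through an index template), Elasticsearch uses dynamic mapping and infers each field's type from the values in the documents you send. For example, if a document's price field is 12, price is mapped as long. Strings are handled differently: a string that passes date detection (e.g. "2015-01-01") is mapped as date, but other strings, including a value such as "192.168.0.1" in an addr field, are not detected as ip. They are mapped as text, and to keep the ability to run exact queries and aggregations on them, Elasticsearch also maps them as keyword, stored in _mapping as a sub-field under the field's fields property. For example, when Elasticsearch sees a new field "foobar": "some string", it creates the following dynamic mapping: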
{
  "foobar": {
    "type": "text",
    "fields": {
      "keyword": {
        "type": "keyword",
        "ignore_above": 256
      }
    }
  }
}
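In later queries, foobar queries the field as text, while foobar.keyword queries it as keyword. The former analyzes the query input before matching; the latter matches the input exactly against the whole original value (values longer than ignore_above, 256 characters here, are not indexed in the keyword sub-field).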
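3. An Elasticsearch term query performs exact matching, not analyzed matching, so a term query against a text field only finds a document if the query string is exactly one of the terms the analyzer produced from that field (e.g. "some" matches "some string", but "some string" or "Some" does not). To match the whole value exactly, query foobar.keyword instead, which treats foobar as a keyword.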
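The three points above can be mimicked with another toy model (not Elasticsearch code; the simplified analyzer is an assumption). The same string is indexed twice, as analyzed text terms for foobar and as one untouched term for foobar.keyword, and a term query simply looks its input up in one of those term lists.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

// Toy model (not Elasticsearch code) of a dynamically mapped string field and its .keyword sub-field.
public class ToyKeywordSubField {
    // Mirrors "ignore_above": 256 in the dynamic mapping above.
    static final int IGNORE_ABOVE = 256;

    // foobar (text): simplified analyzer, lowercase and split on non-letters/digits.
    static List<String> textTerms(String value) {
        List<String> terms = new ArrayList<>();
        for (String t : value.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{N}]+")) {
            if (!t.isEmpty()) {
                terms.add(t);
            }
        }
        return terms;
    }

    // foobar.keyword: the whole value is one term; values over ignore_above characters are not indexed.
    static List<String> keywordTerms(String value) {
        if (value.codePointCount(0, value.length()) > IGNORE_ABOVE) {
            return Collections.emptyList();
        }
        return Collections.singletonList(value);
    }

    // A term query does not analyze its input; it only checks for that exact term.
    static boolean termQuery(List<String> indexedTerms, String input) {
        return indexedTerms.contains(input);
    }

    public static void main(String[] args) {
        String value = "some string";
        System.out.println(termQuery(textTerms(value), "some string"));    // false: text was split into "some", "string"
        System.out.println(termQuery(keywordTerms(value), "some string")); // true: exact whole value
        System.out.println(termQuery(textTerms(value), "some"));           // true: "some" is one of the text terms
        System.out.println(termQuery(textTerms("Some"), "Some"));          // false: the text term was lowercased
    }
}
```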