一、環境搭建
參考以下兩個鏈接介紹:
ES集群安裝:https://www.jianshu.com/p/57c3061bb6cb
ES集群 + kibana安裝:https://blog.csdn.net/cxfeugene/article/details/82710504
二、搭建Demo
有以下幾種方式:
(1)使用Java API即使用TransportClient操作Es(目前官方已不推薦使用)
(2)官方給出了基於HTTP的客戶端REST Client(推薦使用),官方給出來的REST Client有Java Low Level REST Client和Java Hight Level REST Client(API官方文檔:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.2/java-rest-high-supported-apis.html)兩個,前者兼容所有版本的ES,后者是基於前者開發出來的,只暴露了部分API,待完善
(3)使用 spring-data-elasticsearch,具體可參考博文:https://blog.csdn.net/jacksonary/article/details/82729556
我采用最后一種,即使用springboot2.2.0 + spring-data-elasticsearch3.2.0組合;搭建springboot2.2.0項目,然后引入spring-data-es即可:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
注意事項:
(1)不要特意去指定版本,如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
由於版本兼容性不明白,會導致各種兼容問題(缺包,沖突等),所以指定了spring-boot版本之后,其他的使用其默認的(最新版本)即可,如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
(2)spring和elasticsearch有兩種鏈接方式,一種是用TCP協議,默認端口是9300,還有一種用http協議
三、項目實戰
1、熟悉了ES之后,大家都知道,使用ES第一步則是創建一個index(跟ES官網說的那樣,index就好比一個數據庫,但在ES7.x之后,index已經不像一個數據庫了,而更像數據庫中的一張表,因為淡化了type的概念);spring項目中如何創建ES的index呢?
有如下兩種方式:
(1)使用json格式定義mapping以及setting

具體內容:
mapping.json
{ "xxx": { "properties": { "id": { "type": "long" }, "name": { "type": "text" }...
setting.json
{ "index": { "number_of_shards": "2", "number_of_replicas": "0" } }
然后再定義實體類:
@Setter @Getter //ES的三個注解 //指定index索引名稱為項目名 指定type類型名稱為實體名
@Document(indexName = "xxx", type = "xxx") //相當於ES中的mapping 注意對比文件中的json和原生json 最外層的key是沒有的
@Mapping(mappingPath = "/mapping.json") //相當於ES中的settings 注意對比文件中的json和原生json 最外層的key是沒有的
@Setting(settingPath = "/setting.json") public class Builder { //id
@Id private Long id; ...
}
(2)不使用json文件,直接在實體類定義
@Data @Document(indexName = "xxx",type = "xxx",replicas = 0, shards = 1) // 這里缺省type會默認為實體類名 public class xxx{ @Id private String aid; @Field(type = FieldType.Text,fielddata = true) private String name; ... }
在這里需要清楚這幾個mapping(@Field 內的參數)參數:
- fielddata:text類型不支持doc_values屬性,因此無法對text類型進行聚合、排序、腳本取值等操作,可以使用fielddata屬性設置,設置其為true即可
- index:es默認將每個字段進行倒排索引的構建,這樣會耗費空間,所以在不需要索引的字段務必設置index=false
- format:用於對日期格式的數據進行格式化
- ignore_above:不會對超過指定長度的字符串構建索引以及store,通常來講,是對keyword類型使用,而不能對text字段使用
- fields:一個 string 類型字段可以被映射成 text 字段作為 full-text 進行搜索,同時也可以作為 keyword 字段用於排序和聚合
{
"mappings": {
"my_type": {
"properties": {
"city": {
"type": "text",
"fields": {
"raw": {
"type": "keyword"
}
}
}
}
}
}
}
- norms:norms用於計算相關性得分,但會消耗較多的磁盤空間。如果不需要對某個字段進行評分,最好不要開啟norms
2、當一切就緒之后,先插入數據
(1)首先定義dao層,如下:
public interface XxxRepository extends ElasticsearchRepository<xxx, String> { xxx findByAid(String aid); List<xxx> findByAidIn(List<String> aids); }
繼承ElasticsearchRepository類,里面有基本的CURD方法,基本夠用。
當然上面這種方法有局限性,因為其只有一些比較基本常用的操作,如果需要比較復雜的操作,怎么辦?那就是獲取原生的 ElasticsearchTemplate,因為上面那種方式其實也是使用的這個東西,只是幫你封裝好了一些方法,當我們發現上面那種方式
不夠用時就使用第二種:
@Autowired protected ElasticsearchTemplate elasticsearchTemplate;
只需注入即可,以下是使用該方法實現upSet(有記錄時就更新該記錄,無記錄時就插入)方法:
/** * @author liuzj */ @Component public class EsTemplateRepository<T> { @Autowired protected ElasticsearchTemplate elasticsearchTemplate; /** * 更新/插入 * * @param list 對象集合 * @return 更新/插入數量 * @throws Exception 異常 */
public int upSert(List<T> list) throws Exception { if (CollectionUtils.isEmpty(list)) { return 0; } // 驗證對象是否有唯一標識
T entity = list.get(0); Field id = null; for (Field field : entity.getClass().getDeclaredFields()) { Id businessID = field.getAnnotation(Id.class); if (businessID != null) { id = field; break; } } if (id == null) { throw new Exception("Can't find @Id on " + entity.getClass().getName()); } Document document = ReflectUtil.getDocument(entity.getClass()); List<UpdateQuery> updateQueries = new ArrayList<>(); for (T obj : list) { UpdateQuery updateQuery = new UpdateQuery(); updateQuery.setIndexName(document.indexName()); updateQuery.setType(document.type()); updateQuery.setId(ReflectUtil.getFieldValue(id, obj).toString()); // 插入
IndexRequest indexRequest = new IndexRequest(updateQuery.getIndexName(), updateQuery.getType(), updateQuery.getId()) .source(ReflectUtil.Obj2Map(obj, true)); // 更新
UpdateRequest updateRequest = new UpdateRequest(updateQuery.getIndexName(), updateQuery.getType(), updateQuery.getId()) .doc(ReflectUtil.Obj2Map(obj, false)) .upsert(indexRequest); updateQuery.setUpdateRequest(updateRequest); updateQuery.setClazz(obj.getClass()); updateQueries.add(updateQuery); } if (!CollectionUtils.isEmpty(updateQueries)) { elasticsearchTemplate.bulkUpdate(updateQueries); } return list.size(); } /** * 單個更新/插入 * * @param obj 數據 * @return int * @throws Exception 異常 */
public int upSert(T obj) throws Exception { List<T> objs = Lists.newArrayList(); objs.add(obj); return upSert(objs); } }
當然,如果上面那種方式還是無法滿足你的需求,那么你還可以使用更原始的方式,ElasticsearchTemplate 類提供了getClient()方法,直接獲取ES client,滿足你使用原生Api
3、數據插入基本搞定,現在了解一下數據查詢
使用查詢難免遇到它:QueryBuilders,顧名思義,它是一個查詢的構造者,它能構造出各種查詢,具體可以看其源碼

現在看一下一些常見查詢
(1)fuzzyQuery
功能:模糊匹配
原理:fuzzy搜索技術。搜索的時候,可能輸入的搜索文本會出現誤拼寫的情況,自動將拼寫錯誤的搜索文本,進行糾正,糾正以后去嘗試匹配索引中的數據,糾正在一定的范圍內如果差別大無法搜索出來
總體代碼邏輯:
// 構造一個多條件查詢
BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery(); // 構造子條件查詢
FuzzyQueryBuilder fuzzyQuery = QueryBuilders.fuzzyQuery("name","xxx"); boolBuilder.filter(matchQuery); xxxRepository.search(boolBuilder);
類似於es代碼:
GET /my_index/my_type/_search
{
"query": {
"fuzzy": {
"text": {
"value": "surprize",
"fuzziness": 2
}
}
}
}
// fuzziness 即為最多糾正兩個字母然后去匹配,默認為 auto(2)
(2)matchQuery
功能:模糊匹配
// 構造一個多條件查詢
BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery(); // 構造子條件查詢
FuzzyQueryBuilder fuzzyQuery = QueryBuilders.matchQuery("name","xxx"); boolBuilder.filter(matchQuery); xxxRepository.search(boolBuilder);
類似於es代碼:
GET my_index/my_type/_search
{
"query": {
"match": {
"xxx": "Quick Foxes!"
}
}
}
(3)termQuery
功能:精確匹配
java代碼方式同上
es代碼:
GET bigdata-archive/_search
{
"query": {
"term" : {
"cid" : {
"value" : "5137376667422s31000000"
}
}
}
}
(4)rangeQuery
功能:范圍查詢
java代碼方式同上
es代碼:
GET bigdata-archive/_search
{
"query": {
"range" : {
"personFileCreateTime" : {
"from" : 1572331788000,
"to" : 1572331789000,
"include_lower" : true,
"include_upper" : true
}
}
}
}
(5)existsQuery
功能:是否存在查詢,即是否為null
java代碼方式同上
es代碼:
GET bigdata-archive/_search
{
"query": {
"exists" : {
"field" : "cid",
"boost" : 1.0
}
}
}
(6)聚合查詢
功能:查詢統計
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); // 查詢
queryBuilder.must(QueryBuilders.rangeQuery("age") .gte(startAge) .lte(endAge)); // 聚合
AggregationBuilder maxAggregator = AggregationBuilders.max("bathDate").field("time"); TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("group_by_age").field("age") .subAggregation(maxAggregator); SearchQuery build = new NativeSearchQueryBuilder() .withQuery(queryBuilder) .addAggregation(termsAggregationBuilder) .build(); AggregatedPage<XXX> testEntities = elasticsearchTemplate.queryForPage(build, XXX.class); // 取出聚合結果
Aggregations entitiesAggregations = testEntities.getAggregations(); Terms terms = entitiesAggregations.get("group_by_age");
(7)分頁查詢,一般的業務分頁都會采用from to 但是這個在ES里面是越往后查詢消耗越大,因為數據分片,每次查詢一頁數據都要從每個分片去除一頁,頁數越多就相當耗內存了,於是如果要往后翻很多頁,也就是所謂深分頁,官方建議使用ScrollSearch,采用游標方式記錄上一次你查到哪里了,然后再基於上一次查詢的地方往下查 ↓
Client client = elasticsearchTemplate.getClient(); SearchResponse scrollResp = client.prepareSearch("index 名") .addSort("排序字段名", SortOrder.DESC) .setScroll(new TimeValue(60000)) .addDocValueField("需要取出的字段") .setSize(personPropertiest.getArchiveImageCountAndRecentSnapTimeUpdateBatch()).get(); do { SearchHit[] hits = scrollResp.getHits().getHits(); if (hits != null && hits.length > 0) { // 業務邏輯 } scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet(); } while(scrollResp.getHits().getHits().length != 0);
(8)桶過濾分頁
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); // 查詢 queryBuilder.must(QueryBuilders.termQuery("xx",xxx)); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); queryBuilder.must(QueryBuilders.rangeQuery("time") .gte(dateFormat.parse(startTime).getTime()) .lte(dateFormat.parse(endTime).getTime())); // 聚合 MinAggregationBuilder minAggregationBuilder = AggregationBuilders.min("startTime").field("time"); MaxAggregationBuilder maxAggregationBuilder = AggregationBuilders.max("endTime").field("time"); Map<String,String> bucketMap = Maps.newHashMap(); bucketMap.put("count","_count"); TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("group_by_source_id").field("sourceId").order(BucketOrder.key(false)).size(Integer.MAX_VALUE) .subAggregation(minAggregationBuilder) .subAggregation(maxAggregationBuilder); // 桶過濾 if (activityRoutineType == 0) { termsAggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketSelector("terms_count",bucketMap,new Script("params.count >= 3"))); }else { termsAggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketSelector("terms_count",bucketMap,new Script("params.count < 3"))); } // 桶分頁 List<FieldSortBuilder> sorts = Lists.newArrayList(); sorts.add(SortBuilders.fieldSort("_count").order(SortOrder.DESC)); termsAggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketSort("count_sort",sorts).from((page - 1) * perpage).size(perpage)); SearchQuery build = new NativeSearchQueryBuilder() .withQuery(queryBuilder) .addAggregation(termsAggregationBuilder) .build(); // 執行查詢 AggregatedPage<XXX> testEntities = elasticsearchTemplate.queryForPage(build, XXX.class); // 取出聚合結果 Aggregations entitiesAggregations = testEntities.getAggregations(); Terms terms = entitiesAggregations.get("group_by_source_id"); if (terms == null) { return new Page<>(); } Page<XXX> result = new Page<>(); for (Terms.Bucket bucket : terms.getBuckets()) { // TODO } return result;
以上代碼相當於ES腳本:
GET bigdata_event/_search
{
"query": {
"bool": {
"must": [{
"term": {
"xx": {
"value": "4622090581533787699",
"boost": 1.0
}
}
},
{
"range": {
"time": {
"from": 1548395063000,
"to": 1577256244000,
"include_lower": true,
"include_upper": true,
"boost": 1.0
}
}
}
],
"adjust_pure_negative": true,
"boost": 1.0
}
},
"aggs": {
"group_by_source_id": {
"terms": {
"field": "sourceId",
"size": 2147483647,
"min_doc_count": 1,
"shard_min_doc_count": 0,
"show_term_doc_count_error": false,
"order": {
"_key": "desc"
}
},
"aggregations": {
"startTime": {
"min": {
"field": "time"
}
},
"endTime": {
"max": {
"field": "time"
}
},
"terms_count": {
"bucket_selector": {
"buckets_path": {
"count": "_count"
},
"script": {
"source": "params.count >= 2",
"lang": "painless"
},
"gap_policy": "skip"
}
},
"count_sort": {
"bucket_sort": {
"sort": [{
"_count": {
"order": "desc"
}
}],
"from": 0,
"size": 20,
"gap_policy": "SKIP"
}
}
}
}
}
}
參考於官方API:https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.5/java-search-scrolling.html
(9)ES 游標查詢
Client client = elasticsearchTemplate.getClient(); SearchResponse scrollResp = client.prepareSearch("index_name") .addSort("xxx", SortOrder.DESC) .setScroll(new TimeValue(60000)) .addDocValueField("xxx") // 需要查詢出來的字段 .setSize(10000).get();// 每次查詢出來的數據量 do { SearchHit[] hits = scrollResp.getHits().getHits(); if (hits != null && hits.length > 0) { List<XXX> xxx = esEventService.findSnapTimeAndImageCount(hits[hits.length - 1].field("aid").getValue().toString(),hits[0].field("xxx").getValue().toString()); if (!CollectionUtils.isEmpty(xxx)) { // TODO } } scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet(); } while(scrollResp.getHits().getHits().length != 0);
4、使用過程遇到的坑
(1)插入問題
當有重復數據插入時,ES的插入是采用覆蓋的方式,如何讓他不覆蓋某些字段呢?
當然你可以讓不需要覆蓋的字段不賦任何值,而且還不能為null,因為null其實也是分配了空間的,
其轉為json仍然按有:xxx=null,所以此時仍然會覆蓋,而且會被置為null,所以你想使用此方法必須
創建另一個對象,不需要覆蓋的字段就不能還有此字段,比如使用Map,但是ElasticsearchRepository
的save方法並不支持你傳map,因為ElasticsearchRepository是用泛型限制了,而且即使你指定Map泛型
但是也沒法指定index等信息,所以在這樣的窘境下,采用了ElasticsearchTemplate的update方法,而且
其支持upSert
(2)搭建集群時報錯:max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
解決方案:
切換到root用戶
執行命令:
sysctl -w vm.max_map_count=262144
查看結果:
sysctl -a|grep vm.max_map_count
顯示:
vm.max_map_count = 262144
上述方法修改之后,如果重啟虛擬機將失效,所以:
解決辦法:
在 /etc/sysctl.conf文件最后添加一行
vm.max_map_count=262144
即可永久修改
(3)報錯2:"discovery.zen.minimum_master_nodes" is too low
解決方案:
(4)報錯3:org.elasticsearch.index.mapper.MapperParsingException: No type specified for field [feature_info]
解決方案:
@Data
@Document(indexName = "bigdata-event",type = "event",replicas = 0, shards = 1)
public class Event {
@Field(index = false,type = FieldType.Text)
private String feature_info;
...
注意如上代碼,注意一:如果在字段上面加了@Field注解就務必加上type,否則就容易報如上的錯;注意二:在class上的@Document注解上務必加上type否則就容易導致索引構建失敗
(5)注意事項:discovery.zen.minimum_master_nodes參數設置是為了防止腦裂問題,一般設置為N/2 + 1 設置不當會報錯
(6)報錯4:Mapping definition for [dt] has unsupported parameters: [fielddata : true]
原因:fielddata = true 支持 text類型,不支持其他類型
(7)報錯5:ElasticsearchException$1: Fielddata is disabled on text fields by default. Set fielddata=true on [aid] in order to load fielddata in memory by uninverting the inverted index. Note that this can however use significant memory. Alternatively use a keyword field instead.
原因:默認情況下text類型的數據,fielddata = false 的,所以在使用該text字段進行聚合的時候就會報這個錯,錯誤中也給我們提出了兩個解決方案:第一就是修改該字段的fielddata的值;第二就是將該字段的類型修改為keyword,但是es是不支持修改已存在的mapping的,所以需要重新創建一個index,然后將數據遷移至新的index
5、ES 集成 ik 插件
(1)下載ik插件zip包;注意下載ES版本相對應的ik包,否則報錯,例如ES6.5.3就應該下載下面那個zip包

(2)將下載的zip包,解壓至ES_HOME/plugins/ik下面,如果沒有這個目錄則手動創建

(3)重啟ES
(4)檢查是否成功

可以觀察到分詞效果

6、學習參考鏈接
(1)ES基礎參考:https://blog.csdn.net/define_us/article/details/81909374
(2)ES腦裂問題詳解:https://blog.csdn.net/kakaluoteyy/article/details/81068387
(3)ES寫入速度優化:https://www.easyice.cn/archives/207
(4)ES java查詢參考:https://blog.csdn.net/weixin_43310252/article/details/83752485 ;
以及官方JAVA api:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html
(5)mapping 屬性解析參考:https://www.jianshu.com/p/8cef58be90ff
(6)IK集成參看:https://blog.csdn.net/q15102780705/article/details/101872729
