ES是啥?
ES就是一個開源的搜索引擎 也是一個分布式文檔數據庫
可以在極短的時間內存儲、搜索和分析大量的數據。
ES基本屬性:
字段
ES中,每個文檔,其實是以json形式存儲的。而一個文檔可以被視為多個字段的集合。
映射
每個類型中字段的定義稱為映射。例如,name字段映射為String。
索引
索引是映射類型的容器。 一個ES的索引非常像關系型世界中的數據庫,是獨立的大量文檔集合
ES各屬性對應關系數據庫
關系數據庫 -> 表名 -> 表結構 -> 一條記錄 -> 一個字段
ES -> 索引index - 類型type(1-n) -> 映射apping -> 文檔document -> 字段field
ES索引簡單原理: 采用倒排索引
Term(單詞):一段文本經過分析器分析以后就會輸出一串單詞,這一個一個的就叫做Term
Term Dictionary(單詞字典):顧名思義,它里面維護的是Term,可以理解為Term的集合
Term Index(單詞索引):為了更快的找到某個單詞,我們為單詞建立索引
Posting List(倒排列表):以前是根據ID查內容,倒排索引之后是根據內容查ID,然后再拿着ID去查詢出來真正需要的東西。
如果類比現代漢語詞典的話,那么Term就相當於詞語,Term Dictionary相當於漢語詞典本身,Term Index相當於詞典的目錄索引)
通過 單詞索引 找到單詞在單詞字典中的位置,通過單詞字典進而找到Posting List倒排列表,有了倒排列表就可以根據ID找到文檔.
(本質:通過單詞找到對應的倒排列表,根據倒排列表中的倒排項進而可以找到文檔記錄)
查詢結果分析:
took:本次操作花費的時間,單位為毫秒。
timed_out:請求是否超時
_shards:說明本次操作共搜索了哪些分片
hits:搜索命中的記錄
hits.total : 符合條件的文檔總數 hits.hits :匹配度較高的前N個文檔
hits.max_score:文檔匹配得分,這里為最高分
_score:每個文檔都有一個匹配度得分,按照降序排列。
_source:顯示了文檔的原始內容。
ES聚合
桶在概念上類似於 SQL 的分組(GROUP BY),而指標則類似於 COUNT() 、 SUM() 、 MAX() 等統計方法
ES分布式架構原理
shard就是ES索引存儲具體數據的地方,一個索引對應多個shard
多個shard存儲在不同的機器上
每個shard只放索引的一部分數據
每個shard的副本replic放在其他機器上(shard的主體primary和副本replic分開存) 保證了一定程度的高可用
ES寫入數據原理:
從客戶端寫入到shard的全過程:
- 客戶端隨機找一個ES集群節點當作協調節點,寫數據
- 協調節點將數據 根據doc id的哈希路由,寫入分配的主shard並同步到從shard
- shard將數據寫入內存buffer
- 內存buffer每一秒鍾refresh一次將數據刷進OScache緩存,一份sagementfile,一份translog日志 (translog日志的作用 有點類似於redis的RDB文件,用於ES宕機恢復數據)
- OScache緩存每5秒中刷入磁盤的translog日志文件
- 每30分鍾執行一次flush操作,執行一次commit,強制將內存buffer和OScache數據刷入新創建一個sagmentfile文件(磁盤) 落地到磁盤,多個sagementfile會有merge操作。
數據搜索主要是從OScache中拿的,所以剛寫入shard的數據要一秒后才能讀到。
寫入一條document數據時會產生一個doc id,查的時候根據doc id進行哈希,路由到對應的shard (doc id默認隨機分配,也可以手動指定,例如訂單id)
ES刪除數據原理:
1.把被刪除數據寫入.del文件(磁盤),被.del文件標識的被認為已刪除
2.當sagmentfile文件過多時ES會產生merge操作,將多個segmentfile合成一個,如果.del標識了刪除的數據 merge后不會產生在新的segmentfile.
ES根據doc id讀取數據過程:
選ES集群的任一台機器當作協調節點
協調節點根據要查找的doc id 哈希路由到對應的節點的shard,
查到結果返還給協調節點,協調節點返還給客戶端。
ES檢索數據過程:
客戶端發送讀取請求到任一台機器當作協調節點,協調節點發送給所有機器所有shard,
每個shard都會返回結果,協調節點拿到所有shard返回的匹配的結果,再次篩選最匹配的那些document,返還給客戶端.
每次查詢完都會暫時將數據存入cache中,再次
ES在數據量很大的情況下如何優化查詢(保證搜索性能):
主要思想:最大限度利用cache的高效率進行查詢(磁盤查詢效率太低)
(1) 合理利用ES+Hbase/MySQL結合來查詢(Hbase對海量數據在線存儲)
把重要的檢索字段存成ES索引 (例如gid,訂單id,訂單金額,訂單時間,訂單說明...)
其余不作為檢索條件的字段存在Hbase/Mysql
用ES根據條件快速查出gid
再用gid去Hbase/MySQL查出全部字段
(2) 數據預熱
每隔一段時間將熱門關鍵數據寫個程序去查一下,主動從磁盤寫入到cache中
(3) 冷熱分離
水平拆分,將很熱的數據單獨寫進一個索引,將冷數據熱數據拆成兩個單獨索引,放在不同機器上。
(5) 單索引查,不要join多個索引查,效率非常低 (想辦法導入ES的時候直接將要Join的表直接導成一個索引)
(4) 分頁性能 (深度分頁,性能越差)
查第100頁的10條數據/第1001-1010,由於ES分布式的存儲,必須去每台機器每個shard都查1000條,全部返回到協調節點,合並排序后再取出1001-1010。
解決:合理利用Scroll游標
scroll會一次性獲取所有數據的快照,每次翻頁通過游標移動獲取下一頁,分頁性能大很多。、
scroll不能亂跳,只能順序向下翻
ES生產集群部署架構
模板:每個索引的數據大概有多少,每個索引大概分多少shard
ES集群部署5台機器,每台6核64G,總內存320G,分8個shard
ES集群每天增量2000萬條,大約500M,每月大約6億條 15G數據
ES Java-API使用(僅供參考)
模板:
上 Query:
SearchRequest searchRequest = new SearchRequest("表名");
searchRequest.types("doc");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termQuery( ?, ?));
boolQueryBuilder.mustNot(QueryBuilders.termQuery( ?, ?));
boolQueryBuilder.filter(QueryBuilders.rangeQuery( ?, ?).get(start).let(end));
boolQueryBuilder.should(QueryBuilder.termsQuery( ?, ?));
QueryBuilders.matchQuery("user", "kimchy");
QueryBuilders.termsQuery("", "");
QueryBuilders.rangeQuery("", "");
searchSourceBuilder.query(boolQueryBuilder);
searchSourceBuilder.from(0);
searchSourceBuilder.size(10);
中 aggregation:
TermsAggregationBuilder aggregation = AggregationBuilders.terms("count數量聚合名").field("字段名").size(1000);
aggregation.subAggregation(); //拼接新聚合
aggregation.subAggregation(AggregationBuilders.avg("平均聚合").field("字段名"));
AggregationBuilders.terms("count數量聚合").field("字段名").size(1000);
AggregationBuilders.cardinality("唯一的數量").field("字段名");
AggregationBuilders.sum("求和").field("字段名");
searchSourceBuilder.aggregation(aggregation);
searchRequest.source(searchSourceBuilder);
下 SearchResponse:
SearchResponse searchResponse = null;
try {
searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
logger.error("查詢??", e);
throw new CrmRuntimeException(ErrorCodeEnum.COMMON_FAIL, "查詢出錯");
}
SearchHits searchHits = searchResponse.getHits();
SearchHit[] hits = searchHits.getHits();
//Hits結果
for (int i = 0; i < searchHits.length; i++) {
Map<String, Object> sourceAsMap = searchHits[i].getSourceAsMap();
UserDo userDo = new UserDo();
userDo.setuCellphone(StringUtil.objectToString(sourceAsMap.get("u_cellphone")));
userDo.setNickName(StringUtil.objectToString(sourceAsMap.get("nick_name")));
int gid = StringUtil.objectToInteger(sourceAsMap.get("gid"));
int number = gid + NUMBER_OFFSET;
userDo.setNumber(number);
userDo.setGid(gid);
userDo.setAllSubmit(StringUtil.objectToInteger(sourceAsMap.get("all_submit")));
userDo.setAvatarUrl(StringUtil.objectToString(sourceAsMap.get("avatar_url")));
userDo.setVipType(StringUtil.objectToInteger(sourceAsMap.get("vip_type")));
result.add(userDo);
}
} catch(
Exception e)
{
log.error("全局搜索失敗", e);
throw new CrmRuntimeException(ErrorCodeEnum.COMMON_FAIL, "全局搜索失敗");
}
//聚合結果
Aggregations aggregations = searchResponse.getAggregations();
Terms terms = aggregations.get("signin_rel_count"); //接受聚合結果
List<? extends Terms.Bucket> buckets = terms.getBuckets();
if(CollectionUtils.isNotEmpty(buckets))
{
for (Terms.Bucket bucket : buckets) {
String salesmanId = (String) bucket.getKey();
long docCount = bucket.getDocCount();
....
Cardinality userCountAggregation = bucket.getAggregations().get("count"); //返回
Terms groupBySalesgrpAggregations = bucket.getAggregations().get("group_by_salesgrp"); //獲取bucket下的聚合函數
Sum sumDiscountAggregation = groupBySalesgrpBucket.getAggregations().get("sum_discount");
BigDecimal discount =
new BigDecimal(sumDiscountAggregation.getValue()).setScale(2, RoundingMode.HALF_UP);
}
}