ES分布式搜索引擎架構原理


ES是啥?

ES就是一個開源的搜索引擎    也是一個分布式文檔數據庫

可以在極短的時間內存儲、搜索和分析大量的數據。

 

ES基本屬性:

 

字段

ES中,每個文檔,其實是以json形式存儲的。而一個文檔可以被視為多個字段的集合。

 

映射

每個類型中字段的定義稱為映射。例如,name字段映射為String

 

索引

索引是映射類型的容器。 一個ES的索引非常像關系型世界中的數據庫,是獨立的大量文檔集合

 

ES各屬性對應關系數據庫

關系數據庫 ->   表名                ->  表結構    ->  一條記錄       -> 一個字段

ES        -> 索引index - 類型type(1-n) -> 映射apping  ->  文檔document  -> 字段field 

 

 

ES索引簡單原理: 采用倒排索引

 

Term(單詞):一段文本經過分析器分析以后就會輸出一串單詞,這一個一個的就叫做Term

 

Term Dictionary(單詞字典):顧名思義,它里面維護的是Term,可以理解為Term的集合

 

Term Index(單詞索引):為了更快的找到某個單詞,我們為單詞建立索引

 

Posting List(倒排列表):以前是根據ID查內容,倒排索引之后是根據內容查ID,然后再拿着ID去查詢出來真正需要的東西。

如果類比現代漢語詞典的話,那么Term就相當於詞語,Term Dictionary相當於漢語詞典本身,Term Index相當於詞典的目錄索引)

 

通過 單詞索引 找到單詞在單詞字典中的位置,通過單詞字典進而找到Posting List倒排列表,有了倒排列表就可以根據ID找到文檔.

(本質:通過單詞找到對應的倒排列表,根據倒排列表中的倒排項進而可以找到文檔記錄)

 

查詢結果分析:

took:本次操作花費的時間,單位為毫秒。

timed_out:請求是否超時

_shards:說明本次操作共搜索了哪些分片

hits:搜索命中的記錄

hits.total : 符合條件的文檔總數 hits.hits :匹配度較高的前N個文檔

hits.max_score:文檔匹配得分,這里為最高分

_score:每個文檔都有一個匹配度得分,按照降序排列。

_source:顯示了文檔的原始內容。

 

ES聚合

桶在概念上類似於 SQL 的分組(GROUP BY),而指標則類似於 COUNT() SUM() MAX() 等統計方法

 

 

ES分布式架構原理

shard就是ES索引存儲具體數據的地方,一個索引對應多個shard

多個shard存儲在不同的機器上

每個shard只放索引的一部分數據

每個shard的副本replic放在其他機器上(shard的主體primary和副本replic分開存) 保證了一定程度的高可用

 

 

 

ES寫入數據原理:

從客戶端寫入到shard的全過程:

 

  1. 客戶端隨機找一個ES集群節點當作協調節點,寫數據
  2. 協調節點將數據 根據doc id的哈希路由,寫入分配的shard並同步到從shard
  3. shard將數據寫入內存buffer
  4. 內存buffer每一秒鍾refresh一次將數據刷進OScache緩存,一份sagementfile,一份translog日志 (translog日志的作用 有點類似於redis的RDB文件,用於ES宕機恢復數據)  
  5. OScache緩存每5秒中刷入磁盤的translog日志文件
  6. 30分鍾執行一次flush操作,執行一次commit,強制將內存buffer和OScache數據刷入新創建一個sagmentfile文件(磁盤) 落地到磁盤,多個sagementfile會有merge操作。

 數據搜索主要是從OScache中拿的,所以剛寫入shard的數據要一秒后才能讀到。

寫入一條document數據時會產生一個doc id,查的時候根據doc id進行哈希,路由到對應的shard  (doc id默認隨機分配,也可以手動指定,例如訂單id)

 

ES刪除數據原理:

1.把被刪除數據寫入.del文件(磁盤),被.del文件標識的被認為已刪除

2.sagmentfile文件過多時ES會產生merge操作,將多個segmentfile合成一個,如果.del標識了刪除的數據 merge后不會產生在新的segmentfile.

 

 

ES根據doc id讀取數據過程:

選ES集群的任一台機器當作協調節點

協調節點根據要查找的doc id 哈希路由到對應的節點的shard,

查到結果返還給協調節點,協調節點返還給客戶端。

 

 

ES檢索數據過程:

客戶端發送讀取請求到任一台機器當作協調節點,協調節點發送給所有機器所有shard

每個shard都會返回結果,協調節點拿到所有shard返回的匹配的結果,再次篩選最匹配的那些document,返還給客戶端.

每次查詢完都會暫時將數據存入cache中,再次

 

 

ES在數據量很大的情況下如何優化查詢(保證搜索性能):

主要思想:最大限度利用cache的高效率進行查詢(磁盤查詢效率太低)

(1) 合理利用ES+Hbase/MySQL結合來查詢(Hbase對海量數據在線存儲)

把重要的檢索字段存成ES索引 (例如gid,訂單id,訂單金額,訂單時間,訂單說明...)

其余不作為檢索條件的字段存在Hbase/Mysql

ES根據條件快速查出gid

再用gidHbase/MySQL查出全部字段

(2) 數據預熱

每隔一段時間將熱門關鍵數據寫個程序去查一下,主動從磁盤寫入到cache

(3) 冷熱分離

水平拆分,將很熱的數據單獨寫進一個索引,將冷數據熱數據拆成兩個單獨索引,放在不同機器上。

(5) 單索引查,不要join多個索引查,效率非常低 (想辦法導入ES的時候直接將要Join的表直接導成一個索引)

(4) 分頁性能 (深度分頁,性能越差)

查第100頁的10條數據/1001-1010,由於ES分布式的存儲,必須去每台機器每個shard都查1000條,全部返回到協調節點,合並排序后再取出1001-1010

解決:合理利用Scroll游標

scroll會一次性獲取所有數據的快照,每次翻頁通過游標移動獲取下一頁,分頁性能大很多。、

scroll不能亂跳,只能順序向下翻

 

 

ES生產集群部署架構

模板:每個索引的數據大概有多少,每個索引大概分多少shard

ES集群部署5台機器,每台664G,總內存320G,分8個shard

ES集群每天增量2000萬條,大約500M,每月大約6億條 15G數據

 

 

ES  Java-API使用(僅供參考)

    模板:
     Query:
    SearchRequest searchRequest = new SearchRequest("表名");
    searchRequest.types("doc");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();


    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    boolQueryBuilder.must(QueryBuilders.termQuery( ?, ?));
    boolQueryBuilder.mustNot(QueryBuilders.termQuery( ?, ?));
    boolQueryBuilder.filter(QueryBuilders.rangeQuery( ?, ?).get(start).let(end));
    boolQueryBuilder.should(QueryBuilder.termsQuery( ?, ?));

    QueryBuilders.matchQuery("user", "kimchy");
    QueryBuilders.termsQuery("", "");
    QueryBuilders.rangeQuery("", "");

    searchSourceBuilder.query(boolQueryBuilder);
    searchSourceBuilder.from(0);
    searchSourceBuilder.size(10);

     aggregation:
    TermsAggregationBuilder aggregation = AggregationBuilders.terms("count數量聚合名").field("字段名").size(1000);
    aggregation.subAggregation(); //拼接新聚合
    aggregation.subAggregation(AggregationBuilders.avg("平均聚合").field("字段名"));

    AggregationBuilders.terms("count數量聚合").field("字段名").size(1000);
    AggregationBuilders.cardinality("唯一的數量").field("字段名");
    AggregationBuilders.sum("求和").field("字段名");

    searchSourceBuilder.aggregation(aggregation);


    searchRequest.source(searchSourceBuilder);


     SearchResponse:
    SearchResponse searchResponse = null;
    try {
        searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        logger.error("查詢??", e);
        throw new CrmRuntimeException(ErrorCodeEnum.COMMON_FAIL, "查詢出錯");
    }
    SearchHits searchHits = searchResponse.getHits();
    SearchHit[] hits = searchHits.getHits();

    //Hits結果
    for (int i = 0; i < searchHits.length; i++) {
        Map<String, Object> sourceAsMap = searchHits[i].getSourceAsMap();
        UserDo userDo = new UserDo();
        userDo.setuCellphone(StringUtil.objectToString(sourceAsMap.get("u_cellphone")));
        userDo.setNickName(StringUtil.objectToString(sourceAsMap.get("nick_name")));
        int gid = StringUtil.objectToInteger(sourceAsMap.get("gid"));
        int number = gid + NUMBER_OFFSET;
        userDo.setNumber(number);
        userDo.setGid(gid);
        userDo.setAllSubmit(StringUtil.objectToInteger(sourceAsMap.get("all_submit")));
        userDo.setAvatarUrl(StringUtil.objectToString(sourceAsMap.get("avatar_url")));
        userDo.setVipType(StringUtil.objectToInteger(sourceAsMap.get("vip_type")));
        result.add(userDo);
    }

     } catch(
         Exception e)

     {
    log.error("全局搜索失敗", e);
    throw new CrmRuntimeException(ErrorCodeEnum.COMMON_FAIL, "全局搜索失敗");
        }

//聚合結果
Aggregations aggregations = searchResponse.getAggregations();
Terms terms = aggregations.get("signin_rel_count");     //接受聚合結果
List<? extends Terms.Bucket> buckets = terms.getBuckets();
if(CollectionUtils.isNotEmpty(buckets))

{
    for (Terms.Bucket bucket : buckets) {
        String salesmanId = (String) bucket.getKey();
        long docCount = bucket.getDocCount();
           ....
        Cardinality userCountAggregation = bucket.getAggregations().get("count");       //返回
        Terms groupBySalesgrpAggregations = bucket.getAggregations().get("group_by_salesgrp");   //獲取bucket下的聚合函數

        Sum sumDiscountAggregation = groupBySalesgrpBucket.getAggregations().get("sum_discount");
        BigDecimal discount =
                new BigDecimal(sumDiscountAggregation.getValue()).setScale(2, RoundingMode.HALF_UP);
    }


}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM