ES 分頁方案
ES 分頁方案
ES 中,存在三種常見的分頁方案:
- FROM, SIZE
- Search-After
- Scroll
下面將依次比較三種方案之間的 trede-off,並給出相應建議的應用場景。
常見分頁,FROM, SIZE#
ES 提供了常見的分頁功能,通過在 search API 中,指定 from 和 size 來實現分頁的效果:
{
"from": 10, "size": 20, "sort": [{"timestamp": "desc"}], "query": { "match_all": {} # 返回所有 doc } }
from: 表示起點位置,默認是 0.
size:表示返回的數量,默認是 10.
這種分頁方式到沒什么好說的,但需要注意的是由於 ES 為了支持海量數據的查詢,本身采用了分布式的架構。
而對於分布式架構來說,存在一個典型的深度分頁的問題。
ES 在存儲數據時,會將其分配到不同的 shard 中。在查詢時,如果 from 值過大,就會導致分頁起點太深。
每個 shard 查詢時,都會將 from 之前位置的所有數據和請求 size 的總數返回給 coordinator. 簡單來說,就是想取第 n 頁的內容,但是卻返回了前 n 的內容。
而對於 coordinator 來說,會顯著導致內存和CPU使用率升高,特別是在高並發的場景下,導致性能下降或者節點故障。
舉例來說,當前 ES 共有 4 個 shard,並且每個 shard 沒有副本。假如分頁的大小為 10. 然后想取第101 頁前 5 條內容。對應的 from = 1000,size = 5.
ES 的查詢過程為:
- 每個 shard 將所在數據加載到內存並排序,然后取前 1005 個,返回給 coordinator.
- 每個 shard 都執行上面的操作。
- 最后 coordinator 將 1005 * 4 = 4020 條數據排序,然后取 5 條數據返回。
可以發現,from 的位置太深,造成了如下的問題:
- 返回給 coordinator 數值太大,明明就需要 5 條數據,但卻給 coordinator 1005 條數據
- coordinator 需要處理每個 shard 返回前 101 頁的結果。但需要的僅是第 101 頁的內容,卻對前 101 頁的內容進行了排序,浪費了內存和 cpu 的資源。
ES 為了規避這個問題,通過設置 max_result_window 來限制 from 和 size 的大小,默認大小僅支持 10000 條。當超過 10000 的大小,則會報出異常。
在頁數不深或者考慮內存,低並發等情況,可以通過臨時調整 max_result_window 來解決該問題,但如果頁數太深則建議使用的 Search-After 的方式。
SearchAfter 分頁#
為了應對深度分頁的情況,ES 推薦使用 SearchAfter 的方式,來實現數據的深度翻頁檢索。
在具體實現上,通過動態指針的技術。在第一次使用 search api 查詢時,附帶一個 sort 參數,其中 sort 的值必須唯一,可以用 _id
作為排序參數。
{
"from": 0, "size": 1, "sort": [ {"timestamp": "desc"}, {"_id": "asc"} ], "query": { "match_all": {} } }
每個 shard 在排序后會記錄當前查詢的最后位置,然后將其返回。
{
"took": 0, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 10, "relation": "eq" }, "max_score": null, "hits": [ { "_index": "cmi_alarm_info", "_type": "_doc", "_id": "1,3,d_to_s_JitterAvg", "_score": null, "_source": { "src_device_id": 1, "dst_device_id": 3, "type": "d_to_s_JitterAvg", "status": "normal", "create_time": 1617085800 }, "sort": [ 1617085800, "1,3,d_to_s_JitterAvg" ] } ] } }
下次查詢時,在 search_after
攜帶 Response 中返回的 sort 參數,實現分頁的查詢。
{
"from": 0, "size": 1, "sort": [ {"timestamp": "desc"}, {"_id": "asc"} ], "query": { "match_all": {} }, "search_after": [ 1617084900, "13,3,d_to_s_JitterAvg" ] }
和 from size 的查詢方式相比, 每個 shard 每次返回給 coordinator 的結果僅為 size 數量,將空間復雜度由 O(n) 降為 O(1).
但 Search-after 也有一些問題:
首先就是不支持跳頁的情況。
如果需求上一定需要跳頁時,只能通過 from 或者 size 的方式。同時為了避免深度分頁的問題,一般可以采用限制頁面數量的方式。在確定 size 后,設置一個最大的分頁值。在查詢時,分頁數不允許超過該值。
其次,隨着翻頁深度的增加,查詢的效率也會有所降低,但不會導致 OOM,算是可以完成深度查詢的任務。原因在於,雖然說通過排序字段,可以很好的定位出下一次翻頁的開始位置。但在每次請求時,從頭掃描該字段,找到該字段的位置。頁數越深,找到該位置的時間也就越長。
Scroll 分頁#
雖然說 search-after 可以在一定程度上避免深度分頁的問題,但在處理大數據量,效率並不高。在一些對實時性要求不高的場景,如利用 Spark 進行大規模計算時。就可以利用 scroll 分頁的方式,檢索所有數據。
scroll 的請求方式分為兩步:
- 第一次請求,ES 會返回生成生成的 scroll_id
- 之后的請求,不斷使用 scroll_id 進行查詢,直到所有數據被檢索完成。
第一次請求,添加 scroll 標識,並拿到 scroll_id 作為下次請求的參數:
POST /my-index-000001/_search?scroll=1m { "size": 100, "query": { "match": { "message": "foo" } } } Response: { "_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAADlx8Wb0VzanNRSENRbUtBQVEzbHltcF9WQQ==", "took": 0, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, "hits": { } }
第二次請求,使用 scroll_id 直到遍歷完所有數據:
POST /_search/scroll
{
"scroll" : "1m", "scroll_id" : "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAD4WYm9laVYtZndUQlNsdDcwakFMNjU1QQ==" }
對於 Scroll 來說,會返回第一次請求時刻的所有文檔,之后文檔的改變並不會被查詢到,保留的時間通過 scroll 參數指定。在查詢性能上,時間和空間復雜度都為 O(1),能以恆定的速度查詢完所有數據。
在原理上,相當於第一次查詢階段, 保留所有的 doc id 信息。在隨后的查詢中,根據的需要的 doc id,在不同的 shard 中拉取不同的文檔。和 search-after 相比,省去了每次都要全局排序的過程。
總結#
from, size 適用於常見的查詢,例如需要支持跳頁並實時查詢的場景。但查詢深度過深時,會有深度分頁的問題,造成 OOM.
如果在業務上,可以不選擇跳頁的方式,可以使用的 search-after 的方式避免深度分頁的問題。但如果一定要跳頁的話,只能采用限制最大分頁數的方式。
但對於超大數據量,以及需要高並發獲取等離線場景,scroll 是比較好的一種方式。
參考#
百億數據,毫秒級返回,如何設計?--淺談實時索引構建之道
本文已整理致我的 github 地址 https://github.com/allentofight/easy-cs,歡迎大家 star 支持一下
前言
近年來公司業務迅猛發展,數據量爆炸式增長,隨之而來的的是海量數據查詢等帶來的挑戰,我們需要數據量在十億,甚至百億級別的規模時依然能以秒級甚至毫秒級的速度返回,這樣的話顯然離不開搜索引擎的幫助,在搜索引擎中,ES(ElasticSearch)毫無疑問是其中的佼佼者,連續多年在 DBRanking 的搜索引擎中評測中排名第一,也是絕大多數大公司的首選,那么它與傳統的 DB 如 MySQL 相比有啥優勢呢,ES 的數據又是如何生成的,數據達到 PB 時又是如何保證 ES 索引數據的實時性以更好地滿足業務的需求的呢。
本文會結合我司在 ES 上的實踐經驗與大家談談如何構建准實時索引的一些思路,希望對大家有所啟發。本文目錄如下
- 為什么要用搜索引擎,傳統 DB 如 MySQL 不香嗎
- MySQL 的不足
- ES 簡介
- ES 索引數據構建
- PB 級的 ES 准實時索引數據構建之道
為什么要用搜索引擎,傳統 DB 如 MySQL 不香嗎
MySQL 的不足
MySQL 架構天生不適合海量數據查詢,它只適合海量數據存儲,但無法應對海量數據下各種復雜條件的查詢,有人說加索引不是可以避免全表掃描,提升查詢速度嗎,為啥說它不適合海量數據查詢呢,有兩個原因:
1、加索引確實可以提升查詢速度,但在 MySQL 中加多個索引最終在執行 SQL 的時候它只會選擇成本最低的那個索引,如果沒有索引滿足搜索條件,就會觸發全表掃描,而且即便你使用了組合索引,也要符合最左前綴原則才能命中索引,但在海量數據多種查詢條件下很有可能不符合最左前綴原則而導致索引失效,而且我們知道存儲都是需要成本的,如果你針對每一種情況都加索引,以 innoDB 為例,每加一個索引,就會創建一顆 B+ 樹,如果是海量數據,將會增加很大的存儲成本,之前就有人反饋說他們公司的某個表實際內容的大小才 10G, 而索引大小卻有 30G!這是多么巨大的成本!所以千萬不要覺得索引建得越多越好。
2、有些查詢條件是 MySQL 加索引都解決不了的,比如我要查詢商品中所有 title 帶有「格力空調」的關鍵詞,如果你用 MySQL 寫,會寫出如下代碼
SELECT * FROM product WHERE title like '%格力空調%'
這樣的話無法命中任何索引,會觸發全表掃描,而且你不能指望所有人都能輸對他想要的商品,是人就會犯錯誤,我們經常會犯類似把「格力空調」記成「格空間」的錯誤,那么 SQL 語句就會變成:
SELECT * FROM product WHERE title like '%格空調%'
這種情況下就算你觸發了全表掃描也無法查詢到任何商品,綜上所述,MySQL 的查詢確實能力有限。
ES 簡介
與其說上面列的這些點是 MySQL 的不足,倒不如說 MySQL 本身就不是為海量數據查詢而設計的,術業有專攻,海量數據查詢還得用專門的搜索引擎,這其中 ES 是其中當之無愧的王者,它是基於 Lucene 引擎構建的開源分布式搜索分析引擎,可以提供針對 PB 數據的近實時查詢,廣泛用在全文檢索、日志分析、監控分析等場景。
它主要有以下三個特點:
輕松支持各種復雜的查詢條件
: 它是分布式實時文件存儲,會把每一個字段都編入索引(倒排索引),利用高效的倒排索引,以及自定義打分、排序能力與豐富的分詞插件等,能實現任意復雜查詢條件下的全文檢索需求可擴展性強
:天然支持分布式存儲,通過極其簡單的配置實現幾百上千台服務器的分布式橫向擴容,輕松處理 PB 級別的結構化或非結構化數據。高可用,容災性能好
:通過使用主備節點,以及故障的自動探活與恢復,有力地保障了高可用
我們先用與 MySQL 類比的形式來理解 ES 的一些重要概念

通過類比的形式不難看出 ES 的以下幾個概念
1、 MySQL 的數據庫(DataBase)相當於 Index(索引),數據的邏輯集合,ES 的工作主要也是創建索引,查詢索引。
2、 一個數據庫里會有多個表,同樣的一個 Index 也會有多個 type
3、 一個表會有多行(Row),同樣的一個 Type 也會有多個 Document。
4、 Schema 指定表名,表字段,是否建立索引等,同樣的 Mapping 也指定了 Type 字段的處理規則,即索引如何建立,是否分詞,分詞規則等
5、 在 MySQL 中索引是需要手動創建的,而在 ES 一切字段皆可被索引,只要在 Mapping 在指定即可
那么 ES 中的索引為何如此高效,能在海量數據下達到秒級的效果呢?它采用了多種優化手段,最主要的原因是它采用了一種叫做倒排索引的方式來生成索引,避免了全文檔掃描,那么什么是倒排索引呢,通過文檔來查找關鍵詞等數據的我們稱為正排索引,返之,通過關鍵詞來查找文檔的形式我們稱之為倒排索引,假設有以下三個文檔(Document)

要在其中找到含有 comming 的文檔,如果要正排索引,那么要把每個文檔的內容拿出來查找是否有此單詞,毫無疑問這樣的話會導致全表掃描,那么用倒排索引會怎么查找呢,它首先會將每個文檔內容進行分詞,小寫化等,然后建立每個分詞與包含有此分詞的文檔之前的映射關系,如果有多個文檔包含此分詞,那么就會按重要程度即文檔的權重(通常是用 TF-IDF 給文檔打分)將文檔進行排序,於是我們可以得到如下關系

這樣的話我們我要查找所有帶有 comming 的文檔,就只需查一次,而且這種情況下查詢多個單詞性能也是很好的,只要查詢多個條件對應的文檔列表,再取交集即可,極大地提升了查詢效率。
畫外音:這里簡化了一些流程,實際上還要先根據單詞表來定位單詞等,不過這些流程都很快,可以忽略,有興趣的讀者可以查閱相關資料了解。
除了倒排索引外,ES 的分布式架構也天然適合海量數據查詢,來看下 ES 的架構

一個 ES 集群由多個 node 節點組成,每個 index 是以分片(Shard,index 子集)的數據存在於多個 node 節點上的,這樣的話當一個查詢請求進來,分別在各個 node 查詢相應的結果並整合后即可,將查詢壓力分散到多個節點,避免了單個節點 CPU,磁盤,內存等處理能力的不足。
另外當新節點加入后,會自動遷移部分分片至新節點,實現負載均衡,這個功能是 ES 自動完成的,對比一個下 MySQL 的分庫分表需要開發人員引入 Mycat 等中間件並指定分庫分表規則等繁瑣的流程是不是一個巨大的進步?這也就意味着 ES 有非常強大的水平擴展的能力,集群可輕松擴展致幾百上千個節點,輕松支持 PB 級的數據查詢。
當然 ES 的強大不止於此,它還采用了比如主備分片提升搜索吞吐率,使用節點故障探測,Raft 選主機制等提升了容災能力等等,這些不是本文重點,讀者可自行查閱,總之經過上面的簡單總結大家只需要明白一點:ES 的分布式架構設計天生支持海量數據查詢。
那么 ES 的索引數據(index)是如何生成的呢,接下來我們一起來看看本文的重點
如何構建 ES 索引
要構建 ES 索引數據,首先得有數據源,一般我們會使用 MySQL 作為數據源,你可以直接從 MySQL 中取數據然后再寫入 ES,但這種方式由於直接調用了線上的數據庫查詢,可能會對線上業務造成影響,比如考慮這樣的一個場景:
在電商 APP 里用的最多的業務場景想必是用戶輸入關鍵詞來查詢相對應的商品了,那么商品會有哪些信息呢,一個商品會有多個 sku(sku 即同一個商品下不同規格的品類,比如蘋果手機有 iPhone 6, iPhone 6s等),會有其基本屬性如價格,標題等,商品會有分類(居家,服飾等),品牌,庫存等,為了保證表設計的合理性,我們會設計幾張表來存儲這些屬性,假設有 product_sku(sku 表), product_property(基本屬性表),sku_stock(庫存表),product_category(分類表)這幾張表,那么為了在商品展示列表中展示所有這些信息,就必須把這些表進行 join,然后再寫入 ES,這樣查詢的時候就會在 ES 中獲取所有的商品信息了。

這種方案由於直接在 MySQL 中執行 join 操作,在商品達到千萬級時會對線上的 DB 服務產生極大的性能影響,所以顯然不可行,那該怎么生成中間表呢,既然直接在 MySQL 中操作不可行,能否把 MySQL 中的數據同步到另一個地方再做生成中間表的操作呢,也就是加一個中間層進行處理,這樣就避免了對線上 DB 的直接操作,說到這相信大家又會對計算機界的名言有進一步的體會:沒有什么是加一個中間層不能解決的,如果有,那就再加一層。
這個中間層就是 hive
什么是 hive
hive 是基於 Hadoop 的一個數據倉庫工具,用來進行數據提取、轉化、加載,這是一種可以存儲、查詢和分析存儲在 Hadoop 中的大規模數據的機制,它的意義就是把好寫的 hive 的 sql 轉換為復雜難寫的 map-reduce 程序(map-reduce 是專門用於用於大規模數據集(大於1TB)的並行運算編程模型),也就是說如果數據量大的話通過把 MySQL 的數據同步到 hive,再由 hive 來生成上述的 product_tmp 中間表,能極大的提升性能。hive 生成臨時表存儲在 hbase(一個分布式的、面向列的開源數據庫) 中,生成后會定時觸發 dump task 調用索引程序,然后索引程序主要從 hbase 中讀入全量數據,進行業務數據處理,並刷新到 es 索引中,整個流程如下

這樣構建索引看似很美好,但我們需要知道的是首先 hive 執行 join 任務是非常耗時的,在我們的生產場景上,由於數據量高達幾千萬,執行 join 任務通常需要幾十分鍾,從執行 join 任務到最終更新至 ES 整個流程常常需要至少半小時以上,如果這期間商品的價格,庫存,上線狀態(如被下架)等重要字段發生了變更,索引是無法更新的,這樣會對用戶體驗產生極大影響,優化前我們經常會看到通過 ES 搜索出的中有狀態是上線但實際是下架的產品,嚴重影響用戶體驗,那么怎么解決呢,有一種可行的方案:建立寬表
既然我們發現 hive join 是性能的主要瓶頸,那么能否規避掉這個流程呢,能否在 MySQL 中將 product_sku,product_property,sku_stock 等表組合成一個大表(我們稱其為寬表)

這樣在每一行中商品涉及到的的數據都有了,所以將 MySQL 同步到 hive 后,hive 就不需要再執行耗時的 join 操作了,極大的提升了整體的處理時間,從 hive 同步 MySQL 再到 dump 到 ES 索引中從原來的半小時以上降低到了幾分鍾以內,看起來確實不錯,但幾分鍾的索引延遲依然是無法接受的。
為什么 hive 無法做到實時導入索引
因為 hive 構建在基於靜態批處理的Hadoop 之上,Hadoop 通常都有較高的延遲並且在作業提交和調度的時候需要大量的開銷。因此,hive 並不能夠在大規模數據集上實現低延遲快速的查詢等操作,再且千萬級別的數據全量從索引程序導入到 ES 集群至少也是分鍾級。
另外引入了寬表,它的維護也成了一個新問題,設想 sku 庫存變了,產品下架了,價格調整了,那么除了修改原表(sku_stock,product_categry 等表)記錄之外,還要將所有原表變更到的記錄對應到寬表中的所有記錄也都更新一遍,這對代碼的維護是個噩夢,因為你需要在所有商品相關的表變更的地方緊接着着變更寬表的邏輯,與寬表的變更邏輯變更緊藕合!
PB 級的 ES 准實時索引構建之道
該如何解決呢?仔細觀察上面兩個問題,其實都是同一個問題,如果我們能實時監聽到 db 的字段變更,再將變更的內容實時同步到 ES 和寬表中不就解決了我們的問題了。
怎么才能實時監聽到表字段的變更呢?
答案:binlog
來一起復習下 MySQL 的主從同步原理

- MySQL master 將數據變更寫入二進制日志( binary log, 其中記錄叫做二進制日志事件binary log events,可以通過 show binlog events 進行查看)
- MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)
- MySQL slave 重放 relay log 中事件,將數據變更反映它自己的數據
可以看到主從復制的原理關鍵是 Master 和 Slave 遵循了一套協議才能實時監聽 binlog 日志來更新 slave 的表數據,那我們能不能也開發一個遵循這套協議的組件,當組件作為 Slave 來獲取 binlog 日志進而實時監聽表字段變更呢?阿里的開源項目 Canal 就是這個干的,它的工作原理如下:
- canal 模擬 MySQL slave 的交互協議,偽裝自己為 MySQL slave ,向 MySQL master 發送dump 協議
- MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
- canal 解析 binary log 對象(原始為 byte 流)

這樣的話通過 canal 就能獲取 binlog 日志了,當然 canal 只是獲取接收了 master 過來的 binlog,還要對 binlog 進行解析過濾處理等,另外如果我們只對某些表的字段感興趣,該如何配置,獲取到 binlog 后要傳給誰? 這些都需要一個統一的管理組件,阿里的 otter 就是干這件事的。
什么是 otter
Otter 是由阿里提供的基於數據庫增量日志解析,准實時同步到本機房或異地機房 MySQL 數據庫的一個分布式數據庫同步系統,它的整體架構如下:

注:以上是我司根據 otter 改造后的業務架構,與原版 otter 稍有不同,不過大同小異
主要工作流程如下
- 在 Manager 配置好 zk,要監聽的表 ,負責監聽表的節點,然后將配置同步到 Nodes 中
- node 啟動后則其 canal 會監聽 binlog,然后經過 S(select),E(extract),T(transform),L(load) 四個階段后數據發送到 MQ
- 然后業務就可以訂閱 MQ 消息來做相關的邏輯處理了
畫外音:zookeeper 主要協調節點間的工作,如在跨機房數據同步時,可能要從 A 機房的節點將數據同步到 B 機房的節點,要用 zookeeper 來協調,
大家應該注意到了node 中有 S,E,T,L 四個階段,它們的主要作用如下

-
Select 階段
: 為解決數據來源的差異性,比如接入 canal 獲取增量數據,也可以接入其他系統獲取其他數據等。 -
Extract階段
: 組裝數據,針對多種數據來源,mysql,oracle,store,file等,進行數據組裝和過濾。 -
Transform 階段
: 數據提取轉換過程,把數據轉換成目標數據源要求的類型 -
Load 階段
: 數據載入,把數據載入到目標端,如寫入遷移后的數據庫, MQ,ES 等
以上這套基於阿里 otter 改造后的數據服務我們將它稱為 DTS(Data Transfer Service),即數據傳輸服務。
搭建這套服務后我們就可以通過訂閱 MQ 來實時寫入 ES 讓索引實時更新了,而且也可以通過訂閱 MQ 來實現寬表字段的更新,實現了上文中所說的寬表字段更新與原表緊藕合的邏輯,基於 DTS 服務的索引改進架構如下:

注意:「build 數據」這一模塊對實時索引更新是透明的,這個模塊主要用在更新或插入 MySQL 寬表,因為對於寬表來說,它是幾個表數據的並集,所以並不是監聽到哪個字段變更就更新哪個,它要把所有商品涉及到的所有表數據拉回來再更新到寬表中。
於是,通過 MySQL 寬表全量更新+基於 DTS 的實時索引更新我們很好地解決了索引延遲的問題,能達到秒級的 ES 索引更新!
這里有幾個問題可能大家比較關心,我簡單列一下
需要訂閱哪些字段
對於 MySQL 寬表來說由於它要保存商品的完整信息,所以它需要訂閱所有字段,但是對於紅框中的實時索引更新而言,它只需要訂閱庫存,價格等字段,因為這些字段如果不及時更新,會對銷量產生極大的影響,所以我們實時索引只關注這些敏感字段即可。
有了實時索引更新,還需要全量索引更新嗎
需要,主要有兩個原因:
- 實時更新依賴消息機制,無法百分百保證數據完整性,需要全量更新來支持,這種情況很少,而且消息積壓等會有告警,所以我們一天只會執行一次全量索引更新
- 索引集群異常或崩潰后能快速重建索引
全量索引更新的數據會覆蓋實時索引嗎
會,設想這樣一種場景,你在某一時刻觸發了實時索引,然后此時全量索引還在執行中,還未執行到實時索引更新的那條記錄,這樣在的話當全量索引執行完之后就會把之前實時索引更新的數據給覆蓋掉,針對這種情況一種可行的處理方式是如果全量索引是在構建中,實時索引更新消息可以延遲處理,等全量更新結束后再消費。也正因為這個原因,全量索引我們一般會在凌晨執行,由於是業務低峰期,最大可能規避了此類問題。
總結
本文簡單總結了我司在 PB 級數據下構建實時 ES 索引的一些思路,希望對大家有所幫助,文章只是簡單提到了 ES,canal,otter 等阿里中間件的應用,但未對這些中間件的詳細配置,原理等未作過多介紹,這些中間件的設計非常值得我們好好研究下,比如 ES 為了提高搜索效率、優化存儲空間做了很多工作,再比如 canal 如何做高可用,otter 實現異地跨機房同步的原理等,建議感興趣的讀者可以之后好好研究一番,相信你會受益匪淺。