kylin streaming原理介紹與特點淺析


前言

最近搭了Kylin Streaming並初步測試了下,覺得這個東西雖然有些限制,但還是蠻好用的,所以系統寫篇文章總結下其原理和一些配置。

Kylin Streaming是Kylin3.0最新引入的一個功能,意為OLAP查詢提供亞秒級的數據延遲,即在攝入數據后,立即可以在OLAP查詢中體現出來。

用過Kylin應該都知道,它主要是通過預構建的方式,將數據從Hive預先計算然后存儲到Hbase。查詢的時候一些復雜操作直接轉成Hbase的scan和filter操作,大大降低OLAP查詢響應時間。

而在比較早版本的Kylin其實是有提供從kafka構建流式應用的,只是那時候走的還是預構建然后存到hbase的路子。這其實是微批的思路,缺點是延遲會比較高(十幾分鍾級別的延遲)。

這個延遲在某些場景下肯定是無法使用的,所以19年Kylin開始對實時計算這塊進行開發,20年5月份時候的kylin3.0.2算是第一個正式可用的streaming版本(另外2020.7.2,kylin3.1.0已發布)。數據源還是kafka,只是現在增加了其他模塊已支持亞秒級的數據延遲。

接下來主要從架構說起,再說到底層一些組件的實現方式,最后討論下一些功能方面的實現以及具體的配置。

kylin streaming設計和原理

架構介紹

kylin streaming在架構上增加了兩個模塊,可以看看架構圖。
kylin streaming架構圖

其中藍色方框內的就是增加的組件內容,那么整個kylin streaming 包含的組件有:

  • Kafka Cluster [data source]
  • HBase Cluster [historical part storage]
  • Zookeeper Cluster [receiver metadata storage]
  • MapReduce [distributed computation]
  • HDFS [distributed storage]
  • Kylin Process [job server/query server/coordinator]
  • Kylin streaming receiver Cluster [real-time part computation and storage]
  • Query Engine,Build Engine

后面兩個就是在kylin streaming中新增加的模塊,即streaming coordinator和streaming receiver cluster。其他的還有用於構建源表的kafka,存儲數據的Hdfs和Hbase,計算結果用的MapReduce。這里只重點介紹后面兩個。

streaming coordinator

streaming coordinator相當於streaming receiver cluster的master,主要負責做一些協調分配工作,比如分配kafka哪個分區的數據分配到哪個streaming receiver的副本,控制消費速率等等。

而具體要指定哪個節點為streaming coordinator,只需要指定kylin.server.mode配置為all(all模式還包含了query server和job server模塊)或stream_coordinator就行。另外可以部署多台機器為streaming coordinator以預防單點故障。

streaming receiver cluster

streaming receiver即streaming receiver cluster的worker,被streaming coordinator管理。主要負責:

  • 攝入實時數據
  • 構建基礎的cuboid
  • 接收查詢請求,根據自身數據執行請求並返回
  • 將自己緩存的segment信息持久化到HDFS

另外為了容災,多個streaming receiver可以組成一個Replica Set。這一個Replica Set中的streaming receiver都會執行同樣的任務(即消費同樣的kafka分區),它們的作用僅僅是當某個receiver失效的時候可以快速切換。

當然還有Query Engine和Build Engine,即查詢引擎和構建引擎,這些都是kylin原先就有的模塊。用於執行查詢SQL和構建cube。在kylin streaming中,都對原先的模塊做了拓展,以支持實時情況下的查詢和構建。那么接下來來看看實時情況下構建和查詢的流程。

kylin streaming數據構建流程

構建流程
在kylin streaming種,數據會先存儲在內存中,經過一定時間后會通過構建cube的方式持久化到hdfs/hbase中,而這里的數據構建流程,則是包含整個數據的生命周期。

這里主要將文檔種的內容搬過來,主要流程如下:

  1. Coordinator向流式cube的所有分區的streaming source發送請求確認信息
  2. Coordinator分配哪一個receivers消費streaming數據,並向receivers發送請求開始消費數據
  3. receiver消費數據並構建索引
  4. 一段時間以后,receiver將immutable狀態的segment從本地持久化到hdfs(關於streaming segment狀態變更參見下文)
  5. receiver通知coordinator一個segment已經持久化到hdfs
  6. 在所有receivers(多個分區)提交其對應segment后,coordinator提交一個全量cube構建任務(在內存中segment只構建最基礎的cuboid)到build engine
  7. build engine從hdfs文件中構建全量的cuboid
  8. build engine村寧次cuboid數據到hbase,然后coordinator會通知receivers刪除本地存儲的實時數據

然后接下來再看看查詢的流程

kylin streaming查詢流程

查詢流程

  1. 如果查詢命中一個streaming cube,query engine向cube的receivers的Coordinator發送請求
  2. query engine發送查詢請求到對應的receivers查詢實時segment
  3. query engine發送請求到hbase查詢已經持久化的歷史segment
  4. query engine聚合實時和歷史數據,然后返回給客戶端

以上就是kylin streaming查詢引擎和構建引擎的大致流程,接下來再說說一些內部實現細則。

kylin streaming實現細節

kylin streaming segment存儲實現

常見的流處理處理的數據時間,通常有兩種,包括事件時間(event time),處理時間(process time)。kylin streaming的存儲結構是,按照事件時間,將數據存儲成一個一個的segment。

kylin streaming segment

先說下消息接收消息的存儲邏輯,當一個消息到達的時候,會根據事件時間查詢對應的segment存在否,如果不存在則去創建對應的segment。

而segment在一開始創建的時候,它的狀態是Active,但是當一定時間(這個時間根據配置)沒有消息到達,該segment的狀態就會變成Immutable,然后存儲到Hdfs中。

初始的segment是在內存中進行數據聚合和度量計算的(注意receiver只計算基礎的coboid和指定的coboid,而非離線數據那樣計算全量的cuboid),但是到達一定大小(也是配置)后,會刷到磁盤上存儲成fragment文件,而fragment文件到達一定大小后,又會觸發merge操作,異步將多個fragment文件進行合並。

其中相同cuboid的數據會存儲在同一文件夾下,而metadata則以json格式另外存儲。

為了提高查詢性能,fragment的存儲格式是列式存儲格式,如下圖所示:

fragment文件格式

這張圖將前面講到的東西都基本包含了,數據中存儲了維度數據(dim)和度量數據(Metrics)。維度數據存儲成三個部分數據:

  • 第一部分是Dictionary(字典)編碼部分,當維度的encoding屬性設置為‘Dict’時存在。
  • 第二部分則是數據的值/dictionary編碼后的值,這部分數據會被壓縮。
  • 第三部分是倒排索引,存儲倒排索引是數據結構是Roaring Bitmap,一種優化過后的bitmap結構。

重平衡/重分配

在某些情況下(比如Kafka消息快速增長),當前的receiver集群可能出現負載不平衡的現象,這時候需要讓revicer下線,重平衡以使整個集群負載均衡。

重平衡是自動發生,這個過程可能會持續幾秒的時間。

在實際過程中,重平衡是一個從CurrentAssignment狀態到NewAssignment狀態的過程。 整個重平衡操作是一個分布式事務,可以分為四個步驟。 如果一個步驟失敗,將執行自動回滾操作。

主要流程如下:

  1. 停止當前分配狀態(CurrentAssignment)所有Receiver,並且每個Receiver將消費的offset量報告給coordinator,coordinator合並每個分區的偏移量,保留最大偏移量(Replica Set中的Receiver可能消費進度不一樣)。然后通知Receiver消費到統一(最大)偏移量,然后再次停止消耗
  2. coordinator向所有新分配狀態(NewAssignment)下的Receiver發送一個分配請求,更新其分配情況。然后coordinator向所有newAssignment的Receiver發送一個startConsumer請求,要求他們根據上一步中的分配情況開始消費。
  3. 向刪除的Replica Set所屬的所有接收方發送ImmutableCube請求,要求它們強制將所有segment轉換為Immutable segment。
  4. 更新元數據並將NewAssignment + RemovedAssignment記錄到元數據中(已刪除的Replica Set仍會接受查詢請求,直到重分配完成)

故障恢復

在流處理中,由於數據是無界的,所以故障是不可避免的,哪怕是上面提到的Replica Set,也只能盡可能減少故障影響。所以對付故障的重點並非預防,而實在於如何進行故障恢復。

kylin streaming對故障恢復的做法是在receiver端定期進行checkpoint,這樣當receiver故障重啟后數據也能正確重新處理(依賴於kafka的上游備份能力)。

而checkpoint主要有兩部分內容需要checkpoint,第一部分是消費信息,即kafka的消費offset信息。第二部分則是磁盤數據狀態信息,即最新的{segment:framentID}信息。

那么當重啟的時候,發現磁盤上的fragment數據比checkpoint記錄的磁盤信息數據新怎么辦呢?答案是刪除沒有checkpoint的數據。

kylin streaming優化

首先先來說說Coordinator和Replica Set的數量,在實際生產環境中,為了避免單點故障問題,最好是能夠將Coordinator部署兩個或以上。而Replica Set的數量則與數據源,即kafka topic的分區數相關。kylin本身提供一個配置,可以讓我們指定一個topic全部分區由多少個Replica Set消費,所以Replica Set的數量應該與topic的分區數呈倍數關系或冗余一兩個,以便充分利用集群的負載的同時增加容錯性。

還有一點,記得前面提到的一個Replica Set由多個Receiver組成嗎,所以最好一個Replica Set中配置兩個Receiver實例。

下面列舉下跟優化相關的一些配置,並且會解釋對於配置的作用。

PS:由於kylin streaming模塊還處於高速迭代的階段,有些配置的說明或默認值可能會發生更改,詳細還是以官網最新資料為准。

  • kylin.stream.receiver.use-threads-per-query:指定每個查詢默認的線程數(The parallelism of scan in receiver side),默認是8。可以根據負載和數據情況,適當調大此參數。
  • kylin.stream.index.maxrows: 指定了緩存在堆內的聚合后的事件最大行數。默認值是50000。這個參數會影響Fragment File的數量,可以根據需求適當調高。
  • kylin.stream.cube-num-of-consumer-tasks: 指定了一個topic的全部消息的攝入將由多少Replica Set來負責,即一個topic的全部分區分配到多少個Replica Set,當然也跟你當前的Replica Set數量有關。如果消息速率較大,需要適當提升這個數值。默認值是3。
  • kylin.stream.checkpoint.file.max.num: 指定了Receiver為每一個Cube保留的checkpoint文件數量。默認值是 5。
  • kylin.stream.index.checkpoint.intervals: 指定了Receiver進行checkpoint的間隔。默認值是 300秒。有關checkpoint內容請參閱上面介紹。
  • kylin.stream.cube.window: 指定了Streaming Segment的時間間隔。比如說[2019-01-01 11:00:00, 2019-01-01 12:00:00]就是一個segment的時間間隔,在這個時間內到達的消息都會歸檔到這個segment中(當然不能超過配置的大小),默認值是3600。
  • kylin.stream.cube.duration: 指定了Streaming Segment會等待遲到的消息多久,默認值7200。接上述的例子,意思是如果一個消息遲到7200秒以內,它還是會被歸檔到[2019-01-01 11:00:00, 2019-01-01 12:00:00]這個segment中。
  • kylin.stream.immutable.segments.max.num: 指定了在Receiver端,一個Cube最多可以保持多少個IMMUTABLE segment,因為Receiver端的性能和Fragment File的數量呈負相關。默認值是 100。
  • kylin.stream.segment.retention.policy: 當Segment狀態變為IMMUTABLE,該配置指定了Receiver如何處理本地Segment Cache。可選值包含purge和fullBuild。設置為purge后,Receiver會等待一定時間后刪除本地數據;設置為fullBuild后,數據會上傳到HDFS並等待構建。默認值是fullBuild。
  • kylin.stream.consume.offsets.latest:指定了Receiver從什么位置開始消費,設置成true則從最新的offset開始消費,false則從最早的(earliest)位置消費。默認值是 true。

至於上述參數的最佳實踐,暫時沒有,kylin streaming還是比較新的,可能有些配置還需要不斷試錯才能知道哪個比較好~

總結

小結一下,本篇簡單介紹了kylin streaming的功能,介紹了構建和查詢在系統內部的邏輯流程。然后討論了下kylin streaming在內部的一些實現細節。最后從在配置上說明有哪些點可以進行優化(比較簡陋)。

總的來說,kylin streaming繼承了kylin的優點,那就是查詢快,能容納大量數據。但缺點也明顯,那就是靈活性欠佳,可能改下schema就要重新構建model,cube什么的。

以上~~

參考文章:

Real-time OLAP

Real-time Streaming Design in Apache Kylin

Deep dive into Kylin's Real-time OLAP


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM