前言
最近搭了Kylin Streaming並初步測試了下,覺得這個東西雖然有些限制,但還是蠻好用的,所以系統寫篇文章總結下其原理和一些配置。
Kylin Streaming是Kylin3.0最新引入的一個功能,意為OLAP查詢提供亞秒級的數據延遲,即在攝入數據后,立即可以在OLAP查詢中體現出來。
用過Kylin應該都知道,它主要是通過預構建的方式,將數據從Hive預先計算然后存儲到Hbase。查詢的時候一些復雜操作直接轉成Hbase的scan和filter操作,大大降低OLAP查詢響應時間。
而在比較早版本的Kylin其實是有提供從kafka構建流式應用的,只是那時候走的還是預構建然后存到hbase的路子。這其實是微批的思路,缺點是延遲會比較高(十幾分鍾級別的延遲)。
這個延遲在某些場景下肯定是無法使用的,所以19年Kylin開始對實時計算這塊進行開發,20年5月份時候的kylin3.0.2算是第一個正式可用的streaming版本(另外2020.7.2,kylin3.1.0已發布)。數據源還是kafka,只是現在增加了其他模塊已支持亞秒級的數據延遲。
接下來主要從架構說起,再說到底層一些組件的實現方式,最后討論下一些功能方面的實現以及具體的配置。
kylin streaming設計和原理
架構介紹
kylin streaming在架構上增加了兩個模塊,可以看看架構圖。
其中藍色方框內的就是增加的組件內容,那么整個kylin streaming 包含的組件有:
- Kafka Cluster [data source]
- HBase Cluster [historical part storage]
- Zookeeper Cluster [receiver metadata storage]
- MapReduce [distributed computation]
- HDFS [distributed storage]
- Kylin Process [job server/query server/coordinator]
- Kylin streaming receiver Cluster [real-time part computation and storage]
- Query Engine,Build Engine
后面兩個就是在kylin streaming中新增加的模塊,即streaming coordinator和streaming receiver cluster。其他的還有用於構建源表的kafka,存儲數據的Hdfs和Hbase,計算結果用的MapReduce。這里只重點介紹后面兩個。
streaming coordinator
streaming coordinator相當於streaming receiver cluster的master,主要負責做一些協調分配工作,比如分配kafka哪個分區的數據分配到哪個streaming receiver的副本,控制消費速率等等。
而具體要指定哪個節點為streaming coordinator,只需要指定kylin.server.mode配置為all(all模式還包含了query server和job server模塊)或stream_coordinator就行。另外可以部署多台機器為streaming coordinator以預防單點故障。
streaming receiver cluster
streaming receiver即streaming receiver cluster的worker,被streaming coordinator管理。主要負責:
- 攝入實時數據
- 構建基礎的cuboid
- 接收查詢請求,根據自身數據執行請求並返回
- 將自己緩存的segment信息持久化到HDFS
另外為了容災,多個streaming receiver可以組成一個Replica Set。這一個Replica Set中的streaming receiver都會執行同樣的任務(即消費同樣的kafka分區),它們的作用僅僅是當某個receiver失效的時候可以快速切換。
當然還有Query Engine和Build Engine,即查詢引擎和構建引擎,這些都是kylin原先就有的模塊。用於執行查詢SQL和構建cube。在kylin streaming中,都對原先的模塊做了拓展,以支持實時情況下的查詢和構建。那么接下來來看看實時情況下構建和查詢的流程。
kylin streaming數據構建流程
在kylin streaming種,數據會先存儲在內存中,經過一定時間后會通過構建cube的方式持久化到hdfs/hbase中,而這里的數據構建流程,則是包含整個數據的生命周期。
這里主要將文檔種的內容搬過來,主要流程如下:
- Coordinator向流式cube的所有分區的streaming source發送請求確認信息
- Coordinator分配哪一個receivers消費streaming數據,並向receivers發送請求開始消費數據
- receiver消費數據並構建索引
- 一段時間以后,receiver將immutable狀態的segment從本地持久化到hdfs(關於streaming segment狀態變更參見下文)
- receiver通知coordinator一個segment已經持久化到hdfs
- 在所有receivers(多個分區)提交其對應segment后,coordinator提交一個全量cube構建任務(在內存中segment只構建最基礎的cuboid)到build engine
- build engine從hdfs文件中構建全量的cuboid
- build engine村寧次cuboid數據到hbase,然后coordinator會通知receivers刪除本地存儲的實時數據
然后接下來再看看查詢的流程
kylin streaming查詢流程
- 如果查詢命中一個streaming cube,query engine向cube的receivers的Coordinator發送請求
- query engine發送查詢請求到對應的receivers查詢實時segment
- query engine發送請求到hbase查詢已經持久化的歷史segment
- query engine聚合實時和歷史數據,然后返回給客戶端
以上就是kylin streaming查詢引擎和構建引擎的大致流程,接下來再說說一些內部實現細則。
kylin streaming實現細節
kylin streaming segment存儲實現
常見的流處理處理的數據時間,通常有兩種,包括事件時間(event time),處理時間(process time)。kylin streaming的存儲結構是,按照事件時間,將數據存儲成一個一個的segment。
先說下消息接收消息的存儲邏輯,當一個消息到達的時候,會根據事件時間查詢對應的segment存在否,如果不存在則去創建對應的segment。
而segment在一開始創建的時候,它的狀態是Active,但是當一定時間(這個時間根據配置)沒有消息到達,該segment的狀態就會變成Immutable,然后存儲到Hdfs中。
初始的segment是在內存中進行數據聚合和度量計算的(注意receiver只計算基礎的coboid和指定的coboid,而非離線數據那樣計算全量的cuboid),但是到達一定大小(也是配置)后,會刷到磁盤上存儲成fragment文件,而fragment文件到達一定大小后,又會觸發merge操作,異步將多個fragment文件進行合並。
其中相同cuboid的數據會存儲在同一文件夾下,而metadata則以json格式另外存儲。
為了提高查詢性能,fragment的存儲格式是列式存儲格式,如下圖所示:
這張圖將前面講到的東西都基本包含了,數據中存儲了維度數據(dim)和度量數據(Metrics)。維度數據存儲成三個部分數據:
- 第一部分是Dictionary(字典)編碼部分,當維度的encoding屬性設置為‘Dict’時存在。
- 第二部分則是數據的值/dictionary編碼后的值,這部分數據會被壓縮。
- 第三部分是倒排索引,存儲倒排索引是數據結構是Roaring Bitmap,一種優化過后的bitmap結構。
重平衡/重分配
在某些情況下(比如Kafka消息快速增長),當前的receiver集群可能出現負載不平衡的現象,這時候需要讓revicer下線,重平衡以使整個集群負載均衡。
重平衡是自動發生,這個過程可能會持續幾秒的時間。
在實際過程中,重平衡是一個從CurrentAssignment狀態到NewAssignment狀態的過程。 整個重平衡操作是一個分布式事務,可以分為四個步驟。 如果一個步驟失敗,將執行自動回滾操作。
主要流程如下:
- 停止當前分配狀態(CurrentAssignment)所有Receiver,並且每個Receiver將消費的offset量報告給coordinator,coordinator合並每個分區的偏移量,保留最大偏移量(Replica Set中的Receiver可能消費進度不一樣)。然后通知Receiver消費到統一(最大)偏移量,然后再次停止消耗
- coordinator向所有新分配狀態(NewAssignment)下的Receiver發送一個分配請求,更新其分配情況。然后coordinator向所有newAssignment的Receiver發送一個startConsumer請求,要求他們根據上一步中的分配情況開始消費。
- 向刪除的Replica Set所屬的所有接收方發送ImmutableCube請求,要求它們強制將所有segment轉換為Immutable segment。
- 更新元數據並將NewAssignment + RemovedAssignment記錄到元數據中(已刪除的Replica Set仍會接受查詢請求,直到重分配完成)
故障恢復
在流處理中,由於數據是無界的,所以故障是不可避免的,哪怕是上面提到的Replica Set,也只能盡可能減少故障影響。所以對付故障的重點並非預防,而實在於如何進行故障恢復。
kylin streaming對故障恢復的做法是在receiver端定期進行checkpoint,這樣當receiver故障重啟后數據也能正確重新處理(依賴於kafka的上游備份能力)。
而checkpoint主要有兩部分內容需要checkpoint,第一部分是消費信息,即kafka的消費offset信息。第二部分則是磁盤數據狀態信息,即最新的{segment:framentID}信息。
那么當重啟的時候,發現磁盤上的fragment數據比checkpoint記錄的磁盤信息數據新怎么辦呢?答案是刪除沒有checkpoint的數據。
kylin streaming優化
首先先來說說Coordinator和Replica Set的數量,在實際生產環境中,為了避免單點故障問題,最好是能夠將Coordinator部署兩個或以上。而Replica Set的數量則與數據源,即kafka topic的分區數相關。kylin本身提供一個配置,可以讓我們指定一個topic全部分區由多少個Replica Set消費,所以Replica Set的數量應該與topic的分區數呈倍數關系或冗余一兩個,以便充分利用集群的負載的同時增加容錯性。
還有一點,記得前面提到的一個Replica Set由多個Receiver組成嗎,所以最好一個Replica Set中配置兩個Receiver實例。
下面列舉下跟優化相關的一些配置,並且會解釋對於配置的作用。
PS:由於kylin streaming模塊還處於高速迭代的階段,有些配置的說明或默認值可能會發生更改,詳細還是以官網最新資料為准。
- kylin.stream.receiver.use-threads-per-query:指定每個查詢默認的線程數(The parallelism of scan in receiver side),默認是8。可以根據負載和數據情況,適當調大此參數。
- kylin.stream.index.maxrows: 指定了緩存在堆內的聚合后的事件最大行數。默認值是50000。這個參數會影響Fragment File的數量,可以根據需求適當調高。
- kylin.stream.cube-num-of-consumer-tasks: 指定了一個topic的全部消息的攝入將由多少Replica Set來負責,即一個topic的全部分區分配到多少個Replica Set,當然也跟你當前的Replica Set數量有關。如果消息速率較大,需要適當提升這個數值。默認值是3。
- kylin.stream.checkpoint.file.max.num: 指定了Receiver為每一個Cube保留的checkpoint文件數量。默認值是 5。
- kylin.stream.index.checkpoint.intervals: 指定了Receiver進行checkpoint的間隔。默認值是 300秒。有關checkpoint內容請參閱上面介紹。
- kylin.stream.cube.window: 指定了Streaming Segment的時間間隔。比如說[2019-01-01 11:00:00, 2019-01-01 12:00:00]就是一個segment的時間間隔,在這個時間內到達的消息都會歸檔到這個segment中(當然不能超過配置的大小),默認值是3600。
- kylin.stream.cube.duration: 指定了Streaming Segment會等待遲到的消息多久,默認值7200。接上述的例子,意思是如果一個消息遲到7200秒以內,它還是會被歸檔到[2019-01-01 11:00:00, 2019-01-01 12:00:00]這個segment中。
- kylin.stream.immutable.segments.max.num: 指定了在Receiver端,一個Cube最多可以保持多少個IMMUTABLE segment,因為Receiver端的性能和Fragment File的數量呈負相關。默認值是 100。
- kylin.stream.segment.retention.policy: 當Segment狀態變為IMMUTABLE,該配置指定了Receiver如何處理本地Segment Cache。可選值包含purge和fullBuild。設置為purge后,Receiver會等待一定時間后刪除本地數據;設置為fullBuild后,數據會上傳到HDFS並等待構建。默認值是fullBuild。
- kylin.stream.consume.offsets.latest:指定了Receiver從什么位置開始消費,設置成true則從最新的offset開始消費,false則從最早的(earliest)位置消費。默認值是 true。
至於上述參數的最佳實踐,暫時沒有,kylin streaming還是比較新的,可能有些配置還需要不斷試錯才能知道哪個比較好~
總結
小結一下,本篇簡單介紹了kylin streaming的功能,介紹了構建和查詢在系統內部的邏輯流程。然后討論了下kylin streaming在內部的一些實現細節。最后從在配置上說明有哪些點可以進行優化(比較簡陋)。
總的來說,kylin streaming繼承了kylin的優點,那就是查詢快,能容納大量數據。但缺點也明顯,那就是靈活性欠佳,可能改下schema就要重新構建model,cube什么的。
以上~~
參考文章: