版權聲明:本文為博主原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
本文鏈接:https://blog.csdn.net/a_drjiaoda/article/details/88290620
《一份詳細的Kylin執行樣例——基於kylin2.5.0》一文中,小廚介紹了如何使用kylin,今天開辟一個新路線,個人感覺隨着時代的發展,數據實時分析會成為主力軍,因為今天介紹一下Kafka+kylin,流式構建cube。kylin是從1.5版本開始,引入Streaming Table,目的是減少OLAP分析的延時,到目前的2.5.0版本 Streaming做的已經相當較成熟了。
主要方法:kylin通過已經創建好的虛擬表 Streaming Table,周期性地從kafka中讀取數據,根據我們定義的Model和cube,將計算好的數據存入到Hbase,供用戶查詢。
我們的業務場景和上文中的一樣,並且數據格式跟之前文章中提過的類似,依然是統計pv uv的值,只不過這里講其都轉換成了JSON格式,據說到目前,kylin流式構建只支持JSON的數據格式。{"access_time":1437494731879,"cookieId":"M211FDN88TXQHM9QEU","regionId":"G01","cityId":"G0102","site":"5143","os":"Android 7.0","pv":"9"}。
主要步驟:
在Kylin中建立新的Project;
Kylin中同步數據源(Load DataSource);
Kylin中建立數據模型(New Model);
定義New Cube
Build Cube;
查詢Cube
注意:在 kylin v2.4.0 以前,Streaming Cube 不支持與 lookup 表進行 join;當定義數據模型時,只選擇 fact 表,不選 lookup 表;這意味着你JSON含有哪些字段,之后你只能顯示和操作哪些字段。舉個例子,上面我們的"regionId":"G01"代表的是省會(安徽、陝西等),"cityId":"G0102"代表城市(合肥、西安),我們期望能通過關聯其他兩張表(region_tb1和city_tb1)來顯示相關文字,但是在v2.4.0之前不可能做到,在其之后可以通過lookup表的方式來關聯表之間的關系。
一、定義數據表
首先,我們將JSON格式的數據放入到Streaming DataSource的JSON框中,點擊》圖標,kylin會自動映射成表的字段,輸入Table Name,這里需要注意的是,進入的字段可能映射的字段的類型不准確,比如我的pv它映射成了date,但是我后面的是要將其作為sum的,所以需要手動修改一下,省的后來還得回來重新創建表。這里面我們后來是要分鍾級別的構建cube,因此我們要選到minute為止。之后配置你的Kafka集群的信息,包括topic name以及broker list,我這里name是web_pvuv_kylin_streaming_topic。
注意:Kylin允許用戶挑選一些從業務時間戳上衍生出來的時間維度(Derived Time Dimension),具體來說有如下幾種。
·minute_start:業務時間戳所在的分鍾起始時間,類型為Timestamp(yyyy-MM-dd HH:mm:ss)。
·hour_start:業務時間戳所在的小時起始時間,類型為Timestamp(yyyy-MM-dd HH:mm:ss)。
·day_start:業務時間戳所在的天起始時間,類型為Date(yyyy-MMdd)。
·week_start:業務時間戳所在的周起始時間,類型為Date(yyyy-MMdd)。
·month_start:業務時間戳所在的月起始時間,類型為Date(yyyy-MMdd)。
·quarter_start:業務時間戳所在的季度起始時間,類型為Date(yyyyMM-dd)。
·year_start:業務時間戳所在的年起始時間,類型為Date(yyyy-MMdd)。
這些衍生時間維度都是可選的,如果用戶選擇了這些衍生維度,那么在對應的時間粒度上進行聚合時就能夠獲得更好的查詢性能,一般來說不推薦把原始的業務時間戳選擇成一個單獨的維度,因為該列的基數一般都是很大的。
知識點:經過測試發現,通過Streaming建造的表都是默認在DEFAULT數據庫中,目前沒找到辦法改。回到model中,我們看DataSource中剛剛創建的表如下:
二、創建Model
1、創建Model,選擇new Model。輸入名稱為:web_pvuv_kylin_streaming_model。(和下圖些許不同,但沒啥影響)
2、依然是選擇事實表,並且添加lookup table,j建立事實表與衍生表之間的關系。
添加第二張lookup table
選擇事實表和lookup 表之后,單擊next
3、主要是選擇各個表的維度,我們既可以選擇普通的維度,又可以選擇衍生的時間維度。注意,一般不推薦直接選擇業務時間戳作為維度(也就是這里的access_time),因為業務時間戳的精度往往是精確到秒級甚至是毫秒級的,使用它作為一個維度失去了聚合的意義,也會讓整個Cube的體積變得非常龐大
4、選擇需要度量指標的字段,本例中使用cookid和PV
5、在創建Model對話框的第五步設置中,一般選擇最小粒度的衍生時間維度作為分割時間列,在這里我們選MINUTE_START,它的數據格式為yyyy-MM-dd HH:mm:ss。有了分割時間列,就可以對Cube進行分鍾級的流式構建了
至此,點擊save保存,model:web_pvuv_kylin_streaming就創建完成了。
三、創建cube
1、點擊 new cube,綁定剛剛創建的模型,輸入Cube名稱:web_pvuv_kylin_streaming_cube
2、不要使用access_time,因為它的粒度太細。注意:這里我們不選擇access_time,否則創建的cube體積太大。
3、Measures為度量指標的設計。這里我們對pv進行sum匯總計算,對cookieid進行count distinct去重過濾統計個數。為了讓大家更清楚,我們將添加的Measure的頁面也截出來,這里面有很多聚合函數,Top,count、sum等等;
4、創建Cube的第四步是設置Cube的自動合並時間。因為流式構建需要頻繁地構建較小的Segment,為了不對存儲器造成過大的壓力,同時也為了獲取較好的查詢性能,因此需要通過自動合並將已有的多個小Segment合並成一個較大的Segment。所以,這里將設置一個層級的自動合並時間:0.5小時、4小時、1天、7天、28天。此外,設置保留時間為30天。此外,我們設置分區時間是從2013年開始,因為我們的模擬數據是從13年開始。
5、在Advanced Setting,Aggregation Groups設置中,可以把衍生時間維度設置為Hierarchy關系,
調整Rowkey,調整衍生時間維度,一般把上面時間衍生維度的行放在上面,拖拽即可,如下圖:
單擊下圖中的save,即可成功創建cube。
結束之后,我們可以在Model中查看到這個cube,並且它的狀態時disable的,cube Size為0。
四、配置運行kafka
在kafka創建topic之前,我們先看看三台虛機都要啟動哪些進程,如下圖所示:
1、在kakfa創建Topic:web_pvuv_kylin_streaming_topic。使用以下命令,確保已經配置了kafka的環境變量:
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 3 --partitions 3 --topic web_pvuv_kylin_streaming_topic
五、首次Build cube + simple Kylin test
創建好topic之后,我們先去試一下剛剛的web_pvuv_kylin_streaming_cube是否能夠build。
【注意:】這里在提一句,v2.5.0版本的kylin支持在web界面產生build操作,有些低版本的kylin不直接GUI方式build,只能通過在kylin的bin目錄下使用命令來手動去build cube。之后為了保證流處理方式,每隔幾分鍾自動去創建build cube,實質上也就是linux在隔幾分鍾去執行 build cube的命令,下文我們會提到。
顯然,在topic創建好立即執行build 操作會報錯,這是由於Kylin 會記錄每一個 build 的 offsets;當收到一個 build 請求,它將會從上一個結束的位置開始,然后從 Kafka 獲取最新的 offsets。第一次沒有數據時,沒有offset則會報錯。
我們使用eclipse多做kafka producer像該topic中發送數據,先發五條(這里我的計數是從0開始),立即停止,驗證第一build時能否正確拿數據。停止之后,我們去kylin的web界面中build cube(這一步不貼圖了)。
我們使用kafka -console-consumer.sh在cli中查看是否是五條,如下,驗證通過。
此時回到kylin的web界面,在monitor上界面上可以看到上述的cube在經歷23步之后已經build完成,歷時7.9mins,大家可能回想為什么僅僅四條數據而已,為什么會花費這么多時間,其實這個只是第一次build的時候時間長,需要建表等一系列操作,我們在測試時發現,之后再build的時候時間會縮小一些。
從下圖Model可以看出,建立的cube已經是READY狀態,正好是五條數據。
那我先來做個簡單的測試,從這五條數中查詢一些信息,當然大家會想才5條數據而已,查詢起來這很簡單,但是這只是測試用例。下面我們會讓kafka消息隊列中產生大量的數據,再進行測試。從下圖可以看出,我們查出來的數據是正確的,而且速度也非常快,可以提交給上級業務部門進行使用啦。
六、終極Test
簡單測試之后,我們現在來讓kafka+kylin自動跑起來。
要求:@1、使用Kakfa程序向topic中每秒發送10條數據,即每分鍾600條數據;@2、cube每隔5分鍾自動創建一次。
步驟:
1、寫kakfa Producer程序,發送Json數據。有興趣的同學,可以查看我的另外一篇博客《kafka之Producer實現》,分享使我快樂,如果覺得有幫助,點個贊吧。哈哈
2、使用 crontab -e 命令,輸入以下:(單擊三下可復制)
*/5 * * * * echo $(date) >> /usr/kylin/streamingBuild.txt; curl -X PUT --user ADMIN:KYLIN -H "Content-Type: application/json;charset=utf-8" -d '{ "sourceOffsetStart": 0, "sourceOffsetEnd": 9223372036854775807, "buildType": "BUILD"}' http://192.168.1.203:7070/kylin/api/cubes/web_pvuv_kylin_streaming_cube/build2
下圖就是通過自動構建的命令,自動build cube,可以看出,是正在建立cube。
七、流式構建原理
流式構建需要達到分鍾級的數據更新頻率,Kylin的做法是每隔一段時間(INTERVAL)就啟動一次微構建,用於處理最新的一批數據。這種做法的理念有一些類似於Spark Streaming,它們也是將流數據視作一種特殊的微批次來處理的。由於分布式網絡存在延遲等因素,消息可能存在延遲,因此不能為某一時刻剛剛過去的那幾分鍾立刻構建微批次。
舉例來說,如果在每個微構建中要處理5分鍾的增量數據,假設消息隊列中的消息最多可能有10分鍾的延遲(對應於“Margin”),那么就不能在1:00的時候立刻嘗試去構建0:55到1:00這5分鍾的數據,因為這部分數據的消息最遲可能在1:10分才會到齊,否則構建出來的Segment就存在很大的遺漏數據的風險。此時,需要像增量構建中提到的“數據持續更新”的情形一樣,對過往的Segment進行刷新操作。但是目前流式構建並不支持Segment刷新操作,所以,最早只能在1:10開始構建0:55到1:00這部分的數據。這中間的延遲我們稱之為DELAY,它等於每個微構建批次的時間(INTERVAL)加上消息最長可能延遲的時間(MARGIN)。
————————————————
版權聲明:本文為CSDN博主「夏天小廚」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/a_drjiaoda/article/details/88290620