Kafka+kylin——kylin2.5.0流式構建


版權聲明:本文為博主原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。

               

本文鏈接:https://blog.csdn.net/a_drjiaoda/article/details/88290620

《一份詳細的Kylin執行樣例——基於kylin2.5.0》一文中,小廚介紹了如何使用kylin,今天開辟一個新路線,個人感覺隨着時代的發展,數據實時分析會成為主力軍,因為今天介紹一下Kafka+kylin,流式構建cubekylin是從1.5版本開始,引入Streaming Table,目的是減少OLAP分析的延時,到目前的2.5.0版本 Streaming做的已經相當較成熟了。

 

主要方法:kylin通過已經創建好的虛擬表 Streaming Table,周期性地從kafka中讀取數據,根據我們定義的Modelcube,將計算好的數據存入到Hbase,供用戶查詢。

 

我們的業務場景和上文中的一樣,並且數據格式跟之前文章中提過的類似,依然是統計pv uv的值,只不過這里講其都轉換成了JSON格式,據說到目前,kylin流式構建只支持JSON的數據格式。{"access_time":1437494731879,"cookieId":"M211FDN88TXQHM9QEU","regionId":"G01","cityId":"G0102","site":"5143","os":"Android 7.0","pv":"9"}

 

主要步驟:

 

Kylin中建立新的Project

Kylin中同步數據源(Load DataSource);

Kylin中建立數據模型(New Model);

定義New Cube

Build Cube

查詢Cube

注意:在 kylin v2.4.0 以前,Streaming Cube 不支持與 lookup 表進行 join;當定義數據模型時,只選擇 fact 表,不選 lookup ;這意味着你JSON含有哪些字段,之后你只能顯示和操作哪些字段。舉個例子,上面我們的"regionId":"G01"代表的是省會(安徽、陝西等),"cityId":"G0102"代表城市(合肥、西安),我們期望能通過關聯其他兩張表(region_tb1city_tb1)來顯示相關文字,但是在v2.4.0之前不可能做到,在其之后可以通過lookup表的方式來關聯表之間的關系。

 

一、定義數據表

 

首先,我們將JSON格式的數據放入到Streaming DataSourceJSON框中,點擊》圖標,kylin會自動映射成表的字段,輸入Table Name,這里需要注意的是,進入的字段可能映射的字段的類型不准確,比如我的pv它映射成了date,但是我后面的是要將其作為sum的,所以需要手動修改一下,省的后來還得回來重新創建表。這里面我們后來是要分鍾級別的構建cube,因此我們要選到minute為止。之后配置你的Kafka集群的信息,包括topic name以及broker list,我這里nameweb_pvuv_kylin_streaming_topic

 

 

 

注意:Kylin允許用戶挑選一些從業務時間戳上衍生出來的時間維度(Derived Time Dimension),具體來說有如下幾種。

·minute_start:業務時間戳所在的分鍾起始時間,類型為Timestampyyyy-MM-dd HHmmss)。

·hour_start:業務時間戳所在的小時起始時間,類型為Timestampyyyy-MM-dd HHmmss)。

·day_start:業務時間戳所在的天起始時間,類型為Dateyyyy-MMdd)。

·week_start:業務時間戳所在的周起始時間,類型為Dateyyyy-MMdd)。

·month_start:業務時間戳所在的月起始時間,類型為Dateyyyy-MMdd)。

·quarter_start:業務時間戳所在的季度起始時間,類型為DateyyyyMM-dd)。

·year_start:業務時間戳所在的年起始時間,類型為Dateyyyy-MMdd)。

 

這些衍生時間維度都是可選的,如果用戶選擇了這些衍生維度,那么在對應的時間粒度上進行聚合時就能夠獲得更好的查詢性能,一般來說不推薦把原始的業務時間戳選擇成一個單獨的維度,因為該列的基數一般都是很大的。

 

知識點:經過測試發現,通過Streaming建造的表都是默認在DEFAULT數據庫中,目前沒找到辦法改。回到model中,我們看DataSource中剛剛創建的表如下:

 

 

二、創建Model

 

1、創建Model,選擇new Model。輸入名稱為:web_pvuv_kylin_streaming_model。(和下圖些許不同,但沒啥影響)

 

 

2、依然是選擇事實表,並且添加lookup tablej建立事實表與衍生表之間的關系。

 

 

添加第二張lookup table

 

 

選擇事實表和lookup 表之后,單擊next

 

 

3、主要是選擇各個表的維度,我們既可以選擇普通的維度,又可以選擇衍生的時間維度。注意,一般不推薦直接選擇業務時間戳作為維度(也就是這里的access_time),因為業務時間戳的精度往往是精確到秒級甚至是毫秒級的,使用它作為一個維度失去了聚合的意義,也會讓整個Cube的體積變得非常龐大

4、選擇需要度量指標的字段,本例中使用cookidPV

 

 

5、在創建Model對話框的第五步設置中,一般選擇最小粒度的衍生時間維度作為分割時間列,在這里我們選MINUTE_START,它的數據格式為yyyy-MM-dd HHmmss。有了分割時間列,就可以對Cube進行分鍾級的流式構建了

 

 

至此,點擊save保存,modelweb_pvuv_kylin_streaming就創建完成了。

 

三、創建cube

 

1、點擊 new cube,綁定剛剛創建的模型,輸入Cube名稱:web_pvuv_kylin_streaming_cube

 

 

 2、不要使用access_time,因為它的粒度太細。注意:這里我們不選擇access_time,否則創建的cube體積太大。

 

 

3Measures為度量指標的設計。這里我們對pv進行sum匯總計算,對cookieid進行count distinct去重過濾統計個數。為了讓大家更清楚,我們將添加的Measure的頁面也截出來,這里面有很多聚合函數,Topcountsum等等;

 

 

 

 

4、創建Cube的第四步是設置Cube的自動合並時間。因為流式構建需要頻繁地構建較小的Segment,為了不對存儲器造成過大的壓力,同時也為了獲取較好的查詢性能,因此需要通過自動合並將已有的多個小Segment合並成一個較大的Segment。所以,這里將設置一個層級的自動合並時間:0.5小時、4小時、1天、7天、28天。此外,設置保留時間為30天。此外,我們設置分區時間是從2013年開始,因為我們的模擬數據是從13年開始。

 

 

5、在Advanced SettingAggregation Groups設置中,可以把衍生時間維度設置為Hierarchy關系,

 

 

調整Rowkey,調整衍生時間維度,一般把上面時間衍生維度的行放在上面,拖拽即可,如下圖:

 

 

單擊下圖中的save,即可成功創建cube

 

 

結束之后,我們可以在Model中查看到這個cube,並且它的狀態時disable的,cube Size0

 

 

四、配置運行kafka

 

kafka創建topic之前,我們先看看三台虛機都要啟動哪些進程,如下圖所示:

 

 

1、在kakfa創建Topicweb_pvuv_kylin_streaming_topic。使用以下命令,確保已經配置了kafka的環境變量:

 

kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 3 --partitions 3 --topic web_pvuv_kylin_streaming_topic

 

 

五、首次Build cube + simple Kylin test

 

創建好topic之后,我們先去試一下剛剛的web_pvuv_kylin_streaming_cube是否能夠build

 

【注意:】這里在提一句,v2.5.0版本的kylin支持在web界面產生build操作,有些低版本的kylin不直接GUI方式build,只能通過在kylinbin目錄下使用命令來手動去build cube。之后為了保證流處理方式,每隔幾分鍾自動去創建build cube,實質上也就是linux在隔幾分鍾去執行 build cube的命令,下文我們會提到。

 

顯然,在topic創建好立即執行build 操作會報錯,這是由於Kylin 會記錄每一個 build offsets;當收到一個 build 請求,它將會從上一個結束的位置開始,然后從 Kafka 獲取最新的 offsets。第一次沒有數據時,沒有offset則會報錯。

 

 

我們使用eclipse多做kafka producer像該topic中發送數據,先發五條(這里我的計數是從0開始),立即停止,驗證第一build時能否正確拿數據。停止之后,我們去kylinweb界面中build cube(這一步不貼圖了)。

 

 

我們使用kafka -console-consumer.shcli中查看是否是五條,如下,驗證通過。

 

 

此時回到kylinweb界面,在monitor上界面上可以看到上述的cube在經歷23步之后已經build完成,歷時7.9mins,大家可能回想為什么僅僅四條數據而已,為什么會花費這么多時間,其實這個只是第一次build的時候時間長,需要建表等一系列操作,我們在測試時發現,之后再build的時候時間會縮小一些。

 

 

從下圖Model可以看出,建立的cube已經是READY狀態,正好是五條數據。

 

 

那我先來做個簡單的測試,從這五條數中查詢一些信息,當然大家會想才5條數據而已,查詢起來這很簡單,但是這只是測試用例。下面我們會讓kafka消息隊列中產生大量的數據,再進行測試。從下圖可以看出,我們查出來的數據是正確的,而且速度也非常快,可以提交給上級業務部門進行使用啦。

 

 

 

 

六、終極Test

 

簡單測試之后,我們現在來讓kafka+kylin自動跑起來。

 

要求:@1、使用Kakfa程序向topic中每秒發送10條數據,即每分鍾600條數據;@2cube每隔5分鍾自動創建一次。

 

步驟:

 

1、寫kakfa Producer程序,發送Json數據。有興趣的同學,可以查看我的另外一篇博客《kafkaProducer實現》,分享使我快樂,如果覺得有幫助,點個贊吧。哈哈

 

2、使用 crontab -e 命令,輸入以下:(單擊三下可復制)

 

*/5 * * * *  echo $(date)  >> /usr/kylin/streamingBuild.txt; curl -X PUT --user ADMIN:KYLIN -H "Content-Type: application/json;charset=utf-8" -d '{ "sourceOffsetStart": 0, "sourceOffsetEnd": 9223372036854775807, "buildType": "BUILD"}' http://192.168.1.203:7070/kylin/api/cubes/web_pvuv_kylin_streaming_cube/build2

 

下圖就是通過自動構建的命令,自動build cube,可以看出,是正在建立cube

 

 

七、流式構建原理

 

流式構建需要達到分鍾級的數據更新頻率,Kylin的做法是每隔一段時間(INTERVAL)就啟動一次微構建,用於處理最新的一批數據。這種做法的理念有一些類似於Spark Streaming,它們也是將流數據視作一種特殊的微批次來處理的。由於分布式網絡存在延遲等因素,消息可能存在延遲,因此不能為某一時刻剛剛過去的那幾分鍾立刻構建微批次。

 

舉例來說,如果在每個微構建中要處理5分鍾的增量數據,假設消息隊列中的消息最多可能有10分鍾的延遲(對應於“Margin”),那么就不能在100的時候立刻嘗試去構建0551005分鍾的數據,因為這部分數據的消息最遲可能在110分才會到齊,否則構建出來的Segment就存在很大的遺漏數據的風險。此時,需要像增量構建中提到的“數據持續更新”的情形一樣,對過往的Segment進行刷新操作。但是目前流式構建並不支持Segment刷新操作,所以,最早只能在110開始構建055100這部分的數據。這中間的延遲我們稱之為DELAY,它等於每個微構建批次的時間(INTERVAL)加上消息最長可能延遲的時間(MARGIN)。

 

 

 

————————————————

版權聲明:本文為CSDN博主「夏天小廚」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。

原文鏈接:https://blog.csdn.net/a_drjiaoda/article/details/88290620


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM