數棧是雲原生—站式數據中台PaaS,我們在github和gitee上有一個有趣的開源項目:FlinkX,FlinkX是一個基於Flink的批流統一的數據同步工具,既可以采集靜態的數據,也可以采集實時變化的數據,是全域、異構、批流一體的數據同步引擎。大家喜歡的話請給我們點個star!star!star!
github開源項目:https://github.com/DTStack/flinkx
gitee開源項目:https://gitee.com/dtstack_dev_0/flinkx
平台建設背景
傳統離線數據開發時效性較差,無法滿足快速迭代的互聯網需求。伴隨着以Flink為代表的實時技術的飛速發展,實時計算越來越多的被企業使用,但是在使用中下面提到的各種問題也隨之而來。開發者使用門檻高、產出的業務數據質量沒有保障、企業缺少統一平台管理難以維護等。在諸多不利因素的影響下,我們決定利用現有的Flink技術構建一套完整的實時計算平台。
平台總體架構
從總體架構來看,實時計算平台大體可以分為三層,計算平台、調度平台、資源平台。每層承擔着相應的功能,同時層於層之間又有交互,符合高內聚、低耦合的設計原則,架構圖如下:
01
計算平台
直接面向開發人員使用,可以根據業務接入各種外部數據源,提供后續任務使用。數據源配置完成后,就可以在上面做基於Flink框架可視化的數據同步、sql化的數據計算的工作,並且可以對運行中的任務進行多維度的監控和告警。
02
調度平台
該層接收到平台傳過來的任務內容、配置后,接下來就是比較核心的工作,也是下文中重點展開的內容,這里先做一個大體的介紹。根據任務類型的不同將使用不同的插件進行解析。
- 數據同步任務:接收到上層傳過來的json后,進入到FlinkX框架中,根據數據源端和寫出目標端的不同生成對應的DataStream,最后轉換成JobGraph。
- 數據計算任務:接收到上層傳過來的sql后,進入到FlinkStreamSql框架中,解析sql、注冊成表、生成transformation,最后轉換成JobGraph。
調度平台將得到的JobGraph提交到對應的資源平台,完成任務的提交。
03
資源平台
目前可以對接多套不同的資源集群,並且也可以對接不同的資源類型,如:yarn和k8s.
數據同步和數據計算
在調度平台中,接收到用戶的任務后就開始了后面的一系列的轉換操作,最終讓任務運行起來。我們從底層的技術細節看看到底是如何基於Flink構建實時計算平台,如何使用FlinkX、FlinkStreamSql做一站式開發。
01
FlinkX
作為數據處理的第一步,也是最基礎的一步,我們看看FlinkX是如何在Flink的基礎上做二次開發,使用用戶只需要關注同步任務的json腳本和一些配置,無需關心調用Flink的細節,並支持下圖中的功能。
我們先看下Flink任務提交中涉及到流程,其中的交互流程圖如下:
那么FlinkX又是如何在Flink的基礎對上述組件進行封裝和調用的,使得Flink作為數據同步工具使用更加簡單,主要從Client、JobManager、TaskManager三個部分進行擴展,涉及到的內容如下圖:
1、Client端:
FlinkX對原生的client做了部分定制化開發,在FlinkX-launcher模塊下,主要有以下幾個步驟:
1)解析參數,如:並行度、savepoint路徑、程序的入口jar包(平常寫的Flink demo)、Flink-conf.yml中的配置等。
2)通過程序的入口jar包、外部傳入參數、savepoint參數生成PackagedProgram
3)通過反射調用PackagedProgram中指定的程序的入口jar包的main方法,在main方法中,通過用戶配置的reader和writer的不同,加載對應的插件。
4)生成JobGraph,將其中需要的資源(Flink需要的jar包、reader和writer的jar包、Flink配置文件等)加入到YarnClusterDescriptor的shipFiles中,最后YarnClusterDescriptor就可以和yarn交互啟動JobManager
5)任務提交成功后,Client端就可得到yarn返回的applicationId,后續既可以通過application跟蹤任務的狀態。
2、JobManager端:
client端提交完后,隨后yarn啟動jobmanager,jobmanager會啟動一些自己內部服務,並且會構建ExecutionGraph在這個過程中FlinkX主要做了以下兩件事:
1)不同插件重寫InputFormat接口中的createInputSplits方法創建分片,在上游數據量較大或者需要多並行度讀取的時候,該方法就起到給每個並行度設置不同的分片作用。
比如:在兩個並行度讀取mysql時,通過配置的分片字段(比如自增主鍵id)。
第一個並行度讀取sql為:select * from table where id mod 2=0;
第二個並行度讀取sql為:select * from table where id mod 2=1;
2)分片創建完后通過getInputSplitAssigner按順序返回分配給各個並發實例。
3、TaskManager端:
在TaskManager端接收到JobManager調度過來的task之后,就開始了自己的生命周期的調用,主要包含以下幾個重要的階段。
1)initialize-operator-states():循環遍歷該task所有的operator,並調用實現了CheckpointedFunction接口的 initializeState 方法,在FlinkX中為DtInputFormatSourceFunction和DtOutputFormatSinkFunction,該方法在任務第一次啟動的時候會被調用,作用是恢復狀態,當任務失敗時可以從最近一次checkpoint恢復讀取位置已經,從而可以達到續跑的目的,如下圖所示。
2)open-operators():該方法調用OperatorChain中所有StreamOperator的open方法,最后調用的是BaseRichInputFormat中的open方法。
該方法主要做以下幾件事
- 初始化累加器,記錄讀入、寫出的條數、字節數
- 初始化自定義的Metric
- 開啟限速器
- 初始化狀態
- 打開讀取數據源的連接(根據數據源的不同,每個插件各自實現)
3)run():調用InputFormat中的nextRecord方法、OutputFormat中的writeRecord方法進行數據的處理了數據處理。4)close-operators():做一些關閉操作,例如調用InputFormat、OutputFormat的 close 方法等,並做一些清理工作。以上就是TaskManager中StreamTask整體的生命流程,除了上面介紹的FlinkX是如何調用Flink接口,FlinkX還有如下一些特性。
4、FlinkX的特性
1)自定義累加器
累加器是從用戶函數和操作中,分布式地統計或者聚合信息。每個並行實例創建並更新自己的Accumulator對象, 然后合並收集不同並行實例,在作業結束時由系統合並,並可將結果推動到普羅米修斯中,如圖:
2)支持離線和實時同步
我們知道FlinkX是一個支持離線和實時同步的框架,這里以mysql數據源為例,看看是如何實現的。
- 離線任務:
在DtInputFormatSourceFunction的run方法中會調用InputFormat的open方法讀取數據記錄到resultSet中,之后再調用reachedEnd方法的判斷resultSet的數據是否讀取完,如果讀取完,就走后續的close流程。
- 實時任務:
open方法和離線一致,在reachedEnd時判斷是否是輪詢任務,如果是則會進入到間隔輪詢的分支中,將上一次輪詢讀取到的最大的一個增量字段值,作為本次輪詢開始位置進行下一次輪詢,輪詢流程圖如下:
3)臟數據管理和錯誤控制
是把寫入數據源時出錯的數據記錄下來,並把錯誤原因分類,然后寫入配置的臟數據表。
錯誤原因目前有:類型轉換錯誤、空指針、主鍵沖突和其它錯誤四類。
錯誤控制是基於Flink的累加器,運行過程中記錄出錯的記錄數,然后在單獨的線程里定時判斷錯誤的記錄數是否已經超出配置的最大值,如果超出,則拋出異常使任務失敗。這樣可以對數據精確度要求不同的任務,做不同的錯誤控制,控制流程圖如下:
4)限速器
對於一些上游數據產生過快的任務,會對下游數據庫造成較大的壓力,故而需要在源端做一些速率控制,FlinkX使用的是令牌桶限流方式控制速率,如下圖。當源端產生數據的速率達到某個閾值時,就不會在讀取新的數據,在BaseRichInputFormat的open階段也初始化了限速器。
以上就是FlinkX數據同步的基本原理,但是數據業務場景中數據同步只是第一步,由於FlinkX目前的版本中只有ETL中的EL,並不具備對數據的轉換和計算的能力,故而需要將產生的數據流入到下游的FlinkStreamSql。
02
FlinkStreamSql
基於Flink,對其實時sql進行擴展,主要擴展了流與維表的join,並支持原生Flink SQL所有的語法,目前FlinkStreamSql source端只能對接kafka,所以默認上游數據來源都是kafka。
我們看看FlinkStreamSql 又是如何在Flink基礎之上做到用戶只需要關注業務sql代碼,屏蔽底層是如何調用Flink api。整體流程和上面介紹的FlinkX基本類似,不同點在Client端,這里主要包括sql解析、注冊表、執行sql 三個部分,所以這里重點介紹這部分。
1、解析SQL這里主要是解析用戶寫的create function、create table、create view、insert into四種sql語句,封裝到結構化的SqlTree數據結構中,SqlTree中包含了自定義函數集合、外部數據源表集合、視圖語句集合、寫數據語句集合。2、表注冊得到了上面解析的SqlTree之后,就可以將sql中create table語句對應的外部數據源集合作為表注冊到tableEnv中,並且將用戶自定的udf注冊進tableEnv中。3、執行SQL將數據源注冊成表之后,就可以執行后面的insert into的sql語句了,執行sql這里會分兩種情況1)sql中沒有關聯維表,就直接執行sql
2)sql中關聯了維表,由於在Flink早期版本中不支持維表join語法,我們在這塊做了擴展,不過在FlinkStreamsql v1.11之后和社區保持了一致,支持了和維表join的語法。根據維表的類型不同,使用不同的關聯方式
- 全量維表:將上游數據作為輸入,使用RichFlatMapFunction作為查詢算子,初始化時將數據全表撈到內存中,然后和輸入數據組拼得到打寬后的數據,然后重新注冊一張大表,供后續sql使用。
- 異步維表:將上游數據作為輸入,使用RichAsyncFunction作為查詢算子,並將查詢得到的數據使用LRU緩存,然后和輸入數據組拼得到打寬后的數據,然后重新注冊一張大表,供后續sql使用。
上面介紹的就是和FlinkX在client端的不同之處,由於source端只有kafka且使用了社區原生的kafka-connector,所以在jobmanager端也沒有數據分片的邏輯,taskmanager邏輯和FlinkX基本類似,這里不再介紹。
任務運維
當使用FlinkX和FlinkStreamSql開發完業務之后,接下來進入到了任務運維階段了,在運維階段,我們主要在任務運行信息、數據進出指標metrics、數據延遲、反壓、數據傾斜等維度做了監控。
01
任務運行信息
我們知道FlinkStreamSql是基於Flinksql封裝的,所以在提交任務運行時最終還是走的Flinksql的解析、驗證、邏輯計划、邏輯計划優化、物理計划,最后將任務運行起來,也就得到了我們經常看見的DAG圖:
但是由於Flinksql對任務做了很多優化,以至於我們只能看到如上圖的大體DAG圖,子DAG圖里面的一些細節我們是沒法直觀的看到發生了什么事情。
所以我們在原來生成DAG圖的方式上進行了一定的改造,這樣就能直觀的看到子DAG圖中每個Operator和每個並行度里面發生了什么事情,有了詳細的DAG圖后其他的一些監控維度就能直觀的展示,比如:數據輸入輸出、延時、反壓、數據傾斜,在出現問題時就能具體定位到,如下圖的反壓:
了解了上面的結構后,我們看看是如何實現的。我們知道在client提交任務時,會生成JobGraph,JobGraph中的taskVertices集合就封裝了上圖完整的信息,我們將taskVertices生成json后,然后在結合LatencyMarker和相關的metrics,在前端即可生成上圖,並做相應的告警。除了上面的DAG以外,還有自定義metrics、數據延時獲取等,這里不具體介紹,有興趣的同學可以參考FlinkStreamSql項目。
使用案例
通過上面的介紹后,我們看下如何在平台上使用,下面展示了一個完整的案例:使用FlinkX將mysql中新增用戶數據實時同步到kafka,然后使用Flinkstreamsql消費kafka實時計算每分鍾新增用戶數,產出結果落庫到下游mysql,供業務使用。
01
實時同步mysql新增數據
02
實時計算每分鍾新增用戶數
03
運行信息
整體DAG,可以直觀的顯示上面提到的多項指標
解析后的詳細DAG圖,可以看到子DAG內部的多項指標
以上就是Flink在袋鼠雲實時計算平台總體架構和一些關鍵的技術點,如有不足之處歡迎大家指出。