通常的認識是:Flink 流模式跑流任務,批模式跑批任務,用流模式跑離線任務也是個有意思的事情
雖然新版 Flink 已經在 sql 上實現了一定程度的流批一體,但是 DataStream 和 DataSet API 還是相差比較大的
用 Flink 跑離線任務也是機緣巧合(也是必然,畢竟我不會 Spark)
現在的項目組經常會跑歷史數據,當然是批模式的,在用 Flink batch 被遇到各種各樣的問題困擾之后
深入思考了我們需要跑的歷史任務的特性,將流模式的的概念稍微變通一下,也可以達到離線的效果
這一切都有一個前提: Flink 任務的算子,在處理完全部數據后,就自動退出了,基於這個前提,就可以達到離線任務的效果
比如,看下面的 Mysql Source
class MysqlSourceFunction(sql: String) extends RichSourceFunction[String] { private val logger = LoggerFactory.getLogger("MysqlSourceFunction") private var connection: java.sql.Connection = _ private var queryPS: PreparedStatement = null private var count: Long = 0 override def open(parameters: Configuration): Unit = { connection = DriverManager.getConnection(Common.MYSQL_URL, Common.MYSQL_USER, Common.MYSQL_PASS) queryPS = connection.prepareStatement(sql) } override def run(ctx: SourceFunction.SourceContext[String]): Unit = { logger.info("star query userId") // execute sql val resultSet = queryPS.executeQuery() while (resultSet.next()) { count += 1 val userId = resultSet.getString(1) ctx.collect(userId) } queryPS.close() connection.close() logger.info("query userId finish, load {} item, source exit", count) } override def cancel(): Unit = { } }
任務啟動后會從 source 開始執行,MysqlSourceFunction 就是 MySQL 查詢數據,
並將查詢出來的數據用流的形式,調用 ctx.collect(userId) 一條一條的發送到下游算子,
在處理完數據后,MysqlSourceFunction 的 run 方法執行完成,Flink 會自動將 Source 標記為 “FINISH”
后續的算子也是一樣,雖然是流模式的任務,Source 完成后,后續的算子也會依次完成,
跟批模式的區別是,所以算子都是同步執行的,Source 還在繼續生產數據,Sink 也在同步的輸出之前的數據,
而批的任務必須要上一算子完成,才會開始執行下一個算子。
基於這樣的前提,開始設計我的跑批任務的流程序。
簡單介紹下離線任務要做的東西: 我們主要做的事情是對用戶進行實時的檢測,並輸出一下我們任務有異常的行為數據,判斷異常需要對該用戶的歷史行為進行分析,離線任務就是對用戶的歷史行為分析的程序。
真正做的事情就是,基於每個用戶,從存儲引擎中拿出該用戶一定時間范圍的歷史數據,對歷史數據統計、分類等處理后,輸出處理的結果,用於實時檢測程序,Sink 完成后,會自動釋放資源,提交對應的輸出句柄(比如寫 HDFS 會提交文件)。
可以使用流模式來跑這樣的離線任務的前提是,離線程序是基於每個用戶的歷史行為的統計,而不是像 BI 報表分析一樣,將所有用戶的數據聚合起來再分析。
到這來就簡單了,所有這樣的離線任務都分成三部分,Source -> Process -> Sink
Source 中根據不同的需求,生產不同的數據
Process 中根據收到的不同的 userId,從存儲引擎中拖出所有該用戶的數據,所有統計、分類邏輯都在這部分完成
Sink 只負責結果的輸出
目前我處理離線數據除了需要用機器學習庫的任務,都全部用 Flink 實現,並且性能和資源消耗會比用 Spark 還小(在我不懈的努力(推薦下) 我們組已經有其他人開始嘗試用 Flink 這種方式跑離線任務了)
分區策略選擇
用這種弄方式跑離線任務和真正的流任務在分區策略的選擇上會有所不同,通常在流模式的任務中,在最主要的處理算子都會用keyBy 算子,讓下游的算子可以使用 Flink 的 Key State,
而由於流任務並不想批任務一樣資源使用達到配置的全部,分區數據傾斜的問題,並不一定會暴露(通常給流任務的資源會大於任務高峰時期的資源,以應對某些情況)。
而在這種流式的離線任務中,數據傾斜的問題就會比較明顯的感知到對任務的影響,其中最明顯的影響就是導致任務實際需要的時間,大於計算的理論時間。
即使使用 rebalance 這種均衡的分區策略,也會在離線任務數據處理的時間不一致上,影響任務完成的時間。
部分並發完成全部數據標記完成,其他算子還未完成,導致實際完成時間,大於任務啟動時,根據數據處理速度計算的大致任務耗時
下面圖可以看出,數據傾斜導致任務執行時間超過預期的情況,數據少的並發,2小時多點就已經完成了,數據多的現象,跑了 3 個半小時,還沒有結束
即使在數據均衡的情況下,也會有上面的情況,只是沒有這么極端,如下圖
甚至集群的負載也會對任務有比較大的影響,之前一次集群某些集群負載很高的時候(服務器的 CPU 達到 90%),會出現,負載高的機器上的任務會比負載低的機器出了慢很多
這些都是影響任務完成時間的因素,當然這些也是影響真正流任務的因素,不過因為流任務高峰時間有限(而且也有削峰的策略),所以影響不太嚴重。
在流模式的離線任務中由於一直處於最高負載(source 數據生產速度達到最高,很短時間就生產全部數據),所以影響很大。
注: Flink 還不能根據下游算子的處理速度動態的調整上游的數據分區策略
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文