簡介: 作為 Dataflow 模型的最早采用者之一,Apache Flink 在流批一體特性的完成度上在開源項目中是十分領先的。本文將基於社區資料和筆者的經驗,介紹 Flink 目前(1.10)流批一體的現狀以及未來的發展規划。
自 Google Dataflow 模型被提出以來,流批一體就成為分布式計算引擎最為主流的發展趨勢。流批一體意味着計算引擎同時具備流計算的低延遲和批計算的高吞吐高穩定性,提供統一編程接口開發兩種場景的應用並保證它們的底層執行邏輯是一致的。對用戶來說流批一體很大程度上減少了開發維護的成本,但同時這對計算引擎來說是一個很大的挑戰。
作為 Dataflow 模型的最早采用者之一,Apache Flink 在流批一體特性的完成度上在開源項目中是十分領先的。本文將基於社區資料和筆者的經驗,介紹 Flink 目前(1.10)流批一體的現狀以及未來的發展規划。
概況
相信不少讀者都知道,Flink 遵循 Dataflow 模型的理念: 批處理是流處理的特例。不過出於批處理場景的執行效率、資源需求和復雜度各方面的考慮,在 Flink 設計之初流處理應用和批處理應用盡管底層都是流處理,但在編程 API 上是分開的。這允許 Flink 在執行層面仍沿用批處理的優化技術,並簡化掉架構移除掉不需要的 watermark、checkpoint 等特性。
圖1. Flink 經典架構
在 Flink 架構上,負責物理執行環境的 Runtime 層是統一的流處理,上面分別有獨立的 DataStream 和 DataSet 兩個 API,兩者基於不同的任務類型(Stream Task/Batch Task)和 UDF 接口(Transformation/Operator)。而更上層基於關系代數的 Table API 和 SQL API 雖然表面上是統一的,但實際上編程入口(Environment)是分開的,且內部將流批作業分別翻譯到 DataStream API 和 DataSet API 的邏輯也是不一致的。
因此,要實現真正的流批一體,Flink 需完成 Table/SQL API 的和 DataStream/DataSet API 兩層的改造,將批處理完全移植到流處理之上,並且需要兼顧作為批處理立身之本的效率和穩定性。目前流批一體也是 Flink 長期目標中很重要一點,流批一體的完成將標志着 Flink 進入 2.x 的新大版本時代。
流批一體完成以后理想的架構如下:
圖2. Flink 未來架構
其中 Planner 從 Table/SQL API 層獨立出來變為可插拔的模塊,而原先的 DataStream/DataSet 層則會簡化為只有 DataStream(圖 2 中的 StreamTransformation 和 Stream Operator 是 Stream DAG 的主要內容,分別表示 UDF 和執行 UDF 的算子),DataSet API 將被廢棄。
Table/SQL API 的改進
Table/SQL API 的改造開始得比較早,截止 1.10 版本發布已經達到階段性的流批一體目標。然而在 1.7 版本時,Table API 只是作為基於 DataStream/DataSet API 的 lib,並沒有得到社區的重點關注。
而當時阿里的 Blink 已經在 Table/SQL 上做了大量的優化,為了合並 Blink 的先進特性到 Flink,阿里的工程師推進社區重構了 Table 模塊的架構[5]並將 Table/SQL API 提升為主要編程 API。
自此 Table 層中負責將 SQL/Table API 翻譯為 DataStream/DataSet API 的代碼被抽象為可插拔的 Table Planner 模塊,而 Blink 也將主要的特性以 Blink Planner 的形式貢獻給社區,於是有了目前兩個 Planner 共存的狀態。
圖3. Flink 目前過渡架構
Flink 默認的 Legacy Planner 會將 SQL/Table 程序翻譯為 DataStream 或 DataSet 程序,而新的 Blink Planner 則統一翻譯為 DataStream 程序。也就是說通過 Blink Planner,Flink Table API 事實上已經實現了流批一體的計算。要了解 Blink Planner 是如何做到的,首先要對 Planner 的工作原理有一定的了解。
Legacy Planner 對於用戶邏輯的表示在 Flink 架構中不同層的演變過程如下:
圖4. Legacy Planner 架構
- 用基於 Calcite 的 SQL parser 解析用戶提交的 SQL,將不同類型的 SQL 解析為不同 Operation(比如 DDL 對應 CreateTableOperation,DSL 對應 QueryOperation),並將 AST 以關系代數 Calcite RelNode 的形式表示。
- 根據用戶指定 TableEnvironment 的不同,分別使用不同的翻譯途徑,將邏輯關系代數節點 RelNode 翻譯為 Stream 的 Transformation 或者 Batch 的 Operator Tree。
- 調用 DataStream 和 DataSet 對應環境的方法將 Transformation 或 Operator Tree 翻譯為包含執行環境配置的作業表示,即 StreamGraph 或 Plan。
- 優化 StreamGraph 和 Plan,並包裝為可序列化的 JobGraph。
因為 Batch SQL 與 Streaming SQL 在大部分語法及語義上是一致的,不同點在於 Streaming SQL 另有拓展語法的來支持 Watermark、Time Characteristic 等流處理領域的特性,因此 SQL parser 是 Batch/Stream 共用的。關鍵點在於對於關系代數 RelNode 的翻譯上。
圖5. Legacy Planner RelNode
Flink 基於 Calcite RelNode 拓展了自己的 FlinkRelNode,FlinkRelNode 有三個子類 FlinkLogicalRel、DataSetRel 和 DataStreamRel。FlinkLogicalRel 表示邏輯的關系代數節點,比如常見的 Map 函數對應的 FlinkLogicalRel 是 DataStreamCalc。DataSetRel 和 DataStreamRel 則分別表示 FlinkLogicalRel 在批處理和流處理下各自的物理執行計算。
在 SQL 優化過程中,根據編程入口的不同 FlinkLogicalRel 被轉化為 DataSetRel 或 DataStreamRel。BatchTableEnvironment 使用 BatchOptimizer 基於 Calcite Rule 的優化,而 StreamTableEnvironment 使用 StreamOptimizer 進行優化。比如 TableScan 這樣一個 RelNode,在 Batch 環境下被翻譯為 BatchTableSourceScan,在 Stream 環境下被翻譯為 StreamTableSourceScan,而這兩類物理關系代數節點將可以直接映射到 DataSet 的 Operator 或 DataStream 的 Transformation 上。
上述的方式最大的問題在於 Calcite 的優化規則無法復用,比如對數據源進行過濾器下推的優化,那么需要給 DateSetRel 和 DataStreamRel 分別做一套,而且 DataSet 和 DataStream 層的算子也要分別進行相應的修改,開發維護成本很高,而這也是 Blink Planner 推動流批一體的主要動力。
如上文所說,Blink Planner 做的最重要的一點就是廢棄了 DataSet 相關的翻譯途徑,將 DateSetRel 也移植到 DataStream 之上,那么前提當然是 DataStream 要可以表達 DataSet 的語義。熟悉批處理的同學可能會有疑問: 批處理特有的排序等算子,在 DataStream 中是沒有的,這將如何表達?
事實上 Table Planner 廣泛采用了動態代碼生成,可以繞過 DataStream API 直接翻譯至底層的 Transformation 和 StreamOperator 上,並不一定需要 DataStream 有現成的算子,因此使用 Blink Planner 的 Table API 與 DataStream API 的關系更多是並列的關系。這也是 FLIP-32[5] 所提到的解耦 Table API 和 DataStream/DataSet API 的意思:
Decouple table programs from DataStream/DataSet API
Allow table programs to be self-contained. No need for a Stream/ExecutionEnvironment entrypoint anymore. A table program definition is just API that reads and writes to catalog tables.
Table 改造完成后整個 API 架構如下,這也是目前 1.10 版本已經實現的架構:
https://qr.dingtalk.com/action/joingroup?code=v1,k1,sX+rQjO7HUrD0gh+2OoGZWUBOG71dGhOJvJ4xGoEO0g=<br>http://weixin.qq.com/r/_y7l-d-EbIITrZue93vp (二維碼自動識別)
圖6. Blink Planner 架構
事實上,早前版本的 DataStream 對批作業的支持並不是太好,為了支持 Blink Planner 的 Batch on Stream,DataStream 方面也先做了不少的優化。這些優化是對於 Table API 是必要的,因此在 Blink Planner 合並到 Flink master 的前置工作,這將和 DataStream 還未完成的改進一起放在下文分析。
另外雖然 Blink Planner 在計算上是流批一體的,但 Flink Table API 的 TableSource 和 TableSink 仍是流批分離的,這意味着目前絕大數批處理場景的基於 BatchTableSource/BatchTableSink 的 Table 無法很好地跟流批一體的計算合作,這將在 FLIP-95[9] 中處理。
DataStream API 的改進
在 DataStream API 方面,雖然目前的 DataStream API 已經可以支持有界數據流,但這個支持並不完整且效率上比起 DataSet API 仍有差距。為了實現完全的流批一體,Flink 社區准備在 DataStream 引入 BoundedStream 的概念來表示有界的數據流,完全從各種意義上代替 DataSet。
BoundedStream 將是 DataStream 的特例,同樣使用 Transformation 和 StreamOperator,且同時需要繼承 DataSet 的批處理優化。這些優化可以分為 Task 線程模式、調度策略及容錯和計算模型及算法這幾部分。
Task 線程模型
批處理業務場景通常更重視高吞吐,出於這點考慮,Batch Task 是 pull-based 的,方便 Task 批量拉取數據。Task 啟動后會主動通過 DataSet 的 Source API InputFormat 來讀取外部數據源,每個 Task 同時只讀取和處理一個 Split。
相比之下,一般流處理業務場景則更注重延遲,因此 Stream Task 是 push-based 的。
DataStream 的 Source API SourceFunction 會被獨立的 Source Thread 執行,並一直讀取外部數據,源源不斷地將數據 push 給 Stream Task。每個 Source Thread 可以並發讀取一個到多個 Split/Partition/Shard。
圖7. Stream/Batch 線程模型(圖來源 Flink Forward)
為了解決 Task 線程模型上的差異,Flink 社區計划重構 Source API 來統一不同外部存儲和業務場景下的 Task 線程模型。總體的買二手游戲賬號平台地圖思路是新增一套新的 Source API,可以支持多種線程模型,覆蓋流批兩種業務需求,具體可見 FLIP-27[6] 或筆者早前的一篇博客[7]。目前 FLIP-27 仍處於初步的開發階段。
調度策略及容錯
眾所周知,批處理作業和流處理作業在 Task 調度上是很不同的。批處理作業的多個 Task 並不需要同時在線,可以根據依賴關系先調度一批 Task,等它們結束后再運行另一批。
相反地,流作業的所有 Task 需要在作業啟動的時候就全部被調度,然后才可以開始處理數據。前一種調度策略通常稱為懶調度(Lazy Scheduling),后一種通常稱為激進調度(Eager Scheduling)。為了實現流批一體,Flink 需要在 StreamGraph 中同時支持這兩種調度模式,也就是說新增懶調度。
隨調度而來的問題還有容錯,這並不難理解,因為 Task 出現錯誤后需要重新調度來恢復。而懶調度的一大特點是,Task 計算的中間結果需要保存在某個高可用的存儲中,然后下個 Task 啟動后才能去獲取。
而在 1.9 版本以前,Flink 並沒有持久化中間結果。這就導致了如果該 TaskManager 崩潰,中間結果會丟失,整個作業需要從頭讀取數據或者從 checkpoint 來恢復。這對於實時流處理來說是很正常的,然而批處理作業並沒有 checkpoint 這個概念,批處理通常依賴中間結果的持久化來減小需要重算的 Task 范圍,因此 Flink 社區引入了可插拔的 Shuffle Service 來提供 Suffle 數據的持久化以支持細粒度的容錯恢復,具體可見 FLIP-31[8]。
計算模型及算法
與 Table API 相似,同一種計算在流處理和批處理中的算法可能是不同的。典型的一個例子是 Join: 它在流處理中表現為兩個流的元素的持續關聯,任何一方的有新的輸入都需要跟另外一方的全部元素進行關聯操作,也就是最基礎的 Nested-Loop Join;而在批處理中,Flink 可以將它優化為 Hash Join,即先讀取一方的全部數據構建 Hash Table,再讀取另外一方進行和 Hash Table 進行關聯(見圖8)。
圖8. Join 批處理優化
這種差異性本質是算子在數據集有界的情況下的優化。拓展來看,數據集是否有界是 Flink 在判斷算子如何執行時的一種優化參數,這也印證了批處理是流處理的特例的理念。因此從編程接口上看,BoundedStream 作為 DataStream 的子類,基於輸入的有界性可以提供如下優化:
- 提供只可以應用於有界數據流的算子,比如 sort。
- 對某些算子可以進行算法上的優化,比如 join。
此外,批處理還有個特點是不需要在計算時輸出中間結果,只要在結束時輸出最終結果,這很大程度上避免了處理多個中間結果的復雜性。因此,BoundedStream 還會支持非增量(non-incremental)執行模式。這主要會作用於與 Time Charateritic 相關的算子:
- Processing Time Timer 將被屏蔽。
- Watermark 的提取算法不再生效,Watermark 直接從開始時的 -∞ 跳到結束時的 +∞。
總 結
基於批處理是流處理的特例的理念,用流處理表達批處理在語義上是完全可行的,而流批一體的難點在於批處理場景作為特殊場景的優化。對 Flink 而言,難點主要體現批處理作業在 Task 線程模型、調度策略和計算模型及算法的差異性上。目前 Flink 已經在偏聲明式的 Table/SQL API 上實現了流批一體,而更底層偏過程式的 DataStream API 也將在 Flink 2.0 實現流批一體。
Tips:原版文章及詳細參考資料請見下方原文鏈接~
原文鏈接:
http://www.whitewood.me/2020/03/30/Flink-流批一體的實踐與探索/
作者介紹:
林小鉑,網易游戲高級開發工程師,負責游戲數據中心實時平台的開發及運維工作,目前專注於 Apache Flink 的開發及應用。探究問題本來就是一種樂趣。
# 社區活動推薦 #
普惠全球開發者,這一次,格外與眾不同!首個 Apache 頂級項目在線會議 Flink Forward 全球直播中文精華版來啦,聚焦 Alibaba、Google、AWS、Uber、Netflix、新浪微博等海內外一線廠商,經典 Flink 應用場景,最新功能、未來規划一覽無余。