當我們談論批流一體,我們在談論什么?
一、流計算與批計算
一)流計算與批計算
流計算:無限數據之上的計算
批計算:有限數據之上的計算
二)流計算與批計算的比較
特性 | 批計算 | 流計算 |
---|---|---|
數據范圍 | 有界數據 | 無界數據 |
任務執行 | 分批執行、有終止 | 全部執行、無終止 |
延時 | 小時級、天級 | 秒級、分鍾級 |
數據場景 | 數據量超大數據、無法以流的形式交付 | 數據以流的形式交付 |
資源消耗 | 大 | 小 |
數據質量 | 要求低 | 要求高 |
業務場景 | 清算對賬、報表生成、特征生成 | 欺詐檢測、實時風控、實時推薦 |
關注點 | 可擴展性、吞吐、容錯 | 可擴展性、延遲、容錯、消息一致性、消息持久性 |
處理語義 | 僅有一次 | 至少一次、至多一次、僅有一次 |
代表引擎 | MR SPARK | Storm Spark streaming Flink Kafka streaming |
三)為什么要搞流批一體
1.減少學習成本
2.減少資源消耗
3.降低架構復雜性
4.提升價值產出效率
二、流批一體的場景
一)數據集成的流批一體
在大數據場景下經常需要數據同步或者數據集成,也就是將數據庫中的數據同步到大數據的數倉或者其他存儲中。上圖中的左邊是傳統的經典數據集成的模式之一,全量的同步和增量的同步實際上是兩套技術,需要定期將全量同步的數據跟增量同步數據做 merge,不斷的迭代來把數據庫的數據同步到數據倉庫中。
基於 Flink 流批一體,整個數據集成的架構將不同。因為 Flink SQL 也支持數據庫(像 MySQL 和 PG)的 CDC 語義,所以可以用 Flink SQL 一鍵同步數據庫的數據到 Hive、ClickHouse、TiDB 等開源的數據庫或開源的 KV 存儲中。在 Flink 流批一體架構的基礎上,Flink 的 connector 也是流批混合的,它可以先讀取數據庫全量數據同步到數倉中,然后自動切換到增量模式,通過 CDC 讀 Binlog 進行增量和全量的同步,Flink 內部都可以自動的去協調好,這是流批一體的價值。
二)數倉架構的流批一體
目前主流數倉架構都是一套典型的離線數倉和一套新的實時數倉,但這兩套技術棧是分開的。在離線數倉里,還是習慣用 Hive 或者 Spark,在實時數倉中用 Flink 加 Kafka。有三個問題需要解決:兩套開發流程,成本高;數據鏈路冗余,兩套鏈路將數據相關的操作做了兩遍;數據口徑的一致性難以保證,因為它是由兩套引擎算出來的。
用流批一體架構來解決,以上難題將極大降低。
- 首先,Flink 是一套 Flink SQL 開發,不存在兩套開發成本。一個開發團隊,一套技術棧,就可以做所有的離線和實時業務統計的問題。
- 第二,數據鏈路也不存在冗余,明細層的計算一次即可,不需要離線再算一遍。
- 第三,數據口徑天然一致。無論是離線的流程,還是實時的流程,都是一套引擎,一套 SQL,一套 UDF,一套開發人員,所以它天然是一致的,不存在實時和離線數據口徑不一致的問題。
三)數據湖的流批一體
Hive 元數據的管理是瓶頸,Hive 不支持數據的實時更新。Hive 沒有辦法實時,或者准實時化地提供數倉能力。現在比較新的數據湖架構,可以解決更具擴展性的元數據的問題,而且數據湖的存儲支持數據的更新,是一個流批一體的存儲。數據湖存儲與 Flink 結合,就可以將實時離線一體化的數倉架構演變成實時離線一體化的數據湖架構。
四)存儲的流批一體
1.Pulsar
Pulsar的組件架構圖
- 首先在計算層,Pulsar Broker 不保存任何狀態數據、不做任何數據存儲,稱之為服務層。
- 其次,Pulsar 擁有一個專門為消息和流設計的存儲引擎 BookKeeper,稱之為數據層。
- 如果要支持更多的 Producer 和 Consumer,可擴充上面無狀態的 Broker 層;
- 如果要支持更多的數據存儲,可單獨擴充底層存儲層。
Pulsar的流批概念
這種分層的架構為做批流融合打好了基礎。因為它原生分成了兩層,可以根據用戶的使用場景和批流的不同訪問模式,來提供兩套不同的 API。
-
如果是實時數據的訪問,可以通過上層 Broker 提供的 Consumer 接口;
-
如果是歷史數據的訪問,可以跳過 Broker,用存儲層的 reader 接口,直接訪問底層存儲層。
2.Hologres
1)Hologres的架構圖
Hologres的架構從下往上看,最底層是統一的存儲系統,可以是阿里雲統一的Pangu、業務的HDFS或者OSS、S3等,存儲上面是計算層,提供類似的MMP架構計算服務,再往上是FE層,根據查詢信息將Plan分發到各個計算節點,再往上就是PostgreSQL生態的對接,只要有JDBC/ODBC Driver就能對Hologres做查詢。
Hologres的架構是完全是存儲計算分離,計算完全部署在K8s上,存儲可以使用共享存儲,可以根據業務需求選擇HDFS或者雲上的OSS,這樣用戶就能根據業務需求對資源做彈性擴縮容,完美解決資源不夠帶來的並發問題。
存儲優勢
-
全異步:支持高並發寫入,能夠將CPU最大化利用;
-
無鎖:寫入能力隨資源線性擴展,直到將CPU全部寫滿;
-
內存管理:提供數據cache,支持高並發查詢。
計算優勢
-
高性能混合負載:慢查詢和快查詢混合一起跑,通過內部的調度系統,避免慢查詢影響快查詢;
-
向量化計算:列式數據通過向量化計算達到查詢加速的能力;
-
存儲優化:能夠定制查詢引擎,但是對存儲在Hologres數據查詢性能會更優。
問題提出
大致根據查詢並發度要求或者查詢Latency要求,將Patterns分為四類:
- Batch:離線計算
- Analytical:交互式分析
- Servering:高QPS的在線服務
- Transaction:與錢相關的傳統數據庫(絕大多數業務並不需要)
目前市面上都在說HTAP,經過調研HTAP是個偽命題,因為A和T的優化方向不一樣。為了做T,寫入鏈路將非常復雜,QPS無法滿足需求。若是對T的要求降低一點,就會發現Analytical和Severing的聯系非常緊密,這兩塊的技術是可以共用的,所以放棄了T就相當於放棄了Transaction,於是提出新的一個架構叫做HSAP,需要做的就是把提供服務和分析的數據存儲在一個系統里,通過一套分析引擎來做處理。
2)Hologres的流批一體
數據實時寫入至Flink,經由Flink做實時預處理,比如實時ETL或者實時訓練,把處理的結果直接寫入Hologres,Hologres提供維表關聯點查、結果緩存、復雜實時交互、離線查詢和聯邦查詢等,這樣整個業務系統只需要通過Hologres來做唯一的數據入口,在線系統可以通過PostgreSQL生態在Hologres中訪問數據,無需對接其他系統,這樣也能解決之前傳統架構的各種查詢、存儲問題。
三、Flink中的流批一體
2020 年,Flink 在流批一體上走出了堅實的一步,可以抽象的總結為 Flink 1.10 和 1.11 這兩個大的版本,主要是完成 SQL 層的流批一體化和實現生產可用性。實現了統一的流批一體的 SQL 和 Table 的表達能力,以及統一的 Query Processor,統一的 Runtime。在1.12 版本中,對 DataStream API 進行了流批一體化。在 DataStream 原生的流的算子上增加批的算子,也就是說 DataStream 也可以有兩種執行模式,批模式和流模式里面也可以混合批算子和流算子。在1.13 的版本中,實現 DataStream 流批一體化的算子,整個的計算框架和 SQL 一樣,完全都是流批一體化的計算能力。這樣一來,原來 Flink 中的 DataSet 這套老的 API 就可以去掉,完全實現真正的流批一體的架構。
一)流批一體的DataStream
1.目前的SDK
-
Table/SQL 是一種 Relational 的高級 SDK,主要用在一些數據分析的場景中,既可以支持 Bounded 也可以支持 Unbounded 的輸入。Table/SQL 可以支持 Batch 和 Streaming 兩種執行模式。Relatinal SDK 功能雖然強大,但也存在一些局限:不支持對 State、Timer 的操作。
-
DataStream 屬於一種 Physical SDK。DataStream 是一種 Imperative SDK,所以對物理執行計划有很好的“掌控力”。
-
DataSet 是一種僅支持 Bounded 輸入的 Physical SDK,會根據 Bounded 的特性對某些算子進行做一定的優化,但是不支持 EventTime 和 State 等操作。
利用已有的 Physical SDK ,無法寫出流批一體的application。另外,兩套SDK的學習和理解的成本比較高,兩套SDK 在語義上有不同的地方,例如 DataStream 上有 Watermark、EventTime,而 DataSet 卻沒有,對於用戶來說,理解兩套機制的門檻也不小;並且這兩 SDK 不兼容。
2.期望的SDK
-
為什么選擇了 DataStream 統一 DataSet ?DataSet 在社區的影響力逐漸下降。DataSet 算子的實現,在流的場景完全無法復用,例如 Join 等。而對於 DataStream 則不然,可以進行大量的復用。
-
提升DataStream的批效率。DataStream 是給 Unbounded 的場景下使用的,而 Unounded 一個主要的特點就是亂序,解決亂序引起了大量的序列化、反序列化和隨機磁盤讀寫;DataSet 中,數據有的,通過優化避免隨機磁盤 I/O 訪問,同時也對序列化和反序列化做優化。通過單 Key 的 BatchStateBackend 幾乎完全避免了對所有算子重寫,同時還得到了非常不錯的效果。
-
DataStream一致性的兼容,DataStream 寫的 Application 都采用 Streaming 的執行模式,一致性依賴 Flink Checkpoint 機制的 2PC 協議,但這種模式的弊端是資源消耗大、容錯成本高。提出了一個全新 Unified Sink API,從而讓開發者提供 What to commit 和 How to commit,系統應該根據不同的執行模式,選擇 Where to commit 和 When to commit 來保證端到端的 Exactly Once。
二)流體一體的DAG Scheduler
Flink 有兩種調度的模式:一種是流的調度模式,在這種模式下,Scheduler 會申請到一個作業所需要的全部資源,然后同時調度這個作業的全部 Task,所有的 Task 之間采取 Pipeline 的方式進行通信。一種是批的調度模式,所有 Task 都是可以獨立申請資源,Task 之間都是通過 Batch Shuffle 進行通訊。這種方式的好處是容錯代價比較小,不足是Task 之間的數據都是通過磁盤來進行交互,引發了大量的磁盤 IO。
基於 Pipeline Region 的統一調度
Unified DAG Scheduler 允許在一個 DAG 圖中,Task 之間既可以通過 Pipeline 通訊,也可以通過 Blocking 方式進行通訊。這些由 Pipeline 的數據交換方式連接的 Task 被稱為一個 Pipeline Region。基於以上概念,Flink 引入 Pipeline Region 的概念,不管是流作業還是批作業,都是按照 Pipeline Region 粒度來申請資源和調度任務。
在 Flink 中,不同 Task 之間有兩種連接方式,一種是 All-to-All 的連接方式,上游 Task 會和下游的所有的 Task 進行連接;一種是 PointWise 的鏈接方式,上游的 Task 只會和下游的部分 Task 進行連接。Flink Planner 可以根據實際運行場景,定制哪些 Task 之間采取 Pipeline 的傳輸方式,哪些 Task 之間采取 Batch 的傳輸方式方式。
自適應調度
調度的本質是給物理執行計划進行資源分配的決策過程。對於批作業來說靜態生成物理執行計划存在一些問題,配置人力成本高,需要手動調整批作業的並發度,一旦業務邏輯發生變化,又要不斷的重復這個過程,也可能會出現誤判的情況導致無法滿足用戶 SLA;資源利用率低,中低優先級的作業以默認值作為並發度,造成資源的浪費;高優先級的作業不及時調低並發讀,也造成大量的資源浪費現象;
為批作業引入了自適應調度功能,和原來的靜態物理執行計划相比,利用這個特性可以大幅提高用戶資源利用率。 Adaptive Scheduler 可以根據一個 JobVertex 的上游 JobVertex 的執行情況,動態決定當前 JobVertex 的並發度。未來,也可以根據上游 JobVertex 產出的數據,動態決定下游采用什么樣的算子。
三)流批一體的Shuffle
Shuffle 本質上是為了對數據進行重新划分(re-partition),目標是提供一套統一的 Shuffle 架構,既可以滿足不同 Shuffle 在策略上的定制,同時還能避免在共性需求上進行重復開發。批作業和流作業的 Shuffle 有差異也有共性,共性主要體現在:數據的 Meta 管理,所謂 Shuffle Meta 是指邏輯數據划分到數據物理位置的映射;數據傳輸,在分布式系統中,對數據的重新划分都涉及到跨線程、進程、機器的數據傳輸。
流批一體的 Shuffle 架構
Unified Shuffle 架構抽象出三個組件: Shuffle Master、Shuffle Reader、Shuffle Writer。Flink通過和這三個組件交互完成算子間的數據的重新划分。通過這三個組件可以滿足不同Shuffle插件在具體策略上的差異:
- Shuffle Master 資源申請和資源釋放。也就是說插件需要通知框架 How to request/release resource。而由 Flink 來決定 When to call it;
- Shuffle Writer 上游的算子利用 Writer 把數據寫入 Shuffle Service——Streaming Shuffle 會把數據寫入內存;External/Remote Batch Shuffle 可以把數據寫入到外部存儲中;
- Shuffle Reader 下游的算子可以通過 Reader 讀取 Shuffle 數據;
同時,為流批 Shuffle 的共性——Meta 管理、數據傳輸、服務部署——提供了架構層面的支持,從而避免對復雜組件的重復開發。高效穩定的數據傳輸,是分布式系統最復雜的子系統之一,例如在傳輸中都要解決上下游反壓、數據壓縮、內存零拷貝等問題。
四)流批一體的容錯
Flink 現有容錯策略以檢查點為前提,無論是單個 Task 出現失敗還是JobMaster 失敗, 都會按照最近的檢查點重啟整個作業。Flink Batch 運行模式下不會開啟檢查點,一旦出現任何錯誤,整個作業都要從頭執行。以下兩個改進就主要為了提升批作業的容錯能力。
Task的改進 Pipeline Region Failover
Batch 執行模式下,Flink允許 Task 之間通過 Blocking Shuffle 進行通信。對於讀取 Blocking Shuffle 的 Task 發生失敗之后,由於 Blocking Shuffle 中存儲了這個 Task 所需要的全部數據,所以只需要重啟這個 Task 以及通過 Pipeline Shuffle 與其相連的全部下游任務即可,而不需要重啟整個作業。
JM的改進 Operation Log
JM 是一個作業的控制中心,包含了作業的各種執行狀態,一旦 JM 發生錯誤之后,新 JM 無法判斷現有的狀態是否滿足調度下游任務的條件——所有的輸入數據都已經產生。JM Failover 的關鍵就是如何讓一個 JM“恢復記憶”,通過基於 Operation Log 機制恢復 JM 的關鍵狀態。
五)流批一體的總圖
上圖是一個Flink為了實現流批一體的引擎層所規划的框架圖,其中很多還是規划和開發當中,在目前Flink最新版本1.14中,還沒有完全實現上述的架構,但相信繼續經過幾個版本的迭代,Flink就可以在引擎層面完成流批一體的統一。
注:
本文綜合自
https://mp.weixin.qq.com/s/AnBU9ntRVwbsWQoiDkzZHg
https://mp.weixin.qq.com/s/4w8VSUjaX7JHiaPMxaGj7g