如何提高 Flink 任務性能
一、Operator Chain
為了更高效地分布式執行,Flink 會盡可能地將 operator 的 subtask 鏈接(chain)在一起形成 task,每個 task 在一個線程中執行。將 operators 鏈接成 task 是非常有效的優化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少數據在緩沖區的交換,減少了延遲的同時提高整體的吞吐量。
Flink 會在生成 JobGraph 階段,將代碼中可以優化的算子優化成一個算子鏈(Operator Chains)以放到一個 task(一個線程)中執行,以減少線程之間的切換和緩沖的開銷,提高整體的吞吐量和延遲。下面以官網中的例子進行說明。
上圖中,source、map、[keyBy|window|apply]、sink 算子的並行度分別是 2、2、2、1,經過 Flink 優化后,source 和 map 算子組成一個算子鏈,作為一個 task 運行在一個線程上,其簡圖如圖中 condensed view 所示,並行圖如 parallelized view 所示。算子之間是否可以組成一個Operator Chains,看是否滿足以下條件:
- 上下游算子的並行度一致;
- 上下游節點都在同一個slot group 中;
- 下游節點的chain策略為ALWAYS;
- 上游節點的chain策略為ALWAYS或HEAD;
- 兩個節點間數據分區方式是forward;
- 用戶沒有禁用chain。
二、Slot Sharing
Slot Sharing 是指,來自同一個 Job 且擁有相同 slotSharingGroup(默認:default)名稱的不同 Task 的 SubTask 之間可以共享一個 Slot,這使得一個 Slot 有機會持有 Job 的一整條 Pipeline,這也是上文提到的在默認 slotSharing 的條件下 Job 啟動所需的 Slot 數和 Job 中 Operator 的最大 parallelism 相等的原因。通過 Slot Sharing 機制可以更進一步提高 Job 運行性能,在 Slot 數不變的情況下增加了 Operator 可設置的最大的並行度,讓類似 window 這種消耗資源的 Task 以最大的並行度分布在不同 TM 上,同時像 map、filter 這種較簡單的操作也不會獨占 Slot 資源,降低資源浪費的可能性。
圖中包含 source-map[6 parallelism]、keyBy/window/apply[6 parallelism]、sink[1 parallelism] 三種 Task,總計占用了 6 個 Slot;由左向右開始第一個 slot 內部運行着 3 個 SubTask[3 Thread],持有 Job 的一條完整 pipeline;剩下 5 個 Slot 內分別運行着 2 個 SubTask[2 Thread],數據最終通過網絡傳遞給 Sink 完成數據處理。
三、Flink 異步 IO
流式計算中,常常需要與外部系統進行交互,而往往一次連接中你那個獲取連接等待通信的耗時會占比較高。下圖是兩種方式對比示例:
圖中棕色的長條表示等待時間,可以發現網絡等待時間極大地阻礙了吞吐和延遲。為了解決同步訪問的問題,異步模式可以並發地處理多個請求和回復。也就是說,你可以連續地向數據庫發送用戶 a、b、c 等的請求,與此同時,哪個請求的回復先返回了就處理哪個回復,從而連續的請求之間不需要阻塞等待,如上圖右邊所示。這也正是 Async I/O 的實現原理。
四、Checkpoint 優化
Flink 實現了一套強大的 checkpoint 機制,使它在獲取高吞吐量性能的同時,也能保證 Exactly Once 級別的快速恢復。
首先提升各節點 checkpoint 的性能考慮的就是存儲引擎的執行效率。Flink官方支持的三種 checkpoint state 存儲方案中,Memory 僅用於調試級別,無法做故障后的數據恢復。其次還有 Hdfs 與 Rocksdb,當所做 Checkpoint 的數據大小較大時,可以考慮采用 Rocksdb 來作為 checkpoint 的存儲以提升效率。
其次的思路是資源設置,我們都知道 checkpoint 機制是在每個 task 上都會進行,那么當總的狀態數據大小不變的情況下,如何分配減少單個 task 所分的 checkpoint 數據變成了提升 checkpoint 執行效率的關鍵。
最后,增量快照。非增量快照下,每次 checkpoint 都包含了作業所有狀態數據。而大部分場景下,前后 checkpoint 里,數據發生變更的部分相對很少,所以設置增量 checkpoint,僅會對上次 checkpoint 和本次 checkpoint 之間狀態的差異進行存儲計算,減少了 checkpoint 的耗時。
使用 checkpoint 的使用建議
■ Checkpoint 間隔不要太短
雖然理論上 Flink 支持很短的 checkpoint 間隔,但是在實際生產中,過短的間隔對於底層分布式文件系統而言,會帶來很大的壓力。另一方面,由於檢查點的語義,所以實際上 Flink 作業處理 record 與執行 checkpoint 存在互斥鎖,過於頻繁的 checkpoint,可能會影響整體的性能。當然,這個建議的出發點是底層分布式文件系統的壓力考慮。
■ 合理設置超時時間
默認的超時時間是 10min,如果 state 規模大,則需要合理配置。最壞情況是分布式地創建速度大於單點(job master 端)的刪除速度,導致整體存儲集群可用空間壓力較大。建議當檢查點頻繁因為超時而失敗時,增大超時時間。
五、資源配置
1、並行度(parallelism):保證足夠的並行度,並行度也不是越大越好,太多會加重數據在多個solt/task manager之間數據傳輸壓力,包括序列化和反序列化帶來的壓力。
2、CPU:CPU資源是task manager上的solt共享的,注意監控CPU的使用。
3、內存:內存是分solt隔離使用的,注意存儲大state的時候,內存要足夠。
4、網絡:大數據處理,flink節點之間數據傳輸會很多,服務器網卡盡量使用萬兆網卡。
總結
Operator Chain 是將多個 Operator 鏈接在一起放置在一個 Task 中,只針對 Operator。Slot Sharing 是在一個 Slot 中執行多個 Task,針對的是 Operator Chain 之后的 Task。這兩種優化都充分利用了計算資源,減少了不必要的開銷,提升了 Job 的運行性能。異步IO能解決需要高效訪問其他系統的問題,提升任務執行的性能。Checkpoint優化是集群配置上的優化,提升集群本身的處理能力。
參考:
https://www.infoq.cn/article/ZmL7TCcEchvANY-9jG1H
https://blog.icocoro.me/2019/06/10/1906-apache-flink-asyncio/