一、設置最小時間間隔
當flink應用開啟Checkpoint功能,並配置Checkpoint時間間隔,應用中就會根據指定的時間間隔周期性地對應用進行Checkpoint操作。默認情況下Checkpoint操作都是同步進行,也就是說,當前面觸發的Checkpoint動作沒有完全結束時,之后的Checkpoint操作將不會被觸發。在這種情況下,如果Checkpoint過程持續的時間超過了配置的時間間隔,就會出現排隊的情況。如果有非常多的Checkpoint操作在排隊,就會占用額外的系統資源用於Checkpoint,此時用於任務計算的資源將會減少,進而影響到整個應用的性能和正常執行。
在這種情況下,如果大狀態數據確實需要很長的時間來進行Checkpoint,那么只能對Checkpoint的時間間隔進行優化,可以通過Checkpoint之間的最小間隔參數進行配置,讓Checkpoint之間根據Checkpoint執行速度進行調整,前面的Checkpoint沒有完全結束,后面的Checkpoint操作也不會觸發。
- streamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)
通過最小時間間隔參數配置,可以降低Checkpoint對系統的性能影響,但需要注意的事,對於非常大的狀態數據,最小時間間隔只能減輕Checkpoint之間的堆積情況。如果不能有效快速地完成Checkpoint,將會導致系統Checkpoint頻次越來越低,當系統出現問題時,沒有及時對狀態數據有效地持久化,可能會導致系統丟失數據。因此,對於非常大的狀態數據而言,應該對Checkpoint過程進行優化和調整,例如采用增量Checkpoint的方法等。
用戶也可以通過配置CheckpointConfig中setMaxConcurrentCheckpoints()方法設定並行執行的checkpoint數量,這種方法也能有效降低checkpoint堆積的問題,但會提高資源占用。同時,如果開始了並行checkpoint操作,當用戶以手動方式觸發savepoint的時候,checkpoint操作也將繼續執行,這將影響到savepoint過程中對狀態數據的持久化
二、預估狀態容量
除了對已經運行的任務進行checkpoint優化,對整個任務需要的狀態數據量進行預估也非常重要,這樣才能選擇合適的checkpoint策略。對任務狀態數據存儲的規划依賴於如下基本規則:
1.正常情況下應該盡可能留有足夠的資源來應對頻繁的反壓。
2.需要盡可能提供給額外的資源,以便在任務出現異常中斷的情況下處理積壓的數據。這些資源的預估都取決於任務停止過程中數據的積壓量,以及對任務恢復時間的要求。
3.系統中出現臨時性的反壓沒有太大的問題,但是如果系統中頻繁出現臨時性的反壓,例如下游外部系統臨時性變慢導致數據輸出速率下降,這種情況就需要考慮給予算子一定的資源
4.部分算子導致下游的算子的負載非常高,下游的算子完全是取決於上游算子的輸出,因此對類似於窗口算子的估計也將會影響到整個任務的執行,應該盡可能給這些算子留有足夠的資源以應對上游算子產生的影響。
三、異步Snapshot
默認情況下,應用中的checkpoint操作都是同步執行的,在條件允許的情況下應該盡可能地使用異步的snapshot,這樣講大幅度提升checkpoint的性能,尤其是在非常復雜的流式應用中,如多數據源關聯、co-functions操作或windows操作等,都會有較好的性能改善。
在使用異步快照需要確認應用遵循以下兩點要求:
1.首先必須是flink托管狀態,即使用flink內部提供的托管狀態所對應的數據結構,例如常用的有ValueState、ListState、ReducingState等類型狀態。
2.StateBackend必須支持異步快照,在flink1.2的版本之前,只有RocksDB完整地支持異步的Snapshot操作,從flink1.3版本以后可以在heap-based StateBackend中支持異步快照功能
四.壓縮狀態數據
flink中提供了針對checkpoint和savepoint的數據進行壓縮的方法,目前flink僅支持通過用snappy壓縮算法對狀態數據進行壓縮,在未來的版本中flink將支持其他壓縮算法。在壓縮過程中,flink的壓縮算法支持key-group層面壓縮,也就是不同的key-group分別被壓縮成不同的部分,因此解壓縮過程可以並發執行,這對大規模數據的壓縮和解壓縮帶來非常高的性能提升和較強的可擴展性。flink中使用的壓縮算法在ExecutionConfig中進行指定,通過將setUseSnapshotCompression方法中的值設定為true即可。
五.觀察checkpoint延遲時間
checkpoint延遲啟動時間並不會直接暴露在客戶端中,而是需要通過以下公式計算得出。如果改時間過長,則表明算子在進行barrier對齊,等待上游的算子將數據寫入到當前算子中,說明系統正處於一個反壓狀態下。checkpoint延遲時間可以通過整個端到端的計算時間減去異步持續的時間和同步持續的時間得出。