Flink 問題總結
作業運行流程
新增的 operator 會被 transform 封裝,例如 map(udf) -> OneInputTransformation,里面有序列化的 udf和operator配置(名稱、uid、並行度等),並記錄前一個 transformation 作為輸入。
當 execute 被調用,client 先遍歷 transformation 構造 StreamGraph -> JobGraph -> 合並 chain 最后到達 JM。JM 將其翻譯為 ExecutionGraph。ExecutionJobVertex 有一個或多個並行度且可能被調度和執行多次,其中一個並行度的一次執行稱為 Execution,JobManager 的 Scheduler 會為每個 Execution 分配 slot。
Watermark
基礎
Process 和 Event 的選擇:是否需要重現。
watermark代表了 timestamp 數值,表示以后到來的數據已經再也沒有小於或等於這個時間的了。
生成方式:
- SourceFunction:collectWithTimestamp 或 emitWatermark
- Stream:assignTimestampsAndWatermarks
- 定期
- 數據特征
傳播方式:廣播,單輸入取其大,多輸入取其小(因此多次輸入相同的 watermark,並不會影響當前的 watermark)
缺陷:對於同一個流的不同 partition,我們對他做這種強制的時鍾同步是沒有問題的,因為一開始就把一條流拆散成不同的部分,但每一個部分之間共享相同的時鍾(多輸入取其小)。但是 JOIN 流中,多流強制同步時鍾,對於快慢流關聯就要求快流緩存大量數據等待慢流。
watermark處理:operator 先更新 watermark,然后遍歷計時器觸發 trigger,watermark 發送下游。
Table API 中的時間
- processing time 可以從一個 DataStream,把增加一列為時間來轉化成一個 Table,或直接通過 TableSource 定義 DefinedRowtimeAttributes 生成
- event time:需要保證 DataStream 中已經存在 Record Timestamp 和 watermark,從 TableSource 生成,也需要已經有 long 字段。
操作:over window、group by、window join、order(對一個 DataStream 轉化成 Table 進行排序的話,只能是按照時間列進行排序,當然同時也可以指定一些其他的列,但是時間列這個是必須的,並且必須放在第一位)。這些操作在flink底層都是按照時間列掃描計算的,這也是流處理的特點或者相對於批處理的劣勢。掃描過程中積累的狀態不能無限增長是流處理的前提(其實批處理也一樣,但批處理模型在這方面的性能應該好些)。
原理擴展
在 event-time 場景下,如果 source 沒有收到數據,那么 watermark 就有可能停滯,這里有兩種情況:
-
source 某個 partition 沒有新數據
此時 source function 可以調用 sourceContext.markAsTemporarilyIdle() 來把該 partition 設置為 idle,在這之后的 watermark 生成機制就不會考慮這個停滯了的當前 watermark,進而讓 operator 隨着 active partition 的最小 watermark 繼續推進。
源碼可參考:
SourceFunction.markAsTemporarilyIdle(), StreamStatus, StreamTaskNetworkInput.processElement, StatusWatermarkValve.inputStreamStatus 和 inputWatermark -
整個 source 沒有數據
這種情況就要考慮 AssignerWithPeriodicWatermarks,用戶自己判斷多久的 idle 后,把 event-time 改為某種 process-time 形式的推進。
參考:
State
基礎
| state分類 | operator | keyed |
|---|---|---|
| 存儲對象是否 on heap | 是 | 是/否(RocksDB) |
| 是否手動編寫快照(snapshot)和恢復 (restore) | 是 | 不用 |
| 數據規模 | 通常小 | 通常大 |
backend分類

默認使用 memory backend,不管運行state還是 checkpoint 數據都存儲在 JM heap(如果JM掛了,連 cp 都不存在了),生產環境正常情況只考慮 file 和 rocksdb backend。其中 file 運行時 state 存儲在 heap,性能更好,但有 OOM 風險,且不支持增量 checkpoint;rocksdb 不管運行還是最終 checkpoint 數據都在數據庫中,無需擔心 OOM。
- HeapKeyedStateBackend 有兩種實現:
- 支持異步 Checkpoint(默認):存儲格式 CopyOnWriteStateMap
- 僅支持同步 Checkpoint:存儲格式 NestedStateMap
- RocksDBKeyedStateBackend,每個 state 都存儲在一個單獨的 column family 內,其中 keyGroup,Key 和 Namespace 進行序列化存儲在 DB 作為 key。
RocksDB StateBackend 概覽和相關配置討論
所有存儲的 key,value 均被序列化成 bytes 進行存儲。

在 RocksDB 中,每個 state 獨享一個 Column Family,而每個 Column family 使用各自獨享的 write buffer 和 block cache,上圖中的 window state 和 value state實際上分屬不同的 column family。
對性能比較有影響的參數配置
| state.backend.rocksdb.thread.num | 后台 flush 和 compaction 的線程數. 默認值 ‘1‘. 建議調大 |
|---|---|
| state.backend.rocksdb.writebuffer.count | 每個 column family 的 write buffer 數目,默認值 ‘2‘. 如果有需要可以適當調大 |
| state.backend.rocksdb.writebuffer.size | 每個 write buffer 的 size,默認值‘64MB‘. 對於寫頻繁的場景,建議調大 |
| state.backend.rocksdb.block.cache-size | 每個 column family 的 block cache大小,默認值‘8MB’,如果存在重復讀的場景,建議調大 |
實踐
小心存儲大量元素到 operator state
operator state 的結構是一個 list,由於沒有 key group,為了實現改並發恢復的功能,需要對 operator state 中的每一個序列化后的元素存儲一個位置偏移 offset。這個 offset 是一個 long 數組,但數量一大,這個 offset 數據就會很大。在 checkpoint 的時候,JM 需要接收這個 offset 數組作為原數據,進而引起 JM 的內存超用問題。
UnionListState的使用
從檢查點恢復之后每個並發 task 內拿到的是原先所有operator 上的 state。切記恢復的 task 只取其中的一部分進行處理和用於下一次 snapshot,否則有可能隨着作業不斷的重啟而導致 state 規模不斷增長。
keyed state 的清空
state.clear() 實際上只能清理當前 key 對應的 value 值,如果想要清空整個 state,需要借助於 applyToAllKeys 方法。如果需求中只是對 state 有過期需求,借助於 state TTL 功能來清理會是一個性能更好的方案。
RocksDB運行
默認使用 Flink managed memory 方式的情況下,state.backend.rocksdb.metrics.block-cache-usage ,state.backend.rocksdb.metrics.mem-table-flush-pending,state.backend.rocksdb.metrics.num-running-compactions 以及 state.backend.rocksdb.metrics.num-running-flushes 是比較重要的相關 metrics。
Flink-1.10 之后,由於引入了 RocksDB 的內存托管機制,在絕大部分情況下, RocksDB 的這一部分 native 內存是可控的,不過受限於 RocksDB 的相關 cache 實現限制,在某些場景下,無法做到完美控制,這時候建議打開上文提到的 native metrics,觀察相關 block cache 內存使用是否存在超用情況,可以將相關內存添加到 taskmanager.memory.task.off-heap.size 中,使得 Flink 有更多的空間給 native 內存使用。
大狀態
大狀態基本只考慮 RocksDB。
-
將state進行拆分,使用 MapState 來替代 ListState 或者 ValueState,因為RocksDB 的 map state 並不是將整個 map 作為 value 進行存儲,而是將 map 中的一個條目作為鍵值對進行存儲。
-
SSD磁盤
-
多硬盤分擔壓力(單塊磁盤的多個目錄無意義):在
flink-conf.yaml中配置state.backend.rocksdb.localdir參數來指定 RocksDB 在磁盤中的存儲目錄。當一個 TaskManager 包含 3 個 slot 時,那么單個服務器上的三個並行度都對磁盤造成頻繁讀寫,從而導致三個並行度的之間相互爭搶同一個磁盤 io,這樣務必導致三個並行度的吞吐量都會下降。Flink 的 state.backend.rocksdb.localdir 參數可以指定多個目錄,一般大數據服務器都會掛載很多塊硬盤,我們期望同一個 TaskManager 的三個 slot 使用不同的硬盤從而減少資源競爭。state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb對於硬盤較少的情況,flink 默認的隨機策略容易碰撞,考慮采用自定義磁盤選擇策略,比如輪訓。具體參考下面的 flink 大狀態優化。
參考:
CheckPoint/SavePoint
基礎
flink 的一致性快照,實際是當前 state 的數據快照。
flink 會周期性地進行 cp,且過程是異步的,所以 cp 期間仍可以處理數據。 cp 數據根據 statebackend 的不同會存儲到不同的地方。
原理:失敗/暫停重啟時,flink 會根據最新成功 cp 的數據來初始化重啟的 state,這個 state 包括 source 中記錄的消費位移,從而讓整個 flink 狀態回到該 cp 完成的那一刻。
分布式快照
- JM 的 Checkpoint Coordinator 向所有 source 節點 trigger Checkpoint
- source 接收到 cp barrier 后,觸發本地 state 的 cp,將數據存儲到持久存儲,將備份數據的地址(state handle)通知給 Checkpoint coordinator,然后廣播 cp barrier 到下游。當下游獲得其中一個CB時,就會暫停處理這個CB對應的數據,並將這些數據存到緩沖區,直到其他相同ID的CB都到齊(checkpoint barrier 對齊),就會觸發本地 state 的 cp,並廣播 cp barrier,然后處理緩存的數據。最后,當所有 task 的 state handle 都被 Checkpoint Coordinator 收集,本次 cp 就算是完成了,最后向持久化存儲中再備份一個 Checkpoint meta 文件。

優化:
- RocksDB的異步checkpoint:首先 RocksDB 會全量刷數據到磁盤上,然后 Flink 框架會從中選擇沒有上傳的文件進行持久化備份。后台線程異步發送快照到遠程storage。
- 如果是 at-least-once,就不會進行 checkpoint barrier 對齊。
checkpoint 和 savepoint
| checkpoint | savepoint |
|---|---|
| 用戶觸發和刪除 | 配置自動觸發,默認只保留最新 |
| 標准化格式存儲,允許作業升級、bug修復,A/B Test等場景,需要用戶指定路徑 | 作業 failed 或者 canceled 后重啟,不需指定路徑 |
| 全量,每次的時間較長,數據量較大 | 增量,每次的時間較短,數據量較小 |
配置
- 間隔不宜太短,默認情況,如果一個 cp 時間超過 cp 觸發間隔,那么這個 cp 一旦完成,就會馬上出發下一次 cp。可以考慮設置
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds) - 大 state 要適當增加超時時間(默認10min)
實踐
cp 失敗排查
-
cp webui 界面
Acknowledged一列表示有多少個 subtask 對這個 Checkpoint 進行了 ackLatest Acknowledgement表示該 operator 的所有 subtask 最后 ack 的時間;End to End Duration表示整個 operator 的所有 subtask 中完成 snapshot 的最長時間;State Size表示當前 Checkpoint 的 state 大小 -- 主要這里如果是增量 checkpoint 的話,則表示增量大小;Buffered During Alignment表示在 barrier 對齊階段積攢了多少數據,如果這個數據過大也間接表示對齊比較慢); -
失敗原因
-
Checkpoint Decline
// JM日志 Decline checkpoint 10423 by task 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178 // 可以從 JM日志 查找 0b60f08bf8984085b59f8d9bc74ce2e1 被分到哪個 TM org.apache.flink.runtime.executiongraph.ExecutionGraph - XXXXXXXXXXX (100/289) (87b751b1fd90e32af55f02bb2f9a9892) switched from SCHEDULED to DEPLOYING. org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying XXXXXXXXXXX (100/289) (attempt #0) to slot container_e24_1566836790522_8088_04_013155_1 on hostnameABCDE // 從上面日志可以確定被調度到 hostnameABCDE 的 container_e24_1566836790522_8088_04_013155_1 slot,當相應 TM 查看日志。 -
Checkpoint cancel
當前 Flink 中如果較小的 Checkpoint 還沒有對齊的情況下,收到了更大的 Checkpoint,則會把較小的 Checkpoint 給取消掉。
-
Checkpoint Expire
// 由下面日志可知,參考上面 Checkpoint Decline 的方法找到對應的 TM 日志。 Received late message for now expired checkpoint attempt 1 from 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178.開啟debug后,可以通過日志分析出慢在哪個階段。
// barrier 對齊后,准備 cp Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks // 同步階段 org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@70442baf, checkpointDirectory=xxxxxxxx, sharedStateDirectory=xxxxxxxx, taskOwnedStateDirectory=xxxxxx, metadataFilePath=xxxxxx, reference=(default), fileStateSizeThreshold=1024}, synchronous part) in thread Thread[Async calls on Source: xxxxxx _source -> Filter (27/70),5,Flink Task Threads] took 0 ms. // 異步階段 ... asynchronous part) in thread Thread[pool-48-thread-14,5,Flink Task Threads] took 369 ms分析原因:
-
source trigger 慢
- 搶不到鎖:一般不會,在舊版可能因為搶不到鎖,如果對應 TM 沒有 准備 cp 日志,則可以考慮這種情況,並用 jstack 分析鎖情況。在新版已經使用 mailBox 優化。
- 反壓或數據傾斜
- 主線程cpu消耗太高:在 task 端,所有的處理都是單線程的,數據處理和 barrier 處理都由主線程處理,如果主線程在處理太慢(比如使用 RocksDBBackend,state 操作慢導致整體處理慢),導致 barrier 處理的慢,也會影響整體 Checkpoint 的進度,在這一步我們需要能夠查看某個 PID 對應 hotmethod。使用工具 AsyncProfile dump 一份火焰圖,查看占用 CPU 最多的棧
-
同步階段慢
同步階段一般不會太慢,但是如果我們通過日志發現同步階段比較慢的話,對於非 RocksDBBackend 我們可以考慮查看是否開啟了異步 snapshot,如果開啟了異步 snapshot 還是慢,需要看整個 JVM 在干嘛,也可以使用前一節中的工具。對於 RocksDBBackend 來說,我們可以用
iostate查看磁盤的壓力如何,另外可以查看 tm 端 RocksDB 的 log 的日志如何,查看其中 SNAPSHOT 的時間總共開銷多少。 -
異步慢
對於異步階段來說,tm 端主要將 state 備份到持久化存儲上,對於非 RocksDBBackend 來說,主要瓶頸來自於網絡,這個階段可以考慮觀察網絡的 metric,或者對應機器上能夠觀察到網絡流量的情況(比如
iftop)。對於 RocksDB 來說,則需要從本地讀取文件,寫入到遠程的持久化存儲上,所以不僅需要考慮網絡的瓶頸,還需要考慮本地磁盤的性能。另外對於 RocksDBBackend 來說,如果覺得網絡流量不是瓶頸,但是上傳比較慢的話,還可以嘗試考慮開啟多線程上傳功能
-
-
cp 超時的排查(可以結合下面反壓的排查思路)
- Barrier對齊,由於某些
- 查看 JM 日志,看是哪些 task 的問題,有可能是數據傾斜等原因。
- 狀態大,異步狀態遍歷和寫hdfs耗時:考慮使用 RocksDB 的增量 cp,考慮 state 是否可以用 mapstate 優化。
cp 執行情況
- 通過 ui 可以計算公式 end_to_end_duration - synchronous_duration - asynchronous_duration = checkpoint_start_delay。如果計算結果通常比較大,那說明 checkpoint barrier 不能暢通地流經所有的 operator,有可能有反壓存在。
- 對於 exactly-once,如果緩存隊列在 cp 時很高,那說明 operator 處理數據的效率不均,可能數據傾斜。
上面兩個數值如果一直高,那很可能是 cp 本身的問題。
Tuning Checkpoints and Large State(未完)
原理擴展
增量 checkpoint
Flink 的增量 checkpoint 以 RocksDB 的 checkpoint 為基礎。RocksDB 把所有的修改保存在內存的可變緩存中(稱為 memtable),所有對 memtable 中 key 的修改,會覆蓋之前的 value,當前 memtable 滿了之后,RocksDB 會將所有數據以有序的寫到磁盤。當 RocksDB 將 memtable 寫到磁盤后,整個文件就不再可變,稱為有序字符串表(sstable)。RocksDB 的后台壓縮線程會將 sstable 進行合並,就重復的鍵進行合並,合並后的 sstable 包含所有的鍵值對,RocksDB 會刪除合並前的 sstable。
在這個基礎上,Flink 會記錄上次 checkpoint 之后所有新生成和刪除的 sstable,另外因為 sstable 是不可變的,Flink 用 sstable 來記錄狀態的變化。為此,Flink 調用 RocksDB 的 flush,強制將 memtable 的數據全部寫到 sstable,並硬鏈到一個臨時目錄中。這個步驟是在同步階段完成,其他剩下的部分都在異步階段完成,不會阻塞正常的數據處理。
Flink 將所有新生成的 sstable 備份到持久化存儲(比如 HDFS,S3),並在新的 checkpoint 中引用。Flink 並不備份前一個 checkpoint 中已經存在的 sstable,而是引用他們。Flink 還能夠保證所有的 checkpoint 都不會引用已經刪除的文件。

增量 checkpoint 可以減少 checkpoint 的總時間,但是也可能導致恢復的時候需要更長的時間。(從上面的流程可知,被持久化的sstable會包含未被刪除等多余數據,所以在恢復時,TM 下載的state數據量更大,再要經過一次全體的merge才能做到去重,才能最終用於state的初始化)
checkpoint源碼
- JM CheckpointCoordinator trigger checkpoint
- Source 收到 trigger checkpoint 的 PRC,並往下游發送 barrier,自己開始做 snapshot。
- 下游接收 barrier(需要 barrier 都到齊才會開始做 checkpoint)
- Task 開始同步階段 snapshot
- Task 開始異步階段 snapshot
- Task snapshot 完成,匯報給 JM
ExecutionGraphBuilder.buildGraph() {
if config checkpoint {
executionGraph.enableCheckpointing() {
checkpointCoordinator.createActivatorDeactivator
}
}
}
// ActivatorDeactivator 這個對象在 JobStatus 變為 RUNNING 時會調用
coordinator.startCheckpointScheduler(){
scheduleTriggerWithDelay(){
return timer.scheduleAtFixedRate(new ScheduledTrigger()...) // 返回一個 ScheduledFuture
}
}
// 上面的 ScheduledTrigger 會在設置的時候執行 run 方法,這個方法就是 triggerCheckpoint
// CheckpointCoordinator 在實例化時就被傳入下面三個數組(buildGraph時生成)
/** Tasks who need to be sent a message when a checkpoint is started. */
private final ExecutionVertex[] tasksToTrigger;
/** Tasks who need to acknowledge a checkpoint before it succeeds. */
private final ExecutionVertex[] tasksToWaitFor;
/** Tasks who need to be sent a message when a checkpoint is confirmed. */
private final ExecutionVertex[] tasksToCommitTo;
{
檢查 tasksToTrigger、tasksToWaitFor 數組,看 task 是否都符合 checkpoint 條件
for execution {
triggerCheckpoint(checkpointID, timestamp, checkpointOptions) {
taskManagerGateway.triggerCheckpoint(){
taskExecutorGateway.triggerCheckpoint(){ // rpc調用,tm 封裝了 te
task.triggerCheckpointBarrier(){
invokable.triggerCheckpointAsync
}
}
}
}
}
}
// source function
SourceStreamTask.triggerCheckpointAsync(){
mailboxProcessor.getMainMailboxExecutor().submit(() -> triggerCheckpoint(){
StreamTask.performCheckpoint(){
prepareSnapshotPreBarrier()
broadcastCheckpointBarrier()
checkpointState(){
storage = checkpointStorage.resolveCheckpointStorageLocation
new CheckpointingOperation.executeCheckpointing {
// synchronous checkpoints
for (StreamOperator<?> op : allOperators) {
checkpointStreamOperator(op);
}
// asynchronous part
StreamTask.asyncOperationsThreadPool.execute(new AsyncCheckpointRunnable);
}
}
}
});
}
// non-source function. The CheckpointedInputGate uses CheckpointBarrierHandler to handle incoming CheckpointBarrier from the InputGate.
pollNext(){
if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
CheckpointBarrierAligner.processBarrier() {
// regular case
onBarrier(channelIndex) {
blockedChannels[channelIndex] = true;
}
// 判斷是否已經全部對齊
notifyCheckpoint() {
toNotifyOnCheckpoint.triggerCheckpointOnBarrier() {
performCheckpoint() //
}
}
}
}
}
參考:
Apache Flink 進階(三):Checkpoint 原理剖析與應用實踐
《Stream Processing with Apache Flink》
Apache Flink 管理大型狀態之增量 Checkpoint 詳解
Tuning Checkpoints and Large State
Back Pressure
基礎
概念:數據管道中某個節點處理速率跟不上上游發送數據的速率,而對上游,直到數據源,進行限速。
原理:Flink 拓撲中每個節點(Task)間的數據都以阻塞隊列的方式傳輸,下游來不及消費導致隊列被占滿后,上游的生產也會被阻塞,最終導致數據源的攝入被阻塞。

影響:
- 潛在的性能瓶頸,可能導致更大的數據延遲。
- 增加 checkpoint 時長,因為 checkpoint barrier 不會超過普通數據,而數據的阻塞也導致 barrier 的阻塞。
- 在 exactly-once 下,state 變大,因為 checkpoint barrier 需要對齊,導致快的節點要等慢的節點,此時快的節點可能已經處理了很多數據,這些數據在慢節點完成 checkpoint 前都要被緩存加到 state 中。(對於 heap-base statebackend 影響更大,可能 oom)
處理
-
定位:
-
基於網絡的反壓 metrics 並不能定位到具體的 Operator,只能定位到 Task。

-
TaskManager 傳輸數據時,不同的 TaskManager 上的兩個 Subtask 間通常根據 key 的數量有多個 Channel,這些 Channel 會復用同一個 TaskManager 級別的 TCP 鏈接,並且共享接收端 Subtask 級別的 Buffer Pool。在接收端,每個 Channl 在初始階段會被分配固定數量的 Exclusive Buffer,這些 Buffer 會被用於存儲接受到的數據,交給 Operator 使用后再次被釋放。Channel 接收端空閑的 Buffer 數量稱為 Credit,Credit 會被定時同步給發送端被后者用於決定發送多少個 Buffer 的數據。在流量較大時,Channel 的 Exclusive Buffer 可能會被寫滿,此時 Flink 會向 Buffer Pool 申請剩余的 Floating Buffer。這些 Floating Buffer 屬於備用 Buffer,哪個 Channel 需要就去哪里。而在 Channel 發送端,一個 Subtask 所有的 Channel 會共享同一個 Buffer Pool,這邊就沒有區分 Exclusive Buffer 和 Floating Buffer。
| Metris | 描述 |
| :--------------------------------- | :------------------------------- |
| outPoolUsage | 發送端 Buffer 的使用率 |
| inPoolUsage | 接收端 Buffer 的使用率 |
| floatingBuffersUsage(1.9 以上) | 接收端 Floating Buffer 的使用率 |
| exclusiveBuffersUsage (1.9 以上) | 接收端 Exclusive Buffer 的使用率 |
inPoolUsage = floatingBuffersUsage + exclusiveBuffersUsage

當 outPoolUsage 和 inPoolUsage 使用率同低正常,同高被下游反壓,不同時,要么是反壓傳遞階段,要么就是反壓根源。如果一個 Subtask 的 outPoolUsage 是高,通常是被下游 Task 所影響,所以可以排查它本身是反壓根源的可能性。如果一個 Subtask 的 outPoolUsage 是低,但其 inPoolUsage 是高,則表明它有可能是反壓的根源。反壓有時是短暫的且影響不大,比如來自某個 Channel 的短暫網絡延遲或者 TaskManager 的正常 GC。

通常來說,floatingBuffersUsage 為高則表明反壓正在傳導至上游,而 exclusiveBuffersUsage 則表明了反壓是否存在傾斜(floatingBuffersUsage 高、exclusiveBuffersUsage 低為有傾斜,因為少數 channel 占用了大部分的 Floating Buffer)。
-
Web UI:提供了 SubTask 級別的反壓監控。
通過周期性對 Task 線程的棧信息采樣,得到線程被阻塞在請求 Buffer(意味着被下游隊列阻塞)的頻率來判斷該節點是否處於反壓狀態。默認配置下,這個頻率在 0.1 以下則為
OK,0.1 至 0.5 為LOW,而超過 0.5 則為HIGH。問題有兩種可能:- 節點的發送速率跟不上它的產生數據速率(如 flatmap 中一條輸入產生多條輸出)
- 下游節點反壓導致。反壓面板監控的是發送端,如果某個節點是性能瓶頸並不會導致它本身出現高反壓,而是導致它的上游出現高反壓。
如果我們找到第一個出現反壓的節點,那么反壓根源要么是就這個節點,要么是它緊接着的下游節點。區分這兩種狀態需要結合上面的 metrics。
如果作業的節點數很多或者並行度很大,由於要采集所有 Task 的棧信息,反壓面板的壓力也會很大甚至不可用。
-
分析
- Web UI 各個 SubTask 的 Records Sent 和 Record Received
- Checkpoint detail 里不同 SubTask 的 State size
- 對 TaskManager 進行 CPU profile,從中我們可以分析到 Task Thread 是否跑滿一個 CPU 核,是的話,是哪個函數效率低;不是的話,哪里阻塞。
未來的版本 Flink 將會直接在 WebUI 提供 JVM 的 CPU 火焰圖。
- 如果是內存或GC相關,可以啟動G1優化,加上
-XX:+PrintGCDetails來觀察日志。
原理擴展
首先 Producer Operator 從自己的上游或者外部數據源讀取到數據后,對一條條的數據進行處理,處理完的數據首先輸出到 Producer Operator 對應的 NetWork Buffer 中。Buffer 寫滿或者超時或者特殊事件(如 checkpoint barrier)后,就會觸發將 NetWork Buffer 中的數據拷貝到 Producer 端 Netty 的 ChannelOutbound Buffer(嚴格來講,Output flusher 不提供任何保證——它只向 Netty 發送通知,而 Netty 線程會按照能力與意願進行處理,所以即便觸發了 flush,也不一定發送數據),之后又把數據拷貝到 Socket 的 Send Buffer 中,這里有一個從用戶態拷貝到內核態的過程,最后通過 Socket 發送網絡請求,把 Send Buffer 中的數據發送到 Consumer 端的 Receive Buffer。數據到達 Consumer 端后,再依次從 Socket 的 Receive Buffer 拷貝到 Netty 的 ChannelInbound Buffer,再拷貝到 Consumer Operator 的 NetWork Buffer,最后 Consumer Operator 就可以讀到數據進行處理了。這就是兩個 TaskManager 之間的數據傳輸過程,我們可以看到發送方和接收方各有三層的 Buffer。

每個 Operator 計算數據時,輸出和輸入都有對應的 NetWork Buffer,這個 NetWork Buffer 對應到 Flink 就是圖中所示的 ResultSubPartition 和 InputChannel。ResultSubPartition 和 InputChannel 都是向 LocalBufferPool 申請 Buffer 空間,然后 LocalBufferPool 再向 NetWork BufferPool 申請內存空間。這里,NetWork BufferPool 是 TaskManager 內所有 Task 共享的 BufferPool,TaskManager 初始化時就會向堆外內存申請 NetWork BufferPool。LocalBufferPool 是每個 Task 自己的 BufferPool,假如一個 TaskManager 內運行着 5 個 Task,那么就會有 5 個 LocalBufferPool,但 TaskManager 內永遠只有一個 NetWork BufferPool。Netty 的 Buffer 也是初始化時直接向堆外內存申請內存空間。雖然可以申請,但是必須明白內存申請肯定是有限制的,不可能無限制的申請,我們在啟動任務時可以指定該任務最多可能申請多大的內存空間用於 NetWork Buffer。
Flink 1.5 后才用 credit 反壓機制,例如,上游 SubTask A.2 發送完數據后,還有 5 個 Buffer 被積壓,那么會把發送數據和 Backlog size = 5 一塊發送給下游 SubTask B.4,下游接受到數據后,知道上游積壓了 5 個Buffer,於是向 Buffer Pool 申請 Buffer,由於容量有限,下游 InputChannel 目前僅有 2 個 Buffer 空間,所以,SubTask B.4 會向上游 SubTask A.2 反饋 Channel Credit = 2。然后上游下一次最多只給下游發送 2 個 Buffer 的數據,這樣每次上游發送的數據都是下游 InputChannel 的 Buffer 可以承受的數據量。當可發數據為0時,上游會定期地僅發送 backlog size 給下游,直到下游反饋大於0的 credit。
當然,有了這個機制也並不代表 Flink 能解決外部的反壓問題。比如 Flink 寫入 ES,而 ES 沒有反饋機制,那么就會導致 ES 的 socket 被塞滿,甚至響應 timeout,結果任務就失敗了。Kafka 有反饋功能。
參考
語義/exactly-once
基礎
-
AT-MOST-ONCE:do nothing,數據丟失,適合准確度要求不高的。
-
AT-LEAST-ONCE:保證沒有數據丟失,即便對數據進行重復處理。適合計算最值的情況。
-
EXACTLY-ONCE:沒有數據丟失、事件只會產生一次最終結果。本質還是處理多次,但之前的處理被抹去。
end-to-end exactly-once 原理:
前提:end-to-end exactly-once 需要外部組件提供 commit 和 roll back 功能。二階段提交是兼容這兩個功能的常用方案。下面以 kafka - flink - kafka 為例。數據的輸出必須全部在一個 transaction 里,commit 包含兩個 cp 間的所有數據,這樣來保證數據輸出能夠 roll back。在分布式的場景下,commit 和 rollback 需要整體的 agree,這里就要使用 2pc 了。
過程:開始 cp 表示 pre-commit,JM 發送 checkpoint barrier 來對數據進行分割,barrier 前面為本次 cp 數據,后面為下次 cp 數據。barrier 經過 operator 時會觸發該 operator 的 state backend 快照它的 state。當所有 operator 的快照完成,包括 pre-committed external state,這時 cp 就完成了。下一步 JM 通知所有 operators cp 完成,但實際上只有 sink 需要響應,即進行最終 commit。
這個過程的 pre-commit 如果有失敗,整個 cp 都是失敗,馬上進行回滾。另外,在 commit 階段,必須在 kafka transaction timeout 內正常完成(期間可能出現網絡異常、flink重啟等),否則會丟失該批 commit 數據的結果。
消費端注意:isolation.level 為 read_committed
使用
TwoPhaseCommitSinkFunction
1. beginTransaction - to begin the transaction, we create a temporary file in a temporary directory on our destination file system. Subsequently, we can write data to this file as we process it.
2. preCommit - on pre-commit, we flush the file, close it, and never write to it again. We’ll also start a new transaction for any subsequent writes that belong to the next checkpoint.
3. commit - on commit, we atomically move the pre-committed file to the actual destination directory. Please note that this increases the latency in the visibility of the output data.
4. abort - on abort, we delete the temporary file.
原理擴展
public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
extends RichSinkFunction<IN>
implements CheckpointedFunction, CheckpointListener {
// 首先在 initializeState 方法中開啟事務,對於 Flink sink 的兩階段提交.
// 第一階段就是執行 CheckpointedFunction#snapshotState 當所有 task 的 checkpoint 都完成之后,每個 task 會執行 CheckpointedFunction#notifyCheckpointComplete 也就是所謂的第二階段。
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// this is like the pre-commit of a 2-phase-commit transaction
// we are ready to commit and remember the transaction
long checkpointId = context.getCheckpointId();
// 第一次調用的事務都在 initializeState 方法中
preCommit(currentTransactionHolder.handle);
// 保存了每個 checkpoint 對應的事務
pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
// 下一次的事務處理者
currentTransactionHolder = beginTransactionInternal();
state.clear();
state.add(new State<>(
this.currentTransactionHolder,
new ArrayList<>(pendingCommitTransactions.values()),
userContext));
}
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
// the following scenarios are possible here
//
// (1) there is exactly one transaction from the latest checkpoint that
// was triggered and completed. That should be the common case.
// Simply commit that transaction in that case.
//
// (2) there are multiple pending transactions because one previous
// checkpoint was skipped. That is a rare case, but can happen
// for example when:
//
// - the master cannot persist the metadata of the last
// checkpoint (temporary outage in the storage system) but
// could persist a successive checkpoint (the one notified here)
//
// - other tasks could not persist their status during
// the previous checkpoint, but did not trigger a failure because they
// could hold onto their state and could successfully persist it in
// a successive checkpoint (the one notified here)
//
// In both cases, the prior checkpoint never reach a committed state, but
// this checkpoint is always expected to subsume the prior one and cover all
// changes since the last successful one. As a consequence, we need to commit
// all pending transactions.
//
// (3) Multiple transactions are pending, but the checkpoint complete notification
// relates not to the latest. That is possible, because notification messages
// can be delayed (in an extreme case till arrive after a succeeding checkpoint
// was triggered) and because there can be concurrent overlapping checkpoints
// (a new one is started before the previous fully finished).
//
// ==> There should never be a case where we have no pending transaction here
//
Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
Throwable firstError = null;
// 全部事務提交
while (pendingTransactionIterator.hasNext()) {
Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
Long pendingTransactionCheckpointId = entry.getKey();
TransactionHolder<TXN> pendingTransaction = entry.getValue();
if (pendingTransactionCheckpointId > checkpointId) {
continue;
}
logWarningIfTimeoutAlmostReached(pendingTransaction);
try {
commit(pendingTransaction.handle);
} catch (Throwable t) {
if (firstError == null) {
firstError = t;
}
}
pendingTransactionIterator.remove();
}
if (firstError != null) {
throw ...
}
}
}
參考:
An Overview of End-to-End Exactly-Once Processing in Apache Flink
資源管理
Flink 的內存管理也主要指 TaskManager 的內存管理。TM 的資源(主要是內存)分為三個層級,分別是最粗粒度的進程級(TaskManager 進程本身),線程級(TaskManager 的 slot)和 SubTask 級(多個 SubTask 共用一個 slot)。
-
進程:
- Heap Memory: 由 JVM 直接管理的 heap 內存,留給用戶代碼以及沒有顯式內存管理的 Flink 系統活動使用(比如 StateBackend、ResourceManager 的元數據管理等)。
- Network Memory: 用於網絡傳輸(比如 shuffle、broadcast)的內存 Buffer 池,屬於 Direct Memory 並由 Flink 管理。
- Cutoff Memory: 在容器化環境下進程使用的物理內存有上限,需要預留一部分內存給 JVM 本身,比如線程棧內存、class 等元數據內存、GC 內存等。
- Managed Memory: 由 Flink Memory Manager 直接管理的內存,是數據在 Operator 內部的物理表示。Managed Memory 可以被配置為 on-heap 或者 off-heap (direct memory)的,off-heap 的 Managed Memory 將有效減小 JVM heap 的大小並減輕 GC 負擔。目前 Managed Memory 只用於 Batch 類型的作業,需要緩存數據的操作比如 hash join、sort 等都依賴於它。
-
線程:
TaskManager 會將其資源均分為若干個 slot (在 YARN/Mesos/K8s 環境通常是每個 TaskManager 只包含 1 個 slot),沒有 slot sharing 的情況下每個 slot 可以運行一個 SubTask 線程。除了 Managed Memory,屬於同一 TaskManager 的 slot 之間基本是沒有資源隔離的,包括 Heap Memory、Network Buffer、Cutoff Memory 都是共享的。所以目前 slot 主要的用處是限制一個 TaskManager 的 SubTask 數。默認情況下, Flink 允許多個 SubTask 共用一個 slot 的資源,前提是這些 SubTask 屬於同一個 Job 的不同 Task。這樣能夠節省 slot(線程數),且有效利用資源(比如在同一個 slot 的 source 和 map,source 主要使用網絡 IO,而 map 可能主要需要 cpu)
目前 Flink 的內存管理是比較粗粒度的,資源隔離並不是很完整,而且在不同部署模式下(Standalone/YARN/Mesos/K8s)或不同計算模式下(Streaming/Batch)的內存分配也不太一致,為深度平台化及大規模應用增添了難度。
目前 Flink 的資源是預先靜態分配的,也就是說 TaskManager 進程啟動后 slot 的數目和每個 slot 的資源數都是固定的而且不能改變,這些 slot 的生命周期和 TaskManager 是相同的。Flink Job 后續只能向 TaskManager 申請和釋放這些 slot,而沒有對 slot 資源數的話語權。
Flink 1.10 的改進
-
統一內存配置

-
動態 slot:目前涉及到 Managed Memory 資源,TaskManager 的其他資源比如 JVM heap 還是多個 slot 共享的。
-
細粒度的算子資源管理:
- 戶使用 API 構建的 Operator(以 Transformation 表示)會附帶
ResourceSpecs,描述該 Operator 需要的資源,默認為unknown。 - 當生成 JobGraph 的時候,StreamingJobGraphGenerator 根據
ResourceSpecs計算出每個 Operator 占的資源比例(主要是 Managed Memory 的比例)。 - 進行調度的時候,Operator 的資源將被加總成為 Task 的
ResourceProfiles(包括 Managed Memory 和根據 Task 總資源算出的 Network Memory)。這些 Task 會被划分為 SubTask 實例被部署到 TaskManager 上。 - 當 TaskManager 啟動 SubTask 的時候,會根據各 Operator 的資源占比划分 Slot Managed Memory。划分的方式可以是用戶指定每個 Operator 的資源占比,或者默認均等分。
- 戶使用 API 構建的 Operator(以 Transformation 表示)會附帶
Mechine Learning/AI
基礎
流批統一框架

- 首先是數據的管理和獲取階段(Data Acquisition),在這個階段 Flink 提供了非常豐富的 connector(包括對 HDFS,Kafka 等多種存儲的支持),Flink 目前還沒有提供對整個數據集的管理。
- 下一個階段是整個數據的預處理(Preprocessing)及特征工程部分,在這個階段 Flink 已經是一個批流統一的計算引擎,並且提供了較強的 SQL 支持。
- 之后是模型訓練過程(Model Training),在這個過程中,Flink 提供了 Iterator 的支持,並且有如 Alink,MLlib 這樣豐富的機器學習庫支持,且支持 TensorFlow,Pytorch 這樣的深度學習框架。
- 模型產出之后是模型驗證和管理階段(Model Validation & Serving),這個階段 Flink 目前還沒有涉足。
- 最后是線上推理階段(Inference),這個階段 Flink 還沒有形成一套完整的方案。同時形成了 Flink ML Pipeline,以及目前正在做的 Flink AI Flow。
上面框架涉及到兩個開源項目
https://github.com/alibaba/Alink
https://github.com/alibaba/flink-ai-extended
Flink ML Pipeline
Pipeline 主要涉及兩個抽象,第一個是 Transformer 抽象,是對數據預處理和在線推理的抽象。第二個抽象是 Estimator 抽象,主要是對整個模型訓練的抽象。兩個抽象最大的差異是 Transformer 是將一份數據轉化為另一份處理后的數據,而 Estimator 是將數據進行訓練轉化為模型。
參考:
