本文是博主閱讀官網文檔、博客及書籍后自己所思所得,若是存在有誤的地方,歡迎留言分享,謝謝!
一、任務調度
Flink是通過task slot的來定義執行資源的,為優化資源的利用率,Flink通過slot共享,可以將多個連續的task任務組成的一個pipeline放在一個slot中運行。當任務並行度>1時,並行任務中的每個pipeline就會分配到一個slot去執行,這樣就會有一個問題,若是任務的並行度大於集群中slot的個數了,會咋辦?首先,毫無疑問的一點是集群中的slot中都會有pipeline在跑;其次,多的任務就會等待現有的運行結束再去運行。下面結合官網中提供的例子說明一般情況下pipeline的分配情況[1]。
下圖中,一個pipeline由Source - Map - Reduce組成,其中MapFunction的並行度為4,ReduceFunction的並行度為3,集群有兩個TaskManager,其中每個TaskManager有3個slot。
圖中,每一個pipeline由一個顏色表示,其中包含3個小圈,每一個圈代表一個算子,ReduceFunction的並行度為3,而MapFunction的為4,所以從Map->Reduce會發生shuffer。圖中,任務會以ExecutionVertex 組成的 DAG 圖的形式分配到兩個TaskManage的slot中,在TaskManager2的slot中,運行在其中一個slot的DAG僅有兩個ExecutionVertex ,這里會發生網絡shuffer。
二、JobManager 數據結構
運行在各個TaskManager的slot中任務的調度是通過JobManager完成,除此之外,JobManager還負責失敗任務的重啟等。
當JobManager接受到JobGraph(JobGraph 是數據流的表現形式,包括JobVertex和中間結果IntermediateDataSet,每個算子都有諸如並行度和執行代碼等屬性)會將其轉換為ExecutionGraph,兩者之間的關系如下圖所示:
對每個 JobVertex,可以看成是經過算子優化組成一個個operator chain(每個operator chain可以是一個或多個算子)和相關信息組成,而ExecutionVertex可以看做是JobVertex的並行版,假設組成一個JobVertex的operator chain的並行度為100,則在ExecutionGraph中,ExecutionVertex有100個,對應關系可以多看看上圖。
在JobGraph轉換到ExecutionGraph的過程中[2],主要發生了以下轉變:
- 加入了並行度的概念,成為真正可調度的圖結構
- 生成了與JobVertex對應的ExecutionJobVertex,ExecutionVertex,與IntermediateDataSet對應的IntermediateResult和IntermediateResultPartition等,並行將通過這些類實現。
每個 ExecutionGraph 都有一個與其相關聯的作業狀態。此作業狀態指示作業執行的當前狀態,具體的狀態圖如下:
圖中各個狀態說明情況很清楚,就不詳細說明,需要注意的是暫停狀態的作業將可能不會被完全清理。暫停狀態(suspended)僅處於本地終止狀態,在Flink的HA模式下,意味着作業的執行僅在相應的 JobManager 上終止,但集群的另一個 JobManager 可以從持久的HA存儲中恢復這個作業並重新啟動。
Ref:
[1]https://ci.apache.org/projects/flink/flink-docs-release-1.6/internals/job_scheduling.html