Scheduling:
- Flink中的執行資源通過任務槽(Task Slots)定義。每個TaskManager都有一個或多個任務槽,每個槽都可以運行一個並行任務管道(pipeline)。管道由多個連續的任務組成,例如第n個MapFunction並行實例和第n個ReduceFunction並行實例。Flink經常並發地執行連續的任務:對於流程序,這在任何情況下都會發生,對於批處理程序,它也經常發生。
- 下圖說明了這一點。考慮一個具有數據源、MapFunction和ReduceFunction的程序。數據源和MapFunction的並行度為4,而ReduceFunction的並行度為3。一個管道由Source-Map-Reduce序列組成。在一個具有2個TaskManager(每個TaskManager都有3個插槽)的集群中,程序將按照如下所述執行。
- 關於Flink調度,有兩個非常重要的原則:1.同一個operator的各個subtask是不能呆在同一個SharedSlot中的,例如
FlatMap[1]和FlatMap[2]是不能在同一個SharedSlot中的。2.Flink是按照拓撲順序從Source一個個調度到Sink的。例如WordCount(Source並行度為1,其他並行度為2),那么調度的順序依次是:Source->FlatMap[1]->FlatMap[2]->KeyAgg->Sink[1]->KeyAgg->Sink[2]。假設現在有2個TaskManager,每個只有1個slot,那么分配slot的過程如圖所示:
- 為
Source分配slot。首先,我們從TaskManager1中分配出一個SharedSlot。並從SharedSlot中為Source分配出一個SimpleSlot。如上圖中的①和②。 - 為
FlatMap[1]分配slot。目前已經有一個SharedSlot,則從該SharedSlot中分配出一個SimpleSlot用來部署FlatMap[1]。如上圖中的③。 - 為
FlatMap[2]分配slot。由於TaskManager1的SharedSlot中已經有同operator的FlatMap[1]了,我們只能分配到其他SharedSlot中去。從TaskManager2中分配出一個SharedSlot,並從該SharedSlot中為FlatMap[2]分配出一個SimpleSlot。如上圖的④和⑤。 - 為
Key->Sink[1]分配slot。目前兩個SharedSlot都符合條件,從TaskManager1的SharedSlot中分配出一個SimpleSlot用來部署Key->Sink[1]。如上圖中的⑥。 - 為
Key->Sink[2]分配slot。TaskManager1的SharedSlot中已經有同operator的Key->Sink[1]了,則只能選擇另一個SharedSlot中分配出一個SimpleSlot用來部署Key->Sink[2]。如上圖中的⑦。
最后
Source、FlatMap[1]、Key->Sink[1]這些subtask都會部署到TaskManager1的唯一個slot中,並啟動對應的線程。FlatMap[2]、Key->Sink[2]這些subtask都會被部署到TaskManager2的唯一個slot中,並啟動對應的線程。從而實現了slot共享。 - 為
- 最簡單的情況下,一個slot只持有一個task,也就是
SimpleSlot的實現。復雜點的情況,一個slot能共享給多個task使用,也就是SharedSlot的實現。SharedSlot能包含其他的SharedSlot,也能包含SimpleSlot。所以一個SharedSlot能定義出一棵slots樹。
JobManager 數據結構:
- 在job執行期間,JobManager跟蹤分布式任務,決定何時調度下一個任務(或一組任務),並對完成的任務或執行失敗作出反應。
- JobManager接收JobGraph,這是由運算符(JobVertex)和中間結果(IntermediateDataSet)組成的數據流的表示。每個運算符都有屬性,比如並行性和它執行的代碼。此外,JobGraph有一組附加的庫,這些庫是執行操作符代碼所必需的。
- JobManager 將 JobGraph 轉換為 ExecutionGraph。ExecutionGraph 是 JobGraph 的並行版本:對於每個 JobVertex,它包含每個並行子任務的 ExecutionVertex。並行度為100的運算符將有一個 JobVertex 和100個 ExecutionVertex。ExecutionVertex 跟蹤特定子任務的執行狀態。一個 JobVertex 中的所有 ExecutionVertex 都保存在 ExecutionJobVertex 中,它會跟蹤操作符的整體狀態。除頂點外,執行圖還包含 IntermediateResult 和 IntermediateResultPartition。
每個ExecutionGraph都有一個與之相關聯的job狀態。這個job狀態指示當前工作的執行狀態。 - Flink job首先處於創建(created)狀態,然后切換到運行(running)狀態,完成所有工作后切換到已完成(finished)狀態。在出現故障的情況下,job首先切換到故障(failing)狀態,取消所有正在運行的任務。如果所有job頂點都已達到最終狀態,且job不可重新啟動,則job轉換為失敗。如果job可以重新啟動,那么它將進入重新啟動狀態。一旦任務完全重新啟動,它將到達創建狀態。如果用戶取消job,它將進入取消(cancelling)狀態。這還需要取消所有當前正在運行的任務。一旦所有運行的任務都達到了最終狀態,任務轉換到該狀態就會被取消。
- 與表示全局終端狀態並觸發清理作業的已完成、已取消和已失敗狀態不同,暫停(suspended)狀態僅是本地終端。本地終端意味着job的執行已經在相應的JobManager上終止,但是Flink集群的另一個JobManager可以從持久的HA存儲中檢索這個job並重新啟動它。因此,達到暫停狀態的作業不會被完全清理。

- 在執行ExecutionGraph過程中,每個並行任務都經歷多個階段,從創建到完成或失敗。下面的圖表說明了它們之間的狀態和可能的轉換。一個任務可以多次執行(例如在故障恢復過程中)。由於這個原因,ExecutionVertex的執行被跟蹤。每個ExecutionVertex都有當前的執行,以及先前的執行。

