Flink中的數據交換是圍繞着下面的原則設計的:
1.數據交換的控制流(即,為了啟動交換而傳遞的消息)是由接收者發起的,就像原始的MapReduce一樣。
2.用於數據交換的數據流,即通過電纜的實際數據傳輸,被抽象為了IntermediateResult,並且是可插拔的。 這意味着系統可以使用同一實現同時支持流數據傳輸和批處理數據傳輸。
數據交換也涉及到了一些角色,包括:
1.JobManager,master節點,負責任務調度,異常恢復,任務協調,並且通過ExecutionGraph這樣的數據結構來保存一個作業的全景圖。
2.TaskManagers,工作節點,負責將多個任務並行的在線程中執行,每個TM中包含一個CommunicationManager(在tasks之間共享)和一個MemoryManager (在tasks之間共享)。TM之間通過TCP連接來交互數據。
需要注意的是,在Flink中,數據交換是發生在TM之間的,而不是task之間,在同一個TM中的不同task會復用同一個網絡連接。
ExecutionGraph,執行圖是一個數據結構,其中包含有關作業計算的“基本事實”。 它由代表計算任務的頂點(ExecutionVertex)和代表任務產生的數據的中間結果(IntermediateResultPartition)組成。 頂點通過ExecutionEdges(EE)鏈接到它們消耗的中間結果:
這些是JobManager中存在的邏輯數據結構。它們具有與運行時等效的結構,這些結構負責TaskManager上的實際數據處理。與IntermediateResultPartition等效的運行時稱為ResultPartition。
ResultPartition(RP)表示BufferWriter寫入的數據塊,即單個任務產生的數據塊。 RP是結果子分區(RS)的集合。這是為了區分發往不同接收者的數據,例如,在用於reduce或join的分區混洗的情況下。
ResultSubpartition(RS)表示由operator創建的數據的一個分區,以及將數據轉發給接收operator的邏輯。 RS的特定實現確定了實際的數據傳輸邏輯,這是允許系統支持各種數據傳輸的可插拔機制。例如,PipelinedSubpartition是支持流數據交換的管道實現。 SpillableSubpartition是一個阻止實現,支持批量數據交換。
InputGate:接收方RP的邏輯等效項。 它負責收集數據緩沖區並將其移交給上游。
InputChannel:接收方RS的邏輯等效項。 它負責為特定分區收集數據緩沖區。
序列化器和反序列化器將類型化的記錄可靠地轉換為原始字節緩沖區,反之亦然,處理跨越多個緩沖區的記錄等。
Control flow for data exchange
該圖片表示具有兩個並行任務的簡單map-reduce作業。我們有兩個TaskManager,兩個任務(一個映射任務和一個reduce任務)在兩個不同的節點中運行,一個JobManager在第三個節點中運行。我們專注於任務M1和R2之間轉移的啟動。數據傳輸使用粗箭頭表示,消息使用細箭頭表示。首先,M1產生一個ResultPartition(RP1)(箭頭1)。當RP可供消費時(我們稍后再討論),它會通知JobManager(箭頭2)。 JobManager通知該分區(任務R1和R2)的預期接收者該分區已准備就緒。如果尚未安排接收方,則實際上將觸發任務的部署(箭頭3a,3b)。然后,接收器將向RP請求數據(箭頭4a和4b)。這將在本地(案例5a)或通過TaskManagers的網絡堆棧(5b)啟動任務之間的數據傳輸(箭頭5a和5b)。當RP決定將其可用性通知JobManager時,該過程具有一定的自由度。例如,如果RP1在通知JM之前完全產生了自身(並且可能已寫入文件中),則數據交換大致相當於Hadoop中實現的批量交換。如果RP1在產生第一個記錄后立即通知JM,我們就可以進行流數據交換。
Transfer of a byte buffer between two tasks
序列化程序將記錄序列化為它們的二進制表示形式,並將它們放置在固定大小的緩沖區中(記錄可以跨越多個緩沖區)。這些緩沖區並移交給BufferWriter並寫出到ResultPartition(RP)。 RP由幾個子分區(ResultSubpartitions-RS)組成,這些子分區收集特定使用者的緩沖區。在圖中,該緩沖區發往第二個reducer(在TaskManager 2中),並將其放置在RS2中。由於這是第一個緩沖區,因此RS2可供使用(請注意,此行為實現了流式分發),並通知JobManager。
JobManager查找RS2的使用者,並通知TaskManager 2可用數據塊。發送到TM2的消息向下傳播到應該接收此緩沖區的InputChannel,后者進而通知RS2可以啟動網絡傳輸。然后,RS2將緩沖區移交給TM1的網絡堆棧,后者又將其移交給Netty進行運輸。網絡連接是長期運行的,並且存在於TaskManager之間,而不是單個任務之間。
一旦TM2接收到緩沖區,它就會通過相似的對象層次結構,從InputChannel(與IRPQ等效的接收器端)開始,到達InputGate(包含多個IC),最后在RecordDeserializer中結束,從緩沖區生成類型化的記錄,並將其交給接收任務,在這種情況下為ReduceDriver。