一、jobmanage
JobManager負責接收 flink 的作業,調度 task,收集 job 的狀態、管理 TaskManagers。jobmanage啟動,再啟動task。
二、taskmanage
所有執行任務的基本容器,提供了內存管理、IO管理、通信管理等。
將所有對象序列化后放在自己的MemorySegment上進行管理。IOManager flink通過IOManager管理磁盤IO的過程,提供了同步和異步兩種寫模式。NetworkEnvironment 是TaskManager的網絡 IO 組件,包含了追蹤中間結果和數據交換的數據結構。執行task就是把收到的TaskDeploymentDescriptor
對象轉換成一個task並執行的過程。輸入的InputGate和輸出的ResultPartition的定義,該task要作為幾個subtask執行。
invokable.invoke();
方法。為什么這么說呢,因為這個方法就是用戶代碼所真正被執行的入口。
三、為執行保駕護航——Fault Tolerant與保證Exactly-Once語義
離線任務,如果失敗了只需要清空已有結果,重新跑一次就可以了。對於流任務,如果要保證能夠重新處理已處理過的數據,就要把數據保存下來;而這就面臨着幾個問題:比如一是保存多久的數據?二是重復計算的數據應該怎么處理,怎么保證冪等性?
分布式快照應運而生,快速記錄下來當前的operator的狀態、當前正在處理的元素。當整個程序的最后一個算子sink都收到了這個barrier,也就意味着這個barrier和上個barrier之間所夾雜的這批元素已經全部落袋為安。這時,最后一個算子通知JobManager整個流程已經完成,而JobManager隨后發出通知,要求所有算子刪除本次快照內容,以完成清理。這整個部分,就是Flink的兩階段提交的checkpoint過程。
要完成一次checkpoint,第一步必然是發起checkpoint請求。那么,這個請求是哪里發出的,怎么發出的,又由誰控制呢?CheckpointCoordinator
四、保存barrirs的State、StateBackend
State分為 KeyedState和OperatorState
StateBackend目前提供了三個backend,MemoryStateBackend,FsStateBackend,RocksDBStateBackend
五、數據交換
MemorySegment
就是Flink的內存抽象。默認情況下,一個MemorySegment可以被看做是一個32kb大的內存塊的抽象。NetworkBuffer
,是對MemorySegment
的包裝。Flink在各個TaskManager之間傳遞數據時,使用的是這一層的抽象。
最底層內存抽象是MemorySegment,用於數據傳輸的是Buffer,那么,承上啟下對接從Java對象轉為Buffer的中間對象是什么呢?是StreamRecord
。
六、數據在task之間交換
數據在各個task之間exchange的過程。
- 第一步必然是准備一個ResultPartition;
- 通知JobMaster;
- JobMaster通知下游節點;如果下游節點尚未部署,則部署之;
- 下游節點向上游請求數據
- 開始傳輸數據
數據在task之間傳遞有如下幾步:
- 數據在本operator處理完后,交給
RecordWriter
。每條記錄都要選擇一個下游節點,所以要經過ChannelSelector
。 - 每個channel都有一個serializer(我認為這應該是為了避免多線程寫的麻煩),把這條Record序列化為ByteBuffer
- 接下來數據被寫入ResultPartition下的各個subPartition里,此時該數據已經存入DirectBuffer(MemorySegment)
- 單獨的線程控制數據的flush速度,一旦觸發flush,則通過Netty的nio通道向對端寫入
- 對端的netty client接收到數據,decode出來,把數據拷貝到buffer里,然后通知
InputChannel
- 有可用的數據時,下游算子從阻塞醒來,從InputChannel取出buffer,再解序列化成record,交給算子執行用戶代碼
背壓問題
Flink來說,就是在數據的接收端和發送端放置了緩存池,用以緩沖數據,並且設置閘門阻止數據向下流。
當數據發送太多,下游處理不過來了,那么首先InputChannel會被填滿,然后是InputChannel能申請到的內存達到最大,於是下游停止讀取數據,上游負責發送數據的nettyServer會得到響應,停止從ResultSubPartition讀取緩存,那么ResultPartition很快也將存滿數據不能被消費,從而生產數據的邏輯被阻塞在獲取新buffer上,非常自然地形成背壓的效果。