1、StreamGraph本質
本質就是按照用程序代碼的執行順序構建出來的用於向執行環境傳輸的流式圖,並且可以支持可視化展示給用戶的一種數據結構。
2、StreamGraph、StreamNode和StreamEdge的數據結構
StreamGraph構建DAG流圖時,其核心是要維護好節點及節點之間的關系即可,關於這塊主要是以下關鍵屬性:
而節點之間的關系是由節點自身數據結構來維護的,在StreamNode包含着節點和上下游節點間的關系:
節點關系的具體表征就是StreamEdge了:
以上三個類除了維護構建DAG流圖相關屬性外,還包含了其他與流程序執行相關的屬性以及一些其他參數,如輸入輸出序列化、格式化等等。
3、構建入口
從StreamExecutionEnvironment環境中構建StreamGraph時分成兩個部分:
首先從全局執行環境參數中構建出一個StreamGraphGenerator對象,並將部分全局參數設置進去;
然后再由這個StreamGraphGenerator對象去generat出StreamGraph
3、構建過程
構建過程又可以分為兩個步驟:
首先new一個StreamGraph對象,並且設置其全局的一些config參數:
然后對其中的每一個transform算子進行循環遞歸處理,組織其內部的Node與Edge關系,形成最終結果:
單個算子的處理時,根據每個算子的類型有其單獨的算法
4、Transformation抽象類的體系結構
這里沒有定義為接口而是一個抽象類,猜測設計這個類的初衷是提取公共屬性而非提取transform的模板方法了,否則也不會有上面那一大坨分類處理的ifelse
5、transform算子具體邏輯
transform算子的套路額基本相差不大,都是由transformXXX方法完成,大概都包含以下步驟:
a、包含input時先遞歸處理上游算子;
b、通過一個hashmap的緩沖池檢驗是否已經處理過,避免重復處理
c、選擇slot共享算法
d、生成StreamNode並加入StreamGraph
e、設置輸入輸出序列化方式、格式化類型等規則
f、設置算子並行度
g、生成StreamEdge,維護正確的上下游關系
其中如果有多輸入或者虛擬節點時,根據具體規則進行節點的拆分重組,然后再遞歸調用即可,下面看各類型節點的具體處理規則
OneInputTransformation<IN, OUT>
TwoInputTransformation<IN1, IN2, OUT>
SourceTransformation:source節點沒有上游節點了,其邊的關系由他的下游節點維護即可
SinkTransformation
UnionTransformation:union時,只需維護好上下游的關系即可,正確連接起來
SplitTransformation:split則拆分出兩個OutputSelector
SelectTransformation:select當做虛擬算子處理
FeedbackTransformation:feedback比較特殊,形成新類似的source/sink節點對,需要再次分別處理
CoFeedbackTransformation:對比參考FeedbackTransformation
PartitionTransformation:partition也是作為一類虛擬節點來處理,
SideOutputTransformation:類似partition
6、StreamGraph的addSink、addSource以及addOperator、addEdge方法
在transform各類算子時,其實在構建流圖時最核心的方法是調用了addOperator和addEdge方法;
addSink和addSource其實是調用了addOperator,同時存儲了對應的編碼
而addOperator則是調用了addNode方法來添加Node,順便再次補充設置輸入輸出的序列化方法和格式化類型
最后來看addNode方法,此處才是真正生成Node並且加入圖的方法
addEdge就比較簡單了,直接調用addEdgeInternal方法,在addEdgeInternal方法類進行分類處理,如果是虛擬節點就按照對應規則進行拆分重組然后再遞歸調用,如果是普通節點就構建關系銜接起來即可
至此,整個由StreamExecutionEnvironment中生成StreamGraph的全過程,尤其是構建流圖的核心邏輯解析完了
7、最后補充一個點
在StreamExecutionEnvironment中的transforms集合中其實是沒有存儲source算子的,是由source算子的下游往前找input時補充回去的,看運行時的代碼:
對比即可發現,同時在StreamExecutionEnvironment的addSource方法中是沒有添加operator操作的