[源碼分析] 從源碼入手看 Flink Watermark 之傳播過程
0x00 摘要
本文將通過源碼分析,帶領大家熟悉Flink Watermark 之傳播過程,順便也可以對Flink整體邏輯有一個大致把握。
0x01 總述
從靜態角度講,watermarks是實現流式計算的核心概念;從動態角度說,watermarks貫穿整個流處理程序。所以為了講解watermarks的傳播,需要對flink的很多模塊/概念進行了解,涉及幾乎各個階段。我首先會講解相關概念,然后會根據一個實例代碼從以下幾部分來解釋:程序邏輯/計算圖模型/程序執行。最后是詳細Flink源碼分析(略冗長,可以選擇性閱讀)。
0x02 相關概念
流計算被抽象成四個問題,what,where,when,how。
window解決的是where,也就是將無界數據划分成有界數據。
window的數據何時被計算是when?解決這個問題用的方式是watermark和trigger,watermark用來標記窗口的完整性。trigger用來設計窗口數據觸發條件。
1. 亂序處理
亂序問題一般是和event time關聯的, 對於一個流式處理系統的process time來說,是不存在亂序問題的。所以下面介紹的watermark/allowedLateness也只是在event time作為主時間才生效。
Flink中處理亂序依賴的 watermark+window+trigger,屬於全局性的處理;Flink同時對於window而言,還提供了allowedLateness方法,使得更大限度的允許亂序,屬於局部性的處理;
即watermark是全局的,不止針對window計算,而allowedLateness讓某一個特定window函數能自己控制處理延遲數據的策略,allowedLateness是窗口函數的屬性。
2. Watermark(水位線)
watermark是流式系統中主要用於解決流式系統中數據亂序問題的機制,方法是用於標記當前處理到什么水位的數據了,這意味着再早於這個水位的數據過來會被直接丟棄。這使得引擎可以自動跟蹤數據中的當前事件時間,並嘗試相應地清除舊狀態。
Watermarking表示多長時間以前的數據將不再更新,您可以通過指定事件時間列來定義查詢的Watermarking,並根據事件時間預測數據的延遲時間。也就是說每次窗口滑動之前會進行Watermarking的計算。當一組數據或新接收的數據事件時間小於Watermarking時,則該數據不會更新,在內存中就不會維護該組數據的狀態。
換一種說法,閾值內的滯后數據將被聚合,但是晚於閾值到來的數據(其實際時間比watermark小)將被丟棄。
watermark和數據本身一樣作為正常的消息在流中流動。
3. Trigger
Trigger 指明在哪些條件下觸發window計算,基於處理數據時的時間以及事件的特定屬性。一般trigger的實現是當watermark處於某種時間條件下或者窗口數據達到一定條件,窗口的數據開始計算。
每個窗口分配器都會有一個默認的Trigger。如果默認的Trigger不能滿足你的需求,你可以指定一個自定義的trigger()。Flink Trigger接口有如下方法允許trigger對不同的事件做出反應:
* onElement():進入窗口的每個元素都會調用該方法。
* onEventTime():事件時間timer觸發的時候被調用。
* onProcessingTime():處理時間timer觸發的時候會被調用。
* onMerge():有狀態的觸發器相關,並在它們相應的窗口合並時合並兩個觸發器的狀態,例如使用會話窗口。
* clear():該方法主要是執行窗口的刪除操作。
每次trigger,都是要對新增的數據,相關的window進行重新計算,並輸出。輸出有complete, append,update三種輸出模式:
-
Complete mode:Result Table 全量輸出,也就是重新計算過的window結果都輸出。意味着這種模式下,每次讀了新增的input數據,output的時候會把內存中resulttable中所有window的結果都輸出一遍。
-
Append mode (default):只有 Result Table 中新增的行才會被輸出,所謂新增是指自上一次 trigger 的時候。因為只是輸出新增的行,所以如果老數據有改動就不適合使用這種模式。 更新的window並不輸出,否則外存里的key就重了。
-
Update mode:只要更新的 Row 都會被輸出,相當於 Append mode 的加強版。而且是對外存中的相同key進行update,而不是append,需要外存是能kv操作的!只會輸出新增和更新過的window的結果。
從上面能看出來,流式框架對於window的結果數據是存在一個 result table里的!
4. allowedLateness
Flink中借助watermark以及window和trigger來處理基於event time的亂序問題,那么如何處理“late element”呢?
也許還有人會問,out-of-order element與late element有什么區別?不都是一回事么?答案是一回事,都是為了處理亂序問題而產生的概念。要說區別,可以總結如下:
- 通過watermark機制來處理out-of-order的問題,屬於第一層防護,屬於全局性的防護,通常說的亂序問題的解決辦法,就是指這類;
- 通過窗口上的allowedLateness機制來處理out-of-order的問題,屬於第二層防護,屬於特定window operator的防護,late element的問題就是指這類。
默認情況下,當watermark通過end-of-window之后,再有之前的數據到達時,這些數據會被刪除。為了避免有些遲到的數據被刪除,因此產生了allowedLateness的概念。
簡單來講,allowedLateness就是針對event time而言,對於watermark超過end-of-window之后,還允許有一段時間(也是以event time來衡量)來等待之前的數據到達,以便再次處理這些數據。
5. 處理消息過程
- windowoperator接到消息以后,首先存到state,存放的格式為k,v,key的格式是key + window,value是key和window對應的數據。
- 注冊一個timer,timer的數據結構為 [key,window,window邊界 - 1],將timer放到集合中去。
- 當windowoperator收到watermark以后,取出集合中小於watermark的timer,觸發其window。觸發的過程中將state里面對應key及window的數據取出來,這里要經過序列化的過程,發送給windowfunction計算。
- 數據發送給windowfunction,實現windowfunction的window數據計算邏輯。
比如某窗口有三個數據:[key A, window A, 0], [key A, window A, 4999], [key A, window A, 5000]
對於固定窗口,當第一個watermark (Watermark 5000)到達時候,[key A, window A, 0], [key A, window A, 4999] 會被計算,當第二個watermark (Watermark 9999)到達時候,[key A, window A, 5000]會被計算。
6. 累加(再次)計算
watermark是全局性的參數,用於管理消息的亂序,watermark超過window的endtime之后,就會觸發窗口計算。一般情況下,觸發窗口計算之后,窗口就銷毀掉了,后面再來的數據也不會再計算。
因為加入了allowedLateness,所以計算會和之前不同了。window這個allowedLateness屬性,默認為0,如果allowedLateness > 0,那么在某一個特定watermark到來之前,這個觸發過計算的窗口還會繼續保留,這個保留主要是窗口里的消息。
這個特定的watermark是什么呢? watermark-allowedLateness>=窗口endtime。這個特定watermark來了之后,窗口就要消失了,后面再來屬於這個窗口的消息,就丟掉了。在 "watermark(=窗口endtime)" ~ “watermark(=endtime+allowedLateness)" 這段時間之間,對應窗口可能會多次計算。那么要window的endtime+allowedLateness <= watermark的時候,window才會被清掉。
比如window的endtime是5000,allowedLateness=0,那么如果watermark 5000到來之后,這個window就應該被清除。但是如果allowedLateness = 1000,則需要等water 6000(endtime + allowedLateness)到來之后,這個window才會被清掉。
Flink的allowedLateness可用於TumblingEventTimeWindow、SlidingEventTimeWindow以及EventTimeSessionWindows,這可能使得窗口再次被觸發,相當於對前一次窗口的窗口的修正(累加計算或者累加撤回計算);
注意:對於trigger是默認的EventTimeTrigger的情況下,allowedLateness會再次觸發窗口的計算,而之前觸發的數據,會buffer起來,直到watermark超過end-of-window + allowedLateness的時間,窗口的數據及元數據信息才會被刪除。再次計算就是DataFlow模型中的Accumulating的情況。
同時,對於sessionWindow的情況,當late element在allowedLateness范圍之內到達時,可能會引起窗口的merge,這樣之前窗口的數據會在新窗口中累加計算,這就是DataFlow模型中的AccumulatingAndRetracting的情況。
7. Watermark傳播
生產任務的pipeline中通常有多個stage,在源頭產生的watermark會在pipeline的多個stage間傳遞。了解watermark如何在一個pipeline的多個stage間進行傳遞,可以更好的了解watermark對整個pipeline的影響,以及對pipeline結果延時的影響。我們在pipeline的各stage的邊界上對watermark做如下定義:
- 輸入watermark(An input watermark):捕捉上游各階段數據處理進度。對源頭算子,input watermark是個特殊的function,對進入的數據產生watermark。對非源頭算子,input watermark是上游stage中,所有shard/partition/instance產生的最小的watermark
- 輸出watermark(An output watermark):捕捉本stage的數據進度,實質上指本stage中,所有input watermark的最小值,和本stage中所有非late event的數據的event time。比如,該stage中,被緩存起來等待做聚合的數據等。
每個stage內的操作並不是線性遞增的。概念上,每個stage的操作都可以被分為幾個組件(components),每個組件都會影響pipeline的輸出watermark。每個組件的特性與具體的實現方式和包含的算子相關。理論上,這類算子會緩存數據,直到觸發某個計算。比如緩存一部分數據並將其存入狀態(state)中,直到觸發聚合計算,並將計算結果寫入下游stage。
watermark可以是以下項的最小值:
- 每個source的watermark(Per-source watermark) - 每個發送數據的stage.
- 每個外部數據源的watermark(Per-external input watermark) - pipeline之外的數據源
- 每個狀態組件的watermark(Per-state component watermark) - 每種需要寫入的state類型
- 每個輸出buffer的watermark(Per-output buffer watermark) - 每個接收stage
這種精度的watermark能夠更好的描述系統內部狀態。能夠更簡單的跟蹤數據在系統各個buffer中的流轉狀態,有助於排查數據堵塞問題。
watermark以廣播的形式在算子之間傳播,當一個算子收到watermark時都要干些什么事情呢?
- 更新算子時間
- 遍歷計時器隊列觸發回調
- 將watermark發送到下游
0x03. Flink 程序結構 & 核心概念
1. 程序結構
Flink程序像常規的程序一樣對數據集合進行轉換操作,每個程序由下面幾部分組成:
- 獲取一個執行環境
- 加載/創建初始化數據
- 指定對於數據的transformations操作
- 指定計算的輸出結果(打印或者輸出到文件)
- 觸發程序執行
flink流式計算的核心概念,就是將數據從輸入流一個個傳遞給Operator進行鏈式處理,最后交給輸出流的過程。對數據的每一次處理在邏輯上成為一個operator,並且為了本地化處理的效率起見,operator之間也可以串成一個chain一起處理。
下面這張圖表明了flink是如何看待用戶的處理流程的:用戶操作被抽象化為一系列operator。以source開始,以sink結尾,中間的operator做的操作叫做transform,並且可以把幾個操作串在一起執行。
Source ---> Transformation ----> Transformation ----> Sink
以下是一個樣例代碼,后續的分析會基於此代碼。
DataStream<String> text = env.socketTextStream(hostname, port);
DataStream counts = text
.filter(new FilterClass())
.map(new LineSplitter())
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator)
.keyBy(0)
.timeWindow(Time.seconds(10))
.sum(2)
counts.print()
System.out.println(env.getExecutionPlan());
2. 核心類/接口
在用戶設計程序時候,對應如下核心類/接口:
- DataStream:描述的是一個具有相同數據類型的數據流,底層是通過具體的Transformation來實現,其負責提供各種對流上的數據進行操作轉換的API接口。
- Transformation:描述了構建一個DataStream的操作,以及該操作的並行度、輸出數據類型等信息,並有一個屬性,用來持有StreamOperator的一個具體實例;
上述代碼邏輯中,對數據流做了如下操作:filter, map, keyBy, assignTimestampsAndWatermarks, timeWindow, sum。每次轉換都生成了一個新的DataStream。
比如實例代碼中的timeWindow最后生成了windowedStream。windowedStream之上執行的apply方法會生成了WindowOperator,初始化時包含了trigger以及allowedLateness的值。然后經過transform轉換,實際上是執行了DataStream中的transform方法,最后生成了SingleOutputStreamOperator。SingleOutputStreamOperator這個類名字有點誤導,實際上它是DataStream的子類。
public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
KeySelector<T, K> keySel = input.getKeySelector(); //根據keyedStream獲取key
WindowOperator<K, T, Iterable<T>, R, W> operator;
operator = new WindowOperator<>(windowAssigner, ... ,
new InternalIterableWindowFunction<>(function),
trigger,
allowedLateness,
legacyWindowOpType);
return input.transform(opName, resultType, operator);//根據operator name,窗口函數的類型,以及window operator,執行keyedStream.transaform操作
}
0x04. Flink 執行圖模型
Flink 中的執行圖可以分成四層:StreamGraph ---> JobGraph ---> ExecutionGraph -> 物理執行圖。
- StreamGraph:是對用戶邏輯的映射,代表程序的拓撲結構,是根據用戶通過 Stream API 編寫的代碼生成的最初的圖。
- JobGraph:StreamGraph經過優化后生成了 JobGraph,提交給 JobManager 的數據結構。主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點,這樣可以減少數據在節點之間流動所需要的序列化/反序列化/傳輸消耗。
- ExecutionGraph:JobManager 根據 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的並行化版本,是調度層最核心的數據結構。
- 物理執行圖:JobManager 根據 ExecutionGraph 對 Job 進行調度后,在各個TaskManager 上部署 Task 后形成的“圖”,並不是一個具體的數據結構。
我們這里重點看StreamGraph,其相關重點數據結構是:
- StreamNode 是用來描述 operator 的邏輯節點,並具有所有相關的屬性,如並發度、入邊和出邊等。
- StreamEdge 是用來描述兩個 StreamNode(operator) 邏輯的鏈接邊。
我們可以直接打印 Execution Plan
System.out.println(env.getExecutionPlan());
其內部調用 StreamExecutionEnvironment.getExecutionPlan 得到 StreamGraph。
public String getExecutionPlan() {
return getStreamGraph(DEFAULT_JOB_NAME, false).getStreamingPlanAsJSON();
}
StreamGraph的轉換流是:
* Source --> Filter --> Map --> Timestamps/Watermarks --> Window(SumAggregator) --> Sink
下面是我把 示例代碼 打印StreamGraph結果整理出來一個靜態架構。可以看出代碼中的轉換被翻譯成了如下執行Unit(在下面圖中,其執行序列是由上而下)。
* +-----> Data Source(ID = 1) [ Source Socket Stream ]
* | // env.socketTextStream(hostname, port) 方法中生成了一個 Data Source
* |
* +-----> Operator(ID = 2) [ Filter ]
* |
* |
* +-----> Operator(ID = 3) [ Map ]
* |
* |
* +-----> Operator(ID = 4) [ Timestamps/Watermarks ]
* |
* |
* +-----> Operator(ID = 6) [ Window(SumAggregator) ]
* | // 多個Operator被構建成 Operator Chain
* |
* |
* +-----> Data Sink(ID = 7) [ Sink : Print to Std. Out ]
* // counts.print() 是在數據流最后添加了個 Data Sink,用於承接統計結果
示例代碼中,Flink生成StreamGraph的大致處理流程是:
- 首先處理的
Source,生成了Source的StreamNode。 - 處理
Filter,生成了Filter的StreamNode,並生成StreamEdge連接上游Source和Filter。 - 處理
Map,生成了Map的StreamNode,並生成StreamEdge連接上游Filter和Map。 - 處理
assignTimestampsAndWatermarks,生成了Timestamps/Watermarks的StreamNode,並生成StreamEdge連接上游Map和Timestamps/Watermarks。 - 處理
keyBy/timeWindow/sum,生成了Window的StreamNode以及Operator Chain,並生成StreamEdge連接上游Timestamps/Watermarks和Window。 - 最后處理
Sink,創建Sink的StreamNode,並生成StreamEdge與上游Window相連。
0x05. 執行模塊生命周期
這里主要核心類是:
-
Function:用戶通過繼承該接口的不同子類來實現用戶自己的數據處理邏輯。如子類SocketTextStreamFunction實現從指定hostname和port來接收數據,並轉發字符串的邏輯;
-
Task: 是Flink中執行的基本單位,代表一個 TaskManager 中所起的並行子任務,執行封裝的 flink 算子並運行,提供以下服務:消費輸入data、生產 IntermediateResultPartition [ flink關於中間結果的抽象 ]、與 JobManager 交互。
-
StreamTask : 是本地執行的基本單位,由TaskManagers部署執行。包含了多個StreamOperator,封裝了算子的處理邏輯。
-
StreamOperator:DataStream 上的每一個 Transformation 都對應了一個 StreamOperator,StreamOperator是運行時的具體實現,會決定UDF(User-Defined Funtion)的調用方式。
-
StreamSource 是StreamOperator接口的一個具體實現類,其構造函數入參就是SourceFunction的子類,這里就是SocketTextStreamFunction的實例。
Task 是直接受 TaskManager 管理和調度的,而 Task 又會調用 StreamTask(主要是其各種子類),StreamTask 中封裝了算子(StreamOperator)的處理邏輯。StreamSource是用來開啟整個流的算子。我們接下來就說說動態邏輯。
我們的示例代碼中,所有程序邏輯都是運行在StreamTask(主要是其各種子類)中,filter/map對應了StreamOperator;assignTimestampsAndWatermarks用來生成Watermarks,傳遞給下游的.keyBy.timeWindow(WindowOperator)。而keyBy/timeWindow/sum又被構建成OperatorChain。所以我們下面就逐一講解這些概念。
1. Task
Task,它是在線程中執行的Runable對象,每個Task都是由一組Operators Chaining在一起的工作集合,Flink Job的執行過程可看作一張DAG圖,Task是DAG圖上的頂點(Vertex),頂點之間通過數據傳遞方式相互鏈接構成整個Job的Execution Graph。
Task 是直接受 TaskManager 管理和調度的,Flink最后通過RPC方法提交task,實際會調用到TaskExecutor.submitTask方法中。這個方法會創建真正的Task,然后調用task.startTaskThread();開始task的執行。而startTaskThread方法,則會執行executingThread.start,從而調用Task.run方法。
它的最核心的代碼如下:
* public class Task implements Runnable...
* The Task represents one execution of a parallel subtask on a TaskManager.
* A Task wraps a Flink operator (which may be a user function) and runs it
*
* -- doRun()
* |
* +----> 從 NetworkEnvironment 中申請 BufferPool
* | 包括 InputGate 的接收 pool 以及 task 的每個 ResultPartition 的輸出 pool
* +----> invokable = loadAndInstantiateInvokable(userCodeClassLoader,
* | nameOfInvokableClass) 通過反射創建
* | load and instantiate the task's invokable code
* | invokable即為operator對象實例,例如OneInputStreamTask,SourceStreamTask等
* | OneInputStreamTask繼承了StreamTask,這里實際調用的invoke()方法是StreamTask里的
* +----> invokable.invoke()
* | run the invokable,
* |
* |
* OneInputStreamTask<IN,OUT> extends StreamTask<OUT,OneInputStreamOperator<IN, OUT>>
這個nameOfInvokableClass是哪里生成的呢?其實早在生成StreamGraph的時候,這就已經確定了,見StreamGraph.addOperator方法
if (operatorObject instanceof StoppableStreamSource) {
addNode(vertexID, slotSharingGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
} else if (operatorObject instanceof StreamSource) {
addNode(vertexID, slotSharingGroup, SourceStreamTask.class, operatorObject, operatorName);
} else {
addNode(vertexID, slotSharingGroup, OneInputStreamTask.class, operatorObject, operatorName);
}
這里的OneInputStreamTask.class即為生成的StreamNode的vertexClass。這個值會一直傳遞
StreamGraph --> JobVertex.invokableClass --> ExecutionJobVertex.TaskInformation.invokableClassName --> Task
2. StreamTask
是本地執行的基本單位,由TaskManagers部署執行,Task會調用 StreamTask。StreamTask包含了headOperator 和 operatorChain,封裝了算子的處理邏輯。可以理解為,StreamTask是執行流程框架,OperatorChain(StreamOperator)是負責具體算子邏輯,嵌入到StreamTask的執行流程框架中。
直接從StreamTask的注釋中,能看到StreamTask的生命周期。
其中,每個operator的open()方法都被StreamTask的openAllOperators()方法調用。該方法(指openAllOperators)執行所有的operational的初始化,例如使用定時器服務注冊定時器。單個task可能正在執行多個operator,消耗其前驅的輸出,在這種情況下,該open()方法在最后一個operator中調用,即這個operator的輸出也是task本身的輸出。這樣做使得當第一個operator開始處理任務的輸入時,它的所有下游operator都准備好接收其輸出。
OperatorChain是在StreamTask的invoke方法中被創建的,在執行的時候,如果一個operator無法被chain起來,那它就只有headOperator,chain里就沒有其他operator了。
注意: task中的連續operator是從最后到第一個依次open。
以OneInputStreamTask為例,Task的核心執行代碼即為OneInputStreamTask.invoke方法,它會調用StreamTask.invoke方法。
* The life cycle of the task(StreamTask) is set up as follows:
* {@code
* -- setInitialState -> provides state of all operators in the chain
* |
* +----> 重新初始化task的state,並且在如下兩種情況下尤為重要:
* | 1. 當任務從故障中恢復並從最后一個成功的checkpoint點重新啟動時
* | 2. 從一個保存點恢復時。
* -- invoke()
* |
* +----> Create basic utils (config, etc) and load the chain of operators
* +----> operators.setup() //創建 operatorChain 並設置為 headOperator 的 Output
* --------> openAllOperators()
* +----> task specific init()
* +----> initialize-operator-states()
* +----> open-operators() //執行 operatorChain 中所有 operator 的 open 方法
* +----> run() //runMailboxLoop()方法將一直運行,直到沒有更多的輸入數據
* --------> mailboxProcessor.runMailboxLoop();
* --------> StreamTask.processInput()
* --------> StreamTask.inputProcessor.processInput()
* --------> 間接調用 operator的processElement()和processWatermark()方法
* +----> close-operators() //執行 operatorChain 中所有 operator 的 close 方法
* +----> dispose-operators()
* +----> common cleanup
* +----> task specific cleanup()
* }
3. OneInputStreamTask
OneInputStreamTask是 StreamTask 的實現類之一,具有代表性。我們示例代碼中基本都是由OneInputStreamTask來做具體執行。
看看OneInputStreamTask 是如何生成的?
* 生成StreamNode時候
*
* -- StreamGraph.addOperator()
* |
* +----> addNode(... OneInputStreamTask.class, operatorObject, operatorName);
* | 將 OneInputStreamTask 等 StreamTask 設置到 StreamNode 的節點屬性中
*
*
* 在 JobVertex 的節點構造時也會做一次初始化
* |
* +----> jobVertex.setInvokableClass(streamNode.getJobVertexClass());
后續在 TaskDeploymentDescriptor 實例化的時候會獲取 jobVertex 中的屬性。
再看看OneInputStreamTask 的 init() 和run() 分別都做了什么
* OneInputStreamTask
* class OneInputStreamTask<IN,OUT> extends StreamTask<OUT,OneInputStreamOperator<IN, OUT>> * {@code
* -- init方法
* |
* +----> 獲取算子對應的輸入序列化器 TypeSerializer
* +----> CheckpointedInputGate inputGate = createCheckpointedInputGate();
* 獲取輸入數據 InputGate[],InputGate 是 flink 網絡傳輸的核心抽象之一
* 其在內部封裝了消息的接收和內存的管理,從 InputGate 可以拿到上游傳送過來的數據
* +----> inputProcessor = new StreamOneInputProcessor<>(input,output,operatorChain)
* | 1. StreamInputProcessor,是 StreamTask 內部用來處理 Record 的組件,
* | 里面封裝了外部 IO 邏輯【內存不夠時將 buffer 吐到磁盤上】以及 時間對齊邏輯【Watermark】
* | 2. output 是 StreamTaskNetworkOutput, input是StreamTaskNetworkInput
* | 這樣就把input, output 他倆聚合進StreamOneInputProcessor
* +----> headOperator.getMetricGroup().gauge
* +----> getEnvironment().getMetricGroup().gauge
* 設置一些 metrics 及 累加器
*
*
* -- run方法(就是基類StreamTask.run)
* +----> StreamTask.runMailboxLoop
* | 從 StreamTask.runMailboxLoop 開始,下面是一層層的調用關系
* -----> StreamTask.processInput()
* -----> StreamTask.inputProcessor.processInput()
* -----> StreamOneInputProcessor.processInput
* -----> input.emitNext(output)
* -----> StreamTaskNetworkInput.emitNext()
* | while(true) {從輸入source讀取一個record, output是 StreamTaskNetworkOutput}
* -----> StreamTaskNetworkInput.processElement() //具體處理record
* | 根據StreamElement的不同類型做不同處理
* | if (recordOrMark.isRecord()) output.emitRecord()
* ------------> StreamTaskNetworkOutput.emitRecord()
* ----------------> operator.processElement(record)
* | if (recordOrMark.isWatermark()) statusWatermarkValve.inputWatermark()
* | if (recordOrMark.isLatencyMarker()) output.emitLatencyMarker()
* | if (recordOrMark.isStreamStatus()) statusWatermarkValve.inputStreamStatus()
4. OperatorChain
flink 中的一個 operator 代表一個最頂級的 api 接口,拿 streaming 來說就是,在 DataStream 上做諸如 map/reduce/keyBy 等操作均會生成一個算子。
Operator Chain是指在生成JobGraph階段,將Job中的Operators按照一定策略(例如:single output operator可以chain在一起)鏈接起來並放置在一個Task線程中執行。減少了數據傳遞/線程切換等環節,降低系統開銷的同時增加了資源利用率和Job性能。
chained operators實際上是從下游往上游去反向一個個創建和setup的。假設chained operators為:StreamGroupedReduce - StreamFilter - StreamSink,而實際初始化順序則相反:StreamSink - StreamFilter - StreamGroupedReduce。
* OperatorChain(
* StreamTask<OUT, OP> containingTask,
* RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate)
* {@code
* -- collect
* |
* +----> pushToOperator(StreamRecord<X> record)
* +---------> operator.processElement(castRecord);
* //這里的operator是chainedOperator,即除了headOperator之外,剩余的operators的chain。
* //這個operator.processElement,會循環調用operator chain所有operator,直到chain end。
* //比如 Operator A 對應的 ChainingOutput collect 調用了對應的算子 A 的 processElement 方法,這里又會調用 B 的 ChainingOutput 的 collect 方法,以此類推。這樣便實現了可 chain 算子的本地處理,最終經由網絡輸出 RecordWriterOutput 發送到下游節點。
5. StreamOperator
StreamTask會調用Operator,所以我們需要看看Operator的生命周期。
邏輯算子Transformation最后會對應到物理算子Operator,這個概念對應的就是StreamOperator。
StreamOperator是根接口。對於 Streaming 來說所有的算子都繼承自 StreamOperator。繼承了StreamOperator的擴展接口則有OneInputStreamOperator,TwoInputStreamOperator。實現了StreamOperator的抽象類有AbstractStreamOperator以及它的子類AbstractStreamUdfOperator。
其中operator處理輸入的數據(elements)可以是以下之一:input element,watermark和checkpoint barriers。他們中的每一個都有一個特殊的單元來處理。element由processElement()方法處理,watermark由processWatermark()處理,checkpoint barriers由異步調用的snapshotState()方法處理,此方法會觸發一次checkpoint 。
processElement()方法也是UDF的邏輯被調用的地方,例如MapFunction里的map()方法。
* AbstractUdfStreamOperator, which is the basic class for all operators that execute UDFs.
*
* // initialization phase
* //初始化operator-specific方法,如RuntimeContext和metric collection
* OPERATOR::setup
* UDF::setRuntimeContext
* //setup的調用鏈是invoke(StreamTask) -> constructor(OperatorChain) -> setup
* //調用setup時,StreamTask已經在各個TaskManager節點上
* //給出一個用來初始state的operator
*
* OPERATOR::initializeState
* //執行所有operator-specific的初始化
* OPERATOR::open
* UDF::open
*
* // processing phase (called on every element/watermark)
* OPERATOR::processElement
* UDF::run //給定一個operator可以有一個用戶定義的函數(UDF)
* OPERATOR::processWatermark
*
* // checkpointing phase (called asynchronously on every checkpoint)
* OPERATOR::snapshotState
*
* // termination phase
* OPERATOR::close
* UDF::close
* OPERATOR::dispose
OneInputStreamOperator與TwoInputStreamOperator接口。這兩個接口非常類似,本質上就是處理流上存在的三種元素StreamRecord,Watermark和LatencyMarker。一個用作單流輸入,一個用作雙流輸入。
6. StreamSource
StreamSource是用來開啟整個流的算子(繼承AbstractUdfStreamOperator)。StreamSource因為沒有輸入,所以沒有實現InputStreamOperator的接口。比較特殊的是ChainingStrategy初始化為HEAD。
在StreamSource這個類中,在運行時由SourceStreamTask調用SourceFunction的run方法來啟動source。
* class StreamSource<OUT, SRC extends SourceFunction<OUT>>
* extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT>
*
*
* -- run()
* |
* +----> latencyEmitter = new LatencyMarksEmitter
* | 用來產生延遲監控的LatencyMarker
* +----> this.ctx = StreamSourceContexts.getSourceContext
* | 據時間模式(EventTime/IngestionTime/ProcessingTime)生成相應SourceConext
* | 包含了產生element關聯的timestamp的方法和生成watermark的方法
* +----> userFunction.run(ctx);
* | 調用SourceFunction的run方法來啟動source,進行數據的轉發
*
public {
//讀到數據后,把數據交給collect方法,collect方法負責把數據交到合適的位置(如發布為br變量,或者交給下個operator,或者通過網絡發出去)
private transient SourceFunction.SourceContext<OUT> ctx;
private transient volatile boolean canceledOrStopped = false;
private transient volatile boolean hasSentMaxWatermark = false;
public void run(final Object lockingObject,
final StreamStatusMaintainer streamStatusMaintainer,
final Output<StreamRecord<OUT>> collector,
final OperatorChain<?, ?> operatorChain) throws Exception {
userFunction.run(ctx);
}
}
7. StreamMap
StreamFilter,StreamMap與StreamFlatMap算子在實現的processElement分別調用傳入的FilterFunction,MapFunction, FlatMapFunction的udf將element傳到下游。這里用StreamMap舉例:
public class StreamMap<IN, OUT>
extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
public StreamMap(MapFunction<IN, OUT> mapper) {
super(mapper);
chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
output.collect(element.replace(userFunction.map(element.getValue())));
}
}
8. WindowOperator
Flink通過水位線分配器(TimestampsAndPeriodicWatermarksOperator和TimestampsAndPunctuatedWatermarksOperator這兩個算子)向事件流中注入水位線。
我們示例代碼中,timeWindow()最終對應了WindowStream,窗口算子WindowOperator是窗口機制的底層實現。assignTimestampsAndWatermarks 則對應了TimestampsAndPeriodicWatermarksOperator算子,它把產生的Watermark傳遞給了WindowOperator。
元素在streaming dataflow引擎中流動到WindowOperator時,會被分為兩撥,分別是普通事件和水位線。
-
如果是普通的事件,則會調用processElement方法進行處理,在processElement方法中,首先會利用窗口分配器為當前接收到的元素分配窗口,接着會調用觸發器的onElement方法進行逐元素觸發。對於時間相關的觸發器,通常會注冊事件時間或者處理時間定時器,這些定時器會被存儲在WindowOperator的處理時間定時器隊列和水位線定時器隊列中,如果觸發的結果是FIRE,則對窗口進行計算。
-
如果是水位線(事件時間場景),則方法processWatermark將會被調用,它將會處理水位線定時器隊列中的定時器。如果時間戳滿足條件,則利用觸發器的onEventTime方法進行處理。
而對於處理時間的場景,WindowOperator將自身實現為一個基於處理時間的觸發器,以觸發trigger方法來消費處理時間定時器隊列中的定時器滿足條件則會調用窗口觸發器的onProcessingTime,根據觸發結果判斷是否對窗口進行計算。
* public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
* implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W>
*
* -- processElement()
* |
* +----> windowAssigner.assignWindows
* | //通過WindowAssigner為element分配一系列windows
* +----> windowState.add(element.getValue())
* | //把當前的element加入buffer state
* +----> TriggerResult triggerResult = triggerContext.onElement(element)
* | //觸發onElment,得到triggerResult
* +----> Trigger.OnMergeContext.onElement()
* +----> trigger.onElement(element.getValue(), element.getTimestamp(), window,...)
* +----> EventTimeTriggers.onElement()
* | //如果當前window.maxTimestamp已經小於CurrentWatermark,直接觸發
* | //否則將window.maxTimestamp注冊到TimeService中,等待觸發
* +----> contents = windowState.get(); emitWindowContents(actualWindow, contents)
* | //對triggerResult做各種處理,如果fire,真正去計算窗口中的elements
* -- processWatermark()
* -----> 最終進入基類AbstractStreamOperator.processWatermark
* -----> AbstractStreamOperator.processWatermark(watermark)
* -----> timeServiceManager.advanceWatermark(mark); 第一步處理watermark
* -----> output.emitWatermark(mark) 第二步將watermark發送到下游
* -----> InternalTimeServiceManager.advanceWatermark
0x06. 處理 Watermark 的簡要流程
最后是處理 Watermark 的簡要流程(OneInputStreamTask為例)
* -- OneInputStreamTask.invoke()
* |
* +----> StreamTask.init
* | 把StreamTaskNetworkOutput/StreamTaskNetworkInput聚合StreamOneInputProcessor
* +----> StreamTask.runMailboxLoop
* | 從 StreamTask.runMailboxLoop 開始,下面是一層層的調用關系
* -----> StreamTask.processInput()
* -----> StreamTask.inputProcessor.processInput()
* -----> StreamOneInputProcessor.processInput
* -----> input.emitNext(output)
* -----> StreamTaskNetworkInput.emitNext()
* -----> StreamTaskNetworkInput.processElement()
* 下面是處理普通 Record
* -- StreamTaskNetworkInput.processElement()
* |
* | 下面都是一層層的調用關系
* -----> output.emitRecord(recordOrMark.asRecord())
* -----> StreamTaskNetworkOutput.emitRecord()
* -----> operator.processElement(record)
* 進入具體算子 processElement 的處理,比如StreamFlatMap.processElement
* -----> StreamFlatMap.processElement(record)
* -----> userFunction.flatMap()
* -- 下面是處理 Watermark
* -- StreamTaskNetworkInput.processElement()
* |
* | 下面都是一層層的調用關系
* -----> StatusWatermarkValve.inputWatermark()
* -----> StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels()
* -----> output.emitWatermark()
* -----> StreamTaskNetworkOutput.emitWatermark()
* -----> operator.processWatermark(watermark)
* -----> KeyedProcessOperator.processWatermark(watermark)
* 具體算子processWatermark處理,如WindowOperator/KeyedProcessOperator.processWatermark
* 最終進入基類AbstractStreamOperator.processWatermark
* -----> AbstractStreamOperator.processWatermark(watermark)
* -----> timeServiceManager.advanceWatermark(mark); 第一步處理watermark
* output.emitWatermark(mark) 第二步將watermark發送到下游
* -----> InternalTimeServiceManager.advanceWatermark
* -----> 下面看看第一步處理watermark
* -----> InternalTimerServiceImpl.advanceWatermark
* 邏輯timer時間小於watermark的都應該被觸發回調。從eventTimeTimersQueue從小到大取timer,如果小於傳入的water mark,那么說明這個window需要觸發。注意watermarker是沒有key的,所以當一個watermark來的時候是會觸發所有timer,而timer的key是不一定的,所以這里一定要設置keyContext,否則就亂了
* -----> triggerTarget.onEventTime(timer);
* triggerTarget是具體operator對象,open時通過InternalTimeServiceManager.getInternalTimerService傳遞到HeapInternalTimerService
* -----> KeyedProcessOperator.onEeventTime()
* 調用用戶實現的keyedProcessFunction.onTimer去做具體事情。對於window來說也是調用onEventTime或者onProcessTime來從key和window對應的狀態中的數據發送到windowFunction中去計算並發送到下游節點
* -----> invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
* -----> userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
* -- DataStream 設置定時發送Watermark,是加了個chain的TimestampsAndPeriodicWatermarksOperator
* -- StreamTaskNetworkInput.processElement()
* -----> TimestampsAndPeriodicWatermarksOperator.processElement
* 會調用AssignerWithPeriodicWatermarks.extractTimestamp提取event time
* 然后更新StreamRecord的時間
* -----> WindowOperator.processElement
* 在windowAssigner.assignWindows時以element的timestamp作為assign時間
0x07 處理 Watermark 的詳細流程(源碼分析)
下面代碼分析略冗長。
我們再看看樣例代碼
DataStream<String> text = env.socketTextStream(hostname, port);
DataStream counts = text
.filter(new FilterClass())
.map(new LineSplitter())
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator)
.keyBy(0)
.timeWindow(Time.seconds(10))
.sum(2)
counts.print()
System.out.println(env.getExecutionPlan());
1. 程序邏輯 DataStream & Transformation
首先看看邏輯API。
DataStream是數據流概念。A DataStream represents a stream of elements of the same type。
Transformation是一個邏輯API概念。Transformation代表了流的轉換,將一個或多個DataStream轉換為新的DataStream。A Transformation is applied on one or more data streams or data sets and results in one or more output data streams or data sets。
我們認為Transformation就是邏輯算子,而 Transformation 對應的物理概念是Operators。
DataStream類在內部組合了一個 Transformation類,實際的轉換操作均通過該類完成,描述了這個DataStream是怎么來的。
針對示例代碼,"assignTimestampsAndWatermarks","Filter","Map"這幾種,都被轉換為 SingleOutputStreamOperator,繼續由用戶進行邏輯處理。SingleOutputStreamOperator這個類名字有點誤導,實際上它是DataStream的子類。
@Public
public class DataStream<T> {
protected final StreamExecutionEnvironment environment;
protected final Transformation<T> transformation;
//assignTimestampsAndWatermarks這個操作實際上也生成了一個SingleOutputStreamOperator算子
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
final int inputParallelism = getTransformation().getParallelism();
final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);
TimestampsAndPeriodicWatermarksOperator<T> operator =
new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
.setParallelism(inputParallelism);
}
//Map是一個OneInputStreamOperator算子。
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(
String operatorName,
TypeInformation<R> outTypeInfo,
OneInputStreamOperatorFactory<T, R> operatorFactory) {
return doTransform(operatorName, outTypeInfo, operatorFactory);
}
protected <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());
// SingleOutputStreamOperator 實際上是 DataStream 的子類,名字里面有Operator容易誤導大家。
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
//就是把Transformation加到運行環境上去。
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
}
針對示例代碼,絕大多數邏輯算子都轉換為OneInputTransformation,每個Transformation里面間接記錄了對應的物理Operator。注冊到Env上。
// OneInputTransformation對應了單輸入的算子
@Internal
public class OneInputTransformation<IN, OUT> extends PhysicalTransformation<OUT> {
private final Transformation<IN> input;
private final StreamOperatorFactory<OUT> operatorFactory; // 這里間接記錄了本Transformation對應的物理Operator。比如StreamMap。
private KeySelector<IN, ?> stateKeySelector;
private TypeInformation<?> stateKeyType;
public OneInputTransformation(
Transformation<IN> input,
String name,
OneInputStreamOperator<IN, OUT> operator, // 比如StreamMap
TypeInformation<OUT> outputType,
int parallelism) {
this(input, name, SimpleOperatorFactory.of(operator), outputType, parallelism);
}
}
回到樣例代碼,DataStream.keyBy會返回一個KeyedStream。KeyedStream. timeWindow會返回一個WindowedStream。同時內部把各種 Transformation 注冊到了 Env 中。
WindowedStream內部對應WindowedOperator。WindowedStream卻不是Stream的子類! 而是把 KeyedStream 包含在內作為一個成員變量。
// 這個居然不是Stream的子類! 而是把 KeyedStream 包含在內作為一個成員變量。
@Public
public class WindowedStream<T, K, W extends Window> {
private final KeyedStream<T, K> input; // 這里包含了DataStream。
private final WindowAssigner<? super T, W> windowAssigner;
private Trigger<? super T, ? super W> trigger;
private Evictor<? super T, ? super W> evictor;
private long allowedLateness = 0L;
// reduce, fold等函數也是類似操作。
private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {
final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
KeySelector<T, K> keySel = input.getKeySelector();
WindowOperator<K, T, Iterable<T>, R, W> operator;
ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
input.getType().createSerializer(getExecutionEnvironment().getConfig()));
// 這里直接生成了 WindowOperator
operator =
new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
function,
trigger,
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator);
}
在生成了程序邏輯之后,Env里面就有了 一系列 transformation(每個transformation里面記錄了自己對應的物理 operator,比如StreamMap,WindowOperator),這個是后面生成計算圖的基礎。
當調用env.execute時,通過StreamGraphGenerator.generate遍歷其中的transformation集合構造出StreamGraph。
2. 生成計算圖
我們這里重點介紹StreamGraph以及如何生成,JobGraph,ExecutionGraph只是簡介。
StreamGraph代表程序的拓撲結構,是從用戶代碼直接生成的圖。StreamOperator是具體的物理算子。
一個很重要的點是,把 SourceStreamTask / OneInputStreamTask 添加到StreamNode上,作為 jobVertexClass,這個是真實計算的部分。
StreamOperator是一個接口。StreamOperator 是 數據流操作符的基礎接口,該接口的具體實現子類中,會有保存用戶自定義數據處理邏輯的函數的屬性,負責對userFunction的調用,以及調用時傳入所需參數,比如在StreamSource這個類中,在調用SourceFunction的run方法時,會構建一個SourceContext的具體實例,作為入參,用於run方法中,進行數據的轉發;
StreamOperator
PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable {
}
AbstractStreamOperator
AbstractStreamOperator抽象類實現了StreamOperator。在AbstractStreamOperator中有一些重要的成員變量,總體來說可以分為幾類,一類是運行時相關的,一類是狀態相關的,一類是配置相關的,一類是時間相關的,還有一類是監控相關的。
@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, Serializable {
protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
private transient StreamTask<?, ?> container;
protected transient StreamConfig config;
protected transient Output<StreamRecord<OUT>> output;
private transient StreamingRuntimeContext runtimeContext;
public void processWatermark(Watermark mark) throws Exception {
if (timeServiceManager != null) {
timeServiceManager.advanceWatermark(mark); //第一步處理watermark
}
output.emitWatermark(mark);//第二步,將watermark發送到下游
}
}
AbstractUdfStreamOperator
AbstractUdfStreamOperator抽象類繼承了AbstractStreamOperator,對其部分方法做了增強,多了一個成員變量UserFunction。提供了一些通用功能,比如把context賦給算子,保存快照等等。此外還實現了OutputTypeConfigurable接口的setOutputType方法對輸出數據的類型做了設置。
@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
extends AbstractStreamOperator<OUT>
implements OutputTypeConfigurable<OUT> {
protected final F userFunction;/** The user function. */
}
KeyedProcessOperator & WindowOperator。
KeyedStream,WindowedStream分別對應KeyedProcessOperator,WindowOperator。
@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {
protected final WindowAssigner<? super IN, W> windowAssigner;
private final KeySelector<IN, K> keySelector;
private final Trigger<? super IN, ? super W> trigger;
private final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
protected final TypeSerializer<K> keySerializer;
protected final TypeSerializer<W> windowSerializer;
}
@Internal
public class KeyedProcessOperator<K, IN, OUT>
extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
private transient TimestampedCollector<OUT> collector;
private transient ContextImpl context;
private transient OnTimerContextImpl onTimerContext;
@Override
public void open() throws Exception {
super.open();
collector = new TimestampedCollector<>(output);
InternalTimerService<VoidNamespace> internalTimerService =
getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
TimerService timerService = new SimpleTimerService(internalTimerService);
context = new ContextImpl(userFunction, timerService);
onTimerContext = new OnTimerContextImpl(userFunction, timerService);
}
@Override
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.setAbsoluteTimestamp(timer.getTimestamp());
invokeUserFunction(TimeDomain.EVENT_TIME, timer);
}
@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.eraseTimestamp();
invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
collector.setTimestamp(element);
context.element = element;
userFunction.processElement(element.getValue(), context, collector);
context.element = null;
}
private void invokeUserFunction(
TimeDomain timeDomain,
InternalTimer<K, VoidNamespace> timer) throws Exception {
onTimerContext.timeDomain = timeDomain;
onTimerContext.timer = timer;
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
onTimerContext.timeDomain = null;
onTimerContext.timer = null;
}
}
OneInputStreamOperator & TwoInputStreamOperator
承接輸入數據並進行處理的算子就是OneInputStreamOperator、TwoInputStreamOperator等。 這兩個接口非常類似,本質上就是處理流上存在的三種元素StreamRecord,Watermark和LatencyMarker。一個用作單流輸入,一個用作雙流輸入。除了StreamSource以外的所有Stream算子都必須實現並且只能實現其中一個接口。
@PublicEvolving
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
void processElement(StreamRecord<IN> element) throws Exception;
void processWatermark(Watermark mark) throws Exception;
void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;
}
StreamMap & StreamFlatMap
map,filter等常用操作都是OneInputStreamOperator。下面給出StreamMap,StreamFlatMap作為具體例子。
// 用StreamMap里做個實際算子的例子@
Internal
public class StreamMap<IN, OUT>
extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
public StreamMap(MapFunction<IN, OUT> mapper) {
super(mapper);
chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
output.collect(element.replace(userFunction.map(element.getValue())));
}
}
// 用StreamFlatMap里做個實際算子的例子
@Internal
public class StreamFlatMap<IN, OUT>
extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
private transient TimestampedCollector<OUT> collector;
public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
super(flatMapper);
chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void open() throws Exception {
super.open();
collector = new TimestampedCollector<>(output);
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
collector.setTimestamp(element);
userFunction.flatMap(element.getValue(), collector);
}
}
生成StreamGraph
程序執行即env.execute("Java WordCount from SocketTextStream Example")這行代碼的時候,就會生成StreamGraph。代表程序的拓撲結構,是從用戶代碼直接生成的圖。
StreamGraph生成函數分析
實際生成StreamGraph的入口是StreamGraphGenerator.generate(env, transformations) 。其中的transformations是一個list,里面記錄的就是我們在transform方法中放進來的算子。最終會調用 transformXXX 來對具體的Transformation進行轉換。
@Internal
public class StreamGraphGenerator {
private final List<Transformation<?>> transformations;
private StreamGraph streamGraph;
public StreamGraph generate() {
//注意,StreamGraph的生成是從sink開始的
streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
for (Transformation<?> transformation: transformations) {
transform(transformation);
}
final StreamGraph builtStreamGraph = streamGraph;
return builtStreamGraph;
}
private Collection<Integer> transform(Transformation<?> transform) {
//這個方法的核心邏輯就是判斷傳入的steamOperator是哪種類型,並執行相應的操作,詳情見下面那一大堆if-else
//這里對操作符的類型進行判斷,並以此調用相應的處理邏輯.簡而言之,處理的核心無非是遞歸的將該節點和節點的上游節點加入圖
Collection<Integer> transformedIds;
if (transform instanceof OneInputTransformation<?, ?>) {
transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
}
.......
}
//因為map,filter等常用操作都是OneInputStreamOperator,我們就來看看StreamGraphGenerator.transformOneInputTransform((OneInputTransformation<?, ?>) transform)方法。
//該函數首先會對該transform的上游transform進行遞歸轉換,確保上游的都已經完成了轉化。然后通過transform構造出StreamNode,最后與上游的transform進行連接,構造出StreamNode。
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
//就是遞歸處理節點,為當前節點和它的依賴節點建立邊,處理邊之類的,把節點加到圖里。
Collection<Integer> inputIds = transform(transform.getInput());
String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
// 這里添加Operator到streamGraph上。
streamGraph.addOperator(transform.getId(),
slotSharingGroup,
transform.getCoLocationGroupKey(),
transform.getOperatorFactory(),
transform.getInputType(),
transform.getOutputType(),
transform.getName());
if (transform.getStateKeySelector() != null) {
TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
}
int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
transform.getParallelism() : executionConfig.getParallelism();
streamGraph.setParallelism(transform.getId(), parallelism);
streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
for (Integer inputId: inputIds) {
streamGraph.addEdge(inputId, transform.getId(), 0);
}
return Collections.singleton(transform.getId());
}
}
streamGraph.addOperator
在之前的生成圖代碼中,有streamGraph.addOperator,我們具體看看實現。
這里重要的是把 SourceStreamTask / OneInputStreamTask 添加到StreamNode上,作為 jobVertexClass。
@Internal
public class StreamGraph implements Pipeline {
public <IN, OUT> void addOperator(
Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
StreamOperatorFactory<OUT> operatorFactory,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
// 這里添加了 OneInputStreamTask/SourceStreamTask,這個是日后真實運行的地方。
if (operatorFactory.isStreamSource()) {
addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorFactory, operatorName);
} else {
addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorFactory, operatorName);
}
}
protected StreamNode addNode(Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
Class<? extends AbstractInvokable> vertexClass, // 這里是OneInputStreamTask...
StreamOperatorFactory<?> operatorFactory,
String operatorName) {
StreamNode vertex = new StreamNode(
vertexID,
slotSharingGroup,
coLocationGroup,
operatorFactory,
operatorName,
new ArrayList<OutputSelector<?>>(),
vertexClass);
streamNodes.put(vertexID, vertex);
return vertex;
}
}
關鍵類StreamNode
@Internal
public class StreamNode implements Serializable {
private transient StreamOperatorFactory<?> operatorFactory;
private List<OutputSelector<?>> outputSelectors;
private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
private final Class<? extends AbstractInvokable> jobVertexClass; // OneInputStreamTask
@VisibleForTesting
public StreamNode(
Integer id,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
StreamOperator<?> operator,
String operatorName,
List<OutputSelector<?>> outputSelector,
Class<? extends AbstractInvokable> jobVertexClass) {
this(id, slotSharingGroup, coLocationGroup, SimpleOperatorFactory.of(operator),
operatorName, outputSelector, jobVertexClass);
}
public Class<? extends AbstractInvokable> getJobVertexClass() {
return jobVertexClass;
}
}
3. Task之間數據交換機制
Flink中的數據交換構建在如下兩條設計原則之上:
- 數據交換的控制流(例如,為實例化交換而進行的消息傳輸)是接收端初始化的,這非常像最初的MapReduce。
- 數據交換的數據流(例如,在網絡上最終傳輸的數據)被抽象成一個叫做IntermediateResult的概念,它是可插拔的。這意味着系統基於相同的實現邏輯可以既支持流數據,又支持批處理數據的傳輸。
數據在task之間傳輸整體過程
- 第一步必然是准備一個ResultPartition;
- 通知JobMaster;
- JobMaster通知下游節點;如果下游節點尚未部署,則部署之;
- 下游節點向上游請求數據
- 開始傳輸數據
數據在task之間具體傳輸
描述了數據從生產者傳輸到消費者的完整生命周期。
數據在task之間傳遞有如下幾步:
- 數據在本operator處理完后,通過Collector收集,這些記錄被傳給RecordWriter對象。每條記錄都要選擇一個下游節點,所以要經過ChannelSelector。一個ChannelSelector選擇一個或者多個序列化器來處理記錄。如果記錄在broadcast中,它們將被傳遞給每一個序列化器。如果記錄是基於hash分區的,ChannelSelector將會計算記錄的hash值,然后選擇合適的序列化器。
- 每個channel都有一個serializer,序列化器將record數據記錄序列化成二進制的表示形式。然后將它們放到大小合適的buffer中(記錄也可以被切割到多個buffer中)。
- 接下來數據被寫入ResultPartition下的各個subPartition (ResultSubpartition - RS,用於為特定的消費者收集buffer數據)里,此時該數據已經存入DirectBuffer(MemorySegment)。既然首個buffer進來了,RS就對消費者變成可訪問的狀態了(注意,這個行為實現了一個streaming shuffle),然后它通知JobManager。
- JobManager查找RS的消費者,然后通知TaskManager一個數據塊已經可以訪問了。通知TM2的消息會被發送到InputChannel,該inputchannel被認為是接收這個buffer的,接着通知RS2可以初始化一個網絡傳輸了。然后,RS2通過TM1的網絡棧請求該buffer,然后雙方基於netty准備進行數據傳輸。網絡連接是在TaskManager(而非特定的task)之間長時間存在的。
- 單獨的線程控制數據的flush速度,一旦觸發flush,則通過Netty的nio通道向對端寫入。
- 對端的netty client接收到數據,decode出來,把數據拷貝到buffer里,然后通知InputChannel
- 一旦buffer被TM2接收,它會穿過一個類似的對象棧,起始於InputChannel(接收端 等價於IRPQ),進入InputGate(它包含多個IC),最終進入一個RecordDeserializer,它用於從buffer中還原成類型化的記錄,然后將其傳遞給接收task。
- 有可用的數據時,下游算子從阻塞醒來。從InputChannel取出buffer,再解序列化成record,交給算子執行用戶代碼。
4. 數據源的邏輯——StreamSource與時間模型
SourceFunction是所有stream source的根接口。
StreamSource抽象了一個數據源,並且指定了一些如何處理數據的模式。StreamSource是用來開啟整個流的算子。SourceFunction定義了兩個接口方法:
run : 啟動一個source,即對接一個外部數據源然后emit元素形成stream(大部分情況下會通過在該方法里運行一個while循環的形式來產生stream)。
cancel : 取消一個source,也即將run中的循環emit元素的行為終止。
@Public
public interface SourceFunction<T> extends Function, Serializable {
void run(SourceContext<T> ctx) throws Exception;
void cancel();
@Public // Interface might be extended in the future with additional methods.
//SourceContex則是用來進行數據發送的接口。
interface SourceContext<T> {
void collect(T element);
@PublicEvolving
void collectWithTimestamp(T element, long timestamp);
@PublicEvolving
void emitWatermark(Watermark mark);
@PublicEvolving
void markAsTemporarilyIdle();
Object getCheckpointLock();
void close();
}
}
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {
//讀到數據后,把數據交給collect方法,collect方法負責把數據交到合適的位置(如發布為br變量,或者交給下個operator,或者通過網絡發出去)
private transient SourceFunction.SourceContext<OUT> ctx;
private transient volatile boolean canceledOrStopped = false;
private transient volatile boolean hasSentMaxWatermark = false;
public void run(final Object lockingObject,
final StreamStatusMaintainer streamStatusMaintainer,
final Output<StreamRecord<OUT>> collector,
final OperatorChain<?, ?> operatorChain) throws Exception {
userFunction.run(ctx);
}
}
SocketTextStreamFunction
回到實例代碼,env.socketTextStream(hostname, port)就是生成了SocketTextStreamFunction。
run方法的邏輯如上,邏輯很清晰,就是從指定的hostname和port持續不斷的讀取數據,按行分隔符划分成一個個字符串,然后轉發到下游。
cancel方法的實現如下,就是將運行狀態的標識isRunning屬性設置為false,並根據需要關閉當前socket。
@PublicEvolving
public class SocketTextStreamFunction implements SourceFunction<String> {
private final String hostname;
private final int port;
private final String delimiter;
private final long maxNumRetries;
private final long delayBetweenRetries;
private transient Socket currentSocket;
private volatile boolean isRunning = true;
public SocketTextStreamFunction(String hostname, int port, String delimiter, long maxNumRetries) {
this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
}
public void run(SourceContext<String> ctx) throws Exception {
final StringBuilder buffer = new StringBuilder();
long attempt = 0;
/** 這里是第一層循環,只要當前處於運行狀態,該循環就不會退出,會一直循環 */
while (isRunning) {
try (Socket socket = new Socket()) {
/** 對指定的hostname和port,建立Socket連接,並構建一個BufferedReader,用來從Socket中讀取數據 */
currentSocket = socket;
LOG.info("Connecting to server socket " + hostname + ':' + port);
socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
char[] cbuf = new char[8192];
int bytesRead;
/** 這里是第二層循環,對運行狀態進行了雙重校驗,同時對從Socket中讀取的字節數進行判斷 */
while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
buffer.append(cbuf, 0, bytesRead);
int delimPos;
/** 這里是第三層循環,就是對從Socket中讀取到的數據,按行分隔符進行分割,並將每行數據作為一個整體字符串向下游轉發 */
while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
String record = buffer.substring(0, delimPos);
if (delimiter.equals("\n") && record.endsWith("\r")) {
record = record.substring(0, record.length() - 1);
}
/** 用入參ctx,進行數據的轉發 */
ctx.collect(record);
buffer.delete(0, delimPos + delimiter.length());
}
}
}
/** 如果由於遇到EOF字符,導致從循環中退出,則根據運行狀態,以及設置的最大重試嘗試次數,決定是否進行 sleep and retry,或者直接退出循環 */
if (isRunning) {
attempt++;
if (maxNumRetries == -1 || attempt < maxNumRetries) {
LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs...");
Thread.sleep(delayBetweenRetries);
}
else {
break;
}
}
}
/** 在最外層的循環都退出后,最后檢查下緩存中是否還有數據,如果有,則向下游轉發 */
if (buffer.length() > 0) {
ctx.collect(buffer.toString());
}
}
public void cancel() {
isRunning = false;
Socket theSocket = this.currentSocket;
/** 如果當前socket不為null,則進行關閉操作 */
if (theSocket != null) {
IOUtils.closeSocket(theSocket);
}
}
}
5. StreamTask
回到實例代碼,filter,map是在StreamTask中執行,可以看看StreamTask等具體定義。
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
extends AbstractInvokable
implements AsyncExceptionHandler {
private final StreamTaskActionExecutor actionExecutor;
/**
* The input processor. Initialized in {@link #init()} method.
*/
@Nullable
protected StreamInputProcessor inputProcessor; // 這個是處理關鍵。
/** the head operator that consumes the input streams of this task. */
protected OP headOperator;
/** The chain of operators executed by this task. */
protected OperatorChain<OUT, OP> operatorChain;
/** The configuration of this streaming task. */
protected final StreamConfig configuration;
/** Our state backend. We use this to create checkpoint streams and a keyed state backend. */
protected StateBackend stateBackend;
/** The external storage where checkpoint data is persisted. */
private CheckpointStorageWorkerView checkpointStorage;
/**
* The internal {@link TimerService} used to define the current
* processing time (default = {@code System.currentTimeMillis()}) and
* register timers for tasks to be executed in the future.
*/
protected TimerService timerService;
private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
/** The map of user-defined accumulators of this task. */
private final Map<String, Accumulator<?, ?>> accumulatorMap;
/** The currently active background materialization threads. */
private final CloseableRegistry cancelables = new CloseableRegistry();
private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;
/**
* Flag to mark the task "in operation", in which case check needs to be initialized to true,
* so that early cancel() before invoke() behaves correctly.
*/
private volatile boolean isRunning;
/** Flag to mark this task as canceled. */
private volatile boolean canceled;
private boolean disposedOperators;
/** Thread pool for async snapshot workers. */
private ExecutorService asyncOperationsThreadPool;
private final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
protected final MailboxProcessor mailboxProcessor;
private Long syncSavepointId = null;
@Override
public final void invoke() throws Exception {
try {
beforeInvoke();
// final check to exit early before starting to run
if (canceled) {
throw new CancelTaskException();
}
// let the task do its work
isRunning = true;
runMailboxLoop(); //MailboxProcessor.runMailboxLoop會調用StreamTask.processInput
// if this left the run() method cleanly despite the fact that this was canceled,
// make sure the "clean shutdown" is not attempted
if (canceled) {
throw new CancelTaskException();
}
afterInvoke();
}
finally {
cleanUpInvoke();
}
}
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
InputStatus status = inputProcessor.processInput(); // 這里會具體從source讀取數據。
if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
return;
}
if (status == InputStatus.END_OF_INPUT) {
controller.allActionsCompleted();
return;
}
//具體執行操作。
CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
jointFuture.thenRun(suspendedDefaultAction::resume);
}
}
前面提到,Task對象在執行過程中,把執行的任務交給了StreamTask這個類去執行。在我們的wordcount例子中,實際初始化的是OneInputStreamTask的對象。那么這個對象是如何執行用戶的代碼的呢?
它做的如下:
首先,初始化 initialize-operator-states()。
然后 open-operators() 方法。
最后調用 StreamTask#runMailboxLoop,便開始處理Source端消費的數據,並流入下游算子處理。
具體來說,就是把任務直接交給了InputProcessor去執行processInput方法。這是一個StreamInputProcessor的實例,該processor的任務就是處理輸入的數據,包括用戶數據、watermark和checkpoint數據等。
具體到OneInputStreamTask,OneInputStreamTask.inputProcessor 是 StreamOneInputProcessor 類型,它把input, output聚合在一起。input是StreamTaskNetworkInput類型。output是StreamTaskNetworkOutput類型。
具體代碼如下
@Internal
public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
//這是OneInputStreamTask的init方法,從configs里面獲取StreamOperator信息,生成自己的inputProcessor。
@Override
public void init() throws Exception {
StreamConfig configuration = getConfiguration();
int numberOfInputs = configuration.getNumberOfInputs();
if (numberOfInputs > 0) {
CheckpointedInputGate inputGate = createCheckpointedInputGate();
DataOutput<IN> output = createDataOutput(); // 這里生成了 StreamTaskNetworkOutput
StreamTaskInput<IN> input = createTaskInput(inputGate, output);
inputProcessor = new StreamOneInputProcessor<>( // 這里把input, output通過Processor配置到了一起。
input,
output,
operatorChain);
}
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
}
private StreamTaskInput<IN> createTaskInput(CheckpointedInputGate inputGate, DataOutput<IN> output) {
int numberOfInputChannels = inputGate.getNumberOfInputChannels();
StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(numberOfInputChannels, output);
TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
return new StreamTaskNetworkInput<>(
inputGate,
inSerializer,
getEnvironment().getIOManager(),
statusWatermarkValve,
0);
}
/**
* The network data output implementation used for processing stream elements
* from {@link StreamTaskNetworkInput} in one input processor.
*/
private static class StreamTaskNetworkOutput<IN> extends AbstractDataOutput<IN> {
private final OneInputStreamOperator<IN, ?> operator;
private final WatermarkGauge watermarkGauge;
private final Counter numRecordsIn;
private StreamTaskNetworkOutput(
OneInputStreamOperator<IN, ?> operator, // 這個就是注冊的Operator
StreamStatusMaintainer streamStatusMaintainer,
WatermarkGauge watermarkGauge,
Counter numRecordsIn) {
super(streamStatusMaintainer);
this.operator = checkNotNull(operator);
this.watermarkGauge = checkNotNull(watermarkGauge);
this.numRecordsIn = checkNotNull(numRecordsIn);
}
@Override
public void emitRecord(StreamRecord<IN> record) throws Exception {
numRecordsIn.inc();
operator.setKeyContextElement1(record);
operator.processElement(record);
}
@Override
public void emitWatermark(Watermark watermark) throws Exception {
watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
operator.processWatermark(watermark); // 這里就進入了processWatermark具體處理,比如WindowOperator的
}
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
operator.processLatencyMarker(latencyMarker);
}
}
}
@Internal
public interface StreamInputProcessor extends AvailabilityProvider, Closeable {
InputStatus processInput() throws Exception;
}
@Internal
public final class StreamOneInputProcessor<IN> implements StreamInputProcessor {
@Override
public InputStatus processInput() throws Exception {
InputStatus status = input.emitNext(output); // 這里是開始從輸入source讀取一個record。input, output分別是 StreamTaskNetworkInput,StreamTaskNetworkOutput。
if (status == InputStatus.END_OF_INPUT) {
operatorChain.endHeadOperatorInput(1);
}
return status;
}
}
@Internal
public final class StreamTaskNetworkInput<T> implements StreamTaskInput<T> {
@Override
public InputStatus emitNext(DataOutput<T> output) throws Exception {
while (true) {
// get the stream element from the deserializer
if (currentRecordDeserializer != null) {
DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
if (result.isBufferConsumed()) {
currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
currentRecordDeserializer = null;
}
if (result.isFullRecord()) {
processElement(deserializationDelegate.getInstance(), output); //具體處理record
return InputStatus.MORE_AVAILABLE;
}
}
Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
if (bufferOrEvent.isPresent()) {
processBufferOrEvent(bufferOrEvent.get());
} else {
if (checkpointedInputGate.isFinished()) {
checkState(checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available");
if (!checkpointedInputGate.isEmpty()) {
throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
}
return InputStatus.END_OF_INPUT;
}
return InputStatus.NOTHING_AVAILABLE;
}
}
}
// 根據record類型,來處理record還是watermark
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
if (recordOrMark.isRecord()){
output.emitRecord(recordOrMark.asRecord()); // 調用 StreamTaskNetworkOutput,最終調用到operator.processElement(record);
} else if (recordOrMark.isWatermark()) {
statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
} else if (recordOrMark.isLatencyMarker()) {
output.emitLatencyMarker(recordOrMark.asLatencyMarker());
} else if (recordOrMark.isStreamStatus()) {
statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);
} else {
throw new UnsupportedOperationException("Unknown type of StreamElement");
}
}
}
@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, Serializable {
protected transient InternalTimeServiceManager<?> timeServiceManager;
public void processWatermark(Watermark mark) throws Exception {
if (timeServiceManager != null) {
timeServiceManager.advanceWatermark(mark);
}
output.emitWatermark(mark);
}
}
@Internal
public class InternalTimeServiceManager<K> {
private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;
public void advanceWatermark(Watermark watermark) throws Exception {
for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
service.advanceWatermark(watermark.getTimestamp());
}
}
}
public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {
private final ProcessingTimeService processingTimeService;
private final KeyContext keyContext;
private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;
private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;
private final KeyGroupRange localKeyGroupRange;
private final int localKeyGroupRangeStartIdx;
public void advanceWatermark(long time) throws Exception {
currentWatermark = time;
InternalTimer<K, N> timer;
while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
eventTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onEventTime(timer);
}
}
}
上面的代碼中,StreamTaskNetworkOutput.emitRecord中的operator.processElement(record);才是真正處理用戶邏輯的代碼。
StatusWatermarkValve就是用來處理watermark的。
@Internal
public class StatusWatermarkValve {
private final DataOutput output;
public void inputWatermark(Watermark watermark, int channelIndex) throws Exception {
// ignore the input watermark if its input channel, or all input channels are idle (i.e. overall the valve is idle).
if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
long watermarkMillis = watermark.getTimestamp();
// if the input watermark's value is less than the last received watermark for its input channel, ignore it also.
if (watermarkMillis > channelStatuses[channelIndex].watermark) {
channelStatuses[channelIndex].watermark = watermarkMillis;
// previously unaligned input channels are now aligned if its watermark has caught up
if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) {
channelStatuses[channelIndex].isWatermarkAligned = true;
}
// now, attempt to find a new min watermark across all aligned channels
findAndOutputNewMinWatermarkAcrossAlignedChannels();
}
}
}
private void findAndOutputNewMinWatermarkAcrossAlignedChannels() throws Exception {
long newMinWatermark = Long.MAX_VALUE;
boolean hasAlignedChannels = false;
// determine new overall watermark by considering only watermark-aligned channels across all channels
for (InputChannelStatus channelStatus : channelStatuses) {
if (channelStatus.isWatermarkAligned) {
hasAlignedChannels = true;
newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
}
}
// we acknowledge and output the new overall watermark if it really is aggregated
// from some remaining aligned channel, and is also larger than the last output watermark
if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
lastOutputWatermark = newMinWatermark;
output.emitWatermark(new Watermark(lastOutputWatermark)); // 這里會最終emit watermark
}
}
}
6. Watermarks的生成
而Watermark的產生是在Apache Flink的Source節點 或 Watermark生成器計算產生(如Apache Flink內置的 Periodic Watermark實現)
There are two ways to assign timestamps and generate Watermarks:
- Directly in the data stream source 自定義數據源設置 Timestamp/Watermark
- Via a TimestampAssigner / WatermarkGenerator 在數據流中設置 Timestamp/Watermark。
自定義數據源設置 Timestamp/Watermark
自定義的數據源類需要繼承並實現 SourceFunction[T] 接口,其中 run 方法是定義數據生產的地方:
//自定義的數據源為自定義類型MyType
class MySource extends SourceFunction[MyType]{
//重寫run方法,定義數據生產的邏輯
override def run(ctx: SourceContext[MyType]): Unit = {
while (/* condition */) {
val next: MyType = getNext()
//設置timestamp從MyType的哪個字段獲取(eventTimestamp)
ctx.collectWithTimestamp(next, next.eventTimestamp)
if (next.hasWatermarkTime) {
//設置watermark從MyType的那個方法獲取(getWatermarkTime)
ctx.emitWatermark(new Watermark(next.getWatermarkTime))
}
}
}
}
在數據流中設置 Timestamp/Watermark
在數據流中,可以設置 stream 的 Timestamp Assigner ,該 Assigner 將會接收一個 stream,並生產一個帶 Timestamp和Watermark 的新 stream。
Flink通過水位線分配器(TimestampsAndPeriodicWatermarksOperator和TimestampsAndPunctuatedWatermarksOperator這兩個算子)向事件流中注入水位線。元素在streaming dataflow引擎中流動到WindowOperator時,會被分為兩撥,分別是普通事件和水位線。
回到實例代碼,assignTimestampsAndWatermarks 就是生成一個TimestampsAndPeriodicWatermarksOperator。
TimestampsAndPeriodicWatermarksOperator的具體處理 Watermark代碼如下。其中processWatermark具體是阻斷上游水位線,這樣下游就只能用自身產生的水位線了。
public class TimestampsAndPeriodicWatermarksOperator<T>
extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
private transient long watermarkInterval;
private transient long currentWatermark;
//可以看到在processElement會調用AssignerWithPeriodicWatermarks.extractTimestamp提取event time, 然后更新StreamRecord的時間。
@Override
public void processElement(StreamRecord<T> element) throws Exception {
final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
output.collect(element.replace(element.getValue(), newTimestamp));
}
@Override
public void onProcessingTime(long timestamp) throws Exception {
// register next timer
Watermark newWatermark = userFunction.getCurrentWatermark(); //定時調用用戶自定義的getCurrentWatermark
if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
currentWatermark = newWatermark.getTimestamp();
// emit watermark
output.emitWatermark(newWatermark);
}
long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
@Override
public void processWatermark(Watermark mark) throws Exception {
// if we receive a Long.MAX_VALUE watermark we forward it since it is used
// to signal the end of input and to not block watermark progress downstream
if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
currentWatermark = Long.MAX_VALUE;
output.emitWatermark(mark);
}
}
}
7. WindowOperator的實現
最后的 .keyBy(0) .timeWindow(Time.seconds(10)) 是由 WindowOperator處理。
Flink通過水位線分配器(TimestampsAndPeriodicWatermarksOperator和TimestampsAndPunctuatedWatermarksOperator這兩個算子)向事件流中注入水位線。元素在streaming dataflow引擎中流動到WindowOperator時,會被分為兩撥,分別是普通事件和水位線。
如果是普通的事件,則會調用processElement方法進行處理,在processElement方法中,首先會利用窗口分配器為當前接收到的元素分配窗口,接着會調用觸發器的onElement方法進行逐元素觸發。對於時間相關的觸發器,通常會注冊事件時間或者處理時間定時器,這些定時器會被存儲在WindowOperator的處理時間定時器隊列和水位線定時器隊列中,如果觸發的結果是FIRE,則對窗口進行計算。
如果是水位線(事件時間場景),則方法processWatermark將會被調用,它將會處理水位線定時器隊列中的定時器。如果時間戳滿足條件,則利用觸發器的onEventTime方法進行處理。processWatermark 用來處理上游發送過來的watermark,可以認為不做任何處理,下游的watermark只與其上游最近的生成方式相關。
WindowOperator內部有觸發器上下文對象接口的實現——Context,它主要提供了三種類型的方法:
- 提供狀態存儲與訪問;
- 定時器的注冊與刪除;
- 窗口觸發器process系列方法的包裝;
在注冊定時器時,會新建定時器對象並將其加入到定時器隊列中。等到時間相關的處理方法(processWatermark和trigger)被觸發調用,則會從定時器隊列中消費定時器對象並調用窗口觸發器,然后根據觸發結果來判斷是否觸動窗口的計算。
@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {
protected final WindowAssigner<? super IN, W> windowAssigner;
protected transient TimestampedCollector<OUT> timestampedCollector;
protected transient Context triggerContext = new Context(null, null); //觸發器上下文對象
protected transient WindowContext processContext;
protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
無論是windowOperator還是KeyedProcessOperator都持有InternalTimerService具體實現的對象,通過這個對象用戶可以注冊EventTime及ProcessTime的timer,當watermark 越過這些timer的時候,調用回調函數執行一定的操作。
window operator通過WindowAssigner和Trigger來實現它的邏輯。當一個element到達時,通過KeySelector先assign一個key,並且通過WindowAssigner assign若干個windows(指定element分配到哪個window去),這樣這個element會被放入若干個pane。一個pane會存放所有相同key和相同window的elements。
比如 SlidingEventTimeWindows 的實現。
* public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
Collection<TimeWindow> assignWindows(Object element, long timestamp, ...) {
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
//可以看到這里會assign多個TimeWindow,因為是slide
windows.add(new TimeWindow(start, start + size));
}
return windows;
}
再比如 TumblingProcessingTimeWindows
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
Collection<TimeWindow> assignWindows(Object element, long timestamp, ...) {
final long now = context.getCurrentProcessingTime();
long start = now - (now % size);
//很簡單,分配一個TimeWindow
return Collections.singletonList(new TimeWindow(start, start + size));
}
processWatermark
首先看看處理Watermark
public void processWatermark(Watermark mark) throws Exception {
//定義一個標識,表示是否仍有定時器滿足觸發條件
boolean fire;
do {
//從水位線定時器隊列中查找隊首的一個定時器,注意此處並不是出隊(注意跟remove方法的區別)
Timer<k, w=""> timer = watermarkTimersQueue.peek();
//如果定時器存在,且其時間戳戳不大於水位線的時間戳
//(注意理解條件是:不大於,水位線用於表示小於該時間戳的元素都已到達,所以所有不大於水位線的觸發時間戳都該被觸發)
if (timer != null && timer.timestamp <= mark.getTimestamp()) {
//置標識為真,表示找到滿足觸發條件的定時器
fire = true;
//將該元素從隊首出隊
watermarkTimers.remove(timer);
watermarkTimersQueue.remove();
//構建新的上下文
context.key = timer.key;
context.window = timer.window;
setKeyContext(timer.key);
//窗口所使用的狀態存儲類型為可追加的狀態存儲
AppendingState<in, acc=""> windowState;
MergingWindowSet<w> mergingWindows = null;
//如果分配器是合並分配器(比如會話窗口)
if (windowAssigner instanceof MergingWindowAssigner) {
//獲得合並窗口幫助類MergingWindowSet的實例
mergingWindows = getMergingWindowSet();
//獲得當前窗口對應的狀態窗口(狀態窗口對應着狀態后端存儲的命名空間)
W stateWindow = mergingWindows.getStateWindow(context.window);
//如果沒有對應的狀態窗口,則跳過本次循環
if (stateWindow == null) {
continue;
}
//獲得當前窗口對應的狀態表示
windowState = getPartitionedState(stateWindow,
windowSerializer, windowStateDescriptor);
} else {
//如果不是合並分配器,則直接獲取窗口對應的狀態表示
windowState = getPartitionedState(context.window,
windowSerializer, windowStateDescriptor);
}
//從窗口狀態表示中獲得窗口中所有的元素
ACC contents = windowState.get();
if (contents == null) {
// if we have no state, there is nothing to do
continue;
}
//通過上下文對象調用窗口觸發器的事件時間處理方法並獲得觸發結果對象
TriggerResult triggerResult = context.onEventTime(timer.timestamp);
//如果觸發的結果是FIRE(觸動窗口計算),則調用fire方法進行窗口計算
if (triggerResult.isFire()) {
fire(context.window, contents);
}
//而如果觸動的結果是清理窗口,或者事件時間等於窗口的清理時間(通常為窗口的maxTimestamp屬性)
if (triggerResult.isPurge() ||
(windowAssigner.isEventTime()
&& isCleanupTime(context.window, timer.timestamp))) {
//清理窗口及元素
cleanup(context.window, windowState, mergingWindows);
}
} else {
//隊列中沒有符合條件的定時器,置標識為否,終止循環
fire = false;
}
} while (fire);
//向下游發射水位線,把waterMark傳遞下去
output.emitWatermark(mark);
//更新currentWaterMark, 將當前算子的水位線屬性用新水位線的時間戳覆蓋
this.currentWatermark = mark.getTimestamp();
}
以上方法雖然冗長但流程還算清晰,其中的fire方法用於對窗口進行計算,它會調用內部窗口函數(即InternalWindowFunction,它包裝了WindowFunction)的apply方法。
processElement
處理element到達的邏輯,將當前的element的value加到對應的window中,觸發onElement
public void processElement(StreamRecord<IN> element) throws Exception {
Collection<W> elementWindows = windowAssigner.assignWindows( //通過WindowAssigner為element分配一系列windows
element.getValue(), element.getTimestamp(), windowAssignerContext);
final K key = (K) getStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) { //如果是MergingWindow
//.......
} else { //如果是普通window
for (W window: elementWindows) {
// drop if the window is already late
if (isLate(window)) { //late data的處理,默認是丟棄
continue;
}
AppendingState<IN, ACC> windowState = getPartitionedState( //從backend中取出該window的狀態,就是buffer的element
window, windowSerializer, windowStateDescriptor);
windowState.add(element.getValue()); //把當前的element加入buffer state
context.key = key;
context.window = window; //context的設計相當tricky和晦澀
TriggerResult triggerResult = context.onElement(element); //觸發onElment,得到triggerResult
if (triggerResult.isFire()) { //對triggerResult做各種處理
ACC contents = windowState.get();
if (contents == null) {
continue;
}
fire(window, contents); //如果fire,真正去計算窗口中的elements
}
if (triggerResult.isPurge()) {
cleanup(window, windowState, null); //purge,即去cleanup elements
} else {
registerCleanupTimer(window);
}
}
}
}
判斷是否是late data的邏輯
protected boolean isLate(W window) {
return (windowAssigner.isEventTime() && (cleanupTime(window) <= currentWatermark));
}
而isCleanupTime和cleanup這對方法主要涉及到窗口的清理。如果當前窗口是時間窗口,且窗口的時間到達了清理時間,則會進行清理窗口清理。那么清理時間如何判斷呢?Flink是通過窗口的最大時間戳屬性結合允許延遲的時間聯合計算的
private long cleanupTime(W window) {
//清理時間被預置為窗口的最大時間戳加上允許的延遲事件
long cleanupTime = window.maxTimestamp() + allowedLateness;
//如果窗口為非時間窗口(其maxTimestamp屬性值為Long.MAX_VALUE),則其加上允許延遲的時間,
//會造成Long溢出,從而會變成負數,導致cleanupTime < window.maxTimestamp 條件成立,
//則直接將清理時間設置為Long.MAX_VALUE
return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
}
trigger
這個是用來觸發onProcessingTime,這個需要依賴系統時間的定時器來觸發,邏輯和processWatermark基本等同,只是觸發條件不一樣
@Override
public void trigger(long time) throws Exception {
boolean fire;
//Remove information about the triggering task
processingTimeTimerFutures.remove(time);
processingTimeTimerTimestamps.remove(time, processingTimeTimerTimestamps.count(time));
do {
Timer<K, W> timer = processingTimeTimersQueue.peek();
if (timer != null && timer.timestamp <= time) {
fire = true;
processingTimeTimers.remove(timer);
processingTimeTimersQueue.remove();
context.key = timer.key;
context.window = timer.window;
setKeyContext(timer.key);
AppendingState<IN, ACC> windowState;
MergingWindowSet<W> mergingWindows = null;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(context.window);
if (stateWindow == null) {
// then the window is already purged and this is a cleanup
// timer set due to allowed lateness that has nothing to clean,
// so it is safe to just ignore
continue;
}
windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
} else {
windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
}
ACC contents = windowState.get();
if (contents == null) {
// if we have no state, there is nothing to do
continue;
}
TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
if (triggerResult.isFire()) {
fire(context.window, contents);
}
if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
cleanup(context.window, windowState, mergingWindows);
}
} else {
fire = false;
}
} while (fire);
}
0x08 參考
Flink原理與實現:如何生成ExecutionGraph及物理執行圖
Apache Flink源碼解析 (四)Stream Operator
Apache Flink 進階(六):Flink 作業執行深度解析
Streaming System 第三章:Watermarks
Apache Flink源碼解析之stream-source
Flink源碼系列——Flink中一個簡單的數據處理功能的實現過程
聊聊flink的Execution Plan Visualization
Flink源碼系列——Flink中一個簡單的數據處理功能的實現過程
Flink源碼解讀系列1——分析一個簡單Flink程序的執行過程
Flink timer注冊與watermark觸發[轉載自網易雲音樂實時計算平台經典實踐知乎專欄]
[Flink – process watermark](https://cnblogs.com/fxjwind/p/7657058.html)
Flink流計算編程--Flink中allowedLateness詳細介紹及思考
「Spark-2.2.0」Structured Streaming - Watermarking操作詳解
flink的window計算、watermark、allowedLateness、trigger
Apache Flink源碼解析 (四)Stream Operator
Flink入門教程--Task Lifecycle(任務的生命周期簡介)
