Operator Chains(操作鏈)
-
Flink出於分布式執行的目的,將operator的subtask鏈接在一起形成task(類似spark中的管道)。
-
每個task在一個線程中執行。
-
將operators鏈接成task是非常有效的優化:它可以減少線程與線程間的切換和數據緩沖的開銷,並在降低延遲的同時提高整體吞吐量。
-
鏈接的行為可以在編程API中進行指定,詳情請見代碼OperatorChainTest。
-
開啟操作鏈 和 禁用操作鏈的對比圖(默認開啟):
-
Flink默認會將多個operator進行串聯,形成任務鏈(task chain)
-
注意: task chain 可以理解為就是 operator chain 只是不同場景下,稱呼不同。
-
我們也可以禁用任務鏈,讓每個operator形成一個task。
-
StreamExecutionEnvironment.disableOperatorChaining() 這個方法會禁用整條工作鏈
-
操作鏈其實就是類似spark的pipeline管道模式,一個task可以執行同一個窄依賴中的算子操作。
-
我們也可以細粒度的控制工作鏈的形成,比如調用dataStreamSource.map(...).startNewChain(),但不能使用dataStreamSource.startNewChain()
-
dataStreamSource.filter(...).map(...).startNewChain().map(...),需要注意的是,當這樣寫時相當於source和filter組成一條鏈,兩個map組成一條鏈。
-
即在filter和map之間斷開,各自形成單獨的鏈。
-
代碼:
package com.ronnie.flink.stream.test; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * 開啟與禁用工作鏈時,輸出的結果不一樣。 * 當開啟工作鏈時(默認啟動),operator map1與map2 組成一個task. * 此時task運行時,對於hello,flink 這兩條數據是: * 先打印 hello ---- 1 , hello->1 ---- 2 * 后打印 flink ---- 1 , flink->1 ---- 2 * 當禁用工作鏈時,operator map1與map2 分別在兩個task中執行 * 此時task運行時,對於hello,flink 這兩條數據是: * 先打印 hello ---- 1 , flink ---- 1 * 后打印 hello->1 ---- 2 , flink->1 ---- 2 * * 注:操作鏈類似spark的管道,一個task執行多個的算子. */ public class OperatorChainTest { public static final String[] WORDS = new String[] { "hello", "flink", "spark", "hbase" }; public static void main(String[] args) { // 設置執行環境, 類似spark中初始化sparkContext一樣 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 關閉操作鏈.. env.disableOperatorChaining(); DataStreamSource<String> dataStreamSource = env.fromElements(WORDS); SingleOutputStreamOperator<String> pairStream = dataStreamSource.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { System.err.println(value + " ---- 1"); return value + "->1"; } }).map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { System.err.println(value + " ---- 2"); return value + "->2"; } }); // 還可以控制更細粒度的任務鏈,比如指明從哪個operator開始形成一條新的鏈 // someStream.map(...).startNewChain(),但不能使用someStream.startNewChain()。 try { env.execute(); } catch (Exception e) { e.printStackTrace(); } } }
Task slots(任務槽)
- TaskManager 是一個 JVM 進程,並會以獨立的線程來執行一個task或多個subtask。
- 為了控制一個 TaskManager 能接受多少個 task,Flink 提出了 Task Slot 的概念。
- Flink 中的計算資源通過 Task Slot 來定義。每個 task slot 代表了 TaskManager 的一個固定大小的資源子集。
- 例如,一個擁有3個slot的 TaskManager,會將其管理的內存平均分成三分分給各個 slot。
- 將資源 slot 化意味着來自不同job的task不會為了內存而競爭,而是每個task都擁有一定數量的內存儲備。
- 需要注意的是,這里不會涉及到CPU的隔離,slot目前僅僅用來隔離task的內存。
- 通過調整 task slot 的數量,用戶可以定義task之間是如何相互隔離的。
- 每個 TaskManager 有一個slot,也就意味着每個task運行在獨立的 JVM 中。
- 每個 TaskManager 有多個slot的話,也就是說多個task運行在同一個JVM中。
- 而在同一個JVM進程中的task,可以共享TCP連接(基於多路復用)和心跳消息,可以減少數據的網絡傳輸。
- 也能共享一些數據結構,一定程度上減少了每個task的消耗。
- 如圖中所示,5個Task可能會在TaskManager的slots中分布,圖中共2個TaskManager,每個有3個slot。