Flink 操作鏈與任務槽


Operator Chains(操作鏈)

  • Flink出於分布式執行的目的,將operator的subtask鏈接在一起形成task(類似spark中的管道)。

  • 每個task在一個線程中執行。

  • 將operators鏈接成task是非常有效的優化:它可以減少線程與線程間的切換和數據緩沖的開銷,並在降低延遲的同時提高整體吞吐量。

  • 鏈接的行為可以在編程API中進行指定,詳情請見代碼OperatorChainTest。

  • 開啟操作鏈 和 禁用操作鏈的對比圖(默認開啟):

    image-20191113202723946

    image-20191113202731844

  • Flink默認會將多個operator進行串聯,形成任務鏈(task chain)

  • 注意: task chain 可以理解為就是 operator chain 只是不同場景下,稱呼不同。

  • 我們也可以禁用任務鏈,讓每個operator形成一個task。

  • StreamExecutionEnvironment.disableOperatorChaining() 這個方法會禁用整條工作鏈

  • 操作鏈其實就是類似spark的pipeline管道模式,一個task可以執行同一個窄依賴中的算子操作。

  • 我們也可以細粒度的控制工作鏈的形成,比如調用dataStreamSource.map(...).startNewChain(),但不能使用dataStreamSource.startNewChain()

  • dataStreamSource.filter(...).map(...).startNewChain().map(...),需要注意的是,當這樣寫時相當於source和filter組成一條鏈,兩個map組成一條鏈。

  • 即在filter和map之間斷開,各自形成單獨的鏈。

  • 代碼:

    package com.ronnie.flink.stream.test;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    /**
     *  開啟與禁用工作鏈時,輸出的結果不一樣。
     *  當開啟工作鏈時(默認啟動),operator map1與map2 組成一個task.
     *     此時task運行時,對於hello,flink 這兩條數據是:
     *     先打印 hello ---- 1 , hello->1 ---- 2
     *     后打印 flink ---- 1 , flink->1 ---- 2
     *  當禁用工作鏈時,operator map1與map2 分別在兩個task中執行
     *     此時task運行時,對於hello,flink 這兩條數據是:
     *     先打印 hello ---- 1 , flink ---- 1
     *     后打印 hello->1 ---- 2  , flink->1 ---- 2
     *
     *  注:操作鏈類似spark的管道,一個task執行多個的算子.
     */
    public class OperatorChainTest {
    
        public static final String[] WORDS = new String[] {
                "hello",
                "flink",
                "spark",
                "hbase"
        };
    
        public static void main(String[] args) {
            // 設置執行環境, 類似spark中初始化sparkContext一樣
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setParallelism(1);
    
            // 關閉操作鏈..
            env.disableOperatorChaining();
    
            DataStreamSource<String> dataStreamSource = env.fromElements(WORDS);
    
            SingleOutputStreamOperator<String> pairStream = dataStreamSource.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    System.err.println(value + " ---- 1");
                    return value + "->1";
                }
            }).map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    System.err.println(value + " ---- 2");
                    return value + "->2";
                }
            });
    
            // 還可以控制更細粒度的任務鏈,比如指明從哪個operator開始形成一條新的鏈
            // someStream.map(...).startNewChain(),但不能使用someStream.startNewChain()。
            try {
                env.execute();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    

Task slots(任務槽)

image-20191113203045376

  • TaskManager 是一個 JVM 進程,並會以獨立的線程來執行一個task或多個subtask。
  • 為了控制一個 TaskManager 能接受多少個 task,Flink 提出了 Task Slot 的概念。
  • Flink 中的計算資源通過 Task Slot 來定義。每個 task slot 代表了 TaskManager 的一個固定大小的資源子集。
  • 例如,一個擁有3個slot的 TaskManager,會將其管理的內存平均分成三分分給各個 slot。
  • 將資源 slot 化意味着來自不同job的task不會為了內存而競爭,而是每個task都擁有一定數量的內存儲備。
  • 需要注意的是,這里不會涉及到CPU的隔離,slot目前僅僅用來隔離task的內存。
  • 通過調整 task slot 的數量,用戶可以定義task之間是如何相互隔離的。
  • 每個 TaskManager 有一個slot,也就意味着每個task運行在獨立的 JVM 中。
  • 每個 TaskManager 有多個slot的話,也就是說多個task運行在同一個JVM中。
  • 而在同一個JVM進程中的task,可以共享TCP連接(基於多路復用)和心跳消息,可以減少數據的網絡傳輸。
  • 也能共享一些數據結構,一定程度上減少了每個task的消耗。
  • 如圖中所示,5個Task可能會在TaskManager的slots中分布,圖中共2個TaskManager,每個有3個slot。

image-20191113203453845


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM