Flink執行時之流處理程序生成流圖


流處理程序生成流圖

DataStream API所編寫的流處理應用程序在生成作業圖(JobGraph)並提交給JobManager之前,會預先生成流圖(StreamGraph)。

什么是流圖

流圖(StreamGraph)是表示流處理程序拓撲的數據結構。它封裝了生成作業圖(JobGraph)的必要信息。它的類繼承關系例如以下圖所看到的:

StreamGraph-class-diagram

當你基於StreamGraph的繼承鏈向上追溯,會發現它實現了FlinkPlan接口。

Flink效仿了傳統的關系型數據庫在運行SQL時生成運行計划並對其進行優化的思路。

FlinkPlan是Flink生成運行計划的基接口。定義在Flink優化器模塊中。流處理程序相應的計划是StreamingPlan,可是當前針對流處理程序沒有進行優化,因此這個類可看作是一個預留設計。

一個簡單的實現“word count”的流處理程序,其StreamGraph的形象化表演示樣例如以下圖:

Flink-StreamGraph

Flink官方提供了一個計划可視化器來圖形化運行計划,該計划可視化器基於Flink API所生成的計划的JSON格式表示繪制圖形。可是須要注意的是,計划的JSON形式表示缺失了非常多屬性以及部分節點(比方虛擬節點等);

上面的圖是由“節點”和“邊”組成的。節點在Flink中相應的數據結構是StreamNode,而邊相應的數據結構是StreamEdge,StreamNode和StreamEdge之間有着雙向的依賴關系。StreamEdge包括了其連接的源節點sourceVertex和目的節點targetVertex:

StreamEdge-structure

而StreamNode中包括了與其連接的入邊集合inEdges和出邊集合outEdges:

StreamNode-structure

StreamEdge和StreamNode都有唯一的編號進行標識,可是各自編號的生成規則並不同樣。

StreamNode的編號id的生成是通過調用StreamTransformation的靜態方法getNewNodeId獲得的,事實上現是一個靜態計數器:

protected static Integer idCounter = 0;
public static int getNewNodeId() {   
    idCounter++;   
    return idCounter;
}

StreamEdge的編號edgeId是字符串類型,其生成的規則為:

this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames 
                + "_" + outputPartitioner;

它是由多個段連接起來的,語義的文字表述例如以下:

源頂點_目的頂點_輸入類型數量_輸出選擇器的名稱_輸出分區器

edgeId除了用來實現StreamEdge的hashCode及equals方法之外並沒有其它實際意義。

StreamNode是表示流處理中算子的數據結構。source和sink在StreamGraph中也是以StreamNode表示,它們也是一種算子,僅僅是由於它們是流的輸入和輸出因而有特定的稱呼。

StreamNode除了存儲了輸入端和輸出端的StreamEdge集合,還封裝了算子的其它關鍵屬性,比方其並行度、分區的鍵信息、輸入與輸出類型的序列化器等。

從直觀上來看你已經知道了StreamNode和StreamEdge是StreamGraph的重要組成部分,可是為了生成JobGraph,StreamGraph非常顯然必須得包括很多其它的內容。

總結一下,StreamGraph中包括的屬性可分為三大類:

  • 流處理程序的運行配置。
  • 流處理程序拓撲中包括的節點和邊的信息。
  • 迭代相關的信息;

當然環繞這些屬性的方法非常多。比方增加邊和節點,創建迭代的source/sink等。

當中的一個關鍵方法getJobGraph將用於生成JobGraph:

public JobGraph getJobGraph() {     
    if (isIterative() && checkpointConfig.isCheckpointingEnabled() 
        && !checkpointConfig.isForceCheckpointing()) {      
        throw new UnsupportedOperationException(            
            "Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "                  
            + "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "                  
            + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");   
    }   
    StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);   
    return jobgraphGenerator.createJobGraph();
}

從上面的代碼段也可見。當流處理程序中包括迭代邏輯時。檢查點功能臨時不被支持,在異常信息中Flink闡述了緣由:在迭代作業中無法保證“恰好一次”的語義。

流處理程序依賴StreamingJobGraphGenerator來生成JobGraph,至於怎樣生成。興許會進行剖析。

生成流圖的源代碼分析

了解了什么是流圖(StreamGraph)之后。我們來分析它是怎樣生成的。流圖的生成是通過StreamExecutionEnvironment的getStreamGraph實例方法觸發的:

public StreamGraph getStreamGraph() {   
    if (transformations.size() <= 0) {      
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");   
    }   
    return StreamGraphGenerator.generate(this, transformations);
}

從代碼段中可見,StreamGraph的生成依賴於一個名為transformations的集合對象,它是環境對象所收集到的全部的轉換對象的集合,該集合中存儲着一個流處理程序中全部的轉換操作相應的StreamTransformation對象。

每當在DataStream對象上調用transform方法或者調用已經被實現了的一些內置的轉換函數(如map、filter等。這些轉換函數在內部也調用了transform方法)。這些調用都會使得其相應的轉換對象被增加到transformations集合中去。StreamTransformation表示創建DataStream對象的轉換,流處理程序中存在多種DataStream,每種底層都相應着一個StreamTransformation。DataStream持有運行環境對象的引用。當調用transform方法時,它會調用運行環境對象的addOperator方法,將特定的StreamTransformation對象增加到transformations集合中去。這就是transformations集合中元素的來源。

DataStream API的設計存在着多重對象的封裝,我們以flatMap轉換操作為例圖示各種對象之間的構建關系:

StreamTransformation-relationship-with-others

在Flink的源代碼中,這些對象的命名也並非那么准確。比方上圖中的SingleOutputStreamOperator事實上是一種DataStream,但卻以Operator結尾,讓人匪夷所思。因此較為准確的鑒定它們類型的方式是通過查看它們的繼承鏈來進行識別。

StreamGraph的生成依賴於生成器StreamGraphGenerator,每調用一次靜態方法generate才會在內部創建一個StreamGraphGenerator的實例。一個實例相應着一個StreamGraph對象。StreamGraphGenerator調用內部的實例方法generateInternal來遍歷transformations集合的每一個對象:

private StreamGraph generateInternal(List<StreamTransformation<?

>> transformations) { for (StreamTransformation<?> transformation: transformations) { transform(transformation); } return streamGraph; }

在transform方法中,它枚舉了Flink中每一種轉換類型,並對當前傳入的轉換類型進行推斷。然后將其分發給特定的轉換方法進行轉換。終於返回當前StreamGraph對象中跟該轉換有關的節點編號集合。

這里我們以經常使用的單輸入轉換方法transformOnInputTransform為例來進行分析:

private <IN, OUT> Collection<Integer> transformOnInputTransform(
    OneInputTransformation<IN, OUT> transform) {
    //遞歸地對該轉換的輸入端進行轉換 
    Collection<Integer> inputIds = transform(transform.getInput());   
    // 遞歸調用可能會產生反復,這里須要以轉換過的對象進行檢查 
    if (alreadyTransformed.containsKey(transform)) {      
        return alreadyTransformed.get(transform);   
    }

    //結合輸入端相應的節點編號來推斷並得出槽共享組的名稱 
    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);   
    //將當前算子(節點)增加到流圖中
    streamGraph.addOperator(transform.getId(),         
        slotSharingGroup,         
        transform.getOperator(),         
        transform.getInputType(),         
        transform.getOutputType(),         
        transform.getName());
    //假設有鍵選擇器。則進行設置 
    if (transform.getStateKeySelector() != null) {      
        TypeSerializer<?> keySerializer = 
            transform.getStateKeyType().createSerializer(env.getConfig());      
        streamGraph.setOneInputStateKey(transform.getId(), 
            transform.getStateKeySelector(), keySerializer);   
    }   
    streamGraph.setParallelism(transform.getId(), transform.getParallelism()); 
    //構建從當前轉換相應的節點到輸入轉換相應的節點之間的邊 
    for (Integer inputId: inputIds) {      
        streamGraph.addEdge(inputId, transform.getId(), 0);   
    }   
    //返回當前轉換相應的節點編號
    return Collections.singleton(transform.getId());
}

每遍歷完一個轉換對象,就離構建完整的流圖更近一步。不同的轉換操作類型,它們為流圖提供的“部件”並不全然同樣。有的轉換僅僅構建節點(如SourceTransformation)。有的轉換除了構建節點還構建邊(如SinkTransformation),有的僅僅構建虛擬節點(如PartitionTransformation、SelectTransformation等)。

關於虛擬節點。這里須要說明的是並非全部轉換操作都具有實際的物理意義(即物理上相應詳細的算子)。有些轉換操作僅僅是邏輯概念(比如select。split,partition。union),它們不會構建真實的StreamNode對象。比方某個流處理應用相應的轉換樹例如以下圖:

StreamTransformation-demo

但在運行時,其生成的StreamGraph卻是以下這樣的形式:

StreamGraph-demo

從圖中能夠看到,轉換樹中相應的一些邏輯操作在StreamGraph中並不存在,Flink將這些邏輯轉換操作轉換成了虛擬節點。它們的信息會被綁定到從source到map轉換的邊上。

Flink當前對於流處理的程序是不作優化的。所以StreamGraph就是它的運行計划。

你能夠通過Flink提供的運行計划的可視化器將StreamGraph所表述的信息以圖形化的方式展示出來。就像上文我們展示的那幅圖一樣。那么我們怎樣查看我們自己所編寫的程序的運行計划呢?事實上非常easy,我們以Flink源代碼中flink-examples-streaming模塊中的SocketTextStreamWordCount為例。來看一下怎樣生成運行計划。

我們將SocketTextStreamWordCount最后一行代碼凝視掉:

env.execute("WordCount from SocketTextStream Example");

然后將其替換成以下這句:

System.out.println(env.getExecutionPlan());

這行語句的作用是打印當前這個程序的運行計划,它將在控制台產生該運行計划的JSON格式表示:

{"nodes":[{"id":1,"type":"Source: Socket Stream","pact":"Data Source","contents":"Source: Socket Stream", "parallelism":1},{"id":2,"type":"Flat Map","pact":"Operator","contents":"Flat Map","parallelism":2, "predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":4,"type":"Keyed Aggregation", "pact":"Operator","contents":"Keyed Aggregation","parallelism":2,"predecessors":[{"id":2, "ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Unnamed","pact":"Data Sink", "contents":"Sink: Unnamed","parallelism":2,"predecessors":[{"id":4,"ship_strategy":"FORWARD", "side":"second"}]}]}

把上面這段JSON字符串拷貝到Flink的運行計划可視化器的輸入框中。然后點擊下方的“Draw”button,就可以生成。


微信掃碼關注公眾號:Apache_Flink

apache_flink_weichat


QQ掃碼關注QQ群:Apache Flink學習交流群(123414680)

qrcode_for_apache_flink_qq_group


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM