Apache Flink - 常見數據流類型


DataStream:

  • DataStream 是 Flink 流處理 API 中最核心的數據結構。它代表了一個運行在多個分區上的並行流。一個 DataStream 可以從 StreamExecutionEnvironment 通過env.addSource(SourceFunction) 獲得。
  • DataStream 上的轉換操作都是逐條的,比如 map()flatMap()filter()。DataStream 也可以執行 rebalance(再平衡,用來減輕數據傾斜)和 broadcaseted(廣播)等分區轉換。
  • 如上圖的執行圖所示,DataStream 各個算子會並行運行,算子之間是數據流分區。如 Source 的第一個並行實例(S1)和 flatMap() 的第一個並行實例(m1)之間就是一個數據流分區。而在 flatMap() 和 map() 之間由於加了 rebalance(),它們之間的數據流分區就有3個子分區(m1的數據流向3個map()實例)。

KeyedStream:

  • KeyedStream用來表示根據指定的key進行分組的數據流。
  • KeyedStream可以通過調用DataStream.keyBy()來獲得。而在KeyedStream上進行任何transformation都將轉變回DataStream。在實現中,KeyedStream是把key的信息寫入到了transformation中。每條記錄只能訪問所屬key的狀態,其上的聚合函數可以方便地操作和保存對應key的狀態。

WindowedStream & AllWindowedStream:

  • WindowedStream代表了根據key分組,並且基於WindowAssigner切分窗口的數據流。所以WindowedStream都是從KeyedStream衍生而來的。而在WindowedStream上進行任何transformation也都將轉變回DataStream
    DataStream[MyType] stream = ...
    WindowedDataStream[MyType] windowed = stream
            .keyBy("userId")
            .window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
    DataStream[ResultType] result = windowed.reduce(myReducer)

    Flink 的窗口實現中會將到達的數據緩存在對應的窗口buffer中(一個數據可能會對應多個窗口)。當到達窗口發送的條件時(由Trigger控制),Flink 會對整個窗口中的數據進行處理。Flink 在聚合類窗口有一定的優化,即不會保存窗口中的所有值,而是每到一個元素執行一次聚合函數,最終只保存一份數據即可。

  • 在key分組的流上進行窗口切分是比較常用的場景,也能夠很好地並行化(不同的key上的窗口聚合可以分配到不同的task去處理)。不過當我們需要在普通流上進行窗口操作時,就要用到 AllWindowedStreamAllWindowedStream是直接在DataStream上進行windowAll(...)操作。AllWindowedStream 的實現是基於 WindowedStream 的。Flink 不推薦使用AllWindowedStream,因為在普通流上進行窗口操作,就勢必需要將所有分區的流都匯集到單個的Task中,而這個單個的Task很顯然就會成為整個Job的瓶頸。

JoinedStreams & CoGroupedStreams:

  • co-group 側重的是group,是對同一個key上的兩組集合進行操作,而 join 側重的是pair,是對同一個key上的每對元素進行操作, join 只是 co-group 的一個特例。
  • JoinedStreams 和 CoGroupedStreams 是基於 Window 上實現的,所以 CoGroupedStreams 最終又調用了 WindowedStream 來實現。
    DataStream[MyType] firstInput = ...
    DataStream[AnotherType] secondInput = ...
     
    DataStream[(MyType, AnotherType)] result = firstInput.join(secondInput)
        .where("userId").equalTo("id")
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply (new JoinFunction () {...})

    雙流上的數據在同一個key的會被分別分配到同一個window窗口的左右兩個籃子里,當window結束的時候,會對左右籃子進行笛卡爾積從而得到每一對pair,對每一對pair應用 JoinFunction。

ConnectedStreams:

  • 在 DataStream 上有一個 union 的轉換 dataStream.union(otherStream1, otherStream2, ...),用來合並多個流,新的流會包含所有流中的數據。union 有一個限制,就是所有合並的流的類型必須是一致的。
  • union 有一個限制,就是所有合並的流的類型必須是一致的。ConnectedStreams 提供了和 union 類似的功能,用來連接兩個流,但是與 union 轉換有以下幾個區別:
    1. ConnectedStreams 只能連接兩個流,而 union 可以連接多個流。
    2. ConnectedStreams 連接的兩個流類型可以不一致,而 union 連接的流的類型必須一致。
    3. ConnectedStreams 會對兩個流的數據應用不同的處理方法,並且雙流之間可以共享狀態。這在第一個流的輸入會影響第二個流時, 會非常有用。
  • 如下 ConnectedStreams 的樣例,連接 input 和 other 流,並在input流上應用map1方法,在other上應用map2方法,雙流可以共享狀態(比如計數)。
    DataStream[MyType] input = ...
    DataStream[AnotherType] other = ...
     
    ConnectedStreams[MyType, AnotherType] connected = input.connect(other)
     
    DataStream[ResultType] result = 
            connected.map(new CoMapFunction[MyType, AnotherType, ResultType]() {
                override def map1(value: MyType): ResultType = { ... }
                override def map2(value: AnotherType): ResultType = { ... }
            })

    當並行度為2時:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM