Flink中的算子操作


一、Connect

DataStream,DataStream ->  ConnectedStream,連接兩個保持他們類型的數據流,兩個數據流被Connect之后,只是被放在了同一個流中,內部依然保持各自的數據和形式

不發生任何變化,兩個流相互獨立。

import org.apache.flink.streaming.api.scala._

object Connect {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    var stream01 = env.generateSequence(1,10)
    val stream = env.readTextFile("test001.txt")
    val stream02 = stream.flatMap(item => item.split(" ")).filter(item => item.equals("hadoop"))
    val streamConnect: ConnectedStreams[Long, String] = stream01.connect(stream02)
    //兩個流各自處理各自的,互不干擾
    val stream03: DataStream[Any] = streamConnect.map(item => item * 2, item => (item,1L))

    stream03.print()
    env.execute("Connect")
  }
}

二、CoMap,CoFlatMap

ConnectedStreams  ->  DataStream:作用於ConnectedStream上,功能與map和flatMap一樣,對ConnectedStram中的每一個Stream分別進行map和flatMap

 

三、Split 

import org.apache.flink.streaming.api.scala._

object Split {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.readTextFile("test001.txt").flatMap(item => item.split(" "))
    val streamSplit: SplitStream[String] = stream.split(
      word =>
        ("hadoop".equals(word) match {
          case true => List("hadoop") //值等於hadoop的流加入到一個List中
          case false => List("other")//值不等於hadoop的流加入到一個List中
        })
    )
    //取出屬於各自部分的流
    val value01: DataStream[String] = streamSplit.select("hadoop")
    val value02: DataStream[String] = streamSplit.select("other")

    value01.print()
    value02.print()

    env.execute("Split Job")

  }

}

四、Union

DataStream -> DataStream:對兩個或者兩個以上的DataStream進行union操作,產生一個包含所有DataStream元素的新的DataStream。

注意:如果你將一個DataStream跟它自己做union操作,在新的DataStream中,你將看到每一個元素都出現兩次。

 

五、KeyBy(比較重要)

DataStream -> KeyedStream:輸入必須是Tuple類型,邏輯地將一個流拆分成不相交的分區,每個分區包含具有相同key的元素,在內部以hash的形式實現的。

把所有相同key的數據聚合在一起

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._

object KeyBy {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.readTextFile("test001.txt").flatMap(item => item.split(" "))
    //將相同key數據進行聚合
    //同一個key的數據都划分到同一個分區中
    val streamKeyBy: KeyedStream[(String, Int), Tuple] = stream.map(item => (item,1)).keyBy(0)
    
    streamKeyBy.print()
    env.execute("KeyBy Job")

  }
}

六、Reduce

KeyedStream -> DataStream,一個分組數據流的聚合操作,合並當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,

而不是只返回最后一次聚合的最終結果。

 

數據流如何在兩個 transformation 組件中傳輸的?

一對一流(=spark窄依賴):(比如source=>map過程)保持元素分區和排序

redistributing流(=spark寬依賴):(map=>keyBy/window 之間,以及keyBy/window與sink之間)改變了流分區。

每一個算子任務根據所選的轉換,向不同的目標子任務發送數據。

比如:keyBy,根據key的hash值重新分區、broadcast、rebalance(類似shuffle過程)。在一次 redistributing交換中,元素間排序,只針對發送方

的partition和接收partition方。最終到sink端的排序是不確定的。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM