一、Connect
DataStream,DataStream -> ConnectedStream,連接兩個保持他們類型的數據流,兩個數據流被Connect之后,只是被放在了同一個流中,內部依然保持各自的數據和形式
不發生任何變化,兩個流相互獨立。
import org.apache.flink.streaming.api.scala._ object Connect { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment var stream01 = env.generateSequence(1,10) val stream = env.readTextFile("test001.txt") val stream02 = stream.flatMap(item => item.split(" ")).filter(item => item.equals("hadoop")) val streamConnect: ConnectedStreams[Long, String] = stream01.connect(stream02) //兩個流各自處理各自的,互不干擾 val stream03: DataStream[Any] = streamConnect.map(item => item * 2, item => (item,1L)) stream03.print() env.execute("Connect") } }
二、CoMap,CoFlatMap
ConnectedStreams -> DataStream:作用於ConnectedStream上,功能與map和flatMap一樣,對ConnectedStram中的每一個Stream分別進行map和flatMap
三、Split
import org.apache.flink.streaming.api.scala._ object Split { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream: DataStream[String] = env.readTextFile("test001.txt").flatMap(item => item.split(" ")) val streamSplit: SplitStream[String] = stream.split( word => ("hadoop".equals(word) match { case true => List("hadoop") //值等於hadoop的流加入到一個List中 case false => List("other")//值不等於hadoop的流加入到一個List中 }) ) //取出屬於各自部分的流 val value01: DataStream[String] = streamSplit.select("hadoop") val value02: DataStream[String] = streamSplit.select("other") value01.print() value02.print() env.execute("Split Job") } }
四、Union
DataStream -> DataStream:對兩個或者兩個以上的DataStream進行union操作,產生一個包含所有DataStream元素的新的DataStream。
注意:如果你將一個DataStream跟它自己做union操作,在新的DataStream中,你將看到每一個元素都出現兩次。
五、KeyBy(比較重要)
DataStream -> KeyedStream:輸入必須是Tuple類型,邏輯地將一個流拆分成不相交的分區,每個分區包含具有相同key的元素,在內部以hash的形式實現的。
把所有相同key的數據聚合在一起
import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala._ object KeyBy { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream: DataStream[String] = env.readTextFile("test001.txt").flatMap(item => item.split(" ")) //將相同key數據進行聚合 //同一個key的數據都划分到同一個分區中 val streamKeyBy: KeyedStream[(String, Int), Tuple] = stream.map(item => (item,1)).keyBy(0) streamKeyBy.print() env.execute("KeyBy Job") } }
六、Reduce
KeyedStream -> DataStream,一個分組數據流的聚合操作,合並當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,
而不是只返回最后一次聚合的最終結果。
數據流如何在兩個 transformation 組件中傳輸的?
一對一流(=spark窄依賴):(比如source=>map過程)保持元素分區和排序
redistributing流(=spark寬依賴):(map=>keyBy/window 之間,以及keyBy/window與sink之間)改變了流分區。
每一個算子任務根據所選的轉換,向不同的目標子任務發送數據。
比如:keyBy,根據key的hash值重新分區、broadcast、rebalance(類似shuffle過程)。在一次 redistributing交換中,元素間排序,只針對發送方
的partition和接收partition方。最終到sink端的排序是不確定的。