代碼地址:https://gitee.com/xiexiandong/abc_bigdata.git
一、窗口函數
- 在定義了窗口分配器之后,我們需要為每一個窗口明確的指定計算邏輯,這個就是窗口函數要做的事情,當系統決定一個窗口已經准備好執行之后,這個窗口函數將被用 來處理窗口中的每一個元素(可能是分組的)。
1.ReduceFunction
- 含義:ReduceFunction定義了如何把兩個輸入的元素進行合並來生成相同類型的輸出元素的過程, Flink使用ReduceFunction來對窗口中的元素進行增量聚合
2.AggregateFunction
- AggregateFunction是ReduceFunction的 普適版本,它需要指定三個類型:輸入類 型(IN)、累加器類型(ACC)和輸出類型 (OUT)。輸入類型是輸入流中的元素類型, AggregateFunction有一個方法可以將一個輸入元素添加到一個累加器中。該接口還具有創建初始累加器、將兩個累加 器合並到一個累加器以及從累加器中提 取輸出(類 型為OUT)的方法。
3、FoldFunction
- 含義:FoldFunction指定了一個輸入元素如何與一個指定輸出類型的元素合並的過程,這個FoldFunction 會被每一個加入到窗口中的元素和當前的輸出值增量地調用,第一個元 組合(混搭)
4、WindowFunction/AllWindowFunction
- 含義:一個WindowFunction將獲得一個包含了window中的所有元素迭代(Iterable),並且提供靈活性。這些帶來了性能的成本和資源的消耗,因為window中的元素無法 進行增量迭代,而是緩存起來直到window被認為是可以處理時為止
- 可以跟ReduceFunction /AggregateFunction/FoldFunction結合使用
5、ProcessWindowFunction/ProcessAllWindowFunction
- 含義:ProcessWindowFunction獲得一個包含窗口所有元素的可迭代器,以及一個具有時間和狀態信息訪問權的上下文對象,這使得它比其他窗口函數提供更大的靈活 性。這是以性能和資源消耗為代價的,因為元素不能增量地聚合,而是需要在內部緩沖,直到認為窗口可以處理為止。
- WindowFunctionde的升級版,可以跟ReduceFunction / AggregateFunction/FoldFunction結合使用。
二、windows的operator demo
1、Windows的基礎使用
import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object ReduceFunctionOnCountWindow { def main(args: Array[String]): Unit = { import org.apache.flink.api.scala._ //生成配置對象 val config = new Configuration() //開啟spark-webui config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true) //配置webui的日志文件,否則打印日志到控制台 config.setString("web.log.path", "/tmp/logs/flink_log") //配置taskManager的日志文件,否則打印日志到控制台 config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/tmp/logs/flink_log") //配置tm有多少個slot config.setString("taskmanager.numberOfTaskSlots", "8") // 獲取運行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config) val tuple = List( ("xxd", "class12", "小王", 50), ("xxd", "class12", "小李", 55), ("xxd", "class12", "小二", 55), ("xxd", "class12", "小三", 55), ("xxd", "class11", "小張", 50), ("xxd", "class11", "小孫", 50)) // 定義socket數據源,使用集合生成 val input = env.fromCollection(tuple) //先分組,然后數據按分組進行不同的窗口,當窗口數據量達到兩條時,啟動reduce計算兩條記錄的分組合 val windows: DataStream[(String, String, String, Int)] = input.keyBy(1).countWindow(2).reduce((a, b) => (a._1, a._2, a._3, a._4 + b._4)) windows.print() env.execute() } }
結果:
import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object ReduceFunctionOnCountWindowAll { def main(args: Array[String]): Unit = { import org.apache.flink.api.scala._ //生成配置對象 val config = new Configuration() //開啟spark-webui config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true) //配置webui的日志文件,否則打印日志到控制台 config.setString("web.log.path", "/tmp/logs/flink_log") //配置taskManager的日志文件,否則打印日志到控制台 config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/tmp/logs/flink_log") //配置tm有多少個slot config.setString("taskmanager.numberOfTaskSlots", "8") // 獲取運行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config) val tuple = List( ("xxd", "class12", "小王", 50), ("xxd", "class11", "小李", 55), ("xxd", "class12", "小張", 50), ("xxd", "class11", "小孫", 45), ("xxd", "class11", "小強", 45)) // 定義socket數據源,使用集合生成 val input = env.fromCollection(tuple) //先分組,然后數據按分組進行不同的窗口,當窗口數據量達到兩條時,啟動reduce計算兩條記錄的分組合 //WindowAll與Windows的區別是一個windows里的數據只能在一個task中進行運行 val windows: DataStream[(String, String, String, Int)] = input.keyBy(1).countWindowAll(2).reduce((a, b) => (a._1 + "\t" + b._1, a._2 + "\t" + b._2, a._3 + "\t" + b._3, a._4 + b._4)) windows.print() env.execute() } }
2、Windows的Aggregate窗口自定義聚合函數
import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object AggFunctionOnCountWindow { def main(args: Array[String]): Unit = { import org.apache.flink.api.scala._ //生成配置對象 val config = new Configuration() //開啟spark-webui config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true) //配置webui的日志文件,否則打印日志到控制台 config.setString("web.log.path", "/tmp/logs/flink_log") //配置taskManager的日志文件,否則打印日志到控制台 config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/tmp/logs/flink_log") //配置tm有多少個slot config.setString("taskmanager.numberOfTaskSlots", "8") // 獲取運行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config) val tuple = List( ("xxd", "class12", "小王", 50), ("xxd", "class11", "小張", 50), ("xxd", "class12", "小李", 55)) // 定義socket數據源,使用集合生成 val input = env.fromCollection(tuple) //先分組,然后數據按分組進行不同的窗口,當窗口數據量達到兩條時,啟動aggregate計算兩條記錄的分組合 input .keyBy(1) .countWindow(2) .aggregate(new SumAggregate) .print() env.execute() } } class SumAggregate extends AggregateFunction[(String, String, String, Int), (String, Long), (String, Long)] { /** * 創建累加器來保存中間狀態(name和count) */ override def createAccumulator(): (String, Long) = { ("", 0L) } /** * 將元素添加到累加器並返回新的累加器value */ override def add(value: (String, String, String, Int), accumulator: (String, Long)): (String, Long) = { (s"${value._3}\t${accumulator._1}", accumulator._2 + value._4) } /** * 從累加器提取結果 */ override def getResult(accumulator: (String, Long)): (String, Long) = { accumulator } /** * 合並兩個累加器並返回 */ override def merge(a: (String, Long), b: (String, Long)): (String, Long) = { (s"${a._1}\t${b._1}", a._2 + b._2) } }
3、Windows的Process窗口自定義聚合函數
import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.windows.GlobalWindow import org.apache.flink.util.Collector object ProcessWinFunOnCountWindow { def main(args: Array[String]): Unit = { import org.apache.flink.api.scala._ //生成配置對象 val config = new Configuration() //開啟spark-webui config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true) //配置webui的日志文件,否則打印日志到控制台 config.setString("web.log.path", "/tmp/logs/flink_log") //配置taskManager的日志文件,否則打印日志到控制台 config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/tmp/logs/flink_log") //配置tm有多少個slot config.setString("taskmanager.numberOfTaskSlots", "8") // 獲取運行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config) val tuple = List( ("xxd", "class12", "小王", 50), ("xxd", "class11", "小張", 50), ("xxd", "class12", "小李", 55)) // 定義socket數據源,使用集合生成 val input = env.fromCollection(tuple) //先分組,然后數據按分組進行不同的窗口,當窗口數據量達到兩條時,啟動process計算兩條記錄的平均值 input .keyBy(f => f._2) .countWindow(2) .process(new AvgProcessWindowFunction) .print() env.execute() } } class AvgProcessWindowFunction extends ProcessWindowFunction[(String, String, String, Int), String, String, GlobalWindow] { /** * 分組並計算windows里所有數據的平均值 * * @param key 分組key * @param context windows上下文 * @param elements 分組的value * @param out operator的輸出結果 */ override def process(key: String, context: Context, elements: Iterable[(String, String, String, Int)], out: Collector[String]): Unit = { var sum = 0 var count = 0 for (in <- elements) { sum += in._4 count += 1 } out.collect(s"Window:${context.window} count:${count} avg:${sum / count}"); } }
4、windows join
- cogroup
- 側重於group,是對同一個key上的兩組集合進行操作
- CoGroup的作用和join基本相同,但有一點不一樣的是,如果未能找到新到來的數據與另一個流在window中存在的匹配數據,仍會可將其輸出
- 只能在window中用
import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.CountTrigger import org.apache.flink.util.Collector import scala.collection.mutable.ListBuffer object CoGroupOnSessionWindow { def main(args: Array[String]): Unit = { import org.apache.flink.api.scala._ //生成配置對象 val config = new Configuration() //開啟spark-webui config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true) //配置webui的日志文件,否則打印日志到控制台 config.setString("web.log.path", "/tmp/logs/flink_log") //配置taskManager的日志文件,否則打印日志到控制台 config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/tmp/logs/flink_log") //配置tm有多少個slot config.setString("taskmanager.numberOfTaskSlots", "8") // 獲取運行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config) // 定義socket數據源1 val input1 = env.socketTextStream("localhost", 6666, '\n') val map1: DataStream[(String, Int)] = input1.flatMap(_.split(" ")).map((_, 1)) // 定義socket數據源2 val input2 = env.socketTextStream("localhost", 7777, '\n') val map2: DataStream[(String, Int)] = input2.flatMap(_.split(" ")).map((_, 1)) /** * 1、創建兩個socket stream。輸入的字符串以空格為界分割成Array[String]。然后再取出其中前兩個元素組成(String, String)類型的tuple。 * 2、join條件為兩個流中的數據((String, String))第一個元素相同。 * 3、為測試方便,這里使用session window。只有兩個元素到來時間前后相差不大於10秒之時才會被匹配。 * Session window的特點為,沒有固定的開始和結束時間,只要兩個元素之間的時間間隔不大於設定值,就會分配到同一個window中,否則后來的元素會進入新的window。 * 4、將window默認的trigger修改為count trigger。這里的含義為每到來一個元素,都會立刻觸發計算。 * 5、由於設置的並行度為8,所以有8個task * 6、所以兩邊相同的key會跑到其中一個task中,這樣才能達到join的目的 * 但是由於使用的是cogroup所以兩邊流跑到一個task中的key無論能不能匹配,都會以執行打印 * 不能匹配的原因可能其中一個流相同的那個key還沒有發送過來 * */ map1.coGroup(map2) .where(_._1) .equalTo(_._1) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) .trigger(CountTrigger.of(1)) .apply((a, b, o: Collector[String]) => { val list: ListBuffer[String] = ListBuffer[String]("Data in stream1: ") a.foreach(f => list += s"${f._1}<->${f._2}\n") list += "Data in stream2: " b.foreach(f => list += s"${f._1}<->${f._2}\n") o.collect(list.reduce(_ + _)) }).print() env.execute() } }
- join
-
而join是對同一個key上的每對元素進行操作
-
類似inner join
-
按照一定條件分別取出兩個流中匹配的元素,返回給下游處理
-
Join是cogroup 的特例
-
只能在window中用
-
import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.CountTrigger object JoinOnSessionWindow { def main(args: Array[String]): Unit = { import org.apache.flink.api.scala._ //生成配置對象 val config = new Configuration() //開啟spark-webui config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true) //配置webui的日志文件,否則打印日志到控制台 config.setString("web.log.path", "/tmp/logs/flink_log") //配置taskManager的日志文件,否則打印日志到控制台 config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/tmp/logs/flink_log") //配置tm有多少個slot config.setString("taskmanager.numberOfTaskSlots", "8") // 獲取運行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config) // 定義socket數據源1 val input1 = env.socketTextStream("localhost", 6666, '\n') val map1: DataStream[(String, Int)] = input1.flatMap(_.split(" ")).map((_, 1)) // 定義socket數據源2 val input2 = env.socketTextStream("localhost", 7777, '\n') val map2: DataStream[(String, Int)] = input2.flatMap(_.split(" ")).map((_, 1)) /** * 1、創建兩個個socket stream。輸入的字符串以空格為界分割成Array[String]。然后再取出其中前兩個元素組成(String, String)類型的tuple。 * 2、join條件為兩個流中的數據((String, String))第一個元素相同。 * 3、為測試方便,這里使用session window。只有兩個元素到來時間前后相差不大於10秒之時才會被匹配。 * Session window的特點為,沒有固定的開始和結束時間,只要兩個元素之間的時間間隔不大於設定值,就會分配到同一個window中,否則后來的元素會進入新的window。 * 4、將window默認的trigger修改為count trigger。這里的含義為每到來一個元素,都會立刻觸發計算。 * 5、處理匹配到的兩個數據,例如到來的數據為(1, "xxd")和(1, "xxd"),輸出到下游則為"xxd == xxd" * 6、結論: * a、join只返回匹配到的數據對。若在window中沒有能夠與之匹配的數據,則不會有輸出。 * b、join會輸出window中所有的匹配數據對。 * c、不在window內的數據不會被匹配到。 **/ map1.join(map2) .where(_._1) .equalTo(_._1) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) .trigger(CountTrigger.of(1)) .apply((a, b) => { s"${a._1} == ${b._1}" }).print() env.execute() } }
- Interval Join
- KeyedStream,KeyedStream → DataStream
- 在給定的時間邊界內(默認包含邊界),相當於一個窗口,按照指定的key對兩個KeyedStream進行join操作,把符合join條件的兩個event拉到一起,然后怎么處理由用 戶你來定義。
- key1 == key2 && e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment} import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.util.Collector case class MyClass(school: String, classs: String, name: String, time: Long) case class MyResult(name: String, result: Int, time: Long) case class MyJoin(classs: String, name: String, result: Int) object IntervalJoin { def main(args: Array[String]): Unit = { //生成配置對象 val config = new Configuration() //開啟spark-webui config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true) //配置webui的日志文件,否則打印日志到控制台 config.setString("web.log.path", "/tmp/flink_log") //配置taskManager的日志文件,否則打印日志到控制台 config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/tmp/flink_log") //配置tm有多少個slot config.setString("taskmanager.numberOfTaskSlots", "8") // 獲取運行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config) //設置流數據處理的時間為事件時間 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tuple = List( MyClass("xxd", "class12", "小王", System.currentTimeMillis), MyClass("xxd", "class12", "小李", System.currentTimeMillis), MyClass("xxd", "class11", "小張", System.currentTimeMillis), MyClass("xxd", "class11", "小強", System.currentTimeMillis)) val tuple2 = List( MyResult("小王", 88, System.currentTimeMillis), MyResult("小李", 88, System.currentTimeMillis), MyResult("小張", 88, System.currentTimeMillis), MyResult("小強", 88, System.currentTimeMillis)) val input1: DataStream[MyClass] = env.fromCollection(tuple).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyClass] { override def extractAscendingTimestamp(element: MyClass): Long = { element.time } }) val input2: DataStream[MyResult] = env.fromCollection(tuple2).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyResult] { override def extractAscendingTimestamp(element: MyResult): Long = { element.time } }) val keyedStream: KeyedStream[MyClass, String] = input1.keyBy(_.name) val otherKeyedStream: KeyedStream[MyResult, String] = input2.keyBy(_.name) //e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound // key1 == key2 && leftTs - 20 <= rightTs <= leftTs + 20 keyedStream.intervalJoin(otherKeyedStream) .between(Time.milliseconds(-20), Time.milliseconds(20)) .upperBoundExclusive() .lowerBoundExclusive() .process(new ProcessJoinFunction[MyClass, MyResult, MyJoin]() { override def processElement(left: MyClass, right: MyResult, ctx: ProcessJoinFunction[MyClass, MyResult, MyJoin]#Context, out: Collector[MyJoin]) = { out.collect(MyJoin(left.classs, left.name, right.result)) } }).print() env.execute("IntervalJoin") } }