相關文章鏈接
Flink之Window的使用(3):WindowFunction的使用
具體實現代碼如下所示:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val sensorStream: DataStream[SensorReading] = env .socketTextStream("localhost", 9999) .map(new MyMapToSensorReading) // 1、使用window方法進行開窗設置 // 1.1、滾動窗口 /** * 知識點: * 1、在該方法中,可以使用 TumblingProcessingTimeWindows 和 TumblingEventTimeWindows 類,分別是創建處理時間窗口 和 事件時間窗口(事件時間窗口需要設置時間特性) * 2、滾動窗口中,of方法可以設置2個參數,第一個是窗口的大小,第二個是時間偏移量(不設置時默認使用倫敦時間,當設置為-8時,為使用北京時間),偏移量設置時需要小於窗口大小 */ val windowStream_1: DataStream[SensorReading] = sensorStream .keyBy(_.id) // .window(TumblingProcessingTimeWindows.of(Time.days(5), Time.hours(-8))) // 偏移量設置時需要小於窗口大小 // .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 事件時間窗口 .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature)) // 1.2、滑動窗口 /** * 知識點: * 1、在該方法中,可以使用 TumblingProcessingTimeWindows 和 TumblingEventTimeWindows 類 * 2、滑動窗口中,of方法可以設置3個參數,第一個是窗口大小,第二個是滑動步長,第三個是偏移量 */ val windowStream_2: DataStream[SensorReading] = sensorStream .keyBy(_.id) // .window(SlidingProcessingTimeWindows.of(Time.days(7), Time.days(1), Time.hours(-8))) // 偏移量設置時需要小於窗口大小 // .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(5))) // 事件時間窗口 .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(5))) .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature)) // 1.3、會話窗口 val windowStream_3: DataStream[SensorReading] = sensorStream .keyBy(_.id) // .window(EventTimeSessionWindows.withGap(Time.minutes(10))) // 事件時間會話窗口 .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature)) // 2、使用timeWindow方法進行開窗 // 2.1、滾動窗口 val timeWindowStream_1: DataStream[SensorReading] = sensorStream .keyBy(_.id) .timeWindow(Time.seconds(5)) .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature)) // 2.2、滑動窗口 val timeWindowStream_2: DataStream[SensorReading] = sensorStream .keyBy(_.id) .timeWindow(Time.seconds(15), Time.seconds(5)) .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature)) windowStream_1.print() env.execute("TimeWindowDemo")