Flink之Window的使用(2):時間窗口


相關文章鏈接

Flink之Window的使用(1):計數窗口

Flink之Window的使用(2):時間窗口

Flink之Window的使用(3):WindowFunction的使用

具體實現代碼如下所示:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val sensorStream: DataStream[SensorReading] = env
    .socketTextStream("localhost", 9999)
    .map(new MyMapToSensorReading)

// 1、使用window方法進行開窗設置
// 1.1、滾動窗口
/**
 * 知識點:
 * 1、在該方法中,可以使用 TumblingProcessingTimeWindows 和 TumblingEventTimeWindows 類,分別是創建處理時間窗口 和 事件時間窗口(事件時間窗口需要設置時間特性)
 * 2、滾動窗口中,of方法可以設置2個參數,第一個是窗口的大小,第二個是時間偏移量(不設置時默認使用倫敦時間,當設置為-8時,為使用北京時間),偏移量設置時需要小於窗口大小
 */
val windowStream_1: DataStream[SensorReading] = sensorStream
    .keyBy(_.id)
    //            .window(TumblingProcessingTimeWindows.of(Time.days(5), Time.hours(-8)))   // 偏移量設置時需要小於窗口大小
    //            .window(TumblingEventTimeWindows.of(Time.seconds(5)))                     // 事件時間窗口
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))

// 1.2、滑動窗口
/**
 * 知識點:
 * 1、在該方法中,可以使用 TumblingProcessingTimeWindows 和 TumblingEventTimeWindows 類
 * 2、滑動窗口中,of方法可以設置3個參數,第一個是窗口大小,第二個是滑動步長,第三個是偏移量
 */
val windowStream_2: DataStream[SensorReading] = sensorStream
    .keyBy(_.id)
    //            .window(SlidingProcessingTimeWindows.of(Time.days(7), Time.days(1), Time.hours(-8)))  // 偏移量設置時需要小於窗口大小
    //            .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(5)))                // 事件時間窗口
    .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(5)))
    .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))

// 1.3、會話窗口
val windowStream_3: DataStream[SensorReading] = sensorStream
    .keyBy(_.id)
    //            .window(EventTimeSessionWindows.withGap(Time.minutes(10)))            // 事件時間會話窗口
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))

// 2、使用timeWindow方法進行開窗
// 2.1、滾動窗口
val timeWindowStream_1: DataStream[SensorReading] = sensorStream
    .keyBy(_.id)
    .timeWindow(Time.seconds(5))
    .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
// 2.2、滑動窗口
val timeWindowStream_2: DataStream[SensorReading] = sensorStream
    .keyBy(_.id)
    .timeWindow(Time.seconds(15), Time.seconds(5))
    .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))

windowStream_1.print()

env.execute("TimeWindowDemo")

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM