Flink之Window的使用(3):WindowFunction的使用


相關文章鏈接

Flink之Window的使用(1):計數窗口

Flink之Window的使用(2):時間窗口

Flink之Window的使用(3):WindowFunction的使用

具體實現代碼如下所示:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val sensorStream: WindowedStream[SensorReading, String, TimeWindow] = env
    .socketTextStream("localhost", 9999)
    .map(new MyMapToSensorReading)
    .keyBy(_.id)
    .timeWindow(Time.seconds(5))

// 1、incremental aggregation functions(增量聚合函數)(來一條數據,計算一次)
// 1.1、ReduceFunction 增量集合函數(使用匿名內部類)
val reduceResult: DataStream[SensorReading] = sensorStream.reduce(new ReduceFunction[SensorReading] {
    override def reduce(value1: SensorReading, value2: SensorReading): SensorReading = {
        SensorReading(value2.id, value2.timestamp, value2.temperature + value2.temperature)
    }
})
// 1.2、AggregateFunction(相比reduce,優勢是可以指定累加值類型,輸入類型和輸出類型也可以不一樣)
val aggregateResult: DataStream[Long] = sensorStream.aggregate(new AggregateFunction[SensorReading, Long, Long] {
    // 初始化累加值
    override def createAccumulator(): Long = 0L

    // 累加方法
    override def add(value: SensorReading, accumulator: Long): Long = accumulator + 1

    // 獲取結果
    override def getResult(accumulator: Long): Long = accumulator

    // 分區的歸並操作
    override def merge(a: Long, b: Long): Long = a + b
})

// 2、full window functions(全窗口函數)
/**
 * 知識點:
 *  1、apply方法中,可以添加WindowFunction對象,會將該窗口中所有的數據先緩存,當時間到了一次性計算
 *  2、需要設置4個類型,分別是:輸入類型,輸出類型,keyBy時key的類型(如果用字符串來划分key類型為Tuple,窗口類型
 *  3、所有的計算都在apply中進行,可以通過window獲取窗口的信息,比如開始時間,結束時間
 */
val applyResult: DataStream[(Long, Int)] = sensorStream.apply(new WindowFunction[SensorReading, (Long, Int), String, TimeWindow] {
    override def apply(key: String, window: TimeWindow, input: Iterable[SensorReading], out: Collector[(Long, Int)]): Unit = {
        out.collect((window.getStart, input.size))
    }
})

// 3、窗口函數中其他API
val otherResult: DataStream[SensorReading] = sensorStream
    .allowedLateness(Time.seconds(1))                       // 允許處理遲到的數據
    .sideOutputLateData(new OutputTag[SensorReading]("late"))    // 將遲到的數據放入側輸出流
    .reduce((x, y) => SensorReading(y.id, y.timestamp, x.temperature + y.temperature))
// 獲取側輸出流(側輸出流為遲到很久的數據,當allowedLateness和watermark之后還是沒到的數據會放入側輸出流,可以在最后統一處理)
val sideOutputStream: DataStream[SensorReading] = otherResult.getSideOutput(new OutputTag[SensorReading]("late"))


// 打印輸出
applyResult.print()

env.execute("WindowFunctionDemo")

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM