Flink的WaterMark,及demo实例


        实际生产中,由于各种原因,导致事件创建时间与处理时间不一致,收集的规定对实时推荐有较大的影响。所以一般情况时选取创建时间,然后事先创建flink的时间窗口。但是问题来了,如何保证这个窗口的时间内所有事件都到齐了?这个时候就可以设置水位线(waterMark)。

概念:支持基于时间窗口操作,由于事件的时间来源于源头系统,很多时候由于网络延迟、分布式处理,以及源头系统等各种原因导致源头数据的事件时间可能乱序。这时可以设定一个时间阈值,或者说水位线(waterMark),其作用定义一个最大乱序时间,比如某条日志时间为2019-01-01 08:00:10,如果乱序最大允许时间为10s,那么就认为2019-01-01 08:00:00之前产生的所有事件都到齐了,可以进行计算。

时间窗口:指定一个固定时间间隔的窗口

一、滑动窗口

1、SlidingEventTimeWindows.of(Time.second(4), Time.seconds(3)):表示滑动窗口大小为4秒,滑动步长是3 秒,同时,每3秒才滑动一次;

2、每条数据存活的时间为滑动窗口的大小;

3、如果滑动窗口超过之前的窗口,那么后面来的属于前面窗口的数据会丢失;

4、来了一条数据,边移动边计算滑动窗口的数据(一个窗口停留,计算一次,不移动,不计算 ),直至窗口到达指定位置。

计算某位置时间的公式:   

//n:时间戳;size窗口大小;slide:滑动长度 //根据等差公式推导
an = a1 + (x-1)*s a1 = size - slide -1 x = [n - (size-slide)]/slide     //除数后再乘以slide
s = slide //当来了一条时间戳为n的事件,就认为指定位置时间之前的所有事件都到齐了
指定位置 = (size-slide-1) + [(n-waterMark) - (size-slide)]/slide * slide   

二、翻滚窗口

基于时间窗口,对连续数据进行迭代计算时,不会重叠。翻滚窗口是一个特殊的滑动窗口,当窗口的长度等于滑动的长度时,滑动窗口就是翻滚窗口。

计算某位置时间的公式:

指定位置 = -1 + (n-waterMark)/size * size     //除数后再乘以size,size为窗口大小,n为时间戳

三、会话窗口

时间间隔达到一定时间长度时才进行统计计算。

##

测试代码(需要集群telnet一个producer):

package com.cjs import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows} object WaterMarkTest { /** *想使用WaterMark,需要3个步骤: * 1、对数据进行timestamp提取,即调用assignTimestampsAndWatermarks函数, * 实例化BoundedOutOfOrdernessTimestampExtractor,重写extractTimestamp方法 * 2、设置使用事件时间,因为WaterMark是基于事件时间 * 3、定义时间窗口:翻滚窗口(TumblingEventWindows)、滑动窗口(timeWindow) * 任意一个没有实现,都会报异常:Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'? */ def main(args: Array[String]): Unit = { val senv = StreamExecutionEnvironment.getExecutionEnvironment val streamAdd = senv.socketTextStream("192.168.112.10",9999) val stream = streamAdd.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) {  //WaterMark设置 //对数据流进行处理,获取timestamp,对数据流就够不影响
                    override def extractTimestamp(element: String): Long ={ //定义timestamp怎么从数据中抽取出来
                        val eventTime = element.split(" ")(0).toLong print(s"$eventTime \n") eventTime } }) //提取时间戳之后,该数据流是带有时间的,用于事件窗口
            .map(x=>(x.split(" ")(1),1L)).keyBy(0) //设置使用事件时间,因为WaterMark是基于事件时间
 senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //定义翻滚窗口 // stream.window(TumblingEventTimeWindows.of(Time.seconds(3))).sum(1).print() // stream.sum(1).print() //直接输出,没有用到事件时间窗口,flink默认是累计统计,来一个,统计一个 //定义滑动窗口
        stream.window(SlidingEventTimeWindows.of(Time.seconds(4),Time.seconds(2))).sum(1).print() senv.execute("watermark") } }

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM