具體實現代碼如下所示:
main函數中代碼如下:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 設置生成watermark的時間間隔,系統默認為200毫秒,一般使用系統默認即可 env.getConfig.setAutoWatermarkInterval(5000) val sensorStream: DataStream[SensorReading] = env .readTextFile("D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\source.txt") .map(new MyMapToSensorReading) // 1、引入Watermark(使用已有的類) // 1.1、給一個沒有亂序,時間為升序的流設置一個EventTime val ascendingStream: DataStream[SensorReading] = sensorStream.assignAscendingTimestamps(_.timestamp) // 1.2、當流中存在時間亂序問題,引入watermark,並設置延遲時間 /** * 知識點: * 1、BoundedOutOfOrdernessTimestampExtractor中的泛型為流中數據的類型 * 2、傳入的參數為 watermark 的最大延遲時間(即允許數據遲到的時間) * 3、重寫的extractTimestamp方法返回的是設置數據中EventTime的字段,單位為毫秒,需要將時間轉換成Long(最近時間為13位的長整形)才能返回 * 4、當我們能大約估計到流中的最大亂序時,建議使用此中方式,比較方便 */ val watermarkStream: DataStream[SensorReading] = sensorStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { override def extractTimestamp(element: SensorReading): Long = { element.timestamp * 1000 } }) // 2、使用 TimestampAssigner 引入 Watermark // 2.1、Assigner with periodic watermarks(周期性引入watermark) /** * 知識點: * 1、系統會周期性的將watermark插入到流中,默認周期是200毫秒,可以使用ExecutionConfig.setAutoWatermarkInterval()方法進行設置,單位為毫秒 * 2、產生watermark的邏輯:每隔5秒鍾,Flink會調用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法,如果大於流中最大watermark就插入,小於就不插入 * 3、如下,可以自定義一個周期性的時間戳抽取(需要實現 AssignerWithPeriodicWatermarks 接口) */ env.getConfig.setAutoWatermarkInterval(5000) val periodicWatermarkStream: DataStream[SensorReading] = sensorStream.assignTimestampsAndWatermarks(new MyPeriodicAssigner(10)) env.execute("WatermarkDemo")
自定義類實現ProcessFunction接口:
/** * 自定義一個周期生成watermark的類 * @param bound watermark的延時時間(毫秒) */ class MyPeriodicAssigner(bound: Long) extends AssignerWithPeriodicWatermarks[SensorReading] { // 當前為止的最大時間戳(毫秒) var maxTs: Long = Long.MinValue /** * 獲取當前的watermark(默認200毫秒獲取一次,可以通過 env.getConfig.setAutoWatermarkInterval(5000) 來設置) * @return 當前watermark,當前最大時間戳 - 延時時間 */ override def getCurrentWatermark: Watermark = { new Watermark(maxTs - bound) } /** * 指定eventTime對應的字段(流中每條數據都會調用一次此方法) * @param element 流中的每條數據 * @param previousElementTimestamp 無 * @return 當前流的eventTime(單位:毫秒) */ override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = { // 每條數據都獲取其中的時間戳,跟最大時間戳取大,並重新賦值給最大時間戳 maxTs = maxTs.max(element.timestamp * 1000) element.timestamp * 1000 } }