Flink之Watermark的設置和使用


具體實現代碼如下所示:

main函數中代碼如下:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 設置生成watermark的時間間隔,系統默認為200毫秒,一般使用系統默認即可
env.getConfig.setAutoWatermarkInterval(5000)

val sensorStream: DataStream[SensorReading] = env
    .readTextFile("D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\source.txt")
    .map(new MyMapToSensorReading)

// 1、引入Watermark(使用已有的類)
// 1.1、給一個沒有亂序,時間為升序的流設置一個EventTime
val ascendingStream: DataStream[SensorReading] = sensorStream.assignAscendingTimestamps(_.timestamp)
// 1.2、當流中存在時間亂序問題,引入watermark,並設置延遲時間
/**
 * 知識點:
 * 1、BoundedOutOfOrdernessTimestampExtractor中的泛型為流中數據的類型
 * 2、傳入的參數為 watermark 的最大延遲時間(即允許數據遲到的時間)
 * 3、重寫的extractTimestamp方法返回的是設置數據中EventTime的字段,單位為毫秒,需要將時間轉換成Long(最近時間為13位的長整形)才能返回
 * 4、當我們能大約估計到流中的最大亂序時,建議使用此中方式,比較方便
 */
val watermarkStream: DataStream[SensorReading] = sensorStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
    override def extractTimestamp(element: SensorReading): Long = {
        element.timestamp * 1000
    }
})

// 2、使用 TimestampAssigner 引入 Watermark
// 2.1、Assigner with periodic watermarks(周期性引入watermark)
/**
 * 知識點:
 * 1、系統會周期性的將watermark插入到流中,默認周期是200毫秒,可以使用ExecutionConfig.setAutoWatermarkInterval()方法進行設置,單位為毫秒
 * 2、產生watermark的邏輯:每隔5秒鍾,Flink會調用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法,如果大於流中最大watermark就插入,小於就不插入
 * 3、如下,可以自定義一個周期性的時間戳抽取(需要實現 AssignerWithPeriodicWatermarks 接口)
 */
env.getConfig.setAutoWatermarkInterval(5000)
val periodicWatermarkStream: DataStream[SensorReading] = sensorStream.assignTimestampsAndWatermarks(new MyPeriodicAssigner(10))

env.execute("WatermarkDemo")

自定義類實現ProcessFunction接口:

/**
 * 自定義一個周期生成watermark的類
 * @param bound watermark的延時時間(毫秒)
 */
class MyPeriodicAssigner(bound: Long) extends AssignerWithPeriodicWatermarks[SensorReading] {

    // 當前為止的最大時間戳(毫秒)
    var maxTs: Long = Long.MinValue

    /**
     * 獲取當前的watermark(默認200毫秒獲取一次,可以通過 env.getConfig.setAutoWatermarkInterval(5000) 來設置)
     * @return 當前watermark,當前最大時間戳 - 延時時間
     */
    override def getCurrentWatermark: Watermark = {
        new Watermark(maxTs - bound)
    }

    /**
     * 指定eventTime對應的字段(流中每條數據都會調用一次此方法)
     * @param element 流中的每條數據
     * @param previousElementTimestamp 無
     * @return 當前流的eventTime(單位:毫秒)
     */
    override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
        // 每條數據都獲取其中的時間戳,跟最大時間戳取大,並重新賦值給最大時間戳
        maxTs = maxTs.max(element.timestamp * 1000)
        element.timestamp * 1000
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM