Flink學習(十四) Flink 窗口、時間和水位線


Flink 框架中支持事件時間、攝入時間和處理時間三種。而當我們在流式計算環境中數據從 Source 產生,再到轉換和輸出,這個過程由於網絡和反壓的原因會導致消息亂序。因此,需要有一個機制來解決這個問題,這個特別的機制就是“水位線”。

Flink 的窗口和時間
根據窗口數據划分的不同,目前 Flink 支持如下 3 種:

滾動窗口,窗口數據有固定的大小,窗口中的數據不會疊加;

滑動窗口,窗口數據有固定的大小,並且有生成間隔;

會話窗口,窗口數據沒有固定的大小,根據用戶傳入的參數進行划分,窗口數據無疊加。

 

 

 

 

 

 


Flink 中的時間分為三種:

事件時間(Event Time),即事件實際發生的時間;
攝入時間(Ingestion Time),事件進入流處理框架的時間;
處理時間(Processing Time),事件被處理的時間。
下面的圖詳細說明了這三種時間的區別和聯系:

 

 

 

事件時間(Event Time

事件時間(Event Time)指的是數據產生的時間,這個時間一般由數據生產方自身攜帶,比如 Kafka 消息,每個生成的消息中自帶一個時間戳代表每條數據的產生時間。Event Time 從消息的產生就誕生了,不會改變,也是我們使用最頻繁的時間。

利用 Event Time 需要指定如何生成事件時間的“水印”,並且一般和窗口配合使用,具體會在下面的“水印”內容中詳細講解。

我們可以在代碼中指定 Flink 系統使用的時間類型為 EventTime:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //設置時間屬性為 EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props)); stream .keyBy( (event) -> event.getUser() ) .timeWindow(Time.hours(1)) .reduce( (a, b) -> a.add(b) ) .addSink(...);

Flink 注冊 EventTime 是通過 InternalTimerServiceImpl.registerEventTimeTimer 來實現的

可以看到,該方法有兩個入參:namespace 和 time,其中 time 是觸發定時器的時間,namespace 則被構造成為一個 TimerHeapInternalTimer 對象,然后將其放入 KeyGroupedInternalPriorityQueue 隊列中。

那么 Flink 什么時候會使用這些 timer 觸發計算呢?答案在這個方法里:

InternalTimeServiceImpl.advanceWatermark。 public void advanceWatermark(long time) throws Exception { currentWatermark = time; InternalTimer<K, N> timer; while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) { eventTimeTimersQueue.poll(); keyContext.setCurrentKey(timer.getKey()); triggerTarget.onEventTime(timer); } }

這個方法中的 while 循環部分會從 eventTimeTimersQueue 中依次取出觸發時間小於參數 time 的所有定時器,調用 triggerTarget.onEventTime() 方法進行觸發。

這就是 EventTime 從注冊到觸發的流程。

處理時間(Processing Time)
處理時間(Processing Time)指的是數據被 Flink 框架處理時機器的系統時間,Processing Time 是 Flink 的時間系統中最簡單的概念,但是這個時間存在一定的不確定性,比如消息到達處理節點延遲等影響。

我們同樣可以在代碼中指定 Flink 系統使用的時間為 Processing Time:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

同樣,也可以在源碼中找到 Flink 是如何注冊和使用 Processing Time 的。

 

registerProcessingTimeTimer() 方法為我們展示了如何注冊一個 ProcessingTime 定時器:
每當一個新的定時器被加入到 processingTimeTimersQueue 這個優先級隊列中時,如果新來的 Timer 時間戳更小,那么更小的這個 Timer 會被重新注冊 ScheduledThreadPoolExecutor 定時執行器上。

Processing Time 被觸發是在 InternalTimeServiceImpl 的 onProcessingTime() 方法中:

一直循環獲取時間小於入參 time 的所有定時器,並運行 triggerTarget 的 onProcessingTime() 方法。

攝入時間(Ingestion Time)
攝入時間(Ingestion Time)是事件進入 Flink 系統的時間,在 Flink 的 Source 中,每個事件會把當前時間作為時間戳,后續做窗口處理都會基於這個時間。理論上 Ingestion Time 處於 Event Time 和 Processing Time之間。

與事件時間相比,攝入時間無法處理延時和無序的情況,但是不需要明確執行如何生成 watermark。在系統內部,攝入時間采用更類似於事件時間的處理方式進行處理,但是有自動生成的時間戳和自動的 watermark。

可以防止 Flink 內部處理數據是發生亂序的情況,但無法解決數據到達 Flink 之前發生的亂序問題。如果需要處理此類問題,建議使用 EventTime。

Ingestion Time 的時間類型生成相關的代碼在 AutomaticWatermarkContext 中。

 

水位線(WaterMark)
水位線(WaterMark)是 Flink 框架中最晦澀難懂的概念之一,有很大一部分原因是因為翻譯的原因。

WaterMark 在正常的英文翻譯中是水位,但是在 Flink 框架中,翻譯為“水位線”更為合理,它在本質上是一個時間戳。

在上面的時間類型中我們知道,Flink 中的時間:
EventTime 每條數據都攜帶時間戳;

ProcessingTime 數據不攜帶任何時間戳的信息;
IngestionTime 和 EventTime 類似,不同的是 Flink 會使用系統時間作為時間戳綁定到每條數據,可以防止 Flink 內部處理數據是發生亂序的情況,但無法解決數據到達 Flink 之前發生的亂序問題。
所以,我們在處理消息亂序的情況時,會用 EventTime 和 WaterMark 進行配合使用。

首先我們要明確幾個基本問題。

水印的本質是什么
水印的出現是為了解決實時計算中的數據亂序問題,它的本質是 DataStream 中一個帶有時間戳的元素

如果 Flink 系統中出現了一個 WaterMark T,那么就意味着 EventTime < T 的數據都已經到達,窗口的結束時間和 T 相同的那個窗口被觸發進行計算了。

也就是說:水印是 Flink 判斷遲到數據的標准,同時也是窗口觸發的標記。

在程序並行度大於 1 的情況下,會有多個流產生水印和窗口,這時候 Flink 會選取時間戳最小的水印。

 

 

 

水位線是如何生成的

Flink 提供了 assignTimestampsAndWatermarks() 方法來實現水印的提取和指定,該方法接受的入參有 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks 兩種。

整體的類圖如下:

 

 

 

 

 

 

水位線種類


周期性水位線

我們在使用 AssignerWithPeriodicWatermarks 周期生成水印時,周期默認的時間是 200ms,這個時間的指定位置為:

@PublicEvolving public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) { this.timeCharacteristic = Preconditions.checkNotNull(characteristic); if (characteristic == TimeCharacteristic.ProcessingTime) { getConfig().setAutoWatermarkInterval(0); } else { getConfig().setAutoWatermarkInterval(200); } }

是否還記得上面我們在講時間類型時會通過 env.setStreamTimeCharacteristic() 方法指定 Flink 系統的時間類型,這個 setStreamTimeCharacteristic() 方法中會做判斷,如果用戶傳入的是 TimeCharacteristic.eventTime 類型,那么 AutoWatermarkInterval 的值則為 200ms ,如上述代碼所示。當前我們也可以使用 ExecutionConfig.setAutoWatermarkInterval() 方法來指定自動生成的時間間隔。

在上述的類圖中可以看出,我們需要通過 TimestampAssigner 的 extractTimestamp() 方法來提取 EventTime。

Flink 在這里提供了 3 種提取 EventTime() 的方法,分別是:

AscendingTimestampExtractor
BoundedOutOfOrdernessTimestampExtractor
IngestionTimeExtractor
這三種方法中 BoundedOutOfOrdernessTimestampExtractor() 用的最多,需特別注意,在這個方法中的 maxOutOfOrderness 參數,該參數指的是允許數據亂序的時間范圍。簡單說,這種方式允許數據遲到 maxOutOfOrderness 這么長的時間。

    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }

    public abstract long extractTimestamp(T element);

    @Override
    public final Watermark getCurrentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

PunctuatedWatermark 水位線

這種水位線的生成方式 Flink 沒有提供內置實現,它適用於根據接收到的消息判斷是否需要產生水位線的情況,用這種水印生成的方式並不多見。

舉個簡單的例子,假如我們發現接收到的數據 MyData 中以字符串 watermark 開頭則產生一個水位線:

data.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<UserActionRecord>() {

      @Override
      public Watermark checkAndGetNextWatermark(MyData data, long l) {
        return data.getRecord.startsWith("watermark") ? new Watermark(l) : null;
      }

      @Override
      public long extractTimestamp(MyData data, long l) {
        return data.getTimestamp();
      }
    });
    
class MyData{
    private String record;
    private Long timestamp;
    public String getRecord() {
        return record;
    }
    public void setRecord(String record) {
        this.record = record;
    }
    public Timestamp getTimestamp() {
        return timestamp;
    }
    public void setTimestamp(Timestamp timestamp) {
        this.timestamp = timestamp;
    }
}

案例
我們上面講解了 Flink 關於水位線和時間的生成,以及使用,下面舉一個例子來講解。

模擬一個實時接收 Socket 的 DataStream 程序,代碼中使用 AssignerWithPeriodicWatermarks 來設置水位線,將接收到的數據進行轉換,分組並且在一個10
秒,間隔是5秒的滑動窗口內獲取該窗口中第二個元素最小的那條數據。

package com.wyh.windowsApi import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks} import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time object WindowTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //周期性生成watermark 默認是200毫秒
    env.getConfig.setAutoWatermarkInterval(100L) /** * 從文件中讀取數據 * * */
    //val stream = env.readTextFile("F:\\flink-study\\wyhFlinkSD\\data\\sensor.txt")
 val stream = env.socketTextStream("localhost", 7777) //Transform操作
    val dataStream: DataStream[SensorReading] = stream.map(data => { val dataArray = data.split(",") SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble) }) //===到來的數據是升序的,准時發車,用assignAscendingTimestamps //指定哪個字段是時間戳 需要的是毫秒 * 1000 // .assignAscendingTimestamps(_.timestamp * 1000) //===處理亂序數據 // .assignTimestampsAndWatermarks(new MyAssignerPeriodic()) //==底層也是周期性生成的一個方法 處理亂序數據 延遲1秒種生成水位 同時分配水位和時間戳 括號里傳的是等待延遲的時間
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { override def extractTimestamp(t: SensorReading): Long = { t.timestamp * 1000 } }) //統計10秒內的最小溫度
    val minTemPerWindowStream = dataStream .map(data => (data.id, data.temperature)) .keyBy(0) // .timeWindow(Time.seconds(10)) //開時間窗口 滾動窗口 沒有數據的窗口不會觸發 //左閉右開 包含開始 不包含結束 延遲1秒觸發的那個時間的數據不包含 //可以直接調用底層方法,第三個參數傳offset代表時區 //.window(SlidingEventTimeWindows.of(Time.seconds(15),Time.seconds(5),Time.hours(-8)))
      .timeWindow(Time.seconds(15), Time.seconds(5)) //滑動窗口,每隔5秒輸出一次
      .reduce((data1, data2) => (data1._1, data1._2.min(data2._2))) //用reduce做增量聚合
 minTemPerWindowStream.print("min temp") dataStream.print("input data") env.execute("window Test") } } //設置水位線(水印) 這里有兩種方式實現 //一種是周期性生成 一種是以數據的某種特性進行生成水位線(水印) /** * 周期性生成watermark 默認200毫秒 */
class MyAssignerPeriodic() extends AssignerWithPeriodicWatermarks[SensorReading] { val bound: Long = 60 * 1000 var maxTs: Long = Long.MaxValue override def getCurrentWatermark: Watermark = { //定義一個規則進行生成
    new Watermark(maxTs - bound) } //用什么抽取這個時間戳
  override def extractTimestamp(t: SensorReading, l: Long): Long = { //保存當前最大的時間戳
    maxTs = maxTs.max(t.timestamp) t.timestamp * 1000 } } /** * 亂序生成watermark * 每來一條數據就生成一個watermark */
class MyAssignerPunctuated() extends AssignerWithPunctuatedWatermarks[SensorReading] { override def checkAndGetNextWatermark(t: SensorReading, l: Long): Watermark = { new Watermark(l) } override def extractTimestamp(t: SensorReading, l: Long): Long = { t.timestamp * 1000 } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM