Flink基礎(十):DS簡介(10) 基於時間和窗口的操作符(二)


1 窗口操作符

窗口操作是流處理程序中很常見的操作。窗口操作允許我們在無限流上的一段有界區間上面做聚合之類的操作。而我們使用基於時間的邏輯來定義區間。窗口操作符提供了一種將數據放進一個桶,並根據桶中的數據做計算的方法。例如,我們可以將事件放進5分鍾的滾動窗口中,然后計數。

無限流轉化成有限數據的方法:使用窗口。

1.1 定義窗口操作符

Window算子可以在keyed stream或者nokeyed stream上面使用。

創建一個Window算子,需要指定兩個部分:

  1. window assigner定義了流的元素如何分配到window中。window assigner將會產生一條WindowedStream(或者AllWindowedStream,如果是nonkeyed DataStream的話)

  2. window function用來處理WindowedStream(AllWindowedStream)中的元素。

下面的代碼說明了如何使用窗口操作符。

stream
  .keyBy(...)
  .window(...)  // 指定window assigner
  .reduce/aggregate/process(...) // 指定window function

stream
  .windowAll(...) // 指定window assigner
  .reduce/aggregate/process(...) // 指定window function

我們的學習重點是Keyed WindowedStream。

1.2 內置的窗口分配器

窗口分配器將會根據事件的事件時間或者處理時間來將事件分配到對應的窗口中去。窗口包含開始時間和結束時間這兩個時間戳。

所有的窗口分配器都包含一個默認的觸發器:

  • 對於事件時間:當水位線超過窗口結束時間,觸發窗口的求值操作。
  • 對於處理時間:當機器時間超過窗口結束時間,觸發窗口的求值操作。

需要注意的是:當處於某個窗口的第一個事件到達的時候,這個窗口才會被創建。Flink不會對空窗口求值。

Flink創建的窗口類型是TimeWindow,包含開始時間和結束時間,區間是左閉右開的,也就是說包含開始時間戳,不包含結束時間戳。

滾動窗口(tumbling windows)

 

DataStream<SensorReading> sensorData = ...

DataStream<T> avgTemp = sensorData
  .keyBy(r -> r.id)
  // group readings in 1s event-time windows
  .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  .process(new TemperatureAverager);

DataStream<T> avgTemp = sensorData
  .keyBy(r -> r.id)
  // group readings in 1s processing-time windows
  .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .process(new TemperatureAverager);

// 其實就是之前的
// shortcut for window.(TumblingEventTimeWindows.of(size))
DataStream<T> avgTemp = sensorData
  .keyBy(r -> r.id)
  .timeWindow(Time.seconds(1))
  .process(new TemperatureAverager);

默認情況下,滾動窗口會和1970-01-01-00:00:00.000對齊,例如一個1小時的滾動窗口將會定義以下開始時間的窗口:00:00:00,01:00:00,02:00:00,等等。

滑動窗口(sliding window)

對於滑動窗口,我們需要指定窗口的大小和滑動的步長。當滑動步長小於窗口大小時,窗口將會出現重疊,而元素會被分配到不止一個窗口中去。當滑動步長大於窗口大小時,一些元素可能不會被分配到任何窗口中去,會被直接丟棄。

下面的代碼定義了窗口大小為1小時,滑動步長為15分鍾的窗口。每一個元素將被分配到4個窗口中去。

 

DataStream<T> slidingAvgTemp = sensorData
  .keyBy(r -> r.id)
  .window(
    SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(15))
  )
  .process(new TemperatureAverager);

DataStream<T> slidingAvgTemp = sensorData
  .keyBy(r -> r.id)
  .window(
    SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(15))
  )
  .process(new TemperatureAverager);

DataStream<T> slidingAvgTemp = sensorData
  .keyBy(r -> r.id)
  .timeWindow(Time.hours(1), Time.minutes(15))
  .process(new TemperatureAverager);

會話窗口(session windows)

會話窗口不可能重疊,並且會話窗口的大小也不是固定的。不活躍的時間長度定義了會話窗口的界限。不活躍的時間是指這段時間沒有元素到達。下圖展示了元素如何被分配到會話窗口。

 

DataStream<T> sessionWindows = sensorData
  .keyBy(r -> r.id)
  .window(EventTimeSessionWindows.withGap(Time.minutes(15)))
  .process(...);

DataStream<T> sessionWindows = sensorData
  .keyBy(r -> r.id)
  .window(ProcessingTimeSessionWindows.withGap(Time.minutes(15)))
  .process(...);

由於會話窗口的開始時間和結束時間取決於接收到的元素,所以窗口分配器無法立即將所有的元素分配到正確的窗口中去。相反,會話窗口分配器最開始時先將每一個元素分配到它自己獨有的窗口中去,窗口開始時間是這個元素的時間戳,窗口大小是session gap的大小。接下來,會話窗口分配器會將出現重疊的窗口合並成一個窗口。

1.3 調用窗口計算函數

window functions定義了窗口中數據的計算邏輯。有兩種計算邏輯:

  1. 增量聚合函數(Incremental aggregation functions):當一個事件被添加到窗口時,觸發函數計算,並且更新window的狀態(單個值)。最終聚合的結果將作為輸出。ReduceFunction和AggregateFunction是增量聚合函數。

  2. 全窗口函數(Full window functions):這個函數將會收集窗口中所有的元素,可以做一些復雜計算。ProcessWindowFunction是window function。

ReduceFunction

例子: 計算每個傳感器15s窗口中的溫度最小值

scala version

val minTempPerWindow = sensorData
  .map(r => (r.id, r.temperature))
  .keyBy(_._1)
  .timeWindow(Time.seconds(15))
  .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))

java version

DataStream<Tuple2<String, Double>> minTempPerwindow = sensorData
    .map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
        @Override
        public Tuple2<String, Double> map(SensorReading value) throws Exception {
            return Tuple2.of(value.id, value.temperature);
        }
    })
    .keyBy(r -> r.f0)
    .timeWindow(Time.seconds(5))
    .reduce(new ReduceFunction<Tuple2<String, Double>>() {
        @Override
        public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2) throws Exception {
            if (value1.f1 < value2.f1) {
                return value1;
            } else {
                return value2;
            }
        }
    })

AggregateFunction

先來看接口定義

public interface AggregateFunction<IN, ACC, OUT>
  extends Function, Serializable {

  // create a new accumulator to start a new aggregate
  ACC createAccumulator();

  // add an input element to the accumulator and return the accumulator
  ACC add(IN value, ACC accumulator);

  // compute the result from the accumulator and return it.
  OUT getResult(ACC accumulator);

  // merge two accumulators and return the result.
  ACC merge(ACC a, ACC b);
}

IN是輸入元素的類型,ACC是累加器的類型,OUT是輸出元素的類型。

例子

val avgTempPerWindow: DataStream[(String, Double)] = sensorData
  .map(r => (r.id, r.temperature))
  .keyBy(_._1)
  .timeWindow(Time.seconds(15))
  .aggregate(new AvgTempFunction)

// An AggregateFunction to compute the average temperature per sensor.
// The accumulator holds the sum of temperatures and an event count.
class AvgTempFunction
  extends AggregateFunction[(String, Double),
    (String, Double, Int), (String, Double)] {

  override def createAccumulator() = {
    ("", 0.0, 0)
  }

  override def add(in: (String, Double), acc: (String, Double, Int)) = {
    (in._1, in._2 + acc._2, 1 + acc._3)
  }

  override def getResult(acc: (String, Double, Int)) = {
    (acc._1, acc._2 / acc._3)
  }

  override def merge(acc1: (String, Double, Int),
    acc2: (String, Double, Int)) = {
    (acc1._1, acc1._2 + acc2._2, acc1._3 + acc2._3)
  }
}

ProcessWindowFunction

一些業務場景,我們需要收集窗口內所有的數據進行計算,例如計算窗口數據的中位數,或者計算窗口數據中出現頻率最高的值。這樣的需求,使用ReduceFunction和AggregateFunction就無法實現了。這個時候就需要ProcessWindowFunction了。

先來看接口定義

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
  extends AbstractRichFunction {

  // Evaluates the window
  void process(KEY key, Context ctx, Iterable<IN> vals, Collector<OUT> out)
    throws Exception;

  // Deletes any custom per-window state when the window is purged
  public void clear(Context ctx) throws Exception {}

  // The context holding window metadata
  public abstract class Context implements Serializable {
    // Returns the metadata of the window
    public abstract W window();

    // Returns the current processing time
    public abstract long currentProcessingTime();

    // Returns the current event-time watermark
    public abstract long currentWatermark();

    // State accessor for per-window state
    public abstract KeyedStateStore windowState();

    // State accessor for per-key global state
    public abstract KeyedStateStore globalState();

    // Emits a record to the side output identified by the OutputTag.
    public abstract <X> void output(OutputTag<X> outputTag, X value);
  }
}

process()方法接受的參數為:

  window的key,

  Iterable迭代器包含窗口的所有元素,

  Collector用於輸出結果流。

  Context參數和別的process方法一樣。而ProcessWindowFunction的Context對象還可以訪問window的元數據(窗口開始和結束時間),當前處理時間和水位線,per-window state和per-key global state,side outputs。

  • per-window state: 用於保存一些信息,這些信息可以被process()訪問,只要process所處理的元素屬於這個窗口。
  • per-key global state: 同一個key,也就是在一條KeyedStream上,不同的window可以訪問per-key global state保存的值。

例子:計算5s滾動窗口中的最低和最高的溫度。輸出的元素包含了(流的Key, 最低溫度, 最高溫度, 窗口結束時間)。

val minMaxTempPerWindow: DataStream[MinMaxTemp] = sensorData
  .keyBy(_.id)
  .timeWindow(Time.seconds(5))
  .process(new HighAndLowTempProcessFunction)

case class MinMaxTemp(id: String, min: Double, max: Double, endTs: Long)

class HighAndLowTempProcessFunction
  extends ProcessWindowFunction[SensorReading,
    MinMaxTemp, String, TimeWindow] {
  override def process(key: String,
                       ctx: Context,
                       vals: Iterable[SensorReading],
                       out: Collector[MinMaxTemp]): Unit = {
    val temps = vals.map(_.temperature)
    val windowEnd = ctx.window.getEnd

    out.collect(MinMaxTemp(key, temps.min, temps.max, windowEnd))
  }
}

  我們還可以將ReduceFunction/AggregateFunction和ProcessWindowFunction結合起來使用。ReduceFunction/AggregateFunction做增量聚合,ProcessWindowFunction提供更多的對數據流的訪問權限。如果只使用ProcessWindowFunction(底層的實現為將事件都保存在ListState中),將會非常占用空間。分配到某個窗口的元素將被提前聚合,而當窗口的trigger觸發時,也就是窗口收集完數據關閉時,將會把聚合結果發送到ProcessWindowFunction中,這時Iterable參數將會只有一個值,就是前面聚合的值。

例子

input
  .keyBy(...)
  .timeWindow(...)
  .reduce(
    incrAggregator: ReduceFunction[IN],
    function: ProcessWindowFunction[IN, OUT, K, W])

input
  .keyBy(...)
  .timeWindow(...)
  .aggregate(
    incrAggregator: AggregateFunction[IN, ACC, V],
    windowFunction: ProcessWindowFunction[V, OUT, K, W])

我們把之前的需求重新使用以上兩種方法實現一下。

case class MinMaxTemp(id: String, min: Double, max: Double, endTs: Long)

val minMaxTempPerWindow2: DataStream[MinMaxTemp] = sensorData
  .map(r => (r.id, r.temperature, r.temperature))
  .keyBy(_._1)
  .timeWindow(Time.seconds(5))
  .reduce(
    (r1: (String, Double, Double), r2: (String, Double, Double)) => {
      (r1._1, r1._2.min(r2._2), r1._3.max(r2._3))
    },
    new AssignWindowEndProcessFunction
  )

class AssignWindowEndProcessFunction
  extends ProcessWindowFunction[(String, Double, Double),
    MinMaxTemp, String, TimeWindow] {
    override def process(key: String,
                       ctx: Context,
                       minMaxIt: Iterable[(String, Double, Double)],
                       out: Collector[MinMaxTemp]): Unit = {
    val minMax = minMaxIt.head
    val windowEnd = ctx.window.getEnd
    out.collect(MinMaxTemp(key, minMax._2, minMax._3, windowEnd))
  }
}

1.4 自定義窗口操作符

Flink內置的window operators分配器已經已經足夠應付大多數應用場景。盡管如此,如果我們需要實現一些復雜的窗口邏輯,例如:可以發射早到的事件或者碰到遲到的事件就更新窗口的結果,或者窗口的開始和結束決定於特定事件的接收。

DataStream API暴露了接口和方法來自定義窗口操作符。

  • 自定義窗口分配器
  • 自定義窗口計算觸發器(trigger)
  • 自定義窗口數據清理功能(evictor)

當一個事件來到窗口操作符,首先將會傳給WindowAssigner來處理。WindowAssigner決定了事件將被分配到哪些窗口。如果窗口不存在,WindowAssigner將會創建一個新的窗口。

如果一個window operator接受了一個增量聚合函數作為參數,例如ReduceFunction或者AggregateFunction,新到的元素將會立即被聚合,而聚合結果result將存儲在window中。如果window operator沒有使用增量聚合函數,那么新元素將被添加到ListState中,ListState中保存了所有分配給窗口的元素。

新元素被添加到窗口時,這個新元素同時也被傳給了window的trigger。trigger定義了window何時准備好求值,何時window被清空。trigger可以基於window被分配的元素和注冊的定時器來對窗口的所有元素求值或者在特定事件清空window中所有的元素。

當window operator只接收一個增量聚合函數作為參數時:

當window operator只接收一個全窗口函數作為參數時:

當window operator接收一個增量聚合函數和一個全窗口函數作為參數時:

evictor是一個可選的組件,可以被注入到ProcessWindowFunction之前或者之后調用。evictor可以清除掉window中收集的元素。由於evictor需要迭代所有的元素,所以evictor只能使用在沒有增量聚合函數作為參數的情況下。

下面的代碼說明了如果使用自定義的trigger和evictor定義一個window operator:

stream
  .keyBy(...)
  .window(...)
 [.trigger(...)]
 [.evictor(...)]
  .reduce/aggregate/process(...)

注意:每個WindowAssigner都有一個默認的trigger。

窗口生命周期

當WindowAssigner分配某個窗口的第一個元素時,這個窗口才會被創建。所以不存在沒有元素的窗口。

一個窗口包含了如下狀態:

  • Window content
    • 分配到這個窗口的元素
    • 增量聚合的結果(如果window operator接收了ReduceFunction或者AggregateFunction作為參數)。
  • Window object
    • WindowAssigner返回0個,1個或者多個window object。
    • window operator根據返回的window object來聚合元素。
    • 每一個window object包含一個windowEnd時間戳,來區別於其他窗口。
  • 觸發器的定時器:一個觸發器可以注冊定時事件,到了定時的時間可以執行相應的回調函數,例如:對窗口進行求值或者清空窗口。
  • 觸發器中的自定義狀態:觸發器可以定義和使用自定義的、per-window或者per-key狀態。這個狀態完全被觸發器所控制。而不是被window operator控制。

當窗口結束時間來到,window operator將刪掉這個窗口。窗口結束時間是由window object的end timestamp所定義的。無論是使用processing time還是event time,窗口結束時間是什么類型可以調用WindowAssigner.isEventTime()方法獲得。

窗口分配器(window assigners)

WindowAssigner將會把元素分配到0個,1個或者多個窗口中去。我們看一下WindowAssigner接口:

public abstract class WindowAssigner<T, W extends Window>
    implements Serializable {

  public abstract Collection<W> assignWindows(
    T element,
    long timestamp,
    WindowAssignerContext context);

  public abstract Trigger<T, W> getDefaultTriger(
    StreamExecutionEnvironment env);

  public abstract TypeSerializer<W> getWindowSerializer(
    ExecutionConfig executionConfig);

  public abstract boolean isEventTime();

  public abstract static class WindowAssignerContext {
    public abstract long getCurrentProcessingTime();
  }
}

WindowAssigner有兩個泛型參數:

  • T: 事件的數據類型
  • W: 窗口的類型

下面的代碼創建了一個自定義窗口分配器,是一個30秒的滾動事件時間窗口。

class ThirtySecondsWindows
    extends WindowAssigner[Object, TimeWindow] {

  val windowSize: Long = 30 * 1000L

  override def assignWindows(
    o: Object,
    ts: Long,
    ctx: WindowAssigner.WindowAssignerContext
  ): java.util.List[TimeWindow] = {

    val startTime = ts - (ts % windowSize)
    val endTime = startTime + windowSize
    Collections.singletonList(new TimeWindow(startTime, endTime))
  }

  override def getDefaultTrigger(
    env: environment.StreamExecutionEnvironment
  ): Trigger[Object, TimeWindow] = {
      EventTimeTrigger.create()
  }

  override def getWindowSerializer(
    executionConfig: ExecutionConfig
  ): TypeSerializer[TimeWindow] = {
    new TimeWindow.Serializer
  }

  override def isEventTime = true
}

增量聚合示意圖

全窗口聚合示意圖

增量聚合和全窗口聚合結合使用的示意圖

觸發器(Triggers)

觸發器定義了window何時會被求值以及何時發送求值結果。觸發器可以到了特定的時間觸發也可以碰到特定的事件觸發。例如:觀察到事件數量符合一定條件或者觀察到了特定的事件。

默認的觸發器將會在兩種情況下觸發

  • 處理時間:機器時間到達處理時間
  • 事件時間:水位線超過了窗口的結束時間

  觸發器可以訪問流的時間屬性以及定時器,還可以對state狀態編程。所以觸發器和process function一樣強大。

  例如我們可以實現一個觸發邏輯:當窗口接收到一定數量的元素時,觸發器觸發。再比如當窗口接收到一個特定元素時,觸發器觸發。還有就是當窗口接收到的元素里面包含特定模式(5秒鍾內接收到了兩個同樣類型的事件),觸發器也可以觸發。在一個事件時間的窗口中,一個自定義的觸發器可以提前(在水位線沒過窗口結束時間之前)計算和發射計算結果。這是一個常見的低延遲計算策略,盡管計算不完全,但不像默認的那樣需要等待水位線沒過窗口結束時間。

每次調用觸發器都會產生一個TriggerResult來決定窗口接下來發生什么。TriggerResult可以取以下結果:

  • CONTINUE:什么都不做
  • FIRE:如果window operator有ProcessWindowFunction這個參數,將會調用這個ProcessWindowFunction。如果窗口僅有增量聚合函數(ReduceFunction或者AggregateFunction)作為參數,那么當前的聚合結果將會被發送。窗口的state不變。
  • PURGE:窗口所有內容包括窗口的元數據都將被丟棄。
  • FIRE_AND_PURGE:先對窗口進行求值,再將窗口中的內容丟棄。

TriggerResult可能的取值使得我們可以實現很復雜的窗口邏輯。一個自定義觸發器可以觸發多次,可以計算或者更新結果,可以在發送結果之前清空窗口。

接下來我們看一下Trigger API:

public abstract class Trigger<T, W extends Window>
    implements Serializable {

  TriggerResult onElement(
    long timestamp,
    W window,
    TriggerContext ctx);

  public abstract TriggerResult onProcessingTime(
    long timestamp,
    W window,
    TriggerContext ctx);

  public abstract TriggerResult onEventTime(
    long timestamp,
    W window,
    TriggerContext ctx);

  public boolean canMerge();

  public void onMerge(W window, OnMergeContext ctx);

  public abstract void clear(W window, TriggerContext ctx);
}

public interface TriggerContext {

  long getCurrentProcessingTime();

  long getCurrentWatermark();

  void registerProcessingTimeTimer(long time);

  void registerEventTimeTimer(long time);

  void deleteProcessingTimeTimer(long time);

  void deleteEventTimeTimer(long time);

  <S extends State> S getPartitionedState(
    StateDescriptor<S, ?> stateDescriptor);
}

public interface OnMergeContext extends TriggerContext {

  void mergePartitionedState(
    StateDescriptor<S, ?> stateDescriptor
  );
}

這里要注意兩個地方:清空state和merging合並觸發器。

當在觸發器中使用per-window state時,這里我們需要保證當窗口被刪除時state也要被刪除,否則隨着時間的推移,window operator將會積累越來越多的數據,最終可能使應用崩潰。

當窗口被刪除時,為了清空所有狀態,觸發器的clear()方法需要需要刪掉所有的自定義per-window state,以及使用TriggerContext對象將處理時間和事件時間的定時器都刪除。

下面的例子展示了一個觸發器在窗口結束時間之前觸發。當第一個事件被分配到窗口時,這個觸發器注冊了一個定時器,定時時間為水位線之前一秒鍾。當定時事件執行,將會注冊一個新的定時事件,這樣,這個觸發器每秒鍾最多觸發一次。

scala version

class OneSecondIntervalTrigger
    extends Trigger[SensorReading, TimeWindow] {

  override def onElement(
    SensorReading r,
    timestamp: Long,
    window: TimeWindow,
    ctx: Trigger.TriggerContext
  ): TriggerResult = {
    val firstSeen: ValueState[Boolean] = ctx
      .getPartitionedState(
        new ValueStateDescriptor[Boolean](
          "firstSeen", classOf[Boolean]
        )
      )

    if (!firstSeen.value()) {
      val t = ctx.getCurrentWatermark
       + (1000 - (ctx.getCurrentWatermark % 1000))
      ctx.registerEventTimeTimer(t)
      ctx.registerEventTimeTimer(window.getEnd)
      firstSeen.update(true)
    }

    TriggerResult.CONTINUE
  }

  override def onEventTime(
    timestamp: Long,
    window: TimeWindow,
    ctx: Trigger.TriggerContext
  ): TriggerResult = {
    if (timestamp == window.getEnd) {
      TriggerResult.FIRE_AND_PURGE
    } else {
      val t = ctx.getCurrentWatermark
       + (1000 - (ctx.getCurrentWatermark % 1000))
      if (t < window.getEnd) {
        ctx.registerEventTimeTimer(t)
      }
      TriggerResult.FIRE
    }
  }

  override def onProcessingTime(
    timestamp: Long,
    window: TimeWindow,
    ctx: Trigger.TriggerContext
  ): TriggerResult = {
    TriggerResult.CONTINUE
  }

  override def clear(
    window: TimeWindow,
    ctx: Trigger.TriggerContext
  ): Unit = {
    val firstSeen: ValueState[Boolean] = ctx
      .getPartitionedState(
        new ValueStateDescriptor[Boolean](
          "firstSeen", classOf[Boolean]
        )
      )
    firstSeen.clear()
  }
}

java version

public class TriggerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String s) throws Exception {
                        String[] arr = s.split(" ");
                        return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                                return stringLongTuple2.f1;
                            }
                        })
                )
                .keyBy(r -> r.f0)
                .timeWindow(Time.seconds(5))
                .trigger(new OneSecondIntervalTrigger())
                .process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {
                        long count = 0L;
                        for (Tuple2<String, Long> i : iterable) count += 1;
                        collector.collect("窗口中有 " + count + " 條元素");
                    }
                })
                .print();

        env.execute();
    }

    public static class OneSecondIntervalTrigger extends Trigger<Tuple2<String, Long>, TimeWindow> {
        // 來一條調用一次
        @Override
        public TriggerResult onElement(Tuple2<String, Long> r, long l, TimeWindow window, TriggerContext ctx) throws Exception {
            ValueState<Boolean> firstSeen = ctx.getPartitionedState(
                    new ValueStateDescriptor<Boolean>("first-seen", Types.BOOLEAN)
            );

            if (firstSeen.value() == null) {
                // 4999 + (1000 - 4999 % 1000) = 5000
                System.out.println("第一條數據來的時候 ctx.getCurrentWatermark() 的值是 " + ctx.getCurrentWatermark());
                long t = ctx.getCurrentWatermark() + (1000L - ctx.getCurrentWatermark() % 1000L);
                ctx.registerEventTimeTimer(t);
                ctx.registerEventTimeTimer(window.getEnd());
                firstSeen.update(true);
            }
            return TriggerResult.CONTINUE;
        }

        // 定時器邏輯
        @Override
        public TriggerResult onEventTime(long ts, TimeWindow window, TriggerContext ctx) throws Exception {
            if (ts == window.getEnd()) {
                return TriggerResult.FIRE_AND_PURGE;
            } else {
                System.out.println("當前水位線是:" + ctx.getCurrentWatermark());
                long t = ctx.getCurrentWatermark() + (1000L - ctx.getCurrentWatermark() % 1000L);
                if (t < window.getEnd()) {
                    ctx.registerEventTimeTimer(t);
                }
                return TriggerResult.FIRE;
            }
        }

        @Override
        public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow timeWindow, TriggerContext ctx) throws Exception {
            ValueState<Boolean> firstSeen = ctx.getPartitionedState(
                    new ValueStateDescriptor<Boolean>("first-seen", Types.BOOLEAN)
            );
            firstSeen.clear();
        }
    }
}

清理器(EVICTORS)

evictor可以在window function求值之前或者之后移除窗口中的元素。

我們看一下Evictor的接口定義:

public interface Evictor<T, W extends Window>
    extends Serializable {
  void evictBefore(
    Iterable<TimestampedValue<T>> elements,
    int size,
    W window,
    EvictorContext evictorContext);

  void evictAfter(
    Iterable<TimestampedValue<T>> elements,
    int size,
    W window,
    EvictorContext evictorContext);

  interface EvictorContext {

    long getCurrentProcessingTime();

    long getCurrentWatermark();
  }
}

evictBefore()和evictAfter()分別在window function計算之前或者之后調用。Iterable迭代器包含了窗口所有的元素,size為窗口中元素的數量,window object和EvictorContext可以訪問當前處理時間和水位線。可以對Iterator調用remove()方法來移除窗口中的元素。

evictor也經常被用在GlobalWindow上,用來清除部分元素,而不是將窗口中的元素全部清空。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM