1 Time-Based Joins of Two Streams
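Another common requirement in stream processing is to connect or join the events of two streams. The Flink DataStream API provides two built-in operators that join streams based on a temporal condition: the interval join and the window join. This section introduces both.
If neither built-in join operator can express the join semantics you need, you can implement custom join logic with a CoProcessFunction, BroadcastProcessFunction, or KeyedBroadcastProcessFunction.
Note that such a custom join operator must have an efficient state-access pattern and an effective state-cleanup strategy.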
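1.1 Interval Join
An interval join joins events from two streams that have the same key and whose timestamps are no more than a specified interval apart.
The figure below shows an interval join of two streams, A and B. An event from A is joined with an event from B if the B event's timestamp is no more than one hour earlier and no more than 15 minutes later than the A event's timestamp. The join interval is symmetric. The same condition can therefore be stated the other way round: the A event's timestamp is no more than 15 minutes earlier and no more than one hour later than the B event's timestamp.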
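As a worked restatement, let $t_A$ and $t_B$ denote the timestamps of the A and B events. This notation is introduced here, not in the original. The condition in the figure is a pair of inequalities, and rearranging each one gives the symmetric form:

$$
t_A - 1\,\mathrm{h} \;\le\; t_B \;\le\; t_A + 15\,\mathrm{min}
\quad\Longleftrightarrow\quad
t_B - 15\,\mathrm{min} \;\le\; t_A \;\le\; t_B + 1\,\mathrm{h}
$$

In general, bounds $[l, u]$ relative to A correspond to bounds $[-u, -l]$ relative to B. Here $l = -1\,\mathrm{h}$ and $u = 15\,\mathrm{min}$.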

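The interval join currently supports only event time and only INNER JOIN semantics: events that find no match are not emitted. An interval join is defined as follows. Both inputs must be keyed streams.
```
input1
  .keyBy(...)
  .intervalJoin(input2.keyBy(...))
  .between(<lower-bound>, <upper-bound>) // bounds relative to input1
  .process(ProcessJoinFunction)          // process pairs of matched events
```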
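Each pair of joined events is passed to a ProcessJoinFunction. The lower and upper bounds are usually given as a negative and a positive time interval, for example between(Time.hours(-1), Time.minutes(15)). Both bounds are inclusive by default. You can actually choose any values, as long as the lower bound is not greater than the upper bound. For example, between(Time.hours(-2), Time.hours(-1)) requires the B event's timestamp to be one to two hours earlier than the A event's timestamp.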
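The following small plain-Java model makes the bound semantics concrete. It is not Flink code: `IntervalBounds` and its methods are hypothetical names used for illustration only. It assumes millisecond timestamps and inclusive bounds. The `swapped()` method expresses the rule that `A.intervalJoin(B).between(l, u)` matches the same pairs as `B.intervalJoin(A).between(-u, -l)`.

```java
import java.time.Duration;

/**
 * Hypothetical plain-Java model of the interval-join condition (not Flink code).
 * A right-hand event at tRight joins a left-hand event at tLeft when
 * tLeft + lower <= tRight <= tLeft + upper (both bounds inclusive).
 */
final class IntervalBounds {
    final long lowerMs;
    final long upperMs;

    IntervalBounds(Duration lower, Duration upper) {
        // the lower bound must not be greater than the upper bound
        if (lower.compareTo(upper) > 0) {
            throw new IllegalArgumentException("lower bound must be <= upper bound");
        }
        this.lowerMs = lower.toMillis();
        this.upperMs = upper.toMillis();
    }

    /** True if (left event at tLeft, right event at tRight) satisfies the interval condition. */
    boolean matches(long tLeft, long tRight) {
        return tLeft + lowerMs <= tRight && tRight <= tLeft + upperMs;
    }

    /** Equivalent bounds when the two inputs are swapped: [-upper, -lower]. */
    IntervalBounds swapped() {
        return new IntervalBounds(Duration.ofMillis(-upperMs), Duration.ofMillis(-lowerMs));
    }

    public static void main(String[] args) {
        long t = Duration.ofHours(2).toMillis(); // an A event at 02:00
        IntervalBounds figure = new IntervalBounds(Duration.ofHours(-1), Duration.ofMinutes(15));
        System.out.println(figure.matches(t, t + Duration.ofMinutes(15).toMillis())); // true
        System.out.println(figure.matches(t, t + Duration.ofMinutes(16).toMillis())); // false

        // "B is one to two hours earlier than A"
        IntervalBounds earlier = new IntervalBounds(Duration.ofHours(-2), Duration.ofHours(-1));
        System.out.println(earlier.matches(t, t - Duration.ofMinutes(90).toMillis())); // true
    }
}
```

The check in the constructor mirrors the requirement stated above, that the lower bound must not exceed the upper bound.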
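An interval join must buffer records from both inputs. For the first input, every record whose timestamp is greater than the current watermark minus the upper bound is buffered. For the second input, every record whose timestamp is greater than the current watermark plus the lower bound is buffered. Note that both bounds may be negative. The join in the figure above therefore stores two sets of records. From stream A it keeps every record whose timestamp is greater than the current watermark minus 15 minutes. From stream B it keeps every record whose timestamp is greater than the current watermark minus one hour. If the event times of the two streams are not in sync, the storage the join requires can grow significantly, because the watermark is always determined by the "slower" stream.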
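The buffering rule can also be sketched in plain Java. `IntervalJoinRetention` is a hypothetical helper, not a Flink class. It applies the two thresholds from the paragraph above, together with the fact that a two-input operator's watermark is the minimum of its input watermarks. It does not reproduce Flink's internal cleanup timers.

```java
import java.time.Duration;

/**
 * Hypothetical plain-Java sketch of the interval-join buffering rule (not Flink code):
 * the first input keeps records with timestamp > watermark - upper,
 * the second input keeps records with timestamp > watermark + lower.
 */
final class IntervalJoinRetention {
    /** Records of the first input at or below this timestamp no longer need to be kept. */
    static long leftThreshold(long watermark, Duration upper) {
        return watermark - upper.toMillis();
    }

    /** Records of the second input at or below this timestamp no longer need to be kept. */
    static long rightThreshold(long watermark, Duration lower) {
        return watermark + lower.toMillis();
    }

    /** A two-input operator's watermark is the minimum of its inputs' watermarks. */
    static long combinedWatermark(long leftWatermark, long rightWatermark) {
        return Math.min(leftWatermark, rightWatermark);
    }

    public static void main(String[] args) {
        long h = Duration.ofHours(1).toMillis();
        Duration lower = Duration.ofHours(-1);
        Duration upper = Duration.ofMinutes(15);

        // both streams in sync at 10:00
        long wm = combinedWatermark(10 * h, 10 * h);
        System.out.println(leftThreshold(wm, upper) / 60_000);  // 585 minutes, i.e. 09:45
        System.out.println(rightThreshold(wm, lower) / 60_000); // 540 minutes, i.e. 09:00

        // stream B lags two hours behind, so A must now be buffered from 07:45 on
        wm = combinedWatermark(10 * h, 8 * h);
        System.out.println(leftThreshold(wm, upper) / 60_000);  // 465 minutes, i.e. 07:45
    }
}
```

The last case shows the effect described in the text. A single lagging input holds back the combined watermark, and that lengthens the buffer on the other side.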
Example: join each user's clicks with that user's browse events from the preceding 10 minutes.
scala version
```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

object IntervalJoinExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    /* These two joins match the same pairs:
       A.intervalJoin(B).between(lowerBound, upperBound)
       B.intervalJoin(A).between(-upperBound, -lowerBound) */

    // clicks: (user, timestamp in ms, event type)
    val stream1 = env
      .fromElements(
        ("user_1", 10 * 60 * 1000L, "click"),
        ("user_1", 16 * 60 * 1000L, "click"))
      .assignAscendingTimestamps(_._2)
      .keyBy(r => r._1)

    // browse events
    val stream2 = env
      .fromElements(
        ("user_1", 5 * 60 * 1000L, "browse"),
        ("user_1", 6 * 60 * 1000L, "browse"))
      .assignAscendingTimestamps(_._2)
      .keyBy(r => r._1)

    // prints each matched pair
    val printPair = new ProcessJoinFunction[(String, Long, String), (String, Long, String), String] {
      override def processElement(
          in1: (String, Long, String),
          in2: (String, Long, String),
          context: ProcessJoinFunction[(String, Long, String), (String, Long, String), String]#Context,
          collector: Collector[String]): Unit = {
        collector.collect(s"$in1 => $in2")
      }
    }

    // each click joined with the browse events of the preceding 10 minutes
    stream1
      .intervalJoin(stream2)
      .between(Time.minutes(-10), Time.minutes(0))
      .process(printPair)
      .print()

    // the same join with the inputs swapped (bounds negated and exchanged)
    stream2
      .intervalJoin(stream1)
      .between(Time.minutes(0), Time.minutes(10))
      .process(printPair)
      .print()

    env.execute()
  }
}
```
java version
```java
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class IntervalJoinExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // timestamps (ms) come from field f1 and are ascending
        WatermarkStrategy<Tuple3<String, Long, String>> strategy = WatermarkStrategy
                .<Tuple3<String, Long, String>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Long, String>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, Long, String> event, long recordTimestamp) {
                        return event.f1;
                    }
                });

        // clicks
        KeyedStream<Tuple3<String, Long, String>, String> stream1 = env
                .fromElements(Tuple3.of("user_1", 10 * 60 * 1000L, "click"))
                .assignTimestampsAndWatermarks(strategy)
                .keyBy(r -> r.f0);

        // browse events
        KeyedStream<Tuple3<String, Long, String>, String> stream2 = env
                .fromElements(
                        Tuple3.of("user_1", 5 * 60 * 1000L, "browse"),
                        Tuple3.of("user_1", 6 * 60 * 1000L, "browse"))
                .assignTimestampsAndWatermarks(strategy)
                .keyBy(r -> r.f0);

        // join each click with the browse events of the preceding 10 minutes
        stream1
                .intervalJoin(stream2)
                .between(Time.minutes(-10), Time.minutes(0))
                .process(new ProcessJoinFunction<Tuple3<String, Long, String>, Tuple3<String, Long, String>, String>() {
                    @Override
                    public void processElement(Tuple3<String, Long, String> click,
                                               Tuple3<String, Long, String> browse,
                                               Context context,
                                               Collector<String> out) throws Exception {
                        out.collect(click + " => " + browse);
                    }
                })
                .print();

        env.execute();
    }
}
```
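1.2 Window Join
As the name suggests, a window join uses Flink's windowing mechanism. The elements of both input streams are assigned to common windows, and when a window completes they are joined (or cogrouped).
The following example shows how to define a window join.
```
input1.join(input2)
  .where(...)       // specify key attributes for input1
  .equalTo(...)     // specify key attributes for input2
  .window(...)      // specify the WindowAssigner
  [.trigger(...)]   // optional: specify a Trigger
  [.evictor(...)]   // optional: specify an Evictor
  .apply(...)       // specify the JoinFunction
```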
The figure below shows how a window join works in the DataStream API.

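Both input streams are partitioned on their key attributes. A common window assigner maps the events of both streams to common windows, so each window stores data from both streams. When a window's timer fires, the operator calls the JoinFunction for every combination of elements from the two inputs, that is, for their Cartesian product. You can also specify a custom trigger or evictor. Because the events of both streams are mapped into the same windows, triggers and evictors behave exactly as they do in a regular window operator.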
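Instead of joining the two streams in a window, you can also cogroup them by starting the operator definition with coGroup() instead of join(). Join and coGroup follow the same overall logic and differ in only one respect. A join calls the JoinFunction once for every pair of events from the two inputs. A coGroup calls its CoGroupFunction only once per window, passing iterators over the elements of both inputs as arguments.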
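A few lines of plain Java can model the difference in how often the user function is called. This sketches the semantics for a single key and a single fired window; it is not Flink's implementation. `WindowJoinVsCoGroup` and its methods are hypothetical names, and `BiFunction` stands in for `JoinFunction` and `CoGroupFunction`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/**
 * Hypothetical plain-Java model of one fired window for one key (not the Flink API).
 * join: the function is called once per (left, right) pair, i.e. over the cross product.
 * coGroup: the function is called once with both collections.
 */
final class WindowJoinVsCoGroup {
    static <L, R, O> List<O> join(List<L> left, List<R> right, BiFunction<L, R, O> fn) {
        List<O> out = new ArrayList<>();
        for (L l : left) {
            for (R r : right) {
                out.add(fn.apply(l, r)); // one call per pair; no calls if either side is empty
            }
        }
        return out;
    }

    static <L, R, O> O coGroup(List<L> left, List<R> right, BiFunction<List<L>, List<R>, O> fn) {
        return fn.apply(left, right); // exactly one call, even if one side is empty
    }

    public static void main(String[] args) {
        List<Long> left = List.of(1000L, 2000L);
        List<Long> right = List.of(3000L, 4000L);
        System.out.println(join(left, right, (l, r) -> l + " => " + r));
        System.out.println(coGroup(left, right, (ls, rs) -> ls.size() + " x " + rs.size()));
    }
}
```

With two elements on each side, the join produces four results from four calls, while the coGroup function runs once. Because the coGroup function still sees a window in which one side is empty, it can implement outer-join-like logic that a plain join cannot.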
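Note that joining windowed streams can have unexpected semantics. For example, suppose the join operator uses a one-hour tumbling window. Elements from the two inputs that are not assigned to the same window cannot be joined, even if they are only one second apart.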
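The boundary effect can be reproduced with the window-assignment arithmetic alone. The sketch below assumes tumbling event-time windows with no offset, where a timestamp t falls into the window that starts at t - (t mod size). `TumblingWindowSketch` is a hypothetical name, and the code does not call Flink.

```java
import java.time.Duration;

/** Hypothetical plain-Java sketch of tumbling-window assignment without an offset. */
final class TumblingWindowSketch {
    /** Start of the tumbling window containing ts; floorMod also handles negative timestamps. */
    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    /** Two elements can only be window-joined if they land in the same window. */
    static boolean sameWindow(long t1, long t2, long sizeMs) {
        return windowStart(t1, sizeMs) == windowStart(t2, sizeMs);
    }

    public static void main(String[] args) {
        long hour = Duration.ofHours(1).toMillis();
        long a = hour - 500; // 00:59:59.500
        long b = hour + 500; // 01:00:00.500, one second later
        System.out.println(sameWindow(a, b, hour)); // false: different windows, so no join
    }
}
```

By the same arithmetic, the timestamps 1000 to 4000 ms in the window-join examples below all fall into the 5-second window [0, 5000), so every pair with the same key is joined.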
scala version
```scala
import org.apache.flink.api.common.functions.JoinFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TwoWindowJoinExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stream1 = env
      .fromElements(("a", 1000L), ("a", 2000L))
      .assignAscendingTimestamps(_._2)

    val stream2 = env
      .fromElements(("a", 3000L), ("a", 4000L))
      .assignAscendingTimestamps(_._2)

    stream1
      .join(stream2) // on A.id = B.id
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .apply(new JoinFunction[(String, Long), (String, Long), String] {
        override def join(in1: (String, Long), in2: (String, Long)): String =
          s"$in1 => $in2"
      })
      .print()

    env.execute()
  }
}
```
java version
```java
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TwoWindowJoinExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // timestamps (ms) come from field f1 and are ascending
        WatermarkStrategy<Tuple2<String, Long>> strategy = WatermarkStrategy
                .<Tuple2<String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                        return element.f1;
                    }
                });

        DataStream<Tuple2<String, Long>> stream1 = env
                .fromElements(
                        Tuple2.of("a", 1000L), Tuple2.of("b", 1000L),
                        Tuple2.of("a", 2000L), Tuple2.of("b", 2000L))
                .assignTimestampsAndWatermarks(strategy);

        DataStream<Tuple2<String, Long>> stream2 = env
                .fromElements(
                        Tuple2.of("a", 3000L), Tuple2.of("b", 3000L),
                        Tuple2.of("a", 4000L), Tuple2.of("b", 4000L))
                .assignTimestampsAndWatermarks(strategy);

        stream1
                .join(stream2)
                .where(r -> r.f0)   // key of stream1
                .equalTo(r -> r.f0) // key of stream2
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public String join(Tuple2<String, Long> left, Tuple2<String, Long> right) {
                        return left + " => " + right;
                    }
                })
                .print();

        env.execute();
    }
}
```
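2 Handling Late Elements
Watermarks let us trade off the completeness of results against their latency. Suppose we chose a very conservative watermark strategy, with a maximum out-of-orderness so large that it covers every element. That would avoid late elements, but at the cost of very high latency. With any other strategy, we will always have to handle late elements.
A late element is one that arrives after the window it belongs to has already been computed, that is, after the watermark has passed the end of that window. Lateness is therefore a concept that applies only to event time.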
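The small plain-Java sketch below contrasts the two notions of lateness used in this section. A ProcessFunction compares an element's timestamp directly with the watermark. A tumbling event-time window rejects an element only once that element's window has already fired. The sketch assumes the default event-time trigger, which fires when the watermark reaches the window's last timestamp (end - 1), and no allowed lateness. `LatenessSketch` is a hypothetical class, not part of Flink.

```java
/** Hypothetical plain-Java sketch of two lateness checks (not Flink code). */
final class LatenessSketch {
    /** ProcessFunction-style check: the element's timestamp is behind the current watermark. */
    static boolean lateForProcessFunction(long ts, long watermark) {
        return ts < watermark;
    }

    /** Exclusive end of the tumbling window (no offset) that contains ts. */
    static long windowEnd(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs) + sizeMs;
    }

    /** Window-style check without allowed lateness: the element's window has already fired. */
    static boolean lateForTumblingWindow(long ts, long sizeMs, long watermark) {
        return windowEnd(ts, sizeMs) - 1 <= watermark;
    }

    public static void main(String[] args) {
        long size = 10_000L;      // 10-second windows
        long watermark = 12_000L;
        System.out.println(lateForProcessFunction(11_000L, watermark));      // true
        System.out.println(lateForTumblingWindow(11_000L, size, watermark)); // false: [10 s, 20 s) has not fired
        System.out.println(lateForTumblingWindow(9_000L, size, watermark));  // true: [0 s, 10 s) already fired
    }
}
```

The element at 11 s is behind the watermark, but a window operator can still add it to a window that has not fired yet. It counts as late only in the ProcessFunction sense.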
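The DataStream API provides three strategies for handling late elements:
- Drop late elements.
- Redirect late elements into a separate stream.
- Update results that have already been computed and emit the updated results.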
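2.1 Dropping Late Elements
Dropping late elements is the default behavior of event-time window operators. In other words, a late element does not create a new window.
A process function can easily filter out late elements by comparing their timestamps with the current watermark.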
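2.2 Redirecting Late Elements
Late elements can also be redirected into another stream with the side-output feature. The side-output stream of late elements can then be processed further or written to a persistent sink.
Example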
scala version
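```scala
// side-output tag for late readings, defined once and reused
val lateReadingsTag = new OutputTag[(String, Long)]("late-readings")

// each input line has the form "<key> <timestamp in seconds>"
val readings = env
  .socketTextStream("localhost", 9999, '\n')
  .map(line => {
    val arr = line.split(" ")
    (arr(0), arr(1).toLong * 1000)
  })
  .assignAscendingTimestamps(_._2)

val countPer10Secs = readings
  .keyBy(_._1)
  .timeWindow(Time.seconds(10))
  // late elements go to the side output instead of being dropped
  .sideOutputLateData(lateReadingsTag)
  .process(new CountFunction())

// retrieve the late elements from the side output
val lateStream = countPer10Secs.getSideOutput(lateReadingsTag)

lateStream.print()
```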
Implementation of CountFunction:
```scala
class CountFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
  override def process(key: String,
                       context: Context,
                       elements: Iterable[(String, Long)],
                       out: Collector[String]): Unit = {
    out.collect("window contains " + elements.size + " elements")
  }
}
```
java version
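```java
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class RedirectLateEvent {
    // side-output tag; the anonymous subclass ({}) preserves the generic type information
    private static final OutputTag<Tuple2<String, Long>> output =
            new OutputTag<Tuple2<String, Long>>("late-readings") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // each input line has the form "<key> <timestamp in seconds>"
        DataStream<Tuple2<String, Long>> stream = env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String s) throws Exception {
                        String[] arr = s.split(" ");
                        return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
                    }
                })
                .assignTimestampsAndWatermarks(
                        // like Scala's assignAscendingTimestamps(_._2)
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Long> value, long l) {
                                        return value.f1;
                                    }
                                }));

        SingleOutputStreamOperator<String> windowCounts = stream
                .keyBy(r -> r.f0)
                .timeWindow(Time.seconds(5))
                .sideOutputLateData(output) // call after keyBy and timeWindow
                .process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements,
                                        Collector<String> out) throws Exception {
                        // count by iterating; the Iterable's spliterator may not report its size
                        long count = 0L;
                        for (Tuple2<String, Long> ignored : elements) {
                            count++;
                        }
                        out.collect(count + " elements in window");
                    }
                });

        windowCounts.print();
        windowCounts.getSideOutput(output).print();

        env.execute();
    }
}
```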
The following example shows how a ProcessFunction can filter out late elements and redirect them to a side output.
scala version
```scala
val readings: DataStream[SensorReading] = ...
val filteredReadings: DataStream[SensorReading] = readings
  .process(new LateReadingsFilter)

// retrieve late readings
val lateReadings: DataStream[SensorReading] = filteredReadings
  .getSideOutput(new OutputTag[SensorReading]("late-readings"))

/** A ProcessFunction that filters out late sensor readings and
  * re-directs them to a side output */
class LateReadingsFilter extends ProcessFunction[SensorReading, SensorReading] {

  val lateReadingsOut = new OutputTag[SensorReading]("late-readings")

  override def processElement(
      r: SensorReading,
      ctx: ProcessFunction[SensorReading, SensorReading]#Context,
      out: Collector[SensorReading]): Unit = {
    // compare record timestamp with current watermark
    if (r.timestamp < ctx.timerService().currentWatermark()) {
      // this is a late reading => redirect it to the side output
      ctx.output(lateReadingsOut, r)
    } else {
      out.collect(r)
    }
  }
}
```
java version
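```java
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class RedirectLateEvent {
    // side-output tag for the late elements themselves
    private static final OutputTag<Tuple2<String, Long>> output =
            new OutputTag<Tuple2<String, Long>>("late-readings") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        SingleOutputStreamOperator<Tuple2<String, Long>> stream = env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String s) throws Exception {
                        String[] arr = s.split(" ");
                        return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Long> value, long l) {
                                        return value.f1;
                                    }
                                }))
                .process(new ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
                    @Override
                    public void processElement(Tuple2<String, Long> value, Context context,
                                               Collector<Tuple2<String, Long>> out) throws Exception {
                        // an element is late if its timestamp is behind the current watermark
                        if (value.f1 < context.timerService().currentWatermark()) {
                            context.output(output, value); // redirect to the side output
                        } else {
                            out.collect(value);
                        }
                    }
                });

        stream.print();
        stream.getSideOutput(output).print();

        env.execute();
    }
}
```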
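2.3 Updating Window Results with Late Elements
Because late elements exist, window results that have already been computed may be inaccurate or incomplete. Late elements can be used to update such results.
To support recomputing and updating results it has already emitted, an operator must keep all of the relevant state even after it emits the first result. It cannot keep that state forever, though. At some point the state has to be purged, and after that the result can no longer be recomputed or updated. Late elements that arrive after that point can only be dropped or sent to a side output.
The window operator API lets us declare explicitly that we want to wait for late elements. With event-time windows, we can specify an additional period called the allowed lateness. A window operator with an allowed lateness does not delete a window and its state when the watermark passes the end of the window. Instead, it keeps all of the window's elements for the additional allowed-lateness period.
When a late element arrives within the allowed lateness, it is processed immediately and passed to the trigger. With the default event-time trigger, this makes the window fire again. Once the watermark passes the end of the window plus the allowed lateness, the window is deleted. All later late elements are then dropped, or sent to the side output if one is configured.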
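The lifecycle described above can be summarized as a decision for each arriving element. The following plain-Java sketch is a simplified model, not Flink's `WindowOperator`, and it makes three assumptions: a tumbling event-time window, the default event-time trigger, and that the window is purged once the watermark reaches the window's last timestamp plus the allowed lateness. `AllowedLatenessSketch` and `Outcome` are hypothetical names.

```java
/** Hypothetical plain-Java model of allowed lateness for a tumbling event-time window (not Flink code). */
final class AllowedLatenessSketch {
    enum Outcome { BUFFERED, REFIRES, DROPPED }

    static Outcome onElement(long ts, long sizeMs, long allowedLatenessMs, long watermark) {
        // last timestamp that belongs to the element's window (window end - 1)
        long maxTs = ts - Math.floorMod(ts, sizeMs) + sizeMs - 1;
        if (maxTs + allowedLatenessMs <= watermark) {
            return Outcome.DROPPED;  // window state already purged (or element sent to a side output)
        }
        if (maxTs <= watermark) {
            return Outcome.REFIRES;  // late but within the allowed lateness: the window fires again
        }
        return Outcome.BUFFERED;     // on time: wait for the watermark to reach the window end
    }

    public static void main(String[] args) {
        long size = 5_000L, lateness = 5_000L, ts = 1_000L; // window [0 s, 5 s)
        System.out.println(onElement(ts, size, lateness, 3_000L)); // BUFFERED
        System.out.println(onElement(ts, size, lateness, 6_000L)); // REFIRES
        System.out.println(onElement(ts, size, lateness, 9_999L)); // DROPPED
    }
}
```

The parameters match the Java example below, which uses 5-second windows and 5 seconds of allowed lateness. Each REFIRES case corresponds to one of the "update" results that the example emits.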
The allowed lateness is specified with the allowedLateness() method, as shown below:
scala version
```scala
val readings: DataStream[SensorReading] = ...

val countPer10Secs: DataStream[(String, Long, Int, String)] = readings
  .keyBy(_.id)
  .timeWindow(Time.seconds(10))
  // process late readings for 5 additional seconds
  .allowedLateness(Time.seconds(5))
  // count readings and update results if late readings arrive
  .process(new UpdatingWindowCountFunction)

/** A counting ProcessWindowFunction that distinguishes between
  * first results and updates. */
class UpdatingWindowCountFunction
    extends ProcessWindowFunction[SensorReading, (String, Long, Int, String), String, TimeWindow] {

  // per-window state: has this window already emitted its first result?
  private val isUpdateDesc =
    new ValueStateDescriptor[Boolean]("isUpdate", Types.of[Boolean])

  override def process(
      id: String,
      ctx: Context,
      elements: Iterable[SensorReading],
      out: Collector[(String, Long, Int, String)]): Unit = {
    // count the number of readings
    val cnt = elements.count(_ => true)
    // state to check whether this is the first evaluation of the window
    val isUpdate = ctx.windowState.getState(isUpdateDesc)
    if (!isUpdate.value()) {
      // first evaluation, emit first result
      out.collect((id, ctx.window.getEnd, cnt, "first"))
      isUpdate.update(true)
    } else {
      // not the first evaluation, emit an update
      out.collect((id, ctx.window.getEnd, cnt, "update"))
    }
  }

  // per-window state must be cleaned up explicitly when the window is purged
  override def clear(ctx: Context): Unit = {
    ctx.windowState.getState(isUpdateDesc).clear()
  }
}
```
java version
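```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class UpdateWindowResultWithLateEvent {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // each input line has the form "<key> <timestamp in seconds>"
        DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);

        stream
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String s) throws Exception {
                        String[] arr = s.split(" ");
                        return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                        return element.f1;
                                    }
                                }))
                .keyBy(r -> r.f0)
                .timeWindow(Time.seconds(5))
                // keep each window's state for 5 more seconds after it first fires
                .allowedLateness(Time.seconds(5))
                .process(new UpdateWindowResult())
                .print();

        env.execute();
    }

    public static class UpdateWindowResult
            extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

        // per-window flag: has this window already emitted its first result?
        private final ValueStateDescriptor<Boolean> isUpdateDesc =
                new ValueStateDescriptor<>("isUpdate", Types.BOOLEAN);

        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements,
                            Collector<String> out) throws Exception {
            long count = 0L;
            for (Tuple2<String, Long> ignored : elements) {
                count += 1;
            }
            // Window state has a narrower scope than getRuntimeContext().getState():
            // it is visible only to the current key and the current window.
            ValueState<Boolean> isUpdate = context.windowState().getState(isUpdateDesc);
            if (isUpdate.value() == null) {
                // first evaluation: the watermark has passed the end of the window
                out.collect("Window fired for the first time with " + count + " elements");
                isUpdate.update(true);
            } else {
                // re-evaluation caused by a late element within the allowed lateness
                out.collect("Window updated, now " + count + " elements");
            }
        }

        @Override
        public void clear(Context context) throws Exception {
            // per-window state must be cleaned up explicitly when the window is purged
            context.windowState().getState(isUpdateDesc).clear();
        }
    }
}
```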
