I. Dual-Stream Joins
When doing OLAP analysis on static database tables, a two-table join is a very common operation. Likewise, in streaming jobs it is sometimes necessary to join two streams to obtain richer information.
1. Tumbling Window Join
Code example:
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply(new JoinFunction<Integer, Integer, String>() {
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });
2. Sliding Window Join
Example code:
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply(new JoinFunction<Integer, Integer, String>() {
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });
3. Session Window Join
Example code:
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply(new JoinFunction<Integer, Integer, String>() {
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });
All three joins above are inner joins; only the window type differs.
4. Interval Join
An interval join matches each left-stream element with right-stream elements whose timestamps fall in a time interval offset from the left element's timestamp, i.e.:
right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]
In the example below, we join two streams 'orange' and 'green' with a lower bound of -2 milliseconds and an upper bound of +1 millisecond. By default, these boundaries are inclusive, but .lowerBoundExclusive() and .upperBoundExclusive() can be applied to change the behaviour.
Using the more formal notation again, this translates to:
orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound
Note: interval join currently only supports event time.
Example code:
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });
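The example above uses the default inclusive boundaries. A minimal sketch of the exclusive variants mentioned earlier, on the same hypothetical streams and key selectors:

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    // exclude the boundary timestamps themselves from the match interval
    .lowerBoundExclusive()
    .upperBoundExclusive()
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });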
5. coGroup
Inner joins alone are not enough; how do we implement a left/right outer join? The answer is the coGroup() operator. It is invoked much like join() and also requires a window, but CoGroupFunction is more flexible than JoinFunction: it can match elements from the left and/or right stream according to user-defined logic and emit whatever output is needed.
The following example implements a left join of a click stream against an order stream, using the plain nested-loop join idea (two nested loops).
clickRecordStream
    .coGroup(orderRecordStream)
    .where(record -> record.getMerchandiseId())
    .equalTo(record -> record.getMerchandiseId())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .apply(new CoGroupFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, Tuple2<String, Long>>() {
        @Override
        public void coGroup(Iterable<AnalyticsAccessLogRecord> accessRecords,
                            Iterable<OrderDoneLogRecord> orderRecords,
                            Collector<Tuple2<String, Long>> collector) throws Exception {
            for (AnalyticsAccessLogRecord accessRecord : accessRecords) {
                boolean isMatched = false;
                for (OrderDoneLogRecord orderRecord : orderRecords) {
                    // a matching record exists in the right stream
                    collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice()));
                    isMatched = true;
                }
                if (!isMatched) {
                    // no matching record in the right stream
                    collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), null));
                }
            }
        }
    })
    .print().setParallelism(1);
II. Dimension Table Joins
1. Preloading the dimension table
Define a class that extends RichMapFunction, load the dimension table into memory in open(), and join against it in the probe stream's map() method.
Loading dimension data into memory in RichMapFunction's open() method has the following characteristics:
- Pros: simple to implement.
- Cons: since the data lives in memory, this approach only suits small dimension tables that are updated infrequently. A timer can be set up in open() to refresh the table periodically (see the refresh sketch after the demo below), but the dimension data may still be stale.
class MapJoinDemo1 extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
    // holds the dimension data in memory
    Map<Integer, String> dim;

    @Override
    public void open(Configuration parameters) throws Exception {
        // load the dimension data in open(); it could come from a database, a file, an API, etc.
        dim = new HashMap<>();
        dim.put(1001, "beijing");
        dim.put(1002, "shanghai");
        dim.put(1003, "wuhan");
        dim.put(1004, "changsha");
    }

    @Override
    public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) throws Exception {
        // join the main stream against the dimension table in map()
        String cityName = "";
        if (dim.containsKey(value.f1)) {
            cityName = dim.get(value.f1);
        }
        return new Tuple3<>(value.f0, value.f1, cityName);
    }
}
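As mentioned in the cons above, a timer can refresh the preloaded table. A minimal sketch of that idea, assuming a hypothetical loadDim() helper that re-reads the dimension data from wherever it lives:

class ReloadingMapJoin extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
    // volatile so the map() thread sees the reference swapped in by the refresher thread
    private volatile Map<Integer, String> dim;
    private transient ScheduledExecutorService refresher;

    @Override
    public void open(Configuration parameters) throws Exception {
        dim = loadDim();
        refresher = Executors.newSingleThreadScheduledExecutor();
        // swap in a freshly loaded copy every 10 minutes; data can still be up to 10 minutes stale
        refresher.scheduleAtFixedRate(() -> dim = loadDim(), 10, 10, TimeUnit.MINUTES);
    }

    @Override
    public void close() {
        refresher.shutdown();
    }

    @Override
    public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) {
        return new Tuple3<>(value.f0, value.f1, dim.getOrDefault(value.f1, ""));
    }

    // hypothetical loader; in practice this would read from a file, database, API, etc.
    private Map<Integer, String> loadDim() {
        Map<Integer, String> m = new HashMap<>();
        m.put(1001, "beijing");
        m.put(1002, "shanghai");
        return m;
    }
}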
2. Dimension table in external (hot) storage
In this approach the dimension data is stored in external storage such as Redis, HBase, or MySQL, and the real-time stream queries that storage on the fly when joining. Its characteristics:
- Pros: the dimension data size is not limited by memory; very large tables can be stored.
- Cons: since the dimension data is in external storage, lookup speed is bounded by that storage's read performance; dimension-table synchronization also incurs some delay.
(1) Use a cache to reduce access pressure
A cache can hold the most frequently accessed dimension data to cut down the number of calls to the external system, for example using Guava Cache.
class MapJoinDemo1 extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
    LoadingCache<Integer, String> dim;

    @Override
    public void open(Configuration parameters) throws Exception {
        // use Guava's LoadingCache for caching
        dim = CacheBuilder.newBuilder()
                // maximum number of entries; beyond that, the least recently used entries are evicted
                .maximumSize(1000)
                // expire entries this long after they were written
                .expireAfterWrite(10, TimeUnit.MINUTES)
                // removal notification
                .removalListener(new RemovalListener<Integer, String>() {
                    @Override
                    public void onRemoval(RemovalNotification<Integer, String> removalNotification) {
                        System.out.println(removalNotification.getKey() + " was evicted, value: " + removalNotification.getValue());
                    }
                })
                .build(
                        // how to load an entry on a cache miss
                        new CacheLoader<Integer, String>() {
                            @Override
                            public String load(Integer cityId) throws Exception {
                                return readFromHbase(cityId);
                            }
                        }
                );
    }

    private String readFromHbase(Integer cityId) {
        // simulate reading the dimension data from HBase
        Map<Integer, String> temp = new HashMap<>();
        temp.put(1001, "beijing");
        temp.put(1002, "shanghai");
        temp.put(1003, "wuhan");
        temp.put(1004, "changsha");
        String cityName = "";
        if (temp.containsKey(cityId)) {
            cityName = temp.get(cityId);
        }
        return cityName;
    }

    @Override
    public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) throws Exception {
        // join the main stream against the dimension table in map()
        String cityName = dim.get(value.f1);
        return new Tuple3<>(value.f0, value.f1, cityName != null ? cityName : "");
    }
}
(2) Use asynchronous I/O to improve throughput
When Flink accesses an external system synchronously, i.e. it sends a request and waits for the response before sending the next one, throughput is low. Raising the parallelism improves throughput, but more parallel instances mean more processes and heavy resource consumption.
Flink can instead use asynchronous I/O to access external systems. This requires the external system's client to support async I/O, which many systems do nowadays. Using async I/O raises three issues:
- Timeouts: if a lookup times out, it is treated as a read/write failure and handled accordingly.
- Concurrency capacity: if too many requests are in flight, Flink's backpressure mechanism kicks in to throttle the upstream.
- Out-of-order results: results may come back in a different order than the requests were issued; Flink supports two modes for handling this, allowing unordered results or guaranteeing order.
public class JoinDemo3 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    // input format: user,1000 (user name and city id)
                    String[] list = p.split(",");
                    return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                })
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                });

        DataStream<Tuple3<String, Integer, String>> orderedResult = AsyncDataStream
                // ordered: async results are emitted in input order; timeout 1 second, capacity 2, backpressure beyond that
                .orderedWait(textStream, new JoinDemo3AsyncFunction(), 1000L, TimeUnit.MILLISECONDS, 2)
                .setParallelism(1);

        DataStream<Tuple3<String, Integer, String>> unorderedResult = AsyncDataStream
                // unordered: async results may be emitted out of order; timeout 1 second, capacity 2, backpressure beyond that
                .unorderedWait(textStream, new JoinDemo3AsyncFunction(), 1000L, TimeUnit.MILLISECONDS, 2)
                .setParallelism(1);

        orderedResult.print();
        unorderedResult.print();
        env.execute("joinDemo");
    }

    // extends RichAsyncFunction to query the dimension table stored in MySQL asynchronously;
    // input: user name and city id, output: Tuple3<user name, city id, city name>
    static class JoinDemo3AsyncFunction extends RichAsyncFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
        // connection settings
        private static String jdbcUrl = "jdbc:mysql://192.168.145.1:3306?useSSL=false";
        private static String username = "root";
        private static String password = "123";
        private static String driverName = "com.mysql.jdbc.Driver";

        java.sql.Connection conn;
        PreparedStatement ps;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            Class.forName(driverName);
            conn = DriverManager.getConnection(jdbcUrl, username, password);
            ps = conn.prepareStatement("select city_name from tmp.city_info where id = ?");
        }

        @Override
        public void close() throws Exception {
            super.close();
            conn.close();
        }

        // the async lookup
        @Override
        public void asyncInvoke(Tuple2<String, Integer> input, ResultFuture<Tuple3<String, Integer, String>> resultFuture) throws Exception {
            // query by city id
            ps.setInt(1, input.f1);
            ResultSet rs = ps.executeQuery();
            String cityName = null;
            if (rs.next()) {
                cityName = rs.getString(1);
            }
            List<Tuple3<String, Integer, String>> list = new ArrayList<>();
            list.add(new Tuple3<>(input.f0, input.f1, cityName));
            resultFuture.complete(list);
        }

        // timeout handling
        @Override
        public void timeout(Tuple2<String, Integer> input, ResultFuture<Tuple3<String, Integer, String>> resultFuture) throws Exception {
            List<Tuple3<String, Integer, String>> list = new ArrayList<>();
            list.add(new Tuple3<>(input.f0, input.f1, ""));
            resultFuture.complete(list);
        }
    }
}
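Note that the JDBC client used above is blocking, so the lookups in asyncInvoke still execute one at a time. A common remedy, sketched below under the assumption of a hypothetical queryCityName() helper that wraps the PreparedStatement lookup, is to hand the blocking call to a dedicated thread pool so that asyncInvoke returns immediately:

// fields and methods inside a RichAsyncFunction like the one above
private transient ExecutorService pool;

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    // a small pool for the blocking JDBC calls; in practice each pooled thread
    // should use its own connection/statement rather than share one
    pool = Executors.newFixedThreadPool(4);
}

@Override
public void asyncInvoke(Tuple2<String, Integer> input, ResultFuture<Tuple3<String, Integer, String>> resultFuture) throws Exception {
    CompletableFuture
            // run the blocking lookup off the operator thread
            .supplyAsync(() -> queryCityName(input.f1), pool)
            // hand the result back to Flink when it is ready
            .thenAccept(cityName -> resultFuture.complete(
                    Collections.singletonList(new Tuple3<>(input.f0, input.f1, cityName))));
}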
3. Broadcasting the dimension table
Use Flink's Broadcast State to broadcast the dimension data stream to the downstream operator and join there. Characteristics:
- Pros: changes to the dimension data are reflected in the results immediately.
- Cons: the data is kept in memory, so only relatively small dimension tables are supported.
// define the city stream
DataStream<Tuple2<Integer, String>> cityStream = env.socketTextStream("localhost", 9001, "\n")
        .map(p -> {
            // input format: city id,city name
            String[] list = p.split(",");
            return new Tuple2<Integer, String>(Integer.valueOf(list[0]), list[1]);
        })
        .returns(new TypeHint<Tuple2<Integer, String>>() {
        });

// turn the city stream into a broadcast stream
final MapStateDescriptor<Integer, String> broadcastDesc = new MapStateDescriptor<>("broad1", Integer.class, String.class);
BroadcastStream<Tuple2<Integer, String>> broadcastStream = cityStream.broadcast(broadcastDesc);

DataStream result = textStream.connect(broadcastStream)
        .process(new BroadcastProcessFunction<Tuple2<String, Integer>, Tuple2<Integer, String>, Tuple3<String, Integer, String>>() {
            // process the non-broadcast stream and join it against the dimension data
            @Override
            public void processElement(Tuple2<String, Integer> value, ReadOnlyContext ctx, Collector<Tuple3<String, Integer, String>> out) throws Exception {
                ReadOnlyBroadcastState<Integer, String> state = ctx.getBroadcastState(broadcastDesc);
                String cityName = "";
                if (state.contains(value.f1)) {
                    cityName = state.get(value.f1);
                }
                out.collect(new Tuple3<>(value.f0, value.f1, cityName));
            }

            // process the broadcast stream and update the broadcast state
            @Override
            public void processBroadcastElement(Tuple2<Integer, String> value, Context ctx, Collector<Tuple3<String, Integer, String>> out) throws Exception {
                System.out.println("received broadcast data: " + value);
                ctx.getBroadcastState(broadcastDesc).put(value.f0, value.f1);
            }
        });
4. Temporal table function join
A temporal table is a view of a continuously changing table at a particular point in time. A temporal table function is a table function that takes a time attribute as a parameter and returns the temporal table's view at that point in time.
The dimension data stream can be mapped to a temporal table and the main stream joined against it, which makes it possible to join against a specific version of the dimension data (its state at some moment in the past).
Temporal table function joins have the following characteristics:
- Pros: the dimension data can be large, updates take effect promptly, no external storage is required, and different versions of the dimension data can be joined.
- Cons: only available through the Flink SQL API.
public class JoinDemo5 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

        // define the main stream
        DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    // input format: user,1000 (user name and city id)
                    String[] list = p.split(",");
                    return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                })
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                });

        // define the city stream
        DataStream<Tuple2<Integer, String>> cityStream = env.socketTextStream("localhost", 9001, "\n")
                .map(p -> {
                    // input format: city id,city name
                    String[] list = p.split(",");
                    return new Tuple2<Integer, String>(Integer.valueOf(list[0]), list[1]);
                })
                .returns(new TypeHint<Tuple2<Integer, String>>() {
                });

        // convert the streams to Tables
        Table userTable = tableEnv.fromDataStream(textStream, "user_name,city_id,ps.proctime");
        Table cityTable = tableEnv.fromDataStream(cityStream, "city_id,city_name,ps.proctime");

        // define a TemporalTableFunction
        TemporalTableFunction dimCity = cityTable.createTemporalTableFunction("ps", "city_id");
        // register the table function
        tableEnv.registerFunction("dimCity", dimCity);

        // join
        Table result = tableEnv.sqlQuery("select u.user_name,u.city_id,d.city_name from "
                + userTable + " as u "
                + ", LATERAL TABLE (dimCity(u.ps)) d "
                + "where u.city_id=d.city_id");

        // print the output
        DataStream resultDs = tableEnv.toAppendStream(result, Row.class);
        resultDs.print();

        env.execute("joinDemo");
    }
}
5. Comparison of the four dimension-table join approaches
Summarizing the pros and cons listed above:
- Preloaded table: simplest to implement; only small, rarely updated dimension data, and updates may be stale.
- External (hot) storage: supports very large dimension data; lookup speed is bounded by the external store, which can be mitigated with a cache and async I/O.
- Broadcast state: dimension changes take effect immediately; data is held in memory, so only small tables.
- Temporal table function: large data, timely updates, versioned joins, no external storage; Flink SQL API only.
References:
https://www.jianshu.com/p/45ec888332df
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html
https://blog.csdn.net/chybin500/article/details/106482620