Joins in continuous queries, from the official docs: https://site.douban.com/108880/widget/notes/4611554/note/249729366/
The Flink documentation describes two kinds of dual-stream joins: Regular Joins and Time-windowed Joins.
The following is translated from the official documentation:
Regular Joins
Regular joins are the most generic type of join, in which any new record, or any change to either side of the join input, is visible and affects the entire join result. For example, if a new record arrives on the left side, it will be joined with all previous and future records on the right side.
SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id
These semantics allow any kind of update (insert, update, delete) to the input tables.
However, this operation has an important implication: it requires both sides of the join input to be kept in Flink state forever. So if one or both input tables grow continuously, resource usage will grow indefinitely as well.
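The standard mitigation is the table config's idle state retention time (this is the "TTL cleanup" I mention later in this post): state for keys that stay idle beyond the retention window is dropped, at the cost of incomplete joins for records that arrive after eviction. A minimal sketch against the Flink 1.10 API; the retention values here are illustrative:

import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
val tableEnv = StreamTableEnvironment.create(env, settings)

// drop state for keys that stay idle between 12 and 24 hours; records that
// arrive for an evicted key afterwards will join against incomplete state
tableEnv.getConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))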
Time-windowed Joins
A time-windowed join is defined by a join predicate that checks whether the time attributes of the input records fall within certain time bounds, i.e., a time window.
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId
  AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
Compared to a regular join, this kind of join only supports append-only tables with time attributes. Since time attributes are quasi-monotonically increasing, Flink can remove old values from its state without affecting the correctness of the result.
Note: after upgrading to Flink 1.10, we started leaning more heavily on SQL.
First, the SQL submission program:
// create the blink table environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings
  .newInstance
  .useBlinkPlanner
  .inStreamingMode
  .build
val tableEnv = StreamTableEnvironment.create(env, settings)
// read the SQL file and split it into statements on ";"
val sqlList = SqlFileReadUtil.sqlFileReadUtil(sqlName)
// register every statement in order
for (sql <- sqlList) {
  logger.info("sql : {}", sql)
  tableEnv.sqlUpdate(sql)
}
tableEnv.execute(sqlName)
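The SqlFileReadUtil helper is not shown above; here is a hypothetical sketch of what it does (my guess at an implementation: read the file from the classpath, drop comment lines, and split into statements on ";"):

import scala.io.Source

// hypothetical sketch of the helper used above
object SqlFileReadUtil {
  def sqlFileReadUtil(sqlName: String): List[String] = {
    val source = Source.fromInputStream(
      getClass.getClassLoader.getResourceAsStream(sqlName), "UTF-8")
    try {
      source.getLines()
        .filterNot(_.trim.startsWith("--")) // drop comment-only lines
        .mkString("\n")
        .split(";")                         // one statement per ";"
        .map(_.trim)
        .filter(_.nonEmpty)
        .toList
    } finally {
      source.close()
    }
  }
}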
My submission program is basically copied from Jark Wu's (雲邪) earlier blog post, with a few small changes of my own; the address is: http://wuchong.me/blog/2019/09/02/flink-sql-1-9-read-from-kafka-write-into-mysql/
Here is the SQL file for the regular join:
-- Regular join (a "global" join)

---sourceTable: order table
CREATE TABLE t_order (
  order_id    VARCHAR, -- order id
  product_id  VARCHAR, -- product id
  create_time VARCHAR  -- order time
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'order',
  'connector.startup-mode' = 'earliest-offset',
  'connector.properties.0.key' = 'zookeeper.connect',
  'connector.properties.0.value' = 'venn:2181',
  'connector.properties.1.key' = 'bootstrap.servers',
  'connector.properties.1.value' = 'venn:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

---sourceTable: product table
CREATE TABLE t_product (
  product_id  VARCHAR,        -- product id
  price       DECIMAL(38,18), -- price
  create_time VARCHAR         -- product time
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'shipments',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.zookeeper.connect' = 'venn:2181',
  'connector.properties.bootstrap.servers' = 'venn:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

---sinkTable: orders joined with products into an order-detail table
CREATE TABLE order_detail (
  order_id            VARCHAR,
  producer_id         VARCHAR,
  price               DECIMAL(38,18),
  order_create_time   VARCHAR,
  product_create_time VARCHAR
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'order_detail',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.zookeeper.connect' = 'venn:2181',
  'connector.properties.bootstrap.servers' = 'venn:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

---order_sink
INSERT INTO order_detail
SELECT a.order_id, a.product_id, b.price, a.create_time, b.create_time
FROM t_order a
INNER JOIN t_product b ON a.product_id = b.product_id
WHERE a.order_id IS NOT NULL;
And now the time-windowed join version:
-- time-windowed join

---sourceTable: order table
CREATE TABLE t_order (
  order_id    VARCHAR, -- order id
  product_id  VARCHAR, -- product id
  create_time VARCHAR, -- order time
  order_proctime AS PROCTIME()
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'order',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.zookeeper.connect' = 'venn:2181',
  'connector.properties.bootstrap.servers' = 'venn:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

---sourceTable: product table
CREATE TABLE t_product (
  product_id  VARCHAR,        -- product id
  price       DECIMAL(38,18), -- price
  create_time VARCHAR,        -- product time
  product_proctime AS PROCTIME()
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'shipments',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.zookeeper.connect' = 'venn:2181',
  'connector.properties.bootstrap.servers' = 'venn:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

---sinkTable: orders joined with products into an order-detail table
CREATE TABLE order_detail (
  order_id            VARCHAR,
  producer_id         VARCHAR,
  price               DECIMAL(38,18),
  order_create_time   VARCHAR,
  product_create_time VARCHAR
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'order_detail',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.zookeeper.connect' = 'venn:2181',
  'connector.properties.bootstrap.servers' = 'venn:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

---order_sink
INSERT INTO order_detail
SELECT a.order_id, a.product_id, b.price, a.create_time, b.create_time
FROM t_order a
INNER JOIN t_product b ON a.product_id = b.product_id
  AND a.order_proctime BETWEEN b.product_proctime - INTERVAL '10' MINUTE
                           AND b.product_proctime + INTERVAL '10' MINUTE
WHERE a.order_id IS NOT NULL;
The two examples are almost identical; the time-windowed join just adds a processing-time attribute to each side, which is used to bound the time range of the join.
Both joins take input data like this:
send topic : order,message : {"create_time":"2020-04-27 13:19:23","product_id":"bh001","order_id":"2"}
send topic : shipments,message : {"create_time":"2020-04-27 13:19:25","price":"3.5","product_id":"bh001"}
send topic : order,message : {"create_time":"2020-04-27 13:19:27","product_id":"bh001","order_id":"3"}
send topic : shipments,message : {"create_time":"2020-04-27 13:19:28","price":"3.5","product_id":"bh001"}
send topic : order,message : {"create_time":"2020-04-27 13:19:29","product_id":"bh001","order_id":"4"}
send topic : shipments,message : {"create_time":"2020-04-27 13:19:30","price":"3.5","product_id":"bh001"}
send topic : order,message : {"create_time":"2020-04-27 13:19:31","product_id":"bh001","order_id":"5"}
send topic : shipments,message : {"create_time":"2020-04-27 13:19:32","price":"3.5","product_id":"bh001"}
send topic : order,message : {"create_time":"2020-04-27 13:19:33","product_id":"bh001","order_id":"6"}
send topic : shipments,message : {"create_time":"2020-04-27 13:19:34","price":"3.5","product_id":"bh001"}
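For reproduction, here is a hypothetical sketch of the test sender that produced these messages (assuming the kafka-clients dependency; the topic names, fields, and fixed product_id match the logs above):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// hypothetical test-data generator for the two source topics
object TestDataSender {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "venn:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    val format = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    for (i <- 1 to 100) {
      val now = format.format(new java.util.Date())
      val order = s"""{"create_time":"$now","product_id":"bh001","order_id":"$i"}"""
      val product = s"""{"create_time":"$now","price":"3.5","product_id":"bh001"}"""
      producer.send(new ProducerRecord[String, String]("order", order))
      producer.send(new ProducerRecord[String, String]("shipments", product))
      Thread.sleep(1000) // roughly one pair of messages per second
    }
    producer.close()
  }
}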
The output data is likewise the same for both joins (only the time range of the matches differs: one is global, the other is bounded to the window):
{"order_id":"19","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:21:18","product_create_time":"2020-04-27 13:23:59"} {"order_id":"91","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:23:42","product_create_time":"2020-04-27 13:23:59"} {"order_id":"59","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:22:38","product_create_time":"2020-04-27 13:23:59"} {"order_id":"49","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:22:18","product_create_time":"2020-04-27 13:23:59"} {"order_id":"4","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:20:48","product_create_time":"2020-04-27 13:23:59"} {"order_id":"81","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:23:22","product_create_time":"2020-04-27 13:23:59"} {"order_id":"17","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:21:14","product_create_time":"2020-04-27 13:23:59"} {"order_id":"71","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:23:02","product_create_time":"2020-04-27 13:23:59"} {"order_id":"14","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:21:08","product_create_time":"2020-04-27 13:23:59"} {"order_id":"90","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:23:40","product_create_time":"2020-04-27 13:23:59"} {"order_id":"43","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:22:06","product_create_time":"2020-04-27 13:23:59"}
Comparing the two, the regular join behaves more like the join in traditional SQL: it buffers all the data on both sides, and whenever a join fires it matches against everything from the other side. Once the semantics check out, the real question is how much data it can support.
In my tests, Flink ran with its default configuration:
JVM Parameters:
  -Xmx511mb
  -Xms511mb
  -XX:MaxDirectMemorySize=256mb
  -XX:MaxMetaspaceSize=96mb
TaskManager Dynamic Configs:
  taskmanager.memory.framework.off-heap.size=128mb
  taskmanager.memory.network.max=128mb
  taskmanager.memory.network.min=128mb
  taskmanager.memory.framework.heap.size=128mb
  taskmanager.memory.managed.size=512mb
  taskmanager.cpu.cores=8.0
  taskmanager.memory.task.heap.size=383mb
  taskmanager.memory.task.off-heap.size=0b
Even with small records on both sides, it could only hold about 300,000 rows per side. From job start, heap usage climbed linearly until the job threw an OutOfMemoryError and died. The time-windowed join behaves essentially the same if you raise the TPS.
If you are interested, have a look at the source: org.apache.flink.table.runtime.operators.join.TimeBoundedStreamJoin
From the code, every incoming record traverses the buffered data on the other side. The difference between the two joins is that the time-windowed join gates on time (and deletes data that has already fallen out of the time range); neither works like the DataStream API, where a keyBy can shrink the range of data that has to be traversed. With, say, 500,000 rows on each side, every record arriving on either side triggers 500,000 traversals; it would be strange if it didn't OOM. Increasing the heap does help, of course, but it only buys a modest amount of extra data.
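If you do throw memory at it, the relevant knob under Flink 1.10's memory model is the task heap (the defaults in play here are listed above); a sketch of flink-conf.yaml with purely illustrative values, not a recommendation:

# flink-conf.yaml (illustrative values)
taskmanager.memory.task.heap.size: 2g
taskmanager.memory.managed.size: 512m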
When we first chose an approach we picked the regular join, but in practice it performed poorly, and relying on idle-state TTL for cleanup did not really fit our case. After some pointers from community experts we switched to the time-windowed join, and as long as the time range stays small, it is enough for our workload. It has now been running in the test environment for 3 days with about 5 million rows per side; heap usage holds at around 80% and looks quite stable. The volume is modest, about 50 TPS per side, which covers our business for now. If the TPS were much higher, I would still build the join on the DataStream API: with both sides' state in MapState, throughput can get very high (I have tested up to 10,000 TPS).
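For reference, here is a minimal sketch of that DataStream-API style of join: both streams keyed by product_id, each side buffered in MapState, with a processing-time bound. All class and field names are illustrative; this is the shape of the approach, not our production job:

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

case class Order(orderId: String, productId: String, createTime: String)
case class Product(productId: String, price: String, createTime: String)
case class OrderDetail(orderId: String, productId: String, price: String,
                       orderCreateTime: String, productCreateTime: String)

// Each side buffers its records in MapState keyed by processing-time arrival
// timestamp. Because the operator runs keyed by productId, an arriving record
// only scans the buffer of its own key, not the whole other stream. Entries
// older than the window are removed lazily during each scan. (Caveat: two
// records arriving in the same millisecond for the same key collide on the
// map key; a real job would buffer a list per timestamp.)
class IntervalJoinFunction(windowMs: Long)
    extends KeyedCoProcessFunction[String, Order, Product, OrderDetail] {

  private var orderState: MapState[java.lang.Long, Order] = _
  private var productState: MapState[java.lang.Long, Product] = _

  override def open(parameters: Configuration): Unit = {
    orderState = getRuntimeContext.getMapState(
      new MapStateDescriptor("orders", classOf[java.lang.Long], classOf[Order]))
    productState = getRuntimeContext.getMapState(
      new MapStateDescriptor("products", classOf[java.lang.Long], classOf[Product]))
  }

  override def processElement1(
      order: Order,
      ctx: KeyedCoProcessFunction[String, Order, Product, OrderDetail]#Context,
      out: Collector[OrderDetail]): Unit = {
    val now = ctx.timerService().currentProcessingTime()
    orderState.put(now, order)
    val it = productState.iterator()
    while (it.hasNext) {
      val e = it.next()
      if (e.getKey >= now - windowMs)
        out.collect(OrderDetail(order.orderId, order.productId,
          e.getValue.price, order.createTime, e.getValue.createTime))
      else it.remove() // product fell out of the window: drop it from state
    }
  }

  override def processElement2(
      product: Product,
      ctx: KeyedCoProcessFunction[String, Order, Product, OrderDetail]#Context,
      out: Collector[OrderDetail]): Unit = {
    val now = ctx.timerService().currentProcessingTime()
    productState.put(now, product)
    val it = orderState.iterator()
    while (it.hasNext) {
      val e = it.next()
      if (e.getKey >= now - windowMs)
        out.collect(OrderDetail(e.getValue.orderId, e.getValue.productId,
          product.price, e.getValue.createTime, product.createTime))
      else it.remove() // order fell out of the window: drop it from state
    }
  }
}

Wiring it up looks roughly like orderStream.connect(productStream).keyBy(_.productId, _.productId).process(new IntervalJoinFunction(10 * 60 * 1000L)). This keyed partitioning of the buffers is exactly what keeps the per-record scan small and the throughput high.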
Dashing this off over lunch; over the May Day holiday I will write two proper posts, one on dual-stream joins and one on how the join job behaves after heap tuning.
Welcome to follow the Flink菜鳥 public account, which publishes Flink development posts from time to time.