Flink SQL dual-stream join demo


Joins in continuous queries (official docs): https://site.douban.com/108880/widget/notes/4611554/note/249729366/

The Flink documentation describes two kinds of dual-stream joins: Regular Joins and Time-windowed Joins.

The following is translated from the official documentation:

Regular Joins

Regular joins are the most generic type of join, in which any new record, or any change to either side of the join input, is visible and affects the entire join result. For example, if there is a new record on the left side, it will be joined with all previous and future records on the right side.

SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id

These semantics allow any kind of update (insert, update, delete) to the input tables.

However, this operation has an important implication: it requires keeping both sides of the join input in Flink state forever. Thus, resource usage will grow indefinitely if one or both input tables are continuously growing.
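To make the "state grows forever" point concrete, here is a minimal, hypothetical Scala sketch (not Flink's actual operator) of what a regular stream join must do: buffer every record from both sides indefinitely, because any future record on the other side may still match it.

```scala
// Hypothetical sketch of regular-join state handling (not Flink internals).
object RegularJoinSketch {
  case class Order(orderId: String, productId: String)
  case class Product(productId: String, price: BigDecimal)

  // State kept for the lifetime of the job -- it only ever grows.
  val orders = scala.collection.mutable.ArrayBuffer[Order]()
  val products = scala.collection.mutable.ArrayBuffer[Product]()

  // Each arriving order is matched against *all* products seen so far,
  // then retained so that future products can still match it.
  def onOrder(o: Order): Seq[(Order, Product)] = {
    orders += o
    products.filter(_.productId == o.productId).map(p => (o, p)).toSeq
  }

  def onProduct(p: Product): Seq[(Order, Product)] = {
    products += p
    orders.filter(_.productId == p.productId).map(o => (o, p)).toSeq
  }
}
```

Nothing is ever removed from `orders` or `products`, which is exactly why resource usage grows without bound on continuously growing inputs.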

Time-windowed Joins

A time-windowed join is defined by a join predicate that checks whether the time attributes of the input records fall within certain time bounds, i.e., a time window.

SELECT *
FROM
  Orders o,
  Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

Compared to a regular join, this kind of join only supports append-only tables with time attributes. Since time attributes are quasi-monotonically increasing, Flink can remove old values from its state without affecting the correctness of the result.

Note: after upgrading to Flink 1.10, we started to lean much more on SQL.

First, look at the SQL submission program:

// create a blink table environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings
  .newInstance
  .useBlinkPlanner
  .inStreamingMode
  .build

val tableEnv = StreamTableEnvironment.create(env, settings)

// read the SQL file and split it into statements on ';'
val sqlList = SqlFileReadUtil.sqlFileReadUtil(sqlName)

// submit each statement in order
for (sql <- sqlList) {
  logger.info("sql : {}", sql)
  tableEnv.sqlUpdate(sql)
}

tableEnv.execute(sqlName)
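The real `SqlFileReadUtil` is not shown in this post; as a rough sketch (the name and behavior here are my assumption), it would read the file content, drop `--` comment lines, and split the remaining text into statements on `;`:

```scala
// Hypothetical sketch of what SqlFileReadUtil.sqlFileReadUtil might do
// internally (the real utility is not shown in the post).
object SqlFileReadUtilSketch {
  def splitStatements(fileContent: String): List[String] =
    fileContent
      .split("\n")
      .filterNot(_.trim.startsWith("--")) // strip SQL line comments
      .mkString("\n")
      .split(";")                         // one statement per ';'
      .map(_.trim)
      .filter(_.nonEmpty)
      .toList
}
```

Each resulting statement is then handed to `tableEnv.sqlUpdate` one at a time, as in the loop above.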

My submission program is basically copied from 雲邪 (Jark Wu)'s earlier blog post, with a few small changes of my own; see: http://wuchong.me/blog/2019/09/02/flink-sql-1-9-read-from-kafka-write-into-mysql/

The SQL file for the regular join:

-- Regular Joins (like a global join)
---sourceTable
-- order table
CREATE TABLE t_order(
    order_id VARCHAR,         -- order id
    product_id VARCHAR,       -- product id
    create_time VARCHAR       -- order time
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'order',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.zookeeper.connect' = 'venn:2181',
    'connector.properties.bootstrap.servers' = 'venn:9092',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);
---sourceTable
-- product table
CREATE TABLE t_product (
    product_id VARCHAR,     -- product id
    price DECIMAL(38,18),   -- price
    create_time VARCHAR     -- creation time
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'shipments',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.zookeeper.connect' = 'venn:2181',
    'connector.properties.bootstrap.servers' = 'venn:9092',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);

---sinkTable
-- join the order table with the product table into an order-detail table
CREATE TABLE order_detail (
    order_id VARCHAR,
    producer_id VARCHAR ,
    price DECIMAL(38,18),
    order_create_time VARCHAR,
    product_create_time VARCHAR
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'order_detail',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.zookeeper.connect' = 'venn:2181',
    'connector.properties.bootstrap.servers' = 'venn:9092',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);

---order_sink
INSERT INTO order_detail
SELECT a.order_id, a.product_id, b.price, a.create_time, b.create_time
FROM t_order a
  INNER JOIN t_product b ON a.product_id = b.product_id
WHERE a.order_id IS NOT NULL;

And the time-windowed join version:

-- time-windowed join
---sourceTable
-- order table
CREATE TABLE t_order(
    order_id VARCHAR,         -- order id
    product_id VARCHAR,       -- product id
    create_time VARCHAR,      -- order time
    order_proctime as PROCTIME()
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'order',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.zookeeper.connect' = 'venn:2181',
    'connector.properties.bootstrap.servers' = 'venn:9092',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);
---sourceTable
-- product table
CREATE TABLE t_product (
    product_id VARCHAR,     -- product id
    price DECIMAL(38,18),   -- price
    create_time VARCHAR,    -- creation time
    product_proctime as PROCTIME()
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'shipments',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.zookeeper.connect' = 'venn:2181',
    'connector.properties.bootstrap.servers' = 'venn:9092',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);

---sinkTable
-- join the order table with the product table into an order-detail table
CREATE TABLE order_detail (
    order_id VARCHAR,
    producer_id VARCHAR ,
    price DECIMAL(38,18),
    order_create_time VARCHAR,
    product_create_time VARCHAR
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'order_detail',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.zookeeper.connect' = 'venn:2181',
    'connector.properties.bootstrap.servers' = 'venn:9092',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);

---order_sink
INSERT INTO order_detail
SELECT a.order_id, a.product_id, b.price, a.create_time, b.create_time
FROM t_order a
  INNER JOIN t_product b ON a.product_id = b.product_id
    AND a.order_proctime BETWEEN b.product_proctime - INTERVAL '10' MINUTE AND b.product_proctime + INTERVAL '10' MINUTE
WHERE a.order_id IS NOT NULL;

The two samples are basically identical; the time-windowed join just adds a processing-time field, used to bound the join to a time range.

Both joins consume input data like this:

send topic : order,message : {"create_time":"2020-04-27 13:19:23","product_id":"bh001","order_id":"2"}
send topic : shipments,message : {"create_time":"2020-04-27 13:19:25","price":"3.5","product_id":"bh001"}
send topic : order,message : {"create_time":"2020-04-27 13:19:27","product_id":"bh001","order_id":"3"}
send topic : shipments,message : {"create_time":"2020-04-27 13:19:28","price":"3.5","product_id":"bh001"}
send topic : order,message : {"create_time":"2020-04-27 13:19:29","product_id":"bh001","order_id":"4"}
send topic : shipments,message : {"create_time":"2020-04-27 13:19:30","price":"3.5","product_id":"bh001"}
send topic : order,message : {"create_time":"2020-04-27 13:19:31","product_id":"bh001","order_id":"5"}
send topic : shipments,message : {"create_time":"2020-04-27 13:19:32","price":"3.5","product_id":"bh001"}
send topic : order,message : {"create_time":"2020-04-27 13:19:33","product_id":"bh001","order_id":"6"}
send topic : shipments,message : {"create_time":"2020-04-27 13:19:34","price":"3.5","product_id":"bh001"}

The output looks the same for both (only the time range over which records match differs: one is global, the other is bounded):

{"order_id":"19","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:21:18","product_create_time":"2020-04-27 13:23:59"}
{"order_id":"91","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:23:42","product_create_time":"2020-04-27 13:23:59"}
{"order_id":"59","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:22:38","product_create_time":"2020-04-27 13:23:59"}
{"order_id":"49","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:22:18","product_create_time":"2020-04-27 13:23:59"}
{"order_id":"4","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:20:48","product_create_time":"2020-04-27 13:23:59"}
{"order_id":"81","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:23:22","product_create_time":"2020-04-27 13:23:59"}
{"order_id":"17","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:21:14","product_create_time":"2020-04-27 13:23:59"}
{"order_id":"71","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:23:02","product_create_time":"2020-04-27 13:23:59"}
{"order_id":"14","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:21:08","product_create_time":"2020-04-27 13:23:59"}
{"order_id":"90","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:23:40","product_create_time":"2020-04-27 13:23:59"}
{"order_id":"43","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:22:06","product_create_time":"2020-04-27 13:23:59"}

Comparing the two, the regular join behaves more like a classic SQL join: it buffers all data, and each time the join fires it matches against everything from the other side. With the semantics settled, the question becomes how much data it can handle.

In my tests, Flink kept its default configuration:

JVM Parameters:
    -Xmx511mb
    -Xms511mb
    -XX:MaxDirectMemorySize=256mb
    -XX:MaxMetaspaceSize=96mb

TaskManager Dynamic Configs:
    taskmanager.memory.framework.off-heap.size=128mb
    taskmanager.memory.network.max=128mb
    taskmanager.memory.network.min=128mb
    taskmanager.memory.framework.heap.size=128mb
    taskmanager.memory.managed.size=512mb
    taskmanager.cpu.cores=8.0
    taskmanager.memory.task.heap.size=383mb
    taskmanager.memory.task.off-heap.size=0b

With small records on both sides, the job could only hold about 300K rows per side. From startup, heap usage climbed linearly until the job died with an OutOfMemoryError. The time-windowed join behaves much the same if you push the TPS high enough.

If you are interested, take a look at the source: org.apache.flink.table.runtime.operators.join.TimeBoundedStreamJoin

From the code, every incoming record traverses the buffered data from the other side. The difference between the two joins is that the time-windowed join bounds the traversal by time (and deletes data that has fallen outside the window); neither narrows the scan the way the DataStream API can with keyBy. With 500K rows buffered on each side, every arriving record on either side triggers 500K comparisons, so an OOM is hardly surprising. Increasing the heap helps, of course, but only buys a modest amount of extra data.
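A simplified, hypothetical sketch of what the time bound buys (this is not the TimeBoundedStreamJoin code itself): records outside the window can be evicted, so state stays proportional to the window size rather than to the whole stream.

```scala
// Hypothetical sketch of interval-join eviction (not Flink internals).
object IntervalJoinSketch {
  case class Rec(key: String, ts: Long)

  // +/- 10 minutes, matching the INTERVAL '10' MINUTE bound in the SQL above
  val windowMs = 10 * 60 * 1000L

  val left = scala.collection.mutable.ArrayBuffer[Rec]()

  // On a right-side record: join against buffered left records inside the
  // window, then evict left records that can never match again (assuming
  // time moves forward quasi-monotonically, as processing time does).
  def onRight(r: Rec): Seq[(Rec, Rec)] = {
    val matches = left
      .filter(l => l.key == r.key && math.abs(l.ts - r.ts) <= windowMs)
      .map(l => (l, r))
      .toSeq
    val survivors = left.filter(_.ts >= r.ts - windowMs)
    left.clear()
    left ++= survivors
    matches
  }
}
```

The eviction step is the difference from the regular-join sketch above, but note that the per-record scan over the buffer remains, which is the cost the post is complaining about.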

We initially chose the regular join, but it performed poorly in practice, and relying on state TTL for cleanup was not a good fit. After some pointers from folks in the community, we switched to the time-windowed join; with a reasonably small time range it is enough for our workload. It has now run for 3 days in the test environment with about 5 million rows per side; heap usage holds at around 80% and looks stable. The volume is small, about 50 TPS per side, which currently covers our needs. For much higher TPS we would still do the join with the DataStream API: with both sides' state kept in MapState, throughput can go much higher (we have tested up to 10K TPS).
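A hypothetical sketch of why the keyed DataStream-style join mentioned above scales better: after keyBy, state is partitioned per join key (as MapState would be), so an arriving record only scans the records for its own key instead of the whole other side.

```scala
// Hypothetical sketch of keyed join state (not the actual Flink MapState API).
object KeyedJoinSketch {
  import scala.collection.mutable

  case class Order(orderId: String, productId: String)

  // productId -> buffered orders for that key only, mimicking
  // keyBy(productId) + per-key state.
  val ordersByKey = mutable.Map[String, mutable.ArrayBuffer[Order]]()

  def onOrder(o: Order): Unit =
    ordersByKey.getOrElseUpdate(o.productId, mutable.ArrayBuffer()) += o

  // A product record only looks at orders sharing its key -- the scan is
  // bounded by the key's bucket, not by the total buffered volume.
  def onProduct(productId: String, price: BigDecimal): Seq[(Order, BigDecimal)] =
    ordersByKey.getOrElse(productId, mutable.ArrayBuffer()).map(o => (o, price)).toSeq
}
```

With 500K rows spread over many keys, each lookup touches only one key's bucket, which is consistent with the much higher TPS we saw from the DataStream API.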

Dashed off over a lunch break; around the May Day holiday I plan to write proper posts on dual-stream joins and on how join jobs behave after heap tuning.

 

Welcome to follow the "Flink菜鳥" WeChat public account, which posts Flink development content from time to time.

