本文翻譯自官網 : Joins in Continuous Queries https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html
Join 是批量數據處理中連接兩個關系行的常見且易於理解的操作。但是,動態表上的 join 語義不那么明顯,甚至令人困惑。
因此,有一些方法可以使 Table API或SQL實際執行 join 。
有關語法的更多信息,請檢查Table API和SQL中的 join 部分。
- 常規 join (Regular Joins)
- 時間窗口 join
- 與時態表函數 join
- 時態表 join
Regular Joins
常規 join 是最通用的 join 類型,在該 join 中,任何新記錄的更改對 join 輸入兩側都是可見的,並且會影響整個 join 結果。例如,如果左側有一個新記錄,則它將與右側的所有以前和將來的記錄合並在一起。
SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id
這些語義允許進行任何類型的更新(插入,更新,刪除)的輸入表。
但是,此操作有一個重要的含義:它要求將 join 輸入的兩端始終保持在Flink的狀態。因此,如果一個或兩個輸入表持續增長,資源使用也將無限期增長。
Time-windowed Joins
時間窗口 join 由 join 謂詞定義,該 join 謂詞檢查輸入記錄的時間屬性是否在某些時間限制(即時間窗口)內。
SELECT * FROM Orders o, Shipments s WHERE o.id = s.orderId AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
與常規 join 操作相比,這種 join 僅支持具有時間屬性的 append-only 表。由於時間屬性是准單調遞增的,因此Flink可以從其狀態中刪除舊值,而不會影響結果的正確性。
Join with a Temporal Table Function
具有時態表函數 的 join 將 append-only 表(左側輸入/探針側,注:輸入流)與臨時表(右側輸入/構建側,注:維表)join,即隨時間變化並跟蹤其變化的表(維表)。請查看相應的頁面以獲取有關時態表的更多信息。
以下示例顯示了 append-only 表 Orders
,該表與不斷變化的貨幣匯率表 RatesHistory 結合在一起 。
Orders
是一個 append-only 表,代表給定 amount
和給定貨幣(currency)的付款。例如,在10:15
有一筆金額為 2 歐元的訂單。
SELECT * FROM Orders; rowtime amount currency ======= ====== ========= 10:15 2 Euro 10:30 1 US Dollar 10:32 50 Yen 10:52 3 Euro 11:04 5 US Dollar
RatesHistory表示日元匯率(匯率為1)不斷變化的 append-only 表。例如,從 09:00 到 10:45 歐元對日元的匯率為 114。從10:45到11:15為116。
SELECT * FROM RatesHistory;
rowtime currency rate
======= ======== ======
09:00 US Dollar 102
09:00 Euro 114
09:00 Yen 1
10:45 Euro 116
11:15 Euro 119
11:49 Pounds 108
假設我們要計算所有訂單轉換為通用貨幣(日元)的金額。
例如,我們要使用給定 rowtime(114)的適當轉換率轉換以下訂單。
rowtime amount currency
======= ====== =========
10:15 2 Euro
如果不使用時態表的概念,則需要編寫如下查詢:
SELECT SUM(o.amount * r.rate) AS amount FROM Orders AS o, RatesHistory AS r WHERE r.currency = o.currency AND r.rowtime = ( SELECT MAX(rowtime) FROM RatesHistory AS r2 WHERE r2.currency = o.currency AND r2.rowtime <= o.rowtime);
借助時態表函數 使 RatesHistory 的匯率變化,我們可以在SQL中將查詢表示為:
SELECT o.amount * r.rate AS amount FROM Orders AS o, LATERAL TABLE (Rates(o.rowtime)) AS r WHERE r.currency = o.currency
探針端記錄的相關時間屬性時,來自探針端的每個記錄將與構建端表的版本關聯。為了支持生成側表上先前值的更新(覆蓋),該表必須定義一個主鍵。
在我們的示例中,from Orders 中的每個記錄都將與Rates 的 o.rowtime
時間版本 結合在一起。該 currency
字段已 提前定義為 Rates
的主鍵,並且在我們的示例中用於連接兩個表。如果查詢使用的是處理時間概念,則執行操作時,新添加的訂單將始終與 Rates 的最新版本結合在一起。
與常規 join 相反,這意味着如果在構建端有新記錄,則不會影響 join 的先前結果。這又使Flink可以限制必須保持狀態的元素數量。
與時間窗口連接相比,時態表 join 未定義時間范圍,(所有)關聯的數據將被 join。探測端的記錄總是在 time 屬性指定的時間與構建端的版本連接在一起。因此,構建端(時態表)的記錄可能是任意舊的。隨着時間的流逝,該記錄的先前版本和不再需要的版本(對於給定的主鍵)將從狀態中刪除。
這種行為使臨時表成為一個很好的候選者,可以用關系術語來表示流的 join。
用法
定義時態表函數 后,我們就可以開始使用它。可以使用與普通表函數相同的方式來使用時態表函數。
以下代碼段解決了我們從Orders
表中轉換貨幣的初衷問題:
## SQL SELECT SUM(o_amount * r_rate) AS amount FROM Orders, LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency ## Java Table result = orders .joinLateral("rates(o_proctime)", "o_currency = r_currency") .select("(o_amount * r_rate).sum as amount"); ## Scala val result = orders .joinLateral(rates('o_proctime), 'r_currency === 'o_currency) .select(('o_amount * 'r_rate).sum as 'amount)
注意:對於時態表 join ,尚未實現在查詢配置中定義的狀態保留。這意味着,計算查詢結果所需的狀態可能會無限增長,具體取決於歷史記錄表的不同主鍵數量。
Processing-time Temporal Joins
基於處理時間的時間屬性,是不可能通過 過去的 時間的屬性作為參數的時間表函數(注:處理時間只會增長)。根據定義,它始終是當前時間戳。因此,處理時間時態表函數的調用將始終返回基礎表的最新已知版本,基礎歷史表中的任何更新也將立即覆蓋當前值。
僅將構建側記錄的最新版本(相對於定義的主鍵)保持在該狀態。構建端的更新將不會影響先前發出的 join 結果。
可以將處理時間的 時態表 視為一種簡單的 HashMap<K, V>
,它可以存儲構建側的所有記錄。當來自構建端的新記錄與先前的記錄具有相同的鍵時,舊值將被簡單地覆蓋。總是根據HashMap的最新/當前狀態評估來自探測器端的每個記錄(注:與輸入數據 join)。
Event-time Temporal Joins
使用事件時間 的時間屬性(即行時間屬性),可以將過去的時間屬性傳遞給時態表函數。這允許在公共時間點將兩個表(時態表的兩個時間狀態)連接在一起。
與處理時間 時態表 join 相比,時態表不僅將構建側記錄的最新版本(相對於定義的主鍵)保持在狀態中,而且還存儲自上次水印以來的所有版本(按時間標識)。
例如,根據時態表的概念,將附加到探針側表的事件時間 時間戳為12:30:00的輸入行與構建側表 在時間12:30:00的版本 進行連接。因此,傳入行僅與時間戳小於或等於12:30:00的行連接,並根據主鍵應用更新直到該時間點。
根據事件時間的定義,水印允許 join 操作及時向前移動,並丟棄不再需要的構建表版本,因為不會輸入具有更低或相等時間戳的行。
與時態表 Join
與時態表的 join 將任意表(左側輸入/探針側)與時態表(右側輸入/構建側)join,即隨時間變化的外部數據表。請查看相應的頁面以獲取有關時態表的更多信息。
注意:用戶不能將任意表用作時態表,需要使用 LookupableTableSource 支持的表。LookupableTableSource 只能作為時間表用於時間聯接。有關如何定義LookupableTableSource的更多詳細信息,請參見頁面。
下面的示例顯示了一個Orders流,該流與不斷變化的貨幣匯率表 LatestRates 結合在一起。
LatestRates是物化最新匯率的維度表。 在時間10:15、10:30、10:52,LatestRates 的內容如下:
10:15> SELECT * FROM LatestRates; currency rate ======== ====== US Dollar 102 Euro 114 Yen 1 10:30> SELECT * FROM LatestRates; currency rate ======== ====== US Dollar 102 Euro 114 Yen 1 10:52> SELECT * FROM LatestRates; currency rate ======== ====== US Dollar 102 Euro 116 <==== changed from 114 to 116 Yen 1
時間10:15和10:30的LastestRates的內容相等。 歐元匯率在10:52從114更改為116。
Orders 是一個 append-only 表,代表給定金額和給定貨幣的付款。 例如,在10:15時有一筆2歐元的訂單。
SELECT * FROM Orders;
amount currency
====== =========
2 Euro <== arrived at time 10:15
1 US Dollar <== arrived at time 10:30
2 Euro <== arrived at time 10:52
假設我們要計算所有Orders
折算成通用貨幣(日元
)的金額。
例如,我們想使用LatestRates中的最新匯率轉換以下訂單。 結果將是:
amount currency rate amout*rate
====== ========= ======= ============
2 Euro 114 228 <== arrived at time 10:15
1 US Dollar 102 102 <== arrived at time 10:30
2 Euro 116 232 <== arrived at time 10:52
借助時態表聯接,我們可以在SQL中將查詢表示為:
SELECT o.amout, o.currency, r.rate, o.amount * r.rate FROM Orders AS o JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r ON r.currency = o.currency
探針端的每個記錄都將與構建端表的當前版本關聯。 在我們的示例中,查詢使用的是處理時間概念,因此在執行操作時,新附加的訂單將始終與最新版本的LatestRates結合在一起。 注意,結果對於處理時間不是確定的。
與常規 join 相反,盡管在構建方面進行了更改(注:數據修改了),但 時態表 Join 的先前結果將不會受到影響。而且,時態表 join 運算符非常輕,並且不保留任何狀態。
與時間窗口 join 相比,時態表 join 沒有定義將要在其中 join 記錄的時間窗口。 在處理時,探測端的記錄總是與構建端的最新版本結合在一起。 因此,構建方面的記錄可能是任意舊的。
時態表函數 join 和 時態表 join 都來自相同的初衷,但是具有不同的SQL語法和運行時實現:
- 時態表函數 join 的SQL語法是 join UDTF,而時態表聯接使用 SQL:2011 中引入的標准時態表語法。
- 時態表函數 join 的實現實際上 join 了兩個流並使它們保持狀態,而時態表 join 僅接收唯一的輸入流並根據記錄中的鍵查找外部數據庫。
- 時態表函數 join 通常用於 join 變更日志流,而臨時表 join 通常用於 join 外部表(即維表)。
這種行為使時態表成為一個很好的候選者,可以用關系術語來表示流的 join。
將來,時態表聯接將支持時態表功能 join 的功能,即支持時態 join 變更日志流。
用法
臨時表 join 的語法如下:
SELECT [column_list] FROM table1 [AS <alias1>] [LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>] ON table1.column-name1 = table2.column-name1
當前,僅支持 INNER JOIN 和 LEFT JOIN。 臨時表之后應遵循 FOR SYSTEM_TIME AS OF table1.proctime .proctime是table1的處理時間屬性。 這意味着在連接左表中的每個記錄時,它會在處理時為時態表做快照。
例如,在定義時態表之后,我們可以如下使用它。
SELECT SUM(o_amount * r_rate) AS amount FROM Orders JOIN LatestRates FOR SYSTEM_TIME AS OF o_proctime ON r_currency = o_currency
注意:僅在 Blink planner 中支持。
注意:僅在SQL中支持,而在 Table API 中尚不支持。
注意 :Flink當前不支持事件時間時態表 join。
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文