【翻譯】Flink Table Api & SQL —Streaming 概念 ——在持續查詢中 Join


本文翻譯自官網 :  Joins in Continuous Queries   https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html

Flink Table Api & SQL 翻譯目錄

Join 是批量數據處理中連接兩個關系行的常見且易於理解的操作。但是,動態表上的 join 語義不那么明顯,甚至令人困惑。

因此,有一些方法可以使 Table API或SQL實際執行 join 。

有關語法的更多信息,請檢查Table APISQL中的 join 部分

Regular Joins 

常規 join 是最通用的 join 類型,在該 join 中,任何新記錄的更改對 join 輸入兩側都是可見的,並且會影響整個 join 結果。例如,如果左側有一個新記錄,則它將與右側的所有以前和將來的記錄合並在一起。

SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id

這些語義允許進行任何類型的更新(插入,更新,刪除)的輸入表。

但是,此操作有一個重要的含義:它要求將 join 輸入的兩端始終保持在Flink的狀態。因此,如果一個或兩個輸入表持續增長,資源使用也將無限期增長。

Time-windowed Joins

時間窗口 join 由 join 謂詞定義,該 join 謂詞檢查輸入記錄時間屬性是否在某些時間限制(即時間窗口)內。

SELECT *
FROM
  Orders o,
  Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

與常規 join 操作相比,這種 join 僅支持具有時間屬性的 append-only 表。由於時間屬性是准單調遞增的,因此Flink可以從其狀態中刪除舊值,而不會影響結果的正確性。

Join with a Temporal Table Function

具有時態表函數 的 join 將 append-only 表(左側輸入/探針側,注:輸入流)與臨時表(右側輸入/構建側,注:維表)join,即隨時間變化並跟蹤其變化的表(維表)。請查看相應的頁面以獲取有關時態表的更多信息

以下示例顯示了 append-only 表 Orders,該與不斷變化的貨幣匯率表 RatesHistory 結合在一起 

Orders是一個 append-only 表,代表給定 amount 和給定貨幣(currency)的付款例如,在10:15 有一筆金額為 2 歐元的訂單

SELECT * FROM Orders;

rowtime amount currency
======= ====== =========
10:15        2 Euro
10:30        1 US Dollar
10:32       50 Yen
10:52        3 Euro
11:04        5 US Dollar

RatesHistory表示日元匯率(匯率為1)不斷變化的 append-only 表。例如,從 09:00 到 10:45 歐元對日元的匯率為 114。從10:45到11:15為116。 

SELECT * FROM RatesHistory;

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1
10:45   Euro        116
11:15   Euro        119
11:49   Pounds      108

假設我們要計算所有訂單轉換為通用貨幣(日元)的金額。

例如,我們要使用給定 rowtime(114)的適當轉換率轉換以下訂單。

rowtime amount currency
======= ====== =========
10:15        2 Euro

如果不使用時態表的概念,則需要編寫如下查詢: 

SELECT
  SUM(o.amount * r.rate) AS amount
FROM Orders AS o,
  RatesHistory AS r
WHERE r.currency = o.currency
AND r.rowtime = (
  SELECT MAX(rowtime)
  FROM RatesHistory AS r2
  WHERE r2.currency = o.currency
  AND r2.rowtime <= o.rowtime);

借助時態表函數 使 RatesHistory 的匯率變化,我們可以在SQL中將查詢表示為: 

SELECT
  o.amount * r.rate AS amount
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency

探針端記錄的相關時間屬性時,來自探針端的每個記錄將與構建端表的版本關聯。為了支持生成側表上先前值的更新(覆蓋),該表必須定義一個主鍵。 

在我們的示例中,from Orders 中的每個記錄都將與Rates 的 o.rowtime 時間版本 結合在一起該 currency 字段已 提前定義為 Rates 的主鍵,並且在我們的示例中用於連接兩個表。如果查詢使用的是處理時間概念,則執行操作時,新添加的訂單將始終與 Rates 的最新版本結合在一起 

常規 join 相反,這意味着如果在構建端有新記錄,則不會影響 join 的先前結果。這又使Flink可以限制必須保持狀態的元素數量。 

時間窗口連接相比,時態表 join 未定義時間范圍,(所有)關聯的數據將被 join。探測端的記錄總是在 time 屬性指定的時間與構建端的版本連接在一起。因此,構建端(時態表)的記錄可能是任意舊的。隨着時間的流逝,該記錄的先前版本和不再需要的版本(對於給定的主鍵)將從狀態中刪除。

這種行為使臨時表成為一個很好的候選者,可以用關系術語來表示流的 join。

用法

定義時態表函數 后,我們就可以開始使用它。可以使用與普通表函數相同的方式來使用時態表函數。

以下代碼段解決了我們從Orders中轉換貨幣的初衷問題

## SQL
SELECT
  SUM(o_amount * r_rate) AS amount
FROM
  Orders,
  LATERAL TABLE (Rates(o_proctime))
WHERE
  r_currency = o_currency

## Java
Table result = orders
    .joinLateral("rates(o_proctime)", "o_currency = r_currency")
    .select("(o_amount * r_rate).sum as amount");

## Scala
val result = orders
    .joinLateral(rates('o_proctime), 'r_currency === 'o_currency)
    .select(('o_amount * 'r_rate).sum as 'amount)

注意對於時態表 join ,尚未實現查詢配置中定義的狀態保留這意味着,計算查詢結果所需的狀態可能會無限增長,具體取決於歷史記錄表的不同主鍵數量。

Processing-time Temporal Joins 

基於處理時間的時間屬性,是不可能通過 過去的 時間的屬性作為參數的時間表函數(注:處理時間只會增長)。根據定義,它始終是當前時間戳。因此,處理時間時態表函數的調用將始終返回基礎表的最新已知版本,基礎歷史表中的任何更新也將立即覆蓋當前值。

僅將構建側記錄的最新版本(相對於定義的主鍵)保持在該狀態。構建端的更新將不會影響先前發出的 join 結果。

可以將處理時間的 時態表 視為一種簡單的 HashMap<K, V>,它可以存儲構建側的所有記錄。當來自構建端的新記錄與先前的記錄具有相同的鍵時,舊值將被簡單地覆蓋。總是根據HashMap的最新/當前狀態評估來自探測器端的每個記錄(注:與輸入數據 join) 

Event-time Temporal Joins

使用事件時間 的時間屬性(即行時間屬性),可以將過去的時間屬性傳遞給時態表函數。這允許在公共時間點將兩個表(時態表的兩個時間狀態)連接在一起。

與處理時間 時態表 join 相比,時態表不僅將構建側記錄的最新版本(相對於定義的主鍵)保持在狀態中,而且還存儲自上次水印以來的所有版本(按時間標識)。

例如,根據時態表的概念,將附加到探針側表的事件時間 時間戳為12:30:00的輸入行與構建側表 在時間12:30:00的版本 進行連接。因此,傳入行僅與時間戳小於或等於12:30:00的行連接,並根據主鍵應用更新直到該時間點。

根據事件時間的定義,水印允許 join 操作及時向前移動,並丟棄不再需要的構建表版本,因為不會輸入具有更低或相等時間戳的行。 

與時態表 Join 

與時態表的 join 將任意表(左側輸入/探針側)與時態表(右側輸入/構建側)join,即隨時間變化的外部數據表。請查看相應的頁面以獲取有關時態表的更多信息

注意:用戶不能將任意表用作時態表,需要使用 LookupableTableSource 支持的表。LookupableTableSource 只能作為時間表用於時間聯接有關如何定義LookupableTableSource的更多詳細信息,請參見頁面

下面的示例顯示了一個Orders流,該流與不斷變化的貨幣匯率表 LatestRates 結合在一起。

LatestRates是物化最新匯率的維度表。 在時間10:15、10:30、10:52,LatestRates 的內容如下:

10:15> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1

10:30> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1


10:52> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        116     <==== changed from 114 to 116
Yen           1

時間10:15和10:30的LastestRates的內容相等。 歐元匯率在10:52從114更改為116。

Orders 是一個 append-only 表,代表給定金額和給定貨幣的付款。 例如,在10:15時有一筆2歐元的訂單。

SELECT * FROM Orders;

amount currency
====== =========
     2 Euro             <== arrived at time 10:15
     1 US Dollar        <== arrived at time 10:30
     2 Euro             <== arrived at time 10:52

假設我們要計算所有Orders折算成通用貨幣(日元的金額 

例如,我們想使用LatestRates中的最新匯率轉換以下訂單。 結果將是:

amount currency     rate   amout*rate
====== ========= ======= ============
     2 Euro          114          228    <== arrived at time 10:15
     1 US Dollar     102          102    <== arrived at time 10:30
     2 Euro          116          232    <== arrived at time 10:52

借助時態表聯接,我們可以在SQL中將查詢表示為: 

SELECT
  o.amout, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

探針端的每個記錄都將與構建端表的當前版本關聯。 在我們的示例中,查詢使用的是處理時間概念,因此在執行操作時,新附加的訂單將始終與最新版本的LatestRates結合在一起。 注意,結果對於處理時間不是確定的。

常規 join 相反,盡管在構建方面進行了更改(注:數據修改了),但 時態表 Join 的先前結果將不會受到影響。而且,時態表 join 運算符非常輕,並且不保留任何狀態。

時間窗口 join 相比,時態表 join 沒有定義將要在其中 join 記錄的時間窗口。 在處理時,探測端的記錄總是與構建端的最新版本結合在一起。 因此,構建方面的記錄可能是任意舊的

態表函數 join  和 時態表 join 都來自相同的初衷,但是具有不同的SQL語法和運行時實現:

  • 時態表函數 join 的SQL語法是 join UDTF,而時態表聯接使用 SQL:2011 中引入的標准時態表語法。
  • 時態表函數 join 的實現實際上 join 了兩個流並使它們保持狀態,而時態表 join 僅接收唯一的輸入流並根據記錄中的鍵查找外部數據庫。
  • 時態表函數 join 通常用於 join 變更日志流,而臨時表 join 通常用於 join 外部表(即維表)。

這種行為使時態表成為一個很好的候選者,可以用關系術語來表示流的 join。

將來,時態表聯接將支持時態表功能 join 的功能,即支持時態 join 變更日志流。

用法

臨時表 join 的語法如下: 

SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
ON table1.column-name1 = table2.column-name1

當前,僅支持 INNER JOIN 和 LEFT JOIN。 臨時表之后應遵循 FOR SYSTEM_TIME AS OF table1.proctime .proctime是table1的處理時間屬性。 這意味着在連接左表中的每個記錄時,它會在處理時為時態表做快照。

例如,在定義時態表之后,我們可以如下使用它。

SELECT
  SUM(o_amount * r_rate) AS amount
FROM
  Orders
  JOIN LatestRates FOR SYSTEM_TIME AS OF o_proctime
  ON r_currency = o_currency

注意:僅在 Blink planner 中支持。

注意:僅在SQL中支持,而在 Table API 中尚不支持。

注意 :Flink當前不支持事件時間時態表 join。

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM