【翻譯】Flink Table Api & SQL —Streaming 概念 —— 時態表


本文翻譯自官網: Temporal Tables https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html

Flink Table Api & SQL 翻譯目錄

時態表(注:Temporal Table , 我翻譯為時態表,可以訪問表在不同時間的內容)表示一直在修改的表上的(參數化)視圖的概念,該視圖返回表在特定時間點的內容。

更改表可以是跟蹤表的修改歷史(例如,數據庫更改日志),也可以是維表的具體修改(例如,數據庫表)。

對於表的歷史修改,Flink可以跟蹤修改,並允許在查詢中訪問表的特定時間點的內容。 在Flink中,這種表由Temporal Table Function表示

對於變化的維表,Flink允許在查詢中的處理時訪問表的內容。在Flink中,這種表由Temporal Table 表示

設計初衷

與表的修改歷史相關

假設我們有下表 RatesHistory

SELECT * FROM RatesHistory;

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1
10:45   Euro        116
11:15   Euro        119
11:49   Pounds      108

RatesHistory表示一個不斷增長的關於日元的貨幣匯率的附加表(匯率為1)。例如,匯率期間從 09:0010:45的歐元日元的匯率為 114從 10:45 到 11:15 是 116

假設我們要在10:58的時間輸出所有當前匯率,則需要以下SQL查詢來計算結果表:

SELECT *
FROM RatesHistory AS r
WHERE r.rowtime = (
  SELECT MAX(rowtime)
  FROM RatesHistory AS r2
  WHERE r2.currency = r.currency
  AND r2.rowtime <= TIME '10:58');

子查詢確定對應貨幣的最大時間小於或等於所需時間。外部查詢列出具有最大時間戳的匯率。 

下表顯示了這種計算的結果。 在我們的示例中,考慮了10:45 時歐元的更新,但是 10:58 時表的版本中未考慮 11:15 時歐元的更新和新的英鎊輸入。

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Yen           1
10:45   Euro        116

時態表的概念旨在簡化此類查詢,加快其執行速度,並減少Flink的狀態使用率。時態表是 append-only 表上的參數化視圖,該視圖將 append-only 表的行解釋為表的變更日志,並在特定時間點提供該表的版本將 append-only 表解釋為變更日志需要指定主鍵屬性和時間戳屬性。主鍵確定覆蓋哪些行,時間戳確定行有效的時間。

在上面的示例中,currency RatesHistory的主鍵,並且rowtime是timestamp屬性。

在Flink中,這由時態表函數表示

與維表變化相關

另一方面,某些用例需要連接變化的維表,該表是外部數據庫表。

假設 LatestRates 是一個以最新匯率實現的表(例如,存儲在其中)。LatestRates 是物化的 RatesHistory 歷史那么時間 10:58 的 LatestRates 表的內容將是

10:58> SELECT * FROM LatestRates;
currency   rate
======== ======
US Dollar   102
Yen           1
Euro        116

12:00 時 LatestRates的內容將是: 

12:00> SELECT * FROM LatestRates;
currency   rate
======== ======
US Dollar   102
Yen           1
Euro        119
Pounds      108

在Flink中,這由Temporal Table表示

時態表函數

為了訪問時態表中的數據,必須傳遞一個時間屬性,該屬性確定將要返回的表的版本。Flink使用表函數的SQL語法提供一種表達它的方法。

定義后,時態表函數將使用單個時間參數timeAttribute並返回一組行。該集合包含相對於給定時間屬性的所有現有主鍵的行的最新版本。

假設我們Rates(timeAttribute)基於RatesHistory定義了一個時態表函數,我們可以通過以下方式查詢該函數: 

SELECT * FROM Rates('10:15');

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1

SELECT * FROM Rates('11:00');

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
10:45   Euro        116
09:00   Yen           1

對Rates(timeAttribute)的每個查詢都將返回給定timeAttribute的Rates狀態。

注意:當前 Flink 不支持使用常量時間屬性參數直接查詢時態表函數。目前,時態表函數只能在 join 中使用。上面的示例用於提供有關函數 Rates(timeAttribute)返回內容的直觀信息 

另請參閱有關用於持續查詢的 join 的頁面,以獲取有關如何與時態表 join 的更多信息。 

定義時態表函數

以下代碼段說明了如何從 append-only 表中創建時態表函數。

// Get the stream and table environments.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// Provide a static data set of the rates history table.
val ratesHistoryData = new mutable.MutableList[(String, Long)]
ratesHistoryData.+=(("US Dollar", 102L))
ratesHistoryData.+=(("Euro", 114L))
ratesHistoryData.+=(("Yen", 1L))
ratesHistoryData.+=(("Euro", 116L))
ratesHistoryData.+=(("Euro", 119L))

// Create and register an example table using above data set.
// In the real setup, you should replace this with your own table.
val ratesHistory = env
  .fromCollection(ratesHistoryData)
  .toTable(tEnv, 'r_currency, 'r_rate, 'r_proctime.proctime)

tEnv.registerTable("RatesHistory", ratesHistory)

// Create and register TemporalTableFunction.
// Define "r_proctime" as the time attribute and "r_currency" as the primary key.
val rates = ratesHistory.createTemporalTableFunction('r_proctime, 'r_currency) // <==== (1)
tEnv.registerFunction("Rates", rates)                                          // <==== (2)

Line (1)創建了一個 時態表函數 rates ,使我們可以在 Table API 中使用 rates 函數  

Line (2) 在表環境中以Rates名稱注冊此函數,這使我們可以在SQL中使用Rates函數

時態表

注意:僅 Blink planner 支持此功能。

為了訪問時態表中的數據,當前必須使用LookupableTableSource定義一個TableSource。 Flink 使用FOR SYSTEM_TIME AS OF 的SQL語法查詢時態表,這在SQL:2011中提出。

假設我們定義了一個時態表 LatestRates,我們可以通過以下方式查詢此類表: 

SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '10:15';

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1

SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '11:00';

currency   rate
======== ======
US Dollar   102
Euro        116
Yen           1

注意:當前,Flink不支持以固定時間直接查詢時態表。目前,時態表只能在 join 中使用。上面的示例用於提供有關時態表LatestRates返回內容的直覺

另請參閱有關用於持續查詢的 join 的頁面,獲取有關如何與時態表 join 更多信息。

定義時態表

// Get the stream and table environments.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// Create an HBaseTableSource as a temporal table which implements LookableTableSource
// In the real setup, you should replace this with your own table.
val rates = new HBaseTableSource(conf, "Rates")
rates.setRowKey("currency", String.class)   // currency as the primary key
rates.addColumn("fam1", "rate", Double.class)

// register the temporal table into environment, then we can query it in sql
tEnv.registerTableSource("Rates", rates)

另請參閱有關如何定義LookupableTableSource的頁面 

 

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM