Flink原理(七)——動態表(Dynamic tables)


前言

  本文是結合Flink官網,個人理解所得,若是有誤歡迎留言指出,謝謝!文中圖皆來自官網(鏈接[1])。

  本文將隨着下面這個問題展開,針對該問題更為生動的解釋可以參見金竹老師的分享(鏈接[2])。

  SQL適合流計算場景嗎?

  對於流計算,每一條數據的到來都會觸發一次查詢產生一個結果,並發射出去。我們發現對於相同的數據源,使用相同的SQL查詢時,批、流的結果是相同的,即在不同模式下,SQL的語意是一致的(One Query One Result),最終的結果是一致。

1、動態表與連續查詢(Dynamic Table&Continuous Query)

  和動態表對應的是靜態表——常規的數據庫中的表或批處理中的表等,其在查詢時數據不再變化。動態表是隨時間變化的,即使是在查詢的時候。怎么理解了?流上的數據是源源不斷的,一條數據的到來會觸發一次查詢,這次查詢在執行時還有下一條數據到來,對表本身數據是在變化的。

  對動態表的查詢是連續的,即連續查詢(Continuous Query)。實質上, 動態表上的連續查詢與定義物理視圖(Materialized View)的查詢很相似。物理視圖定義為SQL查詢,就像常規的虛擬視圖一樣,不同的是物理視圖會緩存查詢結果,這樣在訪問時不需要重新計算,而緩存帶來的挑戰是有可能提供過時的結果,Eager View Maintenance則是用於及時跟新物理視圖的技術,這里就不展開了。

  流、動態表、連續查詢三者的關系如下圖所示:

  用一句話概括是:流被轉換為動態表,對動態表的連續查詢生成新的動態表(結果表),然后結果表被轉換為流。

2、流上定義表

  2.1 定義表

  為了在流上使用關系型查詢,需要將流轉換成表。下面的分析過程均采用官網(Ref[2])中的例子進行說明。

  1)點擊事件流的schema如下:

1 [
2   user:  VARCHAR,   // the name of the user
3   cTime: TIMESTAMP, // the time when the URL was accessed
4   url:   VARCHAR    // the URL that was accessed by the user
5 ]    

  2)從概念上來說,流上的每天記錄都是動態表進行INSERT修改。從本質上講,是從一個INSERT-Only(僅插入)的ChangeLog流上構建一個表。點擊事件流上構建表如下圖所示,且隨着更多點擊流記錄的插入,生成的表不斷增長:

  Note:定義在流上的表在內部是沒有實現的。

  2.2 連續查詢

  連續查詢不會中止,會根據輸入表來更新結果表,下面介紹兩種查詢的例子。

  1)簡單的GROUP-BY Count聚合查詢

  下圖中,左邊是輸入表click,是隨着時間updata增加的,右邊是查詢的結果表。開始clicks表中只有一條數據[Mary, ./home]時其結果表是表-1,當clicks表中新增一條數據[Bob, ./cart]時,其結果表是表-2,依次下推。每一條新數據的到來會對之前表行進update或INSERT操作,SQL語句就會根據現有數據更新的結果表。

  2)帶有窗口(window)的聚合查詢

  窗口的時間間隔是1個小時,窗口-1對應的時表-1,窗口-2對應的時表-2,依次類推。和第一種查詢不一樣的是,每一張時表只是統計對應窗口的數據,之前窗口的數據對其沒有影響,對不同窗口的查詢結果是以追加的形式寫入result表中的。

   2.3 Update和Append查詢

  2.2中的兩種例子分別對應的兩種查詢方式,

  1)例子1對應着Update查詢,這種方式需要更新之前已經發出的結果,包括INSERT和UPDATE兩種改變。改變之前已經發出的結果意味着,這種查詢需要維護更多的狀態(state)數據;

  2)例子2對應着Append查詢,這種方式查詢的結果都是以追加的形式加入到result表中,僅包含INSERT操作。這種方式生成的表和update生成的表轉換成流的方式不一樣(見下文)。

  2.4 Restrictions查詢

  對於有些SQL查詢會因需要保留的state多大或重新計算已發出的記錄用來更新的代價太大而得不償失。

  1)state size:例如下面的SQL,在連續查詢中,當一條新的消息到來時,為了更新之前已發出的結果(聯想2.2中例1),需要保存之前的計算結果即state。當時當連續查詢持續很長時間時,需要保存的state的容量會很大,且隨着時間的遞增會越來越大,更糟糕的是若不斷有新用戶(分配不同的username)加入,其要保存的count會隨着時間更加恐怖,最后有可能導致任務失敗。

1 SELECT user, COUNT(url) FROM clicks GROUP BY user;

  2)computing updates:例如下面的SQL,當clicks表新增一條記錄,為計算rank,需要對之前所有的重新計算和更新已發出結果的中很大一部分,一條記錄的的增加,有可能導致很多user的rank變化。

1 SELECT user, RANK() OVER (ORDER BY lastLogin)
2 FROM (
3   SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
4 );

   3)查詢配置(鏈接[3])

  在常見的場景中,對長期的運行的job做連續查詢,為了防止保存的state過大超出存儲而任務失敗,可能會對state的大小做一定限制即刪除state。但這種方式可能引發另一個問題——查詢出來的結果可能不准確。Flink Table API和SQL中提供查詢參數試圖在准確性和資源消耗中找到一個平衡點。

   Idle State Retention Time含義是state的key在被刪除之前多長時間沒有被更新,即沒有被更新state的保存時間。使用方式如下:

1 StreamQueryConfig qConfig = ...
2 
3 // set idle state retention time: min = 12 hours, max = 24 hours
4 qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

  3、Table到流的轉換

  可以通過INSERT、UPDATE、DELETE像修改常規表一樣去改變動態表。將動態表轉換為流或將其寫入外部系統時,需要對這些更改進行encode。 Flink的Table API和SQL支持三種encode改變動態表的方法:

  1)Append-only Stream(僅追加流):僅通過INSERT操作得到的動態表可以發射插入行來轉換為流(聯想2.2中例2),這種方式轉換的流中數據都是片段性的,一個片段代表一個窗口;

  2)Retract Stream(回溯流):restract stream有兩種消息:添加(add)消息和回溯(retract)消息。將動態表轉換為回溯(retract)流,通過將INSERT更改encode為添加消息,將DELETE更改encode為回溯消息,將UPDATE更改endcode為更新(上一個)行的回溯消息以及添加消息更新新的行 。 下圖顯示了動態表到回溯流的轉換。

  流上每條消息都有一個標識位,其中+標識INSERT操作,-標識DELETE操作。在clicks表中第一、二行消息[Mary, ./home]和[Bob, ./cat]被轉換為流中1第、2條消息,當clicks表中第三行[Mary, ./prod?id=1]轉換時,會先將已發出的第1條信息標記為DELETE告訴下游,然后第4條消息重新插入user為Mary的消息,依次類推,這樣可以保證輸出結果的正確性。

  3)Upsert Stream(上插流):Upsert流包括upsert消息和刪除消息。 動態表要轉換為upsert流需要(可能是復合的)唯一鍵。 通過將INSERT和UPDATE 操作encode為upsert消息,並將DELETE更改encode為刪除消息,可以是具有唯一鍵的動態表轉換為流。 流運算需要知道唯一鍵屬性才能正確應用消息。 與回溯流的主要區別在於UPDATE使用單個消息((主鍵))進行編碼,因此更有效。

  (個人理解待驗證)Upsert流和Retract流的區別在於數據存在第三方系統中時,前者可能存在重復數據,后者沒有。

 

   NOTE:在將動態表(dynamic table)轉換為數據流(Data Stream)時,僅支持append和retract兩種方式

  

Ref:

  [1]https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion

  [2]http://www.itdks.com/Course/detail?id=13213&from=search

  [3]https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/query_configuration.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM