Flink基礎(十八):Table API 和 Flink SQL(三)流處理中的特殊概念


  Table API和SQL,本質上還是基於關系型表的操作方式;而關系型表、關系代數,以及SQL本身,一般是有界的,更適合批處理的場景。這就導致在進行流處理的過程中,理解會稍微復雜一些,需要引入一些特殊概念。

1 流處理和關系代數(表,及SQL)的區別

  關系代數(表)/SQL 流處理
處理的數據對象 字段元組的有界集合 無法訪問所有數據,必須持續等待流式輸入
查詢(Query)對數據的訪問 可以訪問到完整的數據輸入 無法訪問所有數據,必須持續等待流式輸入
查詢終止條件 生成固定大小的結果集后終止 永不停止,根據持續收到的數據不斷更新查詢結果

可以看到,其實關系代數(主要就是指關系型數據庫中的表)和SQL,主要就是針對批處理的,這和流處理有天生的隔閡。

2 動態表

因為流處理面對的數據,是連續不斷的,這和我們熟悉的關系型數據庫中保存的“表”完全不同。所以,如果我們把流數據轉換成Table,然后執行類似於table的select操作,結果就不是一成不變的,而是隨着新數據的到來,會不停更新。

我們可以隨着新數據的到來,不停地在之前的基礎上更新結果。這樣得到的表,在Flink Table API概念里,就叫做“動態表”(Dynamic Tables)。

動態表是Flink對流數據的Table API和SQL支持的核心概念。與表示批處理數據的靜態表不同,動態表是隨時間變化的。動態表可以像靜態的批處理表一樣進行查詢,查詢一個動態表會產生持續查詢(Continuous Query)。連續查詢永遠不會終止,並會生成另一個動態表。查詢(Query)會不斷更新其動態結果表,以反映其動態輸入表上的更改。

3 流式持續查詢的過程

下圖顯示了流、動態表和連續查詢的關系:

流式持續查詢的過程為:

  1. 流被轉換為動態表。
  2. 對動態表計算連續查詢,生成新的動態表。
  3. 生成的動態表被轉換回流。

3.1 將流轉換成表(Table)

為了處理帶有關系查詢的流,必須先將其轉換為表。

從概念上講,流的每個數據記錄,都被解釋為對結果表的插入(Insert)修改。因為流式持續不斷的,而且之前的輸出結果無法改變。本質上,我們其實是從一個、只有插入操作的changelog(更新日志)流,來構建一個表。

為了更好地說明動態表和持續查詢的概念,我們來舉一個具體的例子。

比如,我們現在的輸入數據,就是用戶在網站上的訪問行為,數據類型(Schema)如下:

{
  user:  VARCHAR,   // 用戶名
  cTime: TIMESTAMP, // 訪問某個URL的時間戳
  url:   VARCHAR    // 用戶訪問的URL
}

下圖顯示了如何將訪問URL事件流,或者叫點擊事件流(左側)轉換為表(右側)。

隨着插入更多的訪問事件流記錄,生成的表將不斷增長。

3.2 持續查詢(Continuous Query)

持續查詢,會在動態表上做計算處理,並作為結果生成新的動態表。與批處理查詢不同,連續查詢從不終止,並根據輸入表上的更新更新其結果表。

在任何時間點,連續查詢的結果在語義上,等同於在輸入表的快照上,以批處理模式執行的同一查詢的結果。

在下面的示例中,我們展示了對點擊事件流中的一個持續查詢。

這個Query很簡單,是一個分組聚合做count統計的查詢。它將用戶字段上的clicks表分組,並統計訪問的url數。圖中顯示了隨着時間的推移,當clicks表被其他行更新時如何計算查詢。

3.3 將動態表轉換成流

與常規的數據庫表一樣,動態表可以通過插入(Insert)、更新(Update)和刪除(Delete)更改,進行持續的修改。將動態表轉換為流或將其寫入外部系統時,需要對這些更改進行編碼。Flink的Table API和SQL支持三種方式對動態表的更改進行編碼:

  1. 僅追加(Append-only)流

僅通過插入(Insert)更改,來修改的動態表,可以直接轉換為“僅追加”流。這個流中發出的數據,就是動態表中新增的每一行。

  1. 撤回(Retract)流

Retract流是包含兩類消息的流,添加(Add)消息和撤回(Retract)消息。

動態表通過將INSERT 編碼為add消息、DELETE 編碼為retract消息、UPDATE編碼為被更改行(前一行)的retract消息和更新后行(新行)的add消息,轉換為retract流。

下圖顯示了將動態表轉換為Retract流的過程。

  1. Upsert(更新插入)流

Upsert流包含兩種類型的消息:Upsert消息和delete消息。轉換為upsert流的動態表,需要有唯一的鍵(key)。

通過將INSERT和UPDATE更改編碼為upsert消息,將DELETE更改編碼為DELETE消息,就可以將具有唯一鍵(Unique Key)的動態表轉換為流。

下圖顯示了將動態表轉換為upsert流的過程。

這些概念我們之前都已提到過。需要注意的是,在代碼里將動態表轉換為DataStream時,僅支持Append和Retract流。而向外部系統輸出動態表的TableSink接口,則可以有不同的實現,比如之前我們講到的ES,就可以有Upsert模式。

4 時間特性

基於時間的操作(比如Table API和SQL中窗口操作),需要定義相關的時間語義和時間數據來源的信息。所以,Table可以提供一個邏輯上的時間字段,用於在表處理程序中,指示時間和訪問相應的時間戳。

時間屬性,可以是每個表schema的一部分。一旦定義了時間屬性,它就可以作為一個字段引用,並且可以在基於時間的操作中使用。

時間屬性的行為類似於常規時間戳,可以訪問,並且進行計算。

4.1 處理時間

處理時間語義下,允許表處理程序根據機器的本地時間生成結果。它是時間的最簡單概念。它既不需要提取時間戳,也不需要生成watermark。

定義處理時間屬性有三種方法:在DataStream轉化時直接指定;在定義Table Schema時指定;在創建表的DDL中指定。

  1. DataStream轉化成Table時指定

由DataStream轉換成表時,可以在后面指定字段名來定義Schema。在定義Schema期間,可以使用.proctime,定義處理時間字段。

注意,這個proctime屬性只能通過附加邏輯字段,來擴展物理schema。因此,只能在schema定義的末尾定義它。

代碼如下:

val stream = env.addSource(new SensorSource)
val sensorTable = tableEnv
  .fromDataStream(stream, $"id", $"timestamp", $"temperature", $"pt".proctime())
  1. 創建表的DDL中指定

在創建表的DDL中,增加一個字段並指定成proctime,也可以指定當前的時間字段。

代碼如下:

val sinkDDL: String =
  """
    |create table dataTable (
    |  id varchar(20) not null,
    |  ts bigint,
    |  temperature double,
    |  pt AS PROCTIME()
    |) with (
    |  'connector.type' = 'filesystem',
    |  'connector.path' = 'sensor.txt',
    |  'format.type' = 'csv'
    |)
  """.stripMargin

tableEnv.sqlUpdate(sinkDDL) // 執行 DDL

注意:運行這段DDL,必須使用Blink Planner。

4.2 事件時間(Event Time)

事件時間語義,允許表處理程序根據每個記錄中包含的時間生成結果。這樣即使在有亂序事件或者延遲事件時,也可以獲得正確的結果。

為了處理無序事件,並區分流中的准時和遲到事件;Flink需要從事件數據中,提取時間戳,並用來推進事件時間的進展(watermark)。

  1. DataStream轉化成Table時指定

在DataStream轉換成Table,schema的定義期間,使用.rowtime可以定義事件時間屬性。注意,必須在轉換的數據流中分配時間戳和watermark。

在將數據流轉換為表時,有兩種定義時間屬性的方法。根據指定的.rowtime字段名是否存在於數據流的架構中,timestamp字段可以:

  • 作為新字段追加到schema
  • 替換現有字段

在這兩種情況下,定義的事件時間戳字段,都將保存DataStream中事件時間戳的值。

代碼如下:

val stream = env
  .addSource(new SensorSource)
  .assignAscendingTimestamps(r => r.timestamp)
// 將 DataStream轉換為 Table,並指定時間字段
val sensorTable = tableEnv
  .fromDataStream(stream, $"id", $"timestamp".rowtime(), 'temperature)
  1. 創建表的DDL中指定

事件時間屬性,是使用CREATE TABLE DDL中的WARDMARK語句定義的。watermark語句,定義現有事件時間字段上的watermark生成表達式,該表達式將事件時間字段標記為事件時間屬性。

代碼如下:

val sinkDDL: String =
  """
    |create table dataTable (
    |  id varchar(20) not null,
    |  ts bigint,
    |  temperature double,
    |  rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ),
    |  watermark for rt as rt - interval '1' second
    |) with (
    |  'connector.type' = 'filesystem',
    |  'connector.path' = 'file:///D:\\..\\sensor.txt',
    |  'format.type' = 'csv'
    |)
  """.stripMargin

tableEnv.sqlUpdate(sinkDDL) // 執行 DDL

這里FROM_UNIXTIME是系統內置的時間函數,用來將一個整數(秒數)轉換成“YYYY-MM-DD hh:mm:ss”格式(默認,也可以作為第二個String參數傳入)的日期時間字符串(date time string);然后再用TO_TIMESTAMP將其轉換成Timestamp。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM