流概念
Data Streams上的關系查詢
關系型SQL
與stream processing
對比如下。
SQL | Stream Processing |
---|---|
有限元組 | 無限元組 |
完整數據集上的查詢 | 無法基於所有數據查詢 |
查詢會結束 | 查詢不會結束 |
Materialized View被定義為一條SQL查詢,其會緩存查詢結果。但當所查詢的表(基表)被修改時,緩存的結果將過期。
Eager View Maintenance會更新Materialized View,當基表被更新時,會立刻更新Materialized View中緩存的結果。
Eager View Maintenance和SQL Query在streams
上的關系如下。
- 數據庫表是
INSERT、UPDATE、DELETE
等DML
語句流的結果,被流稱為changelog stream。 - Materialized View被定義為一條SQL查詢。為更新
View
,查詢需要不斷處理changelog stream。 - Materialized View是
streaming SQL
查詢結果。
動態表 & 連續查詢
動態表是Flink流上Table Api & SQL
的核心概念,其隨時間動態變化;
- 查詢動態表會產生一個連續查詢;
- 連續查詢永不停止,其會產生一個動態表;
- 當所查詢的動態表發生變化時,查詢會更新結果動態表。
連續查詢的結果等同在輸入表的快照上以批處理模式執行相同查詢的結果。
流、動態表、連續查詢的關系如下圖所示。
stream
會被轉化為動態表。- 在動態表上進行連續查詢,產生新的動態表。
- 動態表會被轉化為
stream
。
動態表是一個邏輯概念。 在查詢執行期間動態表不一定(完全)
materialized
。
為理解動態表和連續查詢的概念,假設點擊事件流有如下模式。
[
user: VARCHAR, // the name of the user
cTime: TIMESTAMP, // the time when the URL was accessed
url: VARCHAR // the URL that was accessed by the user
]
流上定義表
為在流上使用關系查詢,流需要被轉化為表。流的每個記錄被解釋為結果表(動態表)上的INSERT
修改,我們從一個只有INSERT
的changelog
流中構建表。如下圖所示,點擊事件流被轉化為表,表會隨着點擊事件記錄的插入而不斷增長。
連續查詢
連續查詢作用於動態表並又會產生動態表;連續查詢不會終止並會根據其輸入表(動態表)上的更新來更新其結果表(動態表)。
下面顯示在點擊事件流上定義的clicks
表上顯示兩個查詢示例。
首先是GROUP-BY COUNT
聚合查詢示例。
當查詢開始時,clicks
表為空;當第一行插入到clicks
表中時,查詢開始計算結果表(動態表),如[Mary, ./home]插入后,結果表包含一行結果[Mary, 1];當插入第二行[Bob, ./cart]時,查詢會更新結果表並插入新記錄[Bob, 1]。第三行[Mary, ./prod=id=1]插入時,查詢會更新結果表中的[Mary, 1]記錄,將其更新為[Mary, 2]。最后一行[Liz, 1]插入clicks
表后,也會更新到結果表(插入新記錄)。
第二個查詢與第一個查詢類似,除了用戶屬性之外,還在小時滾動窗口上對clicks
表進行分組,然后對URL進行計數(基於時間的計算,如窗口基於特殊的時間屬性)。
每個小時查詢會計算結果並更新結果表。在cTime
在12:00:00 - 12:59:59
之間,clicks
表存在四條記錄,對應的查詢計算出兩條結果;下個時間窗口(13:00:00 - 13:59:59),clicks
表中存在三條記錄,對應的查詢計算出兩條結果添加值結果表中;當記錄插入至clicks
表中后,結果表也會被動態更新。
更新和附加查詢
上述兩個查詢雖然有些類似(均計算統計聚合分組),但兩者也有顯著不同:第一個查詢會更新結果表的結果,如定義在結果表上的changelog
流包含INSERT
和UPDATE
;第二個查詢僅僅往結果表中添加記錄,如定義在結果表上的changelog
流只包含INSERT
。一個查詢是否生成僅插入表(INSERT
)或更新表(UPDATE
)有一些含義:生成更新表的查詢必須要維護更多狀態,將僅插入表轉化為流與將更新表轉化為流不同。
查詢限制
很多查詢可以等同在流上的連續查詢,一些查詢由於需維護狀態的大小或計算更新代價大導致查詢計算代價太大。
- 狀態大小:無界限流上的連續查詢經常會運行數周或數月。因此,連續查詢處理的數據總量可以很大,需要以前結果(結果表)的連續查詢需要維護所有行以便進行更新。例如,第一個查詢示例中需要保存每個
user
的url
的count
以便可以增加count
,使得當輸入表(左側表)接收一行新數據時會產生新的結果(右側表)。若只跟蹤注冊用戶,那么維護cnt
大小代價不會太大(注冊用戶量不太大)。但若非注冊用戶也分配唯一的用戶名,則隨着時間的增加,維護cnt
大小代價將增大,最終導致查詢失敗。
SELECT user, COUNT(url)
FROM clicks
GROUP BY user;
- 計算更新:即使只添加或更新單行記錄,一些查詢需要重新計算和更新大部分結果行,通常這樣的查詢不適合作為連續查詢。如下查詢示例中,會根據最后一次點擊的時間為每個用戶計算
RANK
。一旦clicks
表收到新行,用戶的lastAction
被更新並且應該計算新的RANK
。然而由於不存在兩行相同RANK
,所以所有較低RANK
的行也需要被更新。
SELECT user, RANK() OVER (ORDER BY lastLogin)
FROM (
SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
);
表到流的轉化
動態表可像傳統表一樣被INSERT、UPDATE、DELETE
修改。可能只有一行的表被持續更新;或者是沒有UPDATE、DELETE
更改的只插入表。當將動態表轉化為流或將其寫入外部系統,這些更改(修改)需要被編碼,Flink
的Table API & SQL
支持三種方式編碼動態表上的更改(修改)。
- Append-only流:僅使用
INSERT更改
進行修改的動態表可通過發出插入的行來轉化為流。 - Retract流:
Retract流
包含兩種類型消息(add消息和retract消息
),通過將動態表的INSERT更改
作為add消息
、將DELETE更改
作為retract消息
、將UPDATE更改
分解為舊記錄的retract消息
和新記錄的add消息
。下圖展示了從動態表轉化為retract流
。
- Upsert流:
Upsert流
包含兩種類型消息(upset消息和delete消息
),動態表轉化為upsert流
需要有主鍵(可復合),具有主鍵的動態表通過將INSERT、UPDATE更改
編碼為upset消息
,將DELETE更改
編碼為delete消息
。upset流
與retract流
主要區別是UPDATE更改
使用單一消息(主鍵)進行編碼,因此效率更高。下圖展示了將動態表
轉化為upset流
。
時間屬性
- Processing time(處理時間):表示事件被處理的系統時間。
- Event time(事件時間):表示事件發生時的時間。
- Ingestion time(攝入時間):表示事件進入流處理系統的時間(在內部其與
Event time
類型)。
上述時間可以在代碼中指明時間特性。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Table API & SQL
中基於時間的操作(如窗口)需要設置時間概念和及其來源信息。因此,tables
可以提供邏輯時間屬性
來指示時間並在table
程序中訪問相應時間戳。時間屬性可以是表模式
的一部分(從DataStream
中創建表時被定義),或在使用TableSource
時被預定義,一旦時間屬性被定義,那么其可以作為一個字段被引用或進行基於時間的操作。只要時間屬性沒有被修改,只是從查詢的一部分轉發到另一部分,那么它仍然是一個有效的時間屬性。時間屬性與常規時間戳相同,可被訪問並計算。如果在計算中使用時間屬性,那么其將被具象化為常規時間戳,常規時間戳不兼容Flink
的時間和水位系統,因此不能再用於基於時間的操作。
處理時間
processing time
允許表程序基於本地機器的時間輸出結果,它不需要提取時間戳和生成水位,有多種方式定義processing time
屬性。
流轉化為表過程
processing time
屬性在模式定義時使用.proctime
屬性定義,時間屬性只能通過額外的邏輯字段擴展物理模式,因此,其可被定義在模式定義的末尾,具體如下。
DataStream<Tuple2<String, String>> stream = ...;
// declare an additional logical field as a processing time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
使用TableSource
processing time
屬性可通過實現DefinedProctimeAttribute
接口定義,邏輯時間屬性被附加到由TableSource
的返回類型定義的物理模式上。
// define a table source with a processing attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
@Override
public TypeInformation<Row> getReturnType() {
String[] names = new String[] {"Username" , "Data"};
TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
return Types.ROW(names, types);
}
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
// create stream
DataStream<Row> stream = ...;
return stream;
}
@Override
public String getProctimeAttribute() {
// field with this name will be appended as a third field
return "UserActionTime";
}
}
// register table source
tEnv.registerTableSource("UserActions", new UserActionSource());
WindowedTable windowedTable = tEnv
.scan("UserActions")
.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
事件時間
Event time
允許表程序根據每條記錄中包含的時間輸出結果,這樣即使在無序事件或晚到事件情況下保持一致結果,當從持久化存儲中讀取記錄時還保證可重放結果。此外,event time
允許批和流環境中的表程序使用統一的語法,流環境中的時間屬性可以是批環境中的記錄的字段。為處理亂序事件,並區分流中准時和晚到事件,Flink
需要從事件中提取時間戳信息,並在時間戳上進行處理(水位)。event time
屬性可被定義在流到表的轉化中或者使用TableSource。Table API & SQL
假設在上述兩種情況下,都在DataStream API
中生成時間戳和水位。
流轉化為表的過程中
event time
屬性在模式定義時通過.rowtime
屬性定義;時間戳和水位必須在轉換的DataStream中已被分配;將DataStream
轉化為Table
時有如下兩種定義時間屬性的方式。
- 通過附加邏輯字段擴展物理表模式。
- 用邏輯字段替換物理字段(如提取時間戳后不再需要)。
// Option 1:
// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");
// Option 2:
// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");
// Usage:
WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
使用TableSource
event time
屬性可通過實現DefinedRowtimeAttribute
接口定義,邏輯時間屬性被附加到由TableSource
的返回類型定義的物理模式上。時間戳和水位一定要在getDataStream
方法返回的流中被分配。
// define a table source with a rowtime attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttribute {
@Override
public TypeInformation<Row> getReturnType() {
String[] names = new String[] {"Username" , "Data"};
TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
return Types.ROW(names, types);
}
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
// create stream
// ...
// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
return stream;
}
@Override
public String getRowtimeAttribute() {
// field with this name will be appended as a third field
return "UserActionTime";
}
}
// register the table source
tEnv.registerTableSource("UserActions", new UserActionSource());
WindowedTable windowedTable = tEnv
.scan("UserActions")
.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
查詢配置
不管輸入是有界批量輸入還是無界流輸入,Table API & SQL
查詢都有相同的語義。在很多情況下,流上的連續查詢與離線計算具有相同准確的結果。然而,在實際情況下連續查詢必須要限制其所維護狀態的大小以避免使用完存儲空間,並能夠在長時間處理無限流數據。因此,連續查詢可能只能根據輸入數據的特征和查詢本身提供近似准確的結果。
Flink Table API & SQL
接口提供參數調整連續查詢的准確性和資源消耗。參數通過QueryConfig
對象定義,QueryConfig
對象可通過TableEnvironment
獲取並在翻譯表時被傳回。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// obtain query configuration from TableEnvironment
StreamQueryConfig qConfig = tableEnv.queryConfig();
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12));
// define query
Table result = ...
// create TableSink
TableSink<Row> sink = ...
// emit result Table via a TableSink
result.writeToSink(sink, qConfig);
// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);
下面描述了QueryConfig
的參數如何影響查詢的准確性和資源消耗的。
空閑狀態保留時間
很多查詢在一個或多個關鍵屬性上聚合或連接記錄(如典型的聚合查詢),當在流上執行該查詢時,連續查詢需要維護記錄或保持每個鍵的部分結果。若涉及到流的關鍵域(活動鍵值隨時間會變化),隨着不同鍵被觀察,連續查詢會積累越來越多的狀態。然而,在一段時間后鍵將變得不活動時,它們的對應狀態將變得過期和無效。如下查詢示例中計算每個session
的clicks
數量。
SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
sessionId
被作為分組鍵,連續查詢會為每個sessionId
維護clicks
數量。sessionId
屬性隨着時間推移而變化,sessionId
值僅在session
結束前處於活動狀態(保持一段時間)。然而,由於不清楚sessionId
屬性,連續查詢期望每個sessionId
值在任何時間都有效,即會維護所有sessionId
的值。這樣會導致隨着時間的推移,所維護的sessionId
越來越多。
空閑狀態保留時間參數定義鍵的狀態不被更新,在刪除之前保留多長時間。在上述查詢中,sessionId
的計數在指定的配置時間內未被更新時將被移除。當鍵會移除后再次被添加,那么鍵將會被當成新的鍵(如上述示例中又會開始計0)。有兩個參數配置空閑狀態保留時間,最小空閑狀態保留時間和最大空閑狀態保留時間。
StreamQueryConfig qConfig = ...
// set idle state retention time: min = 12 hour, max = 16 hours
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(16));
// set idle state retention time. min = max = 12 hours
qConfig.withIdleStateRetentionTime(Time.hours(12);
配置不同的最小和最大空閑狀態保留時間的效率更高,因為它減少了查詢內部簿記何時刪除狀態的次數。
參考鏈接
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/streaming.html