【Flink】流-表概念


流概念

Data Streams上的關系查詢

關系型SQLstream processing對比如下。

SQL Stream Processing
有限元組 無限元組
完整數據集上的查詢 無法基於所有數據查詢
查詢會結束 查詢不會結束

Materialized View被定義為一條SQL查詢,其會緩存查詢結果。但當所查詢的表(基表)被修改時,緩存的結果將過期。
Eager View Maintenance會更新Materialized View,當基表被更新時,會立刻更新Materialized View中緩存的結果。

Eager View MaintenanceSQL Querystreams上的關系如下。

  • 數據庫表是INSERT、UPDATE、DELETEDML語句流的結果,被流稱為changelog stream
  • Materialized View被定義為一條SQL查詢。為更新View,查詢需要不斷處理changelog stream
  • Materialized Viewstreaming SQL查詢結果。

動態表 & 連續查詢

動態表是Flink流上Table Api & SQL的核心概念,其隨時間動態變化;

  • 查詢動態表會產生一個連續查詢;
  • 連續查詢永不停止,其會產生一個動態表;
  • 當所查詢的動態表發生變化時,查詢會更新結果動態表。

連續查詢的結果等同在輸入表的快照上以批處理模式執行相同查詢的結果。

流、動態表、連續查詢的關系如下圖所示。

  • stream會被轉化為動態表。
  • 在動態表上進行連續查詢,產生新的動態表。
  • 動態表會被轉化為stream

動態表是一個邏輯概念。 在查詢執行期間動態表不一定(完全)materialized

為理解動態表和連續查詢的概念,假設點擊事件流有如下模式。

[
user: VARCHAR, // the name of the user
cTime: TIMESTAMP, // the time when the URL was accessed
url: VARCHAR // the URL that was accessed by the user
]

流上定義表

為在流上使用關系查詢,流需要被轉化為表。流的每個記錄被解釋為結果表(動態表)上的INSERT修改,我們從一個只有INSERTchangelog流中構建表。如下圖所示,點擊事件流被轉化為表,表會隨着點擊事件記錄的插入而不斷增長。

連續查詢

連續查詢作用於動態表並又會產生動態表;連續查詢不會終止並會根據其輸入表(動態表)上的更新來更新其結果表(動態表)。
下面顯示在點擊事件流上定義的clicks表上顯示兩個查詢示例。

首先是GROUP-BY COUNT聚合查詢示例。

當查詢開始時,clicks表為空;當第一行插入到clicks表中時,查詢開始計算結果表(動態表),如[Mary, ./home]插入后,結果表包含一行結果[Mary, 1];當插入第二行[Bob, ./cart]時,查詢會更新結果表並插入新記錄[Bob, 1]。第三行[Mary, ./prod=id=1]插入時,查詢會更新結果表中的[Mary, 1]記錄,將其更新為[Mary, 2]。最后一行[Liz, 1]插入clicks表后,也會更新到結果表(插入新記錄)。

第二個查詢與第一個查詢類似,除了用戶屬性之外,還在小時滾動窗口上對clicks表進行分組,然后對URL進行計數(基於時間的計算,如窗口基於特殊的時間屬性)。

每個小時查詢會計算結果並更新結果表。在cTime12:00:00 - 12:59:59之間,clicks表存在四條記錄,對應的查詢計算出兩條結果;下個時間窗口(13:00:00 - 13:59:59),clicks表中存在三條記錄,對應的查詢計算出兩條結果添加值結果表中;當記錄插入至clicks表中后,結果表也會被動態更新。

更新和附加查詢

上述兩個查詢雖然有些類似(均計算統計聚合分組),但兩者也有顯著不同:第一個查詢會更新結果表的結果,如定義在結果表上的changelog流包含INSERTUPDATE;第二個查詢僅僅往結果表中添加記錄,如定義在結果表上的changelog流只包含INSERT。一個查詢是否生成僅插入表(INSERT)或更新表(UPDATE)有一些含義:生成更新表的查詢必須要維護更多狀態,將僅插入表轉化為流與將更新表轉化為流不同。

查詢限制

很多查詢可以等同在流上的連續查詢,一些查詢由於需維護狀態的大小或計算更新代價大導致查詢計算代價太大。

  • 狀態大小:無界限流上的連續查詢經常會運行數周或數月。因此,連續查詢處理的數據總量可以很大,需要以前結果(結果表)的連續查詢需要維護所有行以便進行更新。例如,第一個查詢示例中需要保存每個userurlcount以便可以增加count,使得當輸入表(左側表)接收一行新數據時會產生新的結果(右側表)。若只跟蹤注冊用戶,那么維護cnt大小代價不會太大(注冊用戶量不太大)。但若非注冊用戶也分配唯一的用戶名,則隨着時間的增加,維護cnt大小代價將增大,最終導致查詢失敗。

SELECT user, COUNT(url)
FROM clicks
GROUP BY user;

  • 計算更新:即使只添加或更新單行記錄,一些查詢需要重新計算和更新大部分結果行,通常這樣的查詢不適合作為連續查詢。如下查詢示例中,會根據最后一次點擊的時間為每個用戶計算RANK。一旦clicks表收到新行,用戶的lastAction被更新並且應該計算新的RANK。然而由於不存在兩行相同RANK,所以所有較低RANK的行也需要被更新。

SELECT user, RANK() OVER (ORDER BY lastLogin)
FROM (
SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
);

表到流的轉化

動態表可像傳統表一樣被INSERT、UPDATE、DELETE修改。可能只有一行的表被持續更新;或者是沒有UPDATE、DELETE更改的只插入表。當將動態表轉化為流或將其寫入外部系統,這些更改(修改)需要被編碼,FlinkTable API & SQL支持三種方式編碼動態表上的更改(修改)。

  • Append-only流:僅使用INSERT更改進行修改的動態表可通過發出插入的行來轉化為流。
  • Retract流:Retract流包含兩種類型消息(add消息和retract消息),通過將動態表的INSERT更改作為add消息、將DELETE更改作為retract消息、將UPDATE更改分解為舊記錄的retract消息和新記錄的add消息。下圖展示了從動態表轉化為retract流

  • Upsert流:Upsert流包含兩種類型消息(upset消息和delete消息),動態表轉化為upsert流需要有主鍵(可復合),具有主鍵的動態表通過將INSERT、UPDATE更改編碼為upset消息,將DELETE更改編碼為delete消息upset流retract流主要區別是UPDATE更改使用單一消息(主鍵)進行編碼,因此效率更高。下圖展示了將動態表轉化為upset流

時間屬性

  • Processing time(處理時間):表示事件被處理的系統時間。
  • Event time(事件時間):表示事件發生時的時間。
  • Ingestion time(攝入時間):表示事件進入流處理系統的時間(在內部其與Event time類型)。

上述時間可以在代碼中指明時間特性。


final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Table API & SQL中基於時間的操作(如窗口)需要設置時間概念和及其來源信息。因此,tables可以提供邏輯時間屬性來指示時間並在table程序中訪問相應時間戳。時間屬性可以是表模式的一部分(從DataStream中創建表時被定義),或在使用TableSource時被預定義,一旦時間屬性被定義,那么其可以作為一個字段被引用或進行基於時間的操作。只要時間屬性沒有被修改,只是從查詢的一部分轉發到另一部分,那么它仍然是一個有效的時間屬性。時間屬性與常規時間戳相同,可被訪問並計算。如果在計算中使用時間屬性,那么其將被具象化為常規時間戳,常規時間戳不兼容Flink的時間和水位系統,因此不能再用於基於時間的操作。

處理時間

processing time允許表程序基於本地機器的時間輸出結果,它不需要提取時間戳和生成水位,有多種方式定義processing time屬性。

流轉化為表過程

processing time屬性在模式定義時使用.proctime屬性定義,時間屬性只能通過額外的邏輯字段擴展物理模式,因此,其可被定義在模式定義的末尾,具體如下。


DataStream<Tuple2<String, String>> stream = ...;

// declare an additional logical field as a processing time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));

使用TableSource

processing time屬性可通過實現DefinedProctimeAttribute接口定義,邏輯時間屬性被附加到由TableSource的返回類型定義的物理模式上。


// define a table source with a processing attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

	@Override
	public TypeInformation<Row> getReturnType() {
		String[] names = new String[] {"Username" , "Data"};
		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
		return Types.ROW(names, types);
	}

	@Override
	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
		// create stream 
		DataStream<Row> stream = ...;
		return stream;
	}

	@Override
	public String getProctimeAttribute() {
		// field with this name will be appended as a third field 
		return "UserActionTime";
	}
}

// register table source
tEnv.registerTableSource("UserActions", new UserActionSource());

WindowedTable windowedTable = tEnv
	.scan("UserActions")
	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));

事件時間

Event time允許表程序根據每條記錄中包含的時間輸出結果,這樣即使在無序事件或晚到事件情況下保持一致結果,當從持久化存儲中讀取記錄時還保證可重放結果。此外,event time允許批和流環境中的表程序使用統一的語法,流環境中的時間屬性可以是批環境中的記錄的字段。為處理亂序事件,並區分流中准時和晚到事件,Flink需要從事件中提取時間戳信息,並在時間戳上進行處理(水位)。event time屬性可被定義在流到表的轉化中或者使用TableSourceTable API & SQL假設在上述兩種情況下,都在DataStream API中生成時間戳和水位。

流轉化為表的過程中

event time屬性在模式定義時通過.rowtime屬性定義;時間戳和水位必須在轉換的DataStream中已被分配;將DataStream轉化為Table時有如下兩種定義時間屬性的方式。

  • 通過附加邏輯字段擴展物理表模式。
  • 用邏輯字段替換物理字段(如提取時間戳后不再需要)。

// Option 1:

// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");


// Option 2:

// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");

// Usage:

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));

使用TableSource

event time屬性可通過實現DefinedRowtimeAttribute 接口定義,邏輯時間屬性被附加到由TableSource的返回類型定義的物理模式上。時間戳和水位一定要在getDataStream方法返回的流中被分配。


// define a table source with a rowtime attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttribute {

	@Override
	public TypeInformation<Row> getReturnType() {
		String[] names = new String[] {"Username" , "Data"};
		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
		return Types.ROW(names, types);
	}

	@Override
	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
		// create stream 
		// ...
		// extract timestamp and assign watermarks based on knowledge of the stream
		DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
		return stream;
	}

	@Override
	public String getRowtimeAttribute() {
		// field with this name will be appended as a third field 
		return "UserActionTime";
	}
}

// register the table source
tEnv.registerTableSource("UserActions", new UserActionSource());

WindowedTable windowedTable = tEnv
	.scan("UserActions")
	.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));

查詢配置

不管輸入是有界批量輸入還是無界流輸入,Table API & SQL查詢都有相同的語義。在很多情況下,流上的連續查詢與離線計算具有相同准確的結果。然而,在實際情況下連續查詢必須要限制其所維護狀態的大小以避免使用完存儲空間,並能夠在長時間處理無限流數據。因此,連續查詢可能只能根據輸入數據的特征和查詢本身提供近似准確的結果。

Flink Table API & SQL接口提供參數調整連續查詢的准確性和資源消耗。參數通過QueryConfig對象定義,QueryConfig對象可通過TableEnvironment獲取並在翻譯表時被傳回。


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// obtain query configuration from TableEnvironment
StreamQueryConfig qConfig = tableEnv.queryConfig();
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12));

// define query
Table result = ...

// create TableSink
TableSink<Row> sink = ...

// emit result Table via a TableSink
result.writeToSink(sink, qConfig);

// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);

下面描述了QueryConfig的參數如何影響查詢的准確性和資源消耗的。

空閑狀態保留時間

很多查詢在一個或多個關鍵屬性上聚合或連接記錄(如典型的聚合查詢),當在流上執行該查詢時,連續查詢需要維護記錄或保持每個鍵的部分結果。若涉及到流的關鍵域(活動鍵值隨時間會變化),隨着不同鍵被觀察,連續查詢會積累越來越多的狀態。然而,在一段時間后鍵將變得不活動時,它們的對應狀態將變得過期和無效。如下查詢示例中計算每個sessionclicks數量。

SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;

sessionId被作為分組鍵,連續查詢會為每個sessionId維護clicks數量。sessionId屬性隨着時間推移而變化,sessionId值僅在session結束前處於活動狀態(保持一段時間)。然而,由於不清楚sessionId屬性,連續查詢期望每個sessionId值在任何時間都有效,即會維護所有sessionId的值。這樣會導致隨着時間的推移,所維護的sessionId越來越多。

空閑狀態保留時間參數定義鍵的狀態不被更新,在刪除之前保留多長時間。在上述查詢中,sessionId的計數在指定的配置時間內未被更新時將被移除。當鍵會移除后再次被添加,那么鍵將會被當成新的鍵(如上述示例中又會開始計0)。有兩個參數配置空閑狀態保留時間最小空閑狀態保留時間最大空閑狀態保留時間


StreamQueryConfig qConfig = ...

// set idle state retention time: min = 12 hour, max = 16 hours
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(16));
// set idle state retention time. min = max = 12 hours
qConfig.withIdleStateRetentionTime(Time.hours(12);

配置不同的最小和最大空閑狀態保留時間的效率更高,因為它減少了查詢內部簿記何時刪除狀態的次數。

參考鏈接

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/streaming.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM