本文翻譯自官網: Time Attributes https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/time_attributes.html
Flink能夠根據不同的時間概念處理流數據。
- Process time 是指正在執行相應操作的機器的系統時間(也稱為“掛鍾時間”)。
- Event time 是指基於附在每行上的時間戳對流數據進行處理。時間戳可以在事件發生時進行編碼。
- Ingestion time 是事件進入Flink的時間;在內部,它的處理類似於事件時間。
有關Flink中時間處理的更多信息,請參見有關事件時間和水印的介紹。
本頁說明如何在Flink的Table API和SQL中為基於時間的操作定義時間屬性。
時間屬性簡介
Table API和SQL中的基於時間的操作(例如窗口)都需要有關時間概念及其起源的信息。因此,表可以提供邏輯時間屬性,以指示時間並訪問表程序中的相應時間戳。
時間屬性可以是每個表結構的一部分。它們是從DataStream創建表時定義的,或者是在使用TableSource時預定義的。一旦在開始定義了時間屬性,就可以將其作為字段引用,並可以在基於時間的操作中使用。
只要時間屬性沒有被修改並且只是從查詢的一部分轉發到另一部分,它仍然是有效的時間屬性。時間屬性的行為類似於常規時間戳,可以進行訪問以進行計算。常規時間戳記不能與Flink的時間和水印系統配合使用,因此不能再用於基於時間的操作。
表程序要求已為流環境指定了相應的時間特征:
val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default // alternatively: // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
處理時間
處理時間允許表程序根據本地計算機的時間產生結果。這是最簡單的時間概念,但不提供確定性。它既不需要時間戳提取也不需要水印生成。
有兩種定義處理時間屬性的方法。
在數據流到表的轉換期間
在結構定義期間,使用.proctime屬性定義了處理時間屬性。時間屬性只能通過其他邏輯字段擴展物理結構。因此,只能在結構定義的末尾定義它。
val stream: DataStream[(String, String)] = ... // declare an additional logical field as a processing time attribute val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime) val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
使用TableSource
處理時間屬性由實現DefinedProctimeAttribute接口的TableSource定義。邏輯時間屬性附加到由TableSource的返回類型定義的物理結構。
class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute { override def getReturnType = { val names = Array[String]("Username" , "Data") val types = Array[TypeInformation[_]](Types.STRING, Types.STRING) Types.ROW(names, types) } override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { // create stream val stream = ... stream } override def getProctimeAttribute = { // field with this name will be appended as a third field "UserActionTime" } } // register table source tEnv.registerTableSource("UserActions", new UserActionSource) val windowedTable = tEnv .scan("UserActions") .window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
事件時間
事件時間允許表程序根據每個記錄中包含的時間來產生結果。即使在無序事件或遲發事件的情況下,這也可以提供一致的結果。從持久性存儲中讀取記錄時,還可以確保表程序的可重播結果。
此外,事件時間允許批處理和流環境中的表程序使用統一語法。流環境中的時間屬性可以是批處理環境中記錄的常規字段。
為了處理亂序事件並區分流中的按時事件和延遲事件,Flink需要從事件中提取時間戳並及時進行某種處理(就是水印)。
可以在DataStream到Table的轉換期間或使用TableSource 定義事件時間屬性。
在DataStream 到 Table 的轉換期間
在結構定義期間,事件時間屬性是使用.rowtime屬性定義的。必須在轉換的DataStream中分配時間戳和水印。
將 DataStream 轉換為 Table 時,有兩種定義時間屬性的方法。根據指定的.rowtime字段名稱是否存在於DataStream的結構中,timestamp字段為
- 作為新字段附加到結構
- 替換現有字段。
無論哪種情況,事件時間時間戳字段都將保留DataStream事件時間 時間戳的值。
// Option 1: // extract timestamp and assign watermarks based on knowledge of the stream val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...) // declare an additional logical field as an event time attribute val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.rowtime) // Option 2: // extract timestamp from first field, and assign watermarks based on knowledge of the stream val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...) // the first field has been used for timestamp extraction, and is no longer necessary // replace first field with a logical event time attribute val table = tEnv.fromDataStream(stream, 'UserActionTime.rowtime, 'Username, 'Data) // Usage: val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
使用TableSource
事件時間屬性由實現了DefinedRowtimeAttributes接口的TableSource定義。getRowtimeAttributeDescriptors()方法返回用於描述時間屬性最終名稱的RowtimeAttributeDescriptor列表,用於導出屬性值的時間戳提取器以及與該屬性關聯的水印策略。
請確保由getDataStream()方法返回的DataStream與定義的時間屬性對齊。僅當定義了StreamRecordTimestamp時間戳提取器時,才考慮DataStream的時間戳(由TimestampAssigner分配的時間戳)。僅當定義了PreserveWatermarks水印策略時,才會保留DataStream的水印。 否則,僅TableSource的rowtime屬性的值相關。
// define a table source with a rowtime attribute class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttributes { override def getReturnType = { val names = Array[String]("Username" , "Data", "UserActionTime") val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.LONG) Types.ROW(names, types) } override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { // create stream // ... // assign watermarks based on the "UserActionTime" attribute val stream = inputStream.assignTimestampsAndWatermarks(...) stream } override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = { // Mark the "UserActionTime" attribute as event-time attribute. // We create one attribute descriptor of "UserActionTime". val rowtimeAttrDescr = new RowtimeAttributeDescriptor( "UserActionTime", new ExistingField("UserActionTime"), new AscendingTimestamps) val listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr) listRowtimeAttrDescr } } // register the table source tEnv.registerTableSource("UserActions", new UserActionSource) val windowedTable = tEnv .scan("UserActions") .window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

