【翻譯】Flink Table Api & SQL —Streaming 概念 ——時間屬性


本文翻譯自官網: Time Attributes   https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/time_attributes.html

Flink Table Api & SQL 翻譯目錄

Flink能夠根據不同的時間概念處理流數據

  • Process time 是指正在執行相應操作的機器的系統時間(也稱為“掛鍾時間”)。
  • Event time 是指基於附在每行上的時間戳對流數據進行處理。時間戳可以在事件發生時進行編碼。
  • Ingestion time 是事件進入Flink的時間;在內部,它的處理類似於事件時間。

有關Flink中時間處理的更多信息,請參見有關事件時間和水印的介紹

本頁說明如何在Flink的Table API和SQL中為基於時間的操作定義時間屬性。

時間屬性簡介

Table APISQL中的基於時間的操作(例如窗口)都需要有關時間概念及其起源的信息。因此,表可以提供邏輯時間屬性,以指示時間並訪問表程序中的相應時間戳。

時間屬性可以是每個表結構的一部分。它們是從DataStream創建表時定義的,或者是在使用TableSource時預定義的一旦在開始定義了時間屬性,就可以將其作為字段引用,並可以在基於時間的操作中使用。

只要時間屬性沒有被修改並且只是從查詢的一部分轉發到另一部分,它仍然是有效的時間屬性。時間屬性的行為類似於常規時間戳,可以進行訪問以進行計算。常規時間戳記不能與Flink的時間和水印系統配合使用,因此不能再用於基於時間的操作

表程序要求已為流環境指定了相應的時間特征:

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

處理時間

處理時間允許表程序根據本地計算機的時間產生結果。這是最簡單的時間概念,但不提供確定性。它既不需要時間戳提取也不需要水印生成。

有兩種定義處理時間屬性的方法。

在數據流到表的轉換期間

在結構定義期間,使用.proctime屬性定義了處理時間屬性。時間屬性只能通過其他邏輯字段擴展物理結構因此,只能在結構定義的末尾定義它。

val stream: DataStream[(String, String)] = ...

// declare an additional logical field as a processing time attribute
val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)

val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)

使用TableSource

處理時間屬性由實現DefinedProctimeAttribute接口的TableSource定義。邏輯時間屬性附加到由TableSource的返回類型定義的物理結構

class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {

    override def getReturnType = {
        val names = Array[String]("Username" , "Data")
        val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
        Types.ROW(names, types)
    }

    override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
        // create stream
        val stream = ...
        stream
    }

    override def getProctimeAttribute = {
        // field with this name will be appended as a third field
        "UserActionTime"
    }
}

// register table source
tEnv.registerTableSource("UserActions", new UserActionSource)

val windowedTable = tEnv
    .scan("UserActions")
    .window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)

事件時間

事件時間允許表程序根據每個記錄中包含的時間來產生結果。即使在無序事件或遲發事件的情況下,這也可以提供一致的結果。從持久性存儲中讀取記錄時,還可以確保表程序的可重播結果。

此外,事件時間允許批處理和流環境中的表程序使用統一語法。流環境中的時間屬性可以是批處理環境中記錄的常規字段。

為了處理亂序事件並區分流中的按時事件和延遲事件,Flink需要從事件中提取時間戳並及時進行某種處理(就是水印)。

可以在DataStream到Table的轉換期間或使用TableSource 定義事件時間屬性。

在DataStream 到 Table 的轉換期間

在結構定義期間,事件時間屬性是使用.rowtime屬性定義的。必須在轉換的DataStream中分配時間戳和水印

將 DataStream 轉換為 Table 時,有兩種定義時間屬性的方法。根據指定的.rowtime字段名稱是否存在於DataStream的結構中,timestamp字段為

  • 作為新字段附加到結構
  • 替換現有字段。

無論哪種情況,事件時間時間戳字段都將保留DataStream事件時間 時間戳的值。

// Option 1:

// extract timestamp and assign watermarks based on knowledge of the stream
val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// declare an additional logical field as an event time attribute
val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.rowtime)


// Option 2:

// extract timestamp from first field, and assign watermarks based on knowledge of the stream
val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
val table = tEnv.fromDataStream(stream, 'UserActionTime.rowtime, 'Username, 'Data)

// Usage:

val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)

使用TableSource

 事件時間屬性由實現了DefinedRowtimeAttributes接口的TableSource定義。getRowtimeAttributeDescriptors()方法返回用於描述時間屬性最終名稱的RowtimeAttributeDescriptor列表,用於導出屬性值的時間戳提取器以及與該屬性關聯的水印策略。

請確保由getDataStream()方法返回的DataStream與定義的時間屬性對齊。僅當定義了StreamRecordTimestamp時間戳提取器時,才考慮DataStream的時間戳(由TimestampAssigner分配的時間戳)。僅當定義了PreserveWatermarks水印策略時,才會保留DataStream的水印。 否則,僅TableSource的rowtime屬性的值相關。

// define a table source with a rowtime attribute
class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

    override def getReturnType = {
        val names = Array[String]("Username" , "Data", "UserActionTime")
        val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.LONG)
        Types.ROW(names, types)
    }

    override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
        // create stream
        // ...
        // assign watermarks based on the "UserActionTime" attribute
        val stream = inputStream.assignTimestampsAndWatermarks(...)
        stream
    }

    override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
        // Mark the "UserActionTime" attribute as event-time attribute.
        // We create one attribute descriptor of "UserActionTime".
        val rowtimeAttrDescr = new RowtimeAttributeDescriptor(
            "UserActionTime",
            new ExistingField("UserActionTime"),
            new AscendingTimestamps)
        val listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr)
        listRowtimeAttrDescr
    }
}

// register the table source
tEnv.registerTableSource("UserActions", new UserActionSource)

val windowedTable = tEnv
    .scan("UserActions")
    .window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM