Flink Table&Sql 時間特性


1、時間特性

    基於時間的操作(比如 Table API 和 SQL 中窗口操作),需要定義相關的時間語義和時間 數據來源的信息。
所以,Table 可以提供一個邏輯上的時間字段,用於在表處理程序中,指示時間和訪問相應的時間戳
時間屬性,可以是每個表 schema 的一部分。一旦定義了時間屬性,它就可以作為一個字段引用,並且可以在基於時間的操作中使用。 時間屬性的行為類似於常規時間戳,可以訪問,並且進行計算。

2、處理時間(Processing Time)

        處理時間語義下,允許表處理程序根據機器的本地時間生成結果。它是時間的最簡單概 念。它既不需要提取時間戳,也不需要生成 watermark。
        
        定義處理時間屬性有三種方法:在 DataStream 轉化時直接指定在定義 Table Schema 時指定在創建表的 DDL 中指定。
        
    a) DataStream 轉化成 Table 時指定
        由 DataStream 轉換成表時,可以在后面指定字段名來定義 Schema。在定義 Schema 期 間,可以使用.proctime,定義處理時間字段。 
        注意,這個 proctime 屬性只能通過附加邏輯字段,來擴展物理 schema。因此,只能在 schema 定義的末尾定義它
        
        代碼段:
            // 定義好 DataStream 
            val inputStream: DataStream[String] = env.readTextFile("\\sensor.txt") 
            val dataStream: DataStream[SensorReading] = inputStream .map(data => {
            val dataArray = data.split(",") 
              SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble) 
            }) 
            // 將 DataStream 轉換為 Table,並指定時間字段 
            val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp, 'pt.proctime)
            
    b)定義 Table Schema 時指定
        這種方法其實也很簡單,只要在定義 Schema 的時候,加上一個新的字段,並指定成 proctime 就可以了
        代碼段:
                tableEnv.connect( new FileSystem().path("..\\sensor.txt"))
                  .withFormat(new Csv()) .withSchema(new Schema()
                  .field("id", DataTypes.STRING())
                  .field("timestamp", DataTypes.BIGINT())
                  .field("temperature", DataTypes.DOUBLE())
                  .field("pt", DataTypes.TIMESTAMP(3)).proctime() // 指定 pt 字段為處理時間 
                  ) // 定義表結構
                  .createTemporaryTable("inputTable") // 創建臨時表      
              
    c)創建表的 DDL 中指定
        在創建表的 DDL 中,增加一個字段並指定成 proctime,也可以指定當前的時間字段
        代碼段:
                val sinkDDL: String =
                          """
                            |create table dataTable ( 
                            | id varchar(20) not null, 
                            | ts bigint, 
                            | temperature double, 
                            | pt AS PROCTIME() |) with ( 
                            | 'connector.type' = 'filesystem', 
                            | 'connector.path' = 'file:///D:\\..\\sensor.txt', 
                            | 'format.type' = 'csv' |) """.stripMargin 
                        tableEnv.sqlUpdate(sinkDDL) // 執行 DDL

3、事件時間(Event Time)

    事件時間語義,允許表處理程序根據每個記錄中包含的時間生成結果。這樣即使在有亂 序事件或者延遲事件時,也可以獲得正確的結果。 
    
    為了處理無序事件,並區分流中的准時和遲到事件;Flink 需要從事件數據中,提取時間戳,並用來推進事件時間的進展(watermark)。
    
    a)DataStream 轉化成 Table 時指定
        在 DataStream 轉換成 Table,schema 的定義期間,使用.rowtime 可以定義事件時間屬性。 注意必須在轉換的數據流中分配時間戳和 watermark

在將數據流轉換為表時,有兩種定義時間屬性的方法。根據指定的.rowtime 字段名是否存在於數據流的架構中,timestamp 字段可以:
1、作為新字段追加到 schema 2、替換現有字段 在這兩種情況下,定義的事件時間戳字段,都將保存 DataStream 中事件時間戳的值。 代碼段: val inputStream: DataStream[String] = env.readTextFile("\\sensor.txt") val dataStream: DataStream[SensorReading] = inputStream.map(data => { val dataArray = data.split(",") SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble) }) .assignAscendingTimestamps(_.timestamp * 1000L) // 將 DataStream 轉換為 Table,並指定時間字段 val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'timestamp.rowtime, 'temperature) // 或者,直接追加字段 val sensorTable2 = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp, 'rt.rowtime) b)定義 Table Schema 時指定 這種方法只要在定義 Schema 的時候,將事件時間字段,並指定成 rowtime 就可以了 代碼段: tableEnv.connect( new FileSystem().path("sensor.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .rowtime( new Rowtime() .timestampsFromField("timestamp") // 從字段中提取時間戳 .watermarksPeriodicBounded(1000)) // watermark 延遲 1 秒 .field("temperature", DataTypes.DOUBLE()) )// 定義表結構 .createTemporaryTable("inputTable") // 創建臨時表 c)創建表的 DDL 中指定 事件時間屬性,是使用 CREATE TABLE DDL 中的 WARDMARK 語句定義的。watermark 語 句,定義現有事件時間字段上的 watermark 生成表達式,該表達式將事件時間字段標記為事 件時間屬性。 代碼段: val sinkDDL: String = """ |create table dataTable ( | id varchar(20) not null, | ts bigint, | temperature double, | rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), | watermark for rt as rt - interval '1' second |) with ( | 'connector.type' = 'filesystem', | 'connector.path' = 'file:///D:\\..\\sensor.txt', | 'format.type' = 'csv' |) """.stripMargin tableEnv.sqlUpdate(sinkDDL) // 執行 DDL 注意:這里 FROM_UNIXTIME 是系統內置的時間函數,用來將一個整數(秒數)轉換成 “YYYY-MM-DD hh:mm:ss”格式(默認,也可以作為第二個 String 參數傳入)的日期時間 字符串(date time string);
     然后再用 TO_TIMESTAMP 將其轉換成 Timestamp。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM