FlinkSQL 編程


快速開始一個flinksql程序:

表環境准備

//注冊表環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)

 

  

注冊表

stu_row_stream:就是一條輸入流,我這里是row類型的輸入流(我把默認readtextfile的string流轉換為了row流)
//獲得table的兩種方式,兩種方式都需要導入api隱式轉換
    import org.apache.flink.table.api.scala._ //這個依賴是為了fromdatastream這個方法服務
    import org.apache.flink.streaming.api.scala._//這個以來是為了后續把table轉換datastream流服務
//    // 方式一:通過注冊表的方式將把注冊到tableEnv中
//    tableEnv.registerDataStream("stu_table",stu_row_stream,'id,'name)
//    //再次掃描表獲得表名
//    val student_table: Table = tableEnv.scan("stu_table")

    //方式二:直接從流中轉換得到表
    val student_table: Table = tableEnv.fromDataStream(stu_row_stream,'id,'name)

 

 

執行sql語句

這里我用的是scala 的語法

   //定義sql語句
    val sql=
      s"""
         |select name,count(*) as cnt from ${student_table}
         |group by name
      """.stripMargin

    val result_table: Table = tableEnv.sqlQuery(sql)

 

轉換為datastream

 //最后我把table轉為了stream,由於我用了聚合的算子所以使用toRetractStream,如果沒有用聚合算子就使用toAppendStream
val result_stream: DataStream[(Boolean, Row)] = result_table
      .toRetractStream[Row]

    result_stream.print()

   env.execute()

結果解讀:

true代表在狀態中這條數據沒有更改,false代表在狀態中這條數據已經丟棄(因為count的結果需要疊加)

 

基礎知識

table轉換為datastream流:

1.追加模式 toappendstream

2.撤回模式 retractstream 添加/刪除

3.更新插入流 upsert 

upsert中包含upsert信息和delete信息

修改/刪除,可以

兩種模式主要是看是否會用到聚合的情況。

 

 

 

動態表Dynamic Tables:
動態表是隨着時間變化的

持續查詢:連續查詢永遠不會停止,並會生成另一個動態表,查詢不斷的更新動態的結果。

每條流數據其實可以理解為對結果表的插入操作。

 

 

 

每條流數據來了只會輸出有改變的數據。

 

時間特性 time attributes:

時間屬性可以是每個表schema的一部分,定義了時間屬性就可以作為一個字段引用,就是常規的時間戳,可以訪問並且計算。

定義處理時間:processing time

.proctime

只需要在注冊表時最后一個字段加上proctime即可。注意:最后一個字段不是原有的字段,而是在方法中自己添加的字段,因為是處理時間。

 

 

 定義事件時間:event time

事件時間和水位線需要在流式數據上進行設置。

你可以追加一個事件時間字段,或者使用原來時間字段作為事件時間

.rowtime 

 

 

 

 

 

 

 

 

 

窗口:

時間語義需要配合窗口操作才能發揮作用

 

gruop windows 分組窗口:(整張表)

根據時間或行數據,把數據聚合到組中。

 

 

 

 

滾動窗口:tumbling windows

可以通過時間開窗,也可以通過行數開窗,之所以行數也要時間字段是因為需要對數據進行排序

 

 

滑動窗口:sliding windows

 

 

會話窗口:session window

 

 

 

over windows 開窗函數:(針對某行)

針對每個輸入行,計算附近行范圍的聚合。

適應需求:針對當前行,它之前之后固定范圍要做的事情。但是目前還不支持之后的數據,只支持之前的數據。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM