快速開始一個flinksql程序:
表環境准備
//注冊表環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
注冊表
stu_row_stream:就是一條輸入流,我這里是row類型的輸入流(我把默認readtextfile的string流轉換為了row流)
//獲得table的兩種方式,兩種方式都需要導入api隱式轉換 import org.apache.flink.table.api.scala._ //這個依賴是為了fromdatastream這個方法服務 import org.apache.flink.streaming.api.scala._//這個以來是為了后續把table轉換datastream流服務 // // 方式一:通過注冊表的方式將把注冊到tableEnv中 // tableEnv.registerDataStream("stu_table",stu_row_stream,'id,'name) // //再次掃描表獲得表名 // val student_table: Table = tableEnv.scan("stu_table") //方式二:直接從流中轉換得到表 val student_table: Table = tableEnv.fromDataStream(stu_row_stream,'id,'name)
執行sql語句
這里我用的是scala 的語法
//定義sql語句 val sql= s""" |select name,count(*) as cnt from ${student_table} |group by name """.stripMargin val result_table: Table = tableEnv.sqlQuery(sql)
轉換為datastream
//最后我把table轉為了stream,由於我用了聚合的算子所以使用toRetractStream,如果沒有用聚合算子就使用toAppendStream
val result_stream: DataStream[(Boolean, Row)] = result_table
.toRetractStream[Row]
result_stream.print()
env.execute()
結果解讀:
true代表在狀態中這條數據沒有更改,false代表在狀態中這條數據已經丟棄(因為count的結果需要疊加)

基礎知識
table轉換為datastream流:
1.追加模式 toappendstream
2.撤回模式 retractstream 添加/刪除
3.更新插入流 upsert
upsert中包含upsert信息和delete信息
修改/刪除,可以
兩種模式主要是看是否會用到聚合的情況。

動態表Dynamic Tables:
動態表是隨着時間變化的
持續查詢:連續查詢永遠不會停止,並會生成另一個動態表,查詢不斷的更新動態的結果。
每條流數據其實可以理解為對結果表的插入操作。

每條流數據來了只會輸出有改變的數據。
時間特性 time attributes:
時間屬性可以是每個表schema的一部分,定義了時間屬性就可以作為一個字段引用,就是常規的時間戳,可以訪問並且計算。
定義處理時間:processing time
.proctime
只需要在注冊表時最后一個字段加上proctime即可。注意:最后一個字段不是原有的字段,而是在方法中自己添加的字段,因為是處理時間。

定義事件時間:event time
事件時間和水位線需要在流式數據上進行設置。
你可以追加一個事件時間字段,或者使用原來時間字段作為事件時間
.rowtime

窗口:
時間語義需要配合窗口操作才能發揮作用
gruop windows 分組窗口:(整張表)
根據時間或行數據,把數據聚合到組中。

滾動窗口:tumbling windows
可以通過時間開窗,也可以通過行數開窗,之所以行數也要時間字段是因為需要對數據進行排序
滑動窗口:sliding windows
會話窗口:session window
over windows 開窗函數:(針對某行)
針對每個輸入行,計算附近行范圍的聚合。
適應需求:針對當前行,它之前之后固定范圍要做的事情。但是目前還不支持之后的數據,只支持之前的數據。

