一.Flink新特性
1.支持Scala2.12
2.對SQL功能進行完善
a.Streaming SQL新增Temporal Tables【時態表】
時態表:時態表記錄了數據改變的歷史狀態,該表可以返回特定時間點的表的內容。
b.Streaming SQL支持模式匹配
模式匹配:Flink CEP是Flink的復雜事件處理庫。它允許在流上定義一系列的模式,最終使得可以方便的抽取自己需要的重要事件。
c.Streaming SQL支持更多特例,例如:REPLACE,REPEAT,LTRIM等函數
3.完善Kafka的最新連接器
二.Blink簡介
阿里巴巴內部Flink版本Blink已經於2019年1月正式開源。Blink最顯著的特點就是強大的SQL能力。
1.強大的流計算引擎
a.阿里雲實時流計算提供Flink SQL,支持各種Fail場景的自動恢復、保證故障情況下數據處理的准確性。
b.支持多種內置函數,包括:字符串函數、日期函數、聚合函數等
c.精確的計算資源控制,高度保證公共雲用戶作業的隔離性。
2.關鍵性能指標為開源Flink的3~4倍,數據計算延遲優化到秒級甚至亞秒級。單個作業吞吐量可做到百萬級別。單集群規模為數千台。
3.深度整合各類雲數據存儲。
三.Flink的編程模型和核心概念
1.基本概念
a.Dlink程序的基礎構建模塊是流【streams】與轉換【transformations】。
b.每一個數據流起始於一個或多個source,並終止於一個或多個sink。

圖解1,單並行度:
圖解2,多並行度:

2.窗口window
a.流上的聚合需要由窗口來划定范圍。比如:“計算最近五分鍾”。
b.窗口通常被區分為不同的類型,比如:滾動窗口【沒有重疊】、滑動窗口【有重疊】、以及會話窗口【有不活動的間隔所打斷】。
3.時間time
a.事件時間,指事件創建的時間。它通常由事件的時間戳描述,例如kafka消息中生成的時間戳。
b.攝入時間,是事件進入Flink數據流運算符的時間。
c.處理時間,是每一個執行時間操作的運算符的本地時間。
圖解:

4.並行度
Flink程序由多個任務組成【source、transformation和sink】。一個任務由多個並行的實例【線程】來執行,一個任務的並行實例【線程】數目被稱為該任務的並行度。
並行度級別:
a.算子級別,設置flink的編程API設置。
b.運行環境級別,設置executionEnvironment的方式設置並行度。
c.客戶端級別,通過設置$FLINK_HOME/bin/flink的-p參數設置。
d.系統級別,設置$FLINK_HOME/conf/flink-conf.yaml文件。
並行度優先級:
算子級別>運算環境級別>客戶端級別>系統級別
備注:並行度不能大於slot的個數!
四.Flink SQL代碼實戰
1.代碼
package cn.sql import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.api.scala._ import org.apache.flink.table.api.TableEnvironment import org.apache.flink.types.Row // 樣例類 case class Word(word : String, frequency : Int) /** * Created by Administrator on 2020/1/22. */ object WordCount { def main(args: Array[String]) { val params : ParameterTool = ParameterTool.fromArgs(args) // 設置execution執行環境 val env = ExecutionEnvironment.getExecutionEnvironment // 設置web界面有效參數 env.getConfig.setGlobalJobParameters(params) val tableEnv = TableEnvironment.getTableEnvironment(env) val text = "Apache Flink apache spark apache solr hbase hive flink kafka redis tachyon redis" val words = text.toLowerCase.split(" ").map(row => Word(row, 1)) val input = env.fromCollection(words) input.count() //val tableData = tableEnv.fromDataSet(input) //tableEnv.registerTable("WordCount", tableData) // 注冊表 tableEnv.registerDataSet("WordCount", input) // 注冊表 val table = tableEnv.sqlQuery( "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word") .filter("frequency > 1") // 過濾長度大於1 table.printSchema() val result = tableEnv.toDataSet[Row](table) // table轉dataset。備注:必須添加[Row],否則報錯 result.print() } }
2.執行結果


3.備注
table轉dataset時一定要指定[Row],不然會報錯,如下:

