Flink&Blink【編程模型、核心概念、SQL代碼實戰】


一.Flink新特性

  1.支持Scala2.12

  2.對SQL功能進行完善

    a.Streaming SQL新增Temporal Tables【時態表】

      時態表:時態表記錄了數據改變的歷史狀態,該表可以返回特定時間點的表的內容。

    b.Streaming SQL支持模式匹配

      模式匹配:Flink CEP是Flink的復雜事件處理庫。它允許在流上定義一系列的模式,最終使得可以方便的抽取自己需要的重要事件。

    c.Streaming SQL支持更多特例,例如:REPLACE,REPEAT,LTRIM等函數

  3.完善Kafka的最新連接器

二.Blink簡介

  阿里巴巴內部Flink版本Blink已經於2019年1月正式開源。Blink最顯著的特點就是強大的SQL能力。

  1.強大的流計算引擎

    a.阿里雲實時流計算提供Flink SQL,支持各種Fail場景的自動恢復、保證故障情況下數據處理的准確性。

    b.支持多種內置函數,包括:字符串函數、日期函數、聚合函數等

    c.精確的計算資源控制,高度保證公共雲用戶作業的隔離性。

  2.關鍵性能指標為開源Flink的3~4倍,數據計算延遲優化到秒級甚至亞秒級。單個作業吞吐量可做到百萬級別。單集群規模為數千台。

  3.深度整合各類雲數據存儲。

三.Flink的編程模型和核心概念

  1.基本概念

    a.Dlink程序的基礎構建模塊是流【streams】與轉換【transformations】。

    b.每一個數據流起始於一個或多個source,並終止於一個或多個sink。

    

    圖解1,單並行度:

      

    圖解2,多並行度:

    

  2.窗口window

    a.流上的聚合需要由窗口來划定范圍。比如:“計算最近五分鍾”。

    b.窗口通常被區分為不同的類型,比如:滾動窗口【沒有重疊】、滑動窗口【有重疊】、以及會話窗口【有不活動的間隔所打斷】。

  3.時間time

    a.事件時間,指事件創建的時間。它通常由事件的時間戳描述,例如kafka消息中生成的時間戳。

    b.攝入時間,是事件進入Flink數據流運算符的時間。

    c.處理時間,是每一個執行時間操作的運算符的本地時間。

    圖解:

    

  4.並行度

    Flink程序由多個任務組成【source、transformation和sink】。一個任務由多個並行的實例【線程】來執行,一個任務的並行實例【線程】數目被稱為該任務的並行度。

    並行度級別:

      a.算子級別,設置flink的編程API設置。

      b.運行環境級別,設置executionEnvironment的方式設置並行度。

      c.客戶端級別,通過設置$FLINK_HOME/bin/flink的-p參數設置。

      d.系統級別,設置$FLINK_HOME/conf/flink-conf.yaml文件。

    並行度優先級:

      算子級別>運算環境級別>客戶端級別>系統級別

    備注:並行度不能大於slot的個數!

四.Flink SQL代碼實戰

  1.代碼

package cn.sql

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.types.Row

// 樣例類
case class Word(word : String, frequency : Int)

/**
  * Created by Administrator on 2020/1/22.
  */
object WordCount {
  def main(args: Array[String]) {
    val params : ParameterTool = ParameterTool.fromArgs(args)

    // 設置execution執行環境
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 設置web界面有效參數
    env.getConfig.setGlobalJobParameters(params)

    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val text = "Apache Flink apache spark apache solr hbase hive flink kafka redis tachyon redis"
    val words = text.toLowerCase.split(" ").map(row => Word(row, 1))

    val input = env.fromCollection(words)
    input.count()

    //val tableData = tableEnv.fromDataSet(input)
    //tableEnv.registerTable("WordCount", tableData) // 注冊表
    tableEnv.registerDataSet("WordCount", input) // 注冊表

    val table = tableEnv.sqlQuery(
          "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word")
        .filter("frequency > 1") // 過濾長度大於1

    table.printSchema()

    val result = tableEnv.toDataSet[Row](table) // table轉dataset。備注:必須添加[Row],否則報錯
    result.print()
  }
}

  2.執行結果

    

 

    

 

  3.備注

    table轉dataset時一定要指定[Row],不然會報錯,如下:

    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM