Flink SQL Client初探


歡迎訪問我的GitHub

https://github.com/zq2599/blog_demos

內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;

Flink Table & SQL的API實現了通過SQL語言處理實時技術算業務,但還是要編寫部分Java代碼(或Scala),並且還要編譯構建才能提交到Flink運行環境,這對於不熟悉Java或Scala的開發者就略有些不友好了;
SQL Client的目標就是解決上述問題(官方原話with a build tool before being submitted to a cluster.

局限性

遺憾的是,在Flink-1.10.0版本中,SQL Client只是個Beta版本(不適合用於生產環境),並且只能連接到本地Flink,不能像mysql、cassandra等客戶端工具那樣遠程連接server,這些在將來的版本會解決:
在這里插入圖片描述

環境信息

接下來采用實戰的方式對Flink SQL Client做初步嘗試,環境信息如下:

  1. 電腦:MacBook Pro2018 13寸,macOS Catalina 10.15.3
  2. Flink:1.10.0
  3. JDK:1.8.0_211
  1. 下載flink包,地址:http://ftp.kddilabs.jp/infosystems/apache/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz
  2. 解壓:tar -zxvf flink-1.10.0-bin-scala_2.11.tgz
  3. 進目錄flink-1.10.0/bin/,執行命令./start-cluster.sh啟動本地flink;
  4. 訪問該機器的8081端口,可見本地flink啟動成功:
    5

啟動SQL Client CLI

  1. 在目錄flink-1.10.0/bin/執行./sql-client.sh即可啟動SQL Client CLI,如下圖所示,紅框中的BETA提醒着在生產環境如果要用此工具:
    在這里插入圖片描述

  2. 第一個要掌握的是HELP命令:
    在這里插入圖片描述

  3. 從hello world開始把,執行命令select ‘Hello world!’;,控制台輸出如下圖所示,輸入Q可退出:
    在這里插入圖片描述

兩種展示模式

  1. 第一種是table mode,效果像是對普通數據表的查詢,設置該模式的命令:
SET execution.result-mode=table;
  1. 第二種是changelog mode,效果像是打印每一次數據變更的日志,設置該模式的命令:
SET execution.result-mode=changelog;
  1. 設置table mode后,執行以下命令作一次簡單的分組查詢:
SELECT name, 
  COUNT(*) AS cnt 
  FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) 
  AS NameTable(name) 
  GROUP BY name;
  1. 為了便於對比,下圖同時貼上兩種模式的查詢結果,注意綠框中顯示了該行記錄是增加還是刪除:
    在這里插入圖片描述

  2. 不論是哪種模式,查詢結構都保存在SQL Client CLI進程的堆內存中;

  3. 在chenglog模式下,為了保證控制台可以正常輸入輸出,查詢結果只展示最近1000條;

  4. table模式下,可以翻頁查詢更多結果,結果數量受配置項max-table-result-rows以及可用堆內存限制;

進一步體驗

前面寫了幾行SQL,對Flink SQL Client有了最基本的感受,接下來做進一步的體驗,內容如下:

  1. 創建CSV文件,這是個最簡單的圖書信息表,只有三個字段:名字、數量、類目,一共十條記錄;
  2. 創建SQL Client用到的環境配置文件,該文件描述了數據源以及對應的表的信息;
  3. 啟動SQL Client,執行SQL查詢上述CSV文件;
  4. 整個操作步驟如下圖所示:
    在這里插入圖片描述

操作

  1. 首先請確保Flink已經啟動;
  2. 創建名為book-store.csv的文件,內容如下:
name001,1,aaa
name002,2,aaa
name003,3,bbb
name004,4,bbb
name005,5,bbb
name006,6,ccc
name007,7,ccc
name008,8,ccc
name009,9,ccc
name010,10,ccc
  1. flink-1.10.0/conf目錄下創建名為book-store.yaml的文件,內容如下:
tables:
  - name: BookStore
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/Users/zhaoqin/temp/202004/26/book-store.csv"
    format:
      type: csv
      fields:
        - name: BookName
          type: VARCHAR
        - name: BookAmount
          type: INT
        - name: BookCatalog
          type: VARCHAR
      line-delimiter: "\n"
      comment-prefix: ","
    schema:
      - name: BookName
        type: VARCHAR
      - name: BookAmount
        type: INT
      - name: BookCatalog
        type: VARCHAR
  - name: MyBookView
    type: view
    query: "SELECT BookCatalog, SUM(BookAmount) AS Amount FROM BookStore GROUP BY BookCatalog"


execution:
  planner: blink                    # optional: either 'blink' (default) or 'old'
  type: streaming                   # required: execution mode either 'batch' or 'streaming'
  result-mode: table                # required: either 'table' or 'changelog'
  max-table-result-rows: 1000000    # optional: maximum number of maintained rows in
                                    #   'table' mode (1000000 by default, smaller 1 means unlimited)
  time-characteristic: event-time   # optional: 'processing-time' or 'event-time' (default)
  parallelism: 1                    # optional: Flink's parallelism (1 by default)
  periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)
  max-parallelism: 16               # optional: Flink's maximum parallelism (128 by default)
  min-idle-state-retention: 0       # optional: table program's minimum idle state time
  max-idle-state-retention: 0       # optional: table program's maximum idle state time

                                    #   (default database of the current catalog by default)
  restart-strategy:                 # optional: restart strategy
    type: fallback                  #   "fallback" to global restart strategy by default

# Configuration options for adjusting and tuning table programs.

# A full list of options and their default values can be found
# on the dedicated "Configuration" page.
configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

# Properties that describe the cluster to which table programs are submitted to.

deployment:
  response-timeout: 5000
  1. 對於book-store.yaml文件,有以下幾處需要注意:

a. tables.type等於source-table,表明這是數據源的配置信息;

b. tables.connector描述了詳細的數據源信息,path是book-store.csv文件的完整路徑;

c. tables.format描述了文件內容;

d. tables.schema描述了數據源表的表結構;

e. type為view表示MyBookView是個視圖(參考數據庫的視圖概念);

  1. flink-1.10.0目錄執行以下命令,即可啟動SQL Client,並指定book-store.yaml為環境配置:
bin/sql-client.sh embedded -d conf/book-store.yaml
  1. 查全表:
SELECT * FROM BookStore;

在這里插入圖片描述

  1. 按照BookCatalog分組統計記錄數:
SELECT BookCatalog, COUNT(*) AS BookCount FROM BookStore GROUP BY BookCatalog;

在這里插入圖片描述

  1. 查詢視圖:
select * from MyBookView;

在這里插入圖片描述

至此,Flink SQL Client的初次體驗就完成了,咱們此工具算是有了基本了解,接下來的文章會進一步使用Flink SQL Client做些復雜的操作;

歡迎關注公眾號:程序員欣宸

微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM