Flink SQL Client 簡單使用


數據源使用 Kafka,相關配置單獨寫在一個 YAML 文件中:

  • 文件名:sql.yaml(放在 Flink 的 conf/ 目錄下,啟動時通過 -d conf/sql.yaml 引用)
#==============================================================================
# Table Sources
#==============================================================================

# Define table sources and sinks here.

tables:
  # Kafka-backed source table; JSON records are consumed from topic "orderSQL".
  - name: testTable           # table name
    type: source              # table type: 'source' or 'sink'
    # How rows are written: 'append' or 'update'. 'update' only works with
    # sinks backed by an updatable external store (e.g. MySQL, HBase).
    update-mode: append
    connector:                # connector definition
      property-version: 1
      type: kafka             # read from Kafka
      # Quoted so YAML keeps it a string: an unquoted 0.11 is resolved as a
      # float (and e.g. 0.10 would silently become 0.1). Use "universal"
      # for Kafka versions newer than 0.11.
      version: "0.11"
      topic: orderSQL         # topic to consume
      # 'earliest-offset' consumes the topic from the beginning,
      # 'latest-offset' consumes only newly arriving records.
      startup-mode: earliest-offset
      properties:             # Kafka client settings: ZooKeeper / broker addresses and consumer group
        - key: zookeeper.connect
          value: localhost:2181
        - key: bootstrap.servers
          value: localhost:9092
        - key: group.id
          value: test-consumer-group
    format:                   # how each Kafka record is decoded
      property-version: 1
      type: json
      # Physical layout of the JSON records; the timestamp field is pay_time here.
      schema: "ROW(order_id LONG, shop_id VARCHAR, member_id LONG, trade_amt DOUBLE, pay_time TIMESTAMP)"
    schema:                   # columns exposed by testTable
      - name: order_id
        type: LONG
      - name: shop_id
        type: VARCHAR
      - name: member_id
        type: LONG
      - name: trade_amt
        type: DOUBLE
      - name: payment_time    # exposed as payment_time instead of the source field pay_time
        type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"        # take the event timestamp from a source field...
            from: "pay_time"          # ...namely the JSON field pay_time
          watermarks:                 # watermark strategy
            type: "periodic-bounded"  # periodic watermarks with bounded out-of-orderness
            delay: "60000"            # maximum out-of-orderness, in ms

#==============================================================================
# User-defined functions
#==============================================================================

# Define scalar, aggregate, or table functions here.

# No user-defined functions are defined in this environment file.
functions: [] # empty list
# A typical function definition looks like:
# - name: ...
#   from: class
#   class: ...
#   constructor: ...

#==============================================================================
# Execution properties
#==============================================================================

# Execution properties allow for changing the behavior of a table program.

execution:
  # 'batch' or 'streaming' execution
  type: streaming
  # allow 'event-time' or only 'processing-time' in sources.
  # event-time is chosen so the rowtime attribute declared on
  # testTable.payment_time (and its watermarks) drives the TUMBLE query.
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 1
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms (3600000 ms = 1 hour)
  min-idle-state-retention: 3600000
  # maximum idle state retention in ms (7200000 ms = 2 hours)
  max-idle-state-retention: 7200000

#==============================================================================
# Deployment properties
#==============================================================================

# Deployment properties allow for describing the cluster to which table
# programs are submitted to.

deployment:
  # general cluster communication timeout in ms (5000 ms = 5 s)
  response-timeout: 5000
  # (optional) address from cluster to gateway; left empty here, i.e. not set explicitly
  gateway-address: ""
  # (optional) port from cluster to gateway; 0 here — presumably means "not fixed", confirm against the Flink docs
  gateway-port: 0
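  • 添加依賴包:

Flink 集群中每個節點的 lib 目錄下都需要放置這些 jar 包。本例用到 Kafka 連接器和 JSON 格式,例如 flink-sql-connector-kafka-0.11 和 flink-json。jar 的版本需與所用 Flink 版本一致。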

執行 sql-client.sh,以 embedded 模式啟動:-d 指定上面的 YAML 配置文件,-l 指定依賴 jar 包所在的目錄。

[root@rdx flink]# pwd
/opt/rdx/flink
[root@rdx flink]# bin/sql-client.sh embedded -d conf/sql.yaml -l lib/

看到松鼠圖案,就表示 SQL Client 已經成功啟動!

運行SQL語句:

  • 運行之前,先向配置文件中設置的 topic(orderSQL)寫入幾條數據:
{"order_id": "1","shop_id": "AF18","member_id": "3410211","trade_amt": "100.00","pay_time": "2021-01-18T16:57:00Z"}
{"order_id": "2","shop_id": "AF20","member_id": "3410213","trade_amt": "130.00","pay_time": "2021-01-18T16:58:00Z"}
{"order_id": "3","shop_id": "AF18","member_id": "3410212","trade_amt": "120.00","pay_time": "2021-01-18T16:58:35Z"}
{"order_id": "4","shop_id": "AF19","member_id": "3410212","trade_amt": "100.00","pay_time": "2021-01-18T16:59:00Z"}
{"order_id": "5","shop_id": "AF18","member_id": "3410211","trade_amt": "150.00","pay_time": "2021-01-18T16:59:30Z"}
{"order_id": "6","shop_id": "AF18","member_id": "3410211","trade_amt": "110.00","pay_time": "2021-01-18T17:00:12Z"}
{"order_id": "7","shop_id": "AF19","member_id": "3410213","trade_amt": "110.00","pay_time": "2021-01-18T17:01:00Z"}
{"order_id": "8","shop_id": "AF20","member_id": "3410211","trade_amt": "100.00","pay_time": "2021-01-18T17:01:44Z"}
{"order_id": "9","shop_id": "AF17","member_id": "3410212","trade_amt": "110.00","pay_time": "2021-01-18T17:02:15Z"}
{"order_id": "10","shop_id": "AF20","member_id": "3410212","trade_amt": "110.00","pay_time": "2021-01-18T17:03:15Z"}
{"order_id": "11","shop_id": "AF19","member_id": "3410212","trade_amt": "110.00","pay_time": "2021-01-18T17:03:33Z"}
{"order_id": "12","shop_id": "AF17","member_id": "3410212","trade_amt": "110.00","pay_time": "2021-01-18T17:04:15Z"}
  • 執行簡單查詢 SQL:
Flink SQL> select * from testTable;

  • 一分鐘固定窗口(滾動窗口 TUMBLE)計算:
SELECT
  shop_id,
  TUMBLE_START(payment_time, INTERVAL '1' MINUTE) AS tumble_start,
  TUMBLE_END(payment_time, INTERVAL '1' MINUTE)   AS tumble_end,
  SUM(trade_amt)                                  AS amt
FROM testTable
GROUP BY
  TUMBLE(payment_time, INTERVAL '1' MINUTE),
  shop_id;


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM