數據源設置為 Kafka,並使用單獨的 YAML 文件進行配置:
- 文件名:sql.yaml
#==============================================================================
# Table Sources
#==============================================================================
# Define table sources and sinks here.
# Declares one Kafka-backed source table, testTable, read as JSON and exposed
# with an event-time (rowtime) attribute named payment_time.
tables:
  # A typical table source definition looks like:
  - name: testTable        # table name
    type: source           # table type: 'source' or 'sink'
    # How rows in the table are updated: append / update. 'update' can only be
    # written to external stores that support updates (e.g. MySQL, HBase).
    update-mode: append
    connector:             # connector definition
      property-version: 1
      type: kafka          # connect to Kafka
      # For Kafka versions above 0.11 use "universal". Quoted so YAML never
      # reads a version such as 0.10 as the float 0.1.
      version: "0.11"
      topic: orderSQL      # topic to consume
      # Where to start consuming: earliest-offset reads from the beginning,
      # latest-offset reads only new records.
      startup-mode: earliest-offset
      properties:          # ZooKeeper / Kafka host and port, consumer group
        - key: zookeeper.connect
          value: localhost:2181
        - key: bootstrap.servers
          value: localhost:9092
        - key: group.id
          value: test-consumer-group
    format:                # how the incoming records are parsed
      property-version: 1
      type: json
      # Field names here are the names found in the JSON records (pay_time).
      schema: "ROW(order_id LONG, shop_id VARCHAR, member_id LONG, trade_amt DOUBLE, pay_time TIMESTAMP)"
    # Table schema as seen by SQL queries.
    schema:
      - name: order_id
        type: LONG
      - name: shop_id
        type: VARCHAR
      - name: member_id
        type: LONG
      - name: trade_amt
        type: DOUBLE
      - name: payment_time # output field renamed from pay_time to payment_time
        type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"       # timestamp is taken from a field of the source
            from: "pay_time"         # source field that holds the timestamp
          watermarks:                # watermark strategy
            type: "periodic-bounded" # periodic watermarks with a bounded delay
            delay: "60000"           # maximum delay (milliseconds, per the Flink rowtime docs)
#==============================================================================
# User-defined functions
#==============================================================================
# Define scalar, aggregate, or table functions here.
# No user-defined functions are registered in this environment file.
functions: [] # empty list
# A typical function definition looks like:
#   - name: ...
#     from: class
#     class: ...
#     constructor: ...
#==============================================================================
# Execution properties
#==============================================================================
# Execution properties allow for changing the behavior of a table program.
# All settings below are children of `execution:`; they must stay indented so
# they are not read as top-level keys.
execution:
  # 'batch' or 'streaming' execution
  type: streaming
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 1
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 3600000
  # maximum idle state retention in ms
  max-idle-state-retention: 7200000
#==============================================================================
# Deployment properties
#==============================================================================
# Deployment properties allow for describing the cluster to which table
# programs are submitted to.
# All settings below are children of `deployment:` and must stay indented.
deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
  # (optional) address from cluster to gateway
  gateway-address: ""
  # (optional) port from cluster to gateway
  gateway-port: 0
- 添加依賴包:
Flink 集群的 lib 目錄下都必須有這些 jar 包(例如 Kafka 連接器 flink-connector-kafka、JSON 格式 flink-json 以及 kafka-clients,版本需與集群一致)。
執行sql-client.sh
[root@rdx flink]# pwd
/opt/rdx/flink
[root@rdx flink]# bin/sql-client.sh embedded -d conf/sql.yaml -l lib/
看到松鼠圖案就代表跑起來了!
運行SQL語句:
- 在運行之前,先向配置文件中設置的 topic(orderSQL)寫入幾條數據:
{"order_id": "1","shop_id": "AF18","member_id": "3410211","trade_amt": "100.00","pay_time": "2021-01-18T16:57:00Z"}
{"order_id": "2","shop_id": "AF20","member_id": "3410213","trade_amt": "130.00","pay_time": "2021-01-18T16:58:00Z"}
{"order_id": "3","shop_id": "AF18","member_id": "3410212","trade_amt": "120.00","pay_time": "2021-01-18T16:58:35Z"}
{"order_id": "4","shop_id": "AF19","member_id": "3410212","trade_amt": "100.00","pay_time": "2021-01-18T16:59:00Z"}
{"order_id": "5","shop_id": "AF18","member_id": "3410211","trade_amt": "150.00","pay_time": "2021-01-18T16:59:30Z"}
{"order_id": "6","shop_id": "AF18","member_id": "3410211","trade_amt": "110.00","pay_time": "2021-01-18T17:00:12Z"}
{"order_id": "7","shop_id": "AF19","member_id": "3410213","trade_amt": "110.00","pay_time": "2021-01-18T17:01:00Z"}
{"order_id": "8","shop_id": "AF20","member_id": "3410211","trade_amt": "100.00","pay_time": "2021-01-18T17:01:44Z"}
{"order_id": "9","shop_id": "AF17","member_id": "3410212","trade_amt": "110.00","pay_time": "2021-01-18T17:02:15Z"}
{"order_id": "10","shop_id": "AF20","member_id": "3410212","trade_amt": "110.00","pay_time": "2021-01-18T17:03:15Z"}
{"order_id": "11","shop_id": "AF19","member_id": "3410212","trade_amt": "110.00","pay_time": "2021-01-18T17:03:33Z"}
{"order_id": "12","shop_id": "AF17","member_id": "3410212","trade_amt": "110.00","pay_time": "2021-01-18T17:04:15Z"}
- 執行簡單查詢sql
Flink SQL> select * from testTable;
- 一分鐘固定窗口(滾動窗口)計算:
SELECT
  shop_id,
  TUMBLE_START(payment_time, INTERVAL '1' MINUTE) AS tumble_start,
  TUMBLE_END(payment_time, INTERVAL '1' MINUTE) AS tumble_end,
  SUM(trade_amt) AS amt
FROM testTable
GROUP BY
  shop_id,
  TUMBLE(payment_time, INTERVAL '1' MINUTE);