1. Download the Flink distribution
Official site:
https://flink.apache.org/downloads.html#apache-flink-1112
Download link:
https://www.apache.org/dyn/closer.lua/flink/flink-1.11.2/flink-1.11.2-bin-scala_2.11.tgz
2. Extract the archive
tar -xvf flink-1.11.2-bin-scala_2.11.tgz
3. Start Flink (run the following from inside the extracted flink-1.11.2 directory)
sudo ./bin/start-cluster.sh
(To check whether the cluster came up, open http://192.168.**.**:8081/#/overview in a browser.)
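If no browser is handy, the Web UI's REST API reports the same information; a minimal check, assuming the cluster is reachable locally on the default port:
curl http://localhost:8081/overview
Once the cluster is up this returns a short JSON summary (TaskManager count, slot totals, job counts).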
4. Start the SQL client
sudo ./bin/sql-client.sh embedded
5. On success you are dropped into the Flink SQL> prompt (type quit; to exit)
6.1 Basic usage
SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS nameTable(name) GROUP BY name;
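Because the VALUES source is fixed, the result is easy to check by hand ('Bob' appears twice), so the query should settle on:

name    cnt
Bob     2
Alice   1
Greg    1

The counts build up incrementally in the result view as the rows are consumed.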
6.2 Connecting to Kafka
6.2.1 Add the connector JARs (out of the box only CSV, filesystem, and similar connectors are supported)
Create a new directory named kafka and put flink-json-1.11.2.jar and flink-sql-connector-kafka_2.12-1.11.2.jar into it (they can be downloaded locally via Maven and then copied into the kafka directory):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_2.12</artifactId>
    <version>1.11.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
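If no Maven project is at hand, the same JARs can be fetched straight from Maven Central; a sketch assuming the standard repository layout, run from the Flink root directory:

mkdir -p kafka
wget -P kafka https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.11.2/flink-sql-connector-kafka_2.12-1.11.2.jar
wget -P kafka https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/1.11.2/flink-json-1.11.2.jar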
6.2.2 Start the SQL client with the library directory
sudo ./bin/sql-client.sh embedded -l kafka/
The -l/--library <JAR directory> option is documented at https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sqlClient.html
6.2.3 Create a Kafka table
CREATE TABLE CustomerStatusChangedEvent (
    customerId INT,
    oStatus INT,
    nStatus INT
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.properties.group.id' = 'g2.group1',
    'connector.properties.bootstrap.servers' = '192.168.1.85:9092,192.168.1.86:9092',
    'connector.properties.zookeeper.connect' = '192.168.1.85:2181',
    'connector.topic' = 'customer_statusChangedEvent',
    'connector.startup-mode' = 'earliest-offset',
    'format.type' = 'json'
);
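The connector.* keys above are the legacy option style, which Flink 1.11 still accepts. The same table can also be declared with the new option keys introduced in 1.11; a sketch reusing the brokers and topic above (ZooKeeper is no longer needed with the universal connector):

CREATE TABLE CustomerStatusChangedEvent (
    customerId INT,
    oStatus INT,
    nStatus INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'customer_statusChangedEvent',
    'properties.bootstrap.servers' = '192.168.1.85:9092,192.168.1.86:9092',
    'properties.group.id' = 'g2.group1',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);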
6.2.4 Run a query
SELECT * FROM CustomerStatusChangedEvent;
6.2.5 Result output
Matching rows appear in the client's interactive result view; see the result-mode note below.
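By default the client opens a paged, interactive result view (press Q to leave it). For plain inline output, e.g. over SSH, the 1.11 client also offers tableau mode:

SET execution.result-mode=tableau;
SELECT * FROM CustomerStatusChangedEvent;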
6.2.6 Insert data
(Flink 1.11 does not yet support an explicit column list in INSERT INTO, so values are listed in the table's declared column order.)
INSERT INTO CustomerStatusChangedEvent VALUES (1001, 1, 2);
INSERT INTO CustomerStatusChangedEvent VALUES (1001, 1, 2), (1002, 10, 2), (1003, 1, 20);
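Each INSERT is submitted as a small Flink job that writes JSON records to the topic. One way to double-check that the messages landed is Kafka's own console consumer (run from the Kafka installation directory, broker address as above):

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.85:9092 --topic customer_statusChangedEvent --from-beginning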