flink-sql-client使用kafka表格


1.下載flink 安裝包

官網地址

https://flink.apache.org/downloads.html#apache-flink-1112

下載地址:

https://www.apache.org/dyn/closer.lua/flink/flink-1.11.2/flink-1.11.2-bin-scala_2.11.tgz

 

2.解壓安裝包

tar -xvf flink-1.11.2-bin-scala_2.11.tgz

3.啟動flink

sudo ./bin/start-cluster.sh

(是否啟動成功請訪問 http://192.168.**.**:8081/#/overview )

 

 

4.啟動 sql-client

sudo ./bin/sql-client.sh embedded

 

5.啟動成功 會進入 flink sql> 命令行界面 ( 輸入 quit; 退出)

 

 

6.1 基本用法

SELECT name, COUNT(*) AS cnt 
FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS nameTable(name) 
GROUP BY name;

6.2 連接kafka

  6.2.1 增加擴展包(默認的只支持csv,file等文件系統)

    創建新文件夾kafka, 增加 flink-json-1.11.2.jar, flink-sql-connector-kafka_2.12-1.11.2.jar   (可以通過maven來下載到本地,再復制到kafka文件夾里)

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

 

  6.2.2 啟動sql-client

  sudo ./bin/sql-client.sh embedded -l kafka/

 -l,--library <JAR directory>  可以參考 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sqlClient.html

  6.2.3 創建 kafka Table

  

CREATE TABLE CustomerStatusChangedEvent(customerId int,
oStatus int,
nStatus int)with('connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.properties.group.id' = 'g2.group1',
'connector.properties.bootstrap.servers' = '192.168.1.85:9092,192.168.1.86:9092',
'connector.properties.zookeeper.connect' = '192.168.1.85:2181',
'connector.topic' = 'customer_statusChangedEvent',
'connector.startup-mode' = 'earliest-offset',
'format.type' = 'json');

6.2.4 執行查詢

select *  from CustomerStatusChangedEvent ;

6.2.5 結果輸出

 

 6.2.5 插入數據

insert into CustomerStatusChangedEvent(customerId,oStatus,nStatus)
values(1001,1,2);

 

insert into CustomerStatusChangedEvent(customerId,oStatus,nStatus)
values(1001,1,2),(1002,10,2),(1003,1,20);

 

 

 

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM