Flink + Kafka Real-Time Computing Quick Start
1. Overview
This example covers the following scenario: a real-time stream table (SQL Server -> Kafka Connect) is JOINed with a dimension table (MySQL), and the result is written to MySQL in real time.
Note that this example implements CDC via Kafka Connect. Recent Flink versions support CDC directly (as of this writing, 2021/11, only MySQL, Oracle, PostgreSQL, and MongoDB are supported; SQL Server is not yet).
Flink CDC official documentation: https://ververica.github.io/flink-cdc-connectors/master/
Note: this article focuses on SQL Server-based CDC; the parts involving MySQL (such as enabling CDC and registering the connector) are recorded only for future reference.
Key steps in this scenario:
0. Enable CDC on the source database, and set up the Kafka and Flink base environments.
1. Create a table connected to the real-time stream: live data of the application log table (app_log) from the Kafka message queue.
2. Create a table connected to the dimension table: the company dimension table (dim_company).
3. Create a table connected to the result table: the real-time company-visit statistics table (dws_company_vist).
4. JOIN the dimension table with the stream table and write into the result table: INSERT INTO ... SELECT ...
2. Enabling CDC on the Databases
2.1. Enabling CDC support in MySQL
Note: this example does not require configuring mysql-cdc; this subsection is recorded only for reference.
MySQL requires binlog_format to be ROW. The MySQL version I installed defaults to this format; check it first:
mysql> show global variables like "%binlog_format%";
To change it permanently, edit my.cnf; a restart is required for the change to take effect:
# vi /etc/my.cnf
binlog_format=ROW
log-bin=mysql-bin
You can also change it online, but the setting is lost after a restart:
mysql> SET global binlog_format='ROW';
2.2. Enabling CDC support in SQL Server
To capture changes from existing SQL Server tables, complete the following configuration:
2.2.1. Run the following commands to enable CDC for the database.
-- Enable CDC support
USE testdb
GO
EXEC sys.sp_cdc_enable_db
GO
2.2.2. Run the following commands to enable CDC for the specified table.
-- Enable CDC for the specified table
USE testdb
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N't_app',
@role_name = NULL,
@supports_net_changes = 1
GO
2.2.3. Run the following command to confirm that you have permission to access the CDC tables.
EXEC sys.sp_cdc_help_change_data_capture
GO
Note: if the result set is empty, check whether you have permission to access the table.
2.2.4. Run the following command to confirm that SQL Server Agent is running.
EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'
GO
Note: if the result is Running, SQL Server Agent is enabled.
2.2.5. Check the table's CDC status
SELECT is_tracked_by_cdc FROM sys.tables WHERE name='t_app';
Note: a result of 1 means CDC was enabled successfully.
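Once changes start arriving, you can also inspect the captured rows directly. SQL Server stores them in a change table named cdc.<capture_instance>_CT; a minimal sketch, assuming the default capture instance name dbo_t_app produced by the settings above:
-- Inspect captured change rows (sketch; assumes the default capture instance dbo_t_app)
USE testdb
GO
SELECT TOP 10 * FROM cdc.dbo_t_app_CT;
GO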
2.2.6. SQL Server official CDC documentation reference
2.3. Enabling CDC in Oracle
Hands-on steps will be added after I have tried them in practice.
Reference: https://www.cnblogs.com/myrunning/p/5329139.html
3. Kafka Deployment
3.1. Download Kafka
$ cd /usr/local/src/
$ wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz
$ mkdir -p /usr/local/kafka/
$ tar xvf kafka_2.12-2.8.1.tgz -C /usr/local/kafka/ --strip-components=1
(The tarball name matches the kafka_2.12-2.8.1.tgz downloaded above; --strip-components=1 extracts it directly into /usr/local/kafka/ so the paths below resolve.)
3.2. Configure Kafka
# vi /usr/local/kafka/config/server.properties
log.dirs=/usr/local/kafka/logs
listeners=PLAINTEXT://192.168.1.100:9092
advertised.listeners=PLAINTEXT://192.168.1.100:9092
3.3. Start Kafka
$ cd /usr/local/kafka/
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ bin/kafka-server-start.sh -daemon config/server.properties
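As a quick smoke test, confirm the broker is reachable by creating and listing a topic (a sketch; the topic name smoke-test is just an example):
$ bin/kafka-topics.sh --create --topic smoke-test --partitions 1 --replication-factor 1 --bootstrap-server 192.168.1.100:9092
$ bin/kafka-topics.sh --list --bootstrap-server 192.168.1.100:9092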
3.4. Installing Kafka Connect
3.4.1. Configure the Connect worker
# vi /usr/local/kafka/config/connect-distributed.properties
# Kafka cluster address
bootstrap.servers=192.168.1.100:9092
# group.id must be identical across the cluster and must not clash with any consumer group name
group.id=connect-cluster
# set to false when using plain JSON data
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# path where connector plugins are stored
plugin.path=/usr/local/kafka/plugins
3.4.2. Install connector plugins
$ mkdir -p /usr/local/kafka/plugins
Download connectors from https://www.confluent.io/hub/ and unzip them into /usr/local/kafka/plugins.
I downloaded the following:
debezium-debezium-connector-mysql-1.7.0.zip
debezium-debezium-connector-sqlserver-1.7.0.zip
confluentinc-kafka-connect-oracle-cdc-1.3.1.zip
confluentinc-kafka-connect-jdbc-10.2.5.zip
3.4.3. Start Kafka Connect
$ cd /usr/local/kafka
$ bin/connect-distributed.sh -daemon config/connect-distributed.properties
Once started successfully, the required internal topics are created automatically.
3.4.4. List installed connector plugins via the REST API
$ curl -X GET http://192.168.1.100:8083/connector-plugins
The response looks like this:
[
  {
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "type": "sink",
    "version": "10.2.5"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "type": "source",
    "version": "10.2.5"
  },
  {
    "class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
    "type": "source",
    "version": "1.3.1"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "1.7.0.Final"
  },
  {
    "class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "type": "source",
    "version": "1.7.0.Final"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.8.1"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.8.1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]
3.5. Registering the Connector Configuration
Register and manage connectors through the Kafka Connect REST API.
# Register the connector (sqlserver-cdc)
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.1.100:8083/connectors/ -d '{"name":"myserver-cdc-connector","config":{"connector.class":"io.debezium.connector.sqlserver.SqlServerConnector","database.hostname":"192.168.1.102","database.port":"1433","database.user":"sa","database.password":"xxx","database.dbname":"testdb","database.server.name":"myserver","table.include.list":"dbo.app_log","database.history.kafka.bootstrap.servers":"192.168.1.100:9092","database.history.kafka.topic":"dbhistory.myserver","decimal.handling.mode":"double","time.precision.mode":"connect"}}'
# Update the connector (sqlserver-cdc)
$ curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.1.100:8083/connectors/myserver-cdc-connector/config -d '{"connector.class":"io.debezium.connector.sqlserver.SqlServerConnector","database.hostname":"192.168.1.102","database.port":"1433","database.user":"sa","database.password":"xxx","database.dbname":"testdb","database.server.name":"myserver","table.include.list":"dbo.app_log","database.history.kafka.bootstrap.servers":"192.168.1.100:9092","database.history.kafka.topic":"dbhistory.myserver","decimal.handling.mode":"double","time.precision.mode":"connect"}'
# Check the status
$ curl -k http://192.168.1.100:8083/connectors/myserver-cdc-connector/status
# List topics (there may be a delay)
$ ./bin/kafka-topics.sh --list --bootstrap-server 192.168.1.100:9092
# Consume earlier messages with the Kafka console consumer (add --from-beginning)
$ ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092 --topic myserver.dbo.app_log --from-beginning
Example configuration for a SQL Server table:
{
  "name": "myserver-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "192.168.1.102",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "xxx",
    "database.dbname": "testdb",
    "database.server.name": "myserver",
    "table.include.list": "dbo.app_log",
    "database.history.kafka.bootstrap.servers": "192.168.1.100:9092",
    "database.history.kafka.topic": "dbhistory.myserver",
    "decimal.handling.mode": "double",
    "time.precision.mode": "connect"
  }
}
Official reference: https://debezium.io/documentation/reference/stable/connectors/sqlserver.html
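For longer configurations like this, it can be cleaner to keep the JSON in a file and let curl read it. A sketch, assuming the config above is saved as sqlserver-cdc.json (hypothetical filename):
# Register the connector from a file instead of an inline JSON string
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.1.100:8083/connectors/ -d @sqlserver-cdc.json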
Example configuration for a MySQL table:
{
  "name": "myserver-mysql-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "192.168.1.101",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "xxx",
    "database.server.id": "1",
    "database.server.name": "myserver",
    "database.include.list": "testdb",
    "table.include.list": "testdb.app_log",
    "database.history.kafka.bootstrap.servers": "192.168.1.100:9092",
    "database.history.kafka.topic": "dbhistory.myserver",
    "include.schema.changes": "true",
    "decimal.handling.mode": "double"
  }
}
Official reference: https://debezium.io/documentation/reference/stable/connectors/mysql.html
Common connector management APIs (see the curl examples after this list):
GET /connectors – returns the names of all running connectors.
POST /connectors – creates a new connector; the request body must be JSON containing a name field and a config field, where config is a JSON object holding the connector's configuration.
GET /connectors/{name} – gets information about the specified connector.
GET /connectors/{name}/config – gets the configuration of the specified connector.
PUT /connectors/{name}/config – updates the configuration of the specified connector.
GET /connectors/{name}/status – gets the status of the specified connector, including whether it is running, paused, or failed; if it failed, the error details are included.
GET /connectors/{name}/tasks – gets the tasks currently running for the specified connector.
GET /connectors/{name}/tasks/{taskid}/status – gets the status of the specified task.
PUT /connectors/{name}/pause – pauses the connector and its tasks; data processing stops until it is resumed.
PUT /connectors/{name}/resume – resumes a paused connector.
POST /connectors/{name}/restart – restarts a connector, typically used after it has failed.
POST /connectors/{name}/tasks/{taskId}/restart – restarts a single task, usually because it has failed.
DELETE /connectors/{name} – deletes a connector, stopping all its tasks and removing its configuration.
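For example, against the connector registered above (sketches of the most commonly used calls):
# Pause the connector (data processing stops until it is resumed)
$ curl -X PUT http://192.168.1.100:8083/connectors/myserver-cdc-connector/pause
# Resume the paused connector
$ curl -X PUT http://192.168.1.100:8083/connectors/myserver-cdc-connector/resume
# Restart the connector, e.g. after a failure
$ curl -X POST http://192.168.1.100:8083/connectors/myserver-cdc-connector/restart
# Delete the connector and its configuration
$ curl -X DELETE http://192.168.1.100:8083/connectors/myserver-cdc-connector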
4. Flink Deployment
4.1. Download and install
$ cd /usr/local/src/
$ wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.11.tgz
$ mkdir -p /usr/local/flink/
$ tar xvf flink-1.13.3-bin-scala_2.11.tgz -C /usr/local/flink/ --strip-components=1
(--strip-components=1 extracts directly into /usr/local/flink/ so ./bin below resolves.)
$ mkdir -p /usr/local/flink/sql-lib/
$ cd /usr/local/flink/sql-lib/
$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.3/flink-sql-connector-kafka_2.11-1.13.3.jar
$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.13.3/flink-connector-jdbc_2.11-1.13.3.jar
$ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar
# If you use flink-cdc directly (e.g. flink-cdc-mysql), also download flink-sql-connector-mysql-cdc-xxx.jar
$ wget https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.4.0/flink-sql-connector-mysql-cdc-1.4.0.jar
4.2. Developing with Flink SQL
For Flink SQL development, you can execute SQL scripts from the command line with Flink's bundled sql-client.sh, or through Zeppelin or another third-party data development platform.
4.2.1. Running via sql-client
$ cd /usr/local/flink/
$ ./bin/sql-client.sh embedded -l sql-lib
Scripts:
-- Create a table connected to the dimension table: application dimension table (dim_app), which carries the company fields
CREATE TABLE IF NOT EXISTS dim_app (
  `app_code` varchar(64) NOT NULL COMMENT 'application code',
  `app_name` varchar(255) COMMENT 'application name',
  `company_code` varchar(64) NOT NULL COMMENT 'company code',
  `company_name` varchar(255) NOT NULL COMMENT 'company name',
  PRIMARY KEY (app_code) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.1.101:3306/testdb',
  'table-name' = 'dim_app',
  'username' = 'root',
  'password' = 'xxx'
);
-- Create a table connected to the real-time stream: live app_log data from the Kafka message queue.
-- Note: with the plain 'kafka' connector, the debezium-json format does not allow declaring a
-- PRIMARY KEY (it cannot guarantee primary-key semantics), so no key is defined here.
CREATE TABLE IF NOT EXISTS ods_app_log (
  `id` varchar(64) NOT NULL COMMENT 'primary key ID',
  `app_code` varchar(64) NOT NULL COMMENT 'application code',
  `vist_time` TIMESTAMP(3) COMMENT 'visit time'
) WITH (
  'connector' = 'kafka',
  'topic' = 'myserver.dbo.app_log',
  'properties.bootstrap.servers' = '192.168.1.100:9092',
  'properties.group.id' = 'myserver',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);
-- Create a table connected to the result table: real-time company-visit statistics (dws_company_vist)
CREATE TABLE IF NOT EXISTS dws_company_vist (
  `company_code` varchar(64) NOT NULL COMMENT 'company code',
  `total_vist_cnt` int COMMENT 'total visit count',
  PRIMARY KEY (company_code) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.1.101:3306/testdb',
  'table-name' = 'dws_company_vist',
  'username' = 'root',
  'password' = 'xxx'
);
-- JOIN the dimension table with the stream table and write into the result table (INSERT INTO ... SELECT ...).
-- COUNT returns BIGINT, so the value is cast to match the INT sink column.
INSERT INTO dws_company_vist
SELECT
  d.company_code,
  CAST(count(a.id) AS INT) AS total_vist_cnt
FROM dim_app d, ods_app_log a
WHERE d.app_code = a.app_code
GROUP BY d.company_code;
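One caveat with the regular join above: Flink keeps both inputs in state, and updates to the MySQL dimension table are only seen when the table is re-read. A common alternative is a JDBC lookup join, which queries MySQL per record at processing time. A minimal sketch, assuming ods_app_log is additionally declared with a computed column `proc_time AS PROCTIME()`:
-- Lookup-join variant (sketch): requires a processing-time attribute on the stream table
INSERT INTO dws_company_vist
SELECT
  d.company_code,
  CAST(count(a.id) AS INT) AS total_vist_cnt
FROM ods_app_log AS a
JOIN dim_app FOR SYSTEM_TIME AS OF a.proc_time AS d
  ON d.app_code = a.app_code
GROUP BY d.company_code;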
4.2.2. Running via Zeppelin
%flink.conf
flink.execution.jars /usr/local/flink/sql-lib/flink-connector-jdbc_2.11-1.13.3.jar,/usr/local/flink/sql-lib/flink-sql-connector-kafka_2.11-1.13.3.jar,/usr/local/flink/sql-lib/mysql-connector-java-8.0.27.jar
%flink.ssql(type=update)
CREATE TABLE IF NOT EXISTS dim_app ...
CREATE TABLE IF NOT EXISTS ods_app_log ...
CREATE TABLE IF NOT EXISTS dws_company_vist ...
%flink.ssql(jobName="dws_company_vist")
INSERT INTO dws_company_vist SELECT ...
4.2.3. Running via the Flink SQL Java API
package demo;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCdcDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // The same DDL/DML as in the sql-client section above
        String createOdsTableSql = "CREATE TABLE IF NOT EXISTS ods_app_log ...";
        String createDimTableSql = "CREATE TABLE IF NOT EXISTS dim_app ...";
        String createDwsResultSql = "CREATE TABLE IF NOT EXISTS dws_company_vist ...";
        String insertIntoSql = "INSERT INTO dws_company_vist SELECT ...";
        tableEnv.executeSql(createOdsTableSql);
        tableEnv.executeSql(createDimTableSql);
        tableEnv.executeSql(createDwsResultSql);
        // executeSql submits the INSERT as an asynchronous job
        tableEnv.executeSql(insertIntoSql);
    }
}
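To submit the compiled job to the cluster (a sketch; the artifact name flink-cdc-demo-1.0.jar is hypothetical):
$ mvn clean package
$ /usr/local/flink/bin/flink run -c demo.FlinkCdcDemo target/flink-cdc-demo-1.0.jar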
pom.xml (excerpt):
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Note: the Flink cluster installed above is 1.13.3; keep the client version in step with the cluster -->
    <flink.version>1.14.0</flink.version>
    <flink.jdbc.version>1.10.3</flink.jdbc.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <!-- legacy JDBC connector; flink-connector-jdbc below supersedes it -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.jdbc.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.26</version>
        <scope>compile</scope>
    </dependency>
</dependencies>