Flink + Kafka Real-Time Computing Quickstart

1. Overview

This example covers the following scenario: a real-time change stream captured from SQL Server via Kafka Connect is joined with a dimension table in MySQL, and the joined result is written back to MySQL in real time.

This example implements CDC through Kafka Connect. Note that recent Flink versions already support CDC directly (as of this writing, 2021/11, only MySQL, Oracle, PostgreSQL, and MongoDB are supported; SQL Server is not yet).

Flink CDC official documentation: https://ververica.github.io/flink-cdc-connectors/master/

Note: this article mainly covers SQL Server-based CDC; the parts involving MySQL (enabling CDC, registering a connector) are recorded only for future reference.
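For comparison, with the flink-sql-connector-mysql-cdc jar (downloaded in section 4.1) on the classpath, a Flink table can read MySQL changes directly, with no Kafka in between. A minimal sketch — host, credentials, and columns here are illustrative and not part of this example's pipeline:

-- Direct CDC source, no Kafka Connect involved (illustrative values)
CREATE TABLE mysql_cdc_app_log (
  `id` varchar(64) NOT NULL,
  `app_code` varchar(64) NOT NULL,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.1.101',
  'port' = '3306',
  'username' = 'root',
  'password' = 'xxx',
  'database-name' = 'testdb',
  'table-name' = 'app_log'
);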

Key steps in this example:

0. Enable CDC on the source database and set up the base Kafka and Flink environments
1. Create a table over the real-time stream: the application log table (app_log) data in the Kafka message queue
2. Create a table over the dimension table: the company dimension table (dim_app)
3. Create a table over the result table: the real-time company visit statistics table (dws_company_vist)
4. JOIN the dimension table with the stream table and write into the result table: insert into ... select ...

2. Enabling CDC on the Database

2.1. Enabling CDC Support in MySQL

Note: this example does not need the mysql-cdc setup; it is recorded here only for future reference.
MySQL must run with binlog_format set to ROW. The MySQL version I installed uses this format by default; check it first:

mysql> show global variables like "%binlog_format%";

To change it permanently, edit the configuration file my.cnf; a restart is required for the change to take effect:

# vi /etc/my.cnf
binlog_format=ROW
log-bin=mysql-bin
# MySQL 5.7 refuses to start with log-bin enabled but no server-id set
server-id=1

You can also change it online, but the setting is lost after a restart:

mysql> SET global binlog_format='ROW';

2.2. Enabling CDC Support in SQL Server

To capture changes from existing SQL Server tables, complete the following configuration:

2.2.1. Run the following commands to enable CDC on the database.

-- Enable CDC support
USE testdb
GO
EXEC sys.sp_cdc_enable_db
GO

2.2.2. Run the following commands to enable CDC on the specified table.

-- Enable CDC on the specified table
USE testdb
GO

EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N't_app',
@role_name = NULL,
@supports_net_changes = 1
GO

2.2.3. Run the following command to confirm you can access the CDC tables.

EXEC sys.sp_cdc_help_change_data_capture
GO

Note: if the result set is empty, check whether you have permission to access the table.

2.2.4. Run the following command to confirm that SQL Server Agent is running.

EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'
GO

Note: a result of Running means SQL Server Agent is up.

2.2.5. Check the table's CDC status

SELECT is_tracked_by_cdc FROM sys.tables WHERE name='t_app';

Note: a query result of 1 means CDC was enabled successfully.

2.2.6. SQL Server official CDC reference

https://docs.microsoft.com/zh-CN/sql/relational-databases/track-changes/track-data-changes-sql-server?view=sql-server-2017

2.3. Enabling CDC in Oracle

Hands-on steps will be added once I have worked through them in practice.

Reference: https://www.cnblogs.com/myrunning/p/5329139.html

3. Kafka Deployment

3.1. Download Kafka

$ cd /usr/local/src/
$ wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz
$ mkdir -p /usr/local/kafka/
$ tar xvf kafka_2.12-2.8.1.tgz -C /usr/local/kafka/ --strip-components=1

3.2. Configure Kafka

# vi /usr/local/kafka/config/server.properties

log.dirs=/usr/local/kafka/logs
listeners=PLAINTEXT://192.168.1.100:9092
advertised.listeners=PLAINTEXT://192.168.1.100:9092

3.3. Start Kafka

$ cd /usr/local/kafka/
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ bin/kafka-server-start.sh -daemon config/server.properties
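
Optionally, verify that the broker is reachable on the advertised listener before moving on. A quick smoke test (the topic name is arbitrary):

$ bin/kafka-topics.sh --create --topic smoke-test --partitions 1 --replication-factor 1 --bootstrap-server 192.168.1.100:9092
$ bin/kafka-topics.sh --list --bootstrap-server 192.168.1.100:9092
$ bin/kafka-topics.sh --delete --topic smoke-test --bootstrap-server 192.168.1.100:9092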

3.4. Install Kafka Connect

3.4.1. Configure the Connect worker

# vi /usr/local/kafka/config/connect-distributed.properties

# Kafka cluster address
bootstrap.servers=192.168.1.100:9092

# group.id must be identical across the Connect cluster and must not clash with any consumer group name
group.id=connect-cluster

# For plain JSON data, set these to false
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# Path where connector plugins are kept
plugin.path=/usr/local/kafka/plugins
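
Note that the schemas.enable flags only apply to the JSON converter; the stock connect-distributed.properties already ships with these defaults (shown here for completeness):

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

With schemas.enable=false, change events are emitted as bare JSON envelopes, which is the shape Flink's debezium-json format expects by default (see section 4.2).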

3.4.2. Install connector plugins

$ mkdir -p /usr/local/kafka/plugins


Download the connectors from https://www.confluent.io/hub/ and unzip them into /usr/local/kafka/plugins.
I downloaded the following:
debezium-debezium-connector-mysql-1.7.0.zip
debezium-debezium-connector-sqlserver-1.7.0.zip
confluentinc-kafka-connect-oracle-cdc-1.3.1.zip
confluentinc-kafka-connect-jdbc-10.2.5.zip

3.4.3. Start Kafka Connect

$ cd /usr/local/kafka
$ bin/connect-distributed.sh -daemon config/connect-distributed.properties

Once it starts successfully, Connect automatically creates its internal topics (by default connect-configs, connect-offsets, and connect-status).

3.4.4. Query the installed connectors through the REST interface

$ curl -X GET http://192.168.1.100:8083/connector-plugins

The response looks like this:

[
  {
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "type": "sink",
    "version": "10.2.5"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "type": "source",
    "version": "10.2.5"
  },
  {
    "class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
    "type": "source",
    "version": "1.3.1"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "1.7.0.Final"
  },
  {
    "class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "type": "source",
    "version": "1.7.0.Final"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.8.1"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.8.1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]

3.5. Submit the Connector Configuration

Register the connector configuration through the Kafka Connect REST API:

# Register the connector (SQL Server CDC)
$ curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://192.168.1.100:8083/connectors/  -d '{"name":"myserver-cdc-connector","config":{"connector.class":"io.debezium.connector.sqlserver.SqlServerConnector","database.hostname":"192.168.1.102","database.port":"1433","database.user":"sa","database.password":"xxx","database.dbname":"testdb","database.server.name":"myserver","table.include.list":"dbo.app_log","database.history.kafka.bootstrap.servers":"192.168.1.100:9092","database.history.kafka.topic":"dbhistory.myserver","decimal.handling.mode":"double","time.precision.mode":"connect"}}'
# Update the connector (SQL Server CDC)
$ curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" http://192.168.1.100:8083/connectors/myserver-cdc-connector/config  -d '{"connector.class":"io.debezium.connector.sqlserver.SqlServerConnector","database.hostname":"192.168.1.102","database.port":"1433","database.user":"sa","database.password":"xxx","database.dbname":"testdb","database.server.name":"myserver","table.include.list":"dbo.app_log","database.history.kafka.bootstrap.servers":"192.168.1.100:9092","database.history.kafka.topic":"dbhistory.myserver","decimal.handling.mode":"double","time.precision.mode":"connect"}'
# Check connector status
$ curl -k http://192.168.1.100:8083/connectors/myserver-cdc-connector/status
# List topics (the new topic may appear with a slight delay)
$ ./bin/kafka-topics.sh --list --bootstrap-server 192.168.1.100:9092

# Read earlier messages with the Kafka console consumer (add --from-beginning)
$ ./bin/kafka-console-consumer.sh  --bootstrap-server 192.168.1.100:9092 --topic myserver.dbo.app_log --from-beginning
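
Each message on myserver.dbo.app_log is a Debezium change-event envelope. An abridged sketch with illustrative values (op is c/u/d/r for create/update/delete/snapshot-read; with time.precision.mode=connect, datetime columns arrive as epoch milliseconds):

{
  "before": null,
  "after": {
    "id": "1001",
    "app_code": "app-01",
    "vist_time": 1636440000000
  },
  "source": { "connector": "sqlserver", "db": "testdb", "table": "app_log" },
  "op": "c",
  "ts_ms": 1636440000123
}

This is the shape consumed by the Flink table declared with 'format' = 'debezium-json' in section 4.2.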

Configuration example for a SQL Server table:

{
    "name": "myserver-cdc-connector", 
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", 
        "database.hostname": "192.168.1.102", 
        "database.port": "1433", 
        "database.user": "sa", 
        "database.password": "xxx", 
        "database.dbname": "testdb", 
        "database.server.name": "myserver", 
        "table.include.list": "dbo.app_log", 
        "database.history.kafka.bootstrap.servers": "192.168.1.100:9092", 
        "database.history.kafka.topic": "dbhistory.myserver",
		"decimal.handling.mode":"double",
		"time.precision.mode":"connect"
    }
}

Official reference: https://debezium.io/documentation/reference/stable/connectors/sqlserver.html

Configuration example for a MySQL table:

{
    "name":"myserver-mysql-cdc-connector",
    "config":{
        "connector.class":"io.debezium.connector.mysql.MySqlConnector",
        "database.hostname":"192.168.1.101",
        "database.port":"3306",
        "database.user":"root",
        "database.password":"xxx",
        "database.server.id":"1",
        "database.server.name":"myserver",
        "database.include.list":"testdb",
        "table.include.list":"testdb.app_log",
        "database.history.kafka.bootstrap.servers":"192.168.1.100:9092",
        "database.history.kafka.topic":"dbhistory.myserver",
        "include.schema.changes":"true",
        "decimal.handling.mode":"double"
    }
}

Official reference: https://debezium.io/documentation/reference/stable/connectors/mysql.html

Common connector management REST APIs (a few example calls follow the list):

GET /connectors – returns the names of all running connectors.
POST /connectors – creates a new connector; the request body must be JSON containing a name field and a config field, where name is the connector's name and config is a JSON object with the connector's configuration.
GET /connectors/{name} – returns information about the specified connector.
GET /connectors/{name}/config – returns the configuration of the specified connector.
PUT /connectors/{name}/config – updates the configuration of the specified connector.
GET /connectors/{name}/status – returns the status of the specified connector, including whether it is running, paused, or failed, along with error details if an error occurred.
GET /connectors/{name}/tasks – returns the tasks currently running for the specified connector.
GET /connectors/{name}/tasks/{taskid}/status – returns the status of a specific task of the connector.
PUT /connectors/{name}/pause – pauses the connector and its tasks; data processing stops until it is resumed.
PUT /connectors/{name}/resume – resumes a paused connector.
POST /connectors/{name}/restart – restarts a connector; typically used when a connector has failed.
POST /connectors/{name}/tasks/{taskId}/restart – restarts a single task, usually because it has failed.
DELETE /connectors/{name} – deletes a connector, stopping all its tasks and removing its configuration.
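
For example, against the worker used in this article (the pause/resume/restart endpoints typically return an empty body on success):

$ curl -X GET http://192.168.1.100:8083/connectors
$ curl -X PUT http://192.168.1.100:8083/connectors/myserver-cdc-connector/pause
$ curl -X PUT http://192.168.1.100:8083/connectors/myserver-cdc-connector/resume
$ curl -X POST http://192.168.1.100:8083/connectors/myserver-cdc-connector/restart
$ curl -X DELETE http://192.168.1.100:8083/connectors/myserver-cdc-connector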

4. Flink Deployment

4.1. Download and Install

$ cd /usr/local/src/
$ wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.11.tgz
$ mkdir -p /usr/local/flink/
$ tar xvf flink-1.13.3-bin-scala_2.11.tgz -C /usr/local/flink/ --strip-components=1


$ mkdir -p /usr/local/flink/sql-lib/
$ cd /usr/local/flink/sql-lib/
$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.3/flink-sql-connector-kafka_2.11-1.13.3.jar
$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.13.3/flink-connector-jdbc_2.11-1.13.3.jar
$ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar


# To use flink-cdc directly (e.g. flink-cdc-mysql), also download flink-sql-connector-mysql-cdc-xxx.jar
$ wget https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.4.0/flink-sql-connector-mysql-cdc-1.4.0.jar

4.2. Developing with Flink SQL

With Flink SQL, scripts can be executed through Flink's bundled sql-client.sh command line, through Zeppelin, or through another third-party data development platform.

4.2.1. Running through sql-client

$ cd /usr/local/flink/
$ ./bin/sql-client.sh embedded -l sql-lib
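
After the tables below have been created, you can sanity-check them directly in the CLI; a SELECT submits a Flink job and renders live results in the terminal. For example:

Flink SQL> SHOW TABLES;
Flink SQL> SELECT * FROM ods_app_log;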

The scripts:

-- Create a table over the dimension table: the company dimension table (dim_app)
CREATE TABLE IF NOT EXISTS dim_app (
  `app_code` varchar(64) NOT NULL COMMENT 'application code',
  `app_name` varchar(255) COMMENT 'application name',
  `company_code` varchar(64) NOT NULL COMMENT 'company code',
  `company_name` varchar(255) NOT NULL COMMENT 'company name',
  PRIMARY KEY (app_code) NOT ENFORCED
 ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://192.168.1.101:3306/testdb',
   'table-name' = 'dim_app',
   'username' = 'root',
   'password' = 'xxx'
);


-- Create a table over the real-time stream: the application log table (app_log) data in the Kafka message queue
-- (the kafka connector with the debezium-json format does not allow a PRIMARY KEY declaration)
CREATE TABLE IF NOT EXISTS ods_app_log (
  `id` varchar(64) NOT NULL COMMENT 'primary key ID',
  `app_code` varchar(64) NOT NULL COMMENT 'application code',
  `vist_time` timestamp(3) NOT NULL COMMENT 'visit time'
 ) WITH (
  'connector' = 'kafka',
  'topic' = 'myserver.dbo.app_log',
  'properties.bootstrap.servers' = '192.168.1.100:9092',
  'properties.group.id' = 'myserver',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);


-- Create a table over the result table: the real-time company visit statistics table (dws_company_vist)
CREATE TABLE IF NOT EXISTS dws_company_vist (
  `company_code` varchar(64) NOT NULL COMMENT 'company code',
  `total_vist_cnt` int COMMENT 'total visit count',
  PRIMARY KEY (company_code) NOT ENFORCED
 ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://192.168.1.101:3306/testdb',
   'table-name' = 'dws_company_vist',
   'username' = 'root',
   'password' = 'xxx'
);

-- JOIN the dimension table with the stream table and write into the result table: insert into ... select ...
INSERT INTO dws_company_vist
SELECT
  d.company_code,
  COUNT(a.id) AS total_vist_cnt
FROM dim_app d, ods_app_log a
WHERE d.app_code = a.app_code
GROUP BY d.company_code;
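
One caveat: in the regular join above, the JDBC source reads dim_app as a bounded table, so dimension rows added after the job starts are not picked up. The usual alternative is a lookup (temporal) join. A sketch, assuming ods_app_log is declared with an extra processing-time column `proc_time AS PROCTIME()` (the dim_app table may also set the JDBC connector's 'lookup.cache.max-rows' and 'lookup.cache.ttl' options to cache lookups):

-- Each stream record probes MySQL at processing time instead of
-- joining against a one-shot snapshot of dim_app
INSERT INTO dws_company_vist
SELECT
  d.company_code,
  COUNT(a.id) AS total_vist_cnt
FROM ods_app_log AS a
JOIN dim_app FOR SYSTEM_TIME AS OF a.proc_time AS d
  ON d.app_code = a.app_code
GROUP BY d.company_code;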

4.2.2. Running through Zeppelin

%flink.conf
flink.execution.jars /usr/local/flink/sql-lib/flink-connector-jdbc_2.11-1.13.3.jar,/usr/local/flink/sql-lib/flink-sql-connector-kafka_2.11-1.13.3.jar,/usr/local/flink/sql-lib/mysql-connector-java-8.0.27.jar

%flink.ssql(type=update)
CREATE TABLE IF NOT EXISTS dim_app ...
CREATE TABLE IF NOT EXISTS ods_app_log ...
CREATE TABLE IF NOT EXISTS dws_company_vist ...

%flink.ssql(jobName="dws_company_vist")
insert into dws_company_vist select ...


4.2.3. Running through the Flink SQL Java API

package demo;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCdcDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Same DDL and INSERT statements as in section 4.2.1
        String createOdsTableSql = "CREATE TABLE IF NOT EXISTS ods_app_log ...";
        String createDimTableSql = "CREATE TABLE IF NOT EXISTS dim_app ...";
        String createDwsResultSql = "CREATE TABLE IF NOT EXISTS dws_company_vist ...";
        String insertIntoSql = "insert into dws_company_vist select ...";

        tableEnv.executeSql(createOdsTableSql);
        tableEnv.executeSql(createDimTableSql);
        tableEnv.executeSql(createDwsResultSql);
        // executeSql submits the INSERT job asynchronously; await() keeps
        // this client process attached until the job terminates
        tableEnv.executeSql(insertIntoSql).await();
    }
}

Partial pom.xml contents:


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.3</flink.version>
        <flink.jdbc.version>1.10.3</flink.jdbc.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

   <dependencies>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-java</artifactId>
           <version>${flink.version}</version>
           <scope>compile</scope>
       </dependency>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
           <version>${flink.version}</version>
           <scope>compile</scope>
       </dependency>

       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-connector-base</artifactId>
           <version>${flink.version}</version>
       </dependency>

       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-clients_${scala.binary.version}</artifactId>
           <version>${flink.version}</version>
           <scope>compile</scope>
       </dependency>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
           <version>${flink.version}</version>
       </dependency>

       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-json</artifactId>
           <version>${flink.version}</version>
           <scope>compile</scope>
       </dependency>

       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
           <version>${flink.version}</version>
           <scope>compile</scope>
       </dependency>
      <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-table-api-java</artifactId>
           <version>${flink.version}</version>
          <scope>compile</scope>
       </dependency>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
           <version>${flink.version}</version>
           <scope>compile</scope>
       </dependency>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
           <version>${flink.jdbc.version}</version>
       </dependency>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
           <version>${flink.version}</version>
       </dependency>

       <dependency>
           <groupId>mysql</groupId>
           <artifactId>mysql-connector-java</artifactId>
           <version>8.0.27</version>
           <scope>compile</scope>
       </dependency>
   </dependencies>

