Debezium監控MySQL,PGsql,SQLServer


1. Debezium簡介

Debezium 是一個分布式平台,它將現有的數據庫轉換為事件流,應用程序消費事件流,就可以知道數據庫中的每一個行級更改,並立即做出響應。

Debezium 構建在 Apache Kafka 之上,並提供 Kafka 連接器來監視特定的數據庫。在介紹 Debezium 之前,我們要先了解一下什么是 Kafka Connect

2. Debezium架構

最常見的是,Debezium是通過Apache Kafka連接部署的。Kafka Connect是一個用於實現和操作的框架和運行時

源連接器,如Debezium,它將數據攝取到Kafka和接收連接器,它將數據從Kafka主題傳播到其他系統。下圖顯示了一個基於DebeziumCDC管道的架構:

 

 

 

 

 

 

除了Kafka代理本身之外,Kafka Connect是作為一個單獨的服務來操作的。部署了用於MySQLPostgresDebezium連接器來捕獲這兩個數據庫的更改。為此,兩個連接器使用客戶端庫建立到兩個源數據庫的連接,在使用MySQL時訪問binlog,在使用Postgres時從邏輯復制流讀取數據。

默認情況下,來自一個捕獲表的更改被寫入一個對應的Kafka主題。如果需要,可以在Debezium的主題路由SMT的幫助下調整主題名稱,例如,使用與捕獲的表名不同的主題名稱,或者將多個表的更改轉換為單個主題。

一旦更改事件位於Apache Kafka中,來自Kafka Connect生態系統的不同連接器就可以將更改流到其他系統和數據庫,如Elasticsearch、數據倉庫和分析系統或Infinispan等緩存。根據所選的接收連接器,可能需要應用Debezium的新記錄狀態提取SMT,它只會將“after”結構從Debezium的事件信封傳播到接收連接器。

3. Debezium特性

DebeziumApache Kafka Connect的一組源連接器,使用change data capture (CDC)從不同的數據庫中獲取更改。與其他方法如輪詢或雙寫不同,基於日志的CDCDebezium實現:

確保捕獲所有數據更改以非常低的延遲(例如,MySQLPostgresms范圍)生成更改事件,同時避免增加頻繁輪詢的CPU使用量不需要更改數據模型(最后更新)可以捕獲刪除可以捕獲舊記錄狀態和其他元數據,如事務id和引發查詢(取決於數據庫的功能和配置)要了解更多關於基於日志的CDC的優點,請參閱本文。

Debezium的實際變化數據捕獲特性被修改了一系列相關的功能和選項:

快照:可選的,一個初始數據庫的當前狀態的快照可以采取如果連接器被啟動並不是所有日志仍然存在(通常在數據庫已經運行了一段時間和丟棄任何事務日志不再需要事務恢復或復制);快照有不同的模式,請參考特定連接器的文檔以了解更多信息過濾器:可以通過白名單/黑名單過濾器配置捕獲的模式、表和列集屏蔽:可以屏蔽特定列中的值,例如敏感數據監視:大多數連接器都可以使用JMX進行監視不同的即時消息轉換:例如,用於消息路由、提取新記錄狀態(關系連接器、MongoDB)和從事務性發件箱表中路由事件有關所有受支持的數據庫的列表,以及關於每個連接器的功能和配置選項的詳細信息,請參閱連接器文檔。

https://debezium.io/documentation/reference/1.5

4. Kafka Connect 簡介

Kafka 相信大家都很熟悉,是一款分布式,高性能的消息隊列框架。

一般情況下,讀寫 Kafka 數據,都是用 Consumer Producer  Api 來完成,但是自己實現這些需要去考慮很多額外的東西,比如管理 Schema,容錯,並行化,數據延遲,監控等等問題。

而在 0.9.0.0 版本之后,官方推出了 Kafka Connect ,大大減少了程序員的工作量,它有下面的特性:

統一而通用的框架;

支持分布式模式和單機模式;

REST 接口,用來查看和管理Kafka connectors

自動化的offset管理,開發人員不必擔心錯誤處理的影響;

分布式、可擴展;

/批處理集成。


Kafka Connect 有兩個核心的概念:Source SinkSource 負責導入數據到 KafkaSink 負責從 Kafka 導出數據,它們都被稱為是 Connector

如下圖,左邊的 Source 負責從源數據(RDBMSFile等)讀數據到 Kafka,右邊的 Sinks 負責從 Kafka 消費到其他系統。

 

 

 

5. Debezium的安裝

因為Debezium依賴KafkaKafka依賴ZK,所以先把KafkaZK安裝

5.1. ZK安裝

5.1.1. 3.1 分布式安裝部署

1)集群規划

hadoop102hadoop103hadoop104三個節點上部署Zookeeper

2解壓安裝

1解壓Zookeeper安裝包到/opt/module/目錄下

[atguigu@hadoop102 software]$ tar -zxvf zookeeper-3.5.7.tar.gz -C /opt/module/

2)修改/opt/module/apache-zookeeper-3.5.7-bin名稱為zookeeper-3.5.7

[atguigu@hadoop102 module]$ mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7

3)同步/opt/module/zookeeper-3.5.7目錄內容到hadoop103hadoop104

[atguigu@hadoop102 module]$ xsync zookeeper-3.5.7/

3)配置服務器編號

1)在/opt/module/zookeeper-3.5.7/這個目錄下創建zkData

[atguigu@hadoop102 zookeeper-3.5.7]$ mkdir zkData

2)在/opt/module/zookeeper-3.5.7/zkData目錄下創建一個myid的文件

[atguigu@hadoop102 zkData]$ vi myid

添加myid文件,注意一定要在linux里面創建,在notepad++里面很可能亂碼

在文件中添加與server對應的編號:

2

3)拷貝配置好的zookeeper到其他機器上

[atguigu@hadoop102 zkData]$ xsync myid

並分別hadoop103hadoop104修改myid文件中內容為34

4)配置zoo.cfg文件

1)重命名/opt/module/zookeeper-3.5.7/conf這個目錄下的zoo_sample.cfgzoo.cfg

[atguigu@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg

2打開zoo.cfg文件

[atguigu@hadoop102 conf]$ vim zoo.cfg

修改數據存儲路徑配置

dataDir=/opt/module/zookeeper-3.5.7/zkData

增加如下配置

#######################cluster##########################

server.2=hadoop102:2888:3888

server.3=hadoop103:2888:3888

server.4=hadoop104:2888:3888

3同步zoo.cfg配置文件

[atguigu@hadoop102 conf]$ xsync zoo.cfg

4)配置參數解讀

server.A=B:C:D

A是一個數字,表示這個是第幾號服務器;

集群模式下配置一個文件myid這個文件在dataDir目錄下,這個文件里面有一個數據就是A的值,Zookeeper啟動時讀取此文件,拿到里面數據與zoo.cfg里面的配置信息比較從而判斷到底是哪個server

B是這個服務器的地址;

C是這個服務器Follower與集群中的Leader服務器交換信息的端口;

D萬一集群中的Leader服務器掛了,需要一個端口來重新進行選舉,選出一個新的Leader,而這個端口就是用來執行選舉時服務器相互通信的端口。

5集群操作

1)分別啟動Zookeeper

[atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh start

[atguigu@hadoop103 zookeeper-3.5.7]$ bin/zkServer.sh start

[atguigu@hadoop104 zookeeper-3.5.7]$ bin/zkServer.sh start

2)查看狀態

[atguigu@hadoop102 zookeeper-3.5.7]# bin/zkServer.sh status

JMX enabled by default

Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg

Mode: follower

[atguigu@hadoop103 zookeeper-3.5.7]# bin/zkServer.sh status

JMX enabled by default

Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg

Mode: leader

[atguigu@hadoop104 zookeeper-3.4.5]# bin/zkServer.sh status

JMX enabled by default

Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg

Mode: follower

5.1.2. 3.2 客戶端命令行操作

命令基本語法

功能描述

help

顯示所有操作命令

ls path

使用 ls 命令來查看當前znode的子節點

-w  監聽子節點變化

-s   附加次級信息

create

普通創建

-s  含有序列

-e  臨時(重啟或者超時消失)

get path

獲得節點的值

-w  監聽節點內容變化

-s   附加次級信息

set

設置節點的具體值

stat

查看節點狀態

delete

刪除節點

deleteall

遞歸刪除節點

1)啟動客戶端

[atguigu@hadoop103 zookeeper-3.5.7]$ bin/zkCli.sh

5.2. Kafka安裝

5.2.1. 集群規划

hadoop102

hadoop103

hadoop104

zk

zk

zk

kafka

kafka

kafka

5.2.2. jar下載

http://kafka.apache.org/downloads

5.2.3. 集群部署

1解壓安裝包

[atguigu@hadoop102 software]$ tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/

2修改解壓后的文件名稱

[atguigu@hadoop102 module]$ mv kafka_2.11-2.4.1/ kafka

3)在/opt/module/kafka目錄下創建logs文件夾

[atguigu@hadoop102 kafka]$ mkdir logs

4修改配置文件

[atguigu@hadoop102 kafka]$ cd config/

[atguigu@hadoop102 config]$ vi server.properties

修改或者增加以下內容:

#broker全局唯一編號,不能重復

broker.id=0

#刪除topic功能使能

delete.topic.enable=true

#kafka運行日志存放的路徑

log.dirs=/opt/module/kafka/data

#配置連接Zookeeper集群地址

zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

5)配置環境變量

[atguigu@hadoop102 module]$ sudo vi /etc/profile.d/my_env.sh

 

#KAFKA_HOME

export KAFKA_HOME=/opt/module/kafka

export PATH=$PATH:$KAFKA_HOME/bin

 

[atguigu@hadoop102 module]$ source /etc/profile.d/my_env.sh

6)分發安裝包

[atguigu@hadoop102 module]$ xsync kafka/

注意:分發之后記得配置其他機器的環境變量

7)分別在hadoop103hadoop104上修改配置文件/opt/module/kafka/config/server.propertiesbroker.id=1broker.id=2

broker.id不得重復

8啟動集群

依次在hadoop102hadoop103hadoop104節點上啟動kafka

[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties

[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon  /opt/module/kafka/config/server.properties

[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon  /opt/module/kafka/config/server.properties

9)關閉集群

[atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh

[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh

[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh

10kafka群起腳本

1)在/home/atguigu/bin目錄下創建腳本kf.sh

[atguigu@hadoop102 bin]$ vim kf.sh

腳本中填寫如下內容

#!/bin/bash

 

case $1 in

"start"){

    for i in hadoop102 hadoop103 hadoop104

    do

        echo " --------啟動 $i Kafka-------"

        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties "

    done

};;

"stop"){

    for i in hadoop102 hadoop103 hadoop104

    do

        echo " --------停止 $i Kafka-------"

        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"

    done

};;

esac

2)增加腳本執行權限

[atguigu@hadoop102 bin]$ chmod 777 kf.sh

3kf集群啟動腳本

[atguigu@hadoop102 module]$ kf.sh start

4kf集群停止腳本

[atguigu@hadoop102 module]$ kf.sh stop

5.2.4. Kafka命令行操作

1)查看當前服務器中的所有topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --list

2)創建topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka \

--create --replication-factor 3 --partitions 1 --topic first

選項說明:

--topic 定義topic

--replication-factor  定義副本數

--partitions  定義分區數

3)刪除topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka \

--delete --topic first

需要server.properties中設置delete.topic.enable=true否則只是標記刪除。

4)發送消息

[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh \

--broker-list hadoop102:9092 --topic first

>hello world

>atguigu  atguigu

5)消費消息

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh \

--bootstrap-server hadoop102:9092 --from-beginning --topic first

 

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh \

--bootstrap-server hadoop102:9092 --from-beginning --topic first

--from-beginning會把主題中以往所有的數據都讀取出來。

6)查看某個Topic的詳情

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka \

--describe --topic first

7)修改分區數

[atguigu@hadoop102 kafka]$bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --alter --topic first --partitions 6

5.3. Debezium安裝

5.3.1. 安裝包下載

https://repo1.maven.org/maven2/io/debezium/

我這邊選擇的都是1.5版本

MySQL包下載地址

https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.5.0.Final/debezium-connector-mysql-1.5.0.Final-plugin.tar.gz

PGSql包下載地址

https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.5.0.Final/debezium-connector-postgres-1.5.0.Final-plugin.tar.gz

SqlServer下載地址

https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/1.5.0.Final/debezium-connector-sqlserver-1.5.0.Final-plugin.tar.gz

5.3.2. 部署安裝

1) 上傳解壓安裝包

上傳壓縮包到/opt/software目錄下

 

 

 

解壓壓縮包到/opt/module/kafka/plugins目錄下

新建文件夾plugins

mkdir -p  /opt/module/kafka/plugins

解壓文件(Oracle暫時還沒有驗證通過)

tar -zxvf debezium-connector-mysql-1.5.0.Final-plugin.tar.gz -C /opt/module/kafka/plugins

tar -zxvf debezium-connector-postgres-1.5.0.Final-plugin.tar.gz -C /opt/module/kafka/plugins

tar -zxvf debezium-connector-sqlserver-1.5.0.Final-plugin.tar.gz -C /opt/module/kafka/plugins

2) 修改配置文件

進入kafka配置文件

修改connect-distributed.properties文件

[atguigu@hadoop102 config]$ pwd

/opt/module/kafka/config

 

[atguigu@hadoop102 config]$ vim connect-distributed.properties

# 配置插件位置

plugin.path=/opt/module/kafka/plugins/

# keyvalueschemas去掉,減少冗余存儲

key.converter.schemas.enable=false

value.converter.schemas.enable=false

分發plugins文件和connect-distributed.properties到其它機器

3) 啟動連接器

[atguigu@hadoop102 kafka]$ bin/connect-distributed.sh -daemon config/connect-distributed.properties

 

[atguigu@hadoop102 config]$ jps

41232 QuorumPeerMain

50594 ConnectDistributed

52212 Jps

41644 Kafka

4) 查看日志

[atguigu@hadoop102 kafka]$ tail -f logs/connectDistributed.out

 

5) 查看是否正常啟動

[atguigu@hadoop102 kafka]$ curl -H "Accept:application/json" hadoop102:8083/connectors/

[]

# 返回空列表,表示正常啟動

6. Debezium實操案例

參考官網:

https://debezium.io/documentation/reference/1.5/connectors/mysql.html

 

6.1. Debezium監控MySQL

1)准備MySQL

查看MySQL配置文件路徑

[atguigu@hadoop102 etc]$ sudo find / -name my.cnf

/etc/my.cnf

 

#添加如下配置

log-bin=/var/lib/mysql/mysql-bin.log  # 指定binlog日志存儲位置

binlog_format=ROW  # 這里一定是row格式

expire-logs-days = 14 # 日志保留時間

max-binlog-size = 500M # 日志滾動大小

server-id=1

 

重啟數據庫,然后查看日志是否開啟

mysql>  show variables like 'log_bin';

+---------------+-------+

| Variable_name | Value |

+---------------+-------+

| log_bin       | ON    |

+---------------+-------+

1 row in set (0.01 sec)

2)配置MySQL連接信息

[atguigu@hadoop102 config]$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" hadoop102:8083/connectors/ -d '{"name":"bd_test-mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"localhost","database.port":"3306","database.user":"bd_test","database.password":"123456","database.server.id":"184054","database.server.name":"bd_test","database.include.list":"test_db","database.history.kafka.bootstrap.servers":"hadoop102:9092","database.history.kafka.topic":"dbhistory.test_db","decimal.handling.mode":"String","snapshot.mode":"schema_only","tombstones.on.delete":"false","table.include.list":"test_db.test0430,test_db.zd_business_type,test_db.zd_business_type_copy1,test_db.zd_business_type_copy2"}}'

3)格式化(方便查看)

{

    "name":"bd_test-mysql-connector",

    "config":{

        "connector.class":"io.debezium.connector.mysql.MySqlConnector",

        "tasks.max":"1",

        "database.hostname":"localhost",

        "database.port":"3306",

        "database.user":"bd_test",

        "database.password":"123456",

        "database.server.id":"184054",

        "database.server.name":"bd_test",

        "database.include.list":"test_db",

        "database.history.kafka.bootstrap.servers":"hadoop102:9092",

        "database.history.kafka.topic":"dbhistory.test_db",

        "decimal.handling.mode":"String",

        "snapshot.mode":"schema_only",

        "tombstones.on.delete":"false",

        "table.include.list":"test_db.test0430,test_db.zd_business_type,test_db.zd_business_type_copy1,test_db.zd_business_type_copy2"

    }

}

4)參數解析

name:標識連接器的名稱

connector.class:對應數據庫類

tasks.max:默認1

database.hostname:數據庫ip

database.port:數據庫端口

database.user:數據庫登錄名

database.password:數據庫密碼

database.server.id:數據庫id,標識當前庫,不重復就行

database.server.name:給數據庫取別名

database.include.list:類似白名單,里面的庫可以監控到,不在里面監控不到,多庫逗號分隔,支持正則匹配

database.history.kafka.bootstrap.servers:表DDL相關信息知道kafka地址

database.history.kafka.topic:表DDL相關信息會保存在這個topic里面

decimal.handling.mode:當處理decimalInt類型時,默認是二進制顯示,我們改為字符串顯示

snapshot.mode:快照模式,這個需要具體情況,具體分析,因為我只需要實時數據,不需要歷史數據,所以設置為schema_only

tombstones.on.delete:默認是True,當我們刪除記錄的時候,會產生兩天數據,第二條為NULL,但是我們不希望出現NULL,所以設置為False

table.include.list:類似白名單,里面的表可以監控到,不在里面監控不到,多表逗號分隔,支持正則匹配

5)查看是否新建成功

[atguigu@hadoop102 config]$ curl -H "Accept:application/json" hadoop102:8083/connectors/

["bd_test-mysql-connector"]

 

# 出現剛剛配置的名稱說明成功新建

6)查看KafkaTopic信息

[atguigu@hadoop103 bin]$ ./kafka-topics.sh --bootstrap-server hadoop102:9092 --list

 

7)可以往mysql插入數據,然后查看對應Topic的是否有數據,只要表數據進行更新,Topic會自動創建。

6.2. Debezium監控PGSql

1)准備PGSql

Ø 更改配置文件postgresql.conf

# 更改wal日志方式為logical

wal_level = logical           # minimal, replica, or logical

 

# 中斷那些停止活動超過指定毫秒數的復制連接,可以適當設置大一點(默認60s

wal_sender_timeout = 180s    # in milliseconds; 0 disable  

Ø 重啟pg服務生效,所以一般是在業務低峰期更改

Ø 新建用戶並且給用戶復制流權限

-- pg新建用戶

CREATE USER user WITH PASSWORD 'pwd';

 

-- 給用戶復制流權限

ALTER ROLE user replication;

 

-- 給用戶登錄數據庫權限

grant CONNECT ON DATABASE test to user;

 

-- 把當前庫public下所有表查詢權限賦給用戶

GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;

Ø 發布表

-- 更改復制標識包含更新和刪除之前值

ALTER TABLE test0425 REPLICA IDENTITY FULL;

 

-- 查看復制標識(為f標識說明設置成功)

select relreplident from pg_class where relname='test0425';

Ø 更改表的復制標識包含更新和刪除的值

-- 更改復制標識包含更新和刪除之前值

ALTER TABLE test0425 REPLICA IDENTITY FULL;

 

-- 查看復制標識(為f標識說明設置成功)

select relreplident from pg_class where relname='test0425';

2)配置PGsql連接信息

[atguigu@hadoop102 config]$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" hadoop102:8083/connectors/ -d '{"name":"pgsql-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.hostname":"localhost","database.port":"5432","database.user":"bd_test","database.password":"123456","database.dbname":"bd_test","database.server.name":"pgsql_cs","schema.include.list":"public","slot.name":"pgsql_cs_slot","snapshot.mode":"never","table.include.list":"public.test0425,public.test0425_copy1,public.zd_business_type_copy1","publication.autocreate.mode":"filtered","decimal.handling.mode":"String","heartbeat.interval.ms":"100","tombstones.on.delete":"false","plugin.name":"pgoutput"}}'

3)格式化(方便查看)

{

    "name":"pgsql-connector",

    "config":{

        "connector.class":"io.debezium.connector.postgresql.PostgresConnector",

        "database.hostname":"localhost",

        "database.port":"5432",

        "database.user":"bd_test",

        "database.password":"123456",

        "database.dbname":"bd_test",

        "database.server.name":"pgsql_cs",

        "schema.include.list":"public",

        "slot.name":"pgsql_cs_slot",

        "snapshot.mode":"never",

        "table.include.list":"public.test0425,public.test0425_copy1",

        "publication.autocreate.mode":"filtered",

        "decimal.handling.mode":"String",

        "heartbeat.interval.ms":"100",

        "tombstones.on.delete":"false",

        "plugin.name":"pgoutput"

    }

}

4)參數解析

name:標識連接器的名稱

connector.class:對應數據庫類

database.hostname:數據庫ip

database.port:數據庫端口

database.user:數據庫登錄名

database.password:數據庫密碼

database.dbname:數據庫名稱

database.server.name:給數據庫取別名

schema.include.list:類似白名單,里面的模式可以監控到,不在里面監控不到,多模式逗號分隔,支持正則匹配

slot.nameslot的名稱

snapshot.mode:快照模式,這個需要具體情況,具體分析,因為我只需要實時數據,不需要歷史數據,所以設置為never

table.include.list:類似白名單,里面的表可以監控到,不在里面監控不到,多表逗號分隔,支持正則匹配

publication.autocreate.mode:發布表處理策略,具體查看官網

decimal.handling.mode:當處理decimalInt類型時,默認是二進制顯示,我們改為字符串顯示

heartbeat.interval.ms:控制連接器向 Kafka 主題發送心跳消息的頻率。默認行為是連接器不發送心跳消息(毫秒)

tombstones.on.delete:默認是True,當我們刪除記錄的時候,會產生兩天數據,第二條為NULL,但是我們不希望出現NULL,所以設置為False

plugin.name:使用pgoutput插件,是pgsql自帶的,不需要安裝

5)查看是否新建成功

[atguigu@hadoop102 config]$ curl -H "Accept:application/json" hadoop102:8083/connectors/

["bd_test-mysql-connector"]

 

# 出現剛剛配置的名稱說明成功新建

6)查看KafkaTopic信息

[atguigu@hadoop103 bin]$ ./kafka-topics.sh --bootstrap-server hadoop102:9092 --list

 

7)可以往PGSQL表插入數據,然后查看對應Topic的是否有數據,只要表數據進行更新,Topic會自動創建。

6.3. Debezium監控SQLServer

1)准備SqlServer

啟用表cdc

USE TestDB  

GO  

EXEC sys.sp_cdc_enable_db  

GO  

啟用表cdc

USE TestDB

GO

EXEC sys.sp_cdc_enable_table

@source_schema = N'dbo',

@source_name   = N'tableName',

@role_name     = NULL,

@supports_net_changes = 1

GO

查詢是否啟用

-- 庫是否啟用cdc

SELECT name,is_cdc_enabled

FROM sys.databases;

 

-- 表是否啟用cdc

SELECT name,is_tracked_by_cdc

FROM sys.tables;

 

2)配置SQLServer連接信息

[atguigu@hadoop102 config]$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" hadoop102:8083/connectors/ -d '{"name":"SqlServer-connector","config":{"connector.class":"io.debezium.connector.sqlserver.SqlServerConnector","database.hostname":"localhost","database.port":"1433","database.user":"bd_test","database.password":"123456","database.dbname":"TestDB","database.server.name":"sql_server_test","snapshot.mode":"schema_only","table.include.list":"dbo.test0601,dbo.test0531","decimal.handling.mode":"String","heartbeat.interval.ms":"100","database.history.kafka.bootstrap.servers":"hadoop102:9092","database.history.kafka.topic":"sql_server_test.dbhistory","tombstones.on.delete":"false"}}'

3)格式化(可讀性強)

{

    "name":"SqlServer-connector",

    "config":{

        "connector.class":"io.debezium.connector.sqlserver.SqlServerConnector",

        "database.hostname":"localhost",

        "database.port":"1433",

        "database.user":"bd_test",

        "database.password":"123456",

        "database.dbname":"TestDB",

        "database.server.name":"sql_server_test",

        "snapshot.mode":"schema_only",

        "table.include.list":"dbo.test0601,dbo.test0531",

        "decimal.handling.mode":"String",

        "heartbeat.interval.ms":"100",

        "database.history.kafka.bootstrap.servers":"hadoop102:9092",

        "database.history.kafka.topic":"sql_server_test.dbhistory",

        "tombstones.on.delete":"false"

    }

}

4)參數解析

name:標識連接器的名稱

connector.class:對應數據庫類

database.hostname:數據庫ip

database.port:數據庫端口

database.user:數據庫登錄名

database.password:數據庫密碼

database.server.name:給數據庫取別名

snapshot.mode:快照模式,這個需要具體情況,具體分析,因為我只需要實時數據,不需要歷史數據,所以設置為schema_only

database.include.list:類似白名單,里面的庫可以監控到,不在里面監控不到,多庫逗號分隔,支持正則匹配

decimal.handling.mode:當處理decimalInt類型時,默認是二進制顯示,我們改為字符串顯示

heartbeat.interval.ms:控制連接器向 Kafka 主題發送心跳消息的頻率。默認行為是連接器不發送心跳消息(毫秒)

database.history.kafka.bootstrap.servers:表DDL相關信息知道kafka地址

database.history.kafka.topic:表DDL相關信息會保存在這個topic里面

tombstones.on.delete:默認是True,當我們刪除記錄的時候,會產生兩天數據,第二條為NULL,但是我們不希望出現NULL,所以設置為False

5)查看是否新建成功

[atguigu@hadoop102 config]$ curl -H "Accept:application/json" hadoop102:8083/connectors/

["SqlServer-connector"]

 

# 出現剛剛配置的名稱說明成功新建

6)查看KafkaTopic信息

[atguigu@hadoop103 bin]$ ./kafka-topics.sh --bootstrap-server hadoop102:9092 --list

 

7)可以往SQLServer插入數據,然后查看對應Topic的是否有數據,只要表數據進行更新,Topic會自動創建。

6.4. Debezium監控Oracle(不推薦用)

Debezium使用本機LogMiner數據庫包或XStream APIOracle接收變更事件。雖然連接器可以與各種Oracle版本和版本配合使用,但只有Oracle EE 1219經過測試。

1)准備Oracle

新建用戶

create user userName identified by '123456';

給用戶賦權限

UGRANT CREATE SESSION TO userName                        ;

GRANT SET CONTAINER TO userName                         ;

GRANT SELECT ON V_$DATABASE to userName                 ;

GRANT FLASHBACK ANY TABLE TO userName                   ;

GRANT SELECT ANY TABLE TO userName                      ;

GRANT SELECT_CATALOG_ROLE TO userName                   ;

GRANT EXECUTE_CATALOG_ROLE TO userName                  ;

GRANT SELECT ANY TRANSACTION TO userName                ;

GRANT LOGMINING TO userName                             ;

GRANT CREATE TABLE TO userName                          ;

GRANT LOCK ANY TABLE TO userName                        ;

GRANT ALTER ANY TABLE TO userName                       ;

GRANT CREATE SEQUENCE TO userName                       ;

GRANT EXECUTE ON DBMS_LOGMNR TO userName                ;

GRANT EXECUTE ON DBMS_LOGMNR_D TO userName              ;

GRANT SELECT ON V_$LOG TO userName                      ;

GRANT SELECT ON V_$LOG_HISTORY TO userName              ;

GRANT SELECT ON V_$LOGMNR_LOGS TO userName              ;

GRANT SELECT ON V_$LOGMNR_CONTENTS TO userName          ;

GRANT SELECT ON V_$LOGMNR_PARAMETERS TO userName        ;

GRANT SELECT ON V_$LOGFILE TO userName                  ;

GRANT SELECT ON V_$ARCHIVED_LOG TO userName             ;

GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO userName      ;

grant connect to userName;

開啟表補充日志,開啟LOG_MODE模式為ARCHIVELOG

--給庫表開啟補充日志,所有表都要執行

SQL> ALTER DATABASE add SUPPLEMENTAL LOG DATA ;

SQL> ALTER TABLE inventory.test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

 

--登錄Oracle

[root@hadoop102 bin]$ su - oracle

[oracle@hadoop102 bin]$ sqlplus /nolog

SQL> conn /as sysdba

 

-- 開啟ARCHIVELOG

關閉數據庫

SQL> shutdown immediate;

啟動數據庫到mount狀態

SQL> startup mount;

啟動歸檔模式

SQL> alter database archivelog;

啟動數據庫

SQL> alter database open;

查看是否開啟

SQL> archive log list

Database log mode       Archive Mode

Automatic archival       Enabled

Archive destination       USE_DB_RECOVERY_FILE_DEST

Oldest online log sequence     114

Next log sequence to archive   114

Current log sequence       117

2)配置Oracle連接信息

[atguigu@hadoop102 config]$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" hadoop102:8083/connectors/ -d '{"name":"SqlServer-connector","config":{"connector.class":"io.debezium.connector.sqlserver.SqlServerConnector","database.hostname":"localhost","database.port":"1433","database.user":"bd_test","database.password":"123456","database.dbname":"TestDB","database.server.name":"sql_server_test","snapshot.mode":"schema_only","table.include.list":"dbo.test0601,dbo.test0531","decimal.handling.mode":"String","heartbeat.interval.ms":"100","database.history.kafka.bootstrap.servers":"hadoop102:9092","database.history.kafka.topic":"sql_server_test.dbhistory","tombstones.on.delete":"false"}}'

3)格式化(可讀性強)

{

    "name":"SqlServer-connector",

    "config":{

        "connector.class":"io.debezium.connector.sqlserver.SqlServerConnector",

        "database.hostname":"localhost",

        "database.port":"1433",

        "database.user":"bd_test",

        "database.password":"123456",

        "database.dbname":"TestDB",

        "database.server.name":"sql_server_test",

        "snapshot.mode":"schema_only",

        "table.include.list":"dbo.test0601,dbo.test0531",

        "decimal.handling.mode":"String",

        "heartbeat.interval.ms":"100",

        "database.history.kafka.bootstrap.servers":"hadoop102:9092",

        "database.history.kafka.topic":"sql_server_test.dbhistory",

        "tombstones.on.delete":"false"

    }

}

4)參數解析

name:標識連接器的名稱

connector.class:對應數據庫類

database.hostname:數據庫ip

database.port:數據庫端口

database.user:數據庫登錄名

database.password:數據庫密碼

database.server.name:給數據庫取別名

snapshot.mode:快照模式,這個需要具體情況,具體分析,因為我只需要實時數據,不需要歷史數據,所以設置為schema_only

database.include.list:類似白名單,里面的庫可以監控到,不在里面監控不到,多庫逗號分隔,支持正則匹配

decimal.handling.mode:當處理decimalInt類型時,默認是二進制顯示,我們改為字符串顯示

heartbeat.interval.ms:控制連接器向 Kafka 主題發送心跳消息的頻率。默認行為是連接器不發送心跳消息(毫秒)

database.history.kafka.bootstrap.servers:表DDL相關信息知道kafka地址

database.history.kafka.topic:表DDL相關信息會保存在這個topic里面

tombstones.on.delete:默認是True,當我們刪除記錄的時候,會產生兩天數據,第二條為NULL,但是我們不希望出現NULL,所以設置為False

5)查看是否新建成功

[atguigu@hadoop102 config]$ curl -H "Accept:application/json" hadoop102:8083/connectors/

["SqlServer-connector"]

 

# 出現剛剛配置的名稱說明成功新建

6)查看KafkaTopic信息

[atguigu@hadoop103 bin]$ ./kafka-topics.sh --bootstrap-server hadoop102:9092 --list

 

7)可以往Oracle插入數據,然后查看對應Topic的是否有數據,只要表數據進行更新,Topic會自動創建。

7. Flink消費Kafka數據

7.1. 大致配置信息

消費的時候指定為'value.format' = 'debezium-json'

 

 

 

這樣就能消費到數據,使用cdc的方式進行消費,也可以獲取一些其它的值,比如時間戳,表名稱,庫名稱等等字段信息,增刪改變化目標表會同步相應操作。

具體看Flink官方文檔

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/

7.2. 案例:把kafka數據寫入到mysql中采用debezium-json的方式進行format

7.2.1. 新建MySql輸出表

create table `test02` (

`name` varchar(100) not null primary key,

`amount` double

);

7.2.2. 代碼示例如下

package flinkTest.connect;

 

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.TableResult;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

 

public class kafkaTomysql {

    public static void main(String[] args) {

        //設置flink表環境變量

        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()

                .useBlinkPlanner()

                .inStreamingMode()

                .build();

 

        //獲取flink流環境變量

        StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        exeEnv.setParallelism(1);

 

        //表執行環境

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(exeEnv, fsSettings);

 

        String sourceDDL =

                "CREATE TABLE kafka_source (\n" +

                        " id int,\n" +

                        " name STRING,\n" +

                        " age int,\n" +

                        " addr STRING,\n" +

                        " sku_id int,\n" +

                        " sku_name STRING,\n" +

                        " amount double,\n" +

                        " create_time timestamp,\n" +

                        " PRIMARY KEY (id) NOT ENFORCED\n" +

                        ") WITH (\n" +

                        " 'connector' = 'kafka',\n" +

                        " 'topic' = 'flink_test',\n" +

                        " 'properties.bootstrap.servers' = 'hadoop102:9092',\n" +

                        " 'value.format' = 'debezium-json'\n" +

                        ")";

 

        //拼接sinkDLL

        String sinkDDL =

                "CREATE TABLE mysql_sink (\n" +

                        " name STRING,\n" +

                        " amount double,\n" +

                        " PRIMARY KEY (name) NOT ENFORCED\n" +

                        ") WITH (\n" +

                        " 'connector' = 'jdbc',\n" +

//                        " 'driver' = 'com.mysql.jdbc.Driver',\n" +

                        " 'url' = 'jdbc:mysql://123.57.104.176:3306/flink_cs?useUnicode=true&characterEncoding=UTF-8',\n" +

                        " 'username' = 'root',\n" +

                        " 'password' = '000000',\n" +

                        " 'table-name' = 'test02'\n" +

                        ")";

 

        String transformSQL =

                "INSERT INTO mysql_sink " +

                        "SELECT name,sum(amount) " +

                        "FROM kafka_source group by name";

 

 

        //執行sourceddl

        tableEnv.executeSql(sourceDDL);

        //執行sinkddl

        tableEnv.executeSql(sinkDDL);

        //執行邏輯sql語句

        TableResult tableResult = tableEnv.executeSql(transformSQL);

    }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM