[Stream Processing] MySql/PG/Oracle + Kafka + Flink (CDC Capture): Deployment and Real-Time Computation


This article mainly covers the deployment and computation layers of a real-time data warehouse.

The article is organized into three parts:
(Figure 1)

  • Data collection
  • $\color{red}{[E]}$ Relational databases MySql/PG/Oracle + Debezium + Kafka Connector
  • Data computation
  • $\color{red}{[T]}$ Flink
  • Data storage
  • $\color{red}{[L]}$ Loading into a relational database or a columnar store such as clickhouse/hbase

Note: two Alibaba articles are offered here for reference:
Flink JDBC Connector: Best Practices for Integrating Flink with Databases
A Real-Time Data Synchronization Solution Based on Flink SQL CDC
Debezium monitoring MySql
Debezium monitoring Oracle
Debezium-Github
Oracle deployment reference documentation

Environment Requirements

Software requirements:

  1. Kafka cluster: this walkthrough uses the Kafka cluster shipped with CDH 5.14
  2. Databases: Mysql 8.x / PG 10.x / Oracle 11G, set up with docker (enable row-based binlog on Mysql and archive log mode on Oracle)
  3. Compute engine: Flink 1.13
  4. Kafka Connector plugins:
    debezium-connector-mysql-1.4.0.Final-plugin.tar.gz
    debezium-connector-postgres-1.4.0.Final-plugin.tar.gz
    debezium-connector-oracle-1.4.0.Final-plugin.tar.gz

Note: the following steps must be performed on all three nodes of the Kafka cluster.

Kafka config directory: /opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist
Kafka bin directory: /opt/cloudera/parcels/KAFKA/lib/kafka

1. Data Collection

1.1 Kafka Connect Deployment (same for Mysql/PG/Oracle)

  • Download [debezium-connector-mysql-1.4.0.Final-plugin.tar.gz] and unpack it; any directory will do.
  • I placed it under: /opt/cloudera/parcels/KAFKA/lib/kafka/kafka_connect
  • Copy all jars from the debezium-connector-mysql directory into ${KAFKA_HOME}/libs
  • Put the Mysql/PG JDBC driver into libs as well [mysql-connector-java-8.0.21.jar]
  • For Oracle, download the client and copy its jars into ${KAFKA_HOME}/libs
    (download link)
  • Edit the config file under ${KAFKA_HOME}/bin or [/opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist]
  • Plain / CDH environment:
  • standalone deployment: edit [connect-standalone.properties]
  • cluster deployment: edit [connect-distributed.properties]
  • Uncomment the plugin.path setting and point it at the plugin directory
  • If several different databases (Mysql/PG/Oracle) need to be monitored, separate the directories in plugin.path with commas
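A minimal sketch of the relevant lines in connect-distributed.properties, assuming the plugin directory used above (the second path is illustrative, for the case of multiple plugin directories):

```properties
# Kafka brokers used by the Connect cluster
bootstrap.servers=192.168.58.171:9092,192.168.58.172:9092,192.168.58.177:9092
# comma-separated list of directories containing connector plugins
plugin.path=/opt/cloudera/parcels/KAFKA/lib/kafka/kafka_connect,/opt/connect_plugins_extra
```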
  • Start the Kafka Connect worker, setting the environment variable first
export KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist/connect-log4j.properties # without this, Kafka Connect later fails with a log4j error
./bin/connect-distributed.sh ../../etc/kafka/conf.dist/connect-distributed.properties
  • Submit the mysql connector to monitor the Mysql database (the # comments below are explanatory only; JSON does not allow comments, so remove them before running)
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.58.172:8083/connectors/ -d '
{ 
"name" : "debezium-mysql",
"config":{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "10.20.60.44", #Mysql host IP
"database.port": "3306", #Mysql port
"database.user": "yaowentao", #Mysql user
"database.password": "Sx202101", #password for that Mysql user
"database.server.id" :"1739",
"database.server.name": "Mysql", #logical name of the Mysql server, e.g. Mysql
"database.history.kafka.bootstrap.servers": "192.168.58.171:9092,192.168.58.172:9092,192.168.58.177:9092", #Kafka cluster address
"database.history.kafka.topic": "dbhistory.mydb", #Kafka topic name
"database.whitelist": "mydb", 
#"table.whitelist":"mydb.orders",
"include.schema.changes" : "true" ,
"decimal.handling.mode": "string", #how decimal values are represented
"mode" : "incrementing",
"incrementing.column.name" : "id",
"database.history.skip.unparseable.ddl" : "true"
}
}'
  • Submit the Oracle connector to monitor the Oracle database (again strip the explanatory # comments before running)
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.58.172:8083/connectors/ -d '
{
"name": "debezium-oracle-yaowentao",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"tasks.max" : "1",
"database.server.name" : "helowin",
"database.hostname" : "10.20.60.44",
"database.port" : "1521",
"database.user" : "dbzuser",
"database.password" : "dbz",
"database.dbname" : "helowin",
"database.schema" : "scott",
"database.connection.adapter": "logminer", #required in version 1.4
"database.tablename.case.insensitive": "true",
"table.include.list" : "scott.*", #table whitelist
"snapshot.mode" : "initial",
"schema.include.list" : "scott", #schema whitelist
"database.history.kafka.bootstrap.servers" : "192.168.58.171:9092,192.168.58.172:9092,192.168.58.177:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}'
  • Check whether the worker started successfully with jps
  • On a CDH cluster you may hit a "log file not found" error
    (Figure 3)
    Fix: point the log4j configuration at the CDH config path
export KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist/connect-log4j.properties

(Figure 4)

  • Inspect the connectors; this can also be done from a browser. Here the queries are run on the command line.
List the created connectors
curl -X GET http://192.168.58.171:8083/connectors
or in a browser: http://192.168.58.172:8083/connectors

(Figure 9)

Check a connector's status
curl -X GET http://192.168.58.171:8083/connectors/debezium-mysql/status

(Figure 9)

Check a connector's configuration
curl -X GET http://192.168.58.171:8083/connectors/debezium-mysql/config

(Figure 9)

Delete a connector
curl -X DELETE http://192.168.58.171:8083/connectors/debezium-mysql

(Figure 5)

  • Once the Kafka connector starts, it creates one topic per monitored table, and that topic contains only that table's DML (insert/delete/update) events. All DDL events go into a single topic named after the database.server.name value from the configuration. Naming scheme:
  • DDL topic: serverName (here: Mysql)
  • DML topic: serverName.databaseName.tableName (here: Mysql.mydb.orders)
    (Figure 6)
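For orientation, a trimmed sketch of a Debezium insert event as it appears on the DML topic; the field values are illustrative, and the real message also carries a schema section and source metadata when schema-include is enabled:

```json
{
  "payload": {
    "before": null,
    "after": {
      "order_id": 1001,
      "order_date": "1612339200000",
      "customer_name": "a",
      "price": "9.99",
      "product_id": 5,
      "order_status": 0
    },
    "op": "c",
    "ts_ms": 1612339200123
  }
}
```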

2. Data Computation

2.1 Consuming the Debezium topic with Flink SQL

-- Flink SQL statements
DROP TABLE IF EXISTS orders;
CREATE TABLE orders (
order_id INT,
order_date STRING,
customer_name STRING,
price DOUBLE,
product_id INT,
order_status INT
) WITH (
'connector' = 'kafka',
'format' = 'debezium-json',
'topic' = 'Mysql.mydb.orders',
'properties.bootstrap.servers' = '192.168.58.171:9092,192.168.58.172:9092,192.168.58.177:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'debezium-json.schema-include' = 'true'
);
-- order_date arrives as epoch milliseconds and needs converting
SELECT TO_TIMESTAMP_LTZ(cast(t.order_date as bigint),3) order_date_times,t.* from orders t;

(Figure 7)

3. Data Loading

3.1 Create a sink table in Flink and insert into it directly
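A minimal sketch of such a sink, assuming a Mysql target table mydb.orders_sink and the Flink JDBC connector on the classpath; the table name and credentials are illustrative:

```sql
-- hypothetical JDBC sink table; adjust URL, table name, and credentials
CREATE TABLE orders_sink (
  order_id INT,
  order_date STRING,
  customer_name STRING,
  price DOUBLE,
  product_id INT,
  order_status INT,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.20.60.44:3306/mydb',
  'table-name' = 'orders_sink',
  'username' = 'yaowentao',
  'password' = 'Sx202101'
);

-- continuously upsert the CDC stream into the target table
INSERT INTO orders_sink SELECT * FROM orders;
```

With a primary key declared, the JDBC sink applies the Debezium changelog as upserts/deletes rather than plain appends.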

4. Supplement: Oracle database configuration (for versions after 11G, see the official documentation)

alter system set db_recovery_file_dest_size=5G; -- size as required, otherwise archiving errors out

-- enable supplemental logging for all columns (row-level logging); the database must already be in archive log mode
alter database add supplemental log data (all) columns;
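The archive log mode mentioned above is enabled separately; a standard sequence, run in sqlplus as SYSDBA (it requires a database restart):

```sql
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
-- verify the current mode
ARCHIVE LOG LIST;
```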

-- create a new tablespace and the dbzuser account, then grant the required privileges
CREATE TABLESPACE LOGMINER_TBS DATAFILE '/home/oracle/app/oracle/oradata/helowin/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS ;

GRANT CREATE SESSION TO dbzuser;
GRANT SELECT ON V_$DATABASE TO dbzuser;
GRANT FLASHBACK ANY TABLE TO dbzuser;
GRANT SELECT ANY TABLE TO dbzuser;
GRANT SELECT_CATALOG_ROLE TO dbzuser;
GRANT EXECUTE_CATALOG_ROLE TO dbzuser;
GRANT SELECT ANY TRANSACTION TO dbzuser;
GRANT SELECT ANY DICTIONARY TO dbzuser;

GRANT CREATE TABLE TO dbzuser;
GRANT ALTER ANY TABLE TO dbzuser;
GRANT LOCK ANY TABLE TO dbzuser;
GRANT CREATE SEQUENCE TO dbzuser;

GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser;
GRANT SELECT ON V_$LOGMNR_LOGS to dbzuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser;
GRANT SELECT ON V_$LOGFILE TO dbzuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser;

-- optional for now; the official docs call for it, though I have not yet figured out what it is for
CREATE USER debezium IDENTIFIED BY dbz DEFAULT TABLESPACE USERS QUOTA UNLIMITED ON USERS;
GRANT CONNECT TO debezium;
GRANT CREATE SESSION TO debezium;
GRANT CREATE TABLE TO debezium;
GRANT CREATE SEQUENCE to debezium;
ALTER USER debezium QUOTA 100M on users;

