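This article covers the deployment of a real-time data warehouse and its computation layer.
It has three parts:
- Data collection
- $\color{red}{[E]}$ Relational databases (MySQL/PG/Oracle) + Debezium + Kafka Connector
- Data computation
- $\color{red}{[T]}$ Flink
- Data storage
- $\color{red}{[L]}$ Load into relational databases or column-oriented stores such as ClickHouse/HBase
Note: two Alibaba articles are listed here for reference
Flink JDBC Connector: Best Practices for Integrating Flink with Databases
A Real-Time Data Synchronization Solution Based on Flink SQL CDC
Debezium monitoring MySQL
Debezium monitoring Oracle
Debezium-Github
Oracle deployment reference documentation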
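1. Environment requirements
Software requirements:
- Kafka cluster: this experiment uses the Kafka cluster shipped with CDH 5.14
- Databases: MySQL 8.x / PG 10.x / Oracle 11g, set up with Docker. (MySQL must have row-based binary logging enabled; Oracle must have archive logging enabled.)
- Compute engine: Flink 1.13
- Kafka Connector:
debezium-connector-mysql-1.4.0.Final-plugin.tar.gz
debezium-connector-postgres-1.4.0.Final-plugin.tar.gz
debezium-connector-oracle-1.4.0.Final-plugin.tar.gz
Note: all of the following steps must be carried out on each of the 3 nodes of the Kafka cluster
Kafka configuration directory: /opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist
Kafka bin directory: /opt/cloudera/parcels/KAFKA/lib/kafka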
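1. Data collection
1.1 Kafka deployment (identical for MySQL/PG/Oracle)
- Download [debezium-connector-mysql-1.4.0.Final-plugin.tar.gz] and extract it; any directory will do.
- I placed it in: /opt/cloudera/parcels/KAFKA/lib/kafka/kafka_connect
- Also copy all the JARs under the debezium-connector-mysql directory into ${KAFKA_HOME}/libs
- Put the MySQL/PG JDBC driver into libs [mysql-connector-java-8.0.21.ja]
- For Oracle, download the client and copy its JARs to ${KAFKA_HOME}/libs
Download link
- Edit the configuration file under ${KAFKA_HOME}/config (stock Kafka) or [/opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist] (CDH)
- For a standalone deployment, edit [connect-standalone.properties]
- For a cluster deployment, edit [connect-distributed.properties]
- Set bootstrap.servers to the Kafka cluster address, then uncomment plugin.path and set it to the plugin directory, for example plugin.path=/opt/cloudera/parcels/KAFKA/lib/kafka/kafka_connect
- If several databases (MySQL/PG/Oracle) need to be monitored, separate the plugin directories with commas
- Set the environment variable, then start Kafka Connect on the cluster
export KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist/connect-log4j.properties # without this, Kafka Connect reports an error later
./bin/connect-distributed.sh ../../etc/kafka/conf.dist/connect-distributed.properties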
- Submit the MySQL connector to monitor the MySQL database. database.hostname, database.port, database.user and database.password are the MySQL IP address, port, user name and that user's password. database.server.name is a logical name for the MySQL service (here Mysql). database.history.kafka.bootstrap.servers is the Kafka cluster address and database.history.kafka.topic is the Kafka topic name. decimal.handling.mode controls how DECIMAL values are emitted (here as strings). To capture a single table only, add "table.whitelist": "mydb.orders". The request body must be plain JSON, so it carries no inline comments.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.58.172:8083/connectors/ -d '
{
  "name": "debezium-mysql",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "10.20.60.44",
    "database.port": "3306",
    "database.user": "yaowentao",
    "database.password": "Sx202101",
    "database.server.id": "1739",
    "database.server.name": "Mysql",
    "database.history.kafka.bootstrap.servers": "192.168.58.171:9092,192.168.58.172:9092,192.168.58.177:9092",
    "database.history.kafka.topic": "dbhistory.mydb",
    "database.whitelist": "mydb",
    "include.schema.changes": "true",
    "decimal.handling.mode": "string",
    "database.history.skip.unparseable.ddl": "true"
  }
}'
- Submit the Oracle connector to monitor the Oracle database. In version 1.4, database.connection.adapter must be set to logminer. table.include.list is the table allowlist and schema.include.list is the schema allowlist. As with MySQL, the request body must be plain JSON without inline comments.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.58.172:8083/connectors/ -d '
{
  "name": "debezium-oracle-yaowentao",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "helowin",
    "database.hostname": "10.20.60.44",
    "database.port": "1521",
    "database.user": "dbzuser",
    "database.password": "dbz",
    "database.dbname": "helowin",
    "database.schema": "scott",
    "database.connection.adapter": "logminer",
    "database.tablename.case.insensitive": "true",
    "table.include.list": "scott.*",
    "snapshot.mode": "initial",
    "schema.include.list": "scott",
    "database.history.kafka.bootstrap.servers": "192.168.58.171:9092,192.168.58.172:9092,192.168.58.177:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}'
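The registration calls above can also be scripted. What follows is a minimal Python sketch under stated assumptions, not the author's method: it keeps the connector definition in a dictionary, so annotations live in Python comments while the request body stays valid JSON. The helper names `build_request` and `register` are hypothetical; the endpoint and the Debezium settings are taken from the MySQL example above.

```python
import json
import urllib.request

# Connect REST endpoint used in the curl commands above.
CONNECT_URL = "http://192.168.58.172:8083/connectors/"

# Debezium MySQL settings taken from the example above.
MYSQL_CONFIG = {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "10.20.60.44",   # MySQL IP address
    "database.port": "3306",              # MySQL port
    "database.user": "yaowentao",
    "database.password": "Sx202101",
    "database.server.id": "1739",
    "database.server.name": "Mysql",      # logical server name
    "database.history.kafka.bootstrap.servers":
        "192.168.58.171:9092,192.168.58.172:9092,192.168.58.177:9092",
    "database.history.kafka.topic": "dbhistory.mydb",
    "database.whitelist": "mydb",
    "include.schema.changes": "true",
    "decimal.handling.mode": "string",
    "database.history.skip.unparseable.ddl": "true",
}


def build_request(name: str, config: dict) -> urllib.request.Request:
    """Build the POST request that registers a connector (hypothetical helper)."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        CONNECT_URL,
        data=body,
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )


def register(name: str, config: dict) -> int:
    """Send the request and return the HTTP status code.

    urlopen raises urllib.error.HTTPError for 4xx/5xx responses,
    for example when a connector with the same name already exists.
    """
    with urllib.request.urlopen(build_request(name, config)) as response:
        return response.status
```

Calling `register("debezium-mysql", MYSQL_CONFIG)` against a reachable Connect worker should return 201 when the connector is created. The Oracle connector can be registered the same way by passing its own dictionary.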
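- Check that Kafka Connect started successfully with jps (look for a ConnectDistributed process)
- On a CDH cluster, startup reports that a log configuration file cannot be found
Fix: point the setting at the configuration file's path
export KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist/connect-log4j.properties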
- Inspect the connectors. The same URLs can be opened in a browser; this example uses the command line
List the created connectors
curl -X GET http://192.168.58.171:8083/connectors
http://192.168.58.172:8083/connectors
Show a connector's status
curl -X GET http://192.168.58.171:8083/connectors/debezium-mysql/status
http://192.168.58.172:8083/connectors
Show a connector's configuration
curl -X GET http://192.168.58.171:8083/connectors/debezium-mysql/config
Delete a connector
curl -X DELETE http://192.168.58.171:8083/connectors/debezium-mysql
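A connector can be listed and still have a failed task, so it helps to read the status response rather than only checking that the request succeeds. The sketch below assumes the usual shape of the Kafka Connect status response (a `connector` object and a `tasks` list, each with a `state` field). The helper names `is_healthy` and `fetch_status` are hypothetical, and the sample values are illustrative rather than captured output.

```python
import json
import urllib.request

# Connect worker used in the commands above.
CONNECT_BASE = "http://192.168.58.171:8083"


def is_healthy(status: dict) -> bool:
    """Return True when the connector and every one of its tasks report RUNNING."""
    tasks = status.get("tasks", [])
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    # A connector without any task is treated as unhealthy.
    return connector_ok and bool(tasks) and all(
        task.get("state") == "RUNNING" for task in tasks
    )


def fetch_status(name: str) -> dict:
    """GET /connectors/<name>/status and decode the JSON body."""
    url = f"{CONNECT_BASE}/connectors/{name}/status"
    with urllib.request.urlopen(url) as response:
        return json.load(response)


# Sample shaped like a status response (illustrative values).
sample = {
    "name": "debezium-mysql",
    "connector": {"state": "RUNNING", "worker_id": "192.168.58.171:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "192.168.58.171:8083"}],
}
print(is_healthy(sample))  # False: the connector runs but its only task failed
```

Against a live cluster, `is_healthy(fetch_status("debezium-mysql"))` gives the same check for the connector created earlier.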
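- Once the connector is running, it creates one topic for each table in the monitored database, and each topic carries only that table's DML events (insert/delete/update). DDL events are all written to a single topic named after the value of database.server.name in the configuration. Naming scheme:
- DDL topic: serverName
- DML topic: serverName.databaseName.tableName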
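The naming scheme can be expressed as two small functions, which is handy when generating Flink table definitions for many tables. This is a sketch of the convention described above; the function names are hypothetical.

```python
def ddl_topic(server_name: str) -> str:
    """Topic that receives schema change (DDL) events."""
    return server_name


def dml_topic(server_name: str, database: str, table: str) -> str:
    """Topic that receives the insert/update/delete events of one table."""
    return f"{server_name}.{database}.{table}"


def split_dml_topic(topic: str) -> tuple:
    """Split a DML topic name back into (server, database, table).

    Assumes none of the three parts contains a dot.
    """
    parts = topic.split(".")
    if len(parts) != 3:
        raise ValueError(f"not a DML topic name: {topic!r}")
    return tuple(parts)


print(ddl_topic("Mysql"))                    # Mysql
print(dml_topic("Mysql", "mydb", "orders"))  # Mysql.mydb.orders
```

With the MySQL connector configured earlier (database.server.name set to Mysql, database mydb), the orders table therefore ends up in the topic Mysql.mydb.orders.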
2. Data computation
2.1 Run the SQL statements in Flink
DROP TABLE IF EXISTS orders;
CREATE TABLE orders (
order_id INT,
order_date STRING,
customer_name STRING,
price double,
product_id INT,
order_status INT
) WITH (
'connector' = 'kafka',
'format' = 'debezium-json',
'topic' = 'Mysql.mydb.orders',
'properties.bootstrap.servers' = '192.168.58.171:9092,192.168.58.172:9092,192.168.58.177:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'debezium-json.schema-include' = 'true'
);
-- order_date arrives as an epoch value in milliseconds and has to be converted to a timestamp
SELECT TO_TIMESTAMP_LTZ(CAST(t.order_date AS BIGINT), 3) AS order_date_times, t.* FROM orders t;
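To make the `debezium-json` format and the timestamp conversion more concrete, here is a small Python sketch of what happens to a single change event. It assumes the event uses the Debezium envelope (`before`, `after`, `op`) nested under `payload` because schemas are included, and that `order_date` holds epoch milliseconds, as the cast to BIGINT with precision 3 implies. The function names and the sample event are hypothetical; this mirrors the interpretation, not Flink's actual implementation.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_changelog(message: str) -> list:
    """Turn one Debezium JSON message into changelog rows (+I, -U, +U, -D)."""
    envelope = json.loads(message)
    # With schemas included, the change event sits under "payload".
    payload = envelope.get("payload", envelope)
    op = payload["op"]
    before, after = payload.get("before"), payload.get("after")
    if op in ("c", "r"):   # create, or read during the initial snapshot
        return [("+I", after)]
    if op == "u":          # an update yields the old and the new row image
        return [("-U", before), ("+U", after)]
    if op == "d":          # a delete carries only the old row image
        return [("-D", before)]
    raise ValueError(f"unsupported op: {op!r}")


def millis_to_utc(value) -> datetime:
    """Convert epoch milliseconds (number or numeric string) to a UTC datetime."""
    return EPOCH + timedelta(milliseconds=int(value))


# Hypothetical update event for the orders table; the schema part is left empty.
sample = json.dumps({
    "schema": {},
    "payload": {
        "before": {"order_id": 1, "order_date": 1609459200000, "order_status": 0},
        "after": {"order_id": 1, "order_date": 1609459200000, "order_status": 1},
        "op": "u",
    },
})

for kind, row in to_changelog(sample):
    print(kind, row["order_status"], millis_to_utc(row["order_date"]).isoformat())
# -U 0 2021-01-01T00:00:00+00:00
# +U 1 2021-01-01T00:00:00+00:00
```

The sketch prints UTC, whereas `TO_TIMESTAMP_LTZ` returns a local-time-zone timestamp, so the value displayed in the Flink SQL client depends on the session time zone.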
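3. Data import
3.1 Create the target table in Flink (for example with the JDBC connector); the data can then be loaded directly with an INSERT INTO ... SELECT statement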
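As an illustration of this step, the sketch below renders the Flink SQL for a JDBC sink table and the statement that feeds it from the `orders` source table defined in section 2. Everything about the target is an assumption: the helper `render_jdbc_sink`, the `_sink` suffix, the JDBC URL and the credentials are hypothetical placeholders. The Flink JDBC connector JAR and the database driver would need to be on Flink's classpath.

```python
def render_jdbc_sink(table, columns, primary_key, url, username, password):
    """Render Flink SQL for a JDBC sink table plus the INSERT that feeds it."""
    column_sql = ",\n".join(f"  {name} {sql_type}" for name, sql_type in columns)
    names = ", ".join(name for name, _ in columns)
    ddl = (
        f"CREATE TABLE {table}_sink (\n"
        f"{column_sql},\n"
        # A primary key lets the JDBC sink apply updates and deletes as upserts.
        f"  PRIMARY KEY ({primary_key}) NOT ENFORCED\n"
        ") WITH (\n"
        "  'connector' = 'jdbc',\n"
        f"  'url' = '{url}',\n"
        f"  'table-name' = '{table}',\n"
        f"  'username' = '{username}',\n"
        f"  'password' = '{password}'\n"
        ");"
    )
    insert = f"INSERT INTO {table}_sink SELECT {names} FROM {table};"
    return ddl, insert


# Columns match the orders source table from section 2.
ORDERS_COLUMNS = [
    ("order_id", "INT"),
    ("order_date", "STRING"),
    ("customer_name", "STRING"),
    ("price", "DOUBLE"),
    ("product_id", "INT"),
    ("order_status", "INT"),
]

ddl, insert = render_jdbc_sink(
    "orders",
    ORDERS_COLUMNS,
    primary_key="order_id",
    url="jdbc:mysql://10.20.60.44:3306/dw",  # hypothetical target database
    username="user",                         # placeholder credentials
    password="secret",
)
print(ddl)
print(insert)
```

The two printed statements can be pasted into the Flink SQL client. The target table must already exist in the database with matching columns.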
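4. Appendix: Oracle database configuration (for versions after 11g, refer to the official documentation)
alter system set db_recovery_file_dest_size=5G; -- adjust as required, otherwise an error is reported
-- Enable archive logging in Oracle (run as SYSDBA; the database is restarted into ARCHIVELOG mode)
shutdown immediate
startup mount
alter database archivelog;
alter database open;
alter database add supplemental log data (all) columns; -- enable supplemental logging for all columns
-- Create a new tablespace and the dbzuser account, and grant the required privileges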
CREATE TABLESPACE LOGMINER_TBS DATAFILE '/home/oracle/app/oracle/oradata/helowin/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS ;
GRANT CREATE SESSION TO dbzuser;
GRANT SELECT ON V_$DATABASE TO dbzuser;
GRANT FLASHBACK ANY TABLE TO dbzuser;
GRANT SELECT ANY TABLE TO dbzuser;
GRANT SELECT_CATALOG_ROLE TO dbzuser;
GRANT EXECUTE_CATALOG_ROLE TO dbzuser;
GRANT SELECT ANY TRANSACTION TO dbzuser;
GRANT SELECT ANY DICTIONARY TO dbzuser;
GRANT CREATE TABLE TO dbzuser;
GRANT ALTER ANY TABLE TO dbzuser;
GRANT LOCK ANY TABLE TO dbzuser;
GRANT CREATE SEQUENCE TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser;
GRANT SELECT ON V_$LOGMNR_LOGS to dbzuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser;
GRANT SELECT ON V_$LOGFILE TO dbzuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser;
-- Optional for now: the official documentation asks for this user, but I have not yet worked out what it is for
CREATE USER debezium IDENTIFIED BY dbz DEFAULT TABLESPACE USERS QUOTA UNLIMITED ON USERS;
GRANT CONNECT TO debezium;
GRANT CREATE SESSION TO debezium;
GRANT CREATE TABLE TO debezium;
GRANT CREATE SEQUENCE to debezium;
ALTER USER debezium QUOTA 100M on users;