CDC概念
CDC全稱是Change data Cpature,即變更數據捕獲,主要面向數據庫的變更,是數據庫領域非常常見的技術,主要用於捕獲數據庫的一些變更,然后可以把變更數據發送到下游。

CDC類型
1.基於查詢的,客戶端會通過SQL方式查詢源庫表變更數據,然后對外發送。
2.基於日志的,這也是業界廣泛使用的一種方式,一般是通過binlog方式,變更的記錄會寫入binlog,解析binlog后會寫入消息系統,或直接基於Flink CDC進行處理。

CDC數據入湖
基於CDC數據的入湖,架構:上游各種各樣的數據源,比如DB的變更數據、事件流,以及各種外部數據源,都可以通過變更流的方式寫入表中,再進行外部的查詢分析

典型CDC入湖的鏈路:

鏈路1是大部分公司采取的鏈路,前面CDC的數據先通過CDC工具導入kafka或者Pulsar,再通過Flink或者是spark流式消費寫到Hudi里
鏈路2是通過Flink CDC直聯到MySQL上游數據源,直接寫到下游hudi表。
Flink CDC Hudi概述
基於Flink CDC技術,實時采集MySQL數據庫表數據,進行ETL轉換處理,最終存儲Hudi表

實踐
MySQL數據庫創建表,實時添加數據,通過Flink CDC將數據寫入Hudi表,並且Hudi與Hive集成,自動在hive中創建表與添加分區信息,最后hive終端beeline查詢分析數據。

hudi表與hive表自動關聯集成,需要重新編譯hudi源碼,指定hive版本及編譯時包含hive依賴jar包
1.MySQL數據庫,創建表及開啟binlog
2.創建flink CDC表,關聯到MySQL數據庫表
3.創建視圖,數據來源輸入表,字段與輸出表相同
4.創建輸出表,關聯到hudi表,自動同步到hive中,字段與hudi表相同
5.查詢視圖數據,插入到輸出表(hudi表)
6.查詢hive表數據,ro類型(讀優化查詢)和rt類型(快照查詢)
准備工作
1.編譯hudi源碼
修改hudi集成flink和hive編譯依賴版本配置
原因:現在版本hudi,在編譯的時候后本身默認已經集成了flink-SQL-connector-hive的包,會和flink lib包下的flink-SQL-connector-hive沖突。所以,編譯的過程中只修改hive編譯版本
文件: hudi-0.10.1/packaging/hudi-flink-bundle/pom.xml,將hive.version改為2.3.9
<include>org.apache.flink:flink-sql-connector-hive-2.3.9_${scala.binary.version}</include>
該問題從 0.10 版本已經解決
mvn clean install -DskipTests -Drat.skip=true -Dscala-2.12 -Dspark3 -Dhive.version=2.3.9 -Pflink-bundle-shade-hive2
編譯完成后,有2個jar包,非常重要
hudi-flink-bundle_2.12-0.10.1.jar位於hudi-0.10.1/packaging/hudi-flink-bundle/target,flink用來寫入和讀取數據,將其拷貝至$FLINK_HOME/lib目錄中,如果以前有同名jar包,先刪除再拷貝。
hudi-hadoop-mr-bundle-0.10.1.jar位於hudi-0.10.1/packaging/hudi-hadoop-mr-bundle/target,hive需要用來讀hudi數據,將其拷貝至$HIVE_HOME/lib目錄中。
2.將Flink CDC MySQL對應jar包,放到$FLINK_HOME/lib目錄中
建議用新版的CDC2,因為功能差了好多,CDC1的時候是把數據先全都讀取到內存,再執行后續操作,CDC2是邊讀邊執行后續操作,這么一比,CDC1被秒殺。
CDC1:
flink-sql-connector-mysql-cdc-1.4.0.jar
CDC2[升級版]:
flink-connector-mysql-cdc-2.0.2.jar
<dependency> <groupId>com.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>2.0.2</version> </dependency>
實現
零、組件版本
hudi:0.10.1 flink:1.13.1 hive:2.3.9 Hadoop:2.8.0 Scala:2.12.4
一、MySQL配置
1.MySQL數據庫,創建表及開啟binlog
vim /etc/my.cnf
在[mysqld]下面添加內容
server-id=2 log-bin=mysql-bin binlog_format=row expire_logs_day=15 binlog_row_image=full
2.重啟MySQL
service mysqld restart
查看是否生效
show master logs

3.建表
create database test_hudi; create table test_hudi.tbl_users( id bigint auto_increment primary key, name varchar(20) null, birthday timestamp default CURRENT_TIMESTAMP NOT NULL, ts timestamp default CURRENT_TIMESTAMP NOT NULL ); INSERT INTO test_hudi.tbl_users(name) VALUES(“測試”); INSERT INTO test_hudi.tbl_users(name) VALUES(“張三”); INSERT INTO test_hudi.tbl_users(name) VALUES(“李四”); INSERT INTO test_hudi.tbl_users(name) VALUES(“王五”); INSERT INTO test_hudi.tbl_users(name) VALUES(“趙六");
二、創建Flink CDC表,關聯到MySQL數據庫表
1.啟動相關服務
HDFS hadoop-daemon.sh start namenode hadoop-daemon.sh start datanode Hive ./hive --service metastore & ./hive --service hiveserver2 & Flink standalone start-cluster.sh 啟動Flink SQL Client客戶端 sql-client.sh embedded -j /Users/FengZhen/Desktop/Hadoop/flink/flink-1.13.1/lib/flink-connector-mysql-cdc-1.4.0.jar shell -j /Users/FengZhen/Desktop/Hadoop/flink/flink-1.13.1/lib/hudi-hive-sync-bundle-0.10.1.jar 設置屬性 set sql-client.execution.result-mode=tableau; set execution.checkpointing.interval=3sec;
2.創建輸入表,關聯MySQL表,采用MySQL CDC關聯
CREATE TABLE users_source_mysql( id BIGINT PRIMARY KEY NOT ENFORCED, name STRING, birthday TIMESTAMP(3), ts TIMESTAMP(3) )WITH( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '1234qwer', 'server-time-zone'= 'Asia/Shanghai', 'debezium.snapshot.mode' = 'initial', 'database-name' = 'test_hudi', 'table-name' = 'tbl_users' ); Flink SQL> select * from users_source_mysql; +----+----------------------+--------------------------------+-------------------------+-------------------------+ | op | id | name | birthday | ts | +----+----------------------+--------------------------------+-------------------------+-------------------------+ | +I | 1 | 測試 | 2022-03-15 22:09:00.000 | 2022-03-15 22:09:00.000 |
使用CDC2必須配置一下主鍵,因為參數【scan.incremental.snapshot.enabled】默認為true,增量讀取就必須配置PK,如果不做增量讀取,直接改為false即可。
debezium.snapshot.mode枚舉值:
參數值 | 描述 |
initial(默認) | 連接器執行數據庫的初始一致性快照,快照完成后,連接器開始為后續數據庫更改流式傳輸事件記錄。 |
initial_only | 連接器只執行數據庫的初始一致性快照,不允許捕獲任何后續更改的事件。 |
schema_only | 連接器只捕獲所有相關表的表結構,不捕獲初始數據,但是會同步后續數據庫的更改記錄 |
schema_only_recovery | 設置此選項可恢復丟失或損壞的數據庫歷史主題(database.history.kafka.topic)。 |
三、創建視圖,查詢輸入表,字段與輸出表相同
創建視圖,增加分區列part,方便后續同步hive分區表
CREATE VIEW view_users_cdc AS SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') AS part FROM users_source_mysql; Flink SQL> select * From view_users_cdc; +----+----------------------+--------------------------------+-------------------------+-------------------------+--------------------------------+ | op | id | name | birthday | ts | part | +----+----------------------+--------------------------------+-------------------------+-------------------------+--------------------------------+ | +I | 1 | 測試 | 2022-03-15 22:09:00.000 | 2022-03-15 22:09:00.000 | 20220315 | | +I | 2 | 張三 | 2022-03-17 13:26:05.000 | 2022-03-17 13:26:05.000 | 20220317 | | +I | 3 | 李四 | 2022-03-17 13:26:05.000 | 2022-03-17 13:26:05.000 | 20220317 | | +I | 4 | 王五 | 2022-03-17 13:26:05.000 | 2022-03-17 13:26:05.000 | 20220317 | | +I | 5 | 趙六 | 2022-03-17 13:26:05.000 | 2022-03-17 13:26:05.000 | 20220317 |
四、創建CDC Hudi Sink 表,並自動同步hive分區
CREATE TABLE users_sink_hudi_hive( id bigint , name string, birthday TIMESTAMP(3), ts TIMESTAMP(3), part VARCHAR(20), primary key(id) not enforced ) PARTITIONED BY (part) with( 'connector'='hudi', 'path'= 'hdfs://localhost:9000/hudi-warehouse/users_sink_hudi_hive' , 'hoodie.datasource.write.recordkey.field'= 'id'-- 主鍵 , 'write.precombine.field'= 'ts'-- 自動precombine的字段 , 'write.tasks'= '1' , 'compaction.tasks'= '1' , 'write.rate.limit'= '2000'-- 限速 , 'table.type'= 'MERGE_ON_READ'-- 默認COPY_ON_WRITE,可選MERGE_ON_READ , 'compaction.async.enabled'= 'true'-- 是否開啟異步壓縮 , 'compaction.trigger.strategy'= 'num_commits'-- 按次數壓縮 , 'compaction.delta_commits'= '1'-- 默認為5 , 'changelog.enabled'= 'true'-- 開啟changelog變更 , 'read.streaming.enabled'= 'true'-- 開啟流讀 , 'read.streaming.check-interval'= '3'-- 檢查間隔,默認60s , 'hive_sync.enable'= 'true'-- 開啟自動同步hive , 'hive_sync.mode'= 'hms'-- 自動同步hive模式,默認jdbc模式, hms:hive metastore , 'hive_sync.metastore.uris'= 'thrift://localhost:9083'-- hive metastore地址 -- , 'hive_sync.jdbc_url'= 'jdbc:hive2://localhost:10000'-- hiveServer地址 , 'hive_sync.table'= 'users_sink_hudi_hive_sync'-- hive 新建表名 , 'hive_sync.db'= 'db_hudi'-- hive 新建數據庫名 , 'hive_sync.username'= ''-- HMS 用戶名 , 'hive_sync.password'= ''-- HMS 密碼 , 'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp類型 );
五、視圖數據寫入hudi表
INSERT INTO users_sink_hudi_hive
SELECT id,name,birthday,ts,part FROM view_users_cdc;
六、Hive表查詢
需要將hudi-hadoop-mr-bundle-0.10.1.jar包,放到$HIVE_HOME/lib下
啟動beeline客戶端,連接hiveserver2
beeline -u jdbc:hive2://localhost:10000 -n root -p 123456
我吐了,我的hive沒生成下邊這兩張表,我各種操作都TM不行,這個情況先保留,以后再看
會自動生成hudi MOR模式的兩張表
users_sink_hudi_hive_ro:ro表全稱read optimized table,對於MOR表同步的xxx_ro表,只暴露壓縮后的parquet。其查詢方式和COW表類似。設置完hiveInputformat之后和普通的hive表一樣查詢即可。
users_sink_hudi_rt:rt表示增量視圖,主要針對增量查詢的rt表;ro表只能查parquet文件數據,rt表parquet文件數據和log文件數據都可查。
查看hive表數據
set hive.exec.mode.local.auto = true;
-- 不添加的話會導致count數據和select * 數據量不一致 set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat; set hive.mapred.mode = nonstrict;