CDC
CDC is short for Change Data Capture. The core idea is to monitor and capture changes to a database (inserts, updates, and deletes of rows or tables), record those changes in the order they occur, and write them to a message broker so that other services can subscribe to and consume them.
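Downstream consumers typically see each change as a small self-describing event. A minimal sketch of what such an event might look like (the field names here are illustrative, not a fixed standard; real connectors such as Debezium use their own envelope format):

```java
import java.util.Map;

// Illustrative change-event shape; the field names are hypothetical.
public class ChangeEvent {
    public String database;            // source database
    public String table;               // source table
    public String op;                  // "c" = insert, "u" = update, "d" = delete, "r" = snapshot read
    public Map<String, Object> before; // row image before the change (empty for inserts)
    public Map<String, Object> after;  // row image after the change (empty for deletes)
    public long tsMs;                  // epoch millis when the change occurred
}
```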
CDC implementations fall into two broad categories, query-based and binlog-based. The differences:

| | Query-based CDC | Binlog-based CDC |
| --- | --- | --- |
| Open-source products | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium |
| Execution mode | Batch | Streaming |
| Captures every data change | No | Yes |
| Latency | High | Low |
| Adds load on the database | Yes | No |
The Flink community has developed the flink-cdc-connectors component, a source connector that can read both the full snapshot and the incremental changes directly from databases such as MySQL and PostgreSQL. It is open source:
https://github.com/ververica/flink-cdc-connectors
https://ververica.github.io/flink-cdc-connectors/master/
Note that the MySQL CDC connector requires MySQL 5.7 or later; running it against MySQL 5.6 fails with:
Caused by: org.apache.flink.table.api.ValidationException: Currently Flink MySql CDC connector only supports MySql whose version is larger or equal to 5.7, but actual is 5.6.
MySQL configuration
Edit the my.cnf configuration:
[kris@hadoop101 ~]$ sudo vim /etc/my.cnf

[mysqld]
max_allowed_packet=1024M
server_id=1
log-bin=master
binlog_format=row
binlog-do-db=gmall
binlog-do-db=test    # add the test database
Restart MySQL:
sudo service mysql start
Starting MySQL                                             [  OK  ]
or (depending on the MySQL version):
sudo service mysqld start
Check the MySQL binlog files:
[kris@hadoop101 ~]$ cd /var/lib/mysql/
[kris@hadoop101 mysql]$ ll
total 178740
-rw-rw---- 1 mysql mysql     1076 Apr 18  2021 mysql-bin.000095
-rw-rw---- 1 mysql mysql      143 May  9  2021 mysql-bin.000096
-rw-rw---- 1 mysql mysql      846 Jul 26  2021 mysql-bin.000097
-rw-rw---- 1 mysql mysql      143 Nov  3 08:26 mysql-bin.000098
-rw-rw---- 1 mysql mysql      143 Feb  4 22:05 mysql-bin.000099
-rw-rw---- 1 mysql mysql      120 Feb  6 11:21 mysql-bin.000100
-rw-rw---- 1 mysql mysql     1900 Feb  6 11:21 mysql-bin.index
srwxrwxrwx 1 mysql mysql        0 Feb  6 11:21 mysql.sock
drwx------ 2 mysql mysql     4096 May 17  2019 online_edu
drwx------ 2 mysql mysql     4096 Mar 15  2019 performance_schema
-rw-r--r-- 1 root  root       125 Mar 15  2019 RPM_UPGRADE_HISTORY
-rw-r--r-- 1 mysql mysql      125 Mar 15  2019 RPM_UPGRADE_MARKER-LAST
drwx------ 2 mysql mysql     4096 Apr 19  2019 sparkmall
drwxr-xr-x 2 mysql mysql     4096 Apr 26  2020 test
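Optionally, verify from a client that binary logging is active and in ROW format before wiring up the connector. A minimal JDBC sketch, reusing the host and credentials that appear in the examples below (adjust to your environment):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckBinlog {
    public static void main(String[] args) throws Exception {
        // log_bin should be ON and binlog_format should be ROW for binlog-based CDC to work.
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://hadoop101:3306/test", "root", "123456");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'server_id')")) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + " = " + rs.getString(2));
            }
        }
    }
}
```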
Example
Maven dependencies:

<properties>
<flink-version>1.13.0</flink-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
Using the DataStream API
Test code:
package com.stream.flinkcdc;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Using the DataStream API
 */
public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        //1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Build the MySQL CDC source
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("hadoop101")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("test")
                .tableList("test.student_info") // db.table; if omitted, all tables in the listed databases are read
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                //initial: first take a full snapshot of the table, then read the latest binlog
                //latest(): only read the newest binlog entries, no historical data
                .build();

        DataStreamSource<String> mysqlDS = env.addSource(sourceFunction);

        //3. Print the data
        mysqlDS.print();

        //4. Execute the job
        env.execute();
    }
}
With the initial startup mode, all existing rows in the table are emitted first, with op = r:
Snapshot read, op = r
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1644126182, file=master.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.student_info', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.test.student_info.Key:STRUCT}, value=Struct{after=Struct{id=1,uname=kris,uage=20,create_time=2022-02-06T19:24:00Z,update_time=2022-02-06T19:24:00Z,update_time_dw=2022-02-06T19:24:00Z},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1644126182017,snapshot=true,db=test,table=student_info,server_id=0,file=master.000001,pos=154,row=0},op=r,ts_ms=1644126182026}, valueSchema=Schema{mysql_binlog_source.test.student_info.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1644126182, file=master.000001, pos=154}} ConnectRecord{topic='mysql_binlog_source.test.student_info', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_binlog_source.test.student_info.Key:STRUCT}, value=Struct{after=Struct{id=2,uname=smile,uage=18,create_time=2022-02-06T19:24:14Z,update_time=2022-02-06T19:24:14Z,update_time_dw=2022-02-06T19:24:14Z},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1644126182036,snapshot=last,db=test,table=student_info,server_id=0,file=master.000001,pos=154,row=0},op=r,ts_ms=1644126182037}, valueSchema=Schema{mysql_binlog_source.test.student_info.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

Insert, op = c
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1644126223, file=master.000001, pos=219, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.test.student_info', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.test.student_info.Key:STRUCT}, value=Struct{after=Struct{id=3,uname=CC,uage=18,create_time=2022-02-06T05:43:43Z,update_time=2022-02-06T05:43:43Z,update_time_dw=2022-02-06T05:43:43Z},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1644126223000,db=test,table=student_info,server_id=1,file=master.000001,pos=356,row=0},op=c,ts_ms=1644126223433}, valueSchema=Schema{mysql_binlog_source.test.student_info.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

Update, op = u
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1644126435, file=master.000001, pos=511, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.test.student_info', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_binlog_source.test.student_info.Key:STRUCT}, value=Struct{before=Struct{id=2,uname=smile,uage=18,create_time=2022-02-06T05:24:14Z,update_time=2022-02-06T05:24:14Z,update_time_dw=2022-02-06T05:24:14Z},after=Struct{id=2,uname=smile,uage=28,create_time=2022-02-06T05:24:14Z,update_time=2022-02-06T05:24:14Z,update_time_dw=2022-02-06T05:24:14Z},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1644126435000,db=test,table=student_info,server_id=1,file=master.000001,pos=648,row=0},op=u,ts_ms=1644126435149}, valueSchema=Schema{mysql_binlog_source.test.student_info.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

Delete, op = d
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1644126510, file=master.000001, pos=834, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.test.student_info', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.test.student_info.Key:STRUCT}, value=Struct{before=Struct{id=3,uname=CC,uage=18,create_time=2022-02-06T05:43:43Z,update_time=2022-02-06T05:43:43Z,update_time_dw=2022-02-06T05:43:43Z},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1644126510000,db=test,table=student_info,server_id=1,file=master.000001,pos=971,row=0},op=d,ts_ms=1644126510055}, valueSchema=Schema{mysql_binlog_source.test.student_info.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
Problem: comparing the timestamps stored in MySQL with the timestamps in the captured records, the initial snapshot records are off by 6 hours, and records with op in (c, u, d) are off by 8 hours.
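The offset is usually caused by a time-zone mismatch between the MySQL server and the zone the connector uses when converting TIMESTAMP/DATETIME values. A hedged sketch of pinning the time zone explicitly, assuming the builder exposes a serverTimeZone option as the flink-connector-mysql-cdc 2.x documentation describes (the SQL connector's equivalent table option is 'server-time-zone'):

```java
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDCWithTimeZone {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("hadoop101")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("test")
                .tableList("test.student_info")
                // Assumed option: align temporal conversion with the MySQL server's time_zone setting.
                .serverTimeZone("Asia/Shanghai")
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();

        env.addSource(sourceFunction).print();
        env.execute();
    }
}
```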
Adding Flink checkpoint configuration
// Flink CDC stores the binlog reading position as state in checkpoints; to resume from where it
// left off, the job must be started from a checkpoint or savepoint
env.enableCheckpointing(5000L); // enable checkpointing, one checkpoint every 5 seconds
env.getCheckpointConfig().setCheckpointTimeout(10000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // max concurrent checkpoints
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // checkpoint consistency semantics
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // keep the last checkpoint when the job is cancelled
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L)); // restart strategy used when recovering from a checkpoint
env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/flinkCDC")); // state backend
System.setProperty("HADOOP_USER_NAME", "kris"); // user name for accessing HDFS
The full code:
package com.stream.flinkcdc;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Using the DataStream API
 */
public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        //1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Enable checkpointing
        // Flink CDC stores the binlog reading position as state in checkpoints; to resume from where it
        // left off, the job must be started from a checkpoint or savepoint
        env.enableCheckpointing(5000L); // one checkpoint every 5 seconds
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // max concurrent checkpoints
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // checkpoint consistency semantics
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // keep the last checkpoint when the job is cancelled
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L)); // restart strategy used when recovering from a checkpoint
        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/flinkCDC")); // state backend
        System.setProperty("HADOOP_USER_NAME", "kris"); // user name for accessing HDFS

        //3. Build the Flink MySQL CDC SourceFunction
        // Startup modes:
        // initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
        // latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.
        // timestamp: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified timestamp. The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.
        // specific-offset: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified offset.
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("hadoop101")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("test")
                .tableList("test.student_info") // db.table; if omitted, all tables in the listed databases are read
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial()) // initial: snapshot the table first, then read the latest binlog
                .build();

        DataStreamSource<String> mysqlDS = env.addSource(sourceFunction);

        //4. Print the data
        mysqlDS.print();

        //5. Execute the job
        env.execute();
    }
}
① Start Hadoop, ZooKeeper, and Flink (standalone or YARN mode).
② Submit the Flink CDC job.
The cluster has 3 TaskManagers with 2 slots each, 6 task slots in total.
bin/flink run -m hadoop101:8081 -c com.stream.flinkcdc.FlinkCDC ./flink-1.0-SNAPSHOT-jar-with-dependencies.jar
③ Insert, update, or delete rows in the MySQL table.
④ Create a savepoint for the running Flink job (to test restarting from that savepoint):
bin/flink savepoint JobId hdfs://hadoop101:8020/flink/save
bin/flink savepoint 30018d6b36959e2d862eca8c7914fa57 hdfs://hadoop101:8020/flinkcdc/save
Savepoint completed. Path: hdfs://hadoop101:8020/flinkcdc/save/savepoint-30018d-b5b60b62399f
You can resume your program from this savepoint with the run command.
⑤ Insert, update, or delete more rows in the MySQL table.
⑥ Cancel the job, then restart it from the savepoint:
bin/flink run \
-m hadoop101:8081 \
-s hdfs://hadoop101:8020/flink/save/savepoint-30018d-b5b60b62399f \
-c com.stream.flinkcdc.FlinkCDC \
./flink-1.0-SNAPSHOT-jar-with-dependencies.jar
It can be seen that after the initial snapshot the job continues reading from the latest binlog position, and what it prints matches the latest data in MySQL.
Using the Flink SQL API
① No deserializer needs to be provided; records are parsed according to the schema of the declared table.
② There is no initial() builder call; the startup mode is configured in the DDL instead.
③ This approach reads a single table at a time; it cannot read all tables under a database the way the DataStream API can.
④ The CDC 2.0 DataStream API also works with Flink 1.12, while the Flink SQL approach requires Flink 1.13.
Code:
package com.stream.flinkcdc;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Using the Flink SQL API
 */
public class FlinkSQLCDC {
    public static void main(String[] args) throws Exception {
        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2. Create the CDC table with a Flink SQL DDL statement
        tableEnv.executeSql("CREATE TABLE student_info (" +
                " id INT primary key," +
                " uname STRING," +
                " uage INT," +
                " create_time STRING," +
                " update_time STRING," +
                " update_time_dw STRING" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'scan.startup.mode' = 'latest-offset', " + // 'initial' would first load all historical data
                " 'hostname' = 'hadoop101'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '123456'," +
                " 'database-name' = 'test'," +
                " 'table-name' = 'student_info'" +
                ")");

        //3. Query the table and convert the result to a retract stream
        //tableEnv.executeSql("select * from student_info").print();
        Table table = tableEnv.sqlQuery("select * from student_info");
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        //4. Start the job
        env.execute("FlinkSQL CDC");
    }
}
Test results:
"'scan.startup.mode' = 'initial', " initial模式只會初始化歷史數據,不會監控最新的bin-log變更: (true,+I[2, smile, 28, 2022-02-06T13:24:14Z, 2022-02-06T13:24:14Z, 2022-02-06T13:24:14Z]) (true,+I[1, kris, 20, 2022-02-06T13:24:00Z, 2022-02-06T13:24:00Z, 2022-02-06T13:24:00Z])
"'scan.startup.mode' = 'latest-offset', 這種模式只監控最新的變更: 變更 -U (false,-U[3, DD, 20, 2022-02-06T09:28:08Z, 2022-02-06T09:28:08Z, 2022-02-06T09:28:08Z]) (true,+U[3, DD, 18, 2022-02-06T09:28:08Z, 2022-02-06T09:28:08Z, 2022-02-06T09:28:08Z]) 新增 +I (true,+I[4, EE, 22, 2022-02-06T09:31:30Z, 2022-02-06T09:31:30Z, 2022-02-06T09:31:30Z]) 刪除 -D (false,-D[3, DD, 18, 2022-02-06T09:28:08Z, 2022-02-06T09:28:08Z, 2022-02-06T09:28:08Z])
Custom deserialization schema
Code (the custom deserializer is defined as an inner class of the program below):

package com.stream.flinkcdc;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;

/**
 * Custom deserialization schema
 */
public class CDCWithCustomerSchema {

    public static void main(String[] args) throws Exception {
        //1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 Enable checkpointing
        // env.enableCheckpointing(5000);
        // env.getCheckpointConfig().setCheckpointTimeout(10000);
        // env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/cdc-test/ck"));

        //2. Build the SourceFunction with Flink CDC
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("hadoop101")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("test")
                .tableList("test.student_info")
                .deserializer(new CustomerDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();

        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        //3. Print the data
        dataStreamSource.print();

        //4. Start the job
        env.execute("FlinkCDC");
    }

    /* Custom deserialization schema */
    public static class CustomerDeserializationSchema implements DebeziumDeserializationSchema<String> {

        /**
         * Target output format:
         * {
         *   "db":"",
         *   "tableName":"",
         *   "before":{"id":"1001","name":""...},
         *   "after":{"id":"1001","name":""...},
         *   "op":""
         * }
         */
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            //Create a JSON object to hold the result
            JSONObject result = new JSONObject();

            //Extract the database and table name from the topic
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            result.put("db", fields[1]);
            result.put("tableName", fields[2]);

            //Extract the "before" row image
            Struct value = (Struct) sourceRecord.value();
            Struct before = value.getStruct("before");
            JSONObject beforeJson = new JSONObject();
            if (before != null) {
                //Read the column metadata
                Schema schema = before.schema();
                List<Field> fieldList = schema.fields();
                for (Field field : fieldList) {
                    beforeJson.put(field.name(), before.get(field));
                }
            }
            result.put("before", beforeJson);

            //Extract the "after" row image
            Struct after = value.getStruct("after");
            JSONObject afterJson = new JSONObject();
            if (after != null) {
                //Read the column metadata
                Schema schema = after.schema();
                List<Field> fieldList = schema.fields();
                for (Field field : fieldList) {
                    afterJson.put(field.name(), after.get(field));
                }
            }
            result.put("after", afterJson);

            //Extract the operation type
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            result.put("op", operation);

            //Emit the result
            collector.collect(result.toJSONString());
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
}
Test:
With StartupOptions.initial():

INFO: Connected to hadoop101:3306 at master.000001/3155 (sid:5498, cid:31)
{"op":"READ","before":{},"after":{"update_time":"2022-02-06T19:24:00Z","uname":"kris","create_time":"2022-02-06T19:24:00Z","id":1,"uage":20,"update_time_dw":"2022-02-06T19:24:00Z"},"db":"test","tableName":"student_info"}
{"op":"READ","before":{},"after":{"update_time":"2022-02-06T19:24:14Z","uname":"smile","create_time":"2022-02-06T19:24:14Z","id":2,"uage":28,"update_time_dw":"2022-02-06T19:24:14Z"},"db":"test","tableName":"student_info"}
{"op":"READ","before":{},"after":{"update_time":"2022-02-06T23:31:30Z","uname":"EE","create_time":"2022-02-06T23:31:30Z","id":4,"uage":22,"update_time_dw":"2022-02-06T23:31:30Z"},"db":"test","tableName":"student_info"}
{"op":"CREATE","before":{},"after":{"update_time":"2022-02-06T09:46:37Z","uname":"DD","create_time":"2022-02-06T09:46:37Z","id":3,"uage":17,"update_time_dw":"2022-02-06T09:46:37Z"},"db":"test","tableName":"student_info"}
{"op":"UPDATE","before":{"update_time":"2022-02-06T09:46:37Z","uname":"DD","create_time":"2022-02-06T09:46:37Z","id":3,"uage":17,"update_time_dw":"2022-02-06T09:46:37Z"},"after":{"update_time":"2022-02-06T09:46:37Z","uname":"DD","create_time":"2022-02-06T09:46:37Z","id":3,"uage":27,"update_time_dw":"2022-02-06T09:46:37Z"},"db":"test","tableName":"student_info"}
{"op":"DELETE","before":{"update_time":"2022-02-06T09:31:30Z","uname":"EE","create_time":"2022-02-06T09:31:30Z","id":4,"uage":22,"update_time_dw":"2022-02-06T09:31:30Z"},"after":{},"db":"test","tableName":"student_info"}
Differences between the DataStream API and the Flink SQL approach
The DataStream API can be used with Flink 1.12 and 1.13; the SQL approach only works with 1.13.
The DataStream API supports multiple databases and tables; SQL supports only a single table.
The DataStream API requires a custom deserialization schema; SQL does not.
Flink CDC 2.0
Pain points in 1.x
During the initial snapshot, every monitored table has to be queried in full, and a lock has to be taken to keep the snapshot consistent, which puts pressure on the database.
The 2.0 design
DBLog: A Watermark Based Change-Data-Capture Framework (arxiv.org)
FLIP-27: Refactor Source Interface - Apache Flink - Apache Software Foundation
Overview
For tables with a primary key, the initialization (snapshot) mode runs in five stages:
- 1. Chunk splitting; 2. Chunk distribution (enables parallel reads & checkpointing)
- 3. Chunk reading (enables lock-free reads)
- 4. Chunk reporting
- 5. Binlog chunk assignment
① Chunk splitting
Following the lock-free algorithm from the Netflix DBLog paper, the target table is sharded into chunks by primary key; each chunk's key range is left-closed right-open (or left-open right-closed), so the chunks are contiguous and non-overlapping.
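A simplified sketch of how such key-range chunks can be derived for a numeric primary key (illustrative only; the connector's real splitter also sizes chunks adaptively and handles non-numeric keys):

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch of splitting a table into primary-key chunks.
 * Left-closed, right-open ranges [start, end) keep the chunks contiguous and non-overlapping.
 */
public class ChunkSplitSketch {

    static class Chunk {
        final long start;   // inclusive
        final Long end;     // exclusive; null = unbounded (last chunk)
        Chunk(long start, Long end) { this.start = start; this.end = end; }
        @Override public String toString() {
            return "[" + start + ", " + (end == null ? "max" : end) + ")";
        }
    }

    static List<Chunk> split(long minKey, long maxKey, long chunkSize) {
        List<Chunk> chunks = new ArrayList<>();
        long start = minKey;
        while (start + chunkSize <= maxKey) {
            chunks.add(new Chunk(start, start + chunkSize));
            start += chunkSize;
        }
        chunks.add(new Chunk(start, null)); // last chunk is unbounded on the right
        return chunks;
    }

    public static void main(String[] args) {
        // e.g. a table whose primary key ranges from 1 to 10_000, split into chunks of 4096
        split(1, 10_000, 4096).forEach(System.out::println);
    }
}
```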
② Chunk distribution
The chunks are distributed to multiple SourceReaders; each SourceReader reads a part of the table, which achieves parallel reading.
In addition, a checkpoint can be taken for each chunk read; if reading a chunk fails, only that chunk needs to be re-read, instead of restarting the whole snapshot from the beginning as in 1.x.
If every SourceReader guarantees consistency for its own chunks, consistency is guaranteed for the whole table.
③ Chunk reading
Reading a chunk involves five steps:
1) Before reading the chunk's data, the SourceReader records the current binlog position as the low watermark;
2) The SourceReader queries the rows inside its key range and buffers them;
3) Once the query finishes, it records the current binlog position as the high watermark;
4) For the incremental part, it consumes the binlog events between the low watermark and the high watermark;
5) Keyed by primary key, it applies those events to correct the buffered rows and then emits them.
These five steps guarantee that each chunk's final output is the newest data for that chunk as of the high watermark; at this point, however, consistency is only guaranteed within a single chunk.
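A simplified sketch of the correction step in point 5: snapshot rows are buffered, then the binlog events between the low and high watermark are replayed onto the buffer keyed by primary key, so the emitted chunk reflects the state at the high watermark (illustrative only, not the connector's actual code):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch of the chunk read + correction algorithm; rows are simplified to Strings. */
public class ChunkReadSketch {

    static class BinlogEvent {
        final String op;    // "c", "u" or "d"
        final long key;     // primary key of the affected row
        final String after; // row image after the change (null for deletes)
        BinlogEvent(String op, long key, String after) { this.op = op; this.key = key; this.after = after; }
    }

    /**
     * @param snapshotRows rows queried between the low and high watermark, keyed by primary key
     * @param binlogEvents binlog events between the low and high watermark whose keys fall in this chunk
     * @return the corrected rows, i.e. the chunk as of the high watermark
     */
    static List<String> correct(Map<Long, String> snapshotRows, List<BinlogEvent> binlogEvents) {
        Map<Long, String> buffer = new LinkedHashMap<>(snapshotRows);
        for (BinlogEvent e : binlogEvents) {
            if ("d".equals(e.op)) {
                buffer.remove(e.key);       // row was deleted after the snapshot query
            } else {
                buffer.put(e.key, e.after); // insert or update overrides the snapshot image
            }
        }
        return new ArrayList<>(buffer.values());
    }

    public static void main(String[] args) {
        Map<Long, String> snapshot = new LinkedHashMap<>();
        snapshot.put(1L, "row-1-v1");
        snapshot.put(2L, "row-2-v1");
        List<BinlogEvent> events = new ArrayList<>();
        events.add(new BinlogEvent("u", 2L, "row-2-v2")); // update seen between the watermarks
        events.add(new BinlogEvent("d", 1L, null));       // delete seen between the watermarks
        System.out.println(correct(snapshot, events));    // prints [row-2-v2]
    }
}
```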
④ Chunk reporting
After a snapshot chunk has been read, there is a reporting step: the SourceReader reports the completion of that snapshot chunk to the SourceEnumerator.
⑤ Binlog chunk assignment
Flink CDC supports full + incremental synchronization. Once the SourceEnumerator has received the completion reports for all snapshot chunks, there is still the task of consuming the incremental (binlog) data. This is done by assigning a binlog chunk to one of the SourceReaders, which then reads it with a single parallelism.
Core implementation analysis
Source code for determining the binlog chunk's starting read position
MySqlHybridSplitAssigner

private MySqlBinlogSplit createBinlogSplit() {
    final List<MySqlSnapshotSplit> assignedSnapshotSplit =
            snapshotSplitAssigner.getAssignedSplits().values().stream()
                    .sorted(Comparator.comparing(MySqlSplit::splitId))
                    .collect(Collectors.toList());

    Map<String, BinlogOffset> splitFinishedOffsets = snapshotSplitAssigner.getSplitFinishedOffsets();
    final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
    final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();

    BinlogOffset minBinlogOffset = BinlogOffset.INITIAL_OFFSET;
    for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
        // find the min binlog offset
        BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
        if (binlogOffset.compareTo(minBinlogOffset) < 0) {
            minBinlogOffset = binlogOffset;
        }
        finishedSnapshotSplitInfos.add(
                new FinishedSnapshotSplitInfo(
                        split.getTableId(),
                        split.splitId(),
                        split.getSplitStart(),
                        split.getSplitEnd(),
                        binlogOffset));
        tableSchemas.putAll(split.getTableSchemas());
    }

    final MySqlSnapshotSplit lastSnapshotSplit =
            assignedSnapshotSplit.get(assignedSnapshotSplit.size() - 1).asSnapshotSplit();

    return new MySqlBinlogSplit(
            BINLOG_SPLIT_ID,
            lastSnapshotSplit.getSplitKeyType(),
            minBinlogOffset,
            BinlogOffset.NO_STOPPING_OFFSET,
            finishedSnapshotSplitInfos,
            tableSchemas);
}
Reading the binlog between the low watermark and the high watermark
BinlogSplitReader

/**
 * Returns the record should emit or not.
 *
 * <p>The watermark signal algorithm is the binlog split reader only sends the binlog event that
 * belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid
 * since the offset is after its high watermark.
 *
 * <pre> E.g: the data input is :
 *    snapshot-split-0 info : [0,    1024) highWatermark0
 *    snapshot-split-1 info : [1024, 2048) highWatermark1
 *  the data output is:
 *  only the binlog event belong to [0,    1024) and offset is after highWatermark0 should send,
 *  only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should send.
 * </pre>
 */
private boolean shouldEmit(SourceRecord sourceRecord) {
    if (isDataChangeRecord(sourceRecord)) {
        TableId tableId = getTableId(sourceRecord);
        BinlogOffset position = getBinlogPosition(sourceRecord);
        // aligned, all snapshot splits of the table has reached max highWatermark
        if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
            return true;
        }
        Object[] key =
                getSplitKey(
                        currentBinlogSplit.getSplitKeyType(),
                        sourceRecord,
                        statefulTaskContext.getSchemaNameAdjuster());
        for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
            if (RecordUtils.splitKeyRangeContains(
                            key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
                    && position.isAtOrBefore(splitInfo.getHighWatermark())) {
                return true;
            }
        }
        // not in the monitored splits scope, do not emit
        return false;
    }
    // always send the schema change event and signal event
    // we need record them to state of Flink
    return true;
}