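Introduction to CDC
CDC stands for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes on rows or tables), record them completely in the order they occur, and write them to a message broker so that other services can subscribe to and consume them.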
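The capture-then-consume flow described above can be sketched in a few lines. This is only an illustration: ChangeEvent, CdcPipelineSketch, and the in-memory queue standing in for the message broker are hypothetical names, not part of any CDC product.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical change event; the field names are illustrative only.
class ChangeEvent {
    final String op;      // "c" = insert, "u" = update, "d" = delete
    final String before;  // row image before the change, null for inserts
    final String after;   // row image after the change, null for deletes

    ChangeEvent(String op, String before, String after) {
        this.op = op;
        this.before = before;
        this.after = after;
    }
}

class CdcPipelineSketch {
    // Capture side: append each change to the queue in the order it happened.
    static void capture(BlockingQueue<ChangeEvent> topic, ChangeEvent event) {
        topic.add(event);
    }

    // Consumer side: drain the queue and return the op codes in arrival order.
    static List<String> consumeOps(BlockingQueue<ChangeEvent> topic) {
        List<String> ops = new ArrayList<>();
        ChangeEvent e;
        while ((e = topic.poll()) != null) {
            ops.add(e.op);
        }
        return ops;
    }

    public static void main(String[] args) {
        // A FIFO queue stands in for the message broker topic.
        BlockingQueue<ChangeEvent> topic = new LinkedBlockingQueue<>();
        capture(topic, new ChangeEvent("c", null, "id=1,name=a"));
        capture(topic, new ChangeEvent("u", "id=1,name=a", "id=1,name=b"));
        capture(topic, new ChangeEvent("d", "id=1,name=b", null));
        System.out.println(consumeOps(topic)); // prints [c, u, d]
    }
}
```

The FIFO queue is what preserves the "in the order they occur" property; a real broker would add durability and multiple subscribers on top of that.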
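Types of CDC
Query-based CDC
Examples: Sqoop, JDBC source, and similar tools.
Characteristics: batch-oriented; cannot capture every change, because anything that happens between two queries is missed; high latency; has to query the database, which adds load to it.
Binlog-based CDC
Examples: Maxwell, Canal, Debezium.
Characteristics: streaming; captures every change; low latency; adds no query load to the database.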
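To make the difference concrete, here is a minimal sketch that simulates both approaches in memory. It is not how Sqoop or Debezium are implemented: the table is a plain Map, the "binlog" is a List of strings, and PollingVsLogSketch and diffSnapshots are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PollingVsLogSketch {
    // Query-based capture: compare two snapshots (primary key -> row value).
    static List<String> diffSnapshots(Map<Integer, String> previous, Map<Integer, String> current) {
        List<String> changes = new ArrayList<>();
        for (Map.Entry<Integer, String> e : current.entrySet()) {
            String old = previous.get(e.getKey());
            if (old == null) {
                changes.add("insert " + e.getKey() + "=" + e.getValue());
            } else if (!old.equals(e.getValue())) {
                changes.add("update " + e.getKey() + "=" + e.getValue());
            }
        }
        for (Integer key : previous.keySet()) {
            if (!current.containsKey(key)) {
                changes.add("delete " + key);
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<Integer, String> table = new LinkedHashMap<>();
        List<String> log = new ArrayList<>(); // stand-in for the binlog

        table.put(1, "a");
        Map<Integer, String> firstPoll = new LinkedHashMap<>(table);

        // Four writes happen between two polls; each one is appended to the log.
        table.put(1, "b"); log.add("update 1=b");
        table.put(1, "c"); log.add("update 1=c");
        table.put(2, "x"); log.add("insert 2=x");
        table.remove(2);   log.add("delete 2");

        // The second poll only sees the net effect of the four writes.
        System.out.println(diffSnapshots(firstPoll, table)); // prints [update 1=c]
        // The log still holds every individual change, in order.
        System.out.println(log); // prints [update 1=b, update 1=c, insert 2=x, delete 2]
    }
}
```

The intermediate update and the short-lived row with key 2 never show up in the snapshot diff, which is the "cannot capture every change" limitation of query-based CDC.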
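Flink-CDC
The Flink community developed the flink-cdc-connectors component, a source that reads both full snapshots and incremental changes directly from databases such as MySQL and PostgreSQL. It is open source.
Repository: https://github.com/ververica/flink-cdc-connectors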
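Flink-CDC in practice
1. Enable the MySQL binlog
Check the MySQL binlog status and enable the binlog
The figure above shows the binlog enabled. If it is not enabled, log_bin is OFF and log_bin_basename and log_bin_index are empty. To enable it:
vim /etc/my.cnf
Add the following to the file:
# enable binlog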
server-id=1
log-bin=/var/lib/mysql/mysql-bin
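server-id is the ID of a single node. There is only one node here, so any number will do; this example uses 1. If the cluster has several nodes, each one must have a different ID.
The second line sets the binlog file base name to mysql-bin, along with the path where the files are stored.
Save the file and exit.
Restart the MySQL service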
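Before restarting, it can help to double-check that both options actually ended up in the file. The sketch below is a simplified, hypothetical checker (MyCnfCheckSketch is not a MySQL tool): it assumes the options sit under a [mysqld] section and ignores include directives and inline comments.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class MyCnfCheckSketch {
    // Collect the options that appear under the [mysqld] section.
    static Map<String, String> mysqldOptions(String cnfText) {
        Map<String, String> options = new LinkedHashMap<>();
        boolean inMysqld = false;
        for (String raw : cnfText.split("\\r?\\n")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#") || line.startsWith(";")) {
                continue; // blank line or comment
            }
            if (line.startsWith("[")) {
                inMysqld = line.equalsIgnoreCase("[mysqld]");
                continue;
            }
            if (!inMysqld) {
                continue;
            }
            int eq = line.indexOf('=');
            // MySQL accepts '-' and '_' interchangeably in option names.
            String key = (eq < 0 ? line : line.substring(0, eq)).trim().replace('_', '-');
            String value = eq < 0 ? "" : line.substring(eq + 1).trim();
            options.put(key, value);
        }
        return options;
    }

    // True when both options used in this walkthrough are present.
    static boolean binlogConfigured(String cnfText) {
        Map<String, String> options = mysqldOptions(cnfText);
        return options.containsKey("server-id") && options.containsKey("log-bin");
    }

    public static void main(String[] args) {
        String cnf = "[mysqld]\n# enable binlog\nserver-id=1\nlog-bin=/var/lib/mysql/mysql-bin\n";
        System.out.println(binlogConfigured(cnf)); // prints true
    }
}
```

In practice the file contents would be read from /etc/my.cnf; the string literal here just mirrors the two lines added above.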
service mysqld restart
Check the binlog
2. Create a MySQL test table and load initial data
3. Write the Flink-CDC code
Add the Maven dependencies
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
Write the test class
package com.meijs;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        // Build the MySQL CDC source
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("192.168.154.130")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("test")
                // Table to monitor; if omitted, all tables in the listed databases are monitored
                .tableList("test.flink_cdc_test")
                .deserializer(new StringDebeziumDeserializationSchema())
                // initial() takes a snapshot of the monitored tables first;
                // options such as earliest and latest work much like Kafka offsets
                .startupOptions(StartupOptions.initial())
                .build();

        // Print every change record to stdout
        DataStreamSource<String> streamSource = executionEnvironment.addSource(sourceFunction);
        streamSource.print();

        executionEnvironment.execute("FlinkCDC");
    }
}
4. Test Flink-CDC
The output after the initial run is:
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000005, pos=1353, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=1,name=小米,log_url=www.xiaomi.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=test,table=flink_cdc_test,server_id=0,file=mysql-bin.000005,pos=1353,row=0},op=c,ts_ms=1641905845597}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000005, pos=1353, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=2,name=華為,log_url=www.huawei.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=test,table=flink_cdc_test,server_id=0,file=mysql-bin.000005,pos=1353,row=0},op=c,ts_ms=1641905845602}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000005, pos=1353, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=3,name=蘋果,log_url=www.pingguo.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=test,table=flink_cdc_test,server_id=0,file=mysql-bin.000005,pos=1353,row=0},op=c,ts_ms=1641905845602}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000005, pos=1353}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=4}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=4,name=歐派,log_url=www.oppo.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=last,db=test,table=flink_cdc_test,server_id=0,file=mysql-bin.000005,pos=1353,row=0},op=c,ts_ms=1641905845602}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
op=c means create; after is the row's current state at startup.
Update a row and observe the output
The log output is:
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1641906085, file=mysql-bin.000005, pos=1418, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=4}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{before=Struct{id=4,name=歐派,log_url=www.oppo.com},after=Struct{id=4,name=oppo,log_url=www.oppo.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1641906085000,db=test,table=flink_cdc_test,server_id=1,file=mysql-bin.000005,pos=1553,row=0,thread=14},op=u,ts_ms=1641906085304}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
op=u means update; before is the row before the update and after is the row after it.
Delete a row and observe the output
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1641906292, file=mysql-bin.000005, pos=1735, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{before=Struct{id=3,name=蘋果,log_url=www.pingguo.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1641906292000,db=test,table=flink_cdc_test,server_id=1,file=mysql-bin.000005,pos=1870,row=0,thread=14},op=d,ts_ms=1641906292636}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
op=d means delete; before is the row before the deletion, and there is no after.
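The op codes seen so far can be pulled out of the printed records with a small helper. This is a rough sketch that works on the string form produced by StringDebeziumDeserializationSchema as shown in the logs; OpCodeSketch and describe are hypothetical names, and a regex over the printed text could be fooled by a row value that happens to contain ",op=x,".

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class OpCodeSketch {
    // Matches the op field as it appears in the printed record, e.g. ",op=u,"
    private static final Pattern OP = Pattern.compile("[,{]op=([a-z])[,}]");

    // Map the one-letter op code to a readable operation name.
    static String describe(String record) {
        Matcher m = OP.matcher(record);
        if (!m.find()) {
            return "unknown";
        }
        switch (m.group(1)) {
            case "c": return "create";
            case "u": return "update";
            case "d": return "delete";
            default:  return "unknown";
        }
    }

    public static void main(String[] args) {
        // Shortened version of the delete record above
        String record = "value=Struct{before=Struct{id=3},source=Struct{pos=1870,row=0,thread=14},op=d,ts_ms=1641906292636}";
        System.out.println(describe(record)); // prints delete
    }
}
```

For anything beyond a quick look at the logs, a custom deserializer that reads the op field from the record's value struct would be more robust than matching on text.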
Insert a row while the job is running and observe the output
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1641906490, file=mysql-bin.000005, pos=2030, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=6}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=6,name=kupai,log_url=www.kupai.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1641906490000,db=test,table=flink_cdc_test,server_id=1,file=mysql-bin.000005,pos=2165,row=0,thread=14},op=c,ts_ms=1641906490308}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
The output also shows that the binlog file and positions Flink reports are consistent with MySQL's own binlog.
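To compare positions more easily, the binlog file name and position can be extracted from the sourceOffset part of each printed record. The helper below is a sketch against the string format shown in the logs above; BinlogOffsetSketch and fileAndPos are hypothetical names.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class BinlogOffsetSketch {
    // Captures "file=<name>, pos=<number>" inside sourceOffset={...}
    private static final Pattern OFFSET =
            Pattern.compile("sourceOffset=\\{[^}]*?file=([^,}]+),\\s*pos=(\\d+)");

    // Returns "<file>:<pos>", or null when the record has no sourceOffset.
    static String fileAndPos(String record) {
        Matcher m = OFFSET.matcher(record);
        return m.find() ? m.group(1) + ":" + m.group(2) : null;
    }

    public static void main(String[] args) {
        // Shortened version of the insert record above
        String record = "SourceRecord{sourcePartition={server=mysql_binlog_source}, "
                + "sourceOffset={ts_sec=1641906490, file=mysql-bin.000005, pos=2030, row=1, server_id=1, event=2}}";
        System.out.println(fileAndPos(record)); // prints mysql-bin.000005:2030
    }
}
```

The extracted value can then be checked by hand against the binlog file and position that MySQL itself reports.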