Flink CDC in Practice


Introduction to CDC

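CDC stands for Change Data Capture. The core idea is to monitor and capture changes to a database (inserts, updates, and deletes of data or tables), record them completely in the order they occurred, and write them to message middleware so that other services can subscribe to and consume them.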
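Here is a minimal sketch of that idea. It assumes an in-memory list stands in for the message middleware. Changes are appended in commit order, and each subscriber reads from its own offset, loosely as a Kafka consumer group would. ChangeLog and its methods are hypothetical names for illustration, not part of any CDC library.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a CDC pipeline (hypothetical names): changes are appended in
// order, and each subscriber consumes from its own independent offset.
class ChangeLog {
    private final List<String> entries = new ArrayList<>();
    private final Map<String, Integer> offsets = new HashMap<>();

    // Producer side: the capture process appends each change in commit order.
    void append(String change) {
        entries.add(change);
    }

    // Consumer side: return everything this subscriber has not seen yet,
    // then advance its offset past those entries.
    List<String> poll(String subscriber) {
        int from = offsets.getOrDefault(subscriber, 0);
        List<String> batch = new ArrayList<>(entries.subList(from, entries.size()));
        offsets.put(subscriber, entries.size());
        return batch;
    }

    public static void main(String[] args) {
        ChangeLog log = new ChangeLog();
        log.append("INSERT id=1");
        log.append("UPDATE id=1");
        System.out.println(log.poll("search-index")); // [INSERT id=1, UPDATE id=1]
        log.append("DELETE id=1");
        System.out.println(log.poll("search-index")); // [DELETE id=1]
        System.out.println(log.poll("cache"));        // [INSERT id=1, UPDATE id=1, DELETE id=1]
    }
}
```

Because offsets are tracked per subscriber, a late subscriber such as "cache" still receives the full history in order.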

Types of CDC
Query-based CDC

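Examples: Sqoop, JDBC source, and similar tools.
Characteristics: batch-based, cannot capture every data change, high latency, and it must query the database, which adds load to it.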

Binlog-based CDC

Examples: Maxwell, Canal, Debezium
Characteristics: streaming-based, captures every data change, low latency, and adds no query load to the database.
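The following toy model makes the "cannot capture every change" point concrete. The class CaptureComparison is a hypothetical name. It compares diffing two polled snapshots with recording every applied statement. This is a simplification: real query-based tools usually poll on an incrementing id or a timestamp column rather than diffing whole tables. That approach has a further limitation, because deleted rows never appear in the query results.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy comparison of query-based vs log-based capture (hypothetical names).
// A "table" is modelled as a map from primary key to value.
class CaptureComparison {
    // Query-based: diff two snapshots taken at poll time. Anything that
    // happened and was undone between the two polls is invisible.
    static List<String> diffSnapshots(Map<Integer, String> before, Map<Integer, String> after) {
        List<String> changes = new ArrayList<>();
        for (Map.Entry<Integer, String> e : after.entrySet()) {
            String old = before.get(e.getKey());
            if (old == null) {
                changes.add("insert " + e.getKey());
            } else if (!old.equals(e.getValue())) {
                changes.add("update " + e.getKey());
            }
        }
        for (Integer key : before.keySet()) {
            if (!after.containsKey(key)) {
                changes.add("delete " + key);
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<Integer, String> table = new HashMap<>();
        table.put(1, "a");
        Map<Integer, String> firstPoll = new HashMap<>(table);

        // Log-based: every statement is recorded as it is applied.
        List<String> log = new ArrayList<>();
        table.put(2, "b");
        log.add("insert 2");
        table.remove(2);
        log.add("delete 2");

        Map<Integer, String> secondPoll = new HashMap<>(table);
        System.out.println(diffSnapshots(firstPoll, secondPoll)); // []
        System.out.println(log);                                  // [insert 2, delete 2]
    }
}
```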

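The Flink community developed the flink-cdc-connectors component, a source that reads both full (snapshot) data and incremental changes directly from databases such as MySQL and PostgreSQL. It is open source.
Repository: https://github.com/ververica/flink-cdc-connectors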

1. Enable the MySQL binlog
Check the MySQL binlog status and enable the binlog if it is off.

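When the binlog is enabled, log_bin is ON. You can check the status in the MySQL client with SHOW VARIABLES LIKE 'log_bin%';. If the binlog is not enabled, log_bin is OFF and log_bin_basename and log_bin_index are empty. To enable it, edit the configuration file: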

vim /etc/my.cnf

Add the following to the [mysqld] section:

# Enable binlog
server-id=1
log-bin=/var/lib/mysql/mysql-bin

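server-id is the ID of this node. Since there is only one node here, any number will do, and it is set to 1. If the cluster has multiple nodes, each node must have a different ID.
The second line sets the base name of the binlog files (mysql-bin) and the directory where they are stored.
Save and exit when done.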
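You can sanity-check the two settings before restarting. The sketch below uses a hypothetical helper, MyCnfCheck. It reads the [mysqld] section of a my.cnf text and confirms that server-id and log-bin are both set. Requiring server-id to be a positive integer is an assumption of this sketch. The helper handles only key=value lines, so it does not cover a bare log-bin with no value, !include directives, or other my.cnf syntax.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical pre-flight check for the binlog settings in a my.cnf text.
class MyCnfCheck {
    // Collect key=value options from the [mysqld] section only.
    static Map<String, String> mysqldOptions(String cnf) {
        Map<String, String> opts = new HashMap<>();
        boolean inMysqld = false;
        for (String raw : cnf.split("\\R")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#") || line.startsWith(";")) {
                continue; // skip blanks and comments
            }
            if (line.startsWith("[")) {
                inMysqld = line.equals("[mysqld]");
                continue;
            }
            int eq = line.indexOf('=');
            if (inMysqld && eq > 0) {
                // MySQL treats '-' and '_' in option names as equivalent.
                String key = line.substring(0, eq).trim().replace('_', '-');
                opts.put(key, line.substring(eq + 1).trim());
            }
        }
        return opts;
    }

    // True if [mysqld] sets log-bin and a positive numeric server-id.
    static boolean binlogConfigured(String cnf) {
        Map<String, String> opts = mysqldOptions(cnf);
        String id = opts.get("server-id");
        if (id == null || !opts.containsKey("log-bin")) {
            return false;
        }
        try {
            return Long.parseLong(id) > 0;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String cnf = "[mysqld]\n# Enable binlog\nserver-id=1\nlog-bin=/var/lib/mysql/mysql-bin\n";
        System.out.println(binlogConfigured(cnf)); // true
    }
}
```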

Restart the MySQL service:
service mysqld restart
Then check the binlog status again. log_bin should now be ON.
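Checking the status comes down to reading the rows returned by SHOW VARIABLES LIKE 'log_bin%'; (columns Variable_name and Value). The hypothetical helper below assumes those rows have already been collected into a map, for example via JDBC. It applies the rule described earlier: the binlog is enabled when log_bin is ON and log_bin_basename is non-empty. The sample paths assume the log-bin setting above.

```java
import java.util.Map;

// Hypothetical helper: interpret SHOW VARIABLES LIKE 'log_bin%' results
// that were collected into a Variable_name -> Value map.
class BinlogStatus {
    static boolean isBinlogEnabled(Map<String, String> vars) {
        // When the binlog is off, log_bin is OFF and the basename/index are empty.
        String logBin = vars.getOrDefault("log_bin", "OFF");
        String basename = vars.getOrDefault("log_bin_basename", "");
        return "ON".equalsIgnoreCase(logBin) && !basename.isEmpty();
    }

    public static void main(String[] args) {
        Map<String, String> off = Map.of(
                "log_bin", "OFF", "log_bin_basename", "", "log_bin_index", "");
        Map<String, String> on = Map.of(
                "log_bin", "ON",
                "log_bin_basename", "/var/lib/mysql/mysql-bin",
                "log_bin_index", "/var/lib/mysql/mysql-bin.index");
        System.out.println(isBinlogEnabled(off)); // false
        System.out.println(isBinlogEnabled(on));  // true
    }
}
```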

2. Create the MySQL test table and initialize data

Add the Maven dependencies

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>

Write the test class

package com.meijs;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("192.168.154.130")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("test")
                // Tables to monitor, as "db.table"; if omitted, every table in databaseList is monitored
                .tableList("test.flink_cdc_test")
                // Emit each change record as its toString() text
                .deserializer(new StringDebeziumDeserializationSchema())
                // initial: snapshot the monitored tables first, then read the binlog;
                // other options such as earliest and latest work much like Kafka start offsets
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = executionEnvironment.addSource(sourceFunction);

        // Print every change record to stdout
        streamSource.print();

        executionEnvironment.execute("FlinkCDC");
    }
}
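The startupOptions comment mentions initial, earliest and latest. The toy model below sketches what each mode emits. It uses hypothetical names and has no Flink dependency. It assumes the behavior described in the flink-cdc-connectors documentation:

- initial snapshots the tables and then continues from the binlog position at startup.
- earliest skips the snapshot and replays the binlog from its start.
- latest skips the snapshot and reads only changes made after the job starts.

Treat this as an illustration of the semantics, not of the connector's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical names) of what each startup option emits.
// snapshotRows: table contents at job start (already reflecting every binlog
// event before startPos); binlog: all retained binlog events in order;
// startPos: index of the first binlog event written after the job started.
class StartupModes {
    enum Mode { INITIAL, EARLIEST, LATEST }

    static List<String> emitted(Mode mode, List<String> snapshotRows,
                                List<String> binlog, int startPos) {
        List<String> out = new ArrayList<>();
        switch (mode) {
            case INITIAL:  // snapshot first, then binlog events after startup
                out.addAll(snapshotRows);
                out.addAll(binlog.subList(startPos, binlog.size()));
                break;
            case EARLIEST: // no snapshot; replay the binlog from its start
                out.addAll(binlog);
                break;
            case LATEST:   // no snapshot; only changes made after startup
                out.addAll(binlog.subList(startPos, binlog.size()));
                break;
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> snapshot = List.of("row id=1", "row id=2");
        List<String> binlog = List.of("old change", "new change");
        for (Mode m : Mode.values()) {
            System.out.println(m + " -> " + emitted(m, snapshot, binlog, 1));
        }
    }
}
```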
After the job starts, the initial snapshot prints:
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000005, pos=1353, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=1,name=小米,log_url=www.xiaomi.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=test,table=flink_cdc_test,server_id=0,file=mysql-bin.000005,pos=1353,row=0},op=c,ts_ms=1641905845597}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000005, pos=1353, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=2,name=華為,log_url=www.huawei.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=test,table=flink_cdc_test,server_id=0,file=mysql-bin.000005,pos=1353,row=0},op=c,ts_ms=1641905845602}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000005, pos=1353, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=3,name=蘋果,log_url=www.pingguo.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=test,table=flink_cdc_test,server_id=0,file=mysql-bin.000005,pos=1353,row=0},op=c,ts_ms=1641905845602}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000005, pos=1353}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=4}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=4,name=歐派,log_url=www.oppo.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=last,db=test,table=flink_cdc_test,server_id=0,file=mysql-bin.000005,pos=1353,row=0},op=c,ts_ms=1641905845602}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

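op=c means create (insert), and after holds the row's current state at startup. Records read during the initial snapshot carry snapshot=true in source, and the final snapshot row carries snapshot=last.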
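When scanning long runs of this output, it can help to extract just the op code and snapshot flag. The hypothetical helper below applies regular expressions to the toString() layout printed above. That layout comes from StringDebeziumDeserializationSchema and is not a stable format, so use this only for quick inspection.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for eyeballing printed change records.
// Relies on the record toString() layout shown above, which may change.
class RecordLineInspector {
    private static final Pattern OP = Pattern.compile(",op=([a-z]),");
    private static final Pattern SNAPSHOT = Pattern.compile("snapshot=(\\w+)");

    // The single-letter op code (c, u, d, ...), if present.
    static Optional<String> op(String line) {
        Matcher m = OP.matcher(line);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // "true" or "last" for snapshot rows; empty for records read from the binlog.
    static Optional<String> snapshot(String line) {
        Matcher m = SNAPSHOT.matcher(line);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        String line = "value=Struct{after=Struct{id=4},"
                + "source=Struct{snapshot=last,db=test},op=c,ts_ms=1641905845602}";
        System.out.println(op(line).orElse("?") + " " + snapshot(line).orElse("binlog")); // c last
    }
}
```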

Update a row and observe the printed output:

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1641906085, file=mysql-bin.000005, pos=1418, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=4}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{before=Struct{id=4,name=歐派,log_url=www.oppo.com},after=Struct{id=4,name=oppo,log_url=www.oppo.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1641906085000,db=test,table=flink_cdc_test,server_id=1,file=mysql-bin.000005,pos=1553,row=0,thread=14},op=u,ts_ms=1641906085304}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

op=u means update: before holds the row's data before the update, and after holds its state afterwards.
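A common follow-up question for update events is which columns actually changed. The sketch below compares the before and after images column by column. It models rows as plain maps, which simplifies the Debezium Struct. The sample values come from the update record above, with the original Chinese name written as a Unicode escape to keep the source file ASCII.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Sketch: which columns did an update touch? Rows are column -> value maps.
class UpdateDiff {
    static Set<String> changedColumns(Map<String, Object> before, Map<String, Object> after) {
        Set<String> changed = new TreeSet<>();
        Set<String> all = new TreeSet<>(before.keySet());
        all.addAll(after.keySet());
        for (String col : all) {
            // A column counts as changed if its value differs or it exists on one side only.
            if (!Objects.equals(before.get(col), after.get(col))) {
                changed.add(col);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        // Values from the update record for id=4 (name escaped to keep the source ASCII).
        Map<String, Object> before = Map.of("id", 4, "name", "\u6b50\u6d3e", "log_url", "www.oppo.com");
        Map<String, Object> after = Map.of("id", 4, "name", "oppo", "log_url", "www.oppo.com");
        System.out.println(changedColumns(before, after)); // [name]
    }
}
```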

Delete a row and observe the printed output:

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1641906292, file=mysql-bin.000005, pos=1735, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{before=Struct{id=3,name=蘋果,log_url=www.pingguo.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1641906292000,db=test,table=flink_cdc_test,server_id=1,file=mysql-bin.000005,pos=1870,row=0,thread=14},op=d,ts_ms=1641906292636}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

op=d means delete: before holds the row's data before deletion, and there is no after field.
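The op codes seen so far differ in which row images they carry. The enum below records that shape and checks a record against it. It is a hypothetical type, not the Debezium class. It also includes r, the code Debezium uses for snapshot reads, although the snapshot rows printed above show op=c. The UPDATE entry assumes MySQL's default full row image, under which update events include before.

```java
// Sketch (hypothetical type): op codes and the row images each one carries.
enum ChangeOp {
    CREATE("c", false, true),  // insert: after only
    UPDATE("u", true, true),   // update: before and after (full row image assumed)
    DELETE("d", true, false),  // delete: before only
    READ("r", false, true);    // Debezium's snapshot-read code: after only

    final String code;
    final boolean hasBefore;
    final boolean hasAfter;

    ChangeOp(String code, boolean hasBefore, boolean hasAfter) {
        this.code = code;
        this.hasBefore = hasBefore;
        this.hasAfter = hasAfter;
    }

    static ChangeOp fromCode(String code) {
        for (ChangeOp op : values()) {
            if (op.code.equals(code)) {
                return op;
            }
        }
        throw new IllegalArgumentException("unknown op: " + code);
    }

    // Does a record with these before/after images have the shape expected for this op?
    boolean matches(Object before, Object after) {
        return (before != null) == hasBefore && (after != null) == hasAfter;
    }

    public static void main(String[] args) {
        ChangeOp op = fromCode("d");
        System.out.println(op + " " + op.matches("before-row", null)); // DELETE true
    }
}
```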

With the job still running, insert a new row and observe the printed output:

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1641906490, file=mysql-bin.000005, pos=2030, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=6}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=6,name=kupai,log_url=www.kupai.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1641906490000,db=test,table=flink_cdc_test,server_id=1,file=mysql-bin.000005,pos=2165,row=0,thread=14},op=c,ts_ms=1641906490308}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
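The events printed in this walkthrough are enough to keep a downstream copy of flink_cdc_test in sync. The minimal sketch below assumes each event is reduced to (op, id, name) and applied in order to an in-memory map. ReplicaApply is a hypothetical name, and a real sink would take the values from the after image. The Chinese names are written as Unicode escapes to keep the source ASCII.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch (hypothetical names): maintain a replica by applying change events in order.
class ReplicaApply {
    static void apply(Map<Integer, String> replica, String op, int id, String name) {
        switch (op) {
            case "c":
            case "r":
            case "u":
                replica.put(id, name); // insert, or overwrite with the after image
                break;
            case "d":
                replica.remove(id);    // a delete carries only a before image
                break;
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> replica = new TreeMap<>();
        // The sequence of events printed in this walkthrough.
        apply(replica, "c", 1, "\u5c0f\u7c73");
        apply(replica, "c", 2, "\u83ef\u70ba");
        apply(replica, "c", 3, "\u860b\u679c");
        apply(replica, "c", 4, "\u6b50\u6d3e");
        apply(replica, "u", 4, "oppo");
        apply(replica, "d", 3, null);
        apply(replica, "c", 6, "kupai");
        System.out.println(replica.keySet()); // [1, 2, 4, 6]
    }
}
```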

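The file and pos fields in the printed records show that the changes Flink captures follow the MySQL binlog: each event points to its position in mysql-bin.000005.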
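Because every record carries binlog coordinates, events can be ordered or deduplicated by (file, pos). Below is a sketch of such a comparator using a hypothetical class, BinlogPosition. It assumes file names of the form base.number, like mysql-bin.000005. The positions in main are the pos values from the source field of the update, delete and insert records above.

```java
// Sketch (hypothetical class): order events by binlog coordinates (file, pos).
// Assumes file names end in a numeric suffix, e.g. mysql-bin.000005.
final class BinlogPosition implements Comparable<BinlogPosition> {
    final String file;
    final long pos;

    BinlogPosition(String file, long pos) {
        this.file = file;
        this.pos = pos;
    }

    // Numeric sequence number of the binlog file ("mysql-bin.000005" -> 5).
    private long fileIndex() {
        return Long.parseLong(file.substring(file.lastIndexOf('.') + 1));
    }

    @Override
    public int compareTo(BinlogPosition other) {
        int byFile = Long.compare(fileIndex(), other.fileIndex());
        return byFile != 0 ? byFile : Long.compare(pos, other.pos);
    }

    public static void main(String[] args) {
        BinlogPosition update = new BinlogPosition("mysql-bin.000005", 1553);
        BinlogPosition delete = new BinlogPosition("mysql-bin.000005", 1870);
        BinlogPosition insert = new BinlogPosition("mysql-bin.000005", 2165);
        System.out.println(update.compareTo(delete) < 0 && delete.compareTo(insert) < 0); // true
    }
}
```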

