Flink CDC
1. Introduction
CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of rows or whole tables), record those changes in the order they occur, and write them to message middleware for other services to subscribe to and consume.
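For example, an update captured from the binlog is typically delivered as an event carrying both the old and new row images, conceptually like the Debezium envelope used later in this article (the record below is illustrative, not verbatim connector output):
{
  "before": {"id": 1001, "name": "zhangsan", "age": 18},
  "after": {"id": 1001, "name": "zhangsan", "age": 19},
  "op": "u",
  "ts_ms": 1621845600000,
  "source": {"db": "gmall-flink-200821", "table": "z_user_info"}
}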
Types of CDC
CDC implementations mainly fall into two categories, query-based and binlog-based. The key differences:
| | Query-based CDC | Binlog-based CDC |
|---|---|---|
| Open-source products | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium |
| Execution mode | Batch | Streaming |
| Captures every data change | No | Yes |
| Latency | High | Low |
| Adds load to the database | Yes | No |
Supported databases
The Flink community has developed the flink-cdc-connectors component, a source connector that reads both full snapshots and incremental changes directly from databases such as MySQL and PostgreSQL. It is open source: https://github.com/ververica/flink-cdc-connectors.
2. Dependencies
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
</dependencies>
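Section 6 runs a *-jar-with-dependencies fat jar, so the pom also needs an assembly (or shade) plugin in its build section. A minimal sketch with maven-assembly-plugin (the plugin version is an assumption):
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
                <descriptorRefs>
                    <!-- produces the *-jar-with-dependencies.jar used in section 6 -->
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>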
3. Flink Stream API
package com.flink.day07_flink_cdc;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
/**
 * @description: Flink CDC with the DataStream API
 * @author: HaoWu
 * @create: 2021-05-21
*/
public class Flink01_Flink_CDC_Streaming {
public static void main(String[] args) throws Exception {
//1. Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2. Flink CDC stores the binlog reading position as state in checkpoints; to resume from where it left off, start the job from a checkpoint or savepoint
//2.1 Enable checkpointing, one checkpoint every 5 seconds
env.enableCheckpointing(5000L);
//2.2 Set the checkpoint consistency semantics
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//2.3 Retain the last checkpoint when the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//2.4 Set the restart strategy used when recovering from checkpoints
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
//2.5 Set the state backend
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:9000/flinkCDC"));
//2.6 Set the user name for accessing HDFS
System.setProperty("HADOOP_USER_NAME", "atguigu");
//3. Create the Flink MySQL CDC source
Properties properties = new Properties();
//Startup modes of the connector (descriptions from the connector docs for the SQL option scan.startup.mode):
//initial (default): performs an initial snapshot of the monitored tables on first startup, then continues reading the latest binlog.
//latest-offset: never performs a snapshot; reads from the end of the binlog, i.e. only changes after the connector started.
//timestamp: never performs a snapshot; traverses the binlog from the beginning and skips change events whose timestamp is smaller than the specified one.
//specific-offset: never performs a snapshot; reads the binlog directly from the specified offset.
//Note: debeziumProperties() passes keys straight to the embedded Debezium engine, so the native Debezium key "snapshot.mode" is set here rather than the SQL option name.
properties.setProperty("snapshot.mode", "initial");
DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
.hostname("hadoop102")
.port(3306)
.username("root")
.password("root")
.databaseList("gmall-flink-200821")
.tableList("gmall-flink-200821.z_user_info") //可選配置項,如果不指定該參數,則會讀取上一個配置下的所有表的數據,注意:指定的時候需要使用"db.table"的方式
.debeziumProperties(properties)
.deserializer(new StringDebeziumDeserializationSchema()) //emits SourceRecord.toString(); see section 5 for a custom schema
.build();
//4. Read from MySQL with the CDC source
DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
//5. Print the data
mysqlDS.print();
//6. Execute the job
env.execute();
}
}
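To see change events flow through the job, modify the monitored table while it is running. For example (assuming z_user_info has the id/name/age columns from the DDL in section 4; the values are hypothetical):
INSERT INTO `gmall-flink-200821`.z_user_info (id, name, age) VALUES (1001, 'zhangsan', 18);
UPDATE `gmall-flink-200821`.z_user_info SET age = 19 WHERE id = 1001;
DELETE FROM `gmall-flink-200821`.z_user_info WHERE id = 1001;
With StringDebeziumDeserializationSchema, each statement prints the raw SourceRecord.toString() of the corresponding Debezium record.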
4. Flink SQL
package com.flink.day07_flink_cdc;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * @description: Flink CDC with Flink SQL
 * @author: HaoWu
 * @create: 2021-05-24
*/
public class Flink02_Flink_CDC_Sql {
public static void main(String[] args) throws Exception {
//1. Create the Flink SQL execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2. Create the Flink MySQL CDC source table
String sourceSql= "create table user_info(\n" +
" id int,\n" +
" name string,\n" +
" age int\n" +
")\n" +
"with(\n" +
" 'connector'='mysql-cdc',\n" +
" 'hostname'='hadoop102',\n" +
" 'port'='3306',\n" +
" 'username'='root',\n" +
" 'password'='root',\n" +
" 'password'='root',\n" +
" 'database-name'='gmall-flink-200821',\n" +
" 'table-name'='z_user_info'\n" +
")";
tableEnv.executeSql(sourceSql);
tableEnv.executeSql("select * from user_info").print();
//3. executeSql() already submits the job and print() blocks on its result; env.execute() is omitted because no DataStream operators are defined, so calling it would fail.
}
}
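The continuous query prints a changelog stream. In Flink 1.12 the first column of the printed table is the change kind (+I insert, -U/+U the two halves of an update, -D delete), so the DML shown in section 3 would yield output along these lines (illustrative, not verbatim):
| op | id   | name     | age |
| +I | 1001 | zhangsan | 18  |
| -U | 1001 | zhangsan | 18  |
| +U | 1001 | zhangsan | 19  |
| -D | 1001 | zhangsan | 19  |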
5. Custom Deserialization Schema
Reference: https://blog.csdn.net/qq_31866793/article/details/109207663
package com.flink.day07_flink_cdc;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
import java.util.Properties;
/**
 * @description: Flink CDC with a custom deserialization schema
 * @author: HaoWu
 * @create: 2021-05-24
*/
public class Flink03_Flink_CDC_CustomerSchema {
public static void main(String[] args) throws Exception {
//1. Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2. Create the Flink MySQL CDC source
Properties properties = new Properties();
//Startup modes are the same as listed in section 3. debeziumProperties() takes Debezium's native keys, so "snapshot.mode" is set here rather than the "debezium."-prefixed SQL option.
properties.setProperty("snapshot.mode", "initial");
DebeziumSourceFunction<JSONObject> mysqlSource = MySQLSource
.<JSONObject>builder()
.hostname("hadoop102")
.port(3306)
.username("root")
.password("root")
.databaseList("gmall-flink-200821")
.tableList("gmall-flink-200821.z_user_info") //可選配置項,如果不指定該參數,則會讀取上一個配置下的所有表的數據,注意:指定的時候需要使用"db.table"的方式
.debeziumProperties(properties)
.deserializer(new CdcDwdDeserializationSchema()) //custom schema that parses the CDC record format
.build();
//3. Read from MySQL with the CDC source
DataStreamSource<JSONObject> mysqlDS = env.addSource(mysqlSource);
//4. Print the data
mysqlDS.print();
//5. Execute the job
env.execute();
}
}
/**
 * Custom schema that parses the CDC format. Note: this variant does not support delete,
 * because a delete record has no "after" struct, so the loop below would throw a NullPointerException.
 */
class MyCustomSchema implements DebeziumDeserializationSchema<String> { //custom record parser
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//The topic carries the database and table name, e.g. mysql_binlog_source.gmall-flink-200821.z_user_info
String topic = sourceRecord.topic();
String[] arr = topic.split("\\.");
String db = arr[1];
String tableName = arr[2];
//Operation type: READ, DELETE, UPDATE or CREATE (io.debezium.data.Envelope)
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
//The value as a Struct (org.apache.kafka.connect.data)
Struct value = (Struct) sourceRecord.value();
//Row image after the change
Struct after = value.getStruct("after");
//JSON object holding the row data
JSONObject data = new JSONObject();
for (Field field : after.schema().fields()) {
Object o = after.get(field);
data.put(field.name(), o);
}
//JSON object wrapping the final result
JSONObject result = new JSONObject();
result.put("operation", operation.toString().toLowerCase());
result.put("data", data);
result.put("database", db);
result.put("table", tableName);
//Emit downstream
collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}
/**
 * Custom schema that parses the CDC format and supports all operation types
 */
class CdcDwdDeserializationSchema implements DebeziumDeserializationSchema<JSONObject> {
private static final long serialVersionUID = -3168848963265670603L;
public CdcDwdDeserializationSchema() {
}
@Override
public void deserialize(SourceRecord record, Collector<JSONObject> out) throws Exception {
Struct dataRecord = (Struct)record.value();
Struct afterStruct = dataRecord.getStruct("after");
Struct beforeStruct = dataRecord.getStruct("before");
/*
 * 1. If both beforeStruct and afterStruct are present, the record is an update.
 * 2. Only beforeStruct present: a delete.
 * 3. Only afterStruct present: an insert.
 */
JSONObject logJson = new JSONObject();
String canal_type = "";
List<Field> fieldsList = null;
if(afterStruct !=null && beforeStruct !=null){
System.out.println("這是修改數據");
canal_type = "update";
fieldsList = afterStruct.schema().fields();
//field name -> value
for (Field field : fieldsList) {
String fieldName = field.name();
Object fieldValue = afterStruct.get(fieldName);
// System.out.println("*****fieldName=" + fieldName+",fieldValue="+fieldValue);
logJson.put(fieldName,fieldValue);
}
}else if (afterStruct !=null){
System.out.println( "這是新增數據");
canal_type = "insert";
fieldsList = afterStruct.schema().fields();
//field name -> value
for (Field field : fieldsList) {
String fieldName = field.name();
Object fieldValue = afterStruct.get(fieldName);
// System.out.println("*****fieldName=" + fieldName+",fieldValue="+fieldValue);
logJson.put(fieldName,fieldValue);
}
}else if (beforeStruct !=null){
System.out.println( "這是刪除數據");
canal_type = "detele";
fieldsList = beforeStruct.schema().fields();
//field name -> value
for (Field field : fieldsList) {
String fieldName = field.name();
Object fieldValue = beforeStruct.get(fieldName);
// System.out.println("*****fieldName=" + fieldName+",fieldValue="+fieldValue);
logJson.put(fieldName,fieldValue);
}
}else {
System.out.println("一臉蒙蔽了");
}
//database, table, and timestamp info from the source struct
Struct source = dataRecord.getStruct("source");
Object db = source.get("db");
Object table = source.get("table");
Object ts_ms = source.get("ts_ms");
logJson.put("canal_database",db);
logJson.put("canal_database",table);
logJson.put("canal_ts",ts_ms);
logJson.put("canal_type",canal_type);
//topic of the record
String topic = record.topic();
System.out.println("topic = " + topic);
//primary-key fields: hash them to pick one of 3 partitions
Struct pk = (Struct)record.key();
List<Field> pkFieldList = pk.schema().fields();
int partitionerNum = 0 ;
for (Field field : pkFieldList) {
Object pkValue= pk.get(field.name());
partitionerNum += pkValue.hashCode();
}
int hash = Math.abs(partitionerNum) % 3;
logJson.put("pk_hashcode",hash);
out.collect(logJson);
}
@Override
public TypeInformation<JSONObject> getProducedType() {
//JSONObject is not a basic type, so BasicTypeInfo.of(...) would return null here; TypeInformation.of handles generic types
return TypeInformation.of(JSONObject.class);
}
}
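With CdcDwdDeserializationSchema, an insert into z_user_info is emitted as a flat JSON record roughly like the following (field values are hypothetical):
{"id":1001,"name":"zhangsan","age":18,"canal_database":"gmall-flink-200821","canal_table":"z_user_info","canal_ts":1621845600000,"canal_type":"insert","pk_hashcode":2}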
6. Package and Test
1) Build the jar and upload it to the Linux host
2) Enable the MySQL binlog and restart MySQL; a typical my.cnf fragment is sketched below
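A commonly used my.cnf fragment (server-id and the log file base name are assumptions; adapt them to your setup):
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall-flink-200821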
3) Start the Flink cluster
bin/start-cluster.sh
4) Start the HDFS cluster
start-dfs.sh
5) Submit the job
bin/flink run -c com.hadoop.FlinkCDC flink-200821-1.0-SNAPSHOT-jar-with-dependencies.jar
6) Insert, update, or delete rows in MySQL's gmall-flink-200821.z_user_info table
7) Take a savepoint of the running Flink job
bin/flink savepoint JobId hdfs://hadoop102:8020/flink/save
8) Stop the job, then restart it from the savepoint
bin/flink run -s hdfs://hadoop102:8020/flink/save/... -c com.hadoop.FlinkCDC flink-200821-1.0-SNAPSHOT-jar-with-dependencies.jar