場景應用:將MySQL的變化數據轉為實時流輸出到Kafka中。
注意版本問題,版本不同可能會出現異常,以下版本測試沒問題:
flink1.12.7
flink-connector-mysql-cdc 1.3.0(com.alibaba.ververica) (測試時使用1.2.0版本時會出現空指針錯誤)
1. MySQL的配置
在/etc/my.cnf文件中,【mysqld】下面添加以下配置:
binlog-do-db 是指定要監控的數據庫,如果是多個數據庫,每個數據庫需要單獨一行設置。
修改完成后,需要重啟數據庫,並檢查binlog有沒有生成。
補充幾個其他的配置:
1、修改配置 [mysqld] # 前面還有其他配置 # 添加的部分 server-id = 12345 log-bin = mysql-bin # 必須為ROW binlog_format = ROW # 必須為FULL,MySQL-5.7后才有該參數 binlog_row_image = FULL expire_logs_days = 15 2、驗證 SHOW VARIABLES LIKE '%binlog%'; 3、設置權限 -- 設置擁有同步權限的用戶 CREATE USER 'flinkuser' IDENTIFIED BY 'flinkpassword'; -- 賦予同步相關權限 GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser'; 創建用戶並賦予權限成功后,使用該用戶登錄MySQL,可以使用以下命令查看主從同步相關信息 SHOW MASTER STATUS SHOW SLAVE STATUS SHOW BINARY LOGS
2. FlinkCDC的開發
從這里開始建立flink工程項目,以下項目flink版本為1.12.7,scala版本用的2.12。
大概的思考步驟如下:
1) 獲取執行環境
2)開啟檢查點ck (重點)
3)通過flinkcdc構建sourceFunction,並讀取數據 (重點)
4)在執行環境中添加3)中構建的source
5)配置kafka生產者環境(重點)
6)在執行環境中增加5)中的Sink
7)啟動任務
項目結構(gmall-realtime)如下:
2.1 Pom文件配置
由於這是我的一個子項目,所以實際使用的時候自己修改。

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>gmall-flink-2021</artifactId> <groupId>com.king</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>gmall-flink-cdc</artifactId> <version>1.0</version> <properties> <java.version>1.8</java.version> <maven.compiler.source>${java.version}</maven.compiler.source> <maven.compiler.target>${java.version}</maven.compiler.target> <flink.version>1.12.7</flink.version> <scala.version>2.12</scala.version> <hadoop.version>3.1.3</hadoop.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.68</version> </dependency> <!--如果保存檢查點到 hdfs 上,需要引入此依賴--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.16</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.7.0</version> </dependency> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>1.3.0</version> </dependency> <!-- https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc --> <!-- <dependency> 該包僅支持flink1.13版本及以上--> <!-- <groupId>com.ververica</groupId>--> <!-- <artifactId>flink-connector-mysql-cdc</artifactId>--> <!-- <version>2.1.1</version>--> <!-- </dependency>--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.20</version> </dependency> <!--Flink 默認使用的是 slf4j 記錄日志,相當於一個日志的接口,我們這里使用 log4j 作為 具體的日志實現--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.32</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.32</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-to-slf4j</artifactId> <version>2.17.1</version> </dependency> </dependencies> <build> <!-- <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>--> <!-- <resources>--> <!-- <resource>--> <!-- <directory>${project.basedir}/src/main/resources</directory>--> <!-- </resource>--> <!-- </resources>--> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
注意一點:如果使用java開發,可以直接編譯成功。但是我這里全部使用scala開發,所以需要在pom文件配置額外的插件,否則打包scala項目會不成功。

<plugins> <plugin> <!-- !!必須有這個插件,才可以編譯scala代碼找到主類,版本我是網上搞來的 --> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <id>compile-scala</id> <phase>compile</phase> <goals> <goal>add-source</goal> <goal>compile</goal> </goals> </execution> <execution> <id>test-compile-scala</id> <phase>test-compile</phase> <goals> <goal>add-source</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins>
2.2 讀取MySQL
Flinkcdc.scala中:
通過引入的flink-connector-mysql-cdc已經提供了讀取MySQL的工具類。
val sourceFunction = MySQLSource.builder[String]() .hostname("hadoop200") .port(3306) .username("root") .password("root") .databaseList("gmall-210325-flink")
//如果不添加該參數,則消費指定數據庫中所有表的數據
//如果添加,則需要按照 數據庫名.表名 的格式指定,多個表使用逗號隔開
// .tableList("gmall-210325-flink.base_trademark")
.deserializer(new CustomerDeseriallization())
new CustomerDeseriallization() 是自定義的讀取的MySQL的數據輸出格式,如果不指定,系統也有個new StringDebeziumDeserializationSchema()可以使用。
2.3 自定義從MySQL讀取的數據的輸出格式
CustomerDeseriallization類

package com.king.app.function import com.alibaba.fastjson.JSONObject import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.util.Collector import org.apache.kafka.connect.data.{Schema, Struct} import org.apache.kafka.connect.source.SourceRecord /** * @Author: KingWang * @Date: 2021/12/29 * @Desc: **/ class CustomerDeseriallization extends DebeziumDeserializationSchema[String]{ /** * 封裝的數據: * { * "database":"", * "tableName":"", * "type":"c r u d", * "before":"", * "after":"", * "ts": "" * * } * * @param sourceRecord * @param collector */ override def deserialize(sourceRecord: SourceRecord, collector: Collector[String]): Unit = { //1. 創建json對象用於保存最終數據 val result = new JSONObject() val value:Struct = sourceRecord.value().asInstanceOf[Struct] //2. 獲取庫名&表名 val source:Struct = value.getStruct("source") val database = source.getString("db") val table = source.getString("table") //3. 獲取before val before = value.getStruct("before") val beforeObj = if(before != null) getJSONObjectBySchema(before.schema(),before) else new JSONObject() //4. 獲取after val after = value.getStruct("after") val afterObj = if(after != null) getJSONObjectBySchema(after.schema(),after) else new JSONObject() //5. 獲取操作類型 val op:String = value.getString("op") //6. 獲取操作時間 val ts = source.getInt64("ts_ms") // val ts = value.getInt64("ts_ms") //7. 拼接結果 result.put("database", database) result.put("table", table) result.put("type", op) result.put("before", beforeObj) result.put("after", afterObj) result.put("ts", ts) collector.collect(result.toJSONString) } override def getProducedType: TypeInformation[String] = { BasicTypeInfo.STRING_TYPE_INFO } //從Schema中獲取字段和值 def getJSONObjectBySchema(schema:Schema,struct:Struct):JSONObject = { val fields = schema.fields() var jsonBean = new JSONObject() val iter = fields.iterator() while(iter.hasNext){ val field = iter.next() val key = field.name() val value = struct.get(field) jsonBean.put(key,value) } jsonBean } }
2.4 寫入到Kafka

package com.king.util import org.apache.flink.api.common.serialization.{SerializationSchema, SimpleStringSchema} import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer /** * @Author: KingWang * @Date: 2022/1/1 * @Desc: **/ object MyKafkaUtil { val broker_list = "hadoop200:9092,hadoop201:9092,hadoop202:9092" def getKafkaProducer(topic:String):FlinkKafkaProducer[String] = new FlinkKafkaProducer[String](broker_list,topic,new SimpleStringSchema()) }
FlinkCDC.scala的完整代碼如下:

package com.king.app.ods import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions import com.king.app.function.CustomerDeseriallization import com.king.util.MyKafkaUtil import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment /** * @Author: KingWang * @Date: 2021/12/26 * @Desc: **/ object FlinkCDC { def main(args: Array[String]): Unit = { //1. 獲取執行環境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //1.1 開啟ck並指定狀態后端fs // env.setStateBackend(new FsStateBackend("hdfs://hadoop200:8020/gmall-flink-210325/ck")) // .enableCheckpointing(10000L) //頭尾間隔:每10秒觸發一次ck // env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // // env.getCheckpointConfig.setCheckpointTimeout(10000L) // env.getCheckpointConfig.setMaxConcurrentCheckpoints(2) // env.getCheckpointConfig.setMinPauseBetweenCheckpoints(3000l) //尾和頭間隔時間3秒 // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L)); //2. 通過flinkCDC構建SourceFunction並讀取數據 val sourceFunction = MySQLSource.builder[String]() .hostname("hadoop200") .port(3306) .username("root") .password("root") .databaseList("gmall-210325-flink") //如果不添加該參數,則消費指定數據庫中所有表的數據 //如果添加,則需要按照 數據庫名.表名 的格式指定,多個表使用逗號隔開 // .tableList("gmall-210325-flink.base_trademark") .deserializer(new CustomerDeseriallization()) //監控的方式: // 1. initial 初始化全表拷貝,然后再比較 // 2. earliest 不做初始化,只從當前的 // 3. latest 指定最新的 // 4. specificOffset 指定offset // 3. timestamp 比指定的時間大的 .startupOptions(StartupOptions.latest()) .build() val dataStream = env.addSource(sourceFunction) //3. sink, 寫入kafka dataStream.print() val sinkTopic = "ods_base_db" dataStream.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic)) //4. 啟動任務 env.execute("flinkCDC") } }
3. 測試項目
准備好kafka,mysql,可以在本地測試。
啟動kafka消費者,topic是ods_base_db
在idea中啟動flinkcdc程序。
打開mysql編輯器,表base_trademark中原始記錄有12條如下:
現在手工增加一條記錄,編號為13 wang
查看idea控制台顯示添加消息如下:
同時在Kafka消費者也看到一條記錄如下,字段type為操作類型,c表示創建
再次在MySQL中做修改和刪除操作,可以看到控制多了兩條記錄,操作類型分別為u和d,表示修改和刪除操作。
到此flinkcdc的操作基本完成。