FlinkCDC讀取MySQL並寫入Kafka案例(com.alibaba.ververica)


場景應用:將MySQL的變化數據轉為實時流輸出到Kafka中。

 

注意版本問題,版本不同可能會出現異常,以下版本測試沒問題:

flink1.12.7

flink-connector-mysql-cdc 1.3.0(com.alibaba.ververica) (測試時使用1.2.0版本時會出現空指針錯誤)

1. MySQL的配置

 在/etc/my.cnf文件中,【mysqld】下面添加以下配置:

binlog-do-db  是指定要監控的數據庫,如果是多個數據庫,每個數據庫需要單獨一行設置。

 修改完成后,需要重啟數據庫,並檢查binlog有沒有生成。

 

補充幾個其他的配置:

1、修改配置
    [mysqld]
    # 前面還有其他配置
    # 添加的部分
    server-id = 12345
    log-bin = mysql-bin
    # 必須為ROW
    binlog_format = ROW
    # 必須為FULL,MySQL-5.7后才有該參數
    binlog_row_image  = FULL
    expire_logs_days  = 15

2、驗證
SHOW VARIABLES LIKE '%binlog%';
    
3、設置權限
    -- 設置擁有同步權限的用戶
    CREATE USER 'flinkuser' IDENTIFIED BY 'flinkpassword';
    -- 賦予同步相關權限
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser';
    
    創建用戶並賦予權限成功后,使用該用戶登錄MySQL,可以使用以下命令查看主從同步相關信息
    SHOW MASTER STATUS
    SHOW SLAVE STATUS
    SHOW BINARY LOGS

 

 

 

2. FlinkCDC的開發

從這里開始建立flink工程項目,以下項目flink版本為1.12.7,scala版本用的2.12。

大概的思考步驟如下:

1) 獲取執行環境

2)開啟檢查點ck (重點)

3)通過flinkcdc構建sourceFunction,並讀取數據 (重點)

4)在執行環境中添加3)中構建的source

5)配置kafka生產者環境(重點)

6)在執行環境中增加5)中的Sink

7)啟動任務

 

項目結構(gmall-realtime)如下:

 

 

2.1 Pom文件配置

由於這是我的一個子項目,所以實際使用的時候自己修改。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>gmall-flink-2021</artifactId>
        <groupId>com.king</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>gmall-flink-cdc</artifactId>
    <version>1.0</version>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.12.7</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
        <!--如果保存檢查點到 hdfs 上,需要引入此依賴-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.3.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc -->
<!--        <dependency>  該包僅支持flink1.13版本及以上-->
<!--            <groupId>com.ververica</groupId>-->
<!--            <artifactId>flink-connector-mysql-cdc</artifactId>-->
<!--            <version>2.1.1</version>-->
<!--        </dependency>-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>
        <!--Flink 默認使用的是 slf4j 記錄日志,相當於一個日志的接口,我們這里使用 log4j 作為
        具體的日志實現-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.32</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.32</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.17.1</version>
        </dependency>
    </dependencies>


    <build>
<!--        <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>-->
<!--        <resources>-->
<!--            <resource>-->
<!--                <directory>${project.basedir}/src/main/resources</directory>-->
<!--            </resource>-->
<!--        </resources>-->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


</project>
pom.xml

 

 

注意一點:如果使用java開發,可以直接編譯成功。但是我這里全部使用scala開發,所以需要在pom文件配置額外的插件,否則打包scala項目會不成功。

<plugins>
            <plugin>
                <!-- !!必須有這個插件,才可以編譯scala代碼找到主類,版本我是網上搞來的 -->
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
plugins

 

2.2 讀取MySQL

Flinkcdc.scala中:

通過引入的flink-connector-mysql-cdc已經提供了讀取MySQL的工具類。

    val sourceFunction = MySQLSource.builder[String]()
        .hostname("hadoop200")
        .port(3306)
        .username("root")
        .password("root")
        .databaseList("gmall-210325-flink")
        //如果不添加該參數,則消費指定數據庫中所有表的數據
//如果添加,則需要按照 數據庫名.表名 的格式指定,多個表使用逗號隔開
// .tableList("gmall-210325-flink.base_trademark")
.deserializer(new CustomerDeseriallization())
new CustomerDeseriallization() 是自定義的讀取的MySQL的數據輸出格式,如果不指定,系統也有個new StringDebeziumDeserializationSchema()可以使用。

2.3 自定義從MySQL讀取的數據的輸出格式

CustomerDeseriallization類
package com.king.app.function

import com.alibaba.fastjson.JSONObject
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.util.Collector
import org.apache.kafka.connect.data.{Schema, Struct}
import org.apache.kafka.connect.source.SourceRecord


/**
 * @Author: KingWang
 * @Date: 2021/12/29  
 * @Desc:
 **/
class CustomerDeseriallization extends DebeziumDeserializationSchema[String]{

  /**
   * 封裝的數據:
   * {
   *   "database":"",
   *   "tableName":"",
   *   "type":"c r u d",
   *   "before":"",
   *   "after":"",
   *   "ts": ""
   *
   * }
   *
   * @param sourceRecord
   * @param collector
   */
  override def deserialize(sourceRecord: SourceRecord, collector: Collector[String]): Unit = {
    //1. 創建json對象用於保存最終數據
    val result = new JSONObject()


    val value:Struct = sourceRecord.value().asInstanceOf[Struct]
    //2. 獲取庫名&表名
    val source:Struct = value.getStruct("source")
    val database = source.getString("db")
    val table = source.getString("table")

    //3. 獲取before
    val before = value.getStruct("before")
    val beforeObj = if(before != null)  getJSONObjectBySchema(before.schema(),before) else new JSONObject()


    //4. 獲取after
    val after = value.getStruct("after")
    val afterObj = if(after != null) getJSONObjectBySchema(after.schema(),after) else new JSONObject()

    //5. 獲取操作類型
    val op:String = value.getString("op")

    //6. 獲取操作時間
    val ts = source.getInt64("ts_ms")
//    val ts = value.getInt64("ts_ms")


    //7. 拼接結果
    result.put("database", database)
    result.put("table", table)
    result.put("type", op)
    result.put("before", beforeObj)
    result.put("after", afterObj)
    result.put("ts", ts)

    collector.collect(result.toJSONString)

  }


  override def getProducedType: TypeInformation[String] = {
    BasicTypeInfo.STRING_TYPE_INFO
  }


  //從Schema中獲取字段和值
  def getJSONObjectBySchema(schema:Schema,struct:Struct):JSONObject = {
    val fields = schema.fields()
    var jsonBean = new JSONObject()
    val iter = fields.iterator()
    while(iter.hasNext){
      val field = iter.next()
      val key = field.name()
      val value = struct.get(field)
      jsonBean.put(key,value)
    }
    jsonBean
  }

}
CustomerDeseriallization

 

2.4 寫入到Kafka

package com.king.util

import org.apache.flink.api.common.serialization.{SerializationSchema, SimpleStringSchema}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

/**
 * @Author: KingWang
 * @Date: 2022/1/1  
 * @Desc:
 **/
object MyKafkaUtil {

  val broker_list = "hadoop200:9092,hadoop201:9092,hadoop202:9092"

  def getKafkaProducer(topic:String):FlinkKafkaProducer[String] =
    new FlinkKafkaProducer[String](broker_list,topic,new SimpleStringSchema())


}
MyKafkaUtil

 

FlinkCDC.scala的完整代碼如下:

package com.king.app.ods

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions
import com.king.app.function.CustomerDeseriallization
import com.king.util.MyKafkaUtil
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

/**
 * @Author: KingWang
 * @Date: 2021/12/26  
 * @Desc:
 **/
object FlinkCDC {

  def main(args: Array[String]): Unit = {

    //1. 獲取執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //1.1 開啟ck並指定狀態后端fs
//    env.setStateBackend(new FsStateBackend("hdfs://hadoop200:8020/gmall-flink-210325/ck"))
//      .enableCheckpointing(10000L) //頭尾間隔:每10秒觸發一次ck
//      env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)  //
//      env.getCheckpointConfig.setCheckpointTimeout(10000L)
//    env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
//    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(3000l)  //尾和頭間隔時間3秒

//    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));



    //2. 通過flinkCDC構建SourceFunction並讀取數據
    val sourceFunction = MySQLSource.builder[String]()
        .hostname("hadoop200")
        .port(3306)
        .username("root")
        .password("root")
        .databaseList("gmall-210325-flink")

        //如果不添加該參數,則消費指定數據庫中所有表的數據
        //如果添加,則需要按照 數據庫名.表名 的格式指定,多個表使用逗號隔開
//        .tableList("gmall-210325-flink.base_trademark")
        .deserializer(new CustomerDeseriallization())

        //監控的方式:
      // 1. initial 初始化全表拷貝,然后再比較
      // 2. earliest 不做初始化,只從當前的
      // 3. latest  指定最新的
      // 4. specificOffset 指定offset
      // 3. timestamp 比指定的時間大的

        .startupOptions(StartupOptions.latest())
        .build()

    val  dataStream = env.addSource(sourceFunction)

    //3. sink, 寫入kafka
    dataStream.print()
    val sinkTopic = "ods_base_db"
    dataStream.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic))
    //4. 啟動任務
    env.execute("flinkCDC")

  }

}
FlinkCDC.scala

 

3. 測試項目

准備好kafka,mysql,可以在本地測試。

啟動kafka消費者,topic是ods_base_db

 

 

在idea中啟動flinkcdc程序。

 

打開mysql編輯器,表base_trademark中原始記錄有12條如下:

 

 現在手工增加一條記錄,編號為13  wang

 

 查看idea控制台顯示添加消息如下:

 

 同時在Kafka消費者也看到一條記錄如下,字段type為操作類型,c表示創建

 

 

再次在MySQL中做修改和刪除操作,可以看到控制多了兩條記錄,操作類型分別為u和d,表示修改和刪除操作。

 

 

到此flinkcdc的操作基本完成。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM