前言:
CDC,Change Data Capture,變更數據獲取的簡稱,使用CDC我們可以從數據庫中獲取已提交的更改並將這些更改發送到下游,供下游使用。這些變更可以包括INSERT,DELETE,UPDATE等,
用戶可以在以下的場景下使用CDC:
使用flink sql進行數據同步,可以將數據從一個數據同步到其他的地方,比如mysql、elasticsearch等。
可以在源數據庫上實時的物化一個聚合視圖
因為只是增量同步,所以可以實時的低延遲的同步數據
使用EventTime join 一個temporal表以便可以獲取准確的結果
flink 1.11 將這些changelog提取並轉化為table apa和sql,目前支持兩種格式:Debezium和Canal,這就意味着源表不僅僅是append操作,而且還有upsert、delete操作。
一 創建項目


二 導入pom文件
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.4.1</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<mainClass>com.lexue.gmall_logger.GmallLoggerApplication</mainClass>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
三 創建LoggerController.class
package com.lexue.gmall_logger.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @Slf4j public class LoggerController { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @RequestMapping("applog") public String getLogger(@RequestParam("param") String logStr){ //落盤 log.info(logStr); //寫入Kafka kafkaTemplate.send("ods_base_log",logStr); return "success"; }
//本地瀏覽器測試使用 @RequestMapping("test1") public String getLogger(@RequestParam("name") String name, @RequestParam("age") int age){ System.out.println(name + ":" + age); return "success"; } }
四 在resources目錄下創建logback.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration> <property name="LOG_HOME" value="/opt/module/lxz_file/logs" /> <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%msg%n</pattern> </encoder> </appender> <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>${LOG_HOME}/app.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern> </rollingPolicy> <encoder> <pattern>%msg%n</pattern> </encoder> </appender> <!-- 將某一個包下日志單獨打印日志 這里需要根據讀者實際類名填寫 --> <logger name="com.lexue.gmall_logger.controller.LoggerController" level="INFO" additivity="false"> <appender-ref ref="rollingFile" /> <appender-ref ref="console" /> </logger> <root level="error" additivity="false"> <appender-ref ref="console" /> </root> </configuration>
五 在reousces創建application.properties文件
# 項目名稱 spring.application.name=gmall-logger # 指定使用的端口號 server.port=8081 #============== kafka =================== # 指定Kafka 代理地址,可以多個 spring.kafka.bootstrap-servers=hadoop1:9092 # 指定消息key和消息體的編解碼方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
注意:項目中的test目錄下有個test.class,刪除此文件,不然會報錯.
六 測試
1.本地IDEA開啟客戶端
2.服務器開啟zk,kafka集群

3.開啟Kafka消費者
bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic ods_base_log
4.發送數據查看Kafka消費者是否消費

數據正在發送,查看Kafka是否成功消費到

Kafka成功消費
去IDEA看看客戶端是否有數據進來

客戶端成功抓取到數據
此時,生產數據已經可以成功通過FinkCDC程序順利寫入Kafka主題ods_base_log中.
