前言:
CDC,Change Data Capture,变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等,
用户可以在以下的场景下使用CDC:
使用flink sql进行数据同步,可以将数据从一个数据同步到其他的地方,比如mysql、elasticsearch等。
可以在源数据库上实时的物化一个聚合视图
因为只是增量同步,所以可以实时的低延迟的同步数据
使用EventTime join 一个temporal表以便可以获取准确的结果
flink 1.11 将这些changelog提取并转化为table apa和sql,目前支持两种格式:Debezium和Canal,这就意味着源表不仅仅是append操作,而且还有upsert、delete操作。
一 创建项目


二 导入pom文件
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.4.1</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<mainClass>com.lexue.gmall_logger.GmallLoggerApplication</mainClass>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
三 创建LoggerController.class
package com.lexue.gmall_logger.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @Slf4j public class LoggerController { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @RequestMapping("applog") public String getLogger(@RequestParam("param") String logStr){ //落盘 log.info(logStr); //写入Kafka kafkaTemplate.send("ods_base_log",logStr); return "success"; }
//本地浏览器测试使用 @RequestMapping("test1") public String getLogger(@RequestParam("name") String name, @RequestParam("age") int age){ System.out.println(name + ":" + age); return "success"; } }
四 在resources目录下创建logback.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration> <property name="LOG_HOME" value="/opt/module/lxz_file/logs" /> <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%msg%n</pattern> </encoder> </appender> <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>${LOG_HOME}/app.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern> </rollingPolicy> <encoder> <pattern>%msg%n</pattern> </encoder> </appender> <!-- 将某一个包下日志单独打印日志 这里需要根据读者实际类名填写 --> <logger name="com.lexue.gmall_logger.controller.LoggerController" level="INFO" additivity="false"> <appender-ref ref="rollingFile" /> <appender-ref ref="console" /> </logger> <root level="error" additivity="false"> <appender-ref ref="console" /> </root> </configuration>
五 在reousces创建application.properties文件
# 项目名称 spring.application.name=gmall-logger # 指定使用的端口号 server.port=8081 #============== kafka =================== # 指定Kafka 代理地址,可以多个 spring.kafka.bootstrap-servers=hadoop1:9092 # 指定消息key和消息体的编解码方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
注意:项目中的test目录下有个test.class,删除此文件,不然会报错.
六 测试
1.本地IDEA开启客户端
2.服务器开启zk,kafka集群

3.开启Kafka消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic ods_base_log
4.发送数据查看Kafka消费者是否消费

数据正在发送,查看Kafka是否成功消费到

Kafka成功消费
去IDEA看看客户端是否有数据进来

客户端成功抓取到数据
此时,生产数据已经可以成功通过FinkCDC程序顺利写入Kafka主题ods_base_log中.
