FlinkCDC-Springboot拉取數據寫入Kafka


前言:

CDC,Change Data Capture,變更數據獲取的簡稱,使用CDC我們可以從數據庫中獲取已提交的更改並將這些更改發送到下游,供下游使用。這些變更可以包括INSERT,DELETE,UPDATE等,

用戶可以在以下的場景下使用CDC:

使用flink sql進行數據同步,可以將數據從一個數據同步到其他的地方,比如mysql、elasticsearch等。
可以在源數據庫上實時的物化一個聚合視圖
因為只是增量同步,所以可以實時的低延遲的同步數據
使用EventTime join 一個temporal表以便可以獲取准確的結果
flink 1.11 將這些changelog提取並轉化為table apa和sql,目前支持兩種格式:Debezium和Canal,這就意味着源表不僅僅是append操作,而且還有upsert、delete操作。

 

 

 

一 創建項目

 

 

 

 

 

 

二 導入pom文件

<properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.4.1</spring-boot.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <mainClass>com.lexue.gmall_logger.GmallLoggerApplication</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

三 創建LoggerController.class

package com.lexue.gmall_logger.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class LoggerController {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @RequestMapping("applog")
    public String getLogger(@RequestParam("param") String logStr){
        //落盤
        log.info(logStr);

        //寫入Kafka
        kafkaTemplate.send("ods_base_log",logStr);

        return "success";
    }

//本地瀏覽器測試使用 @RequestMapping(
"test1") public String getLogger(@RequestParam("name") String name, @RequestParam("age") int age){ System.out.println(name + ":" + age); return "success"; } }

四 在resources目錄下創建logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_HOME" value="/opt/module/lxz_file/logs" />
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_HOME}/app.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <!-- 將某一個包下日志單獨打印日志 這里需要根據讀者實際類名填寫 -->
    <logger name="com.lexue.gmall_logger.controller.LoggerController"
            level="INFO" additivity="false">
        <appender-ref ref="rollingFile" />
        <appender-ref ref="console" />
    </logger>

    <root level="error" additivity="false">
        <appender-ref ref="console" />
    </root>
</configuration>

五 在reousces創建application.properties文件

# 項目名稱
spring.application.name=gmall-logger

# 指定使用的端口號
server.port=8081

#============== kafka ===================
# 指定Kafka 代理地址,可以多個
spring.kafka.bootstrap-servers=hadoop1:9092

# 指定消息key和消息體的編解碼方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

注意:項目中的test目錄下有個test.class,刪除此文件,不然會報錯.

六 測試

1.本地IDEA開啟客戶端

2.服務器開啟zk,kafka集群

 

 3.開啟Kafka消費者

 bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic ods_base_log

4.發送數據查看Kafka消費者是否消費

 

 數據正在發送,查看Kafka是否成功消費到

 

 Kafka成功消費

去IDEA看看客戶端是否有數據進來

客戶端成功抓取到數據

 

 

 此時,生產數據已經可以成功通過FinkCDC程序順利寫入Kafka主題ods_base_log中.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM