Configuring Debezium Connector for Oracle

1. Pitfall Notes

Environment

  1. OS: CentOS 7.9
  2. Oracle: oracle-database-ee-19c-1.0-1.x86_64.rpm
  3. ZooKeeper: apache-zookeeper-3.7.0-bin.tar.gz
  4. Kafka: kafka_2.12-2.7.0.tgz

Reference articles

  1. Debezium Connector for Oracle :: Debezium Documentation
  2. Apache Kafka
  3. Real-time monitoring and syncing of database changes: this framework is a real gem
  4. Introduction to Debezium | Baeldung
  5. debeziumEmbedded: a hand-written example of accessing a database with Debezium
  6. Installing Oracle 19c with Docker
  7. Oracle 12c PDB database not open
  8. Oracle supplemental logging (Supplemental Logging)
  9. Oracle archive log mode vs. noarchivelog mode
  10. Migrating an Oracle database from noarchivelog mode to archivelog mode
  11. Fixing ORA-01033: ORACLE initialization or shutdown in progress at login
  12. Debezium: capturing data from Oracle into Kafka
  13. Kafka Connect
  14. Kafka Connect overview and deployment
  15. Keyword: oracle LRM-00109: could not open parameter file '/opt/oracle - adodo1
  16. Using Debezium with Kafka to sync Oracle data in real time | BlackC
  17. Fixing ORA-00942: table or view does not exist
  18. oracle - Maven including ocijdbc19 in java.library.path - Stack Overflow
  19. Differences between the JDBC OCI and Thin drivers
  20. Error while fetching metadata with correlation id : {LEADER_NOT_AVAILABLE}: how to handle it properly

2. Monitoring Oracle

Debezium offers two ways to monitor the database, corresponding to Oracle's two connection (driver) types.

  • LogMiner: based on the JDBC Thin driver, which is pure Java and platform-independent.
  • XStream API: based on the JDBC OCI driver, which works by calling the OCI client's native C libraries.

Quoting the official description of the two drivers:

The JDBC Thin driver is a pure Java, Type IV driver that can be used in applications and applets. It is platform-independent and does not require any additional Oracle software on the client-side. The JDBC Thin driver communicates with the server using SQL*Net to access Oracle Database.

The JDBC Thin driver allows a direct connection to the database by providing an implementation of SQL*Net on top of Java sockets. The driver supports the TCP/IP protocol and requires a TNS listener on the TCP/IP sockets on the database server.

The JDBC OCI driver is a Type II driver used with Java applications. It requires an Oracle client installation and, therefore, is Oracle platform-specific. It supports all installed Oracle Net adapters, including interprocess communication (IPC), named pipes, TCP/IP, and Internetwork Packet Exchange/Sequenced Packet Exchange (IPX/SPX).

The JDBC OCI driver, written in a combination of Java and C, converts JDBC invocations to calls to OCI, using native methods to call C-entry points. These calls communicate with the database using SQL*Net.

The JDBC OCI driver uses the OCI libraries, C-entry points, Oracle Net, core libraries, and other necessary files on the client computer where it is installed.

The steps below assume Oracle 19c is already installed; see, for example, the article on installing Oracle 19c on CentOS 8.

2.1 LogMiner

Switch to the oracle user:

su - oracle

Connect to Oracle and change the sys password. This keeps it consistent with the statements in the Debezium documentation, so they can be used as-is:

sqlplus / as sysdba
connect / as sysdba
alter user sys identified by top_secret;
exit;

Put the database into archive log mode:

sqlplus / as sysdba
connect sys/top_secret AS SYSDBA
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should now show "Database log mode: Archive Mode"
archive log list

exit;

Switch to root, create the db_recovery_file_dest directory and grant permissions on it, then switch back to the oracle user:

su root
mkdir /opt/oracle/oradata/recovery_area
chmod 777 /opt/oracle/oradata/recovery_area
su oracle

The digit 7 stands for r (read), w (write), and x (execute) permissions.

777 grants rwx to the file owner, the owning group, and all other users.
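
The symbolic form of chmod is equivalent, and ls -ld confirms the resulting mode:

chmod u=rwx,g=rwx,o=rwx /opt/oracle/oradata/recovery_area
ls -ld /opt/oracle/oradata/recovery_area
# drwxrwxrwx ... recovery_area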

Enable minimal supplemental logging at the database level; the first statement below enables it, the second disables it again.

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

ALTER DATABASE DROP SUPPLEMENTAL LOG DATA;

If you only want to enable supplemental logging for a specific table (for example stuinfo), see below.

If the ALTER fails even though the table exists, first run select * from C##TEST.STUINFO; if Oracle reports that the table does not exist, try select * from C##TEST."STUINFO" instead, since quoted identifiers are case-sensitive.

ALTER TABLE C##TEST.STUINFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

Create a user and grant privileges:

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
  CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
  CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba

  CREATE USER c##dbzuser IDENTIFIED BY dbz
    DEFAULT TABLESPACE logminer_tbs
    QUOTA UNLIMITED ON logminer_tbs
    CONTAINER=ALL;

  GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
  GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
  GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
  GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
  GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;

  GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
  GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
  GRANT ALTER ANY TABLE TO c##dbzuser CONTAINER=ALL;
  GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;

  GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;

  GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;

  exit;

Create a table and enable supplemental logging on it:

sqlplus / as sysdba
conn c##dbzuser/dbz;
CREATE TABLE STU ( "s_id" INT PRIMARY KEY, "s_name" VARCHAR ( 255 ) );
ALTER TABLE C##DBZUSER.STU ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
exit;

After the steps above, the database can be monitored either through the Java API or through a Kafka connector. The Java API route is considerably more convenient.

Java API

Create a Spring Boot project.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>1.6.2.Final</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>1.6.2.Final</version>
        </dependency>

        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-mysql</artifactId>
            <version>1.6.2.Final</version>
        </dependency>

        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-oracle</artifactId>
            <version>1.6.2.Final</version>
        </dependency>

        <dependency>
            <groupId>com.oracle.ojdbc</groupId>
            <artifactId>ojdbc8</artifactId>
            <version>19.3.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Create logback.xml under resources:

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="true">

    <appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
        <Target>System.out</Target>
        <encoder>
            <pattern>%-5p [%d][%mdc{mdc_userId}] %C:%L - %m %n</pattern>
            <charset>utf-8</charset>
        </encoder>
        <!-- This appender is for development use; it only sets a minimum level, and the console prints log messages at or above that level. -->
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>INFO</level>
        </filter>
    </appender>
    <root level="info">
        <!-- Remove the stdout appender in production -->
        <appender-ref ref="stdout"/>
    </root>
</configuration>

Create the OracleDebezium_19c class:

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.relational.history.FileDatabaseHistory;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OracleDebezium_19c {

    public static void main(String[] args) {

        // 1. Build the connector configuration
        Properties props = genProps();

        // 2. Build the engine and the change-event handling logic
        DebeziumEngine<ChangeEvent<String, String>> engine = engineBuild(props);

        // 3. Start the engine
        runSoftware(engine);

    }

    // Build the properties for connecting to Oracle
    private static Properties genProps() {
        // Connector configuration
        Properties props = new Properties();

        props.setProperty("name", "oracle-engine-0033");
        props.setProperty("connector.class", "io.debezium.connector.oracle.OracleConnector");
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        // File where offsets are stored
        props.setProperty("offset.storage.file.filename", "D:\\temp\\oracle4.txt");
        // Interval (ms) at which offsets are flushed to disk
        props.setProperty("offset.flush.interval.ms", "6000");
        // Database connection settings
        props.setProperty("database.hostname", "192.168.10.132");
        props.setProperty("database.port", "1521");
        props.setProperty("database.user", "C##DBZUSER");
        props.setProperty("database.password", "dbz");
        props.setProperty("database.server.id", "85701");
        props.setProperty("table.include.list", "C##DBZUSER.STU");
        props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
        props.setProperty("database.history.file.filename", "D:\\temp\\oracle4.txt");
        // database.server.name must be unique; the author changes it for each run
        props.setProperty("database.server.name", "my-oracle-connector-0023");
        // Name of the CDB instance
        props.setProperty("database.dbname", "ORCLCDB");
        // Whether to include schema information in the output
        props.setProperty("key.converter.schemas.enable", "false");
        props.setProperty("value.converter.schemas.enable", "false");
        props.setProperty("database.serverTimezone", "UTC"); // time zone
        props.setProperty("database.connection.adapter", "logminer"); // connection adapter (LogMiner)
        // Kafka-based database history settings (alternative to FileDatabaseHistory)
        /*props.setProperty("database.history.kafka.bootstrap.servers", "192.168.131.130:9092");
        props.setProperty("database.history.kafka.topic", "oracle.history");*/

        return props;
    }

    // Run the engine on a dedicated thread
    public static void runSoftware(DebeziumEngine<ChangeEvent<String, String>> engine) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
    }

    // Build the DebeziumEngine and its event-handling logic
    public static DebeziumEngine<ChangeEvent<String, String>> engineBuild(Properties props) {

        // Build the DebeziumEngine, emitting change events in JSON format
        DebeziumEngine<ChangeEvent<String, String>> engine =
                DebeziumEngine
                        .create(Json.class)
                        .using(props)
                        .notifying(record -> {
                            // Each record carries the operation type (insert, delete, update) and the row data
                            System.out.println("record.key() = " + record.key());
                            System.out.println("record.value() = " + record.value());
                        })
                        .using((success, message, error) -> {
                            // Strongly recommended: this completion callback makes failures visible
                            if (!success && error != null) {
                                // Error callback
                                System.out.println("----------error------");
                                System.out.println(message);
                                //System.out.println(error);
                                error.printStackTrace();
                            }
                        })
                        .build();

        return engine;
    }

}

Start the project. If the six startup steps logged by the connector complete without errors, the engine has started successfully.

Go into the database and insert a row into the monitored table.

Change-event output like the example described below will then appear on the console, confirming that the capture is working.
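
As a quick test (a minimal sketch, assuming the C##DBZUSER.STU table created earlier), insert and commit a row from the oracle user's shell while the application is running:

sqlplus c##dbzuser/dbz <<'EOF'
INSERT INTO STU ("s_id", "s_name") VALUES (1, 'tom');
COMMIT;
EOF

The console should then print a record whose value is a JSON change event containing, roughly, "op":"c", the inserted row under "after", and a "source" block describing the Oracle instance.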

kafka-connector

Extract the Java, ZooKeeper, and Kafka packages used here into /opt/module; Java also needs its environment variables configured.

From the Central Repository (io/debezium/debezium-connector-oracle), download the plugin version you need, for example debezium-connector-oracle-1.6.2.Final-plugin.tar.gz.

Create a directory to hold the Kafka Connect plugin:

mkdir /opt/kafka-plugin

Extract the downloaded plugin and copy everything inside it into the kafka-plugin directory and also into Kafka's libs directory, for example as sketched below.
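
A sketch of the download and copy steps; the Maven Central URL, the extracted directory name debezium-connector-oracle, and the Kafka path /opt/module/kafka_2.12-2.7.0 are assumptions based on the versions used in this post:

cd /opt
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/1.6.2.Final/debezium-connector-oracle-1.6.2.Final-plugin.tar.gz
tar -zxvf debezium-connector-oracle-1.6.2.Final-plugin.tar.gz
# one copy of the extracted directory under the plugin path, one copy of its jars into Kafka's libs
cp -r debezium-connector-oracle /opt/kafka-plugin/
cp debezium-connector-oracle/*.jar /opt/module/kafka_2.12-2.7.0/libs/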

From Oracle Instant Client Downloads, download the Basic Package (ZIP) for your operating system.

Unzip it and copy the ojdbc8.jar it contains into Kafka's libs directory.
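
For example (a sketch, assuming the 19.12 Basic package for Linux x64; the exact zip name depends on the version you download):

cd /opt
unzip instantclient-basic-linux.x64-19.12.0.0.0dbru.zip
cp /opt/instantclient_19_12/ojdbc8.jar /opt/module/kafka_2.12-2.7.0/libs/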

Configure Kafka Connect:

cd /opt/module/kafka_2.12-2.7.0/
vi config/connect-distributed.properties 

Set plugin.path to the kafka-plugin directory prepared earlier, then save.

plugin.path=/opt/kafka-plugin

With that, the worker is configured.
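
Before starting, it can help to confirm the two key worker settings: bootstrap.servers should point at the broker (192.168.10.132:9092 in this setup) and plugin.path at the directory above:

grep -E '^(bootstrap.servers|plugin.path)' config/connect-distributed.properties
# bootstrap.servers=192.168.10.132:9092
# plugin.path=/opt/kafka-plugin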

Go to the ZooKeeper directory, copy the sample configuration file, and start ZooKeeper:

cp conf/zoo_sample.cfg conf/zoo.cfg
bin/zkServer.sh start
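
You can optionally confirm ZooKeeper is up before moving on:

bin/zkServer.sh status
# should report "Mode: standalone" for a single-node setup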

Go to the Kafka directory and start Kafka first; once it is up, start Kafka Connect:

bin/kafka-server-start.sh config/server.properties
bin/connect-distributed.sh config/connect-distributed.properties

Open a browser or Postman and send a GET request to port 8083; version information should be returned.
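
For example, with curl from the Connect host (assuming the worker listens on the default port 8083):

curl http://localhost:8083/
# {"version":"2.7.0","commit":"...","kafka_cluster_id":"..."}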

Register a connector by sending a POST request to ip:8083/connectors with the configuration JSON below (a curl example follows the JSON):

{
	"name": "stu2",
	"config": {
		"connector.class": "io.debezium.connector.oracle.OracleConnector",
		"tasks.max": "1",
		"database.server.name": "server2",
		"database.hostname": "192.168.10.132",
		"database.port": "1521",
		"database.user": "c##dbzuser",
		"database.password": "dbz",
		"database.dbname": "ORCLCDB",
		"table.include.list": "C##DBZUSER.STU2",
		"database.history.kafka.bootstrap.servers": "192.168.10.132:9092",
		"database.history.kafka.topic": "schema-changes.stu2"
	}
}
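
For example, with curl (assuming the JSON above is saved as stu2.json and the worker runs on 192.168.10.132):

curl -X POST -H "Content-Type: application/json" --data @stu2.json http://192.168.10.132:8083/connectors
# then check that the connector and its task are RUNNING
curl http://192.168.10.132:8083/connectors/stu2/status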

Kafka Connect creates the topics automatically, normally named server.schema.table. Characters such as # are replaced with _, so server2.C##DBZUSER.STU2 becomes server2.C__DBZUSER.STU2; you can see this by watching the logs closely while registering the connector.

From the Kafka directory, list all topics:

bin/kafka-topics.sh --list --zookeeper 192.168.10.132:2181

Consume the topic to check whether database changes are being captured:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.132:9092 --topic server2.C__DBZUSER.STU2

If the consumer prints change-event messages for your inserts, the capture is working.

2.2 XStream API

Switch to the oracle user:

su - oracle

Connect to Oracle and change the sys password. This keeps it consistent with the statements in the Debezium documentation, so they can be used as-is:

sqlplus / as sysdba
connect / as sysdba
alter user sys identified by top_secret;
exit;

Enable archive log mode:

CONNECT sys/top_secret AS SYSDBA
alter system set db_recovery_file_dest_size = 5G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
alter system set enable_goldengate_replication=true;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should show "Database log mode: Archive Mode"
archive log list

exit;

Switch to root, create the db_recovery_file_dest directory and grant permissions on it, then switch back to the oracle user:

su root
mkdir /opt/oracle/oradata/recovery_area
chmod 777 /opt/oracle/oradata/recovery_area
su oracle

The digit 7 stands for r (read), w (write), and x (execute) permissions.

777 grants rwx to the file owner, the owning group, and all other users.

Enable minimal supplemental logging at the database level as follows.

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

If you only want to enable supplemental logging for a specific table (for example stuinfo), see below.

If the ALTER fails even though the table exists, first run select * from C##TEST.STUINFO; if Oracle reports that the table does not exist, try select * from C##TEST."STUINFO" instead, since quoted identifiers are case-sensitive.

ALTER TABLE C##TEST.STUINFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

Configure the XStream admin user:

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
  CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/xstream_adm_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
  CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/xstream_adm_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
  CREATE USER c##dbzadmin IDENTIFIED BY dbz
    DEFAULT TABLESPACE xstream_adm_tbs
    QUOTA UNLIMITED ON xstream_adm_tbs
    CONTAINER=ALL;

  GRANT CREATE SESSION, SET CONTAINER TO c##dbzadmin CONTAINER=ALL;

  BEGIN
     DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE(
        grantee                 => 'c##dbzadmin',
        privilege_type          => 'CAPTURE',
        grant_select_privileges => TRUE,
        container               => 'ALL'
     );
  END;
  /

  exit;

Create the XStream user:

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
  CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/xstream_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
  CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/xstream_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
  CREATE USER c##dbzuser IDENTIFIED BY dbz
    DEFAULT TABLESPACE xstream_tbs
    QUOTA UNLIMITED ON xstream_tbs
    CONTAINER=ALL;

  GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
  GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
  GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
  GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
  exit;

Create the XStream outbound server:

sqlplus c##dbzadmin/dbz@//localhost:1521/ORCLCDB
DECLARE
  tables  DBMS_UTILITY.UNCL_ARRAY;
  schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
    tables(1)  := NULL;
    schemas(1) := 'debezium';
  DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
    server_name     =>  'dbzxout',
    table_names     =>  tables,
    schema_names    =>  schemas);
END;
/
exit;

Configure the XStream user account to connect to the XStream outbound server:

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
BEGIN
  DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
    server_name  => 'dbzxout',
    connect_user => 'c##dbzuser');
END;
/
exit;
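
To confirm that the outbound server exists and that c##dbzuser is set as its connect user, you can query the data dictionary (a quick check, assuming the DBA_XSTREAM_OUTBOUND view is accessible to SYS):

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba <<'EOF'
SELECT server_name, connect_user FROM dba_xstream_outbound;
EOF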

Create a table and enable supplemental logging on it:

sqlplus / as sysdba
conn c##dbzuser/dbz;
CREATE TABLE STU ( "s_id" INT PRIMARY KEY, "s_name" VARCHAR ( 255 ) );
ALTER TABLE C##DBZUSER.STU ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
exit;

After the steps above, the database can be monitored either through the Java API or through a Kafka connector. The Java API route is considerably more convenient.

Java API

To be completed.

kafka-connector

Mysterious behavior: no data is ever captured, and no errors are reported either. One possible cause worth checking, though not verified here, is that the outbound server above captures only the 'debezium' schema, while the monitored table lives under C##DBZUSER.

Extract the Java, ZooKeeper, and Kafka packages used here into /opt/module; Java also needs its environment variables configured.

From the Central Repository (io/debezium/debezium-connector-oracle), download the plugin version you need, for example debezium-connector-oracle-1.6.2.Final-plugin.tar.gz.

Create a directory to hold the Kafka Connect plugin:

mkdir /opt/kafka-plugin

Extract the downloaded plugin and copy everything inside it into the kafka-plugin directory and also into Kafka's libs directory, the same as in section 2.1.

From Oracle Instant Client Downloads, download the Basic Package (ZIP) for your operating system.

Unzip it and copy the ojdbc8.jar and xstreams.jar it contains into Kafka's libs directory.

Add the extracted Instant Client directory to the environment by creating a script under /etc/profile.d/:

vim /etc/profile.d/

Export LD_LIBRARY_PATH and save:

export LD_LIBRARY_PATH=/opt/instantclient_19_12

Reload the environment variables:

source /etc/profile
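
A quick sanity check that the native client libraries are now visible (the instantclient_19_12 path comes from the export above):

echo $LD_LIBRARY_PATH
ls /opt/instantclient_19_12/libclntsh*
# libclntsh.so should be listed; the OCI (XStream) adapter loads it at runtime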

Configure Kafka Connect:

cd /opt/module/kafka_2.12-2.7.0/
vi config/connect-distributed.properties 

Set plugin.path to the kafka-plugin directory prepared earlier, then save.

plugin.path=/opt/kafka-plugin

With that, the worker is configured.

Go to the ZooKeeper directory, copy the sample configuration file, and start ZooKeeper:

cp conf/zoo_sample.cfg conf/zoo.cfg
bin/zkServer.sh start

Go to the Kafka directory and start Kafka first; once it is up, start Kafka Connect:

bin/kafka-server-start.sh config/server.properties
bin/connect-distributed.sh config/connect-distributed.properties

Open a browser or Postman and send a GET request to port 8083; version information should be returned.

Register the connector by sending a POST request to ip:8083/connectors with the configuration JSON below (the same curl approach as in section 2.1 works here):

{
    "name": "stu2",
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "tasks.max" : "1",
        "database.server.name" : "server6",
        "database.hostname" : "192.168.10.131",
        "database.port" : "1521",
        "database.user" : "c##dbzuser",
        "database.password" : "dbz",
        "database.dbname" : "ORCLCDB",
        "table.include.list" : "C##DBZUSER.STU",
        "database.history.kafka.bootstrap.servers" : "192.168.10.131:9092",
        "database.history.kafka.topic": "schema-changes.stu2",
        "database.connection.adapter": "xstream",
        "database.out.server.name" : "dbzxout"
    }
}

Kafka Connect creates the topics automatically, normally named server.schema.table, with characters such as # replaced by _. For the connector above, the data topic would therefore be server6.C__DBZUSER.STU; you can confirm the exact name by watching the logs while registering the connector.

From the Kafka directory, list all topics:

bin/kafka-topics.sh --list --zookeeper 192.168.10.132:2181

Consume the topic to check whether database changes are being captured:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.132:9092 --topic server6.C__DBZUSER.STU

