Debezium Oracle 基於Logminer測試


准備docker鏡像

只測試Source端數據抽取

使用oracle官方鏡像

  • 啟動docker容器 docker run -d --name oracle12c-test container-registry.oracle.com/database/enterprise:12.2.0.1
  • 如果本地沒有該鏡像 需要執行 docker login container-registry.oracle.com 依次根據提示輸入oracle賬戶的用戶名密碼
  • 執行 docker ps |grep oracle12c-test
  • Up 8 minutes (healthy) 狀態是healthy狀態后 可以進入容器操作
  • 鏡像默認開啟PDB模式 默認CDB名稱為ORCLCDB 默認PDB名稱為ORCLPDB1
  • 鏡像默認sys密碼為Orcldoc_db1

准備設置數據庫

開啟日志歸檔

  • 進入容器 開始操作 docker exec -it 容器的ID bash
  • sqlplus sys/Orcldoc_db1 as sysdba 或者 sqlplus sys / as sysdba
  • 檢查是否開啟了歸檔 默認的容器里是沒有開啟的
  • 執行archive log list 如果結果顯示 Database log mode No Archive Mode 說明我們的數據庫是沒有開啟歸檔的
我們依次執行
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
結果應該如虛線以下提示
---------------------------------- 
SQL> shutdown immediate; 
Database closed. 
Database dismounted. 
ORACLE instance shut down. 
SQL> startup mount; 
ORACLE instance started. 
Total System Global Area 1342177280 bytes 
Fixed Size 8792536 bytes 
Variable Size 989857320 bytes 
Database Buffers 318767104 bytes 
Redo Buffers 24760320 bytes 
Database mounted. 
SQL> alter database archivelog; 
Database altered. 
SQL> alter database open; 
Database altered. 
--------------------------------------- 
再次查看是否開啟歸檔 已經顯示處於歸檔模式 
SQL> archive log list; 
Database log mode Archive 

創建DBZ用戶相關

create user c##dbzuser identified by dbz default tablespace users container=all;
ALTER USER c##dbzuser QUOTA UNLIMITED ON users;
GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;

GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;

GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;

GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;

准備相關組件

  1. 啟動zookeeper、kafka、connect組件
IP 換成實際IP
 docker run -d  --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 -p 9011:9011  debezium/zookeeper:1.6
 docker run -d --name kafka -p 9092:9092 -p 9012:9012  --link zookeeper:zookeeper debezium/kafka:1.6
 docker run -d --name connect -p 8083:8083 -p 9010:9010  -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses  -e CONNECT_MAX_REQUEST_SIZE=20000000  -e CONNECT_BUFFER_MEMORY=800000000    -e CONNECT_FETCH_MAX_BYTES=20000000  -e CONNECT_MAX_PARTITION_FETCH_BYTES=20000000 -e OFFSET_FLUSH_INTERVAL_MS=10000  -e OFFSET_FLUSH_TIMEOUT_MS=6000000 -e CONNECT_CONNECTIONS_MAX_IDLE_MS=6000000 -e  CONNECT_RECEIVE.BUFFER.BYTES=500000000 -e CONNECT_PRODUCER_MAX_REQUEST_SIZE=20000000  --link zookeeper:zookeeper --link kafka:kafka debezium/connect:1.6
  1. 自行下載ojdbc8.jar
docker cp ojdbc8.jar 容器ID:/kafka/libs 
重啟connect docker restart connect容器ID

准備測試相關數據

  1. 創建PDB用戶
  2. 賦予PDB用戶權限
  3. 創建相關表格 插入數據測試
sqlplus / as sysdba
alter session set container = ORCLPDB1;
create user dbz identified by dbz;
ALTER USER dbz QUOTA UNLIMITED ON users;
grant connect, resource to dbz;
切換dbz用戶連接到pdb數據庫 orclpdb1
conn dbz/dbz@orclpdb1
CREATE TABLE CUSTOMERS (
  id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY,
  first_name VARCHAR2(255) NOT NULL,
  last_name VARCHAR2(255) NOT NULL,
  email VARCHAR2(255) NOT NULL UNIQUE
);
insert into customers values(1001,'a','b','c');

開啟日志增補

sqlplus sys/Orcldoc_db1 as sysdba
修改全局數據庫 也可以直接開啟ALL級別的全庫增補
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
SELECT SUPPLEMENTAL_LOG_DATA_MIN FROM V$DATABASE;
切換到pdb數據庫下 也可以直接開啟ALL級別的全庫增補
alter session set container = ORCLPDB1;
ALTER PLUGGABLE DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER TABLE DBZ.CUSTOMERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
SELECT SUPPLEMENTAL_LOG_DATA_MIN FROM V$DATABASE;

測試數據庫層面Logminer是否好用

sqlplus / as sysdba
查看redo日志文件位置 下面命令根據顯示的填寫
select member from v$logfile;
execute dbms_logmnr.add_logfile('/u04/app/oracle/redo/redo001.log',dbms_logmnr.new);
execute dbms_logmnr.add_logfile('/u04/app/oracle/redo/redo002.log',dbms_logmnr.addfile);
execute dbms_logmnr.add_logfile('/u04/app/oracle/redo/redo003.log',dbms_logmnr.addfile);
execute dbms_logmnr.start_logmnr(options=>dbms_logmnr.dict_from_online_catalog+dbms_logmnr.committed_data_only);
select sql_redo,sql_undo from v$logmnr_contents where table_name like '%CUSTOMERS%' and OPERATION='INSERT';
下面語句為結束語句
execute dbms_logmnr.end_logmnr;

該圖片為select sql_redo,sql_undo from v$logmnr_contents where table_name like '%CUSTOMERS%' and OPERATION='INSERT'; 捕獲結果說明好用
image

DBZ測試

准備source文件

  1. 用戶名密碼根據實際情況填寫 如果完全按照本操作進行 不用修改
  2. IP 修改為實際數據庫容器的IP,IP查看命令docker inspect 數據庫容器ID |grep IP
{
    "name": "test-connector",
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "tasks.max" : "1",
        "database.server.name" : "ORCLCDB",
        "database.user" : "c##dbzuser",
        "database.password" : "dbz",
        "database.url": "jdbc:oracle:thin:@(DESCRIPTION= (ADDRESS= (PROTOCOL=TCP)(HOST=172.17.0.10)(PORT=1521)) (CONNECT_DATA= (SID=ORCLCDB) (SERVER=dedicated)))",
        "database.dbname" : "ORCLCDB",
        "database.pdb.name" : "ORCLPDB1",        
        "table.include.list": "DBZ.CUSTOMERS", // 只監控我們的表
        "database.history.kafka.bootstrap.servers" : "kafka:9092",
        "database.history.kafka.topic": "schema-changes.test"
    }
}

創建source connector

  • 執行curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @test.json 根據實際情況填寫
  • 結果如圖所示
    image

核對捕獲數據結果

  • 進入到connect容器內部 執行
bin/kafka-topics.sh --list --zookeeper zookeeper:2181 
bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.4:9092 --topic schema-changes.test  --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.4:9092 --topic ORCLCDB.DBZ.CUSTOMERS  --from-beginning
  • 核對數據正確

DDL
image

DML
image


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM