基於OGG實現ORACLE同步至KAFKA實施方案


一、背景

本文基於Oracle OGG,介紹一種將Oracle數據庫的數據實時同步到Kafka消息隊列的方法。Kafka是一種高效的消息隊列實現,通過訂閱kafka的消息隊列,下游系統可以實時獲取在線Oracle系統的數據變更情況,實現從OLTP系統中獲取數據變更,實時同步到下游業務系統。

二、環境介紹

1、組件版本

組件

版本

操作系統

IP地址

描述

源端Oracle

11.2.0.4.0

Red Hat 6.8

192.168.140.186

源端Oracle數據庫

源端OGG

12.1.2.1.0

Red Hat 6.8

192.168.140.186

源端OGG,用於抽取源端Oracle數據變更,並將變更日志發送到目標端

目標端OGG

12.3.2.1.1

Red Hat 7.5

192.168.83.227、228、229

目標端OGG,接受源端發送的Oracle事務變更日志,並將變更推送到kafka消息隊列

目標端kafka

2.12-1.1.0

Red Hat 7.5

192.168.83.227、228、229

消息隊列,接收目標端OGG推送過來的數據

 

2、整體架構圖

 

3、名詞解釋

1)   OGG Manager:OGG Manager用於配置和管理其它OGG組件,配置數據抽取、數據推送、數據復制,啟動和停止相關組件,查看相關組件的運行情況。

2)   數據抽取(Extract):抽取源端數據庫的變更(DML, DDL)。數據抽取主要分如下2種類型:

a.本地抽取:從本地數據庫捕獲增量變更數據,寫入到本地Trail文件

b.初始數據抽取:從數據庫表中導出全量數據,用於初次數據加載

3)   數據推送(Data Pump):Data Pump是一種特殊的數據抽取(Extract)類型,從本地Trail文件中讀取數據,並通過網絡將數據發送到目標端OGG

4)   4.Trail文件:數據抽取從源端數據庫抓取到的事物變更信息會寫入到Trail文件。

5)   數據接收(Collector):數據接收程序運行在目標端機器,用於接收Data Pump發送過來的Trail日志,並將數據寫入到本地Trail文件。

6)   數據復制(Replicat):數據復制運行在目標端機器,從Trail文件讀取數據變更,並將變更數據應用到目標端數據存儲系統。本案例中,數據復制將數據推送到kafka消息隊列。

7)   檢查點(Checkpoint):檢查點用於記錄數據庫事物變更。

 

二、操作步驟

1、源端安裝配置(192.168.140.186)

(1)源端安裝了ORACLE 版本11.2.0.4

     alter system set enable_goldengate_replication=TRUE;

     Alter database add supplemental log data;

     alter database force logging;

     Select supplemental_log_data_min from v$database;

     Create user ogg identified by "Qwer!234" Default tablespace users temporary tablespace temp  profile DEFAULT;

Grant connect to ogg;

grant resource to ogg;

grant dba to ogg;

grant alter session to ogg;

grant create session to ogg;

grant select any dictionary to ogg;

grant select any table to ogg;

grant insert any table to ogg;

grant delete any table to ogg;

grant update any table to ogg;

grant alter any table to ogg;

grant create table to ogg;

grant lock any table to ogg;

grant flashback any table to ogg;

Grant unlimited tablespace to ogg;

(2)源端安裝了OGG版本12.1.2.1.0

     --創建用戶

     $useradd -g oinstall gg

     --設置gg環境變量

      將oracle的環境變量拷貝至gg用戶

     --創建目錄

      Mkdir /oracle/gg

      Chown oracle:oinstall /oracle/gg

     --解壓安裝

     Unzip 121210_fbo_ggs_Linux_x64_shiphome.zip

     vi /oracle/gg/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp

      INSTALL_OPTION=ORA11g

      SOFTWARE_LOCATION=/oracle/gg

      START_MANAGER=false

     cd /oracle/gg/fbo_ggs_Linux_x64_shiphome/Disk1

./runInstaller -responseFile /oracle/gg/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp -silent  -ignoreSysPrereqs  -ignorePrereq –local

--創建子目錄

./ggsci

create SUBDIRS

start mgr

 

(3)源端配置OGG

/*網絡截取資料

GGSCI (zwjfdb3) 7> view param EZWJFBOR

EXTRACT EZWJFBOR

SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")

SETENV (ORACLE_HOME = "/u01/app/oracle/product/11.2.0/db_1")

SETENV (ORACLE_SID = "zwjfdb3")

--捕獲 truncate 操作

gettruncates

--定義discardfile文件位置,如果處理中有記錄出錯會寫入到此文件中

DISCARDFILE ./dirrpt/ezwjfbor.dsc, APPEND, MEGABYTES 1024

--動態解析表名

DYNAMICRESOLUTION

--獲取更新之前數據

GETUPDATEBEFORES

--當抽取進程遇到一個沒有使用的字段時只生成一個警告,進程會繼續執行而不會被異常終止(abend)

DBOPTIONS  ALLOWUNUSEDCOLUMN

--每隔30分鍾報告一次從程序開始到現在的抽取進程或者復制進程的事物記錄數,並匯報進程的統計信息

REPORTCOUNT EVERY 30 MINUTES, RATE

--每隔3分鍾檢查一下大事務,超過2小時還沒結束的進行報告

WARNLONGTRANS 2h,CHECKINTERVAL 3m

--不會從閃回日志中獲取數據

FETCHOPTIONS NOUSESNAPSHOT

USERID xxxxxx,PASSWORD xxxxxx

EXTTRAIL ./dirdat/zb

TABLE xx.xx;

TABLE xx.xx;

 

#添加抽取進程

GGSCI (zwjfdb3) 11> add extract EZWJFBOR,TRANLOG, BEGIN NOW

EXTRACT added.

#定義trail文件

GGSCI (zwjfdb3) 12> ADD EXTTRAIL ./dirdat/zb,EXTRACT EZWJFBOR, MEGABYTES 200

EXTTRAIL added.

 

#pump extract進程

GGSCI (zwjfdb3) 8> view param PZWJFBOR

EXTRACT PZWJFBOR

SETENV (NLS_LANG = "AMERICAN_AMERICA.AL32UTF8")

PASSTHRU

DYNAMICRESOLUTION

RMTHOST xx.xx.xx.xx,MGRPOT 7809

RMTTRAIL ./dirdat/zb

TABLE xx.xx;

TABLE xx.xx;

#添加pump捕獲組

GGSCI (zwjfdb3) 23> ADD EXTRACT PZWJFBOR,EXTTRAILSOURCE ./dirdat/zb

EXTRACT added.

#定義pump trail文件

GGSCI (zwjfdb3) 25> ADD RMTTRAIL ./dirdat/zb,EXTRACT PZWJFBOR, MEGABYTES 200

RMTTRAIL added.

*/

1、ORACLE創建測試表

create tablespace TBSDATA datafile size 1G;

create user linq identified by "Qwer!234" default tablespace TBSDATA;

grant connect to linq;

grant resource to linq;

create table linq.test_ogg(id number ,name varchar2(200),primary key(id));

    2、配置MGR

>edit params mgr

    PORT 7809

PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 2

   3、配置捕獲進程
    #一定要記得同步之前要開啟表的全補充日志
#alter table linq.test_ogg add supplemental log data (all) columns;

>dblogin userid ogg@ORCL,password Qwer!234

    >add extract E_LINQ,tranlog,begin now

>ADD EXTTRAIL ./dirdat/lq, EXTRACT E_LINQ

>add trandata linq.test_ogg

>info  trandata linq.test_ogg

>edit params E_LINQ

  extract E_LINQ      

--動態解析表名

dynamicresolution

SETENV (ORACLE_SID = "newdb")

SETENV (NLS_LANG = "AMERICAN_AMERICA.ZHS16GBK")

--SETENV (NLS_LANG = "american_america.AL32UTF8")

userid ogg@ORCL,password Qwer!234

--獲取更新之前全部字段數據(如果注釋,before顯示空,需與UPDATERECORDFORMAT COMPACT一起配置)

GETUPDATEBEFORES

--獲取刪除前非主鍵字段(如果注釋,before只會顯示主鍵)

NOCOMPRESSDELETES

--獲取更新前非主鍵字段(目前沒發現用途)

NOCOMPRESSUPDATES

--獲取更新的前后鏡像信息(如果注釋,before顯示空,需與GETUPDATEBEFORES一起配置)

UPDATERECORDFORMAT COMPACT

--定義discardfile文件位置,如果處理中有記錄出錯會寫入到此文件中

discardfile ./dirrpt/E_LINQ.dsc,purge,megabytes 20000

warnlongtrans 2h,checkinterval 3m

exttrail ./dirdat/lq

PURGEOLDEXTRACTS ./dirdat/lq*,usecheckpoints, minkeepdays 3

TRANLOGOPTIONS DBLOGREADER

numfiles 3000

allocfiles 200

table linq.test_ogg;

   4、配置傳輸進程

>add extract P_LINQ,exttrailsource ./dirdat/lq

> ADD EXTTRAIL ./dirdat/lq, EXTRACT P_LINQ

>add rmttrail ./dirdat/lq,extract P_LINQ

>edit params P_LINQ

extract P_LINQ

SETENV (NLS_LANG = "AMERICAN_AMERICA.ZHS16GBK")

passthru

dynamicresolution

rmthost 192.168.73.227,mgrport 7809 ,compress

rmttrail ./dirdat/lq

numfiles 3000

table linq.test_ogg;

 

5、配置define定義文件

> edit param test_ogg

defsfile ./dirdef/linq.test_ogg

userid ogg@ORCL,password Qwer!234

table linq.test_ogg;

在OGG主目錄下執行:

./defgen paramfile dirprm/test_ogg.prm

  
注:目標端安裝后,將生成的./dirdef/linq.test_ogg發送的目標端ogg目錄下的dirdef里

2、目標端安裝配置(192.168.73.227)

(1)目標端安裝kafka,版本2.12-1.1.0,已安裝
      
(2)目標端安裝OGG,版本12.3.2.1.1
  groupadd gg
  useradd -g gg -G gg gg
  passwd gg
su - gg
unzip
unzip OGG_BigData_Linux_x64_12.3.2.1.1.zip
tar xf OGG_BigData_Linux_x64_12.3.2.1.1.tar -C /data/gg/
  
  vi /home/gg/.bash_profile
export OGG_HOME=/data/gg 
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/lib 
export PATH=$OGG_HOME:$PATH
. .bash_profile
 
測試ggsci

 
        
 
創建子目錄create subdirs

 
        
(3)定義文件拷貝
從源端192.168.140.128的定義文件/oracle/gg/dirdef/linq.test_ogg拷貝
到目標端/data/gg/dirdef/linq.test_ogg
scp /oracle/gg/dirdef/linq.test_ogg gg@192.168.83.227:/data/gg/dirdef/
(4)kafka創建主題
kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --replication-factor 3 --partitions 3 --topic testogg2
kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic testogg2
(5)開啟kafka進程
--83.227開啟
  kafka-server-start.sh -daemon /data/kfdata/kafka/config/server.properties
(6)配置管理器mgr
>edit param mgr 
PORT 7809 
DYNAMICPORTLIST 7810-7909 
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
 
(7)配置checkpoint
>edit  param  ./GLOBALS
CHECKPOINTTABLE test_ogg.checkpoint
(8)配置replicate進程
>edit param rekafka
REPLICAT rekafka
sourcedefs /data/gg/dirdef/linq.test_ogg 
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props 
REPORTCOUNT EVERY 1 MINUTES, RATE 
GROUPTRANSOPS 10000 
MAP linq.test_ogg, TARGET linq.test_ogg;
說明:REPLICATE rekafka定義rep進程名稱;sourcedefs即在4.6中在源服務器上做的表映射文件;TARGETDB LIBFILE即定義kafka一些適配性的庫文件以及配置文件,配置文件位於OGG主目錄下的dirprm/kafka.props;REPORTCOUNT即復制任務的報告生成頻率;GROUPTRANSOPS為以事務傳輸時,事務合並的單位,減少IO操作;MAP即源端與目標端的映射關系。
(9)配置kafka.props(備注不能配進去)
cd /data/gg/dirprm/
vi kafka.props
/*
gg.handlerlist=kafkahandler //handler類型 
gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka相關配置 
gg.handler.kafkahandler.topicMappingTemplate=test_ogg //kafka的topic名稱,可先手動創建好,默認創建的話partition數只有1
gg.handler.kafkahandler.format=json //傳輸文件的格式,支持json,xml等 
gg.handler.kafkahandler.format.includePrimaryKeys=true --包含主鍵
gg.handler.kafkahandler.mode=op //OGG for Big Data中傳輸模式,即op為一次SQL傳輸一次,tx為一次事務傳輸一次 
gg.classpath=dirprm/:/data/kfdata/kafka/libs/*:/data/gg/:/data/gg/lib/*
*/
無備注版:
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka 
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.format.includePrimaryKeys=true 
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/data/kfdata/kafka/libs/*:/data/gg/:/data/gg/lib/*
 
vi custom_kafka_producer.properties
/*
bootstrap.servers=192.168.83.227:9092 //kafkabroker的地址 
acks=1 
compression.type=gzip //壓縮類型 
reconnect.backoff.ms=1000 //重連延時 
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
batch.size=102400 
linger.ms=10000
*/
無備注版:
bootstrap.servers=192.168.83.227:9092
acks=1 
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
batch.size=102400 
linger.ms=10000
(10)添加trail文件到replicate進程
add replicat rekafka exttrail /data/gg/dirdat/lq,checkpointtable test_ogg.checkpoint
(11)開啟源端與目標端ogg
  
(12)測試
1、源端數據入庫
conn linq/Qwer!234
insert into test_ogg values(2,'go');
commit;
2、查看目標端kafka主題是否創建
kafka-topics.sh --list --zookeeper localhost:2181

 
        
3、進行消費測試
kafka-console-consumer.sh --bootstrap-server 192.168.83.227:9092 --from-beginning --topic test_ogg
(用kafka端口9092,consumer的信息將會存放在kafka之中,推薦)
或:
kafka-console-consumer.sh --zookeeper 192.168.83.227:2181 --from-beginning --topic test_ogg
(用zookeeper端口2181,consumer的信息將會存放在zk之中)
測試結果如下(中文也是支持的):

[root@kafka3 data]# kafka-console-consumer.sh --bootstrap-server 192.168.83.227:9092 --from-beginning --topic test_ogg
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-06 17:12:03.648091","current_ts":"2020-03-06T17:12:10.737000","pos":"00000000000000001432","primary_keys":["ID"],"after":{"ID":2,"NAME":"go"}}
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-06 17:21:23.652647","current_ts":"2020-03-06T17:21:31.368000","pos":"00000000000000001569","primary_keys":["ID"],"after":{"ID":3,"NAME":"ok"}}
{"table":"LINQ.TEST_OGG","op_type":"U","op_ts":"2020-03-06 17:22:10.653057","current_ts":"2020-03-06T17:22:17.411000","pos":"00000000000000001701","primary_keys":["ID"],"before":{"ID":3,"NAME":"ok"},"after":{"ID":3,"NAME":"ok3"}}
{"table":"LINQ.TEST_OGG","op_type":"D","op_ts":"2020-03-06 17:22:58.653488","current_ts":"2020-03-06T17:23:05.454000","pos":"00000000000000001858","primary_keys":["ID"],"before":{"ID":1,"NAME":"begin"}}
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-07 19:21:07.411700","current_ts":"2020-03-07T19:21:12.465000","pos":"00000000000000001994","primary_keys":["ID"],"after":{"ID":1,"NAME":"begin"}}
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-10 15:23:20.354624","current_ts":"2020-03-10T15:23:27.371000","pos":"00000000000000002130","primary_keys":["ID"],"after":{"ID":4,"NAME":"linq"}}
{"table":"LINQ.TEST_OGG","op_type":"U","op_ts":"2020-03-10 15:30:19.357882","current_ts":"2020-03-10T15:30:25.697000","pos":"00000000000000002266","primary_keys":["ID"],"before":{"ID":4,"NAME":"linq"},"after":{"ID":4,"NAME":"林勤"}}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM