1. Background
This article describes a method, based on Oracle GoldenGate (OGG), for replicating data from an Oracle database to a Kafka message queue in real time. Kafka is an efficient message-queue implementation; by subscribing to its topics, downstream systems can observe data changes in the online Oracle system as they happen, capturing changes from the OLTP system and propagating them to downstream business systems in real time.
2. Environment
1) Component versions
| Component | Version | OS | IP address | Description |
| --- | --- | --- | --- | --- |
| Source Oracle | 11.2.0.4.0 | Red Hat 6.8 | 192.168.140.186 | Source Oracle database |
| Source OGG | 12.1.2.1.0 | Red Hat 6.8 | 192.168.140.186 | Source-side OGG; extracts data changes from the source Oracle database and ships the change logs to the target |
| Target OGG | 12.3.2.1.1 | Red Hat 7.5 | 192.168.83.227/228/229 | Target-side OGG; receives the Oracle transaction change logs shipped from the source and pushes the changes to the Kafka message queue |
| Target Kafka | 2.12-1.1.0 | Red Hat 7.5 | 192.168.83.227/228/229 | Message queue; receives the data pushed by the target-side OGG |
2) Overall architecture diagram
3) Glossary
(1) OGG Manager: configures and manages the other OGG components: it sets up data extraction, data pump, and replication; starts and stops the related components; and reports their running status.
(2) Extract: captures changes (DML, DDL) from the source database. There are two main types of extract:
a. Local extract: captures incremental change data from the local database and writes it to local trail files.
b. Initial-load extract: exports the full contents of database tables, used for the initial data load.
(3) Data Pump: a special type of extract that reads data from the local trail files and sends it over the network to the target-side OGG.
(4) Trail file: the transaction change data an extract captures from the source database is written to trail files.
(5) Collector: runs on the target machine; receives the trail data sent by the data pump and writes it to local trail files.
(6) Replicat: runs on the target machine; reads data changes from the trail files and applies them to the target data store. In this case, the replicat pushes the data to the Kafka message queue.
(7) Checkpoint: checkpoints record how far each process has read and written in the transaction stream and trail files, so that after a restart it can resume from the last processed position.
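For example, once the processes below are configured, GGSCI can display a process's checkpoint positions (a sketch; E_LINQ is the name of the extract created later in this guide):
info extract E_LINQ, showch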
3. Procedure
1) Source-side installation and configuration (192.168.140.186)
(1) Oracle 11.2.0.4 is installed on the source. Prepare the database and create the OGG user:
alter system set enable_goldengate_replication=TRUE;
alter database add supplemental log data;
alter database force logging;
select supplemental_log_data_min from v$database;
create user ogg identified by "Qwer!234" default tablespace users temporary tablespace temp profile DEFAULT;
grant connect to ogg;
grant resource to ogg;
grant dba to ogg;
grant alter session to ogg;
grant create session to ogg;
grant select any dictionary to ogg;
grant select any table to ogg;
grant insert any table to ogg;
grant delete any table to ogg;
grant update any table to ogg;
grant alter any table to ogg;
grant create table to ogg;
grant lock any table to ogg;
grant flashback any table to ogg;
grant unlimited tablespace to ogg;
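Before moving on, it is worth confirming that the prerequisites took effect (a sketch; run as a privileged user such as SYS):
-- both should return YES
select supplemental_log_data_min, force_logging from v$database;
-- should return TRUE
select value from v$parameter where name = 'enable_goldengate_replication';
-- lists the system privileges granted to the ogg user
select privilege from dba_sys_privs where grantee = 'OGG';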
(2) Install OGG 12.1.2.1.0 on the source.
-- create the OS user
useradd -g oinstall gg
-- set up the gg user's environment
Copy the oracle user's environment variables to the gg user.
-- create the installation directory
mkdir /oracle/gg
chown oracle:oinstall /oracle/gg
-- unzip and install
unzip 121210_fbo_ggs_Linux_x64_shiphome.zip
vi /oracle/gg/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp
INSTALL_OPTION=ORA11g
SOFTWARE_LOCATION=/oracle/gg
START_MANAGER=false
cd /oracle/gg/fbo_ggs_Linux_x64_shiphome/Disk1
./runInstaller -responseFile /oracle/gg/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp -silent -ignoreSysPrereqs -ignorePrereq -local
-- create the OGG subdirectories and start the manager
./ggsci
create subdirs
start mgr
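A quick way to confirm the manager came up is with the standard GGSCI status commands:
info mgr
info all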
(3) Configure OGG on the source.
/* Reference material collected from the web:
GGSCI (zwjfdb3) 7> view param EZWJFBOR
EXTRACT EZWJFBOR
SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
SETENV (ORACLE_HOME = "/u01/app/oracle/product/11.2.0/db_1")
SETENV (ORACLE_SID = "zwjfdb3")
--capture truncate operations
gettruncates
--discard file location; records that error out during processing are written here
DISCARDFILE ./dirrpt/ezwjfbor.dsc, APPEND, MEGABYTES 1024
--resolve table names dynamically
DYNAMICRESOLUTION
--capture the before image of updates
GETUPDATEBEFORES
--if the extract encounters an unused column, only log a warning and keep running instead of abending
DBOPTIONS ALLOWUNUSEDCOLUMN
--every 30 minutes, report the number of transaction records processed since startup, along with process statistics
REPORTCOUNT EVERY 30 MINUTES, RATE
--every 3 minutes, check for long-running transactions and report any open longer than 2 hours
WARNLONGTRANS 2h,CHECKINTERVAL 3m
--do not fetch data from the flashback logs
FETCHOPTIONS NOUSESNAPSHOT
USERID xxxxxx,PASSWORD xxxxxx
EXTTRAIL ./dirdat/zb
TABLE xx.xx;
TABLE xx.xx;
# add the extract process
GGSCI (zwjfdb3) 11> add extract EZWJFBOR,TRANLOG, BEGIN NOW
EXTRACT added.
# define the trail file
GGSCI (zwjfdb3) 12> ADD EXTTRAIL ./dirdat/zb,EXTRACT EZWJFBOR, MEGABYTES 200
EXTTRAIL added.
# the pump extract process
GGSCI (zwjfdb3) 8> view param PZWJFBOR
EXTRACT PZWJFBOR
SETENV (NLS_LANG = "AMERICAN_AMERICA.AL32UTF8")
PASSTHRU
DYNAMICRESOLUTION
RMTHOST xx.xx.xx.xx,MGRPORT 7809
RMTTRAIL ./dirdat/zb
TABLE xx.xx;
TABLE xx.xx;
# add the pump extract group
GGSCI (zwjfdb3) 23> ADD EXTRACT PZWJFBOR,EXTTRAILSOURCE ./dirdat/zb
EXTRACT added.
# define the pump's remote trail file
GGSCI (zwjfdb3) 25> ADD RMTTRAIL ./dirdat/zb,EXTRACT PZWJFBOR, MEGABYTES 200
RMTTRAIL added.
*/
(4) Create a test table in Oracle
create tablespace TBSDATA datafile size 1G;
create user linq identified by "Qwer!234" default tablespace TBSDATA;
grant connect to linq;
grant resource to linq;
create table linq.test_ogg(id number ,name varchar2(200),primary key(id));
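The delete shown in the test output at the end of this guide removes a row (1, 'begin'); seeding such a row now (an assumption about the original setup, not stated in the source) gives that test something to act on:
insert into linq.test_ogg values (1, 'begin');
commit;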
(5) Configure the manager (MGR)
>edit params mgr
PORT 7809
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 2
(6) Configure the capture (extract) process
# Be sure to enable full supplemental logging on the table before starting replication:
# alter table linq.test_ogg add supplemental log data (all) columns;
>dblogin userid ogg@ORCL,password Qwer!234
>add extract E_LINQ,tranlog,begin now
>ADD EXTTRAIL ./dirdat/lq, EXTRACT E_LINQ
>add trandata linq.test_ogg
>info trandata linq.test_ogg
>edit params E_LINQ
extract E_LINQ
--resolve table names dynamically
dynamicresolution
SETENV (ORACLE_SID = "newdb")
SETENV (NLS_LANG = "AMERICAN_AMERICA.ZHS16GBK")
--SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ogg@ORCL,password Qwer!234
--capture the full before image of updates (if commented out, "before" is empty; must be used together with UPDATERECORDFORMAT COMPACT)
GETUPDATEBEFORES
--capture non-primary-key columns in delete records (if commented out, "before" contains only the primary key)
NOCOMPRESSDELETES
--capture non-primary-key columns in update records (no practical use found for this so far)
NOCOMPRESSUPDATES
--write both before and after images for updates (if commented out, "before" is empty; must be used together with GETUPDATEBEFORES)
UPDATERECORDFORMAT COMPACT
--discard file location; records that error out during processing are written here
discardfile ./dirrpt/E_LINQ.dsc,purge,megabytes 20000
warnlongtrans 2h,checkinterval 3m
exttrail ./dirdat/lq
PURGEOLDEXTRACTS ./dirdat/lq*,usecheckpoints, minkeepdays 3
TRANLOGOPTIONS DBLOGREADER
numfiles 3000
allocfiles 200
table linq.test_ogg;
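Once the extract has been started (step (11) of the target-side section), standard GGSCI commands can confirm it is healthy (a sketch):
info extract E_LINQ    -- status should be RUNNING with low lag
stats extract E_LINQ   -- per-table insert/update/delete counts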
(7) Configure the data pump process
>add extract P_LINQ,exttrailsource ./dirdat/lq
> ADD EXTTRAIL ./dirdat/lq, EXTRACT P_LINQ
>add rmttrail ./dirdat/lq,extract P_LINQ
>edit params P_LINQ
extract P_LINQ
SETENV (NLS_LANG = "AMERICAN_AMERICA.ZHS16GBK")
passthru
dynamicresolution
rmthost 192.168.83.227,mgrport 7809,compress
rmttrail ./dirdat/lq
numfiles 3000
table linq.test_ogg;
(8) Configure the table definition (defgen) file
> edit param test_ogg
defsfile ./dirdef/linq.test_ogg
userid ogg@ORCL,password Qwer!234
table linq.test_ogg;
Run the following from the OGG home directory:
./defgen paramfile dirprm/test_ogg.prm
Note: after the target side is installed, copy the generated ./dirdef/linq.test_ogg into the dirdef directory under the target-side OGG home.
2) Target-side installation and configuration (192.168.83.227)
(1) Kafka 2.12-1.1.0 is already installed on the target.
(2) Install OGG for Big Data 12.3.2.1.1 on the target.
groupadd gg
useradd -g gg -G gg gg
passwd gg
su - gg
unzip OGG_BigData_Linux_x64_12.3.2.1.1.zip
tar xf OGG_BigData_Linux_x64_12.3.2.1.1.tar -C /data/gg/
vi /home/gg/.bash_profile
export OGG_HOME=/data/gg
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/lib
export PATH=$OGG_HOME:$PATH
. .bash_profile
-- test that ggsci starts, then create the OGG subdirectories
./ggsci
create subdirs
(3) Copy the definition file
Copy the definition file /oracle/gg/dirdef/linq.test_ogg from the source (192.168.140.186) to /data/gg/dirdef/linq.test_ogg on the target:
scp /oracle/gg/dirdef/linq.test_ogg gg@192.168.83.227:/data/gg/dirdef/
(4) Create the Kafka topic
kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --replication-factor 3 --partitions 3 --topic testogg2
kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic testogg2
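Optionally, smoke-test the new topic before wiring OGG to it (a sketch; assumes the broker listens on kafka1:9092):
echo hello | kafka-console-producer.sh --broker-list kafka1:9092 --topic testogg2
kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic testogg2 --from-beginning --max-messages 1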
(5) Start the Kafka broker
-- start on 83.227
kafka-server-start.sh -daemon /data/kfdata/kafka/config/server.properties
(6) Configure the manager (MGR)
>edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
(7) Configure the checkpoint table
>edit param ./GLOBALS
CHECKPOINTTABLE test_ogg.checkpoint
(8) Configure the replicat process
>edit param rekafka
REPLICAT rekafka
sourcedefs /data/gg/dirdef/linq.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP linq.test_ogg, TARGET linq.test_ogg;
Notes: REPLICAT rekafka names the replicat process; sourcedefs points at the table definition file generated on the source server in step (8) of the source-side section; TARGETDB LIBFILE loads the Kafka adapter library together with its configuration file, dirprm/kafka.props under the OGG home; REPORTCOUNT sets how often the replication task reports its statistics; GROUPTRANSOPS batches operations from multiple source transactions into one target transaction to reduce I/O; MAP defines the mapping between the source and target tables.
(9) Configure kafka.props (the inline comments in the annotated version below must not appear in the actual file)
cd /data/gg/dirprm/
vi kafka.props
/*
gg.handlerlist=kafkahandler //handler name
gg.handler.kafkahandler.type=kafka //handler type
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //Kafka producer configuration file
gg.handler.kafkahandler.topicMappingTemplate=test_ogg //Kafka topic name; best created manually in advance, since auto-created topics get only 1 partition by default
gg.handler.kafkahandler.format=json //message format; json, xml, etc. are supported
gg.handler.kafkahandler.format.includePrimaryKeys=true //include primary keys in the messages
gg.handler.kafkahandler.mode=op //delivery mode in OGG for Big Data: op sends one message per SQL operation, tx one per transaction
gg.classpath=dirprm/:/data/kfdata/kafka/libs/*:/data/gg/:/data/gg/lib/*
*/
Comment-free version (this is what goes in the file):
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.format.includePrimaryKeys=true
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/data/kfdata/kafka/libs/*:/data/gg/:/data/gg/lib/*
vi custom_kafka_producer.properties
/*
bootstrap.servers=192.168.83.227:9092 //Kafka broker address
acks=1
compression.type=gzip //compression type
reconnect.backoff.ms=1000 //reconnect backoff delay
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
*/
Comment-free version:
bootstrap.servers=192.168.83.227:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
(10) Bind the trail file to the replicat process
add replicat rekafka exttrail /data/gg/dirdat/lq,checkpointtable test_ogg.checkpoint
(11) Start OGG on both the source and the target
(omitted)
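Since this step is omitted above, here is a minimal sketch of the standard start sequence, assuming the process names configured in this guide:
-- on the source, in GGSCI:
start mgr
start E_LINQ
start P_LINQ
-- on the target, in GGSCI:
start mgr
start rekafka
-- then confirm everything reports RUNNING on both sides:
info all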
(12) Test
a. Insert data on the source
conn linq/Qwer!234
insert into test_ogg values(2,'go');
commit;
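To also exercise the update and delete paths (a sketch consistent with the sample messages in the results below):
insert into test_ogg values(3,'ok');
commit;
update test_ogg set name='ok3' where id=3;
commit;
delete from test_ogg where id=1;
commit;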
b. Check that the Kafka topic has been created on the target
kafka-topics.sh --list --zookeeper localhost:2181
c. Consume the messages
kafka-console-consumer.sh --bootstrap-server 192.168.83.227:9092 --from-beginning --topic test_ogg
(uses the Kafka port 9092; consumer offsets are stored in Kafka itself; recommended)
or:
kafka-console-consumer.sh --zookeeper 192.168.83.227:2181 --from-beginning --topic test_ogg
(uses the ZooKeeper port 2181; consumer offsets are stored in ZooKeeper)
The test output is shown below (Chinese characters are also supported):
[root@kafka3 data]# kafka-console-consumer.sh --bootstrap-server 192.168.83.227:9092 --from-beginning --topic test_ogg
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-06 17:12:03.648091","current_ts":"2020-03-06T17:12:10.737000","pos":"00000000000000001432","primary_keys":["ID"],"after":{"ID":2,"NAME":"go"}}
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-06 17:21:23.652647","current_ts":"2020-03-06T17:21:31.368000","pos":"00000000000000001569","primary_keys":["ID"],"after":{"ID":3,"NAME":"ok"}}
{"table":"LINQ.TEST_OGG","op_type":"U","op_ts":"2020-03-06 17:22:10.653057","current_ts":"2020-03-06T17:22:17.411000","pos":"00000000000000001701","primary_keys":["ID"],"before":{"ID":3,"NAME":"ok"},"after":{"ID":3,"NAME":"ok3"}}
{"table":"LINQ.TEST_OGG","op_type":"D","op_ts":"2020-03-06 17:22:58.653488","current_ts":"2020-03-06T17:23:05.454000","pos":"00000000000000001858","primary_keys":["ID"],"before":{"ID":1,"NAME":"begin"}}
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-07 19:21:07.411700","current_ts":"2020-03-07T19:21:12.465000","pos":"00000000000000001994","primary_keys":["ID"],"after":{"ID":1,"NAME":"begin"}}
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-10 15:23:20.354624","current_ts":"2020-03-10T15:23:27.371000","pos":"00000000000000002130","primary_keys":["ID"],"after":{"ID":4,"NAME":"linq"}}
{"table":"LINQ.TEST_OGG","op_type":"U","op_ts":"2020-03-10 15:30:19.357882","current_ts":"2020-03-10T15:30:25.697000","pos":"00000000000000002266","primary_keys":["ID"],"before":{"ID":4,"NAME":"linq"},"after":{"ID":4,"NAME":"林勤"}}