基於OGG 實現Oracle到Kafka增量數據實時同步
背景
在大數據時代,存在大量基於數據的業務。數據需要在不同的系統之間流動、整合。通常,核心業務系統的數據存在OLTP數據庫系統中,其它業務系統需要獲取OLTP系統中的數據。傳統的數倉通過批量數據同步的方式,定期從OLTP系統中抽取數據。但是隨着業務需求的升級,批量同步無論從實時性,還是對在線OLTP系統的抽取壓力,都無法滿足要求。需要實時從OLTP系統中獲取數據變更,實時同步到下游業務系統。
本文基於Oracle OGG,介紹一種將Oracle數據庫的數據實時同步到Kafka消息隊列的方法。
Kafka是一種高效的消息隊列實現,通過訂閱kafka的消息隊列,下游系統可以實時獲取在線Oracle系統的數據變更情況,實現業務系統。
環境介紹
組件版本

整體架構圖

名詞解釋
1.OGG Manager
OGG Manager用於配置和管理其它OGG組件,配置數據抽取、數據推送、數據復制,啟動和停止相關組件,查看相關組件的運行情況。
2.數據抽取(Extract)
抽取源端數據庫的變更(DML, DDL)。數據抽取主要分如下幾種類型:
本地抽取
從本地數據庫捕獲增量變更數據,寫入到本地Trail文件
數據推送(Data Pump)
從本地Trail文件讀取數據,推送到目標端。
初始數據抽取
從數據庫表中導出全量數據,用於初次數據加載
3.數據推送(Data Pump)
Data Pump是一種特殊的數據抽取(Extract)類型,從本地Trail文件中讀取數據,並通過網絡將數據發送到目標端OGG
4.Trail文件
數據抽取從源端數據庫抓取到的事物變更信息會寫入到Trail文件。
5.數據接收(Collector)
數據接收程序運行在目標端機器,用於接收Data Pump發送過來的Trail日志,並將數據寫入到本地Trail文件。
6.數據復制(Replicat)
數據復制運行在目標端機器,從Trail文件讀取數據變更,並將變更數據應用到目標端數據存儲系統。本案例中,數據復制將數據推送到kafka消息隊列。
7.檢查點(Checkpoint)
檢查點用於記錄數據庫事物變更。
操作步驟
源端Oracle配置
1.檢查歸檔
使用OGG,需要在源端開啟歸檔日志
SQL> archive log list;
Database log mode Archive Mode
Automatic archival Enabled
Archive destination /u01/app/oracle/product/12.2.0/db_1/dbs/arch
Oldest online log sequence 2576
Next log sequence to archive 2577
Current log sequence 2577
2.檢查數據庫配置
SQL> select force_logging, supplemental_log_data_min from v$database;
FORCE_LOGG SUPPLEMENTAL_LOG_DATA_MI
---------- ------------------------
YES YES
如果沒有開啟輔助日志,需要開啟:
SQL> alter database force logging;
SQL> alter database add supplemental log data;
3.開啟goldengate復制參數
SQL> alter system set enable_goldengate_replication = true;
4.創建源端Oracle賬號
SQL> create tablespace tbs_ogg datafile '/oradata/dtstack/tbs_ogg.dbf' size 1024M autoextend on;
SQL> create user ggsadmin identified by oracle default tablespace tbs_ogg;
SQL> grant dba to ggsadmin;
5.創建測試表
SQL> create table baiyang.ora_to_kfk as select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id < 500;
SQL> alter table baiyang.ora_to_kfk add constraint pk_kfk_obj primary key(object_id);
SQL> select count(*) from baiyang.ora_to_kfk;
COUNT(*)
----------
436
源端OGG配置
1.檢查源端OGG環境
cd /oradata/oggorcl/ogg
./ggsci
GGSCI (dtproxy) 1> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER STOPPED
2.創建相關文件夾
GGSCI (dtproxy) 2> create subdirs
Creating subdirectories under current directory /oradata/oggorcl/ogg
Parameter file /oradata/oggorcl/ogg/dirprm: created.
Report file /oradata/oggorcl/ogg/dirrpt: created.
Checkpoint file /oradata/oggorcl/ogg/dirchk: created.
Process status files /oradata/oggorcl/ogg/dirpcs: created.
SQL script files /oradata/oggorcl/ogg/dirsql: created.
Database definitions files /oradata/oggorcl/ogg/dirdef: created.
Extract data files /oradata/oggorcl/ogg/dirdat: created.
Temporary files /oradata/oggorcl/ogg/dirtmp: created.
Credential store files /oradata/oggorcl/ogg/dircrd: created.
Masterkey wallet files /oradata/oggorcl/ogg/dirwlt: created.
Dump files /oradata/oggorcl/ogg/dirdmp: created.
3.配置源端Manager
GGSCI (dtproxy) 4> dblogin userid ggsadmin password oracle
Successfully logged into database.
GGSCI (dtproxy as ggsadmin@dtstack) 5> edit param ./globals
-- 添加
oggschema ggsadmin
GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr
-- 添加
PORT 7810 --默認監聽端口
DYNAMICPORTLIST 7811-7820 --動態端口列表
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 --進程有問題,每3分鍾重啟一次,一共重啟五次
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7 --*/
LAGREPORTHOURS 1 --每隔一小時檢查一次傳輸延遲情況
LAGINFOMINUTES 30 --傳輸延時超過30分鍾將寫入錯誤日志
LAGCRITICALMINUTES 45 --傳輸延時超過45分鍾將寫入警告日志
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7 --定期清理trail文件
ACCESSRULE, PROG *, IPADDR 172.*.*.*, ALLOW --設定172網段可連接
-- 添加同步的表
GGSCI (dtproxy as ggsadmin@dtstack) 9> add trandata baiyang.ora_to_kfk
-- Oracle Goldengate marked following column as key columns on table BAIYANG.ORA_TO_KFK: OBJECT_ID.
GGSCI (dtproxy as ggsadmin@dtstack) 10> info trandata baiyang.ora_to_kfk
-- Prepared CSN for table BAIYANG.ORA_TO_KFK: 192881239
目標端OGG配置
1.目標端檢查環境
GGSCI (172-16-101-242) 1> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER STOPPED
2.創建目錄
GGSCI (172-16-101-242) 2> create subdirs
Creating subdirectories under current directory /app/ogg
Parameter file /app/ogg/dirprm: created.
Report file /app/ogg/dirrpt: created.
Checkpoint file /app/ogg/dirchk: created.
Process status files /app/ogg/dirpcs: created.
SQL script files /app/ogg/dirsql: created.
Database definitions files /app/ogg/dirdef: created.
Extract data files /app/ogg/dirdat: created.
Temporary files /app/ogg/dirtmp: created.
Credential store files /app/ogg/dircrd: created.
Masterkey wallet files /app/ogg/dirwlt: created.
Dump files /app/ogg/dirdmp: created.
3.目標端Manager配置
GGSCI (172-16-101-242) 3> edit params mgr
-- 添加
PORT 7810
DYNAMICPORTLIST 7811-7820
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
GGSCI (172-16-101-242) 4> edit param ./GLOBALS
CHECKPOINTTABLE ggsadmin.checkpoint
全量數據同步
1.配置源端數據初始化
-- 配置源端初始化進程
GGSCI (dtproxy as ggsadmin@dtstack) 15> add extract initkfk,sourceistable
-- 配置源端初始化參數
GGSCI (dtproxy as ggsadmin@dtstack) 16> edit params initkfk
-- 添加
EXTRACT initkfk
SETENV (NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
USERID ggsadmin,PASSWORD oracle
RMTHOST 172.16.101.242, MGRPORT 7810
RMTFILE ./dirdat/ekfk,maxfiles 999, megabytes 500
table baiyang.ora_to_kfk;
2.源端生成表結構define文件
GGSCI (dtproxy as ggsadmin@dtstack) 17> edit param define_kfk
-- 添加
defsfile /oradata/oggorcl/ogg/dirdef/define_kfk.txt
userid ggsadmin,password oracle
table baiyang.ora_to_kfk;
-- 執行
$./defgen paramfile dirprm/define_kfk.prm
-- Definitions generated for 1 table in /oradata/oggorcl/ogg/dirdef/define_kfk.txt
-- 將此文件傳輸到目標段dirdef文件夾
scp /oradata/oggorcl/ogg/dirdef/define_kfk.txt 172.16.101.242:/app/ogg/dirdef/define_kfk.txt
3.配置目標端數據初始化進程
-- 配置目標端初始化進程
GGSCI (172-16-101-242) 3> ADD replicat initkfk,specialrun
GGSCI (172-16-101-242) 6> edit params initkfk
-- 添加
SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
targetdb libfile libggjava.so set property=./dirprm/kafka.props
SOURCEDEFS ./dirdef/define_kfk.txt
EXTFILE ./dirdat/ekfk000000
reportcount every 1 minutes, rate
grouptransops 10000
map baiyang.ora_to_kfk,target baiyang.ora_to_kfk;
4.配置kafka相關參數
-- 配置kafka 相關參數
vi ./dirprm/kafka.props
-- 添加
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/data/kafka_2.12-2.2.0/libs/*:/app/ogg/:/app/ogg/lib/* --*/
vi custom_kafka_producer.properties
-- 添加
bootstrap.servers=172.16.101.242:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
5.源端開啟全量數據抽取
-- 源端
GGSCI (dtproxy) 20> start mgr
GGSCI (dtproxy) 21> start initkfk
6.目標端全量數據應用
GGSCI (172-16-101-242) 13> start mgr
./replicat paramfile ./dirprm/initkfk.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD
7.kafka數據驗證
使用kafka客戶端工具查看topic的數據
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_ogg --from-beginning
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:55.946000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"C_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":2,"DATA_OBJECT_ID":2,"OBJECT_TYPE":"CLUSTER"}}
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:56.289000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"I_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":3,"DATA_OBJECT_ID":3,"OBJECT_TYPE":"INDEX"}}
全量數據已經同步到目標kafka topic
增量數據同步
1.源端抽取進程配置
GGSCI (dtproxy) 9> edit param extkfk
-- 添加
dynamicresolution
SETENV (ORACLE_SID = "dtstack")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ggsadmin,password oracle
exttrail ./dirdat/to
table baiyang.ora_to_kfk;
-- 添加extract進程
GGSCI (dtproxy) 10> add extract extkfk,tranlog,begin now
-- 添加trail文件的定義與extract進程綁定
GGSCI (dtproxy) 11> add exttrail ./dirdat/to,extract extkfk
2.源端數據推送進程配置
-- 配置源端推送進程
GGSCI (dtproxy) 12> edit param pupkfk
-- 添加
extract pupkfk
passthru
dynamicresolution
userid ggsadmin,password oracle
rmthost 172.16.101.242 mgrport 7810
rmttrail ./dirdat/to
table baiyang.ora_to_kfk;
-- 添加extract進程
GGSCI (dtproxy) 13> add extract pupkfk,exttrailsource /oradata/oggorcl/ogg/dirdat/to
-- 添加trail文件的定義與extract進程綁定
GGSCI (dtproxy) 14> add rmttrail ./dirdat/to,extract pupkfk
3.配置目標端恢復進程
-- 配置目標端恢復進程
edit param repkfk
-- 添加
REPLICAT repkfk
SOURCEDEFS ./dirdef/define_kfk.txt
targetdb libfile libggjava.so set property=./dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;
--添加trail文件到replicate進程
add replicat repkfk exttrail ./dirdat/to,checkpointtable ggsadmin.checkpoint
4.源端開啟實時數據抓取
./ggsci
GGSCI (dtproxy) 5> start extkfk
Sending START request to MANAGER ...
EXTRACT EXTKFK starting
GGSCI (dtproxy) 6> start pupkfk
Sending START request to MANAGER ...
EXTRACT PUPKFK starting
GGSCI (dtproxy) 7> status all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
EXTRACT RUNNING EXTKFK 00:00:00 00:00:10
EXTRACT RUNNING PUPKFK 00:00:00 00:00:00
5.目標端開啟實時數據同步
./ggsci
GGSCI (172-16-101-242) 7> start replicat repkfk
Sending START request to MANAGER ...
REPLICAT REPKFK starting
GGSCI (172-16-101-242) 8> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT RUNNING REPKFK 00:00:00 00:00:00
6.測試增量數據同步
Oracle插入增量數據
SQL> insert into baiyang.ora_to_kfk select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id >500 and object_id < 1000;
SQL> commit;
SQL> select count(*) from baiyang.ora_to_kfk;
COUNT(*)
----------
905
查看Kafka消息隊列消費數據
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042000","pos":"00000000000000075298","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS","SUBOBJECT_NAME":null,"OBJECT_ID":998,"DATA_OBJECT_ID":998,"OBJECT_TYPE":"TABLE"}}
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042001","pos":"00000000000000075459","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS_I","SUBOBJECT_NAME":null,"OBJECT_ID":999,"DATA_OBJECT_ID":999,"OBJECT_TYPE":"INDEX"}}
源端Oracle刪除數據
SQL> delete from baiyang.ora_to_kfk ;
906 rows deleted.
SQL> commit;
查看kafka消息隊列消費數據
{"table":"BAIYANG.ORA_TO_KFK","op_type":"D","op_ts":"2019-11-11 21:13:11.166184","current_ts":"2019-11-11T21:13:17.449007","pos":"00000000000000216645","before":{"OWNER":"x1","OBJECT_NAME":"SSSSS","SUBOBJECT_NAME":"z1","OBJECT_ID":111000,"DATA_OBJECT_ID":2000,"OBJECT_TYPE":"x1"}}
源端插入數據
SQL> insert into baiyang.ora_to_kfk values('漢字', 'y1', 'z1', 111000,2000,'x1');
1 row created.
SQL> commit;
查看kafka消息隊列消費數據
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:14:21.167454","current_ts":"2019-11-11T21:14:26.497000","pos":"00000000000000216794","after":{"OWNER":"漢字","OBJECT_NAME":"y1","SUBOBJECT_NAME":"z1","OBJECT_ID":111000,"DATA_OBJECT_ID":2000,"OBJECT_TYPE":"x1"}}
總結
使用OGG可以方便地將Oracle的數據變更情況實時同步到Kafka消息隊列。下游業務系統通過訂閱kafka的消息隊列,能方便地實現各類實時數據的應用。