使用ogg實現oracle到kafka的增量數據實時同步


使用ogg實現oracle到kafka的增量數據實時同步

彬彬
2022.04.07

一、OGG概述

OGG全稱為Oracle GoldenGate,是由Oracle官方提供的用於解決異構數據環境中數據復制的一個商業工具。相比於其它遷移工具OGG的優勢在於可以直接解析源端Oracle的redolog,因此能夠實現在不需要對原表結構做太多調整的前提下完成數據增量部分的同步。基於Oracle OGG,介紹一種將Oracle數據庫的數據實時同步到Kafka消息隊列的方法。
1、OGG邏輯架構

file

2、 OGG概念
 Manager進程:需要源端跟目標端同時運行,主要作用是監控管理其它進程,報告錯誤,分配及清理數據存儲空間,發布閾值報告等。

Extract進程:運行在數據庫源端,主要用於捕獲數據的變化,負責全量、增量數據的抽取。

Data Pump進程:運行在數據庫源端,屬於Extract進程的一個輔助進程,,從本地Trail文件中讀取數據,並通過網絡將數據發送到目標端OGG。

Collector進程:數據接收程序運行在目標端機器,用於接收Data Pump發送過來的Trail日志,並將數據寫入到本地Trail文件。

Replicat進程:數據復制(Replicat):數據復制運行在目標端機器,從Trail文件讀取數據變更,並將變更數據應用到目標端數據存儲系統。本案例中,數據復制將數據推送到kafka消息隊列。***

Trails文件:臨時存放在磁盤上的數據文件。
3、OGG檢查點
作為一個復制軟件,首要是考察是它的可靠性,確保事務的完整性,在復制的過程中,源端和目標端的一致性。在日常運維可能會發生各種故障:進程故障、trail文件故障、網絡故障、服務器故障等等。然后OGG各種故障的解決辦法:一是靠進程的自動重啟機制,二是靠checkpoint機制,保證在各種故障情況下不丟數據。

file

OGG檢查點:記錄進程的讀、寫的位置,在恢復時需要使用,保證事務的完整性。
OGG兩種存儲方式:
1)存放在dirchk下的文件中
2)存放在指定的checkpoint table
對比:

1)nodbcheckpoint:性能較高
2)checkpointtable:檢查點信息存儲在數據庫表中,和實際事務作為一個事務提交,可以從數據表中找到更多的信息
檢查點分為Startup檢查點信息、Recovery檢查點、Current檢查點
1)Startup檢查點信息:進程啟動時,會創建startup檢查點
2)Recovery檢查點:進程恢復時,需要從哪個點開始恢復
3)Current檢查點:進程當前(最近的)檢查點信息
查看指令:info ext1, showch
3.1 檢查點-extract進程

file

1、讀檢查點:讀到哪個日志文件及相對位移值
	1)有startup、recovery、current checkpoint
	2)一般是修改current checkpoint來調整日志文件讀的位置alter extract ext1, [thread n,] 		extseqno , extrba 0
2、寫檢查點:正在寫到的trail文件編號及相對位移值
	1)有current checkpoint
	2)修改寫檢查點:重啟進程或者etrollover
	alter extract ext1, etrollover
	(執行完成之后注意需要手工設置pmp進程的讀檢查點位置:
	info ext1, showch確認新的寫檢查點的trail文件編號為N,然后
	alter pmp1, extseqno N, extrba 0)
3.2 檢查點-pump進程

file

1、讀檢查點:讀到的trail文件的編號和具體字節位置
	1)有startup、current checkpoint
	2)通過alter pmp1, extseqno N, extrba 0來修改
2、寫檢查點:正在寫的remote trail文件編號及相對位移值
	1)有current checkpoint
	2)通過alter pmp1, etrollover來修改但是修改后同樣需要手工調整replicate進程讀的位置:alter 	rep1, extseqno N, extrba 0 (N為新的pmp1進程寫檢查點remote trial文件編號)
3.3 檢查點-replicat進程

file

1、讀檢查點:讀到的trail文件的編號和具體字節位置
	1)有startup、current checkpoint
	2)修改當前讀檢查點位置
	alter rep1, extseqno N, extrba 0
2、寫檢查點:無

二、OGG配置

1、環境信息
**組件 ** 版本 **IP地址 ** 描述
源端Oracle 11.2.0.4.0 192.168.152.101 源端Oracle數據庫
源端OGG 12.3.2.1.1 192.168.152.101 源端OGG,用於抽取源端Oracle數據變更,並將變更日志發送到目標端
目標端OGG 12.3.2.1.1 192.168.152.101 目標端OGG,接受源端發送的Oracle事務變更日志,並將變更推送到kafka消息隊列
目標端Kafka 2.11-0.11.0.0 192.168.152.101、102、103 消息隊列,接收目標端OGG推送過來的數據
2、源端OGG配置
源端OGG 管理進程(mgr)配置:
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

說明
PORT	即mgr的默認監聽端口;
DYNAMICPORTLIST	動態端口列表,當指定的mgr端口不可用時,會在這個端口列表中選擇一個,最大指定范圍為256個;
AUTORESTART	重啟參數設置表示重啟所有EXTRACT進程,最多5次,每次間隔3分鍾;
PURGEOLDEXTRACTS	即TRAIL文件的定期清理
源端OGG 抽取進程(extract)配置
extract extkafka
dynamicresolution
SETENV (ORACLE_SID = "orcl")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ogg,password ogg
exttrail /opt/ogg/dirdat/to
table test_ogg.test_ogg;

說明
第一行指定extract進程名稱;
dynamicresolution	動態解析;
SETENV	設置環境變量,這里分別設置了Oracle數據庫以及字符集;
userid ogg,password ogg	即OGG連接Oracle數據庫的帳號密碼
exttrail	定義trail文件的保存位置以及文件名,注意這里文件名只能是2個字母,其余部分OGG會補齊;
table	即復制表的表名,支持*通配,必須以;結尾
源端OGG 傳輸進程(pump)配置
extract pukafka
passthru
dynamicresolution
userid ogg,password ogg
rmthost 192.168.152.131 mgrport 7909
rmttrail /home/gg/ogg/dirdat/go
table test_ogg.test_ogg;

說明
第一行指定extract進程名稱;
Passthru	即禁止OGG與Oracle交互,我們這里使用pump邏輯傳輸,故禁止即可;
Dynamicresolution	動態解析;
userid ogg,password ogg	即OGG連接Oracle數據庫的帳號密碼
rmthost和mgrhost	即目標端(kafka)OGG的mgr服務的地址以及監聽端口;
rmttrail	即目標端trail文件存儲位置以及名稱。
3、目標端OGG配置
目標端OGG管理進程(mgr)配置:
PORT 7909
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

說明
PORT	即mgr的默認監聽端口;
DYNAMICPORTLIST	動態端口列表,當指定的mgr端口不可用時,會在這個端口列表中選擇一個,最大指定范圍為256個;
AUTORESTART	重啟參數設置表示重啟所有EXTRACT進程,最多5次,每次間隔3分鍾;
PURGEOLDEXTRACTS	即TRAIL文件的定期清理
目標端OGG復制進程(replicat)配置:
REPLICAT rekafka
sourcedefs /home/gg/ogg/dirdef/test_ogg.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE 
GROUPTRANSOPS 10000
MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;

說明
REPLICATE rekafka	定義rep進程名稱;
Sourcedefs	即在源服務器上做的表映射文件;
TARGETDB LIBFILE	即定義kafka一些適配性的庫文件以及配置文件,配置文件位於OGG主目錄下的dirprm/kafka.props;
REPORTCOUNT	即復制任務的報告生成頻率;
GROUPTRANSOPS	為以事務傳輸時,事務合並的單位,減少IO操作;
MAP	即源端與目標端的映射關系

三、數據測試

啟動所有進程
在源端和目標端的OGG命令行下使用start [進程名]的形式啟動所有進程。
啟動順序按照源mgr——目標mgr——源extract——源pump——目標replicate來完成。
全部需要在ogg目錄下執行ggsci目錄進入ogg命令行。

源端依次是
start mgr
start extkafka
start pukafka
目標端依次是
start mgr
start rekafka

可以通過info all 或者info [進程名] 查看狀態,所有的進程都為RUNNING才算成功
如果有不是RUNNING可通過查看日志的方法檢查解決問題
ogg命令行,以rekafka進程為例

view report rekafka
源端執行sql語句
insert into test_ogg(id,name) values('1','test');
commit;
update test_ogg set name=‘zhangsan ' where id='1';
commit;
delete test_ogg where id='1';
commit;

查看源端trail文件狀態
ls -l /opt/ogg/dirdat/to*

查看目標端trail文件狀態
ls -l /home/gg/ogg/dirdat/go*

通過消費者看是否有同步消息
bin/kafka-console-consumer.sh --bootstrap-server  bigdata02:9092 --topic  test_ogg

本文由博客一文多發平台 OpenWrite 發布!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM