使用OGG同步Oracle12C數據至Kafka


使用OGG同步Oracle12C數據至Kafka

https://blog.csdn.net/andyguan01_2/article/details/86716388

一、環境

操作系統:CentOS6.9
軟件版本:Zookeeper3.4.13,Kafka2.1.0
集群架構(Zookeeper和Kafka):
Node1:10.200.4.116(oracle03)
Node2:10.200.4.117(oracle02)
Node3:10.100.125.156(db01)

二、OGG源端安裝配置

如果要了解OGG基本架構,可以查看:https://blog.csdn.net/andyguan01_2/article/details/87075927

我這里的源端Oracle數據庫安裝在Node1(10.200.4.116),按說應該是要有一台單獨的數據庫服務器,由於我這里資源有限,在Node1上面搭建了源端Oracle數據庫。

1、源端Oracle數據庫配置

1.1 開啟歸檔模式

檢查源端Oracle數據庫是否開啟歸檔模式,若沒有可按以下方法開啟:

Oracle12C開啟歸檔模式:https://blog.csdn.net/andyguan01_2/article/details/86715413

1.2 啟用ENABLE_GOLDENGATE_REPLICATION參數

參數ENABLE_GOLDENGATE_REPLICATION需要置為TRUE,否則后面啟動OGG的Extract進程的時候會報錯。

數據庫默認是沒有啟用此參數,可按此方法啟用:
https://blog.csdn.net/andyguan01_2/article/details/86748448

1.3 啟用最小日志補全supplemental_log_data_min

需要啟用最小日志補全supplemental_log_data_min,否則后面啟動OGG的Extract進程的時候會報錯。

數據庫默認是沒有啟用此功能,可按此方法啟用:https://blog.csdn.net/andyguan01_2/article/details/86743751

2、建立數據庫OGG用戶

創建OGG用戶表空間:

create tablespace oggdata datafile '/data/oracle/rcas/RCAS/datafile/oggdata01.dbf' size 10M autoextend on next 10M maxsize 1G;

    1

創建OGG用戶:

create user ogg identified by ogg default tablespace oggdata;  

    1

給OGG用戶授權:

grant connect,resource to ogg;
--如果不做ddl trigger,dba權限可以不給
grant dba to ogg;
 
GRANT CREATE SESSION TO ogg;
GRANT ALTER SESSION  TO ogg;
GRANT SELECT ANY DICTIONARY TO ogg;
GRANT SELECT ANY TABLE TO ogg;
--用戶配置表級追加日志
GRANT ALTER ANY TABLE TO ogg;
 
GRANT FLASHBACK ANY TABLE TO ogg;
GRANT EXECUTE on DBMS_FLASHBACK TO ogg;
GRANT EXECUTE ON utl_file TO ogg;
grant execute on sys.dbms_lob to ogg;

execute DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE('OGG');
--以下語句是在oracle 11g之上版本用的,10g版本不需要執行
execute DBMS_GOLDENGATE_AUTH.GRANT_ADMIN_PRIVILEGE(Grantee=> 'OGG',privilege_type=> 'CAPTURE',grant_select_privileges => TRUE,do_grants=> TRUE);

  
3、在源端安裝OGG

安裝方法見:

在CentOS6.9安裝OGG18.1.0.0 for Oracle:https://blog.csdn.net/andyguan01_2/article/details/86719382

4、在源端配置OGG

4.1 在oracle用戶打開ggsci命令:

cd $GG_HOME
./ggsci

4.2 創建OGG相關子目錄
create subdirs


4.3 配置OGG Manager

編輯mgr參數文件:

edit params mgr

輸入以下內容:

PORT 7809
DYNAMICPORTLIST 7810-7860
AUTOSTART EXTRACT *
AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3, RESETMINUTES 60
PURGEOLDEXTRACTS ./dirdat/*, usecheckpoints, minkeepdays 3
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45

    MANAGER進程參數配置說明:

    PORT:指定服務監聽端口,默認為7809。

    DYNAMICPORTLIST:動態端口,可以制定最大256個可用端口的動態列表,當指定的端口不可用時,管理進程將會從列表中選擇一個可用的端口,源端和目標端的Collector、Replicat、GGSCI進程通信也會使用這些端口。

    COMMENT:注釋行,也可以用–來代替。

    AUTOSTART:指定在管理進程啟動時自動啟動哪些進程。

    AUTORESTART:自動重啟參數設置。本處設置表示每3分鍾嘗試重新啟動所有EXTRACT進程,共嘗試5次;

    PURGEOLDEXTRACTS:定期清理trail文件設置。本處設置表示對於超過3天的trail文件進行刪除。

    LAGREPORTHOURS、LAGINFOMINUTES、LAGCRITICALMINUTES:定義數據延遲的預警機制。本處設置表示MGR進程每隔1小時檢查EXTRACT的延遲情況,如果超過了30分鍾就把延遲作為信息記錄到錯誤日志中,如果延遲超過了45分鍾,則把它作為警告寫到錯誤日志中。

啟動OGG Manager:

start mgr

4.4 創建Extract抽取進程

編輯Extract參數文件:

edit params ext_1
輸入以下內容:

EXTRACT ext_1
Setenv (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
USERID ogg@rcas, PASSWORD ogg
GETTRUNCATES
DISCARDFILE /data/ogg_app/dirrpt/ext_1.dsc, APPEND, MEGABYTES 1024
DBOPTIONS  ALLOWUNUSEDCOLUMN
REPORTCOUNT EVERY 1 MINUTES, RATE
WARNLONGTRANS 2h,CHECKINTERVAL 5m
FETCHOPTIONS NOUSESNAPSHOT
--TRANLOGOPTIONS  CONVERTUCS2CLOBS
EXTTRAIL  /data/ogg_app/dirdat/te
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
DYNAMICRESOLUTION
table ogg.test_1;

    EXTRACT進程參數配置說明:

    SETENV:配置系統環境變量。

    USERID/PASSWORD: 指定OGG連接數據庫的用戶名和密碼,連接的是源端的數據庫。例如:userid ogg@source, password gg。source是連接源庫的tnsname。

    COMMENT:注釋行,也可以用–來代替。

    TABLE:定義需復制的表,后面需以;結尾

    TABLEEXCLUDE:定義需要排除的表,如果在TABLE參數中使用了通配符,可以使用該參數指定排除掉的表。

    GETUPDATEAFTERS|IGNOREUPDATEAFTERS:是否在隊列中寫入后影像,缺省復制。

    GETUPDATEBEFORES| IGNOREUPDATEBEFORES:是否在隊列中寫入前影像,缺省不復制。

    GETUPDATES|IGNOREUPDATES:是否復制UPDATE操作,缺省復制。

    GETDELETES|IGNOREDELETES:是否復制DELETE操作,缺省復制。

    GETINSERTS|IGNOREINSERTS:是否復制INSERT操作,缺省復制。

    GETTRUNCATES|IGNORETRUNDATES:是否復制TRUNCATE操作,缺省不復制。

    RMTHOST:指定目標系統及其Goldengate Manager進程的端口號,還用於定義是否使用壓縮進行傳輸。

    RMTTRAIL:指定寫入到目標端的哪個隊列。

    EXTTRAIL:指定寫入到本地的哪個隊列。

    SQLEXEC:在extract進程運行時首先運行一個SQL語句。

    PASSTHRU:禁止extract進程與數據庫交互,適用於Data Pump傳輸進程。

    REPORT:定義自動定時報告。

    STATOPTIONS:定義每次使用stat時統計數字是否需要重置。

    REPORTCOUNT:報告已經處理的記錄條數統計數字。

    TLTRACE:打開對於數據庫日志的跟蹤日志。

    DISCARDFILE:定義discardfile文件位置,如果處理中有記錄出錯會寫入到此文件中。

    DBOPTIONS:指定對於某種特定數據庫所需要的特殊參數。此處ALLOWUNUSEDCOLUMN參數表示,當抽取進程遇到一個沒有使用的字段時只生成一個警告,進程會繼續執行而不會被異常終止(abend)。

    TRANLOGOPTIONS:指定在解析數據庫日志時所需要的特殊參數。本處配置參數CONVERTUCS2CLOBS只用在extract端UTF字符類型,並且OGG11.1.1版本之前處理CLOB才需要。

    WARNLONGTRANS:指定對於超過一定時間的長交易可以在gsserr.log里面寫入警告信息,本處配置為每隔5分鍾檢查一次長交易,對於超過2小時的進行警告。

    DYNAMICRESOLUTION:有時候開啟OGG進程的時候較慢,可能是因為需要同步的表太多,OGG在開啟進程之前會將需要同步的表建立一個記錄並且存入到磁盤中,這樣就需要耗費大量的時間。使用該參數來解決此問題。

    FETCHOPTIONS:參數NOUSESNAPSHOT表示不會從閃回日志中獲取數據。

添加Extract進程:

add extract ext_1, TRANLOG, BEGIN NOW


定義trail文件:

GGSCI> add exttrail /data/ogg_app/dirdat/te, EXTRACT ext_1, MEGABYTES 200

4.5 創建Pump傳輸進程

抽取進程Extract和傳輸進程Pump其實都是Extract進程,也可以配置在一個進程完成這兩個功能,但是當網絡傳輸有問題時,這樣抽取也就不能繼續運行了,所以推薦分開配置為兩個進程。

編輯Pump參數文件:

edit param pump_1

輸入以下內容:

EXTRACT pump_1
--PASSTHRU
RMTHOST 10.100.125.156, MGRPORT 7809
RMTTRAIL /data/ogg_app/dirdat/te
DYNAMICRESOLUTION
TABLE ogg.test_1;

    PUMP進程參數配置說明:

    PASSTHRU:如果在配置OGG 的時候既沒有過濾行也沒有選擇列,並且源和目標數據結構都是一模一樣,那么可以指定PASSTHRU參數。使用PASSTHRU參數可以使OGG繞過檢測表定義數據文件從而提高性能。

    RMTHOST,MGRPORT:目標端主機IP,管理進程端口號。

    RMTTRAIL:目標端主機保存隊列文件的目錄。

添加Pump進程:

ADD EXTRACT pump_1, EXTTRAILSOURCE /data/ogg_app/dirdat/te


定義pump trail文件:

GGSCI> ADD RMTTRAIL /data/ogg_app/dirdat/te, EXTRACT pump_1, MEGABYTES 200


啟動Extract和Pump進程:

start extract ext_1
start extract pump_1

查看進程狀態:

 info all

三、OGG目標端安裝配置

我這里的OGG目標端安裝在Node3(10.100.125.156),按說應該是要有一台單獨的接口服務器,由於我這里資源有限,在Node3上面搭建了目標端OGG。

1、在目標端安裝OGG

可按以下方法在目標端安裝OGG For BigData:

https://www.cnblogs.com/zzpblogs/p/13049698.html

2、在目標端配置OGG

2.1 創建相關子目錄
./ggsci
create subdirs

 

 


2.2 復制example

cp $OGG_HOME/AdapterExamples/big-data/kafka/* $OGG_HOME/dirprm/
ll $OGG_HOME/dirprm/

在這里插入圖片描述
2.3 配置manager

啟動GGSCI后,編輯mgr參數:

edit params mgr

在這里插入圖片描述
輸入以下內容:

PORT 7809
DYNAMICPORTLIST 7810-7860
AUTORESTART REPLICAT *, RETRIES 5, WAITMINUTES 3, RESETMINUTES 60
PURGEOLDEXTRACTS /data/ogg_app/dirdat/*, usecheckpoints, minkeepdays 1
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45

2.4 配置kafka.props

vi /data/ogg_app/dirprm/kafka.props

    1

配置以下內容:

gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
#gg.handler.kafkahandler.topicMappingTemplate=${tableName}
gg.handler.kafkahandler.topicMappingTemplate=ogg
#The following selects the message key using the concatenated primary keys
#gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
#gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.format=json
#gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.SchemaTopicName=myogg
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op

goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO

gg.report.time=30sec

#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/u01/app/kafka_2.12-2.1.0/libs/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

 

    配置說明:

    gg.handler.kafkahandler.topicMappingTemplate:kafka topic名稱的映射,指定topic名稱,也可以通過占位符的方式,例如${tableName},每一張表對應一個topic。

    gg.handler.kafkahandler.format:傳輸文件的格式,支持json,xml等。

    gg.handler.kafkahandler.mode:傳輸模式,op為一次SQL傳輸一次,tx為一次事務傳輸一次。

    gg.classpath:須指定相應的lib路徑。

2.5 配置custom_kafka_producer.properties

vi /data/ogg_app/dirprm/custom_kafka_producer.properties

配置以下內容:

bootstrap.servers=10.200.4.117:9092,10.200.4.116:9092,10.100.125.156:9092
acks=1
reconnect.backoff.ms=1000

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=16384
linger.ms=0

    1
    2
    3
    4
    5
    6
    7
    8
    9

    配置說明:

    bootstrap.servers:用於建立與kafka集群的連接,這個list僅僅影響用於初始化的hosts,來發現全部的servers。格式:host1:port1,host2:port2,…,數量盡量不止一個,以防其中一個down了。

    acks:Server完成 producer request 前需要確認的數量。
    acks=0時,producer不會等待確認,直接添加到socket等待發送;
    acks=1時,等待leader寫到local log就行;
    acks=all或acks=-1時,等待isr中所有副本確認
    (注意:確認都是 broker 接收到消息放入內存就直接返回確認,不是需要等待數據寫入磁盤后才返回確認,這也是kafka快的原因)
    batch.size:Producer可以將發往同一個Partition的數據做成一個Produce Request發送請求,即Batch批處理,以減少請求次數,該值即為每次批處理的大小。另外,每個Request請求包含多個Batch,每個Batch對應一個Partition,且一個Request發送的目的Broker均為這些partition的leader副本。若將該值設為0,則不會進行批處理。

    reconnect.backoff.ms:連接失敗時,當我們重新連接時的等待時間。

    value.serializer:value序列化方式,類型為class,需實現Serializer interface。

    key.serializer:key 序列化方式,類型為class,需實現Serializer interface。

    linger.ms:Producer默認會把兩次發送時間間隔內收集到的所有Requests進行一次聚合然后再發送,以此提高吞吐量,而linger.ms則更進一步,這個參數為每次發送增加一些delay,以此來聚合更多的Message。
    官網解釋翻譯:producer會將request傳輸之間到達的所有records聚合到一個批請求。通常這個值發生在欠負載情況下,record到達速度快於發送。但是在某些場景下,client即使在正常負載下也期望減少請求數量。這個設置就是如此,通過人工添加少量時延,而不是立馬發送一個record,producer會等待所給的時延,以讓其他records發送出去,這樣就會被聚合在一起。這個類似於TCP的Nagle算法。該設置給了batch的時延上限:當我們獲得一個partition的batch.size大小的records,就會立即發送出去,而不管該設置;但是如果對於這個partition沒有累積到足夠的record,會linger指定的時間等待更多的records出現。該設置的默認值為0(無時延)。例如,設置linger.ms=5,會減少request發送的數量,但是在無負載下會增加5ms的發送時延。

2.6 定義表結構傳遞

    GoldenGate 提供了一個名為 DEFGEN 的專用工具,用於生成數據定義,當源表和目標表中的定義不同時(例如源數據庫為Oracle,目標數據庫為SQL Server),Oracle GoldenGate 進程將引用該專用工具。在運行 DEFGEN 之前,需要為其創建一個參數文件,指定該工具應檢查哪些表以及在檢查表之后存放類型定義文件的位置。

在源端執行:

ggsci
edit param defgen

    1
    2

在這里插入圖片描述
輸入以下內容:

DEFSFILE /data/ogg_app/dirdef/source.def, PURGE
USERID ogg@rcas, PASSWORD ogg
TABLE ogg.test_1;

    1
    2
    3

在源端執行defgen命令:

defgen paramfile /data/ogg_app/dirprm/defgen.prm

    1

在這里插入圖片描述
復制源端的/data/ogg_app/dirdef/source.def文件到目標端的/data/ogg_app/dirdef目錄下。在源端oracle用戶執行:

--scp /data/ogg_app/dirprm/defgen.prm oracle@10.100.125.156:/data/ogg_app/dirdef
scp /data/ogg_app/dirdef/source.def oracle@10.100.125.156:/data/ogg_app/dirdef

    1
    2

2.7 定義Replication進程

2.7.1 定義參數文件

ggsci
edit params rep_1

    1
    2

在這里插入圖片描述
輸入以下內容:

REPLICAT rep_1
TARGETDB LIBFILE libggjava.so SET property=/data/ogg_app/dirprm/kafka.props
SOURCEDEFS /data/ogg_app/dirdef/source.def
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP ogg.*, TARGET ogg.*;

    1
    2
    3
    4
    5
    6

2.7.2 指定Trail文件

add replicat rep_1, exttrail /data/ogg_app/dirdat/te

    1

在這里插入圖片描述
2.7.3 啟動Replicat進程

先啟動Manager進程:

start mgr
info all

    1
    2

在這里插入圖片描述
再啟動Replicat進程:

start replicat rep_1

在這里插入圖片描述
查看進程狀態:

info all


在這里插入圖片描述
四、測試驗證

1、啟動Kafka ConsumerConsole

在任一節點啟動:

kafka-console-consumer.sh --bootstrap-server  10.200.4.116:9092,10.200.4.117:9092,10.100.125.156:9092 --topic ogg

    1

2、在源端操作表數據

對表數據進行insert,update,delete。執行以下SQL:

insert into ogg.test_1 values (123);
commit;
update ogg.test_1 set c1 = 2 where c1 = 1;
commit;
delete from ogg.test_1 where c1 = 2;
commit;

3、查看Kafka ConsumerConsole是否接收到數據

發現ConsumerConsole對應有3行輸出,顯示如下:


另外,我測試了對數據表進行truncate,發現ConsumerConsole沒有輸出。因為OGG默認是不捕獲truncate操作的,如要捕獲,需在Extract和Replicat進程都添加GETTRUNCATES參數。(這種方法我沒有測試,有興趣的朋友可以試試看)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM