Oracle GoldenGate mysql To Kafka上車記錄


一.前言

  首先要學習一下ogg的所有進程,看着這張圖來學習

 
  • Manager進程是GoldenGate的控制進程,運行在源端和目標端上。它主要作用有以下幾個方面:啟動、監控、重啟Goldengate的其他進程,報告錯誤及事件,分配數據存儲空間,發布閥值報告等。在目標端和源端有且只有一個manager進程
  • Extract運行在數據庫源端,負責從源端數據表或者日志中捕獲數據。Extract的作用可以按照階段來划分為:
    •   初始時間裝載階段:在初始數據裝載階段,Extract進程直接從源端的數據表中抽取數據
    •   同步變化捕獲階段:初始數據同步完成以后,Extract進程負責捕獲源端數據的變化(DML和DDL)
  • Data Pump進程運行在數據庫源端,其作用是將源端產生的本地trail文件,把trail以數據塊的形式通過TCP/IP 協議發送到目標端,這通常也是推薦的方式。pump進程本質是extract進程的一種特殊形式,如果不使用trail文件,那么extract進程在抽 取完數據以后,直接投遞到目標端,生成遠程trail文件。
  • Collector進程與Data Pump進程對應 的叫Server Collector進程,這個進程不需要引起我的關注,因為在實際操作過程中,無需我們對其進行任何配置,所以對我們來說它是透明的。它運行在目標端,其 任務就是把Extract/Pump投遞過來的數據重新組裝成遠程ttrail文件。
  • Replicat進程,通常我們也把它叫做應用進程。運行在目標端,是數據傳遞的最后一站,負責讀取目標端trail文件中的內容,並將其解析為DML或 DDL語句,然后應用到目標數據庫中。

 

  關於OGG的Trail文件:

  • 為了更有效、更安全的把數據庫事務信息從源端投遞到目標端。GoldenGate引進trail文件的概念。前面提到extract抽取完數據以 后 Goldengate會將抽取的事務信息轉化為一種GoldenGate專有格式的文件。然后pump負責把源端的trail文件投遞到目標端,所以源、 目標兩端都會存在這種文件。
  • trail文件存在的目的旨在防止單點故障,將事務信息持久化,並且使用checkpoint機制來記錄其讀寫位置,如果故障發生,則數據可以根據checkpoint記錄的位置來重傳 。

二.源端配置

2.1 版本選擇

  因為源端是抽取mysql db,所以要選擇正確的ogg版本,這里選擇的是:

2.2 解壓安裝

  我安裝的目錄是/opt/ogg目錄,所以就解壓在這個目錄

tar xf ggs_Linux_x64_MySQL_64bit.tar -C /opt/ogg

  然后執行ogg命令

./ggsci

  最初進入后需要初始化,執行命令

GGSCI (miaojiaxing-VirtualBox) 1> create subdirs

Creating subdirectories under current directory /opt/ogg

Parameter file                 /opt/ogg/dirprm: created.
extract extkafka
Report file                    /opt/ogg/dirrpt: created.
Checkpoint file                /opt/ogg/dirchk: created.
Process status files           /opt/ogg/dirpcs: created.
SQL script files               /opt/ogg/dirsql: created.
Database definitions files     /opt/ogg/dirdef: created.
Extract data files             /opt/ogg/dirdat: created.
Temporary files                /opt/ogg/dirtmp: created.
Credential store files         /opt/ogg/dircrd: created.
Masterkey wallet files         /opt/ogg/dirwlt: created.
Dump files                     /opt/ogg/dirdmp: created.

2.3 配置管理進程

GGSCI (miaojiaxing-VirtualBox) 2> edit param mgr

port 7809
dynamicportlist 7840-7939
autorestart er *, retries 5, waitminutes 3
purgeoldextracts /home/goldengate/dirdat/*,usecheckpoints, minkeepdays 2
  • port mgr:進程的默認監聽端口
  • dynamicportlist:動態端口列表,指定mgr端口不可用的時候從這個列表中選擇一個,最大范圍256個
  • AUTORESTART:重啟參數設置表示重啟所有EXTRACT進程,最多5次,每次間隔3分鍾
  • PURGEOLDEXTRACTS即TRAIL文件的定期清理

  start mgr可以啟動該進程,info mgr可查看該進程狀態。之后配置的進程都可以用這個命令,名字換一下即可。

GGSCI (miaojiaxing-VirtualBox) 3> start mgr
Manager started.

GGSCI (miaojiaxing-VirtualBox) 30> info mgr
Manager is running (IP port miaojiaxing-VirtualBox.7809, Process ID 13044).

2.4 配置抽取進程

GGSCI (miaojiaxing-VirtualBox) 2> edit param extkafka

extract extkafka
sourcedb alcmydata@172.29.30.6:3306 userid ogg password ogg
exttrail /opt/ogg/dirdat/me
TranLogOptions AltLogDest /var/log/mysql/mysql-bin.index
table alcmydata.alc_asset_equity;

  添加extract進程

GGSCI (miaojiaxing-VirtualBox) 10> add extract extkafka,tranlog,begin now
EXTRACT added.

  添加trail文件與extract進程綁定

GGSCI (miaojiaxing-VirtualBox) 11> add exttrail /opt/ogg/dirdat/me,extract extkafka
EXTTRAIL added.

  這里需要注意的mysql的my.cnf log-bin配置,因為對接mysql的時候,需要依賴mysql的binlog日志

2.5 配置傳遞進程

GGSCI (miaojiaxing-VirtualBox) 2> edit param mcp1

extract mcp1
passthru
sourcedb alcmydata@172.29.30.6:3306 userid ogg password ogg
rmthost  172.29.30.79,mgrport 7809,compress
rmttrail  /opt/ogg/dirdat/mp

dynamicresolution
numfiles 3000
table alcmydata.alc_asset_equity;
  • passthru:使用pump邏輯傳輸
  • rmthost:目標端ogg的mgr服務地址
  • rmttrail:目標端trail文件存儲位置以及名稱

  分別添加本地trail文件和目標端trail文件綁定到mcp1進程

GGSCI (miaojiaxing-VirtualBox) 2> add extract mcp1,exttrailsource /opt/ogg/dirdat/me
EXTRACT added.


GGSCI (miaojiaxing-VirtualBox) 3> add rmttrail /opt/ogg/dirdat/mp,extract mcp1
RMTTRAIL added.

2.6 創建表的定義文件

  ogg對接mysql和oracle不同的地方是ogg獲取mysql表結構的方法,需要配置defgen文件

GGSCI (miaojiaxing-VirtualBox) 2> edit param defgen

defsfile ./dirdef/gmqdsjsjp.def
sourcedb alcmydata@172.29.30.6:3306 userid ogg password ogg
table alcmydata.alc_asset_equity;

  然后生成表定義文件傳給目標庫目錄

./defgen paramfile ./dirprm/defgen.prm
scp /opt/ogg/dirdef/gmqdsjsjp.def miaojiaxing2@172.29.30.79:/opt/ogg/dirdef/

三.目標端配置

3.1 配置管理進程

  和源端配置是一樣的

GGSCI (miaojiaxing-VirtualBox) 2> edit param mgr

port 7809
dynamicportlist 7840-7939
autorestart er *, retries 5, waitminutes 3
purgeoldextracts /home/goldengate/dirdat/*,usecheckpoints, minkeepdays 2

3.2 配置replicate進程

GGSCI (miaojiaxing-VirtualBox) 2> edit param rekafka

REPLICAT rekafka
sourcedefs /opt/ogg/dirdef/gmqdsjsjp.def
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE 
GROUPTRANSOPS 10000
MAP alcmydata.alc_asset_equity, TARGET alcmydata.alc_asset_equity;
  • TARGETDB LIBFILE:定義kafka的一些配置文件
  • REPORTCOUNT:復制任務的報告生成頻率
  • GROUPTRANSOPS:以事務傳輸
  • MAP 源端和目標端的映射關系

  添加進程

add replicat rekafka exttrail /opt/ogg/dirdat/mp,checkpointtable test_ogg.checkpoint

3.3 配置kafka.props

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.classpath=/ggwork/kafka/2.2.1/libexec/libs/*

custom_kafka_producer.properties文件如下

bootstrap.servers=172.29.31.214:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000

3.4 發車

  進程啟動順序為:源mgr——目標mgr——源extract——源pump——目標replicate來完成。

  源端:

start mgr
start extkafka
start mcp1

  目標端:

start mgr
start rekafka

  然后update一條數據,你可以用最簡單的term來查看結果(當然可以用個java的demo):

kafka-console-producer --broker-list localhost:9092 --topic test_ogg

  結果:

{"table":"alcmydata.alc_asset_equity","op_type":"I","op_ts":"2019-08-08 15:12:56.973193","current_ts":"2019-08-08T17:39:28.281000","pos":"00000000000000488603","after":{"ID":1437,"CREATED_AT":"2019-08-07 17:19:04.000000","CREATED_BY":"c3lz","UPDATED_AT":"2019-08-07 17:19:04.000000","UPDATED_BY":"c3lz","HOLD_SHARE":66300.0000000000,"HOLD_AMOUNT":0,"FROZEN_SHARE":0,"FROZEN_AMOUNT":0,"ENTRUST_AMOUNT":0,"TRANSIT_AMOUNT":0,"INVEST_AMOUNT":66300.00,"ACHIEVED_AMOUNT":0,"PRODUCT_ID":114388613151072,"PROD_CATEGORY":"RDAx","USER_ID":2869248,"ACCOUNT_ID":492,"DAILY_INCOME":null,"INCOME_DATE":null,"TOTAL_INCOME":null,"VERSION":1,"REMARK":"MQ==","UNPAID_INCOME":0,"ADVANCE_ACHIEVED_AMOUNT":null}}
{"table":"alcmydata.alc_asset_equity","op_type":"U","op_ts":"2019-08-08 18:11:50.968788","current_ts":"2019-08-08T18:11:57.664000","pos":"00000000010000003443","before":{},"after":{"ID":1,"HOLD_SHARE":20000.0000000000}}

 

 

 

參考

https://blog.csdn.net/TXBSW/article/details/87915942

http://www.voidcn.com/article/p-cadcicbv-bm.html

https://dongkelun.com/2018/05/23/oggOracle2Kafka/

https://docs.oracle.com/goldengate/bd123010/gg-bd/index.html

https://www.jianshu.com/p/eefbb731cc67

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM