利用ogg實現oracle到kafka的增量數據實時同步
前言
https://dongkelun.com/2018/05/23/oggOracle2Kafka/
ogg即Oracle GoldenGate是Oracle的同步工具,本文講如何配置ogg以實現Oracle數據庫增量數據實時同步到kafka中,其中同步消息格式為json。
下面是我的源端和目標端的一些配置信息:
- | 版本 | OGG版本 | ip | 別名 |
---|---|---|---|---|
源端 | OracleRelease 11.2.0.1.0 | Oracle GoldenGate 11.2.1.0.3 for Oracle on Linux x86-64 | 192.168.44.128 | master |
目標端 | kafka_2.11-1.1.0 | Oracle GoldenGate for Big Data 12.3.1.1.1 on Linux x86-64 | 192.168.44.129 | slave1 |
1、下載
可在這里或舊版本查詢下載
注意:源端和目標端的文件不一樣,目標端需要下載Oracle GoldenGate for Big Data,源端需要下載Oracle GoldenGate for Oracle具體下載方法見最后的附錄截圖。
2、源端(Oracle)配置
注意:源端是安裝了oracle的機器,oracle環境變量之前都配置好了
2.1 解壓
先建立ogg目錄
1 |
mkdir -p /opt/ogg |
解壓后得到一個tar包,再解壓這個tar
1 |
tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /opt/ogg |
2.2 配置ogg環境變量
為了簡單方便起見,我在/etc/profile里配置的,建議在生產中配置oracle的環境變量文件/home/oracle/.bash_profile里配置,為了怕出問題,我把OGG_HOME等環境變量在/etc/profile配置了一份,不知道這是否是必須的。
1 |
vim /etc/profile |
1 |
export OGG_HOME=/opt/ogg |
使之生效
1 |
source /etc/profile |
測試一下ogg命令
1 |
ggsci |
如果命令成功即可進行下一步,不成功請檢查前面的步驟。
2.3 oracle打開歸檔模式
1 |
su - oracle |
執行下面的命令查看當前是否為歸檔模式
1 |
archive log list |
1 |
SQL> archive log list |
若為Disabled,手動打開即可
1 |
conn / as sysdba (以DBA身份連接數據庫) |
再執行一下
1 |
archive log list |
1 |
Database log mode Archive Mode |
可以看到為Enabled,則成功打開歸檔模式。
2.4 Oracle打開日志相關
OGG基於輔助日志等進行實時傳輸,故需要打開相關日志確保可獲取事務內容,通過下面的命令查看該狀態
1 |
select force_logging, supplemental_log_data_min from v$database; |
1 |
FORCE_ SUPPLEMENTAL_LOG |
若為NO,則需要通過命令修改
1 |
alter database force logging; |
再查看一下為YES即可
1 |
SQL> select force_logging, supplemental_log_data_min from v$database; |
2.5 oracle創建復制用戶
首先root用戶建立相關文件夾,並賦予權限
1 |
mkdir -p /u01/app/oracle/oggdata/orcl |
然后執行下面sql
1 |
SQL> create tablespace oggtbs datafile '/u01/app/oracle/oggdata/orcl/oggtbs01.dbf' size 1000M autoextend on; |
2.6 OGG初始化
1 |
ggsci |
1 |
ggsci |
2.7 Oracle創建測試表
創建一個用戶,在該用戶下新建測試表,用戶名、密碼、表名均為 test_ogg。
1 |
create user test_ogg identified by test_ogg default tablespace users; |
3 目標端(kafka)配置
1 |
mkdir -p /opt/ogg |
3.2 環境變量
1 |
vim /etc/profile |
1 |
export OGG_HOME=/opt/ogg |
1 |
source /etc/profile |
同樣測試一下ogg命令
1 |
ggsci |
3.3 初始化目錄
1 |
create subdirs |
4、OGG源端配置
4.1 配置OGG的全局變量
先切換到oracle用戶下
1 |
su oracle |
1 |
GGSCI (ambari.master.com) 1> dblogin userid ogg password ogg |
然后和用vim編輯一樣添加
1 |
oggschema ogg |
4.2 配置管理器mgr
1 |
GGSCI (ambari.master.com) 3> edit param mgr |
說明:PORT即mgr的默認監聽端口;DYNAMICPORTLIST動態端口列表,當指定的mgr端口不可用時,會在這個端口列表中選擇一個,最大指定范圍為256個;AUTORESTART重啟參數設置表示重啟所有EXTRACT進程,最多5次,每次間隔3分鍾;PURGEOLDEXTRACTS即TRAIL文件的定期清理
4.3 添加復制表
1 |
GGSCI (ambari.master.com) 4> add trandata test_ogg.test_ogg |
4.4 配置extract進程
1 |
GGSCI (ambari.master.com) 6> edit param extkafka |
說明:第一行指定extract進程名稱;dynamicresolution動態解析;SETENV設置環境變量,這里分別設置了Oracle數據庫以及字符集;userid ggs,password ggs即OGG連接Oracle數據庫的帳號密碼,這里使用2.5中特意創建的復制帳號;exttrail定義trail文件的保存位置以及文件名,注意這里文件名只能是2個字母,其余部分OGG會補齊;table即復制表的表名,支持*通配,必須以;結尾
添加extract進程:
1 |
GGSCI (ambari.master.com) 16> add extract extkafka,tranlog,begin now |
(注:若報錯
1 |
ERROR: Could not create checkpoint file /opt/ogg/dirchk/EXTKAFKA.cpe (error 2, No such file or directory). |
執行下面的命令再重新添加即可。
1 |
create subdirs |
)
添加trail文件的定義與extract進程綁定:
1 |
GGSCI (ambari.master.com) 17> add exttrail /opt/ogg/dirdat/to,extract extkafka |
4.5 配置pump進程
pump進程本質上來說也是一個extract,只不過他的作用僅僅是把trail文件傳遞到目標端,配置過程和extract進程類似,只是邏輯上稱之為pump進程
1 |
GGSCI (ambari.master.com) 18> edit param pukafka |
說明:第一行指定extract進程名稱;passthru即禁止OGG與Oracle交互,我們這里使用pump邏輯傳輸,故禁止即可;dynamicresolution動態解析;userid ogg,password ogg即OGG連接Oracle數據庫的帳號密碼rmthost和mgrhost即目標端(kafka)OGG的mgr服務的地址以及監聽端口;rmttrail即目標端trail文件存儲位置以及名稱。
分別將本地trail文件和目標端的trail文件綁定到extract進程:
1 |
GGSCI (ambari.master.com) 1> add extract pukafka,exttrailsource /opt/ogg/dirdat/to |
4.6 配置define文件
Oracle與MySQL,Hadoop集群(HDFS,Hive,kafka等)等之間數據傳輸可以定義為異構數據類型的傳輸,故需要定義表之間的關系映射,在OGG命令行執行:
1 |
GGSCI (ambari.master.com) 3> edit param test_ogg |
在OGG主目錄下執行(oracle用戶):
1 |
./defgen paramfile dirprm/test_ogg.prm |
將生成的/opt/ogg/dirdef/test_ogg.test_ogg發送的目標端ogg目錄下的dirdef里:
1 |
scp -r /opt/ogg/dirdef/test_ogg.test_ogg root@slave1:/opt/ogg/dirdef/ |
5、OGG目標端配置
5.1 開啟kafka服務
1 |
cd /opt/kafka_2.11-1.1.0/ |
5.2 配置管理器mgr
1 |
GGSCI (ambari.slave1.com) 1> edit param mgr |
5.3 配置checkpoint
checkpoint即復制可追溯的一個偏移量記錄,在全局配置里添加checkpoint表即可。
1 |
edit param ./GLOBALS |
5.4 配置replicate進程
1 |
GGSCI (ambari.slave1.com) 4> edit param rekafka |
說明:REPLICATE rekafka定義rep進程名稱;sourcedefs即在4.6中在源服務器上做的表映射文件;TARGETDB LIBFILE即定義kafka一些適配性的庫文件以及配置文件,配置文件位於OGG主目錄下的dirprm/kafka.props;REPORTCOUNT即復制任務的報告生成頻率;GROUPTRANSOPS為以事務傳輸時,事務合並的單位,減少IO操作;MAP即源端與目標端的映射關系
5.5 配置kafka.props
1 |
cd /opt/ogg/dirprm/ |
1 |
gg.handlerlist=kafkahandler //handler類型 |
1 |
vim custom_kafka_producer.properties |
1 |
bootstrap.servers=192.168.44.129:9092 //kafkabroker的地址 |
其中需要將后面的注釋去掉,ogg不識別注釋,如果不去掉會報錯
5.6 添加trail文件到replicate進程
1 |
GGSCI (ambari.slave1.com) 2> add replicat rekafka exttrail /opt/ogg/dirdat/to,checkpointtable test_ogg.checkpoint |
6、測試
6.1 啟動所有進程
在源端和目標端的OGG命令行下使用start [進程名]的形式啟動所有進程。
啟動順序按照源mgr——目標mgr——源extract——源pump——目標replicate來完成。
全部需要在ogg目錄下執行ggsci目錄進入ogg命令行。
源端依次是
1 |
start mgr |
目標端
1 |
start mgr |
可以通過info all 或者info [進程名] 查看狀態,所有的進程都為RUNNING才算成功
源端
1 |
GGSCI (ambari.master.com) 5> info all |
目標端
1 |
GGSCI (ambari.slave1.com) 3> info all |
6.2 異常解決
如果有不是RUNNING可通過查看日志的方法檢查解決問題,具體通過下面兩種方法
1 |
vim ggser.log |
或者ogg命令行,以rekafka進程為例
1 |
GGSCI (ambari.slave1.com) 2> view report rekafka |
列舉其中我遇到的一個問題:
異常信息
1 |
SEVERE: Unable to set property on handler 'kafkahandler' (oracle.goldengate.handler.kafka.KafkaHandler). Failed to set property: TopicName:="test_ogg" (class: oracle.goldengate.handler.kafka.KafkaHandler). |
具體原因是網上的教程是舊版的,設置topicName的屬性為:
1 |
gg.handler.kafkahandler.topicName=test_ogg |
新版的這樣設置
1 |
gg.handler.kafkahandler.topicMappingTemplate=test_ogg |
大家可根據自己的版本進行設置,附上stackoverflow原答案
1 |
I tried to move data from Oracle Database to Kafka using Golden gate adapter Version 12.3.0.1.0 |
6.3 測試同步更新效果
現在源端執行sql語句
1 |
conn test_ogg/test_ogg |
查看源端trail文件狀態
1 |
ls -l /opt/ogg/dirdat/to* |
查看目標端trail文件狀態
1 |
ls -l /opt/ogg/dirdat/to* |
查看kafka是否自動建立對應的主題
1 |
bin/kafka-topics.sh --list --zookeeper localhost:2181 |
在列表中顯示有test_ogg則表示沒問題
通過消費者看是否有同步消息
1 |
bin/kafka-console-consumer.sh --bootstrap-server 192.168.44.129:9092 --topic test_ogg --from-beginning |
顯然,Oracle的數據已准實時同步到Kafka,格式為json,其中op_type代表操作類型,這個可配置,我沒有配置則按默認的來,默認為
1 |
gg.handler.kafkahandler.format.insertOpKey = I |
before代表操作之前的數據,after代表操作后的數據,現在已經可以從kafka獲取到同步的json數據了,后面可以用SparkStreaming和Storm等解析然后存到hadoop等大數據平台里
6.4 SparkStreaming測試消費同步消息
具體代碼可參考Spark Streaming連接Kafka入門教程
下面附上消費成功的結果圖
7、更新:后續遇到的問題
在后面的使用過程中發現上面同步到kafka的json數據中少一些我們想要的一些,下面講一下我是如何解決的
首先建表:
1 |
CREATE TABLE "TCLOUD"."T_OGG2" |
為什么不用之前建的表,主要是之前的字段太少,不容易看出問題,現在主要是增加幾個字段,然后id,idd是聯合主鍵。
看一下按照之前的配置,同步到kafka的數據(截取部分數據)
1 |
{"table":"TCLOUD.T_OGG2","op_type":"I","op_ts":"2018-05-31 11:46:09.512672","current_ts":"2018-05-31T11:46:15.292000","pos":"00000000000000001903","after":{"ID":4,"TEXT_NAME":null,"AGE":0,"ADD":null,"IDD":"8"}} |
現在只有insert的數據是全的,update更新非主鍵字段before是沒有數據的,更新主鍵before只有主鍵的數據,delete只有before的主鍵字段,也就是update和delete的信息是不全的,且沒有主鍵信息(程序里是不能判斷哪一個是主鍵的),這樣對於程序自動解析同步數據是不利的(不同的需求可能不一樣),具體自己可以分析,就不啰嗦了,這里主要解決,有需要before和after全部信息和主鍵信息的需求。
7.1 添加before
在源端extract里添加下面幾行
1 |
GGSCI (ambari.master.com) 33> edit param extkafka |
重啟 extkafka
1 |
stop extkafka |
然后測試
1 |
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 14:48:55.630340","current_ts":"2018-05-31T14:49:01.709000","pos":"00000000000000003770","before":{"ID":1,"AGE":20,"IDD":"1"},"after":{"ID":1,"AGE":1,"IDD":"1"}} |
發現update之后before里有數據即可,但是現在before和after的數據都不全(只有部分字段)
網上有的說只添加GETUPDATES即可,但我測試了沒有成功,關於每個配置項什么含義可以參考https://blog.csdn.net/linucle/article/details/13505939(有些配置的含義里面也沒有給出)
參考:http://www.itpub.net/thread-2083473-1-1.html
7.2 添加主鍵
在kafka.props添加
1 |
gg.handler.kafkahandler.format.includePrimaryKeys=true |
重啟 rekafka
1 |
stop rekafka |
測試:
1 |
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 14:58:57.637035","current_ts":"2018-05-31T14:59:03.401000","pos":"00000000000000004510","primary_keys":["ID","IDD"],"before":{"ID":1,"AGE":1,"IDD":"1"},"after":{"ID":1,"AGE":20,"IDD":"1"}} |
發現有primary_keys,不錯~
參考:http://blog.51cto.com/lyzbg/2088409
7.3 補全全部字段
如果字段補全應該是Oracle沒有開啟全列補充日志
1 |
SQL> select supplemental_log_data_all from v$database; |
通過以下命令開啟
1 |
SQL> alter database add supplemental log data(all) columns; |
測試一下
1 |
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 15:27:45.655518","current_ts":"2018-05-31T15:27:52.891000","pos":"00000000000000006070","primary_keys":["ID","IDD"],"before":{"ID":1,"TEXT_NAME":null,"AGE":1,"ADD":null,"IDD":"1"},"after":{"ID":1,"TEXT_NAME":null,"AGE":20,"ADD":null,"IDD":"1"}} |
到現在json信息里的內容已經很全了,基本滿足了我想要的,附圖:
啟發我發現和Oracle全列補充日志沒有開啟有關的博客:https://blog.csdn.net/huoshuyinhua/article/details/79013387
開啟命令參考:https://blog.csdn.net/aaron8219/article/details/16825963
注:博客上講到,開啟全列補充日志會導致磁盤快速增長,LGWR進程繁忙,不建議使用。大家可根據自己的情況使用。
8、關於通配
如果想通配整個庫的話,只需要把上面的配置所有表名的改為,如test_ogg.test_ogg改為 test_ogg.,但是kafka的topic不能通配,所以需要把所有的表的數據放在一個topic即可,后面再用程序解析表名即可。
9、附錄
目標端在這里,下載下來后文件名123111_ggs_Adapters_Linux_x64.zip
源端在舊版本查詢下載,下載后文件名為V34339-01.zip