GoldenGate實時投遞數據到大數據平台(5) - Kafka


Oracle GoldenGate是Oracle公司的實時數據復制軟件,支持關系型數據庫和多種大數據平台。從GoldenGate 12.2開始,GoldenGate支持直接投遞數據到Kafka等平台,而不用通過Java二次開發。在數據復制過程中,GoldenGate充當Kafka Producer的角色,從關系 型數據庫解析增量數據,再實時往Kafka平台寫入。當前最新的GoldenGate版本是12.3.1.1.1。

從下圖可以看出,GoldenGate不僅支持Kafka投遞,也支持其它大數據平台的投遞。

clip_image002

本文主要講述如何將增量數據投遞到Kafka平台。

環境准備

介質准備

GoldenGate介質下載

http://www.oracle.com/technetwork/middleware/goldengate/downloads/index.html

kafka的介質可以從kafka,apache.org官網下載。

軟件安裝

基於 GoldenGate的復制鏈路中,一般分為源端和目標端,在GoldenGate for kafka場景中,源端一般是關系型數據庫,目標端包括GoldenGate for kafka的節點,以及kafka集群。

Kafka的運行需要先安裝Zookeeper軟件。zookeeper和Kafka的安裝步驟可在網絡上搜索,不在此贅述。

本文重點講解GoldenGate for Kafka的功能,GoldenGate for DB的安裝配置在此略過。目標端GoldenGate for big data 的安裝需要有JDK環境,要求至少1.7及以上版本。安裝完JDK之后,需要指定相應的JAVA_HOME環境變量,並將$JAVA_HOME/bin添加到PATH環境變量。

安裝GoldenGate的節點要求能訪問kafka集群,因此,安裝GoldenGate的節點要有kafka lib,並在后面的kafka.props文件中設置對應的路徑。

GoldenGate的安裝介質是一個ZIP壓縮包,解壓之后,再繼續解壓對應的tar即安裝完成。安裝之后的目錄下有示例可供參考:

clip_image003

GoldenGate for kafka配置

GoldenGate投遞進程參數

REPLICAT myka

-- add replicat myka, exttrail ./dirdat/ea

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props

REPORTCOUNT EVERY 1 MINUTES, RATE

GROUPTRANSOPS 10000

MAP gg_src.*, TARGET gg_src.*;


Kafka相關的屬性

hadoop@ubuntu2:/opt/GoldenGate12.2.1.1/dirprm$ more kafka.props

gg.handlerlist = kafkahandler

gg.handler.kafkahandler.type = kafka

gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties

gg.handler.kafkahandler.TopicName =mykaf4

gg.handler.kafkahandler.format =avro_op

gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic

gg.handler.kafkahandler.BlockingSend =false

gg.handler.kafkahandler.includeTokens=false

gg.handler.kafkahandler.mode =tx

goldengate.userexit.timestamp=utc

goldengate.userexit.writers=javawriter

javawriter.stats.display=TRUE

javawriter.stats.full=TRUE

gg.log=log4j

gg.log.level=INFO

gg.report.time=30sec

gg.classpath=dirprm/:/opt/kafka/libs/*:

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

hadoop@ubuntu2:/opt/GoldenGate12.2.1.1/dirprm$ more custom_kafka_producer.properties

bootstrap.servers=localhost:9092

acks=1

compression.type=gzip

reconnect.backoff.ms=1000

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

# 100KB per partition

batch.size=102400

linger.ms=10000


測試

確保zookeeper, kafka相關進程是正常運行的。

clip_image004

啟動GoldenGate投遞進程

GGSCI (ubuntu2) 12> start myka

Sending START request to MANAGER ...

REPLICAT MYKA starting


查看狀態

GGSCI (ubuntu2) 21> info myka

REPLICAT MYKA Last Started 2017-12-18 12:59 Status RUNNING

Checkpoint Lag 00:00:00 (updated 00:00:01 ago)

Process ID 42206

Log Read Checkpoint File ./dirdat/ea000000038

2016-08-28 21:18:20.980481 RBA 2478


統計增量數據,已經寫入3條記錄。

GGSCI (ubuntu2) 22> stats myka, total

Sending STATS request to REPLICAT MYKA ...

Start of Statistics at 2017-12-18 13:05:09.

Replicating from GG_SRC.TB_HIVE to gg_src.TB_HIVE:

*** Total statistics since 2017-12-18 12:59:05 ***

Total inserts 3.00

Total updates 0.00

Total deletes 0.00

Total discards 0.00

Total operations 3.00

End of Statistics.


查看kafka集群,使用consumer命令行查看

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic mykaf4

輸出如下3條記錄,除字段數據外,還有其它輔助信息,如源表結構信息、源端commit時間、當前插入時間等,輸出的信息可以在kafka.props文件中控制。

GG_SRC.TB_HIVEI42016-08-28 12:43:21.97963642017-12-18T12:59:05.368000(00000000380000001916ID1cd&2016-02-08:00:00:00:2016-12-11:11:00:02.000000000
GG_SRC.TB_HIVEI42016-08-28 12:47:24.98154442017-12-18T12:59:05.462000(00000000380000002103IDcd22&2016-02-08:00:00:00:2016-12-11:11:00:02.000000000
GG_SRC.TB_HIVEI42016-08-28 13:18:20.98048142017-12-18T12:59:05.462001(00000000380000002292IDcd22&2016-02-08:00:00:00:2016-12-11:11:00:02.000000000


調整輸出的格式為XML,修改kafka.props文件,重新執行剛才的投遞進程。

gg.handler.kafkahandler.format =xml

GGSCI>stop myka

GGSCI>alter myka, extrba 0

GGSCI>start myka, NOFILTERDUPTRANSACTIONS

使用NOFILTERDUPTRANSACTIONS的目的是禁止OGG跳過已經處理過的事務。


再查看kafka-consumer的輸出結果:

可以看到,數據的格式已經變成xml,而且源端每個操作的詳細信息都已經記錄。

<operation table='GG_SRC.TB_HIVE' type='I' ts='2016-08-28 12:43:21.979636' current_ts='2017-12-18T16:49:00.995000' pos='00000000380000001916' numCols='4'>

<col name='ID' index='0'>

<before missing='true'/>

<after><![CDATA[1]]></after>

</col>

<col name='NAME' index='1'>

<before missing='true'/>

<after><![CDATA[cd]]></after>

</col>

<col name='BIRTH_DT' index='2'>

<before missing='true'/>

<after><![CDATA[2016-02-08:00:00:00]]></after>

</col>

<col name='CR_TM' index='3'>

<before missing='true'/>

<after><![CDATA[2016-12-11:11:00:02.000000000]]></after>

</col>

</operation>

<operation table='GG_SRC.TB_HIVE' type='I' ts='2016-08-28 12:47:24.981544' current_ts='2017-12-18T16:49:00.996000' pos='00000000380000002103' numCols='4'>

<col name='ID' index='0'>

<before missing='true'/>

<after><![CDATA[2]]></after>

</col>

<col name='NAME' index='1'>

<before missing='true'/>

<after><![CDATA[cd22]]></after>

</col>

<col name='BIRTH_DT' index='2'>

<before missing='true'/>

<after><![CDATA[2016-02-08:00:00:00]]></after>

</col>

<col name='CR_TM' index='3'>

<before missing='true'/>

<after><![CDATA[2016-12-11:11:00:02.000000000]]></after>

</col>

</operation>

<operation table='GG_SRC.TB_HIVE' type='I' ts='2016-08-28 13:18:20.980481' current_ts='2017-12-18T16:49:00.996001' pos='00000000380000002292' numCols='4'>

<col name='ID' index='0'>

<before missing='true'/>

<after><![CDATA[3]]></after>

</col>

<col name='NAME' index='1'>

<before missing='true'/>

<after><![CDATA[cd22]]></after>

</col>

<col name='BIRTH_DT' index='2'>

<before missing='true'/>

<after><![CDATA[2016-02-08:00:00:00]]></after>

</col>

<col name='CR_TM' index='3'>

<before missing='true'/>

<after><![CDATA[2016-12-11:11:00:02.000000000]]></after>

</col>

</operation>


最后,再修改輸出格式為json。

gg.handler.kafkahandler.format =json

GGSCI>stop myka

GGSCI>alter myka, extrba 0

GGSCI>start myka, NOFILTERDUPTRANSACTIONS


檢查kafka的輸出結果:

{"table":"GG_SRC.TB_HIVE","op_type":"I","op_ts":"2016-08-28 12:43:21.979636","current_ts":"2017-12-18T16:46:23.860000","pos":"00000000380000001916","after":{"ID":"1","NAME":"cd","BIRTH_DT":"2016-02-08:00:00:00","CR_TM":"2016-12-11:11:00:02.000000000"}}

{"table":"GG_SRC.TB_HIVE","op_type":"I","op_ts":"2016-08-28 12:47:24.981544","current_ts":"2017-12-18T16:46:23.914000","pos":"00000000380000002103","after":{"ID":"2","NAME":"cd22","BIRTH_DT":"2016-02-08:00:00:00","CR_TM":"2016-12-11:11:00:02.000000000"}}

{"table":"GG_SRC.TB_HIVE","op_type":"I","op_ts":"2016-08-28 13:18:20.980481","current_ts":"2017-12-18T16:46:23.914001","pos":"00000000380000002292","after":{"ID":"3","NAME":"cd22","BIRTH_DT":"2016-02-08:00:00:00","CR_TM":"2016-12-11:11:00:02.000000000"}}


可以看到,kafka上已經是JSON格式的數據,而且包含了相關的時間戳和其它輔助信息。

至此,測試完成。

最后,如果有必要,也可以使用GoldenGate針對現有存量數據的初始化,即將關系型數據庫的現有數據使用GoldenGate投遞到Kafka平台,從而省去使用java程序初始化的工作。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM