GoldenGate Real-Time Data Delivery to a Big Data Platform (3): Apache Flume


Apache Flume

Flume NG is a distributed, reliable, and available system for efficiently collecting and aggregating large volumes of log data from many different sources and moving it into a centralized data store for analysis. In fact, Flume is not limited to logs: it can also collect other kinds of data, such as network port output, JMS messages, and command-line output.

Architecture

[Figure: Flume agent architecture (source, channel, sink)]

A Flume agent consists of three main components: the source (where data enters), the channel (intermediate buffering), and the sink (the target store the data is written to).
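The source-channel-sink flow can be pictured as a tiny in-process pipeline. The sketch below is purely illustrative (plain Python, not the Flume API): a source enqueues events into a buffering channel, and a sink drains them toward the target:

```python
from queue import Queue

# Channel: buffers events between source and sink (like Flume's memory channel)
channel = Queue(maxsize=1000)

def source(events):
    """Source: accepts incoming events and puts them onto the channel."""
    for event in events:
        channel.put(event)

def sink():
    """Sink: drains the channel and delivers events to the target store."""
    delivered = []
    while not channel.empty():
        delivered.append(channel.get())
    return delivered

source(["event-1", "event-2"])
print(sink())  # ['event-1', 'event-2']
```

The channel decouples the two sides: the source can keep accepting events while the sink is slow, up to the channel's capacity.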

Flume supports multiple topologies, such as multi-hop (cascaded) data transfer:

[Figure: multi-hop topology]

Many agents can feed into one for data consolidation:

[Figure: consolidating (fan-in) topology]

and one agent can fan out to many for data distribution:

[Figure: multiplexing (fan-out) topology]

Components Supported by Flume
Source

[Table: built-in source types, including Avro, Thrift, Exec, Spooling Directory, Taildir, Kafka, NetCat, Syslog, and HTTP]

Channel

[Table: built-in channel types, including Memory, File, Kafka, and JDBC]

Sink

[Table: built-in sink types, including HDFS, Hive, Logger, Avro, HBase, Kafka, and File Roll]

Installation and a Simple Test

Download the distribution from flume.apache.org and extract it to /u02/flume1.8.

Create an example configuration file, conf/a1.conf, with the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


This configuration defines a netcat source listening on port 44444.

Start the agent:

bin/flume-ng agent --conf conf --conf-file conf/a1.conf --name a1 -Dflume.root.logger=INFO,console

The startup log looks like this:

info: Including Hive libraries found via () for Hive access
+ exec /u01/jdk1.8.0_111/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/u02/flume1.8/conf:/u02/flume1.8/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file conf/a1.conf --name a1
2018-01-02 09:11:28,634 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting
2018-01-02 09:11:28,640 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:134)] Reloading configuration file:conf/a1.conf
............................
2018-01-02 09:11:28,814 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]


The log confirms that the listener on the configured IP and port has started successfully.

In another terminal, run telnet and type some text:

$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
OK
that's ok
OK
quit
OK
^]
telnet> quit
Connection closed.


The flume agent window now logs the text just entered, confirming the agent is working:

2018-01-02 09:12:07,928 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 0D hello. }
2018-01-02 09:12:16,932 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 68 61 74 27 73 20 6F 6B 0D that's ok. }
2018-01-02 09:12:20,935 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 71 75 69 74 0D quit. }
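The netcat source speaks a very simple line-oriented protocol: each newline-terminated line becomes one event, and the source acknowledges it with "OK". The sketch below is a standalone mock of that handshake (plain Python sockets, not Flume itself), reproducing the telnet exchange shown earlier:

```python
import socket
import threading

def mock_netcat_source(server_sock, events):
    """Accept one connection; each received line becomes an event, answered with 'OK'."""
    conn, _ = server_sock.accept()
    with conn:
        buf = b""
        while True:
            data = conn.recv(1024)
            if not data:
                break
            buf += data
            while b"\n" in buf:
                line, buf = buf.split(b"\n", 1)
                events.append(line.rstrip(b"\r").decode())  # store the event text
                conn.sendall(b"OK\n")                        # acknowledge, like Flume does

events = []
server = socket.socket()
server.bind(("localhost", 0))  # pick any free port for the demo
server.listen(1)
threading.Thread(target=mock_netcat_source, args=(server, events), daemon=True).start()

# Client side, playing the role of telnet
client = socket.create_connection(server.getsockname())
client.sendall(b"hello\n")
reply = client.recv(16)
client.close()

print(events, reply)  # ['hello'] b'OK\n'
```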


Architecture of the OGG-Flume Integration

[Figure: OGG to Flume integration architecture]

GoldenGate captures incremental changes from the relational database at the source and ships them over the network to the node where a Flume agent is configured and listening on a port. The OGG Java adapter parses the change data and writes it in real time to the Flume agent's source port; Flume then moves it through the channel and sink to the next target store.

With this GoldenGate-Flume integration, incremental RDBMS changes can be written in real time to any target type that Flume supports.

For OGG delivery, all that is needed is an additional Avro listener in a1.conf:

a1.sources = r1 r2

a1.sources.r2.channels = c1
a1.sources.r2.type = avro
a1.sources.r2.bind = 192.168.89.132
a1.sources.r2.port = 41414


Restart the flume agent; the output log now contains Avro-related entries:

2018-01-02 09:39:21,785 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r2: Successfully registered new MBean.
2018-01-02 09:39:21,786 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r2 started
2018-01-02 09:39:21,788 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:260)] Avro source r2 started.


OGG Configuration

Extract GoldenGate for Big Data 12.3 to the /u01/ogg4bd_12.3 directory.

Set the environment variable:

export LD_LIBRARY_PATH=/u01/jdk1.8.0_111/jre/lib/amd64/server

Copy all files from AdapterExamples/big-data/flume into the dirprm/ directory.

In flume.props, update the path to the Flume lib directory.

In custom-flume-rpc.properties, update the host and port on which Flume listens for Avro.

The flume.props file contains:

[oracle@ol73 dirprm]$ more flume.props
gg.handlerlist = flumehandler
gg.handler.flumehandler.type=flume
gg.handler.flumehandler.RpcClientPropertiesFile=custom-flume-rpc.properties
gg.handler.flumehandler.format=avro_op
gg.handler.flumehandler.mode=tx
#gg.handler.flumehandler.maxGroupSize=100, 1Mb
#gg.handler.flumehandler.minGroupSize=50, 500 Kb
gg.handler.flumehandler.EventMapsTo=tx
gg.handler.flumehandler.PropagateSchema=true
gg.handler.flumehandler.includeTokens=false
gg.handler.flumehandler.format.WrapMessageInGenericAvroMessage=true

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO

gg.report.time=30sec

#Sample gg.classpath for Apache Flume
gg.classpath=dirprm/:/u02/flume1.8/lib/*:

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

These contents can be copied directly from the sample file; the only value that needs changing is gg.classpath.


custom-flume-rpc.properties contains:

[oracle@ol73 dirprm]$ more custom-flume-rpc.properties
client.type=default
hosts=h1
hosts.h1=192.168.89.132:41414
batch-size=100
connect-timeout=20000
request-timeout=20000

The only values that need changing are the host IP and port on which Flume listens; for a cluster of multiple Flume agents, several hosts can be listed.
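The hosts syntax generalizes to multiple agents: `hosts` lists space-separated aliases, and each `hosts.<alias>` entry maps an alias to `host:port`. The sketch below (a hypothetical helper, not part of OGG or Flume) shows how such entries resolve into endpoints:

```python
def parse_flume_hosts(props_text):
    """Resolve Flume RPC client host aliases (hosts=a b; hosts.a=ip:port)."""
    props = {}
    for line in props_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    endpoints = []
    for alias in props.get("hosts", "").split():
        host, _, port = props["hosts." + alias].partition(":")
        endpoints.append((host, int(port)))
    return endpoints

# Single-agent case, matching custom-flume-rpc.properties above
sample = """client.type=default
hosts=h1
hosts.h1=192.168.89.132:41414
batch-size=100"""
print(parse_flume_hosts(sample))  # [('192.168.89.132', 41414)]
```

With a multi-agent cluster, `hosts=h1 h2` plus two `hosts.h1`/`hosts.h2` lines would yield two endpoints in order.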


Finally, here is the OGG delivery (Replicat) parameter file, rflume.prm:

[oracle@ol73 dirprm]$ more rflume.prm
REPLICAT rflume
-- Trail file for this example is located in "AdapterExamples/trail" directory
-- Command to add REPLICAT
-- add replicat rflume, exttrail AdapterExamples/trail/tr
TARGETDB LIBFILE libggjava.so SET property=dirprm/flume.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP QASOURCE.*, TARGET QASOURCE.*;


OGG Delivery Test and Verification

In ggsci, add the Replicat process, using the bundled sample data:

GGSCI> add replicat rflume, exttrail AdapterExamples/trail/tr

Start the delivery process:

GGSCI (ol73) 5> start rflume

Sending START request to MANAGER ...
REPLICAT RFLUME starting

Check the process status; the delivery has already completed (checkpoint lag is zero):

GGSCI (ol73) 6> info rflume

REPLICAT   RFLUME    Last Started 2018-01-02 09:41   Status RUNNING
Checkpoint Lag       00:00:00 (updated 00:00:00 ago)
Process ID           66856
Log Read Checkpoint  File /u01/ogg4bd_12.3/AdapterExamples/trail/tr000000000
                     2015-11-06 02:45:39.000000  RBA 5660


Check the delivery statistics:

GGSCI (ol73) 7> stats rflume, total

Sending STATS request to REPLICAT RFLUME ...

Start of Statistics at 2018-01-02 09:41:51.

Replicating from QASOURCE.TCUSTMER to QASOURCE.TCUSTMER:

*** Total statistics since 2018-01-02 09:41:45 ***
        Total inserts                  5.00
        Total updates                  1.00
        Total deletes                  0.00
        Total discards                 0.00
        Total operations               6.00

Replicating from QASOURCE.TCUSTORD to QASOURCE.TCUSTORD:

*** Total statistics since 2018-01-02 09:41:45 ***
        Total inserts                  5.00
        Total updates                  3.00
        Total deletes                  2.00
        Total discards                 0.00
        Total operations              10.00

End of Statistics.
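As a quick sanity check on these numbers, each table's total operations should equal inserts + updates + deletes (discards are counted separately). A trivial sketch over the values reported above:

```python
# Per-table totals from "stats rflume, total":
# (inserts, updates, deletes, operations)
stats = {
    "QASOURCE.TCUSTMER": (5, 1, 0, 6),
    "QASOURCE.TCUSTORD": (5, 3, 2, 10),
}

# Operations = inserts + updates + deletes for every table
for table, (ins, upd, dele, ops) in stats.items():
    assert ins + upd + dele == ops, table
print("stats are internally consistent")  # prints if all checks pass
```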


The flume agent log shows the corresponding activity:

2018-01-02 09:41:45,296 (New I/O server boss #5) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x41866afa, /192.168.89.132:17935 => /192.168.89.132:41414] OPEN
2018-01-02 09:41:45,299 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x41866afa, /192.168.89.132:17935 => /192.168.89.132:41414] BOUND: /192.168.89.132:41414
2018-01-02 09:41:45,300 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x41866afa, /192.168.89.132:17935 => /192.168.89.132:41414] CONNECTED: /192.168.89.132:17935
2018-01-02 09:41:45,700 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{SCHEMA_EVENT=TRUE, SCHEMA_NAME=QASOURCE, TABLE_NAME=TCUSTMER, GENERIC_WRAPPER=false, SCHEMA_FINGERPRINT=-1783711649} body: 7B 0A 20 20 22 74 79 70 65 22 20 3A 20 22 72 65 {. "type" : "re }
2018-01-02 09:41:45,701 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{SCHEMA_EVENT=TRUE, SCHEMA_NAME=QASOURCE, TABLE_NAME=TCUSTMER, GENERIC_WRAPPER=true, SCHEMA_FINGERPRINT=1472787928} body: 7B 0A 20 20 22 74 79 70 65 22 20 3A 20 22 72 65 {. "type" : "re }
2018-01-02 09:41:45,701 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{SCHEMA_EVENT=TRUE, SCHEMA_NAME=QASOURCE, TABLE_NAME=TCUSTORD, GENERIC_WRAPPER=false, SCHEMA_FINGERPRINT=495754722} body: 7B 0A 20 20 22 74 79 70 65 22 20 3A 20 22 72 65 {. "type" : "re }
2018-01-02 09:41:45,701 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{OP_COUNT=4, GG_TRANID=00000000000000001956} body: 22 51 41 53 4F 55 52 43 45 2E 54 43 55 53 54 4D "QASOURCE.TCUSTM }
2018-01-02 09:41:45,748 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{OP_COUNT=12, GG_TRANID=00000000000000003286} body: 22 51 41 53 4F 55 52 43 45 2E 54 43 55 53 54 4D "QASOURCE.TCUSTM }


The Flume log now contains events carrying the corresponding tables and source-database operations. Note that in the console output the logger sink prints only the first 16 bytes of each event body; the rest is truncated by the sink.
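Each log line's body is a hex dump of the (truncated) event payload. A small helper (purely illustrative, not part of Flume) decodes such a space-separated hex string back into text:

```python
def decode_logger_body(hex_body):
    """Decode the space-separated hex bytes printed by Flume's logger sink."""
    return bytes(int(b, 16) for b in hex_body.split()).decode("utf-8")

# First 16 bytes of the TCUSTMER schema event shown above
print(repr(decode_logger_body("7B 0A 20 20 22 74 79 70 65 22 20 3A 20 22 72 65")))
# '{\n  "type" : "re'
```

So the schema events carry the beginning of an Avro schema JSON document, which is why the readable suffix in the log reads `{. "type" : "re` (the dot stands in for the newline).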

If required, a Flume sink can deliver the data on to HDFS, Hive, Kafka, or other targets, enabling real-time integration of RDBMS data with other semi-structured data.

