環境說明
- centos7(運行於vbox虛擬機)
- flume1.9.0(自定義了flume連接mongodb的source插件)
- jdk1.8
- kafka(2.11)
- zookeeper(3.57)
- mongoDB4.0.0(無密碼)
- xshell 7
自定義flume插件
由於flume對數據庫的支持欠缺,flume的source組件中,沒有組件適用於連接關系型數據庫或非關系型數據庫。
對於關系型數據庫(RDB),github中開源插件flume-ng-sql-source被廣泛用於對接RDB。但是對於非關系型數據庫,不同的非關系型數據庫之間都有些許差別,且沒有一個統一的,或者配對的插件來支持非關系型數據庫。
因此,需要使用者自定義插件來適配。
我自定義的flume-ng-mongodb-source的jar包如下:
()
將該jar包放在yourpath/flume/lib
下(yourpath指你flume文件夾前面路徑,下同。同理,下文出現的yourhost指你本機的ip地址)
連接mongodb的配置文件
在mongodb中創建database和collection,用於測試。
創建數據庫:
use flumetest
創建集合(隱式創建):
db.testCollection.insert({id:1,name:"333"})
查看是否已經創建了數據庫:
> show dbs
admin 0.000GB
config 0.000GB
flumetest 0.000GB
local 0.000GB
test 0.000GB
查看集合中的數據:
> db.testCollection.findOne()
{ "_id" : ObjectId("5fe29faad5553e6caaa8cbe9"), "id" : 1, "name" : "333" }
此外,我們需要將mongodb相關的驅動jar包放到yourpath/flume/lib
下
bson-3.12.7.jar
mongo-java-driver-3.12.7.jar
mongodb-driver-core-3.12.7.jar
flume連接mongodb需要先編寫相關的配置文件,在yourpath/flume/conf
里新增配置文件mongo-flume.conf
,具體的配置如下:
#This is a model,you can use for test
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.wms.flumesource.MongoDBSource
a1.sources.r1.Mongodb.url = yourhost:27017
a1.sources.r1.Mongodb.database=flumetest
a1.sources.r1.Mongodb.collection = testCollection
a1.sources.r1.Mongodb.column= _id
a1.sources.r1.start.from = 0
a1.sources.r1.interval=2000
a1.sources.r1.charset=UTF-8
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = mongoTopic
a1.sinks.k1.brokerList = yourhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
參數說明:
# mongodb的url
a1.sources.r1.Mongodb.url = yourhost:27017
# 要連接的database
a1.sources.r1.Mongodb.database=flumetest
# 要連接的collection
a1.sources.r1.Mongodb.collection = testCollection
# mongodb中每條數據都有默認的_id,用於續傳
a1.sources.r1.Mongodb.column= _id
# sink使用了kafka,flume成功連接之后開啟消費監控就能看到數據了
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# 接下來用於監控消費的topic名字
a1.sinks.k1.topic = mongoTopic
因為mongodb有集群操作,所以flume-ng-mongodb-source也支持mongodb集群,只需要在a1.sources.r1.Mongodb.url
里配置多個url即可,如:
a1.sources.r1.Mongodb.url = yourhost1:port1,yourhost2:port2,yourhost3:port3,......
采集mongodb數據實踐
啟動mongodb和kafka。
啟動flume
bin/flume-ng agent -n a1 -c conf -f conf/mongo-flume.conf -Dflume.root.logger=INFO,console
參數說明:
a1
:是你在mongo-flume中給agent起的別名conf/mongo-flume.conf
:導入前文所述的配置文件,配置文件在yourpath/flume/conf
下。
啟動一個kafka消費監控:
bin/kafka-console-consumer.sh --bootstrap-server yourhost:9092 --topic mongoTopic --from-beginning
獲取testCollection中全部數據(下圖不是重復數據,是之前多次測試在topic中留下的數據):
往testCollection中添加一條數據:
db.testCollection.insert({id:7,name:"test",city:"Beijing"})
消費監控中的結果如下:
只讀增量數據
如果不想把collection中所有的數據都讀取出來,請修改flume-ng-mongodb-source源碼。
在MongoDBSource.java文件中,找到run
方法,取消掉events.clear()
的注釋。
再次打包,替換掉lib下flume-ng-mongodb-source的jar包。
然后再次執行上面的啟動操作:
bin/kafka-console-consumer.sh --bootstrap-server yourhost:9092 --topic mongoTopic --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server yourhost:9092 --topic mongoTopic --from-beginning
插入一條數據:
db.testCollection.insert({id:8,name:"增量"})
查看消費監控:
可以看到只有新增的數據了,不會再讀取所有的數據
再插入一條數據實驗一下:
db.testCollection.insert({id:9,source:"MongoDBSource",channle:"memory",sink:"kafka"})