Flume MongodbSink
This sink supports MongoDB 3.0.
MongodbSink
flume-ng-mongodbsink
An Apache Flume sink that sends JSON to a MongoDB collection.
Configuration properties
Property Name | Default | Description |
---|---|---|
hostNames | - | MongoDB hosts and ports, in the form host1:port1,host2:port2,... |
database | - | MongoDB database name |
collection | - | Collection within the database |
user | - | Database username |
password | - | Database password |
batchSize | 100 | Number of events processed per batch |
authentication_enabled | False | Whether a username and password are required |
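The `hostNames` value is a comma-separated list of `host:port` pairs. As a minimal, stdlib-only sketch of how such a value could be split (the `HostNamesParser` class and its methods are hypothetical, not part of this sink), each entry is split at its last colon and an entry without a port is rejected:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a hostNames value such as
// "host1:27017,host2:27018" into host/port pairs.
class HostNamesParser {

    // One parsed "host:port" entry.
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    static List<HostPort> parse(String hostNames) {
        List<HostPort> result = new ArrayList<>();
        for (String entry : hostNames.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray or trailing commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            // NumberFormatException (an IllegalArgumentException) if the port is not numeric
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(new HostPort(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints [127.0.0.1:27017, 10.0.0.2:27018]
        System.out.println(parse("127.0.0.1:27017,10.0.0.2:27018"));
    }
}
```

A real sink would then hand these pairs to the MongoDB Java driver; that step is left out so the sketch runs without the driver JARs.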
If the database has no username and password, the `user`, `password`, and `authentication_enabled` properties are not needed.
If a password is required, set `authentication_enabled = True`.
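The rule above can be expressed as a small validation step. This is a stdlib-only sketch that assumes the flag is read with `Boolean.parseBoolean`, which ignores case, so both `True` and `true` enable authentication; `AuthSettings` and its methods are hypothetical and may not match how `MongoSinkSelf` actually reads its settings.

```java
// Hypothetical validation of the authentication-related sink properties.
class AuthSettings {
    final boolean enabled;
    final String user;
    final String password;

    private AuthSettings(boolean enabled, String user, String password) {
        this.enabled = enabled;
        this.user = user;
        this.password = password;
    }

    // authenticationFlag is null when the property is absent (documented default: False).
    static AuthSettings from(String authenticationFlag, String user, String password) {
        // Boolean.parseBoolean ignores case: "True" and "true" give true,
        // while null or any other value gives false.
        boolean enabled = Boolean.parseBoolean(authenticationFlag);
        if (enabled && (isBlank(user) || isBlank(password))) {
            throw new IllegalArgumentException(
                "authentication_enabled=True requires both user and password");
        }
        // Without authentication, any user/password values are ignored.
        return enabled ? new AuthSettings(true, user, password)
                       : new AuthSettings(false, null, null);
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(from("True", "user", "password").enabled); // true
        System.out.println(from(null, null, null).enabled);            // false
    }
}
```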
Example
# Define the data sink (output)
a1.sinks.s.type = com.kenshuchong.MongodbSink.MongoSinkSelf
a1.sinks.s.hostNames=127.0.0.1:27017
a1.sinks.s.authentication_enabled=True
a1.sinks.s.database = database
a1.sinks.s.password = password
a1.sinks.s.user = user
a1.sinks.s.collection = collection
a1.sinks.s.batchSize = 100
a1.sinks.s.channel = c
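To see how the `a1.sinks.s.*` keys in the example map onto the property table, the sketch below loads a configuration with `java.util.Properties` and collects the keys of sink `s` in agent `a1`, filling in the documented defaults for `batchSize` and `authentication_enabled`. It assumes the file is parsed as a standard Java properties file; `SinkConfigReader` is a hypothetical name, and Flume itself hands these values to the sink through its own `Context` object.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical reader for the sink section of a Flume-style properties file.
class SinkConfigReader {

    // Returns the properties of <agent>.sinks.<sink>.* with the prefix stripped,
    // pre-filled with the defaults from the property table.
    static Map<String, String> sinkProperties(String config, String agent, String sink) {
        Properties props = new Properties();
        try {
            // Lines starting with '#' are comments; whitespace around '=' is skipped.
            props.load(new StringReader(config));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String prefix = agent + ".sinks." + sink + ".";
        Map<String, String> result = new HashMap<>();
        result.put("batchSize", "100");
        result.put("authentication_enabled", "False");
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String config = String.join("\n",
            "# Define the data sink (output)",
            "a1.sinks.s.type = com.kenshuchong.MongodbSink.MongoSinkSelf",
            "a1.sinks.s.hostNames=127.0.0.1:27017",
            "a1.sinks.s.database = database",
            "a1.sinks.s.collection = collection",
            "a1.sinks.s.channel = c");
        Map<String, String> sink = sinkProperties(config, "a1", "s");
        System.out.println(sink.get("hostNames")); // 127.0.0.1:27017
        System.out.println(sink.get("batchSize")); // 100 (default applied)
    }
}
```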
Customization
You can modify the part of the sink that builds the JSON document (lines 76-82 of the source). `jsonEvent` holds the event body, i.e. the log line.
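// Timestamp for the "Time" field
String cuTime = getCurrentTime();
// Decode the Flume event body (the log line) as UTF-8
String jsonEvent = new String(event.getBody(), StandardCharsets.UTF_8);
// Build the document: raw log text, a fixed source-path label, and the timestamp
Document sentEvent = new Document("log", jsonEvent)
        .append("Dir", "/data/ngnix.log")
        .append("Time", cuTime);
// Add the document to the list of documents to be written
documents.add(sentEvent);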
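The snippet calls `getCurrentTime()`, whose implementation is not shown here. A hypothetical stand-in, together with a plain `Map` mirroring the three document fields, might look like the following. The `yyyy-MM-dd HH:mm:ss` pattern is an assumption, and a `LinkedHashMap` is used only so the sketch runs without the MongoDB driver (`org.bson.Document` implements `Map<String, Object>`, so the same keys apply).

```java
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the sink's document-building step.
class LogDocumentBuilder {

    // Assumed timestamp pattern; the real getCurrentTime() may differ.
    private static final DateTimeFormatter TIME_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static String getCurrentTime() {
        return LocalDateTime.now().format(TIME_FORMAT);
    }

    // Mirrors the "log", "Dir" and "Time" fields of the Document in the snippet,
    // keeping insertion order like Document does.
    static Map<String, Object> build(byte[] body, String dir, String time) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("log", new String(body, StandardCharsets.UTF_8));
        doc.put("Dir", dir);
        doc.put("Time", time);
        return doc;
    }

    public static void main(String[] args) {
        byte[] body = "ERROR something failed".getBytes(StandardCharsets.UTF_8);
        System.out.println(build(body, "/data/ngnix.log", getCurrentTime()));
    }
}
```

Adding a field to the stored documents then amounts to one more `put` here, or one more `.append(...)` call in the sink snippet.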
Tips
This MongoDB sink supports version 3.0.
For production use, add the following JAR files to flume/lib:
- mongodb-driver-3.0.2.jar
- mongodb-driver-core-3.0.2.jar
- bson-3.0.2.jar
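Collecting logs in real time with the TAILDIR source and storing them in MongoDB
Requirements
- Logs are stored in /opt/rec/log
- Only ERROR-level log entries need to be collected
- Collected logs are stored in MongoDB
Approach
- Use the new TAILDIR source to collect the log directory (/opt/rec/log) in real time
- Configure a regex interceptor on the source to drop non-ERROR log lines
- Use the custom MongoDB sink to insert the logs into MongoDB in real time
- positionFile is a JSON file that stores the read offset of each tailed file, so reading resumes from the last recorded position
- A file is read again only when new data is detected beyond its recorded offset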
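To illustrate the offset idea behind `positionFile`, the sketch below remembers a byte offset per file and, on each poll, reads only bytes appended since that offset; if the file has not grown, nothing is read. It is a simplified model, not the TAILDIR source's implementation: `OffsetTailer` is a hypothetical class, offsets live in memory instead of a JSON file, and file rotation and truncation are ignored. It only advances the offset past complete lines, so a half-written last line is picked up on a later poll.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of offset-based tailing (not the real TAILDIR source).
class OffsetTailer {
    // file path -> number of bytes already consumed
    private final Map<Path, Long> offsets = new HashMap<>();

    // Returns the complete lines appended since the last recorded offset.
    List<String> poll(Path file) throws IOException {
        long offset = offsets.getOrDefault(file, 0L);
        long size = Files.size(file);
        if (size <= offset) {
            return new ArrayList<>(); // no new data past the offset: skip reading
        }
        byte[] buf = new byte[(int) (size - offset)];
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(offset);
            raf.readFully(buf);
        }
        int end = lastNewline(buf);
        if (end < 0) {
            return new ArrayList<>(); // wait until the line is complete
        }
        offsets.put(file, offset + end + 1); // consume through the last '\n'
        String chunk = new String(buf, 0, end, StandardCharsets.UTF_8);
        return new ArrayList<>(Arrays.asList(chunk.split("\n", -1)));
    }

    private static int lastNewline(byte[] buf) {
        for (int i = buf.length - 1; i >= 0; i--) {
            if (buf[i] == '\n') {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) throws IOException {
        Path log = Files.createTempFile("app", ".log");
        OffsetTailer tailer = new OffsetTailer();
        Files.write(log, "INFO start\nERROR boom\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(tailer.poll(log)); // [INFO start, ERROR boom]
        System.out.println(tailer.poll(log)); // [] (file unchanged)
        Files.write(log, "ERROR again\n".getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
        System.out.println(tailer.poll(log)); // [ERROR again]
        Files.delete(log);
    }
}
```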
Configuration file
# Define component names
a1.sources = r
a1.sinks = s
a1.channels = c
# Define the data source (input)
a1.sources.r.type = TAILDIR
a1.sources.r.channels = c
a1.sources.r.positionFile = /home/ch/logMonitor/taildir_position.json
a1.sources.r.filegroups = f1
a1.sources.r.filegroups.f1 = /opt/rec/log/*.log
## Define the interceptor
a1.sources.r.interceptors=i1
a1.sources.r.interceptors.i1.type=regex_filter
a1.sources.r.interceptors.i1.regex= ERROR
# Define the data sink (output)
a1.sinks.s.type = com.kenshuchong.MongodbSink.MongoSinkSelf
a1.sinks.s.hostNames=127.0.0.1:27017
a1.sinks.s.authentication_enabled=True
a1.sinks.s.database = database
a1.sinks.s.password = password
a1.sinks.s.user = user
a1.sinks.s.collection = logsearch_info
a1.sinks.s.batchSize = 100
a1.sinks.s.channel = c
# Use a memory channel
a1.channels.c.type = memory
a1.channels.c.capacity = 10000
a1.channels.c.transactionCapacity = 100
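The pipeline configured above (TAILDIR source, `regex_filter` interceptor, memory channel, MongoDB sink) can be modelled in memory to check which lines would reach MongoDB. In this sketch the interceptor step keeps a line when `Matcher.find()` locates the regex anywhere in it, assuming include mode (the interceptor's `excludeEvents` option left at its default of false), and the sink step groups the surviving lines into batches of at most `batchSize`. `ErrorLogPipeline` and its methods are hypothetical, and the batching only approximates what the sink does with `batchSize`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// In-memory model of: regex_filter interceptor -> batches for the MongoDB sink.
class ErrorLogPipeline {

    // Keep only lines in which the regex is found anywhere (include mode).
    static List<String> filter(List<String> lines, String regex) {
        Pattern pattern = Pattern.compile(regex);
        List<String> kept = new ArrayList<>();
        for (String line : lines) {
            if (pattern.matcher(line).find()) {
                kept.add(line);
            }
        }
        return kept;
    }

    // Split events, in order, into consecutive batches of at most batchSize elements.
    static List<List<String>> batch(List<String> events, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < events.size(); i += batchSize) {
            int end = Math.min(i + batchSize, events.size());
            batches.add(new ArrayList<>(events.subList(i, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Sample input lines for illustration only.
        List<String> lines = Arrays.asList(
            "2017-01-01 10:00:00 INFO service started",
            "2017-01-01 10:00:01 ERROR connection refused",
            "2017-01-01 10:00:02 WARN slow response",
            "2017-01-01 10:00:03 ERROR timeout");
        List<String> errors = filter(lines, "ERROR");
        System.out.println(errors.size());              // 2
        System.out.println(batch(errors, 100).size());  // 1
    }
}
```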