原文鏈接:https://blog.csdn.net/wangpei1949/article/details/80472928
flume中有三種可監控文件或目錄的source、分別是Exec Source、Spooling Directory Source和Taildir Source。
Taildir Source是1.7版本的新特性,綜合了Spooling Directory Source和Exec Source的優點。
使用場景:
Exec Source
Exec Source可通過tail -f命令去tail住一個文件,然后實時同步日志到sink。但存在的問題是,當agent進程掛掉重啟后,會有重復消費的問題。可以通過增加UUID來解決,或通過改進ExecSource來解決。
Spooling Directory Source
Spooling Directory Source可監聽一個目錄,同步目錄中的新文件到sink,被同步完的文件可被立即刪除或被打上標記。適合用於同步新文件,但不適合對實時追加日志的文件進行監聽並同步。如果需要實時監聽追加內容的文件,可對SpoolDirectorySource進行改進。
Taildir Source
Taildir Source可實時監控一批文件,並記錄每個文件最新消費位置,agent進程重啟后不會有重復消費的問題。
使用時建議用1.8.0版本的flume,1.8.0版本中解決了Taildir Source一個可能會丟數據的bug。
還有github上讀取數據庫的source以及遞歸遍歷文件子目錄的文件夾
Taildir Source
agent配置
# source的名字
agent.sources = s1
# channels的名字
agent.channels = c1
# sink的名字
agent.sinks = r1
# 指定source使用的channel
agent.sources.s1.channels = c1
# 指定sink使用的channel
agent.sinks.r1.channel = c1
######## source相關配置 ########
# source類型
agent.sources.s1.type = TAILDIR
# 元數據位置
agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
# 監控的目錄
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1=/Users/wangpei/tempData/flume/data/.*log
agent.sources.s1.fileHeader = true
######## channel相關配置 ########
# channel類型
agent.channels.c1.type = file
# 數據存放路徑
agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
# 檢查點路徑
agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
# channel中最多緩存多少
agent.channels.c1.capacity = 1000
# channel一次最多吐給sink多少
agent.channels.c1.transactionCapacity = 100
######## sink相關配置 ########
# sink類型
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
# brokers地址
agent.sinks.r1.kafka.bootstrap.servers = localhost:9092
# topic
agent.sinks.r1.kafka.topic = testTopic3
# 壓縮
agent.sinks.r1.kafka.producer.compression.type = snappy
記錄每個文件消費位置的元數據
#配置
agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
#內容
[
{
"inode":6028358,
"pos":144,
"file":"/Users/wangpei/tempData/flume/data/test.log"
},
{
"inode":6028612,
"pos":20,
"file":"/Users/wangpei/tempData/flume/data/test_a.log"
}
]
可以看到,在taildir_position.json文件中,通過json數組的方式,記錄了每個文件最新的消費位置,每消費一次便去更新這個文件。