Flume-Spooling Directory Source 監控目錄下多個新文件


使用 Flume 監聽整個目錄的文件,並上傳至 HDFS。

 

一、創建配置文件 flume-dir-hdfs.conf

https://flume.apache.org/FlumeUserGuide.html#spooling-directory-source

# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /tmp/upload
# 給 spoolDir 目錄中文件添加的后綴,區分記錄與未記錄(先記錄后改名)
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# 忽略所有以.tmp 結尾的文件,不上傳
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://h136:9000/flume/upload/%Y%m%d/%H
# 上傳文件的前綴
a3.sinks.k3.hdfs.filePrefix = upload-
# 是否按照時間滾動文件夾
a3.sinks.k3.hdfs.round = true
# 多少時間單位創建一個新的文件夾
a3.sinks.k3.hdfs.roundValue = 1
# 重新定義時間單位
a3.sinks.k3.hdfs.roundUnit = hour
# 是否使用本地時間戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# 積攢多少個 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
# 設置文件類型,可支持壓縮
a3.sinks.k3.hdfs.fileType = DataStream
# 多久生成一個新的文件
a3.sinks.k3.hdfs.rollInterval = 60
# 設置每個文件的滾動大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
# 文件的滾動與 Event 數量無關
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

 

二、啟動

cd /opt/apache-flume-1.9.0-bin/
bin/flume-ng agent --conf conf/ --name a3 --conf-file /tmp/flume-job/flume-dir-hdfs.conf -Dflume.root.logger=INFO,console

 

三、測試

vim /tmp/123.txt

123
456
789

cp /tmp/123.txt /tmp/upload/
cp /tmp/123.txt /tmp/upload/456.txt
cp /tmp/123.txt /tmp/upload/789.txt

已記錄的文件會自動加上后綴。若復制以 tmp 結尾的文件 Flume 不記錄,在配置中已忽略。

說明:在使用 Spooling Directory Source 時不要在監控目錄中創建並持續修改文件,上傳完成的文件會以 .COMPLETED 結尾,被監控文件夾每 500 毫秒掃描一次文件變動。

HDFS 上的文件


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM