Flume 實戰,將多台機器日志直接收集到 Kafka


目前我們使用的一個 b 端軟件的報錯日志分散在集群各處,現在想把它收集到一個地方然后統一丟進 Kafka 提供給下游業務進行消費。

我想到了 flume,之前讓同事搭建的這次自己想多了解一些細節於是就開搞了。

首先還是下載 flume 的客戶端,這里我使用最新版本 1.9.0

curl -O http://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
tar -zvf apache-flume-1.9.0-bin.tar.gz

設置需要的 java 環境,注意路徑自定義一下,沒有 java 自己下個 java8

export JAVA_HOME=/opt/java8
PATH=$PATH:$JAVA_HOME/bin

 

在 apache-flume-1.9.0-bin/conf 我們可以找到對應的配置文件模版,1.9.0 的模版大概長這樣

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'

agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink

# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = seq

# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.loggerSink.type = logger

#Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100

我們復制一份當作操作的 conf

mv flume-conf.properties.template flume-conf.properties

 

從上面的配置文件中我們不難發現

source channelsink 都是單獨定義的項,他們都需要配置一個這個配置文件里面生效的名字,以及其他的基於這個名字的配置。

比如這里我的需求是將某文件里面的新增信息讀出來包裝為事件,先發到 channel 等待處理,我可以配置一個 Taildir Source 來處理這個任務。

flume 為我們准備了非常多的現成的 sources channelsink ,他們都具有不同的功能可以直接提供給我們使用,具體可以參考一下對應版本的官方文檔。

這里我們只談一下這次用到的 Taildir Source

agent.sources = sensorsInvalidRecordsFile
agent.channels = file
agent.sinks = kafkaSink

# For each one of the sources, the type is defined
agent.sources.sensorsInvalidRecordsFile.type = TAILDIR
agent.sources.sensorsInvalidRecordsFile.filegroups = f1
agent.sources.sensorsInvalidRecordsFile.filegroups.f1 = /sa_cluster/logs/sp/extractor/invalid_records
agent.sources.sensorsInvalidRecordsFile.headers.f1.fileName = invalid_records
agent.sources.sensorsInvalidRecordsFile.headers.f1.logType = sensorsInvalidRecords
agent.sources.sensorsInvalidRecordsFile.channels = file
agent.sources.positionFile = ~/.flume/taildir_position.json

頭三行先申明一下這里配置的 sources channels sinks 各為什么名字。這里我們可以留意到,所有的組件都被命名為復數,這就意味着我們可以同時申明多個 sources ,只需要將其配置行用空格依次分割即可

agent.sources = s1 s2 s3

這樣即可同時生成三個 source。

這里的配置我們指定了一個實例,並且對這個實例上的屬性就行初始化。

 

然后我們繼續配置一個 channel 。這里配置一個 file channel,將從 source 里面抽出來的 event 都落盤防止數據丟失。

# Each channel's type is defined.
agent.channels.fileC.type = file
agent.channels.fileC.dataDirs = ~/.flume/file-channel/data
agent.channels.fileC.useDualCheckpoints = true
agent.channels.fileC.backupCheckpointDir = ~/.flume/file-channel/backup_checkpoint

 

最后我需要定義一個可以將 channel 里面的數據讀出來,並且放到 kafka 里面去的 sink。找了一下正好有一個叫 kafka sink 的 sink 可以滿足我

可以看到和 apache hadoop 生態結合得比較好的 flume 為什么成為抽取日志的首選,或者優先考慮的對象,就是其對生態的友好和提供足夠多的開箱即用的功能。

agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.channel = fileC
agent.sinks.kafkaSink.kafka.bootstrap.servers = 10.171.97.1:9092, 10.163.13.219:9092, 10.170.249.122:9092
agent.sinks.kafkaSink.topic = flume-topic-sensors-invalid-records
agent.sinks.kafkaSink.producer.acks = -1
agent.sinks.kafkaSink.producer.compression.type = snappy

將 kafka 集群信息配置上去。

最后一步我們來啟動 flume-ng 

/bin/flume-ng agent -n agent -c conf -f /home/flume_self/apache-flume-1.9.0-bin/conf/flume-conf.properties -Dflume.root.logger=INFO,console

-n 是名稱

-c 是配置

-f 是配置地址

最好用 nohup 或者 supervisor 對任務進行管理。

 

再去目標 kafka-manager 之類的工具上去看下是否發送成功即可!

到此為止我們的目標就達成了。感覺還是蠻簡單的,就是隨便配置一下配置就可以完成工作,需要定制化的工作 flume 也支持利用一些勾子讀取到數據然后進行 etl 或者修改之后再發送。還是比較靈活。希望早點遇到類似需求再玩一下。

 

 

Reference:

https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html    Flume user_guide

https://juejin.im/post/5be4e549f265da61441f8dbe    Apache Flume 入門教程

https://www.mtyun.com/library/how-to-install-flume-on-centos7    在 CentOS7 上安裝 Flume

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM