目前我們使用的一個 b 端軟件的報錯日志分散在集群各處,現在想把它收集到一個地方然后統一丟進 Kafka 提供給下游業務進行消費。
我想到了 flume,之前讓同事搭建的這次自己想多了解一些細節於是就開搞了。
首先還是下載 flume 的客戶端,這里我使用最新版本 1.9.0
curl -O http://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz tar -zvf apache-flume-1.9.0-bin.tar.gz
設置需要的 java 環境,注意路徑自定義一下,沒有 java 自己下個 java8
export JAVA_HOME=/opt/java8
PATH=$PATH:$JAVA_HOME/bin
在 apache-flume-1.9.0-bin/conf 我們可以找到對應的配置文件模版,1.9.0 的模版大概長這樣
# The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # in this case called 'agent' agent.sources = seqGenSrc agent.channels = memoryChannel agent.sinks = loggerSink # For each one of the sources, the type is defined agent.sources.seqGenSrc.type = seq # The channel can be defined as follows. agent.sources.seqGenSrc.channels = memoryChannel # Each sink's type must be defined agent.sinks.loggerSink.type = logger #Specify the channel the sink should use agent.sinks.loggerSink.channel = memoryChannel # Each channel's type is defined. agent.channels.memoryChannel.type = memory # Other config values specific to each type of channel(sink or source) # can be defined as well # In this case, it specifies the capacity of the memory channel agent.channels.memoryChannel.capacity = 100
我們復制一份當作操作的 conf
mv flume-conf.properties.template flume-conf.properties
從上面的配置文件中我們不難發現
source channel 和 sink 都是單獨定義的項,他們都需要配置一個這個配置文件里面生效的名字,以及其他的基於這個名字的配置。
比如這里我的需求是將某文件里面的新增信息讀出來包裝為事件,先發到 channel 等待處理,我可以配置一個 Taildir Source 來處理這個任務。
flume 為我們准備了非常多的現成的 sources channel 和 sink ,他們都具有不同的功能可以直接提供給我們使用,具體可以參考一下對應版本的官方文檔。
這里我們只談一下這次用到的 Taildir Source
agent.sources = sensorsInvalidRecordsFile agent.channels = file agent.sinks = kafkaSink # For each one of the sources, the type is defined agent.sources.sensorsInvalidRecordsFile.type = TAILDIR agent.sources.sensorsInvalidRecordsFile.filegroups = f1 agent.sources.sensorsInvalidRecordsFile.filegroups.f1 = /sa_cluster/logs/sp/extractor/invalid_records agent.sources.sensorsInvalidRecordsFile.headers.f1.fileName = invalid_records agent.sources.sensorsInvalidRecordsFile.headers.f1.logType = sensorsInvalidRecords agent.sources.sensorsInvalidRecordsFile.channels = file
agent.sources.positionFile = ~/.flume/taildir_position.json
頭三行先申明一下這里配置的 sources channels sinks 各為什么名字。這里我們可以留意到,所有的組件都被命名為復數,這就意味着我們可以同時申明多個 sources ,只需要將其配置行用空格依次分割即可
agent.sources = s1 s2 s3
這樣即可同時生成三個 source。
這里的配置我們指定了一個實例,並且對這個實例上的屬性就行初始化。
然后我們繼續配置一個 channel 。這里配置一個 file channel,將從 source 里面抽出來的 event 都落盤防止數據丟失。
# Each channel's type is defined. agent.channels.fileC.type = file agent.channels.fileC.dataDirs = ~/.flume/file-channel/data agent.channels.fileC.useDualCheckpoints = true agent.channels.fileC.backupCheckpointDir = ~/.flume/file-channel/backup_checkpoint
最后我需要定義一個可以將 channel 里面的數據讀出來,並且放到 kafka 里面去的 sink。找了一下正好有一個叫 kafka sink 的 sink 可以滿足我
可以看到和 apache hadoop 生態結合得比較好的 flume 為什么成為抽取日志的首選,或者優先考慮的對象,就是其對生態的友好和提供足夠多的開箱即用的功能。
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafkaSink.channel = fileC agent.sinks.kafkaSink.kafka.bootstrap.servers = 10.171.97.1:9092, 10.163.13.219:9092, 10.170.249.122:9092 agent.sinks.kafkaSink.topic = flume-topic-sensors-invalid-records agent.sinks.kafkaSink.producer.acks = -1 agent.sinks.kafkaSink.producer.compression.type = snappy
將 kafka 集群信息配置上去。
最后一步我們來啟動 flume-ng
/bin/flume-ng agent -n agent -c conf -f /home/flume_self/apache-flume-1.9.0-bin/conf/flume-conf.properties -Dflume.root.logger=INFO,console
-n 是名稱
-c 是配置
-f 是配置地址
最好用 nohup 或者 supervisor 對任務進行管理。
再去目標 kafka-manager 之類的工具上去看下是否發送成功即可!
到此為止我們的目標就達成了。感覺還是蠻簡單的,就是隨便配置一下配置就可以完成工作,需要定制化的工作 flume 也支持利用一些勾子讀取到數據然后進行 etl 或者修改之后再發送。還是比較靈活。希望早點遇到類似需求再玩一下。
Reference:
https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html Flume user_guide
https://juejin.im/post/5be4e549f265da61441f8dbe Apache Flume 入門教程
https://www.mtyun.com/library/how-to-install-flume-on-centos7 在 CentOS7 上安裝 Flume