Flume采集文件數據到Kafka


采集文件call.log的數據到kafka,並從kafka消費者控制台獲取數據。

flume+kafka是目前大數據很經典的日志采集工具。文件數據通過flume采集,通過kafka進行訂閱發布並緩存,很適合充當消息中間件。

准備工作

啟動zookeeper,kafka集群

./bin/zkServer.sh start
./bin/kafka-server-start.sh /config/server.properties

在kafka創建ct主題,並設置分區數量,副本數量,這些信息都會保存在zookeeper上。

./bin/kafka-topics.sh --zookeeper master:2181 --create --topic ct --partitions 3 --replication-factor 2

啟動kafka控制台消費者,在這個進程可以看到采集的數據。

./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic ct --from-beginning

啟動flume,其中flume-kafka.conf配置文件在下方。

./bin/flume-ng agent --conf ./conf/ --name a1 --conf-file ./flume-kafka.conf

flume-kafka.conf

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /usr/local/data/call.log
a1.sources.r1.shell = /bin/bash -c

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.k1.kafka.topic = ct
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

小結

  • 可以發現kafka的運行,都要通過zookeeper,可以很好理解zookeeper在kafka集群中充當的角色。
  • 運行后,文件call.log的數據都會發送給kafka,無論是哪個節點,通過kafka創建消費者,獲取主題topic都會得到數據。
  • flume的sink有直接的kafka源,兩者可以很簡易的結合


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM