采集文件call.log的數據到kafka,並從kafka消費者控制台獲取數據。
flume+kafka是目前大數據很經典的日志采集工具。文件數據通過flume采集,通過kafka進行訂閱發布並緩存,很適合充當消息中間件。
准備工作
啟動zookeeper,kafka集群
./bin/zkServer.sh start
./bin/kafka-server-start.sh /config/server.properties
在kafka創建ct主題,並設置分區數量,副本數量,這些信息都會保存在zookeeper上。
./bin/kafka-topics.sh --zookeeper master:2181 --create --topic ct --partitions 3 --replication-factor 2
啟動kafka控制台消費者,在這個進程可以看到采集的數據。
./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic ct --from-beginning
啟動flume,其中flume-kafka.conf配置文件在下方。
./bin/flume-ng agent --conf ./conf/ --name a1 --conf-file ./flume-kafka.conf
flume-kafka.conf
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /usr/local/data/call.log
a1.sources.r1.shell = /bin/bash -c
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.k1.kafka.topic = ct
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
小結
- 可以發現kafka的運行,都要通過zookeeper,可以很好理解zookeeper在kafka集群中充當的角色。
- 運行后,文件call.log的數據都會發送給kafka,無論是哪個節點,通過kafka創建消費者,獲取主題topic都會得到數據。
- flume的sink有直接的kafka源,兩者可以很簡易的結合
