Integrating Flume and Kafka for Data Collection


Flume-Kafka

This walkthrough chains two Flume agents: the first tails a log file with an exec source and forwards events over Avro; the second receives them on an Avro source and publishes them to the Kafka topic hello_topic through a Kafka sink.

1. Edit the avro-memory-kafka.conf file:

# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 192.168.1.143
avro-memory-kafka.sources.avro-source.port = 44444

# Describe the sink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = 192.168.1.143:9092
avro-memory-kafka.sinks.kafka-sink.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1

# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory

# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
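
Note that brokerList, topic, batchSize, and requiredAcks are the Flume 1.6 property names for the Kafka sink. On Flume 1.7 and later these were renamed, so the sink section would instead read:

avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = 192.168.1.143:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.flumeBatchSize = 5
avro-memory-kafka.sinks.kafka-sink.kafka.producer.acks = 1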

2. Start the Flume agents:

(1) First start the agent that listens on port 44444:

flume-ng agent \
--name avro-memory-kafka \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/avro-memory-kafka.conf \
-Dflume.root.logger=INFO,console

(2) Then start the agent defined in exec-memory-avro.conf (a sketch of that file follows the command below):

flume-ng agent \
--name exec-memory-avro \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console
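
exec-memory-avro.conf itself is not reproduced in the original post. A minimal version consistent with the rest of this walkthrough tails data.log with an exec source and forwards to the Avro source above; the log file path used here is an assumption, so adjust it to wherever data.log actually lives:

# Name the components on this agent
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

# Tail the log file that step (5) appends to
# (path is assumed for illustration)
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/data/data.log

# Forward events over Avro to the avro-memory-kafka agent
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = 192.168.1.143
exec-memory-avro.sinks.avro-sink.port = 44444

# Buffer events in memory
exec-memory-avro.channels.memory-channel.type = memory

# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel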

(3) If the agents report errors, ZooKeeper may not be running. Start it with:

zookeeper-server-start.sh zookeeper.properties
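
The Kafka broker itself must also be up before the sink can publish. Assuming the stock configuration file name, it is started the same way:

kafka-server-start.sh server.properties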

(4) Start a Kafka console consumer:

kafka-console-consumer.sh --zookeeper 192.168.1.143:2181 --topic hello_topic
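
The --zookeeper flag only works on older Kafka releases; it was removed in Kafka 2.0. On current versions, point the consumer at the broker instead:

kafka-console-consumer.sh --bootstrap-server 192.168.1.143:9092 --topic hello_topic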

(5) Append some data to data.log (this is the production side):

for i in {1..9}; do
  echo helloaaa >> data.log
done

(6) The consumer terminal then prints the consumed messages (the original post shows a screenshot here).
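
Since the screenshot is not reproduced, here is roughly what the consumer should print, given the nine appends above:

helloaaa
helloaaa
helloaaa
...(nine lines in total)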

That completes a minimal end-to-end Flume-to-Kafka pipeline, from producing data to consuming it.

