Flume-Kafka
1. Edit the avro-memory-kafka.conf file:
# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 192.168.1.143
avro-memory-kafka.sources.avro-source.port = 44444

# Describe the sink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = 192.168.1.143:9092
avro-memory-kafka.sinks.kafka-sink.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1

# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory

# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
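If events back up under load, the memory channel can also be given explicit buffering limits. This is optional and not part of the original configuration; the values below are only illustrative:

avro-memory-kafka.channels.memory-channel.capacity = 1000
avro-memory-kafka.channels.memory-channel.transactionCapacity = 100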
2. Start Flume:
(1) First start the Flume agent listening on port 44444 (avro-memory-kafka):
flume-ng agent \
--name avro-memory-kafka \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/avro-memory-kafka.conf \
-Dflume.root.logger=INFO,console
(2) Then start the Flume agent defined in exec-memory-avro.conf (a sketch of that file is shown after the command below):
flume-ng agent \
--name exec-memory-avro \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console
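For reference, exec-memory-avro.conf is the upstream agent that tails the log file and forwards events to the avro source configured above. Its contents are not shown in the original post; a minimal sketch, assuming data.log lives at /home/hadoop/data/data.log, might look like this:

# Name the components on this agent
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

# Tail the log file (the path is an assumption)
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

# Forward events to the avro source of avro-memory-kafka
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = 192.168.1.143
exec-memory-avro.sinks.avro-sink.port = 44444

# Buffer events in memory
exec-memory-avro.channels.memory-channel.type = memory

# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel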
(3) If an error is reported, ZooKeeper may not be running. Start it with:
zookeeper-server-start.sh zookeeper.properties
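The Kafka broker at 192.168.1.143:9092 must also be running before the sink can deliver events. Assuming a standard Kafka installation under $KAFKA_HOME, both services (and the topic, if auto-creation is disabled) can be brought up like this; the paths and the topic-creation step are assumptions, not taken from the original post:

cd $KAFKA_HOME
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties

# only needed if the broker does not auto-create topics
bin/kafka-topics.sh --create --zookeeper 192.168.1.143:2181 \
  --replication-factor 1 --partitions 1 --topic hello_topic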
(4) Start a Kafka console consumer:
kafka-console-consumer.sh --zookeeper 192.168.1.143:2181 --topic hello_topic
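Note that the --zookeeper flag applies to older Kafka releases. On newer versions (Kafka 2.x and later) the console consumer connects to the broker directly, so the equivalent command would be:

kafka-console-consumer.sh --bootstrap-server 192.168.1.143:9092 --topic hello_topic --from-beginning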
(5) Append some data to data.log (i.e. produce data):
echo helloaaa >> data.log
echo helloaaa >> data.log
echo helloaaa >> data.log
echo helloaaa >> data.log
echo helloaaa >> data.log
echo helloaaa >> data.log
echo helloaaa >> data.log
echo helloaaa >> data.log
echo helloaaa >> data.log
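Equivalently, the test lines can be generated in a loop; a small sketch, assuming the shell is running in the directory containing the data.log tailed by the exec source:

for i in $(seq 1 9); do
  echo "helloaaa" >> data.log
done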
(6) As shown in the sample output below, the consumer has consumed these records.
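Since the console consumer prints the value of each message it receives, the expected output is simply the appended lines, roughly:

helloaaa
helloaaa
helloaaa
...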
This completes a simple end-to-end Flume-Kafka pipeline, from producing data to consuming it.