Integrating Flume with Kafka for Real-Time Data Collection


Choosing the agents

agent1: exec source + memory channel + avro sink

agent2: avro source + memory channel + kafka sink

To simulate a realistic production scenario, agent1 runs on machine A and agent2 runs on machine B.

 

avro source: listens on an Avro port and receives Avro events from external clients.

avro sink: generally used for cross-node transfer; it is bound to the IP and port of the destination the data is moved to.
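Putting the two agents together, the end-to-end data flow in this exercise is:

tail -F data.log -> exec source -> memory channel -> avro sink ==(network)==> avro source -> memory channel -> kafka sink -> Kafka topic hello_topic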

 

 

First, create the agent2 configuration file:

cd /app/flume/flume/conf

vi test-avro-memory-kafka.conf

avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel
 
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = dblab-VirtualBox
avro-memory-kafka.sources.avro-source.port = 44444
 
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = dblab-VirtualBox:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1 

avro-memory-kafka.channels.memory-channel.type = memory
 
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
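The memory channel above uses Flume's defaults. If you need more headroom, the standard memory channel sizing properties (not part of the original walkthrough, shown here only as an optional tweak) can be added:

avro-memory-kafka.channels.memory-channel.capacity = 1000
avro-memory-kafka.channels.memory-channel.transactionCapacity = 100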

 

Start agent2:

flume-ng agent --name avro-memory-kafka -c conf -f conf/test-avro-memory-kafka.conf -Dflume.root.logger=INFO,console

 

 

Be sure to wait until agent2's avro-source has started successfully and is listening on its port 44444 before starting agent1; otherwise agent1 will fail to start with a connection-refused error.
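One way to confirm the port is listening (assuming netstat is installed on machine B; lsof -i:44444 is an alternative):

netstat -nltp | grep 44444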

 

Create the agent1 configuration file:

cd /app/flume/flume/conf

vi test-exec-memory-avro.conf

exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = dblab-VirtualBox
exec-memory-avro.sinks.avro-sink.port = 44444

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel

 

Start agent1:

flume-ng agent --name exec-memory-avro -c conf -f conf/test-exec-memory-avro.conf -Dflume.root.logger=INFO,console

 

Next, configure Kafka.

First, start the Kafka broker:

$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
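Note that Kafka requires a running ZooKeeper. If it is not already up, start it first (zkServer.sh assumes a standalone ZooKeeper install; with the ZooKeeper bundled in Kafka, use zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties instead):

$ zkServer.sh start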

Create hello_topic (the topic name must match the kafka.topic setting in the agent2 configuration):

$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hello_topic
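You can confirm the topic exists by listing all topics:

$ kafka-topics.sh --list --zookeeper localhost:2181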

Start a console producer:

kafka-console-producer.sh --broker-list localhost:9092 --topic hello_topic

 

Start a Kafka console consumer to check that everything started successfully:

kafka-console-consumer.sh --zookeeper localhost:2181 --topic hello_topic
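Type a few lines into the producer terminal; they should show up in the consumer terminal. This confirms that Kafka itself works before the Flume pipeline is tested.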

 

Write some data to the file that agent1's exec-source is tailing:
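For example (the path comes from the agent1 config; the message text is arbitrary):

echo "hello kafka" >> /home/hadoop/data/data.log
echo "flume to kafka test" >> /home/hadoop/data/data.log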

 

Check whether the Kafka console consumer receives the data through Flume; the lines appended to data.log should appear in the consumer terminal.

 

This completes the Flume-Kafka integration for real-time data collection.

