flume與kafka整合


flume與kafka整合

前提:##

在上訴條件滿足的情況下才能進行flume和kafka的整合。

flume與kafka整合

  • 修改/usr/local/flume/conf/flume-conf.properties
agent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.s1.topic = mytopic
agent.sinks.s1.brokerList = localhost:9092
agent.sinks.s1.requiredAcks = 1
agent.sinks.s1.batchSize = 20
agent.sinks.s1.channel = c1

啟動kafka

  • 啟動zookeeper

/usr/local/kafka/bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

  • 啟動kafka

/usr/local/kafka/bin/kafka-server-start.sh config/server.properties

  • 創建主題test

/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

啟動flume

/usr/local/flume/bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent&

驗證

  • 新開一個客戶端

telnet localhost 8888

[root@localhost ~]# telnet localhost 8888
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
test
OK
chat
OK
this is a flume message to kafka
OK
from flume message.
OK
it's flume and kafka message。
OK

  • kafka查看消息接收情況
[root@localhost kafka]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
test
chat
this is a flume message to kafka
from flume message.
it's flume and kafka message。

如果出現上述效果則說明flume和kafka整合成功。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM