flume與kafka整合
前提:##
-
flume安裝和測試通過,可參考:http://www.cnblogs.com/rwxwsblog/p/5800300.html
-
kafka安裝和測試通過,可參考:http://www.cnblogs.com/rwxwsblog/p/5800224.html
在上訴條件滿足的情況下才能進行flume和kafka的整合。
flume與kafka整合
- 修改/usr/local/flume/conf/flume-conf.properties
agent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.s1.topic = mytopic
agent.sinks.s1.brokerList = localhost:9092
agent.sinks.s1.requiredAcks = 1
agent.sinks.s1.batchSize = 20
agent.sinks.s1.channel = c1
啟動kafka
- 啟動zookeeper
/usr/local/kafka/bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
- 啟動kafka
/usr/local/kafka/bin/kafka-server-start.sh config/server.properties
- 創建主題
test
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
啟動flume
/usr/local/flume/bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent&
驗證
- 新開一個客戶端
telnet localhost 8888
[root@localhost ~]# telnet localhost 8888
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
test
OK
chat
OK
this is a flume message to kafka
OK
from flume message.
OK
it's flume and kafka message。
OK
- kafka查看消息接收情況
[root@localhost kafka]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
test
chat
this is a flume message to kafka
from flume message.
it's flume and kafka message。
如果出現上述效果則說明flume和kafka整合成功。