Hadoop: Kafka installation and Flume -> Kafka integration


62 - Kafka installation: integrating Flume with Kafka

I. Kafka installation

1. Download

http://kafka.apache.org/downloads.html

2. Extract

tar -zxvf kafka_2.10-0.8.1.1.tgz

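3. Start the services

3.1 Start the ZooKeeper service first (this command runs in the foreground, so use a separate terminal for the steps that follow)

bin/zookeeper-server-start.sh config/zookeeper.properties

3.2 Start Kafka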

bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
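
Because the Kafka start command above runs in the background and discards its output, nothing on screen confirms that the broker actually came up. A minimal sketch of a readiness check follows. It assumes bash (it relies on bash's /dev/tcp pseudo-device) and the ports that appear elsewhere in these notes, 2181 for ZooKeeper and 9092 for the broker. wait_for_port is a hypothetical helper name, not something shipped with Kafka.

```shell
#!/usr/bin/env bash
# wait_for_port HOST PORT [MAX_ATTEMPTS]
# Hypothetical helper: tries to open a TCP connection to HOST:PORT once per
# second and returns 0 as soon as one succeeds, or 1 after MAX_ATTEMPTS
# failed attempts (default 30).
wait_for_port() {
  local host="$1" port="$2" max_attempts="${3:-30}" attempts=0
  # The subshell opens the connection through bash's /dev/tcp and closes it on exit.
  while ! (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; do
    attempts=$((attempts + 1))
    if [ "$attempts" -ge "$max_attempts" ]; then
      return 1
    fi
    sleep 1
  done
  return 0
}
```

Possible usage, run after both start commands: wait_for_port localhost 2181 && wait_for_port localhost 9092 && echo "ZooKeeper and Kafka are listening"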

3.3 Create a topic

Create a topic named "test" with one partition and one replica
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

List topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
Describe a topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Delete a topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
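
Re-running the create command for a topic that already exists fails, so when these steps are scripted it can help to combine the list and create commands. The sketch below is one way to do that. KAFKA_HOME and ZK are hypothetical environment variables introduced only for this example, and the check assumes that --list prints one topic name per line.

```shell
#!/usr/bin/env bash
# Assumed locations; override them to match the local install.
KAFKA_HOME="${KAFKA_HOME:-.}"
ZK="${ZK:-localhost:2181}"

# topic_in_list NAME: succeeds if NAME appears as a whole line on stdin.
topic_in_list() {
  grep -qxF -- "$1"
}

# ensure_topic NAME: create NAME with one partition and one replica,
# but only if the topic list does not already contain it.
ensure_topic() {
  local topic="$1"
  if "$KAFKA_HOME/bin/kafka-topics.sh" --list --zookeeper "$ZK" | topic_in_list "$topic"; then
    echo "topic '$topic' already exists"
  else
    "$KAFKA_HOME/bin/kafka-topics.sh" --create --zookeeper "$ZK" \
      --replication-factor 1 --partitions 1 --topic "$topic"
  fi
}
```

Possible usage from the Kafka directory: ensure_topic test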

II. Integrating Flume with Kafka

1. Start Flume (with the configuration file)

flume-ng agent --conf conf -f /bigdata/flume-1.6/conf/kafka.conf --name producer -Dflume.root.logger=DEBUG,console

2. Start Kafka

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

3. Send a message

echo 'wo l g q .' |nc -u hadoop1 8285

4. Start a consumer to check whether the message was received

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

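III. Flume -> Kafka errors


java.lang.ClassNotFoundException: org.apache.flume.plugins.KafkaSink

This error means the class named in the sink's type property is not on Flume's classpath. List the contents of the sink jar to check whether it contains KafkaSink, and under which package:

jar -tf flume-ng-kafka-sink-1.6.0.jar | fgrep KafkaSink

If the class lives under a different package, as it does in the Flume 1.6.0 jar, set the sink type to that fully qualified name: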

producer.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
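
When it is not clear which jar to inspect, the same jar -tf check can be run over every jar in a directory. This is a rough sketch only. class_to_path and find_class_jar are hypothetical helper names, and the lib directory in the usage line is an assumption based on the Flume path used earlier in these notes.

```shell
#!/usr/bin/env bash
# class_to_path CLASS: convert a fully qualified class name to its entry path inside a jar.
class_to_path() {
  printf '%s.class\n' "$(printf '%s' "$1" | tr '.' '/')"
}

# find_class_jar CLASS DIR: print every jar in DIR that contains CLASS.
# Returns 0 if at least one jar matched, 1 otherwise.
find_class_jar() {
  local entry dir="$2" file status=1
  entry="$(class_to_path "$1")"
  for file in "$dir"/*.jar; do
    [ -e "$file" ] || continue   # the glob matched nothing
    if jar -tf "$file" 2>/dev/null | grep -qxF -- "$entry"; then
      echo "$file"
      status=0
    fi
  done
  return "$status"
}
```

Possible usage (the lib path is assumed): find_class_jar org.apache.flume.sink.kafka.KafkaSink /bigdata/flume-1.6/lib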

  (Be sure to configure the agent according to the official Flume documentation.)
  

IV. kafka.conf

  

producer agent configuration

# Memory channel called channel1 on the producer agent
producer.channels.channel1.type = memory

# Define a syslog UDP source called source1 on the producer agent and tell it
# to bind to 127.0.0.1:8285. Connect it to channel1.
producer.sources.source1.channels = channel1
producer.sources.source1.type = syslogudp
producer.sources.source1.bind = 127.0.0.1
producer.sources.source1.port = 8285

# Define a Kafka sink that publishes every event it receives to the
# "test" topic and connect it to the other end of the same channel.
producer.sinks.sink1.channel = channel1
producer.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.sink1.brokerList = 127.0.0.1:9092
producer.sinks.sink1.topic = test
producer.sinks.sink1.batchSize = 20

# Finally, now that we've defined all of our components, tell
# the producer agent which ones we want to activate.
producer.channels = channel1
producer.sources = source1
producer.sinks = sink1
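
A common mistake in a file like this is a source or sink that points at a channel the agent never declares. The sketch below is a rough sanity check under simple assumptions: one key = value pair per line, with channel names separated by spaces. check_channels is a hypothetical helper written for this example, not a Flume tool.

```shell
#!/usr/bin/env bash
# check_channels CONF AGENT
# Verifies that every channel referenced by AGENT's sources and sinks is listed
# in "AGENT.channels". Prints one line per problem and returns 1 if any is found.
check_channels() {
  local conf="$1" agent="$2"
  awk -v agent="$agent" '
    /^[ \t]*#/ || !/=/ { next }                       # skip comments and non-assignments
    {
      eq  = index($0, "=")
      key = substr($0, 1, eq - 1)
      val = substr($0, eq + 1)
      gsub(/^[ \t]+|[ \t]+$/, "", key)                # trim whitespace around key and value
      gsub(/^[ \t]+|[ \t]+$/, "", val)
      if (key == agent ".channels") {
        n = split(val, names, /[ \t]+/)
        for (i = 1; i <= n; i++) declared[names[i]] = 1
      } else if (key ~ ("^" agent "\\.(sources|sinks)\\.[^.]+\\.channels?$")) {
        used[key] = val
      }
    }
    END {
      bad = 0
      for (k in used) {
        n = split(used[k], names, /[ \t]+/)
        for (i = 1; i <= n; i++)
          if (!(names[i] in declared)) {
            print k " references undeclared channel " names[i]
            bad = 1
          }
      }
      exit bad
    }
  ' "$conf"
}
```

Possible usage: check_channels /bigdata/flume-1.6/conf/kafka.conf producer && echo "channel wiring looks consistent"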

