Flume簡介與使用(三)——Kafka Sink消費數據之Kafka安裝


  前面已經介紹了如何利用Thrift Source生產數據,今天介紹如何用Kafka Sink消費數據。

  其實之前已經在Flume配置文件里設置了用Kafka Sink消費數據

agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.topic = TRAFFIC_LOG
agent1.sinks.kafkaSink.brokerList = 10.208.129.3:9092,10.208.129.4:9092,10.208.129.5:9092
agent1.sinks.kafkaSink.metadata.broker.list = 10.208.129.3:9092,10.208.129.4:9092,10.208.129.5:9092
agent1.sinks.kafkaSink.producer.type=sync
agent1.sinks.kafkaSink.serializer.class=kafka.serializer.DefaultEncoder
agent1.sinks.kafkaSink.channel = memoryChannel

  那么當Flume的channel收到數據的時候,會根據配置文件主動把數據event發送到Kafka的broker上,所以只要安裝好Kafka就可以消費收據了。

Step 1: Download the code

下載安裝包並解壓

 > tar -xzf kafka_2.11-0.10.0.0.tgz

 > cd kafka_2.11-0.10.0.0

Step 2: Start the server

Kafka是基於Zookeeperl來實現分布式協同的,因此先啟動Zookeeper:

 > %Zookeeper_Home%/bin/zkServer.sh start

在配置文件server.properties把下面一句前面的注釋去掉,然后啟動Kafka服務器

 > #listeners=PLAINTEXT://:9092

 > bin/kafka-server-start.sh config/server.properties

接下來啟動其他兩個broker:

 > cp config/server.properties config/server-1.properties

 > cp config/server.properties config/server-2.properties

 修改配置文件,broker.id不能重復

config/server-1.properties: broker.id=1
config/server-2.properties: broker.id=2

Step 3: Create a topic

創建一個TRAFFIC_LOG主題的broker,復制因子為3(因為有3個Kafka服務器集群),分區個數為1

 > bin/kafka-topics.sh --create --zookeeper 10.208.129.4:2181 --replication-factor 3 --partitions 1 --topic TRAFFIC_LOG

Step 5: Start a consumer

 > bin/kafka-console-consumer.sh --zookeeper 10.208.129.4:2181/kafka --topic TRAFFIC_LOG --from-beginning

topic一定要寫正確了,否則消費不到數據

如果在終端看到之前接入的Thrift Source輸出,那么整個Flume+Kafka算是跑通了


 這里已經引入了Flume和Kafka,下一篇將介紹Kafka以及Flume和Kafka的區別


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM