【Kafka入門】搭建Kafka本地環境


本博文介紹如何一步步搭建起Kafka本地環境。

下載Kafka 0.9.0.0 並配置軟鏈接

  下載好后,放入電腦本地安裝目錄,mac下我放在/usr/local下,解壓Kafka。

tar -xzf kafka_2.11-0.9.0.0.tgz

  然后建立一個當前版本的kafka的軟鏈接,這樣在配置的時候系統環境配置的時候不需要寫版本號,也有利於以后的版本更新。

ln -s kafka_2.11-0.9.0.0 kafka

  接着cd kafka進入主目錄。

 

啟動Kafka Server

  在啟動前,需要安裝並配置Zookeeper,不過在kafka中帶有一個單節點示例的Zookeeper實例腳本包,所以我們也可以先不安裝配置Zookeeper,在啟動Kafka Server前,我們先啟動Zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

  在Zookeeper成功啟動以后,就可以啟動Kafka Server了:

bin/kafka-server-start.sh config/server.properties

 

創建topic話題

  通過下面的腳本,我們創建了一個名為test的topic話題,該話題分區數為1,冗余份數為1。然后通過list展示當前話題,會列出test。  

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181

 

啟動生產者Producer和消費者Consumer

  通過以下腳本啟動一個Producer,然后可以輸入生產的消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

this is a test of producer

   通過下面的命令啟動消費者Consumer來消費,然后會打印出上述輸入的消息:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
this is a test of producer

  如果上述兩個命令,在不同的終端啟動,這時候你在生產者終端輸入消息並回車,消費者端可以看到。

 

啟動多broker的集群

  由於Kafka是分布式的消息系統,目前我們只是啟動了一個Kafka Server,並看不到分布式的效果,下面在本地啟動多個Kafka Server,首先我們需要將server的配置文件復制兩份,因為每個Kafka服務啟動都是依賴配置文件,在配置文件中設置了諸如服務啟動端口等信息。

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

  修改上述復制出來的配置文件,由於默認服務啟動在9092端口,所以后續兩個server啟動在9093和9094;然后默認的server的broker id為0,由於id是borker的唯一標識,所以后續的id依次為1和2.默認的幾個屬性如下:

broker.id=0

############################# Socket Server Settings #############################

listeners=PLAINTEXT://:9092

# The port the socket server listens on
#port=9092

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

  server1和server2的關於這幾個屬性的配置修改如下:

broker.id=1
listeners=PLAINTEXT://:9093
port=9093
log.dirs=/tmp/kafka-logs-1

broker.id=2
listeners=PLAINTEXT://:9094
port=9094
log.dirs=/tmp/kafka-logs-2

  然后依次啟動兩個broker的服務:

bin/kafka-server-start.sh config/server-1.properties 
bin/kafka-server-start.sh config/server-2.properties

  最好在多個終端啟動,分別看看效果。

  接着我們啟動一個冗余數為3的topic並describe這個topic,效果顯示如下:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replication-topic
Created topic "my-replication-topic".
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replication-topic
Topic:my-replication-topic    PartitionCount:1    ReplicationFactor:3    Configs:
Topic: my-replication-topic    Partition: 0    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1

  顯示的內容一共是兩行,第一行是對所有partition的總結,后續各行是每個partition的信息,由於我們這里只有一個partition,所以后續只有1行(也就是第二行)

  • Leader:表示負責該partition所有讀寫的節點
  • Replicas:所有擁有該partition 日志的備份數據的節點,不管這些節點是否是Leader或者活着。
  • Isr:是所有同步Replicas集合。 這是Replicas列表的子集合,在這個集合中的都是活着並且趕上Leader進度的。

多broker的容錯性測試

  上面的my-replication-topic的Leader是0號服務,那我們嘗試找到0號broker並將該進程殺死,腳本如下:

Lilis-MacBook-Pro:kafka_2.10-0.9.0.0 lili$ ps | grep server.properties
 2366 ttys001    1:28.51 /Library/Java/JavaVirtualMachines。。。
 2686 ttys006    0:00.00 grep server.properties
Lilis-MacBook-Pro:kafka_2.10-0.9.0.0 lili$ kill -9 2366

  再describe my-replication-topic的時候,發現leader已經變了,此時變成了2號broker,而且同步Replicas集合中也少了0號broker。

Lilis-MacBook-Pro:kafka_2.10-0.9.0.0 lili$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replication-topic
Topic:my-replication-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-replication-topic    Partition: 0    Leader: 2    Replicas: 0,2,1    Isr: 2,1
Lilis-MacBook-Pro:kafka_2.10-0.9.0.0 lili$

  當然,再次啟動消費者來消費消息,消息依然還是存在的。

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replication-topic

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM