環境及版本:mac系統,kafka 0.11,java8,zk3.4.10,100%可復現
安裝Java安裝ZooKeeper安裝kafka驗證測試topic創建測試消息讀寫
安裝Java
不論是ZooKeeper還是kakfa都需要提前安裝好Java,這里選擇的是java8版本。下載地址:https://www.java.com/zh_CN/download/mac_download.jsp

按提示步驟下載即可,下載完成后,我們需要配置一下java的環境變量。
首先我們在終端中輸入: /usr/libexec/java_home -V,用來查看我們安裝的jdk地址

選擇要進行配置的java版本,復制路徑。輸入 open -e .bash_profile,打開配置文件,按照以下格式輸入,並將之前復制的路徑,粘貼到JAVA_HOME處。保存文件
1JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home
2PATH=$JAVA_HOME/bin:$PATH
3export
4export
輸入source .bash_profile進行保存,接下來我們就可以驗證以下配置是否成功。
輸入echo $JAVA_HOME查看JAVA_HOME配置,輸入java -version查看java版本。

安裝ZooKeeper
ZooKeeper是安裝Kafka集群必要的組件,我們選擇要安裝的Zk版本是3.4.10。
下載地址:
https://archive.apache.org/dist/zookeeper/
zookeeper的默認配置文件為zookeeper/conf/zoo_sample.cfg,需要將其修改為zoo.cfg,其中存放着zookeeper的配置信息,我們可以根據需要進行調整。最后我們可以在終端中進入zk的安裝路徑,輸入bin/zkServer.sh start啟動zk。你將看到如下輸出
1ZooKeeper JMX enabled by default
2Using config: /Users/lijl/kafka/zookeeper-3.4.10/bin/../conf/zoo.cfg
3Starting zookeeper ... STARTED
還可以輸入:bin/zkServer.sh status來查看當前zk的狀態,來驗證zk是否啟動成功。
1ZooKeeper JMX enabled by default
2Using config: /Users/lijl/kafka/zookeeper-3.4.10/bin/../conf/zoo.cfg
3Mode: standalone
從輸出可以看出,我們已經成功啟動了一個單點zk,我們可以輸入bin/zkServer.sh stop來關閉zk。
1ZooKeeper JMX enabled by default
2Using config: /Users/lijl/kafka/zookeeper-3.4.10/bin/../conf/zoo.cfg
3Stopping zookeeper ... STOPPED
安裝kafka
接下來就是我們這篇文章的重點,安裝kafka,我們要安裝的是一個多點環境的kafka,因為kafka偽集群(多個kakfa部署在同一個服務器上)只能進行業務開發或者是驗證功能。但是不能在生產環境直接運用,否則就會失去kafka的分布式特性,比如負載均衡、故障轉移等。
多點kafka集群,是由一套多節點ZooKeeper集群和一套多節點kafka集群組成。所以我們想要搭建多點的kafka集群,首先要搭建多點的Zookeeper集群。ZooKeeper想要正常服務,要滿足一個條件,“半數以上服務器存活”,即一個2n+1個服務器搭建的集群,最多可以容忍n台服務器宕機而依然保持服務狀態。(我們最好選取奇數個zk服務器搭建集群,否則在進行leader選舉的時候,會出現腦裂現象),本文將搭建3個節點的zk集群,主機名分別是:zk1、zk2、zk3。
下面我們給出一個經典的多節點環境配置文件zoo.cfg,並對其參數進行解釋。
1tickTime=2000
2dataDir=/Users/lijl/kafka/dataDir
3clientPort=2181
4initLimit=5
5syncLimit=2
6server.1=zk1:2888:3888
7server.2=zk2:2888:3888
8server.3=zk3:2888:3888
tickTime:Zookeeper最小時間單位,用於丈量心跳時間和超時時間,通常設置成默認值:2秒即可
dataDir:ZooKeeper會在內存中保存系統快照,並定期寫入該路徑的指定文件夾中,生產環境下應該轉移該文件夾的磁盤占用情況。
clientPort:Zookeeper監聽客戶端鏈接的端口,一般設置成默認的2181
initLimit:指定follower節點初始時連接leader的最大tick次數,假設是5,那么follower必須在5 * tickTime(2s) = 10s內連接上leader,否則超時。
SyncLimit:設定了follower節點和leader節點進行同步的最大時間,算法和initLimit類似。
server.X=host:port:port:X必須是一個全局唯一的數字,並且要與myid文件中的數字對應,X一般取1~255,這行的后面配置了2個端口,第一個端口用於follower節點連接leader節點,第二個端口用於leader選舉。myid文件位於zoo.cfg中DataDir配置的目錄下,內容僅有一個數字ID。
接下來,我們進入conf目錄,創建3個配置文件zoo1.cfg、zoo2.cfg、zoo3.cfg內容如下(注意:這里server.x配置中的zk是我們zk節點所在的主機名):
1tickTime=2000
2dataDir=/Users/lijl/kafka/dataDir/zk1
3clientPort=2181
4initLimit=5
5syncLimit=2
6server.1=zk:2888:3888
7server.2=zk:2889:3889
8server.3=zk:2890:3890
9
10tickTime=2000
11dataDir=/Users/lijl/kafka/dataDir/zk2
12clientPort=2182
13initLimit=5
14syncLimit=2
15server.1=zk:2888:3888
16server.2=zk:2889:3889
17server.3=zk:2890:3890
18
19tickTime=2000
20dataDir=/Users/lijl/kafka/dataDir/zk3
21clientPort=2183
22initLimit=5
23syncLimit=2
24server.1=zk:2888:3888
25server.2=zk:2889:3889
26server.3=zk:2890:3890
這里我們分別取了2181、2182、2183,其實取一個也是可以的,要確保端口沒有沖突。文本只是在一台電腦上進行搭建,所以選了不同的端口,防止沖突,主機名也是相同的。
創建好配置文件,接下來就要創建myid文件了,該文件必須在我們配置的dataDir中,在終端中輸入以下內容:
echo "1" > /Users/lijl/kafka/dataDir/zk1/myid
echo "2" > /Users/lijl/kafka/dataDir/zk2/myid
echo "3" > /Users/lijl/kafka/dataDir/zk3/myid
進行快速的創建文件以及數據的寫入,接下來我們來運行一下這3個zk實例。
bin/zkServer.sh start conf/zoo1.cfg
bin/zkServer.sh start conf/zoo2.cfg
bin/zkServer.sh start conf/zoo3.cfg
並分別查看他們的狀態:bin/zkServer.sh status conf/zoo1.cfg(zoo2、zoo3類似)
1ZooKeeper JMX enabled by default
2Using config: conf/zoo1.cfg
3Mode: follower
4
5ZooKeeper JMX enabled by default
6Using config: conf/zoo2.cfg
7Mode: leader
8
9conf/zoo3.cfg
10ZooKeeper JMX enabled by default
11Using config: conf/zoo3.cfg
12Mode: follower
至此,我們已經搭建了一個3個節點的zookeepr集群。接下來我們就在這個多節點的zk集群的基礎上,搭建多節點kafka集群。
首先我們下載kafka,選取的版本是0.11.3,下載地址:
http://kafka.apache.org/downloads

和搭建zk集群類似,我們也需要創建多份配置文件。(conf下創建3份配置文件,注意:這里listeners配置中的kafka1/2/3 代表我們kafka集群的主機名,zookeeper.connect中配置的zk,是我們zk節點所在的主機名)
1broker.id=0
2delete.topic.enable=true
3listeners=PLAINTEXT://kafka1:9002
4log.dirs=/Users/lijl/kafka/dataDir/kafka1
5zookeeper.connect=zk:2181,zk:2182,zk1:2183
6unclean.leader.election.enable=false
7zookeeper.connection.timeout.ms=6000
8
9broker.id=1
10delete.topic.enable=true
11listeners=PLAINTEXT://kafka2:9003
12log.dirs=/Users/lijl/kafka/dataDir/kafka2
13zookeeper.connect=zk:2181,zk:2182,zk1:2183
14unclean.leader.election.enable=false
15zookeeper.connection.timeout.ms=6000
16
17broker.id=2
18delete.topic.enable=true
19listeners=PLAINTEXT://kafka3:9004
20log.dirs=/Users/lijl/kafka/dataDir/kafka3
21zookeeper.connect=zk:2181,zk:2182,zk1:2183
22unclean.leader.election.enable=false
23zookeeper.connection.timeout.ms=6000
我們簡單的介紹一下這些配置的含義:
broder.id:broder的唯一標識
delete.topic.enable:是否可以刪除主題
listener:該參數用於客戶端和broker進行連接,用來綁定私網ip,如果想綁定公網ip,應該設置的是:advertised.listeners這個參數。
log.dirs:kafka消息持久化的路徑
zookeeper.connect:zk節點,這里注意,應該配置全部的zk節點
unclean.leader.election.enable:消息同步落后太多的副本可不可以進行leader選舉。
zookeeper.connection.timeout.ms:zk連接超時時間。
接着我們只需要執行命令啟動kafka就可以了。
bin/kafka-server-start.sh -daemon config/server1.properties
bin/kafka-server-start.sh -daemon config/server2.properties
bin/kafka-server-start.sh -daemon config/server3.properties
我們可以查看位於kafka logs目錄下的server.log,確認kafka broker已經啟動成功。

也可以通過 jps |grep Kafka來確認broker進程是否啟動成功(Kafka大寫)

如果沒有正確啟動,首先應該着重檢查kafka配置是否正確,比如listeners少加了一個s,那么在啟動kafka的時候,就會啟動默認的主機和端口配置,會報端口占用,而不是幫你檢查配置文件是否正確
驗證
此時我們的kakfa多點集群環境已經搭建完成,接下來我們來對這個集群做一些簡單的驗證。
測試topic創建
首先我們創建一個測試用的topic,名為test-topic,為了充分利用搭建的3台服務器,我們選擇創建3個分區,每一個分區分配3個副本。
bin/kafka-topics.sh --zookeeper zk:2181,zk:2182,zk:2183 --create --topic test-topic --partitions 3 --replication-factor 3
這里的zk指的是zookeeper節點主機名,執行成功后,會輸出:Created topic "test-topic"
接下來通過一些命令來做一下驗證,查看所有的topic列表
bin/kafka-topics.sh --zookeeper zk:2181,zk:2182,zk:2183 -list,執行后顯示我們剛創建的test-topic
bin/kafka-topics.sh --zookeeper zk:2181,zk:2182,zk:2183 --describe --topic test-topic,執行后將顯示test-topic的詳細信息,包括分區數,副本數,ISR,分區Leader信息等。
1Topic:test-topic PartitionCount:3 ReplicationFactor:3 Configs:
2 Topic: test-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
3 Topic: test-topic Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
4 Topic: test-topic Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
測試消息讀寫
最后我們對消息的發送和消費進行測試,使用kafka提供的kafka-console-producer和kafka-console-consumer腳本進行數據的讀寫。具體的命令如下:
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test-topic --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test-topic --from-beginning
我們可以同時開啟兩個終端進行操作,命令中的kafka1/2/3指的是kafka所在的節點主機名。我們可以觀察出來,producer腳本產生的消息,consumer腳本馬上就能感知到。


最后,期待您的訂閱和點贊,專欄每周都會更新,希望可以和您一起進步,同時也期待您的批評與指正!
