從0開始搭建多節點kafka集群


環境及版本:mac系統,kafka 0.11,java8,zk3.4.10,100%可復現

安裝Java安裝ZooKeeper安裝kafka驗證測試topic創建測試消息讀寫

安裝Java

  不論是ZooKeeper還是kakfa都需要提前安裝好Java,這里選擇的是java8版本。下載地址:https://www.java.com/zh_CN/download/mac_download.jsp


  按提示步驟下載即可,下載完成后,我們需要配置一下java的環境變量。
  首先我們在終端中輸入: /usr/libexec/java_home -V,用來查看我們安裝的jdk地址

  選擇要進行配置的java版本,復制路徑。輸入 open -e .bash_profile,打開配置文件,按照以下格式輸入,並將之前復制的路徑,粘貼到JAVA_HOME處。保存文件

 

1JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home
2PATH=$JAVA_HOME/bin:$PATH
3export 
4export

  輸入source .bash_profile進行保存,接下來我們就可以驗證以下配置是否成功。
  輸入echo $JAVA_HOME查看JAVA_HOME配置,輸入java -version查看java版本。

安裝ZooKeeper

  ZooKeeper是安裝Kafka集群必要的組件,我們選擇要安裝的Zk版本是3.4.10。
  下載地址:
  https://archive.apache.org/dist/zookeeper/
  zookeeper的默認配置文件為zookeeper/conf/zoo_sample.cfg,需要將其修改為zoo.cfg,其中存放着zookeeper的配置信息,我們可以根據需要進行調整。最后我們可以在終端中進入zk的安裝路徑,輸入bin/zkServer.sh start啟動zk。你將看到如下輸出

1ZooKeeper JMX enabled by default
2Using config: /Users/lijl/kafka/zookeeper-3.4.10/bin/../conf/zoo.cfg
3Starting zookeeper ... STARTED

  還可以輸入:bin/zkServer.sh status來查看當前zk的狀態,來驗證zk是否啟動成功。

1ZooKeeper JMX enabled by default
2Using config: /Users/lijl/kafka/zookeeper-3.4.10/bin/../conf/zoo.cfg
3Mode: standalone

  從輸出可以看出,我們已經成功啟動了一個單點zk,我們可以輸入bin/zkServer.sh stop來關閉zk。

1ZooKeeper JMX enabled by default
2Using config: /Users/lijl/kafka/zookeeper-3.4.10/bin/../conf/zoo.cfg
3Stopping zookeeper ... STOPPED

安裝kafka

  接下來就是我們這篇文章的重點,安裝kafka,我們要安裝的是一個多點環境的kafka,因為kafka偽集群(多個kakfa部署在同一個服務器上)只能進行業務開發或者是驗證功能。但是不能在生產環境直接運用,否則就會失去kafka的分布式特性,比如負載均衡、故障轉移等。
  多點kafka集群,是由一套多節點ZooKeeper集群和一套多節點kafka集群組成。所以我們想要搭建多點的kafka集群,首先要搭建多點的Zookeeper集群。ZooKeeper想要正常服務,要滿足一個條件,“半數以上服務器存活”,即一個2n+1個服務器搭建的集群,最多可以容忍n台服務器宕機而依然保持服務狀態。(我們最好選取奇數個zk服務器搭建集群,否則在進行leader選舉的時候,會出現腦裂現象),本文將搭建3個節點的zk集群,主機名分別是:zk1、zk2、zk3。
  下面我們給出一個經典的多節點環境配置文件zoo.cfg,並對其參數進行解釋。

1tickTime=2000
2dataDir=/Users/lijl/kafka/dataDir
3clientPort=2181
4initLimit=5
5syncLimit=2
6server.1=zk1:2888:3888
7server.2=zk2:2888:3888
8server.3=zk3:2888:3888

  tickTime:Zookeeper最小時間單位,用於丈量心跳時間和超時時間,通常設置成默認值:2秒即可
  dataDir:ZooKeeper會在內存中保存系統快照,並定期寫入該路徑的指定文件夾中,生產環境下應該轉移該文件夾的磁盤占用情況。
  clientPort:Zookeeper監聽客戶端鏈接的端口,一般設置成默認的2181
  initLimit:指定follower節點初始時連接leader的最大tick次數,假設是5,那么follower必須在5 * tickTime(2s) = 10s內連接上leader,否則超時。
  SyncLimit:設定了follower節點和leader節點進行同步的最大時間,算法和initLimit類似。
  server.X=host:port:port:X必須是一個全局唯一的數字,並且要與myid文件中的數字對應,X一般取1~255,這行的后面配置了2個端口,第一個端口用於follower節點連接leader節點,第二個端口用於leader選舉。myid文件位於zoo.cfg中DataDir配置的目錄下,內容僅有一個數字ID。
  接下來,我們進入conf目錄,創建3個配置文件zoo1.cfg、zoo2.cfg、zoo3.cfg內容如下(注意:這里server.x配置中的zk是我們zk節點所在的主機名):

 1tickTime=2000
2dataDir=/Users/lijl/kafka/dataDir/zk1
3clientPort=2181
4initLimit=5
5syncLimit=2
6server.1=zk:2888:3888
7server.2=zk:2889:3889
8server.3=zk:2890:3890
9
10tickTime=2000
11dataDir=/Users/lijl/kafka/dataDir/zk2
12clientPort=2182
13initLimit=5
14syncLimit=2
15server.1=zk:2888:3888
16server.2=zk:2889:3889
17server.3=zk:2890:3890
18
19tickTime=2000
20dataDir=/Users/lijl/kafka/dataDir/zk3
21clientPort=2183
22initLimit=5
23syncLimit=2
24server.1=zk:2888:3888
25server.2=zk:2889:3889
26server.3=zk:2890:3890

  這里我們分別取了2181、2182、2183,其實取一個也是可以的,要確保端口沒有沖突。文本只是在一台電腦上進行搭建,所以選了不同的端口,防止沖突,主機名也是相同的。
  創建好配置文件,接下來就要創建myid文件了,該文件必須在我們配置的dataDir中,在終端中輸入以下內容:
  echo "1" > /Users/lijl/kafka/dataDir/zk1/myid
  echo "2" > /Users/lijl/kafka/dataDir/zk2/myid
  echo "3" > /Users/lijl/kafka/dataDir/zk3/myid
  進行快速的創建文件以及數據的寫入,接下來我們來運行一下這3個zk實例。
  bin/zkServer.sh start conf/zoo1.cfg
  bin/zkServer.sh start conf/zoo2.cfg
  bin/zkServer.sh start conf/zoo3.cfg
  並分別查看他們的狀態:bin/zkServer.sh status conf/zoo1.cfg(zoo2、zoo3類似)

 1ZooKeeper JMX enabled by default
2Using config: conf/zoo1.cfg
3Mode: follower
4
5ZooKeeper JMX enabled by default
6Using config: conf/zoo2.cfg
7Mode: leader
8
9conf/zoo3.cfg
10ZooKeeper JMX enabled by default
11Using config: conf/zoo3.cfg
12Mode: follower

  至此,我們已經搭建了一個3個節點的zookeepr集群。接下來我們就在這個多節點的zk集群的基礎上,搭建多節點kafka集群。
  首先我們下載kafka,選取的版本是0.11.3,下載地址:
  http://kafka.apache.org/downloads


  和搭建zk集群類似,我們也需要創建多份配置文件。(conf下創建3份配置文件,注意:這里listeners配置中的kafka1/2/3 代表我們kafka集群的主機名,zookeeper.connect中配置的zk,是我們zk節點所在的主機名)

 

 1broker.id=0
2delete.topic.enable=true
3listeners=PLAINTEXT://kafka1:9002
4log.dirs=/Users/lijl/kafka/dataDir/kafka1
5zookeeper.connect=zk:2181,zk:2182,zk1:2183
6unclean.leader.election.enable=false
7zookeeper.connection.timeout.ms=6000
8
9broker.id=1
10delete.topic.enable=true
11listeners=PLAINTEXT://kafka2:9003
12log.dirs=/Users/lijl/kafka/dataDir/kafka2
13zookeeper.connect=zk:2181,zk:2182,zk1:2183
14unclean.leader.election.enable=false
15zookeeper.connection.timeout.ms=6000
16
17broker.id=2
18delete.topic.enable=true
19listeners=PLAINTEXT://kafka3:9004
20log.dirs=/Users/lijl/kafka/dataDir/kafka3
21zookeeper.connect=zk:2181,zk:2182,zk1:2183
22unclean.leader.election.enable=false
23zookeeper.connection.timeout.ms=6000

  我們簡單的介紹一下這些配置的含義:
  broder.id:broder的唯一標識
  delete.topic.enable:是否可以刪除主題
  listener:該參數用於客戶端和broker進行連接,用來綁定私網ip,如果想綁定公網ip,應該設置的是:advertised.listeners這個參數。
  log.dirs:kafka消息持久化的路徑
  zookeeper.connect:zk節點,這里注意,應該配置全部的zk節點
  unclean.leader.election.enable:消息同步落后太多的副本可不可以進行leader選舉。
  zookeeper.connection.timeout.ms:zk連接超時時間。
  接着我們只需要執行命令啟動kafka就可以了。
  bin/kafka-server-start.sh -daemon config/server1.properties
  bin/kafka-server-start.sh -daemon config/server2.properties
  bin/kafka-server-start.sh -daemon config/server3.properties
  我們可以查看位於kafka logs目錄下的server.log,確認kafka broker已經啟動成功。


  也可以通過 jps |grep Kafka來確認broker進程是否啟動成功(Kafka大寫)

   如果沒有正確啟動,首先應該着重檢查kafka配置是否正確,比如listeners少加了一個s,那么在啟動kafka的時候,就會啟動默認的主機和端口配置,會報端口占用,而不是幫你檢查配置文件是否正確

 

驗證

  此時我們的kakfa多點集群環境已經搭建完成,接下來我們來對這個集群做一些簡單的驗證。

測試topic創建

  首先我們創建一個測試用的topic,名為test-topic,為了充分利用搭建的3台服務器,我們選擇創建3個分區,每一個分區分配3個副本。
  bin/kafka-topics.sh --zookeeper zk:2181,zk:2182,zk:2183 --create --topic test-topic --partitions 3 --replication-factor 3
  這里的zk指的是zookeeper節點主機名,執行成功后,會輸出:Created topic "test-topic"
  接下來通過一些命令來做一下驗證,查看所有的topic列表
  bin/kafka-topics.sh --zookeeper zk:2181,zk:2182,zk:2183 -list,執行后顯示我們剛創建的test-topic
  bin/kafka-topics.sh --zookeeper zk:2181,zk:2182,zk:2183 --describe --topic test-topic,執行后將顯示test-topic的詳細信息,包括分區數,副本數,ISR,分區Leader信息等。

1Topic:test-topic    PartitionCount:3    ReplicationFactor:3 Configs:
2    Topic: test-topic   Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
3    Topic: test-topic   Partition: 1    Leader: 2   Replicas: 2,0,1 Isr: 2,0,1
4    Topic: test-topic   Partition: 2    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2

測試消息讀寫

  最后我們對消息的發送和消費進行測試,使用kafka提供的kafka-console-producer和kafka-console-consumer腳本進行數據的讀寫。具體的命令如下:
  bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test-topic --from-beginning
  bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test-topic --from-beginning
  我們可以同時開啟兩個終端進行操作,命令中的kafka1/2/3指的是kafka所在的節點主機名。我們可以觀察出來,producer腳本產生的消息,consumer腳本馬上就能感知到。


  最后,期待您的訂閱和點贊,專欄每周都會更新,希望可以和您一起進步,同時也期待您的批評與指正!

imageimage


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM