zookeeper與卡夫卡集群搭建


首先這片博客沒有任何理論性的東西,只是詳細說明kafka與zookeeper集群的搭建過程,需要三台linux服務器。

  • java環境變量設置
  • zookeeper集群搭建
  • kafka集群搭建

java環境變量設置

在每台服務器上都有設置java環境變量

這里使用java源碼安裝的方式:

下載源碼包解壓,放入到/usr/local/文件夾下,修改名目錄名字為jdk!接下就是把java的命令參數加入到linux的環境變量中。

[root@test3 jdk]# cat /etc/profile.d/java.sh 
JAVA_HOME=/usr/local/jdk
JAVA_BIN=/usr/local/jdk/bin
JRE_HOME=/usr/local/jdk/jre
PATH=$PATH:/usr/local/jdk/bin:/usr/local/jdk/jre/bin
CLASSPATH=/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib/charsets.jar
[root@test3 jdk]# 

#然后執行source命令,加載新加入的java.sh腳本!
[root@test3 bin]# source /etc/profile.d/java.sh

#驗證java環境變量是否設置成功
[root@test3 bin]# java -version
java version "1.7.0_79"
Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)
[root@test3 bin]# echo $JAVA_HOME #查看JAVA_HOME變量是否是上面設置的路徑
/usr/local/jdk/jdk
[root@test3 bin]#

若是以上均反饋正常,則說明java環境已經設置完畢!

zookeeper集群搭建

zookeeper集群被稱為群組。zookeeper使用的是一致性協議,所以建議每個群組里應該包含奇數個節點,因為只有當群組中大多數節點處於可用狀態,zookeeper才能處理外部請求。也就是說,如果一個包含3個節點的群組,那么它允許一個節點失效。如果包含5個節點,那么它允許2個節點失效。【不建議zookeeper群組超過7個節點,因為zookeeper使用了一致性協議,節點過多會降低群組的性能】

群組中的每個服務器需要在數據目錄中創建一個myid文件,用於指明自己的ID。

這里zookeeper采用源碼的方式安裝:

[root@test3 src]# tar zxvf zookeeper-3.4.6.tar.gz -C ../         #解壓
[root@test3 src]# cd ../
[root@test2 local]# mv zookeeper-3.4.6 zookeeper                 #修改名字
[root@test3 local]# cd zookeeper
[root@test3 zookeeper]# ls
bin        CHANGES.txt  contrib     docs             ivy.xml  LICENSE.txt  README_packaging.txt  recipes  zookeeper-3.4.6.jar      zookeeper-3.4.6.jar.md5   zookeeper.out
build.xml  conf         dist-maven  ivysettings.xml  lib      NOTICE.txt   README.txt            src      zookeeper-3.4.6.jar.asc  zookeeper-3.4.6.jar.sha1
[root@test3 zookeeper]# mkdir -p /data/zookeeper/data                 #創建數據目錄
[root@test3 zookeeper]# mkdir -p /data/zookeeper/logs                 #創建日志文件目錄
[root@test1 zookeeper]# cd conf [root@test1 conf]#
ls configuration.xsl log4j.properties zoo_sample.cfg [root@test1 conf]# mv zoo_sample.cfg zoo.cfg #修改配置文件的名字,建議把原來的配置文件做備份

zookeeper的配置文件如下:

# The number of milliseconds of each tick
tickTime=2000 #單位為毫秒
# The number of ticks that the initial # synchronization phase can take initLimit
=10
#表示用於在從節點與主節點之間建立初始化連接的上限。
# The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5
#表示允許從節點與主節點處於不同步轉台的時間上限。
#initLimit與syncLimit這兩個值都是tickTime的倍數,表示的時間是tickTime乘以這兩個數值的值。 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/data/zookeeper/data #指定數據目錄文件 dataLogDir=/data/zookeeper/logs #指定日志目錄文件 # the port at which the clients will connect clientPort=2181 #zookeeper監聽的端口 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1

#zookeeper的群組設置
server.1=10.0.102.179:2888:3888 server.2=10.0.102.204:2888:3888 server.3=10.0.102.214:2888:3888

#群組設置的格式為: server.X=hostname:peerPort:leader:Port
X:服務器的ID,必須是一個整數,不過不一定要從0開始,也不要求是連續的。
peerPort:用於節點間通信的TCP端口。
leaderPort:用於leader選舉的TCP端口。

客戶端只需要通過clientport就能連接到群組,而群組節點間的通信則需要同時用到這三個端口。

 

【上面的操作,只是修改了配置文件,創建了數據目錄和日志文件目錄,還有就是解壓了源碼包!

下面要做的就是把上面操作在另外兩台機器上做一遍,也可以把上面的文件直接通過scp命令復制到其余兩台機器對應的位置上。】

同MySQL集群一樣,集群每台服務器都需要一個唯一的標識,zookeeper也一樣,需要一個myid文件,來標識每一台服務器!

[root@test1 data]# pwd
/data/zookeeper/data
[root@test1 data]# cat myid 
1

#在每台集群的數據文件目錄下面創建myid文件,每個文件中寫入一個整數,這個整數用來指明當前服務器的ID。
#這個ID應該是需要和上面配置文件中的那個server.X中的X一致的。【遇到過不一致的情況,但是啟動失敗,修改為一致,就啟動成功】

另外兩台的myid文件如下:

[root@test2 data]# pwd
/data/zookeeper/data
[root@test2 data]# cat myid 
2

[root@test3 data]# pwd
/data/zookeeper/data
[root@test3 data]# cat myid
3

然后啟動zookeeper集群:

#三台服務器都這樣啟動,會在當前目錄執行路徑下面生成nohup.out文件,若是報錯的話,可以查看文件內容!
[root@test3 bin]# pwd
/usr/local/zookeeper/bin
[root@test3 bin]# ./zkServer.sh start JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED

#需要注意的是,啟動完成之后一定要查看是否有進程,有些時候不會報錯,但是沒有啟動成功!
[root@test3 bin]# netstat -alntp | grep 2181 #查看監聽端口是否存在
tcp        0      0 :::2181                     :::*                        LISTEN      27962/java

三台服務器全部啟動完成之后,進行如下驗證:

[root@test3 bin]# ./zkCli.sh --server10.0.102.204
Connecting to localhost:2181
2018-12-21 10:54:38,286 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2018-12-21 10:54:38,289 [myid:] - INFO  [main:Environment@100] - Client environment:host.name=test3
2018-12-21 10:54:38,289 [myid:] - INFO  [main:Environment@100] - Client environment:java.version=1.7.0_79
2018-12-21 10:54:38,291 [myid:] - INFO  [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
2018-12-21 10:54:38,291 [myid:] - INFO  [main:Environment@100] - Client environment:java.home=/usr/local/jdk/jdk1.7.0_79/jre
2018-12-21 10:54:38,291 [myid:] - INFO  [main:Environment@100] - Client environment:java.class.path=/usr/local/zookeeper/bin/../build/classes:/usr/local/zookeeper/bin/../build/lib/*.jar:/usr/local/zookeeper/bin/../lib/slf4j-log4j12-1.6.1.jar:/usr/local/zookeeper/bin/../lib/slf4j-api-1.6.1.jar:/usr/local/zookeeper/bin/../lib/netty-3.7.0.Final.jar:/usr/local/zookeeper/bin/../lib/log4j-1.2.16.jar:/usr/local/zookeeper/bin/../lib/jline-0.9.94.jar:/usr/local/zookeeper/bin/../zookeeper-3.4.6.jar:/usr/local/zookeeper/bin/../src/java/lib/*.jar:/usr/local/zookeeper/bin/../conf:

2018-12-21 10:54:38,291 [myid:] - INFO  [main:Environment@100] - Client environment:java.library.path=/usr/local/mysql/lib:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2018-12-21 10:54:38,291 [myid:] - INFO  [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
2018-12-21 10:54:38,292 [myid:] - INFO  [main:Environment@100] - Client environment:java.compiler=<NA>
2018-12-21 10:54:38,292 [myid:] - INFO  [main:Environment@100] - Client environment:os.name=Linux
2018-12-21 10:54:38,292 [myid:] - INFO  [main:Environment@100] - Client environment:os.arch=amd64
2018-12-21 10:54:38,292 [myid:] - INFO  [main:Environment@100] - Client environment:os.version=2.6.32-504.el6.x86_64
2018-12-21 10:54:38,292 [myid:] - INFO  [main:Environment@100] - Client environment:user.name=root
2018-12-21 10:54:38,292 [myid:] - INFO  [main:Environment@100] - Client environment:user.home=/root
2018-12-21 10:54:38,292 [myid:] - INFO  [main:Environment@100] - Client environment:user.dir=/usr/local/zookeeper/bin
2018-12-21 10:54:38,293 [myid:] - INFO  [main:ZooKeeper@438] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@1280d11b
Welcome to ZooKeeper!
2018-12-21 10:54:38,315 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@975] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
JLine support is enabled
2018-12-21 10:54:38,320 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@852] - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-12-21 10:54:38,330 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1235] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x367ce7db1fa0003, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0]


#若有如上顯示,則zookeeper集群搭建成功

搭建卡夫卡集群

仍然采用源碼的方式安裝。安裝過程和zookeeper差不多,解壓,修改配置文件,創建對應消息目錄。

[root@test3 src]# tar -zxvf kafka_2.10-0.10.0.0.tgz -C ../         #解壓
[root@test3 src]# cd ..
[root@test3 local]# mv kafka_2.10-0.10.0.0 kafka                   #修改名字
[root@test3 local]# cd kafka/
[root@test3 kafka]# ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs
[root@test3 kafka]# cd config/                                      #
[root@test3 config]# ls                                             #kafka的配置文件有很多,但是目前我們只需要關注 server.properties
connect-console-sink.properties    connect-file-sink.properties    connect-standalone.properties  producer.properties     zookeeper.properties
connect-console-source.properties  connect-file-source.properties  consumer.properties            server.properties
connect-distributed.properties     connect-log4j.properties        log4j.properties               tools-log4j.properties

配置文件如下:把其中的注釋刪掉了!

broker.id=3                            #在每個kafka服務器中唯一的,是一個整數
port=9092
host.name=10.0.102.179
auto.create.topics.enable = false      
delete.topic.enable = true
num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600


log.dirs=/data/kafka/logs                 

num.partitions=1

num.recovery.threads.per.data.dir=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connect=10.0.102.179:2181,10.0.102.204:2181,10.0.102.214:2181
zookeeper.connection.timeout.ms=6000

配置文件的參數的說明:

  • broker.id: 每一個broker都需要一個標識符,使用broker.id來表示。默認是0,可以被設置為任意整數,在kafka集群中必須是唯一的。
  • port:默認端口是9092,配置kafka監聽的端口。
  • zookeeper.connect:用於保存broker元數據的zookeeper地址是通過zookeeper.connect來指定的。新式就是上面那樣,主機和端口號,多個地址用逗號隔開。上面的三個是一個zookeeper集群。若是當前連接的zookeeper宕機,broker可以連接到zookeeper群組中的其他服務器上。
  • log.dirs: kafka把所有消息都保存在磁盤上,存放這些日子片段的目錄是通過log.dirs指定的。它是一組用逗號分隔的本地文件系統路徑。如果指定了多個路徑,那么broker會根據“最少使用”原則,把同一個分區的日志片段保存到同一個路徑下。需要注意,borker會往擁有最少數目分區的路徑新增分區,而不是往擁有最小磁盤空間的路徑新增分區。
  • num.recovery.threads.per.data.dir,對於如下3種情況,kafka會使用多少個線程來處理日志片段。
    •   服務器正常啟動,用於打開每個分區的日志片段。
    •        服務器崩潰后重啟,用於檢查和截短每個分區的日志片段。
    •        服務器正常關閉,用於關閉日志片段。

          默認情況下,每個日志目錄只使用一個線程。因為這些線程只是在服務器啟動和關閉時會用到,所以完全可以設置大量的線程來達到並行操作的目的。特別是對於包含大量分區的服務器來說,一旦發生崩潰,在進行恢復時使用並行操作可能會省下很多時間。所配置的數字對應的是log.dirs指定的單個日志目錄。譬如,這個值被設置為8,並且log.dirx指定了3個路徑,那么久需要24個線程。

  • auto.create.topics.enable 默認情況下,kafka會在以下幾種情況下創建主題。
    •   當一個生產者開始往主題寫入消息時。
    •        當一個消費者開始從主題讀取消息時。
    •        當任意一個客戶端向主題發送元數據請求是。

    這個參數有點迷糊,盡量設置為false吧!

 

上面指定了kafa的日志目錄,需要創建消息目錄:

[root@test1 ~]#  mkdir -p /data/kafka/logs

和zookeeper一樣,把上面解壓后的文件和創建的日志目錄,分別拷貝到其余兩台服務器上,注意修改其余兩台服務器的broker.id。

然后啟動kafka:

[root@test3 bin]# ls
connect-distributed.sh     kafka-consumer-groups.sh             kafka-reassign-partitions.sh   kafka-simple-consumer-shell.sh   zookeeper-server-start.sh
connect-standalone.sh      kafka-consumer-offset-checker.sh     kafka-replay-log-producer.sh   kafka-topics.sh                  zookeeper-server-stop.sh
kafka-acls.sh              kafka-consumer-perf-test.sh          kafka-replica-verification.sh  kafka-verifiable-consumer.sh     zookeeper-shell.sh
kafka-configs.sh           kafka-mirror-maker.sh                kafka-run-class.sh             kafka-verifiable-producer.sh
kafka-console-consumer.sh  kafka-preferred-replica-election.sh  kafka-server-start.sh          windows
kafka-console-producer.sh  kafka-producer-perf-test.sh          kafka-server-stop.sh           zookeeper-security-migration.sh

#bin目錄下面有很多腳本,kafka自帶的有zookeeper啟動,但是建議搭建獨立的zookeeper集群。

[root@test2 bin]# ./kafka-server-start.sh /usr/local/kafka/config/server.properties 1>/dev/null 2>&1 &
[1] 8757
[root@test2 bin]#

#kafka默認標准輸出到屏幕,上面的命令把kafka掛起在后台運行。

#啟動之后查看是否監聽了9092端口。
[root@test2 bin]# netstat -lntp |grep 9092
tcp        0      0 ::ffff:10.0.102.204:9092    :::*                        LISTEN      8757/java           
[root@test2 bin]#

三台服務器全部啟動完畢,可以進行如下測試:

[root@test1 bin]# ./kafka-console-producer.sh --broker-list 10.0.102.179:9092 --topci science       #創建一個主題
topci is not a recognized option
[root@test1 bin]#

#創建一個生產者,發送消息
[root@test1 bin]# ./kafka-console-producer.sh --broker-list 10.0.102.179:9092 --topic science
test message
first
#創意一個消費者對象,接收生產者發送的消息
[root@test1 bin]# ./kafka-console-consumer.sh --zookeeper 10.0.102.179:2181 --topic science --from-beginning
test message

first

#可以看到卡夫卡是一個先進先出的消息隊列

若是以上的隊列消息驗證成功,則說明集群搭建成功、

[root@test1 bin]# ./kafka-topics.sh --zookeeper 10.0.102.179:2181 --describe --topic science         #查看主題的信息
Topic:science    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: science    Partition: 0    Leader: 2    Replicas: 2    Isr: 2
[root@test1 bin]# 

 

一個異常說明:在測試的時候,剛開始總報如下錯誤:

[root@test1 bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic lianxi
test1
[2018-12-20 17:33:27,112] ERROR Error when sending message to topic lianxi with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

這個問題搞了好久,最好把java的版本從1.8降到了1.7之后,然后就成功了!

搭建成功的各個組件的版本信息如下:

[root@test1 ~]# java -version
java version "1.7.0_79"
Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)

#kafka沒有類似的version命令,需要查看kafka的使用包信息
[root@test1 libs]# pwd
/usr/local/kafka/libs
[root@test1 libs]# ls kafka_2.10-0.10.0.0.jar
kafka_2.10-0.10.0.0.jar

#2.10是scala的版本,后面的0.10.0.0是kafka的版本

#在zookeeper的安裝目錄下面有對應版本的一些包
[root@test1 zookeeper]# ls
bin        CHANGES.txt  contrib     docs             ivy.xml  LICENSE.txt  README_packaging.txt  recipes  zookeeper-3.4.6.jar      zookeeper-3.4.6.jar.md5
build.xml  conf         dist-maven  ivysettings.xml  lib      NOTICE.txt   README.txt            src      zookeeper-3.4.6.jar.asc  zookeeper-3.4.6.jar.sha1
[root@test1 zookeeper]#
#zookeeper的版本是3.4.6

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM