首先這片博客沒有任何理論性的東西,只是詳細說明kafka與zookeeper集群的搭建過程,需要三台linux服務器。
- java環境變量設置
- zookeeper集群搭建
- kafka集群搭建
java環境變量設置
在每台服務器上都有設置java環境變量
這里使用java源碼安裝的方式:
下載源碼包解壓,放入到/usr/local/文件夾下,修改名目錄名字為jdk!接下就是把java的命令參數加入到linux的環境變量中。
[root@test3 jdk]# cat /etc/profile.d/java.sh JAVA_HOME=/usr/local/jdk JAVA_BIN=/usr/local/jdk/bin JRE_HOME=/usr/local/jdk/jre PATH=$PATH:/usr/local/jdk/bin:/usr/local/jdk/jre/bin CLASSPATH=/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib/charsets.jar [root@test3 jdk]#
#然后執行source命令,加載新加入的java.sh腳本!
[root@test3 bin]# source /etc/profile.d/java.sh
#驗證java環境變量是否設置成功
[root@test3 bin]# java -version
java version "1.7.0_79"
Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)
[root@test3 bin]# echo $JAVA_HOME #查看JAVA_HOME變量是否是上面設置的路徑
/usr/local/jdk/jdk
[root@test3 bin]#
若是以上均反饋正常,則說明java環境已經設置完畢!
zookeeper集群搭建
zookeeper集群被稱為群組。zookeeper使用的是一致性協議,所以建議每個群組里應該包含奇數個節點,因為只有當群組中大多數節點處於可用狀態,zookeeper才能處理外部請求。也就是說,如果一個包含3個節點的群組,那么它允許一個節點失效。如果包含5個節點,那么它允許2個節點失效。【不建議zookeeper群組超過7個節點,因為zookeeper使用了一致性協議,節點過多會降低群組的性能】
群組中的每個服務器需要在數據目錄中創建一個myid文件,用於指明自己的ID。
這里zookeeper采用源碼的方式安裝:
[root@test3 src]# tar zxvf zookeeper-3.4.6.tar.gz -C ../ #解壓 [root@test3 src]# cd ../ [root@test2 local]# mv zookeeper-3.4.6 zookeeper #修改名字 [root@test3 local]# cd zookeeper [root@test3 zookeeper]# ls bin CHANGES.txt contrib docs ivy.xml LICENSE.txt README_packaging.txt recipes zookeeper-3.4.6.jar zookeeper-3.4.6.jar.md5 zookeeper.out build.xml conf dist-maven ivysettings.xml lib NOTICE.txt README.txt src zookeeper-3.4.6.jar.asc zookeeper-3.4.6.jar.sha1 [root@test3 zookeeper]# mkdir -p /data/zookeeper/data #創建數據目錄 [root@test3 zookeeper]# mkdir -p /data/zookeeper/logs #創建日志文件目錄
[root@test1 zookeeper]# cd conf [root@test1 conf]# ls configuration.xsl log4j.properties zoo_sample.cfg [root@test1 conf]# mv zoo_sample.cfg zoo.cfg #修改配置文件的名字,建議把原來的配置文件做備份
zookeeper的配置文件如下:
# The number of milliseconds of each tick
tickTime=2000 #單位為毫秒
# The number of ticks that the initial # synchronization phase can take initLimit=10
#表示用於在從節點與主節點之間建立初始化連接的上限。
# The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5
#表示允許從節點與主節點處於不同步轉台的時間上限。
#initLimit與syncLimit這兩個值都是tickTime的倍數,表示的時間是tickTime乘以這兩個數值的值。 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/data/zookeeper/data #指定數據目錄文件 dataLogDir=/data/zookeeper/logs #指定日志目錄文件 # the port at which the clients will connect clientPort=2181 #zookeeper監聽的端口 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1
#zookeeper的群組設置 server.1=10.0.102.179:2888:3888 server.2=10.0.102.204:2888:3888 server.3=10.0.102.214:2888:3888
#群組設置的格式為: server.X=hostname:peerPort:leader:Port
X:服務器的ID,必須是一個整數,不過不一定要從0開始,也不要求是連續的。
peerPort:用於節點間通信的TCP端口。
leaderPort:用於leader選舉的TCP端口。
客戶端只需要通過clientport就能連接到群組,而群組節點間的通信則需要同時用到這三個端口。
【上面的操作,只是修改了配置文件,創建了數據目錄和日志文件目錄,還有就是解壓了源碼包!
下面要做的就是把上面操作在另外兩台機器上做一遍,也可以把上面的文件直接通過scp命令復制到其余兩台機器對應的位置上。】
同MySQL集群一樣,集群每台服務器都需要一個唯一的標識,zookeeper也一樣,需要一個myid文件,來標識每一台服務器!
[root@test1 data]# pwd /data/zookeeper/data [root@test1 data]# cat myid 1
#在每台集群的數據文件目錄下面創建myid文件,每個文件中寫入一個整數,這個整數用來指明當前服務器的ID。
#這個ID應該是需要和上面配置文件中的那個server.X中的X一致的。【遇到過不一致的情況,但是啟動失敗,修改為一致,就啟動成功】
另外兩台的myid文件如下:
[root@test2 data]# pwd /data/zookeeper/data [root@test2 data]# cat myid 2
[root@test3 data]# pwd
/data/zookeeper/data
[root@test3 data]# cat myid
3
然后啟動zookeeper集群:
#三台服務器都這樣啟動,會在當前目錄執行路徑下面生成nohup.out文件,若是報錯的話,可以查看文件內容!
[root@test3 bin]# pwd
/usr/local/zookeeper/bin
[root@test3 bin]# ./zkServer.sh start JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED
#需要注意的是,啟動完成之后一定要查看是否有進程,有些時候不會報錯,但是沒有啟動成功!
[root@test3 bin]# netstat -alntp | grep 2181 #查看監聽端口是否存在
tcp 0 0 :::2181 :::* LISTEN 27962/java
三台服務器全部啟動完成之后,進行如下驗證:
[root@test3 bin]# ./zkCli.sh --server10.0.102.204 Connecting to localhost:2181 2018-12-21 10:54:38,286 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT 2018-12-21 10:54:38,289 [myid:] - INFO [main:Environment@100] - Client environment:host.name=test3 2018-12-21 10:54:38,289 [myid:] - INFO [main:Environment@100] - Client environment:java.version=1.7.0_79 2018-12-21 10:54:38,291 [myid:] - INFO [main:Environment@100] - Client environment:java.vendor=Oracle Corporation 2018-12-21 10:54:38,291 [myid:] - INFO [main:Environment@100] - Client environment:java.home=/usr/local/jdk/jdk1.7.0_79/jre 2018-12-21 10:54:38,291 [myid:] - INFO [main:Environment@100] - Client environment:java.class.path=/usr/local/zookeeper/bin/../build/classes:/usr/local/zookeeper/bin/../build/lib/*.jar:/usr/local/zookeeper/bin/../lib/slf4j-log4j12-1.6.1.jar:/usr/local/zookeeper/bin/../lib/slf4j-api-1.6.1.jar:/usr/local/zookeeper/bin/../lib/netty-3.7.0.Final.jar:/usr/local/zookeeper/bin/../lib/log4j-1.2.16.jar:/usr/local/zookeeper/bin/../lib/jline-0.9.94.jar:/usr/local/zookeeper/bin/../zookeeper-3.4.6.jar:/usr/local/zookeeper/bin/../src/java/lib/*.jar:/usr/local/zookeeper/bin/../conf: 2018-12-21 10:54:38,291 [myid:] - INFO [main:Environment@100] - Client environment:java.library.path=/usr/local/mysql/lib:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib 2018-12-21 10:54:38,291 [myid:] - INFO [main:Environment@100] - Client environment:java.io.tmpdir=/tmp 2018-12-21 10:54:38,292 [myid:] - INFO [main:Environment@100] - Client environment:java.compiler=<NA> 2018-12-21 10:54:38,292 [myid:] - INFO [main:Environment@100] - Client environment:os.name=Linux 2018-12-21 10:54:38,292 [myid:] - INFO [main:Environment@100] - Client environment:os.arch=amd64 2018-12-21 10:54:38,292 [myid:] - INFO [main:Environment@100] - Client environment:os.version=2.6.32-504.el6.x86_64 2018-12-21 10:54:38,292 [myid:] - INFO [main:Environment@100] - Client environment:user.name=root 2018-12-21 10:54:38,292 [myid:] - INFO [main:Environment@100] - Client environment:user.home=/root 2018-12-21 10:54:38,292 [myid:] - INFO [main:Environment@100] - Client environment:user.dir=/usr/local/zookeeper/bin 2018-12-21 10:54:38,293 [myid:] - INFO [main:ZooKeeper@438] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@1280d11b Welcome to ZooKeeper! 2018-12-21 10:54:38,315 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@975] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) JLine support is enabled 2018-12-21 10:54:38,320 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@852] - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-12-21 10:54:38,330 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1235] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x367ce7db1fa0003, negotiated timeout = 30000 WATCHER:: WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2181(CONNECTED) 0]
#若有如上顯示,則zookeeper集群搭建成功
搭建卡夫卡集群
仍然采用源碼的方式安裝。安裝過程和zookeeper差不多,解壓,修改配置文件,創建對應消息目錄。
[root@test3 src]# tar -zxvf kafka_2.10-0.10.0.0.tgz -C ../ #解壓 [root@test3 src]# cd .. [root@test3 local]# mv kafka_2.10-0.10.0.0 kafka #修改名字 [root@test3 local]# cd kafka/ [root@test3 kafka]# ls bin config libs LICENSE logs NOTICE site-docs [root@test3 kafka]# cd config/ # [root@test3 config]# ls #kafka的配置文件有很多,但是目前我們只需要關注 server.properties connect-console-sink.properties connect-file-sink.properties connect-standalone.properties producer.properties zookeeper.properties connect-console-source.properties connect-file-source.properties consumer.properties server.properties connect-distributed.properties connect-log4j.properties log4j.properties tools-log4j.properties
配置文件如下:把其中的注釋刪掉了!
broker.id=3 #在每個kafka服務器中唯一的,是一個整數 port=9092 host.name=10.0.102.179 auto.create.topics.enable = false delete.topic.enable = true num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/data/kafka/logs num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=10.0.102.179:2181,10.0.102.204:2181,10.0.102.214:2181 zookeeper.connection.timeout.ms=6000
配置文件的參數的說明:
- broker.id: 每一個broker都需要一個標識符,使用broker.id來表示。默認是0,可以被設置為任意整數,在kafka集群中必須是唯一的。
- port:默認端口是9092,配置kafka監聽的端口。
- zookeeper.connect:用於保存broker元數據的zookeeper地址是通過zookeeper.connect來指定的。新式就是上面那樣,主機和端口號,多個地址用逗號隔開。上面的三個是一個zookeeper集群。若是當前連接的zookeeper宕機,broker可以連接到zookeeper群組中的其他服務器上。
- log.dirs: kafka把所有消息都保存在磁盤上,存放這些日子片段的目錄是通過log.dirs指定的。它是一組用逗號分隔的本地文件系統路徑。如果指定了多個路徑,那么broker會根據“最少使用”原則,把同一個分區的日志片段保存到同一個路徑下。需要注意,borker會往擁有最少數目分區的路徑新增分區,而不是往擁有最小磁盤空間的路徑新增分區。
- num.recovery.threads.per.data.dir,對於如下3種情況,kafka會使用多少個線程來處理日志片段。
- 服務器正常啟動,用於打開每個分區的日志片段。
- 服務器崩潰后重啟,用於檢查和截短每個分區的日志片段。
- 服務器正常關閉,用於關閉日志片段。
默認情況下,每個日志目錄只使用一個線程。因為這些線程只是在服務器啟動和關閉時會用到,所以完全可以設置大量的線程來達到並行操作的目的。特別是對於包含大量分區的服務器來說,一旦發生崩潰,在進行恢復時使用並行操作可能會省下很多時間。所配置的數字對應的是log.dirs指定的單個日志目錄。譬如,這個值被設置為8,並且log.dirx指定了3個路徑,那么久需要24個線程。
- auto.create.topics.enable 默認情況下,kafka會在以下幾種情況下創建主題。
- 當一個生產者開始往主題寫入消息時。
- 當一個消費者開始從主題讀取消息時。
- 當任意一個客戶端向主題發送元數據請求是。
這個參數有點迷糊,盡量設置為false吧!
上面指定了kafa的日志目錄,需要創建消息目錄:
[root@test1 ~]# mkdir -p /data/kafka/logs
和zookeeper一樣,把上面解壓后的文件和創建的日志目錄,分別拷貝到其余兩台服務器上,注意修改其余兩台服務器的broker.id。
然后啟動kafka:
[root@test3 bin]# ls connect-distributed.sh kafka-consumer-groups.sh kafka-reassign-partitions.sh kafka-simple-consumer-shell.sh zookeeper-server-start.sh connect-standalone.sh kafka-consumer-offset-checker.sh kafka-replay-log-producer.sh kafka-topics.sh zookeeper-server-stop.sh kafka-acls.sh kafka-consumer-perf-test.sh kafka-replica-verification.sh kafka-verifiable-consumer.sh zookeeper-shell.sh kafka-configs.sh kafka-mirror-maker.sh kafka-run-class.sh kafka-verifiable-producer.sh kafka-console-consumer.sh kafka-preferred-replica-election.sh kafka-server-start.sh windows kafka-console-producer.sh kafka-producer-perf-test.sh kafka-server-stop.sh zookeeper-security-migration.sh
#bin目錄下面有很多腳本,kafka自帶的有zookeeper啟動,但是建議搭建獨立的zookeeper集群。
[root@test2 bin]# ./kafka-server-start.sh /usr/local/kafka/config/server.properties 1>/dev/null 2>&1 &
[1] 8757
[root@test2 bin]#
#kafka默認標准輸出到屏幕,上面的命令把kafka掛起在后台運行。
#啟動之后查看是否監聽了9092端口。
[root@test2 bin]# netstat -lntp |grep 9092
tcp 0 0 ::ffff:10.0.102.204:9092 :::* LISTEN 8757/java
[root@test2 bin]#
三台服務器全部啟動完畢,可以進行如下測試:
[root@test1 bin]# ./kafka-console-producer.sh --broker-list 10.0.102.179:9092 --topci science #創建一個主題 topci is not a recognized option [root@test1 bin]#
#創建一個生產者,發送消息
[root@test1 bin]# ./kafka-console-producer.sh --broker-list 10.0.102.179:9092 --topic science
test message
first
#創意一個消費者對象,接收生產者發送的消息
[root@test1 bin]# ./kafka-console-consumer.sh --zookeeper 10.0.102.179:2181 --topic science --from-beginning
test message
first
#可以看到卡夫卡是一個先進先出的消息隊列
若是以上的隊列消息驗證成功,則說明集群搭建成功、
[root@test1 bin]# ./kafka-topics.sh --zookeeper 10.0.102.179:2181 --describe --topic science #查看主題的信息 Topic:science PartitionCount:1 ReplicationFactor:1 Configs: Topic: science Partition: 0 Leader: 2 Replicas: 2 Isr: 2 [root@test1 bin]#
一個異常說明:在測試的時候,剛開始總報如下錯誤:
[root@test1 bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic lianxi test1 [2018-12-20 17:33:27,112] ERROR Error when sending message to topic lianxi with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
這個問題搞了好久,最好把java的版本從1.8降到了1.7之后,然后就成功了!
搭建成功的各個組件的版本信息如下:
[root@test1 ~]# java -version java version "1.7.0_79" Java(TM) SE Runtime Environment (build 1.7.0_79-b15) Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)
#kafka沒有類似的version命令,需要查看kafka的使用包信息
[root@test1 libs]# pwd
/usr/local/kafka/libs
[root@test1 libs]# ls kafka_2.10-0.10.0.0.jar
kafka_2.10-0.10.0.0.jar
#2.10是scala的版本,后面的0.10.0.0是kafka的版本
#在zookeeper的安裝目錄下面有對應版本的一些包
[root@test1 zookeeper]# ls
bin CHANGES.txt contrib docs ivy.xml LICENSE.txt README_packaging.txt recipes zookeeper-3.4.6.jar zookeeper-3.4.6.jar.md5
build.xml conf dist-maven ivysettings.xml lib NOTICE.txt README.txt src zookeeper-3.4.6.jar.asc zookeeper-3.4.6.jar.sha1
[root@test1 zookeeper]#
#zookeeper的版本是3.4.6