Zookeeper+Kafka完全分布式實戰部署
作者:尹正傑
版權聲明:原創作品,謝絕轉載!否則將追究法律責任。
其實我之前部署過kafak和zookeeper的完全分布式,集群是可以正常使用沒錯,但是在調優方案我做的很少,本次部署模擬我實際生成環境中的kafka版本zookeeper的一些調優措施,以及一些腳本管理等。部署集群需要你自行安裝jdk,本篇博客就直接上干貨了。
關於本篇博客的測試版本視頻:鏈接:https://pan.baidu.com/s/1S3UqwTH05RKQOuQ9bwOFMg 提取碼:jsv3
關於kafka操作系統的優化,可參考:https://www.cnblogs.com/yinzhengjie/p/9993719.html
一.集群的調優方向
1>.調大zookeeper的heap內存,默認是1G,可以根據服務器大小配置其堆內存為2G或者4G足矣(kafka實時傳輸的數據如果達到PB級別的話,得觀察一下YGC和FGC的值可以適當再次調大);
2>.修改kafka的副本數,默認的副本數是1,建議修改為2,如果副本數為2,那么容災能力就是1,如果副本數3,則容災能力就是2,當然副本數越多,可能會導致集群的性能下降,但是可靠性更強,各有利弊,我這里推薦副本數為2;
3>.kafka推薦分區數,默認的分區數是1,理論上來說,parition的數量小於core的數量的話,值越大,kafka的吞吐量就越高,但是你必須得考慮你的磁盤IO的瓶頸,因此我不推薦你將分區數這只過大,我建議這個值大於broker的數量,比如我的集群broker的只有5台,我的集群的partition數量是20;
4>.kafka的heap內存,默認也是1G,生成環境中建議將它調大,不知道大家有沒有發現,你broker的heap內存不管有多的,它都能給你吃滿!我在生成環境中給kafka的heap內存是6G(kafka主要使用堆外內存,即大量使用操作系統的頁緩存,因此其並不需要分配太多的堆內存空間),zookeeper給的是2G,剩下的全部給操作系統預留着,否則你的機器會非常的卡頓;
5>.各種kafka配置文件調優,我這里就不一一贅述了,我在本文中已經有詳細的介紹。
二.部署zookeeper集群
1>.下載截止20181110日最新穩定版本版本的zookeeper-3.4.13.tar.gz(2018-07-16 )版本

[root@yinzhengjie zookeeper]# wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz --2018-11-10 00:45:07-- https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz Resolving mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)... 101.6.8.193, 2402:f000:1:408:8100::1 Connecting to mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)|101.6.8.193|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 37191810 (35M) [application/x-gzip] Saving to: ‘zookeeper-3.4.13.tar.gz’ 100%[====================================================================================================================================================================================================================================>] 37,191,810 81.5MB/s in 0.4s 2018-11-10 00:45:08 (81.5 MB/s) - ‘zookeeper-3.4.13.tar.gz’ saved [37191810/37191810] [root@yinzhengjie zookeeper]# [root@yinzhengjie zookeeper]# ll total 36324 -rw-r--r-- 1 root root 37191810 Jul 16 11:40 zookeeper-3.4.13.tar.gz [root@yinzhengjie zookeeper]#
2>.解壓zookeeper
[root@yinzhengjie zookeeper]# tar -zxf zookeeper-3.4.13.tar.gz -C /soft/
3>.創建軟連接
[root@yinzhengjie ~]# ln -s /soft/zookeeper-3.4.13/ /soft/zk
4>.創建配置zookeeper的堆內存配置文件
[root@yinzhengjie ~]# cat /soft/zk/conf/java.env #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #指定JDK的安裝路徑 export JAVA_HOME=/soft/jdk #指定zookeeper的heap內存大小 export JVMFLAGS="-Xms2048m -Xmx2048m $JVMFLAGS" [root@yinzhengjie ~]#
5>.修改zookeeper的配置文件zoo.cfg(需要手動創建,或者從“ /soft/zk/conf/zoo_sample.cfg ”賦值一個模板即可)
[root@yinzhengjie ~]# cat /soft/zk/conf/zoo.cfg # 滴答,計時的基本單位,默認是2000毫秒,即2秒。它是zookeeper最小的時間單位,用於丈量心跳時間和超時時間等,通常設置成默認2秒即可。 tickTime=2000 # 初始化限制是10滴答,默認是10個滴答,即默認是20秒。指定follower節點初始化是鏈接leader節點的最大tick次數。 initLimit=5 # 數據同步的時間限制,默認是5個滴答,即默認時間是10秒。設定了follower節點與leader節點進行同步的最大時間。與initLimit類似,它也是以tickTime為單位進行指定的。 syncLimit=2 # 指定zookeeper的工作目錄,這是一個非常重要的參數,zookeeper會在內存中在內存只能中保存系統快照,並定期寫入該路徑指定的文件夾中。生產環境中需要注意該文件夾的磁盤占用情況。 dataDir=/home/yinzhengjie/zookeeper # 監聽zookeeper的默認端口。zookeeper監聽客戶端鏈接的端口,一般設置成默認2181即可。 clientPort=2181 # 這個操作將限制連接到 ZooKeeper 的客戶端的數量,限制並發連接的數量,它通過 IP 來區分不同的客戶端。此配置選項可以用來阻止某些類別的 Dos 攻擊。將它設置為 0 或者忽略而不進行設置將會取消對並發連接的限制。 #maxClientCnxns=60 # 在上文中已經提到,3.4.0及之后版本,ZK提供了自動清理事務日志和快照文件的功能,這個參數指定了清理頻率,單位是小時,需要配置一個1或更大的整數,默認是0,表示不開啟自動清理功能。 #autopurge.purgeInterval=1 # 這個參數和上面的參數搭配使用,這個參數指定了需要保留的文件數目。默認是保留3個。 #autopurge.snapRetainCount=3 #server.x=[hostname]:nnnnn[:nnnnn],這里的x是一個數字,與myid文件中的id是一致的。右邊可以配置兩個端口,第一個端口用於F和L之間的數據同步和其它通信,第二個端口用於Leader選舉過程中投票通信。 server.117=10.1.3.117:2888:3888 server.118=10.1.3.118:2888:3888 server.119=10.1.3.119:2888:3888 [root@yinzhengjie ~]#
6>.編寫zookeeper的啟動腳本
[root@yinzhengjie ~]# cat /usr/local/bin/xzk.sh #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #判斷用戶是否傳參 if [ $# -ne 1 ];then echo "無效參數,用法為: $0 {start|stop|restart|status}" exit fi #獲取用戶輸入的命令 cmd=$1 #定義函數功能 function zookeeperManger(){ case $cmd in start) echo "啟動服務" remoteExecution start ;; stop) echo "停止服務" remoteExecution stop ;; restart) echo "重啟服務" remoteExecution restart ;; status) echo "查看狀態" remoteExecution status ;; *) echo "無效參數,用法為: $0 {start|stop|restart|status}" ;; esac } #定義執行的命令 function remoteExecution(){ for (( i=117 ; i<=119 ; i++ )) ; do tput setaf 2 echo ========== kafka${i}.aggrx zkServer.sh $1 ================ tput setaf 9 ssh kafka${i}.aggrx "source /etc/profile ; zkServer.sh $1" done } #調用函數 zookeeperManger [root@yinzhengjie ~]# [root@yinzhengjie ~]# chmod +x /usr/local/bin/xzk.sh [root@yinzhengjie ~]# [root@yinzhengjie ~]# ll /usr/local/bin/xzk.sh -rwxr-xr-x 1 root root 1101 Nov 7 10:53 /usr/local/bin/xzk.sh [root@yinzhengjie ~]#
8>.同步系統配置文件

[root@yinzhengjie ~]# cat /usr/local/bin/xrsync.sh #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #判斷用戶是否傳參 if [ $# -lt 1 ];then echo "請輸入參數"; exit fi #獲取文件路徑 file=$@ #獲取子路徑 filename=`basename $file` #獲取父路徑 dirpath=`dirname $file` #獲取完整路徑 cd $dirpath fullpath=`pwd -P` #同步文件到Kafka集群 for (( i=116;i<=120;i++ )) do #使終端變綠色 tput setaf 2 echo =========== kafka${i}.aggrx : $file =========== #使終端變回原來的顏色,即白灰色 tput setaf 7 #遠程執行命令 rsync -lr $filename `whoami`@kafka${i}.aggrx:$fullpath #判斷命令是否執行成功 if [ $? == 0 ];then echo "命令執行成功" fi done [root@yinzhengjie ~]# [root@yinzhengjie ~]# [root@yinzhengjie ~]# ll /usr/local/bin/xrsync.sh -rwxr-xr-x 1 root root 771 Oct 13 20:12 /usr/local/bin/xrsync.sh [root@yinzhengjie ~]#
[root@yinzhengjie ~]# tail -3 /etc/profile #ADD Zookeeper PATH BY yinzhengjie ZOOKEEPER=/soft/zk PATH=$PATH:$ZOOKEEPER/bin [root@yinzhengjie ~]# [root@yinzhengjie ~]# xrsync.sh /etc/profile =========== yinzhengjie.aggrx : /etc/profile =========== 命令執行成功 =========== kafka117.aggrx : /etc/profile =========== 命令執行成功 =========== kafka118.aggrx : /etc/profile =========== 命令執行成功 =========== kafka119.aggrx : /etc/profile =========== 命令執行成功 =========== kafka120.aggrx : /etc/profile =========== 命令執行成功 [root@yinzhengjie ~]#
9>.將上述解壓的配置文件使用xrsync.sh同步到其它節點
注意,接下來需要在“/home/yinzhengjie/zookeeper/”目錄中創建一個myid,並寫入配置文件。也可以使用一個shell循環搞定,僅供參考。
[root@yinzhengjie ~]# for (( i=116;i<=119;i++ )) do ssh kafka${i}.aggrx "echo -n $i > /home/yinzhengjie/zookeeper/myid" ;done
10>.啟動zookeeper並查看狀態
[root@yinzhengjie ~]# xzk.sh status #這是查看zookeeper的狀態,如果是啟動,或停止zookeeper,直接調用start或者stop方法即可 查看狀態 ========== kafka117.aggrx zkServer.sh status ================ ZooKeeper JMX enabled by default Using config: /soft/zk/bin/../conf/zoo.cfg Mode: follower ========== kafka118.aggrx zkServer.sh status ================ ZooKeeper JMX enabled by default Using config: /soft/zk/bin/../conf/zoo.cfg Mode: leader #很顯然,該節點為zookeeper節點。 ========== kafka119.aggrx zkServer.sh status ================ ZooKeeper JMX enabled by default Using config: /soft/zk/bin/../conf/zoo.cfg Mode: follower [root@yinzhengjie ~]#
三.部署kafka集群
1>.官網下載kafka(kafka_2.11-0.10.2.1.tgz)

[root@yinzhengjie ~]# wget https://archive.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz --2018-11-10 01:21:08-- https://archive.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz Resolving archive.apache.org (archive.apache.org)... 163.172.17.199 Connecting to archive.apache.org (archive.apache.org)|163.172.17.199|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 37664956 (36M) [application/x-gzip] Saving to: ‘kafka_2.11-0.10.2.1.tgz’ 100%[====================================================================================================================================================================================================================================>] 37,664,956 228KB/s in 2m 58s 2018-11-10 01:24:07 (207 KB/s) - ‘kafka_2.11-0.10.2.1.tgz’ saved [37664956/37664956] [root@yinzhengjie ~]# [root@yinzhengjie ~]# ll total 36784 -rw-r--r-- 1 root root 37664956 Apr 27 2017 kafka_2.11-0.10.2.1.tgz [root@yinzhengjie ~]#
2>.解壓kafka
[root@yinzhengjie ~]# tar -zxf kafka_2.11-0.10.2.1.tgz -C /soft/
3>.創建軟連接
[root@yinzhengjie ~]# ln -s /soft/kafka_2.11-0.10.2.1/ /soft/kafka
4>.修改kafka的配置文件(server.properties)
[root@yinzhengjie ~]# cat /soft/kafka/config/server.properties | grep -v ^# | grep -v ^$ broker.id=116 delete.topic.enable=true auto.create.topics.enable=false port=9092 host.name=10.1.3.116 num.network.threads=30 num.io.threads=30 socket.send.buffer.bytes=5242880 socket.receive.buffer.bytes=5242880 socket.request.max.bytes=104857600 queued.max.requests=1000 log.dirs=/home/yinzhengjie/kafka/logs,/home/yinzhengjie/kafka/logs2,/home/yinzhengjie/kafka/log3 num.partitions=20 num.recovery.threads.per.data.dir=1 default.replication.factor=2 message.max.bytes=104857600 log.retention.hours=168 log.segment.bytes=536870912 log.retention.check.interval.ms=600000 zookeeper.connect=10.1.3.117:2181,10.1.3.118:2181,10.1.3.119:2181 zookeeper.session.timeout.ms=180000 zookeeper.connection.timeout.ms=6000 max.request.size=104857600 fetch.message.max.bytes=104857600 replica.fetch.max.bytes=104857600 replica.fetch.wait.max.ms=2000 unclean.leader.election=false num.replica.fetchers=5 [root@yinzhengjie ~]#

[root@yinzhengjie ~]# cat /soft/kafka/config/server.properties # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults ############################# Server Basics ############################# #每一個broker在集群中的唯一表示,要求是正數。當該服務器的IP地址發生改變時,broker.id沒有變化,則不會影響consumers的消息情況 broker.id=116 #這就是說,這條命令其實並不執行刪除動作,僅僅是在zookeeper上標記該topic要被刪除而已,同時也提醒用戶一定要提前打開delete.topic.enable開關,否則刪除動作是不會執行的。 delete.topic.enable=true #是否允許自動創建topic,若是false,就需要通過命令創建topic auto.create.topics.enable=false ############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #Socket服務器偵聽的地址。如果沒有配置,它將獲得從Java.NET.InAddio.GETCANONICALITHAMEMENE()返回的值 #listeners=PLAINTEXT://10.1.3.116:9092 #broker server服務端口 port=9092 #broker的主機地址,若是設置了,那么會綁定到這個地址上,若是沒有,會綁定到所有的接口上,並將其中之一發送到ZK,一般不設置 host.name=10.1.3.116 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #kafka 0.9.x以后的版本新增了advertised.listeners配置,kafka 0.9.x以后的版本不要使用 advertised.host.name 和 advertised.host.port 已經deprecated.如果配置的話,它使用 "listeners" 的值。否則,它將使用從java.net.InetAddress.getCanonicalHostName()返回的值。 #advertised.listeners=PLAINTEXT://your.host.name:9092 #將偵聽器(listener)名稱映射到安全協議,默認情況下它們是相同的。有關詳細信息,請參閱配置文檔。 #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL #處理網絡請求的最大線程數 num.network.threads=30 #處理磁盤I/O的線程數 num.io.threads=30 #套接字服務器使用的發送緩沖區(SOYSNDBUF) socket.send.buffer.bytes=5242880 #套接字服務器使用的接收緩沖區(SOYRCVBUF) socket.receive.buffer.bytes=5242880 #套接字服務器將接受的請求的最大大小(對OOM的保護) socket.request.max.bytes=104857600 #I/O線程等待隊列中的最大的請求數,超過這個數量,network線程就不會再接收一個新的請求。應該是一種自我保護機制。 queued.max.requests=1000 ############################# Log Basics ############################# #日志存放目錄,多個目錄使用逗號分割,如果你有多塊磁盤,建議配置成多個目錄,從而達到I/O的效率的提升。 log.dirs=/home/yinzhengjie/kafka/logs,/home/yinzhengjie/kafka/logs2,/home/yinzhengjie/kafka/logs3 #每個topic的分區個數,若是在topic創建時候沒有指定的話會被topic創建時的指定參數覆蓋 num.partitions=20 #在啟動時恢復日志和關閉時刷盤日志時每個數據目錄的線程的數量,默認1 num.recovery.threads.per.data.dir=1 # 默認副本數 default.replication.factor=2 #服務器接受單個消息的最大大小,即消息體的最大大小,單位是字節 message.max.bytes=104857600 # 自動負載均衡,如果設為true,復制控制器會周期性的自動嘗試,為所有的broker的每個partition平衡leadership,為更優先(preferred)的replica分配leadership。 # auto.leader.rebalance.enable=false ############################# Log Flush Policy ############################# #在強制fsync一個partition的log文件之前暫存的消息數量。調低這個值會更頻繁的sync數據到磁盤,影響性能。通常建議人家使用replication來確保持久性,而不是依靠單機上的fsync,但是這可以帶來更多的可靠性,默認10000。 #log.flush.interval.messages=10000 #2次fsync調用之間最大的時間間隔,單位為ms。即使log.flush.interval.messages沒有達到,只要這個時間到了也需要調用fsync。默認3000ms. #log.flush.interval.ms=10000 ############################# Log Retention Policy ############################# # 日志保存時間 (hours|minutes),默認為7天(168小時)。超過這個時間會根據policy處理數據。bytes和minutes無論哪個先達到都會觸發。 log.retention.hours=168 #日志數據存儲的最大字節數。超過這個時間會根據policy處理數據。 #log.retention.bytes=1073741824 #控制日志segment文件的大小,超出該大小則追加到一個新的日志segment文件中(-1表示沒有限制) log.segment.bytes=536870912 # 當達到下面時間,會強制新建一個segment #log.roll.hours = 24*7 # 日志片段文件的檢查周期,查看它們是否達到了刪除策略的設置(log.retention.hours或log.retention.bytes) log.retention.check.interval.ms=600000 #是否開啟壓縮 #log.cleaner.enable=false #日志清理策略選擇有:delete和compact主要針對過期數據的處理,或是日志文件達到限制的額度,會被 topic創建時的指定參數覆蓋 #log.cleanup.policy=delete # 日志壓縮運行的線程數 #log.cleaner.threads=2 # 壓縮的日志保留的最長時間 #log.cleaner.delete.retention.ms=3600000 ############################# Zookeeper ############################# #zookeeper集群的地址,可以是多個,多個之間用逗號分割. zookeeper.connect=10.1.3.117:2181,10.1.3.118:2181,10.1.3.119:2181 #ZooKeeper的最大超時時間,就是心跳的間隔,若是沒有反映,那么認為已經死了,不易過大 zookeeper.session.timeout.ms=180000 #指定多久消費者更新offset到zookeeper中。注意offset更新時基於time而不是每次獲得的消息。一旦在更新zookeeper發生異常並重啟,將可能拿到已拿到過的消息,連接zk的超時時間 zookeeper.connection.timeout.ms=6000 #請求的最大大小為字節,請求的最大字節數。這也是對最大記錄尺寸的有效覆蓋。注意:server具有自己對消息記錄尺寸的覆蓋,這些尺寸和這個設置不同。此項設置將會限制producer每次批量發送請求的數目,以防發出巨量的請求。 max.request.size=104857600 #每次fetch請求中,針對每次fetch消息的最大字節數。這些字節將會督導用於每個partition的內存中,因此,此設置將會控制consumer所使用的memory大小。這個fetch請求尺寸必須至少和server允許的最大消息尺寸相等,否則,producer可能發送的消息尺寸大於consumer所能消耗的尺寸。 fetch.message.max.bytes=104857600 #ZooKeeper集群中leader和follower之間的同步時間,換句話說:一個ZK follower能落后leader多久。 #zookeeper.sync.time.ms=2000 ############################# Replica Basics ############################# # leader接收follower的"fetch請求"的超時時間,默認是10秒。 # replica.lag.time.max.ms=30000 # 如果relicas落后太多,將會認為此partition relicas已經失效。而一般情況下,因為網絡延遲等原因,總會導致replicas中消息同步滯后。如果消息嚴重滯后,leader將認為此relicas網絡延遲較大或者消息吞吐能力有限。在broker數量較少,或者網絡不足的環境中,建議提高此值.follower落后於leader的最大message數,這個參數是broker全局的。設置太大 了,影響真正“落后”follower的移除;設置的太小了,導致follower的頻繁進出。無法給定一個合適的replica.lag.max.messages的值,因此不推薦使用,據說新版本的Kafka移除了這個參數。 #replica.lag.max.messages=4000 # follower與leader之間的socket超時時間 #replica.socket.timeout.ms=30000 # follower每次fetch數據的最大尺寸 replica.fetch.max.bytes=104857600 # follower的fetch請求超時重發時間 replica.fetch.wait.max.ms=2000 # fetch的最小數據尺寸 #replica.fetch.min.bytes=1 # 是否允許控制器關閉broker ,默認值為true,它會關閉所有在這個broker上的leader,並轉移到其他broker,建議啟用,增加集群穩定性。 # controlled.shutdown.enable = false #0.11.0.0版本開始unclean.leader.election.enable參數的默認值由原來的true改為false,可以關閉unclean leader election,也就是不在ISR(IN-Sync Replica)列表中的replica,不會被提升為新的leader partition。kafka集群的持久化力大於可用性,如果ISR中沒有其它的replica,會導致這個partition不能讀寫。 unclean.leader.election=false # follower中開啟的fetcher線程數, 同步速度與系統負載均衡 num.replica.fetchers=5 # partition leader與replicas之間通訊時,socket的超時時間 #controller.socket.timeout.ms=30000 # partition leader與replicas數據同步時,消息的隊列尺寸. #controller.message.queue.size=10 #指定將使用哪個版本的 inter-broker 協議。 在所有經紀人升級到新版本之后,這通常會受到沖擊。升級時要設置 #inter.broker.protocol.version=0.10.1 #指定broker將用於將消息添加到日志文件的消息格式版本。 該值應該是有效的ApiVersion。 一些例子是:0.8.2,0.9.0.0,0.10.0。 通過設置特定的消息格式版本,用戶保證磁盤上的所有現有消息都小於或等於指定的版本。 不正確地設置這個值將導致使用舊版本的用戶出錯,因為他們將接收到他們不理解的格式的消息。 #log.message.format.version=0.10.1 [root@yinzhengjie ~]#
5>.修改kafka的啟動腳本
[root@yinzhengjie ~]# cat /soft/kafka/bin/kafka-server-start.sh #!/bin/bash # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. if [ $# -lt 1 ]; then echo "USAGE: $0 [-daemon] server.properties [--override property=value]*" exit 1 fi base_dir=$(dirname $0) if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties" fi if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then #在這里指定堆內存為20G export KAFKA_HEAP_OPTS="-Xmx20G -Xms20G" fi EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'} COMMAND=$1 case $COMMAND in -daemon) EXTRA_ARGS="-daemon "$EXTRA_ARGS shift ;; *) ;; esac exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@" [root@yinzhengjie ~]#
6>.編寫kafka啟動腳本
[root@yinzhengjie ~]# cat /usr/local/bin/xkafka.sh #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #判斷用戶是否傳參 if [ $# -ne 1 ];then echo "無效參數,用法為: $0 {start|stop}" exit fi #獲取用戶輸入的命令 cmd=$1 for (( i=116 ; i<=120 ; i++ )) ; do tput setaf 2 echo ========== kafka${i}.aggrx $cmd ================ tput setaf 9 case $cmd in start) ssh kafka${i}.aggrx "source /etc/profile ; nohup kafka-server-start.sh /soft/kafka/config/server.properties >> /home/yinzhengjie/kafka/console/kafka-`date +%F`.log &" #ssh kafka${i}.aggrx "source /etc/profile ; kafka-server-start.sh -daemon /soft/kafka/config/server.properties" echo kafka${i}.aggrx "服務已啟動" ;; stop) ssh kafka${i}.aggrx "source /etc/profile ; kafka-server-stop.sh" echo kafka${i}.aggrx "服務已停止" ;; *) echo "無效參數,用法為: $0 {start|stop}" exit ;; esac done [root@yinzhengjie ~]#
7>.查看各個服務器的啟動進程

[root@yinzhengjie ~]# cat /usr/local/bin/xcall.sh #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #判斷用戶是否傳參 if [ $# -lt 1 ];then echo "請輸入參數" exit fi #獲取用戶輸入的命令 cmd=$@ #Kafka集群批量執行命令 for (( i=116;i<=120;i++ )) do #使終端變綠色 tput setaf 2 echo =========== kafka${i}.aggrx : $cmd =========== #使終端變回原來的顏色,即白灰色 tput setaf 7 #遠程執行命令 ssh kafka${i}.aggrx $cmd #判斷命令是否執行成功 if [ $? == 0 ];then echo "命令執行成功" fi done [root@yinzhengjie ~]#
[root@yinzhengjie ~]# xcall.sh jps =========== yinzhengjie.aggrx : jps =========== 934 Jps 9929 Kafka 10746 ProdServerStart 命令執行成功 =========== kafka117.aggrx : jps =========== 953 Jps 8236 Kafka 4735 QuorumPeerMain 命令執行成功 =========== kafka118.aggrx : jps =========== 4616 QuorumPeerMain 2425 Jps 8382 Kafka 命令執行成功 =========== kafka119.aggrx : jps =========== 23953 Jps 4763 QuorumPeerMain 8079 Kafka 命令執行成功 =========== kafka120.aggrx : jps =========== 26196 Jps 8143 Kafka 命令執行成功 [root@yinzhengjie ~]#