一、單節點環境
部署kafka,需要先部署JDK與zookeeper
1、單節點zookeeper
官網下載zookeeper最新版apache-zookeeper-3.5.7-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz cd apache-zookeeper-3.5.7-bin/conf/ cp zoo_sample.cfg zoo.cfg vim zoo.cfg //指定跟節點路徑dataDir bin/zkServer.sh start //STARTED啟動成功 telnet 127.0.0.1 2181 //測試
2、單節點kafka
官網下載kafka最新版kafka_2.11-2.4.0.tgz
tar -zxvf kafka_2.11-2.4.0.tgz cd kafka_2.11-2.4.0/config/ vim server.properties //修改logs路徑 bin/kafka-server-start.sh ./../config/server.properties //...started啟動成功
二、多節點環境
1、多節點zookeeper集群
多節點配置文件zoo.cfg
tickTime=2000 //最小時間單位,測量心跳時間、超時等 initLimit=10 //follower初始時連接leader節點的最大tick次數 syncLimit=5 //follower節點與leader節點進行同步的最大時間 dataDir=/home/wq/zookeeper/cluster/2191 //內存快照持久化另外兩個2192、2193 clientPort=2191 //監聽端口 server.1=mcip:2888:3888 //serverX=host:port1:port2 server.2=mcip:2889:3889 //X:zookeeper的myid,host:zookeeper主機,port1:followe節點連接leader節點,port2:leader選舉 server.3=mcip:2890:3890 //這里是同一虛擬機,所以指定port不同,多台實體服務器建議設置port一致。
小技巧:由於連接WiFi,虛擬機ip不停的在變,修改一下/etc/hosts文件,所有中間件的配置文件設置本機ip時,用mcip替換。換wifi了,修改一下hosts文件即可。
在各自dataDir下創建myid文件內容與上面server.X中X保持一致
touch myid
vim myid
另外由於java版本也可通過jps -m查看啟動結果
cluster.sh腳本
#!bin/bash /usr/etc/apache-zookeeper-3.5.7-bin/bin/zkServer.sh $1 /home/wq/zookeeper/cluster/2191/zoo.cfg /usr/etc/apache-zookeeper-3.5.7-bin/bin/zkServer.sh $1 /home/wq/zookeeper/cluster/2192/zoo.cfg /usr/etc/apache-zookeeper-3.5.7-bin/bin/zkServer.sh $1 /home/wq/zookeeper/cluster/2193/zoo.cfg
2、多節點kafka集群
多節點配置文件server.properties
broker.id=3 //需要設置不同1/2/3 listeners=PLAINTEXT://mcip:9093 //9091/9092/9093 log.dirs=/home/wq/kafka/cluster/logs_9093 //日志路徑文件夾必須為空且三個節點不能一樣logs_9091/logs_9092/logs_9093,是kafka持久化消息的目錄 zookeeper.connect=mcip:2191,mcip:2192,mcip:2193 zookeeper.connection.timeout.ms=6000
3、測試topic創建與刪除
kafka-topics.sh腳本:topic創建、查看、刪除
/usr/etc/kafka_2.11-2.4.0/bin/kafka-topics.sh --zookeeper mcip:2191,mcip:2192,mcip:2193 --create --topic topic-test --partitions 3 --replication-factor 3 /usr/etc/kafka_2.11-2.4.0/bin/kafka-topics.sh --zookeeper mcip:2191,mcip:2192,mcip:2193 --list topic-test /usr/etc/kafka_2.11-2.4.0/bin/kafka-topics.sh --zookeeper mcip:2191,mcip:2192,mcip:2193 --describe --topic topic-test /usr/etc/kafka_2.11-2.4.0/bin/kafka-topics.sh --zookeeper mcip:2191,mcip:2192,mcip:2193 --delete --topic topic-test
4、測試消息發送和消費
kafka-console-producer腳本和kafka-console-consumer腳本
//需要開兩個終端 /usr/etc/kafka_2.11-2.4.0/bin/kafka-console-producer.sh --broker-list mcip:9091,mcip:9092,mcip:9093 --topic topic-test /usr/etc/kafka_2.11-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server mcip:9091,mcip:9092,mcip:9093 --topic topic-test --from-beginning
5、創建cluster.sh腳本,
#!bin/bash if [ "$1" = "start" ] ;then /usr/etc/kafka_2.11-2.4.0/bin/kafka-server-start.sh -daemon /home/wq/kafka/cluster/conf/server1.properties /usr/etc/kafka_2.11-2.4.0/bin/kafka-server-start.sh -daemon /home/wq/kafka/cluster/conf/server2.properties /usr/etc/kafka_2.11-2.4.0/bin/kafka-server-start.sh -daemon /home/wq/kafka/cluster/conf/server3.properties elif [ "$1" = "stop" ] ;then /usr/etc/kafka_2.11-2.4.0/bin/kafka-server-stop.sh /home/wq/kafka/cluster/conf/server1.properties /usr/etc/kafka_2.11-2.4.0/bin/kafka-server-stop.sh /home/wq/kafka/cluster/conf/server2.properties /usr/etc/kafka_2.11-2.4.0/bin/kafka-server-stop.sh /home/wq/kafka/cluster/conf/server3.properties elif [ "$1" = "topic" ] ;then if [ "$2" = "create" ] ;then /usr/etc/kafka_2.11-2.4.0/bin/kafka-topics.sh --zookeeper mcip:2191,mcip:2192,mcip:2193 --$2 --topic $3 --partitions $4 --replication-factor $5 elif [ "$2" = "list" ] ;then /usr/etc/kafka_2.11-2.4.0/bin/kafka-topics.sh --zookeeper mcip:2191,mcip:2192,mcip:2193 --$2 $3 elif [ "$2" = "describe" ] ;then /usr/etc/kafka_2.11-2.4.0/bin/kafka-topics.sh --zookeeper mcip:2191,mcip:2192,mcip:2193 --$2 --topic $3 elif [ "$2" = "delete" ] ;then /usr/etc/kafka_2.11-2.4.0/bin/kafka-topics.sh --zookeeper mcip:2191,mcip:2192,mcip:2193 --$2 --topic $3 else echo "should cluster.sh <topic> <create,list,describe,delete> <topic-name> <partitionsNum>" fi elif [ "$1" = "producer" ] ;then /usr/etc/kafka_2.11-2.4.0/bin/kafka-console-producer.sh --broker-list mcip:9091,mcip:9092,mcip:9093 --topic $2 elif [ "$1" = "consumer" ] ;then /usr/etc/kafka_2.11-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server mcip:9091,mcip:9092,mcip:9093 --topic $2 --from-beginning else echo "should cluster.sh <start,stop,topic,producer,consumer>" fi
集群結構圖
① 從上面kafka啟動配置文件server.properties,可以看出kafka啟動是不知道其他kafka服務的,通過注冊
ZooKeeper來完成kafka集群,由Zookeeper來同一管理各節點狀態及通信。伸縮性強,新增kafka服務僅需要注冊到ZooKeeper中就行了。
②關於ZooKeeper集群采用的leader-follower模型,半數以上服務正常運行,則集群正常提供服務。所以經常采用奇數個服務組成集群。
例如:5個ZooKeeper服務組成集群,至少需要三個follower服務器才能選舉出leader,為保證集群正常,允許宕機2個
4個ZooKeeper服務組成集群,至少需要三個follower服務器才能選舉出leader,為保證集群正常,允許宕機1個
6個ZooKeeper服務組成集群,至少需要四個follower服務器才能選舉出leader,為保證集群正常,允許宕機2個
leader-follower模型:僅leader對外服務,follower作用僅僅是同步leader數據和當leader宕機時,重新選舉leader。
5個相比於4個,多允許宕機數,集群穩健性提升。
5個相比於6個,集群效果是一樣的,但是節省了一個服務。