Installing a ZooKeeper + Kafka Cluster on CentOS 7
1 Introduction to ZooKeeper and Kafka
1) ZooKeeper
ZooKeeper is a distributed, hierarchical file system that promotes loose coupling between clients and provides eventual consistency. Here it is used to manage and coordinate the Kafka brokers; within the ZooKeeper ensemble, one server acts as Leader and the others as Followers.
2) Apache Kafka
Kafka is a distributed publish-subscribe messaging system. Messages are organized by Topic; each topic is split into multiple partitions, and each message's position within a partition is called its offset, a long integer that uniquely identifies the message.
Within a consumer group, each partition is consumed by only one consumer, and consumption by different groups is independent; a single consumer may, however, consume messages from multiple partitions.
Kafka only guarantees that messages within one partition are consumed in order; at the level of the whole topic, messages are not ordered.
Each partition has one broker acting as its "leader", which handles all reads and writes for that partition. If the leader fails, one of the followers takes over as the new leader (elected via ZooKeeper); followers simply track the leader and replicate its messages. Since the leader carries the full request load of its partition, the cluster has as many "leaders" as it has partitions, and Kafka spreads leadership evenly across the brokers to keep overall performance stable.
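To illustrate why ordering holds per partition but not per topic: the producer maps each message key to a fixed partition by hashing. The sketch below is illustrative only; it uses cksum as a stand-in hash, not Kafka's actual default partitioner (murmur2).

```shell
# Illustration only: map a key to one of 3 partitions by hashing.
# (Kafka's real default partitioner uses murmur2; cksum is a stand-in.)
partition_for_key() {
  local key="$1" partitions="$2"
  local hash
  # cksum prints "CRC bytecount"; keep only the CRC value
  hash=$(printf '%s' "$key" | cksum | awk '{print $1}')
  echo $(( hash % partitions ))
}

# Every message with the same key lands in the same partition,
# so per-key ordering is preserved within that partition.
partition_for_key "user-42" 3
```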
2 Server addresses
The cluster is built on three servers:
Server1:192.168.89.11
Server2:192.168.89.12
Server3:192.168.89.13
3 Install the JDK
(jdk1.8.0_102)
# yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
4 Set up the ZooKeeper cluster
A ZooKeeper ensemble can serve requests only while more than half of its nodes are alive, so the host count is usually odd: 1, 3, 5, ….
With 3 nodes, the ensemble tolerates 1 failure. An even count is possible but gains nothing:
with 4 nodes, one failure leaves three servers, and a second failure drops the ensemble below the required majority, the same tolerance as a 3-node ensemble. Remember: more than half must survive.
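The majority rule above boils down to simple arithmetic: an ensemble of N nodes tolerates floor((N-1)/2) failures, which is why even node counts buy nothing.

```shell
# Majority quorum: an ensemble of N nodes tolerates floor((N-1)/2) failures.
tolerated() { echo $(( ($1 - 1) / 2 )); }

for n in 1 3 4 5; do
  echo "nodes=$n tolerated_failures=$(tolerated $n)"
done
```

Note that 4 nodes tolerate the same single failure as 3 nodes, while adding one more machine to keep alive.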
4.1 Download
http://mirrors.shu.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
4.2 Install ZooKeeper on all nodes
1) Create the ZooKeeper installation directory
# mkdir -p /data/zookeeper
2) Extract ZooKeeper into the installation directory
# tar -zxvf zookeeper-3.4.13.tar.gz -C /data/zookeeper/
3) Create the data directory
# mkdir -p /data/zookeeper/zookeeper-3.4.13/data
4) Create the log directory
# mkdir -p /data/zookeeper/zookeeper-3.4.13/dataLog
5) Set the environment variables and reload the profile
# vim /etc/profile
===================================================
export ZK_HOME=/data/zookeeper/zookeeper-3.4.13
export PATH=$PATH:$ZK_HOME/bin
===================================================
# source /etc/profile
4.3 Configure ZooKeeper on all nodes
4.3.1 Per-node configuration
# cd /data/zookeeper/zookeeper-3.4.13/conf/
# cp -f zoo_sample.cfg zoo.cfg
# vim zoo.cfg
====================================================
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/zookeeper-3.4.13/data/
dataLogDir=/data/zookeeper/zookeeper-3.4.13/dataLog/
clientPort=2181
server.1=192.168.89.11:2888:3888
server.2=192.168.89.12:2888:3888
server.3=192.168.89.13:2888:3888
#The first port (default 2888) carries communication between the leader and followers; the second (default 3888) is used for leader election, both at initial cluster startup and when the current leader fails
====================================================
# echo "1" > /data/zookeeper/zookeeper-3.4.13/data/myid #on server1; the value differs per node and must match server.1 above
# echo "2" > /data/zookeeper/zookeeper-3.4.13/data/myid #on server2; must match server.2 above
# echo "3" > /data/zookeeper/zookeeper-3.4.13/data/myid #on server3; must match server.3 above
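To avoid writing the wrong myid on the wrong host, the server.N mapping from zoo.cfg can be turned into a small helper; the function name is illustrative and the IPs are the three servers above.

```shell
# Derive this node's myid from its IP, per the server.N lines in zoo.cfg.
myid_for_ip() {
  case "$1" in
    192.168.89.11) echo 1 ;;
    192.168.89.12) echo 2 ;;
    192.168.89.13) echo 3 ;;
    *) echo "unknown host $1" >&2; return 1 ;;
  esac
}

# On each node (same command everywhere):
# myid_for_ip "$(hostname -I | awk '{print $1}')" > /data/zookeeper/zookeeper-3.4.13/data/myid
```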
4.3.2 Start/stop commands and starting at boot
1) Starting and stopping ZooKeeper
# zkServer.sh start #start
# zkServer.sh stop #stop
# zkServer.sh status #show this node's mode (leader or follower)
# zkCli.sh #connect to the cluster
2) Start at boot
# cd /usr/lib/systemd/system
# vim zookeeper.service
=========================================
[Unit]
Description=zookeeper server daemon
After=network.target
[Service]
Type=forking
ExecStart=/data/zookeeper/zookeeper-3.4.13/bin/zkServer.sh start
ExecReload=/data/zookeeper/zookeeper-3.4.13/bin/zkServer.sh stop && sleep 2 && /data/zookeeper/zookeeper-3.4.13/bin/zkServer.sh start
ExecStop=/data/zookeeper/zookeeper-3.4.13/bin/zkServer.sh stop
Restart=always
[Install]
WantedBy=multi-user.target
=======================================================
# systemctl start zookeeper
# systemctl enable zookeeper
5 Set up the Kafka cluster
5.1 Download
http://mirror.bit.edu.cn/apache/kafka/2.1.0/kafka_2.12-2.1.0.tgz
5.2 Install Kafka on all nodes
1) Create the Kafka working directory
# mkdir -p /data/kafka
2) Extract Kafka
# tar -zxvf kafka_2.12-2.1.0.tgz -C /data/kafka/
3) Create the Kafka log directory
# mkdir -p /data/kafka/kafkalogs
4) Edit the Kafka configuration file
# vim /data/kafka/kafka_2.12-2.1.0/config/server.properties
=================================================
broker.id=1 #unique ID of this broker in the cluster; must be a positive integer (use 1, 2, 3 on server1, 2, 3)
listeners=PLAINTEXT://192.168.89.11:9092 #socket server listen address; use each node's own IP
log.dirs=/data/kafka/kafkalogs/ #where Kafka stores its data
message.max.bytes=5242880 #maximum message size in bytes
log.cleaner.enable=true #enable log cleanup
log.retention.hours=72 #maximum time (hours) a segment file is retained before deletion, i.e. data older than 3 days is purged
log.segment.bytes=1073741824 #size of each log segment file in bytes (default 1G)
log.retention.check.interval.ms=300000 #how often (ms) segments are checked against the retention policy
num.partitions=3 #default number of partitions per topic; more partitions allow more parallelism
default.replication.factor=3 #default number of replicas per topic; must not exceed the number of brokers
delete.topic.enable=true #enable topic deletion (default false)
replica.fetch.max.bytes=5242880 #maximum bytes a replica fetches per request
#The next three settings affect which brokers consumers can connect to when consuming partitions; see the appendix in section 6
offsets.topic.replication.factor=3 #replication factor (number of copies) of the offsets topic
transaction.state.log.replication.factor=3 #replication factor of the transaction topic (set higher to ensure availability)
transaction.state.log.min.isr=3 #overrides min.insync.replicas for the transaction topic
zookeeper.connect=192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181
#ZooKeeper connection string; multiple servers may be listed
=================================================
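Only broker.id and listeners differ between the three nodes; the rest of server.properties is identical. A small helper can render the per-node lines so they are never mistyped. The function name and usage are illustrative, assuming the IPs above.

```shell
# Render the per-node settings of server.properties for a given
# broker ID and IP; all other settings are shared across nodes.
render_props() {
  local id="$1" ip="$2"
  cat <<EOF
broker.id=${id}
listeners=PLAINTEXT://${ip}:9092
EOF
}

# e.g. on server2:
# render_props 2 192.168.89.12 >> /data/kafka/kafka_2.12-2.1.0/config/server.properties
```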
5) A Kafka node defaults to a 1G heap; to change it, edit kafka-server-start.sh
# vim /data/kafka/kafka_2.12-2.1.0/bin/kafka-server-start.sh
#Find the KAFKA_HEAP_OPTS entry and change it, for example:
export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"
5.3 Start Kafka and enable it at boot
1) Start Kafka
# cd /data/kafka/kafka_2.12-2.1.0/
# ./bin/kafka-server-start.sh -daemon ./config/server.properties
After starting, run jps to check whether the Kafka process is up; if startup failed, look at logs/kafkaServer.out in the installation directory for the error.
2) Start at boot
# cd /usr/lib/systemd/system
# vim kafka.service
=========================================
[Unit]
Description=kafka server daemon
After=network.target zookeeper.service
[Service]
Type=forking
ExecStart=/data/kafka/kafka_2.12-2.1.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.12-2.1.0/config/server.properties
ExecReload=/data/kafka/kafka_2.12-2.1.0/bin/kafka-server-stop.sh && sleep 2 && /data/kafka/kafka_2.12-2.1.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.12-2.1.0/config/server.properties
ExecStop=/data/kafka/kafka_2.12-2.1.0/bin/kafka-server-stop.sh
Restart=always
[Install]
WantedBy=multi-user.target
=======================================================
# systemctl start kafka
# systemctl enable kafka
5.4 Create a topic
Create a topic with 3 partitions and 3 replicas
# cd /data/kafka/kafka_2.12-2.1.0/
# ./bin/kafka-topics.sh --create --zookeeper 192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181 --replication-factor 3 --partitions 3 --topic SyslogTopic
5.5 Common commands
# cd /data/kafka/kafka_2.12-2.1.0/
1) Stop Kafka
./bin/kafka-server-stop.sh
2) Create a topic
./bin/kafka-topics.sh --create --zookeeper 192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181 --replication-factor 1 --partitions 1 --topic topic_name
3) List topics
./bin/kafka-topics.sh --list --zookeeper 192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181
4) Describe a topic
./bin/kafka-topics.sh --describe --zookeeper 192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181 --topic topic_name
5) Send messages with the console producer
./bin/kafka-console-producer.sh --broker-list 192.168.89.11:9092 --topic topic_name
6) Consume messages with the console consumer
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.89.11:9092,192.168.89.12:9092,192.168.89.13:9092 --topic topic_name
7) Delete a topic
./bin/kafka-topics.sh --delete --topic topic_name --zookeeper 192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181
8) Inspect the per-partition layout of __consumer_offsets (the brokers consumers can connect to)
./bin/kafka-topics.sh --describe --zookeeper 192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181 --topic __consumer_offsets
6 Appendix
1. Consumer offsets and Kafka load balancing
In /data/kafka/kafka_2.12-2.1.0/config/server.properties, set offsets.topic.replication.factor to the number of brokers in the cluster. Note that the internal __consumer_offsets topic is not governed by num.partitions or default.replication.factor; its partition count and replication factor are controlled by offsets.topic.num.partitions and offsets.topic.replication.factor, which default to 50 and 1 respectively (50 partitions, one replica each). Verify the result with:
bin/kafka-topics.sh --describe --zookeeper 192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181 --topic __consumer_offsets
Note: if the brokers were first started with offsets.topic.replication.factor=1, raising it to 3 and restarting will not change the replication factor of the already-created topic. Instead, reassign its partitions as follows:
1) Create the reassignment JSON
cat > increase-replication-factor.json <<EOF
{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":1,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":2,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":3,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":4,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":5,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":6,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":7,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":8,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":9,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":10,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":11,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":12,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":13,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":14,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":15,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":16,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":17,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":18,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":19,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":20,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":21,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":22,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":23,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":24,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":25,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":26,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":27,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":28,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":29,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":30,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":31,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":32,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":33,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":34,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":35,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":36,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":37,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":38,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":39,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":40,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":41,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":42,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":43,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":44,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":45,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":46,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":47,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":48,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":49,"replicas":[1,2,3]}]
}
EOF
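The 50-entry JSON above can also be generated with a short loop instead of writing every partition by hand (assuming the default 50 partitions of __consumer_offsets and broker IDs 1, 2, 3):

```shell
# Generate increase-replication-factor.json for partitions 0..49,
# assigning replicas [1,2,3] to each.
{
  echo '{"version":1, "partitions":['
  for p in $(seq 0 49); do
    sep=','
    [ "$p" -eq 49 ] && sep=''   # no trailing comma on the last entry
    echo "{\"topic\":\"__consumer_offsets\",\"partition\":${p},\"replicas\":[1,2,3]}${sep}"
  done
  echo ']}'
} > increase-replication-factor.json
```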
2) Execute the reassignment
bin/kafka-reassign-partitions.sh --zookeeper 192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181 --reassignment-json-file increase-replication-factor.json --execute
3) Verify the reassignment
bin/kafka-reassign-partitions.sh --zookeeper 192.168.89.11:2181,192.168.89.12:2181,192.168.89.13:2181 --reassignment-json-file increase-replication-factor.json --verify