server.1=192.168.1.131:2888:3888
server.2=192.168.1.129:2888:3888
server.3=192.168.1.134:2888:3888
server.4=192.168.1.135:2888:3888
server.5=192.168.1.136:2888:3888
docker exec -it zookeeper cat /conf/zoo.cfg
docker run --network host -d --name zookeeper --restart always -e ZOO_SERVERS="clientPort=2181 server.1=192.168.1.131:2888:3888 server.2=192.168.1.129:2888:3888 server.3=192.168.1.134:2888:3888" -e ZOO_MY_ID=1 docker.io/zookeeper:3.5.5
docker run --network host -d --name zookeeper --restart always -e ZOO_SERVERS="clientPort=2181 server.1=192.168.1.131:2888:3888 server.2=192.168.1.129:2888:3888 server.3=192.168.1.134:2888:3888 server.4=192.168.1.135:2888:3888 server.5=192.168.1.136:2888:3888" -e ZOO_MY_ID=1 docker.io/zookeeper:3.5.5
docker run --restart always -d --name kafka3 --network host -e KAFKA_ZOOKEEPER_CONNECT='192.168.1.131:2181,192.168.1.129:2181,192.168.1.134:2181' -e KAFKA_LISTENERS='PLAINTEXT://0.0.0.0:9092' -e KAFKA_BROKER_ID='2' -e KAFKA_ADVERTISED_LISTENERS='PLAINTEXT://192.168.1.129:9092' docker.io/wurstmeister/kafka:2.12-2.2.1
zookeeper節點擴展
一、啟動兩個zookeeper
配置文件
server.1=192.168.1.131:2888:3888
server.2=192.168.1.129:2888:3888
watch -n 0.5 -c "curl 127.0.0.1:8080/commands/mntr" #查看node02為leader
二、啟動第三個節點
配置文件
server.1=192.168.1.131:2888:3888
server.2=192.168.1.129:2888:3888
server.3=192.168.1.134:2888:3888
此時node02為leader followers兩個
三、可以直接修改node01、02的容器里的配置、因為此鏡像的docker-entrypoint.sh會判斷zoo.cfg存在就不重建,所以不用改系統變量
配置文件
server.1=192.168.1.131:2888:3888
server.2=192.168.1.129:2888:3888
server.3=192.168.1.134:2888:3888
此時leader為node03,但是停掉node01、02任何一個還是會down,所以還要重啟下node03
重啟順序為:要先 stop掉 再start 順序為: ZOO_MY_ID從大到小
這樣隨便停掉一個節點zookeeper集群都正常
在遷移kafka的時候。zookeeper的遷移時,一定要先同步下zookeeper的數據(讓新的節點先為就集群的follower)
驗證:一、舊集群node01 和node02 node02為leader
創建新集群node04和node05
讓kafka集群連接新集群,則kafka數據丟失
二、舊集群node01 和node02 node02為leader
讓node05 為舊集群的follower,先同步zookeeper的數據 配置為:01 02 05
stop掉所有zookeeper,
修改node05的配置 04 05
添加node04 配置04 05
啟動 node05 和node04 此時05為leader
讓kafka集群連接新集群。kafka數據正常
以下是kafka常用命令行總結:
0.查看有哪些主題: ./kafka-topics.sh --list --zookeeper 192.168.0.201:12181
1.查看topic的詳細信息
./kafka-topics.sh -zookeeper 127.0.0.1:2181 -describe -topic testKJ1
2、為topic增加副本
./kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execute
3、創建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testKJ1
4、為topic增加partition
kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --partitions 2 --topic topic
修改位移
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --reset-offsets --group sale_log_kaetl_group --topic topicname:1 --to-offset 3 --execute #topicname:1數字是分區
5、kafka生產者客戶端命令
./kafka-console-producer.sh --broker-list localhost:9092 --topic testKJ1
6、kafka消費者客戶端命令
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic testKJ1
kafka-console-consumer.sh --bootstrap-server 172.18.72.163:9092 --offset 106553 --partition 0 --topic vm_replenish_log --max-messages 1 #此處的offset是編號,從0開始
7、kafka服務啟動
./kafka-server-start.sh -daemon ../config/server.properties
8、下線broker
./kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper 127.0.0.1:2181 --broker #brokerId# --num.retries 3 --retry.interval.ms 60
shutdown broker
9、刪除topic
./kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic testKJ1 --zookeeper 127.0.0.1:2181
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic testKJ1
10、查看consumer組內消費的offset
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --offsets --describe --group reduce_group #顯示的是offset的位置 從1開始,
單節點kafka和zookeeper擴展為雙節點步驟及問題
原主機node01,擴展主機node02
node01:
docker run --network host -d --name zookeeper --restart always -e ZOO_SERVERS="clientPort=2181 server.1=10.201.5.14:2888:3888" -e ZOO_MY_ID=1 docker.io/zookeeper:3.5.5
docker run --restart always -d --name kafka --network host -e KAFKA_ZOOKEEPER_CONNECT='10.201.5.14:2181' -e KAFKA_LISTENERS='PLAINTEXT://0.0.0.0:9092' -e KAFKA_BROKER_ID='1' -e KAFKA_ADVERTISED_LISTENERS='PLAINTEXT://10.201.5.14:9092' docker.io/wurstmeister/kafka:2.12-2.2.1
此時node01 zookeeper為tandalone
node02:擴展
docker run -d --name zookeeper --restart always --network host -e ZOO_SERVERS="clientPort=2181 server.1=10.201.5.14:2888:3888 server.2=10.201.5.15:2888:3888" -e ZOO_MY_ID=2 docker.io/zookeeper:3.5.5
修改node01 zookeeper配置文件
server.1=10.201.5.14:2888:3888
server.2=10.201.5.15:2888:3888
並重啟node01,此時集群已建立,node01為leader,node02會同步node01的數據。(集群擴展式,必須要同步數據。不能把舊zookeeper刪除新建,這樣kafka的數據會丟失。kafka的元數據存在zookeeper里)按道理會從ZOO_MY_ID大的節點選舉leader,但是node01上面已經有數據了。
docker run --restart always -d --name kafka002 --network host -e KAFKA_ZOOKEEPER_CONNECT='10.201.5.14:2181,10.201.5.15:2181' -e KAFKA_LISTENERS='PLAINTEXT://0.0.0.0:9092' -e KAFKA_BROKER_ID='2' -e KAFKA_ADVERTISED_LISTENERS='PLAINTEXT://10.201.5.15:9092' wurstmeister/kafka:2.12-2.2.1
驗證kafk雙節點高可用
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
kafka-topics.sh -zookeeper 127.0.0.1:2181 -describe -topic test
會顯示副本為1,數據在哪個節點上,leader是誰。
kafka-console-producer.sh --broker-list 192.168.1.131:9092 --topic test
發消息
kafka-console-consumer.sh --bootstrap-server 192.168.1.129:9092 --from-beginning --topic test
收消息
收發消息正常
此時,windows客戶端連接kafka,無論連接哪個節點,都能看到消息。
但是關閉掉存數據的kafka后,windows客戶端連接kafka就看不到消息了。再次啟動剛剛關閉的kafka,恢復
所以需要要備份數據,使ReplicationFactor等於2
kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file json.json --execute
文件內容實例:
{
"version": 1,
"partitions": [
{
"topic": "zerg.hydra",
"partition": 0,
"replicas": [
1,
2
]
},
{
"topic": "zerg.hydra",
"partition": 1,
"replicas": [
1,
2
]
},
{
"topic": "zerg.hydra",
"partition": 2,
"replicas": [
1,
2
]
}
]
}
此時,windows客戶端連接kafka,無論連接哪個節點,都能看到消息。隨便停掉一個kafka也可以看到。
但是,消費消息時失敗。leader節點停掉后,kafka的leader成功切換了也不行
原因是消費消息時跟這個topic: __consumer_offsets 有關。但是__consumer_offsets只有一個節點只有數據。
所以也要備份__consumer_offsets。
kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file json.json --execute
因為它的partition有很多,一下為生成文件腳本
#!/bin/bash
for i in {0..49}
do
echo """{
\"topic\": \"zerg.hydra\",
\"partition\": $i,
\"replicas\": [
1,
2
]
},""" >> test.txt
done
生成后,再補充下
這樣kafka就高可用了
如果還不行,有可能是kafka的元數據沒更新leader。kafka的元數據在zookeeper里存着
zookeeper shell命令 修改元數據
zkCli.sh -server localhost:2181
>> ls /
>> ls /brokers/topics/test08/partitions/0/state
>> get /brokers/topics/test08/partitions/0/state
>> {"controller_epoch":14,"leader":2,"version":1,"leader_epoch":2,"isr":[1,2]}
刪除元數據
https://www.cnblogs.com/xiaodf/p/6289383.html
https://blog.csdn.net/jiaotongqu6470/article/details/78495147
kafka配置文件:如果一下數值改為2、則當只啟動一個kafka,也會消費失敗的,因為__consumer_offsets 主題會創建失敗。
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
kafka基本原理及leader,replica,isr介紹
https://www.cnblogs.com/answerThe/p/11267454.html
更改replicas和isr數量:kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file json.json --execute
json文件:
{
"version": 1,
"partitions": [
{
"topic": "test",
"partition": 0,
"replicas": [
1,
2
],
"isr":[1,2]
}
]
}