server.1=192.168.1.131:2888:3888 server.2=192.168.1.129:2888:3888 server.3=192.168.1.134:2888:3888 server.4=192.168.1.135:2888:3888 server.5=192.168.1.136:2888:3888 docker exec -it zookeeper cat /conf/zoo.cfg docker run --network host -d --name zookeeper --restart always -e ZOO_SERVERS="clientPort=2181 server.1=192.168.1.131:2888:3888 server.2=192.168.1.129:2888:3888 server.3=192.168.1.134:2888:3888" -e ZOO_MY_ID=1 docker.io/zookeeper:3.5.5 docker run --network host -d --name zookeeper --restart always -e ZOO_SERVERS="clientPort=2181 server.1=192.168.1.131:2888:3888 server.2=192.168.1.129:2888:3888 server.3=192.168.1.134:2888:3888 server.4=192.168.1.135:2888:3888 server.5=192.168.1.136:2888:3888" -e ZOO_MY_ID=1 docker.io/zookeeper:3.5.5 docker run --restart always -d --name kafka3 --network host -e KAFKA_ZOOKEEPER_CONNECT='192.168.1.131:2181,192.168.1.129:2181,192.168.1.134:2181' -e KAFKA_LISTENERS='PLAINTEXT://0.0.0.0:9092' -e KAFKA_BROKER_ID='2' -e KAFKA_ADVERTISED_LISTENERS='PLAINTEXT://192.168.1.129:9092' docker.io/wurstmeister/kafka:2.12-2.2.1 zookeeper節點擴展 一、啟動兩個zookeeper 配置文件 server.1=192.168.1.131:2888:3888 server.2=192.168.1.129:2888:3888 watch -n 0.5 -c "curl 127.0.0.1:8080/commands/mntr" #查看node02為leader 二、啟動第三個節點 配置文件 server.1=192.168.1.131:2888:3888 server.2=192.168.1.129:2888:3888 server.3=192.168.1.134:2888:3888 此時node02為leader followers兩個 三、可以直接修改node01、02的容器里的配置、因為此鏡像的docker-entrypoint.sh會判斷zoo.cfg存在就不重建,所以不用改系統變量 配置文件 server.1=192.168.1.131:2888:3888 server.2=192.168.1.129:2888:3888 server.3=192.168.1.134:2888:3888 此時leader為node03,但是停掉node01、02任何一個還是會down,所以還要重啟下node03 重啟順序為:要先 stop掉 再start 順序為: ZOO_MY_ID從大到小 這樣隨便停掉一個節點zookeeper集群都正常 在遷移kafka的時候。zookeeper的遷移時,一定要先同步下zookeeper的數據(讓新的節點先為就集群的follower) 驗證:一、舊集群node01 和node02 node02為leader 創建新集群node04和node05 讓kafka集群連接新集群,則kafka數據丟失 二、舊集群node01 和node02 node02為leader 讓node05 為舊集群的follower,先同步zookeeper的數據 配置為:01 02 05 stop掉所有zookeeper, 修改node05的配置 04 05 添加node04 配置04 05 啟動 node05 和node04 此時05為leader 讓kafka集群連接新集群。kafka數據正常 以下是kafka常用命令行總結: 0.查看有哪些主題: ./kafka-topics.sh --list --zookeeper 192.168.0.201:12181 1.查看topic的詳細信息 ./kafka-topics.sh -zookeeper 127.0.0.1:2181 -describe -topic testKJ1 2、為topic增加副本 ./kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execute 3、創建topic ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testKJ1 4、為topic增加partition kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --partitions 2 --topic topic
修改位移
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --reset-offsets --group sale_log_kaetl_group --topic topicname:1 --to-offset 3 --execute #topicname:1數字是分區
5、kafka生產者客戶端命令 ./kafka-console-producer.sh --broker-list localhost:9092 --topic testKJ1 6、kafka消費者客戶端命令 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic testKJ1
kafka-console-consumer.sh --bootstrap-server 172.18.72.163:9092 --offset 106553 --partition 0 --topic vm_replenish_log --max-messages 1 #此處的offset是編號,從0開始 7、kafka服務啟動 ./kafka-server-start.sh -daemon ../config/server.properties 8、下線broker ./kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper 127.0.0.1:2181 --broker #brokerId# --num.retries 3 --retry.interval.ms 60 shutdown broker 9、刪除topic ./kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic testKJ1 --zookeeper 127.0.0.1:2181 ./kafka-topics.sh --zookeeper localhost:2181 --delete --topic testKJ1 10、查看consumer組內消費的offset kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --offsets --describe --group reduce_group #顯示的是offset的位置 從1開始, 單節點kafka和zookeeper擴展為雙節點步驟及問題 原主機node01,擴展主機node02 node01: docker run --network host -d --name zookeeper --restart always -e ZOO_SERVERS="clientPort=2181 server.1=10.201.5.14:2888:3888" -e ZOO_MY_ID=1 docker.io/zookeeper:3.5.5 docker run --restart always -d --name kafka --network host -e KAFKA_ZOOKEEPER_CONNECT='10.201.5.14:2181' -e KAFKA_LISTENERS='PLAINTEXT://0.0.0.0:9092' -e KAFKA_BROKER_ID='1' -e KAFKA_ADVERTISED_LISTENERS='PLAINTEXT://10.201.5.14:9092' docker.io/wurstmeister/kafka:2.12-2.2.1 此時node01 zookeeper為tandalone node02:擴展 docker run -d --name zookeeper --restart always --network host -e ZOO_SERVERS="clientPort=2181 server.1=10.201.5.14:2888:3888 server.2=10.201.5.15:2888:3888" -e ZOO_MY_ID=2 docker.io/zookeeper:3.5.5 修改node01 zookeeper配置文件 server.1=10.201.5.14:2888:3888 server.2=10.201.5.15:2888:3888 並重啟node01,此時集群已建立,node01為leader,node02會同步node01的數據。(集群擴展式,必須要同步數據。不能把舊zookeeper刪除新建,這樣kafka的數據會丟失。kafka的元數據存在zookeeper里)按道理會從ZOO_MY_ID大的節點選舉leader,但是node01上面已經有數據了。 docker run --restart always -d --name kafka002 --network host -e KAFKA_ZOOKEEPER_CONNECT='10.201.5.14:2181,10.201.5.15:2181' -e KAFKA_LISTENERS='PLAINTEXT://0.0.0.0:9092' -e KAFKA_BROKER_ID='2' -e KAFKA_ADVERTISED_LISTENERS='PLAINTEXT://10.201.5.15:9092' wurstmeister/kafka:2.12-2.2.1 驗證kafk雙節點高可用 kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test kafka-topics.sh -zookeeper 127.0.0.1:2181 -describe -topic test 會顯示副本為1,數據在哪個節點上,leader是誰。 kafka-console-producer.sh --broker-list 192.168.1.131:9092 --topic test 發消息 kafka-console-consumer.sh --bootstrap-server 192.168.1.129:9092 --from-beginning --topic test 收消息 收發消息正常 此時,windows客戶端連接kafka,無論連接哪個節點,都能看到消息。 但是關閉掉存數據的kafka后,windows客戶端連接kafka就看不到消息了。再次啟動剛剛關閉的kafka,恢復 所以需要要備份數據,使ReplicationFactor等於2 kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file json.json --execute 文件內容實例: { "version": 1, "partitions": [ { "topic": "zerg.hydra", "partition": 0, "replicas": [ 1, 2 ] }, { "topic": "zerg.hydra", "partition": 1, "replicas": [ 1, 2 ] }, { "topic": "zerg.hydra", "partition": 2, "replicas": [ 1, 2 ] } ] } 此時,windows客戶端連接kafka,無論連接哪個節點,都能看到消息。隨便停掉一個kafka也可以看到。 但是,消費消息時失敗。leader節點停掉后,kafka的leader成功切換了也不行 原因是消費消息時跟這個topic: __consumer_offsets 有關。但是__consumer_offsets只有一個節點只有數據。 所以也要備份__consumer_offsets。 kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file json.json --execute 因為它的partition有很多,一下為生成文件腳本 #!/bin/bash for i in {0..49} do echo """{ \"topic\": \"zerg.hydra\", \"partition\": $i, \"replicas\": [ 1, 2 ] },""" >> test.txt done 生成后,再補充下 這樣kafka就高可用了 如果還不行,有可能是kafka的元數據沒更新leader。kafka的元數據在zookeeper里存着 zookeeper shell命令 修改元數據 zkCli.sh -server localhost:2181 >> ls / >> ls /brokers/topics/test08/partitions/0/state >> get /brokers/topics/test08/partitions/0/state >> {"controller_epoch":14,"leader":2,"version":1,"leader_epoch":2,"isr":[1,2]} 刪除元數據 https://www.cnblogs.com/xiaodf/p/6289383.html https://blog.csdn.net/jiaotongqu6470/article/details/78495147 kafka配置文件:如果一下數值改為2、則當只啟動一個kafka,也會消費失敗的,因為__consumer_offsets 主題會創建失敗。 ############################# Internal Topic Settings ############################# # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" # For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3. offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 kafka基本原理及leader,replica,isr介紹 https://www.cnblogs.com/answerThe/p/11267454.html 更改replicas和isr數量:kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file json.json --execute json文件: { "version": 1, "partitions": [ { "topic": "test", "partition": 0, "replicas": [ 1, 2 ], "isr":[1,2] } ] }