部署配置
1.准備部署包(自行下載)
vim conf/zoo.cfg
dataDir=/data/vfan/zk/data/ dataLogDir=/data/vfan/zk/logs/ startLogDir=/data/vfan/zk/logs/ clientPort=2181 maxClientCnxns=0 initLimit=5 syncLimit=2 server.1=10.61.194.34:2801:3801 server.2=10.61.199.15:2802:3802 server.3=10.61.202.16:2803:3803 # server.A=B:C:D 其中A是一個數字,代表這是第幾號服務器;B是服務器的IP地址;C表示服務器與群集中的“領導者”交換信息的端口;當領導者失效后,D表示用來執行選舉時服務器相互通信的端口 snapCount=20 autopurge.snapRetainCount =3 autopurge.purgeInterval =1
zk集群配置如上,如果是單台,從server.1開始往下都注釋即可
啟動zk
bin/zkServer.sh start ## ps 檢查進程
2.配置kafka
vim kafka/config/server.properties
broker.id=1 listeners=PLAINTEXT://10.153.204.28:9092
num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/data/vfan/kfk/logs/ # 當topic不存在系統自動創建時的分區數 num.partitions=3 # 當topic不存在系統自動創建時的副本數 default.replication.factor=3 # offset topic的replicas數量 offsets.topic.replication.factor=3 # 每個數據目錄的線程數,用於啟動時的日志恢復和關閉時的刷新 num.recovery.threads.per.data.dir=1 # 事務主題的復制因子 transaction.state.log.replication.factor=3 # 覆蓋事務主題的min.insync.replicas配置 transaction.state.log.min.isr=3 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=10.61.194.34:2181,10.61.199.15:2181,10.61.202.16 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
集群模式時,修改每個配置文件的 broker.id listeners 即可,zookeeper.connect若為單機就寫一個
啟動kafka
bin/kafka-server-start.sh -daemon config/server.properties ## ss -tnlp|grep 9092 檢查端口
常用命令總結
topic相關
## 查看所有topic ./kafka-topics.sh --zookeeper localhost:2181 --list ## 查看所有topic詳情(副本、分區、ISR等) ./kafka-topics.sh --zookeeper localhost:2181 --describe ## 查看某個topic詳情 ./kafka-topics.sh --zookeeper localhost:2181 --describe --topic test ## 創建topic,3副本 3分區 ./kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 3 --partitions 3 ## 調整分區數量 ./kafka-topics.sh --alter --zookeeper localhost:2181 --topic test --partitions 3 ## 刪除topic,需要將參數設置為delete.topic.enable=true,如果還是刪不了則刪除kafka中的所有分區log,及通過zk客戶端刪除 ./kafka-topics.sh --zookeeper localhost:2181 --delete --topic test ## 查看topic各個分區的消息數量 ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic test
模擬kafka生產消費
## 生產 ./kafka-console-producer.sh --broker-list 10.153.204.28:9092 --topic test ## 消費,--from-beginning參數表示從頭開始 ./kafka-console-consumer.sh --bootstrap-server 10.153.204.28:9092 --topic test --from-beginning
此處需要注意,生產者和測試者指定的broker必須和配置文件中zookeeper.connect和listeners中的地址一至,如寫localhost生產者會類似如下信息:
WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
消費者會報錯類似錯誤:
WARN [Consumer clientId=consumer-1, groupId=console-consumer-8350] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
消費者相關
## 顯示所有消費者 ./kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list ## 獲取某消費者消費某個topic的offset ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer ## 調整消費者對某個topic的offset,發生阻塞等情況時可使用 .kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupName --reset-offsets --to-offset 1000 --topic topicName --execute
調整默認分區副本數
## 配置文件中指定默認分區 副本數 num.partitions=3 ;當topic不存在系統自動創建時的分區數 default.replication.factor=3 ;當topic不存在系統自動創建時的副本數 offsets.topic.replication.factor=3 ;表示kafka的內部topic consumer_offsets副本數,默認為1
調整topic分區副本數
目前 guoqing 的topic副本和分區都為1
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic guoqing Topic:guoqing PartitionCount:1 ReplicationFactor:1 Configs: Topic: guoqing Partition: 0 Leader: 1 Replicas: 1 Isr: 1
將分區數調整為3
## 擴容 ./kafka-topics.sh --alter --zookeeper localhost:2181 --topic guoqing --partitions 3 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded! ## 檢查 ./kafka-topics.sh --zookeeper localhost:2181 --describe --topic guoqing Topic:guoqing PartitionCount:3 ReplicationFactor:1 Configs: Topic: guoqing Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic: guoqing Partition: 1 Leader: 2 Replicas: 2 Isr: 2 Topic: guoqing Partition: 2 Leader: 3 Replicas: 3 Isr: 3
注意:分區數只能增加,不能減少
將副本數調整為3,首先准備json文件,格式如下:
vim guoqing.json
{ "version": 1, "partitions": [ { "topic": "guoqing", "partition": 0, "replicas": [ 1, 2, 3 ] }, { "topic": "guoqing", "partition": 1, "replicas": [ 2, 1, 3 ] }, { "topic": "guoqing", "partition": 2, "replicas": [ 3, 2, 1 ] } ] }
執行調整命令
./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file /tmp/guoqing.json --execute Current partition replica assignment {"version":1,"partitions":[{"topic":"guoqing","partition":0,"replicas":[1],"log_dirs":["any"]},{"topic":"guoqing","partition":2,"replicas":[3],"log_dirs":["any"]},{"topic":"guoqing","partition":1,"replicas":[2],"log_dirs":["any"]}]} Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions.
檢查調整進度
./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file /tmp/guoqing.json --verify Status of partition reassignment: Reassignment of partition guoqing-0 completed successfully Reassignment of partition guoqing-1 completed successfully Reassignment of partition guoqing-2 completed successfully
檢查調整后的狀態
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic guoqing Topic:guoqing PartitionCount:3 ReplicationFactor:3 Configs: Topic: guoqing Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Topic: guoqing Partition: 1 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3 Topic: guoqing Partition: 2 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1