Deployment and Configuration
1. Prepare the ZooKeeper deployment package (download it yourself)
vim conf/zoo.cfg
dataDir=/data/vfan/zk/data/
dataLogDir=/data/vfan/zk/logs/
startLogDir=/data/vfan/zk/logs/
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=10.61.194.34:2801:3801
server.2=10.61.199.15:2802:3802
server.3=10.61.202.16:2803:3803
# server.A=B:C:D, where A is a number identifying which server this is; B is the server's IP address; C is the port the server uses to exchange data with the cluster "leader"; D is the port the servers use to communicate with each other during leader election after the leader fails
snapCount=20
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
The ZooKeeper cluster configuration is shown above; for a single-node deployment, simply comment out everything from server.1 downward. In cluster mode, each node also needs a myid file under dataDir whose content matches its server.N number.
Start ZooKeeper
bin/zkServer.sh start    ## check the process with ps
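Besides checking the process, you can probe the server directly with ZooKeeper's "ruok" four-letter command (the server answers "imok" when healthy; on ZooKeeper 3.5+ this command must be enabled via 4lw.commands.whitelist). A minimal sketch of such a probe, not part of the official tooling:

```python
# Illustrative liveness probe using ZooKeeper's "ruok" four-letter word.
# The server replies b"imok" when it is serving requests.
import socket

def zk_ruok(host: str, port: int = 2181, timeout: float = 3.0) -> bool:
    """Return True if the ZooKeeper server at host:port answers 'imok'."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as s:
            s.sendall(b"ruok")
            return s.recv(8) == b"imok"
    except OSError:
        return False

# usage: zk_ruok("10.61.194.34")  # True if that node answers "imok"
```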
2. Configure Kafka
vim kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://10.153.204.28:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/vfan/kfk/logs/
# Partition count used when a topic is auto-created
num.partitions=3
# Replication factor used when a topic is auto-created
default.replication.factor=3
# Replication factor for the offsets topic
offsets.topic.replication.factor=3
# Threads per data directory, used for log recovery at startup and flushing at shutdown
num.recovery.threads.per.data.dir=1
# Replication factor for the transaction state topic
transaction.state.log.replication.factor=3
# Overrides min.insync.replicas for the transaction state topic
transaction.state.log.min.isr=3
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.61.194.34:2181,10.61.199.15:2181,10.61.202.16:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
In cluster mode, just change broker.id and listeners in each node's configuration file; if ZooKeeper is a single node, list only one address in zookeeper.connect.
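Those per-node edits can be scripted. A minimal sketch (an illustrative helper, not part of Kafka; the second broker's address 192.0.2.10 is a made-up example):

```python
# Illustrative helper: derive a per-broker server.properties from a base
# config by overriding only broker.id and listeners, leaving other keys as-is.
def render_broker_config(base: str, broker_id: int, listener_host: str) -> str:
    out = []
    for line in base.splitlines():
        key = line.split("=", 1)[0].strip()
        if key == "broker.id":
            line = f"broker.id={broker_id}"
        elif key == "listeners":
            line = f"listeners=PLAINTEXT://{listener_host}:9092"
        out.append(line)
    return "\n".join(out)

base = "broker.id=1\nlisteners=PLAINTEXT://10.153.204.28:9092\nnum.io.threads=8"
print(render_broker_config(base, 2, "192.0.2.10"))
```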
Start Kafka
bin/kafka-server-start.sh -daemon config/server.properties    ## check the port with: ss -tnlp | grep 9092
Summary of Common Commands
Topic operations
## List all topics
./kafka-topics.sh --zookeeper localhost:2181 --list
## Describe all topics (replicas, partitions, ISR, etc.)
./kafka-topics.sh --zookeeper localhost:2181 --describe
## Describe a single topic
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
## Create a topic with 3 replicas and 3 partitions
./kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 3 --partitions 3
## Change the partition count
./kafka-topics.sh --alter --zookeeper localhost:2181 --topic test --partitions 3
## Delete a topic; requires delete.topic.enable=true. If it still cannot be deleted, remove all of the topic's partition logs on the brokers and delete its nodes via a zk client
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
## Show the message count of each partition of a topic
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic test
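GetOffsetShell prints one topic:partition:offset line per partition; summing the latest offsets gives the topic's total message count (assuming nothing has been truncated). A small illustrative parser, with hypothetical sample output:

```python
# Illustrative parser for GetOffsetShell output ("topic:partition:offset"
# per line); sums the latest offsets to approximate the total message count.
def total_messages(output: str) -> int:
    total = 0
    for line in output.strip().splitlines():
        topic, partition, offset = line.rsplit(":", 2)
        total += int(offset)
    return total

sample = "test:0:120\ntest:1:95\ntest:2:103"   # hypothetical output
print(total_messages(sample))   # → 318
```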
Simulate Kafka production and consumption
## Produce
./kafka-console-producer.sh --broker-list 10.153.204.28:9092 --topic test
## Consume; --from-beginning means start from the beginning of the log
./kafka-console-consumer.sh --bootstrap-server 10.153.204.28:9092 --topic test --from-beginning
Note that the broker addresses given to the producer and consumer must be consistent with the addresses in zookeeper.connect and listeners in the configuration files. If you write localhost instead, the producer will print warnings like:
WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
and the consumer will report a similar error:
WARN [Consumer clientId=consumer-1, groupId=console-consumer-8350] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Consumer-group operations
## List all consumer groups
./kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
## Get a consumer group's offsets for a topic
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer
## Reset a consumer group's offset on a topic, e.g. when consumption is stuck
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupName --reset-offsets --to-offset 1000 --topic topicName --execute
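The --describe output reports CURRENT-OFFSET and LOG-END-OFFSET per partition; consumer lag is their difference, summed over partitions. A toy calculator of that arithmetic (the offset values below are made up):

```python
# Illustrative lag calculator: lag = log-end-offset - current-offset,
# summed over all partitions of the topic.
def total_lag(rows):
    """rows: list of (current_offset, log_end_offset) pairs, one per partition."""
    return sum(end - cur for cur, end in rows)

# hypothetical (current-offset, log-end-offset) per partition
rows = [(100, 150), (200, 200), (90, 120)]
print(total_lag(rows))   # → 80
```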
Changing the default partition and replica counts
## Default partition and replica counts, set in the config file
num.partitions=3                      # partition count used when a topic is auto-created
default.replication.factor=3          # replication factor used when a topic is auto-created
offsets.topic.replication.factor=3    # replica count of Kafka's internal __consumer_offsets topic, default 1
Changing a topic's partition and replica counts
Currently the topic guoqing has one replica and one partition:
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic guoqing
Topic:guoqing   PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: guoqing  Partition: 0    Leader: 1       Replicas: 1     Isr: 1
Increase the partition count to 3:
## Scale out
./kafka-topics.sh --alter --zookeeper localhost:2181 --topic guoqing --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

## Verify
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic guoqing
Topic:guoqing   PartitionCount:3        ReplicationFactor:1     Configs:
        Topic: guoqing  Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: guoqing  Partition: 1    Leader: 2       Replicas: 2     Isr: 2
        Topic: guoqing  Partition: 2    Leader: 3       Replicas: 3     Isr: 3
Note: the partition count can only be increased, never decreased.
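The WARNING above exists because a key's partition is derived from the partition count, so adding partitions can reroute keys and break per-key ordering. A toy illustration of the mapping (Kafka's default partitioner actually hashes keys with murmur2; crc32 merely stands in here):

```python
# Toy illustration of why increasing partitions affects keyed messages:
# the key -> partition mapping depends on the partition count.
# Kafka's default partitioner uses murmur2; zlib.crc32 stands in here.
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    return zlib.crc32(key) % num_partitions

key = b"order-42"
before = partition_for(key, 1)   # always 0 with a single partition
after = partition_for(key, 3)    # may land elsewhere once partitions are added
print(before, after)
```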
To raise the replication factor to 3, first prepare a JSON file in the following format:
vim guoqing.json
{
  "version": 1,
  "partitions": [
    { "topic": "guoqing", "partition": 0, "replicas": [1, 2, 3] },
    { "topic": "guoqing", "partition": 1, "replicas": [2, 1, 3] },
    { "topic": "guoqing", "partition": 2, "replicas": [3, 2, 1] }
  ]
}
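For topics with many partitions, writing this file by hand is tedious. A small illustrative generator that rotates the broker list per partition so preferred leaders spread across brokers (broker IDs 1-3 as above; the exact replica orderings differ slightly from the hand-written file):

```python
# Illustrative generator for a kafka-reassign-partitions JSON file.
# Rotates the broker list per partition so each partition gets a
# different preferred leader (the first replica in the list).
import json

def reassignment_json(topic, num_partitions, brokers):
    parts = []
    for p in range(num_partitions):
        shift = p % len(brokers)
        replicas = brokers[shift:] + brokers[:shift]
        parts.append({"topic": topic, "partition": p, "replicas": replicas})
    return json.dumps({"version": 1, "partitions": parts}, indent=2)

print(reassignment_json("guoqing", 3, [1, 2, 3]))
```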
Run the reassignment command:
./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file /tmp/guoqing.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"guoqing","partition":0,"replicas":[1],"log_dirs":["any"]},{"topic":"guoqing","partition":2,"replicas":[3],"log_dirs":["any"]},{"topic":"guoqing","partition":1,"replicas":[2],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
Check the reassignment progress:
./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file /tmp/guoqing.json --verify
Status of partition reassignment:
Reassignment of partition guoqing-0 completed successfully
Reassignment of partition guoqing-1 completed successfully
Reassignment of partition guoqing-2 completed successfully
Check the state after the reassignment:
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic guoqing
Topic:guoqing   PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: guoqing  Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: guoqing  Partition: 1    Leader: 2       Replicas: 2,1,3 Isr: 2,1,3
        Topic: guoqing  Partition: 2    Leader: 3       Replicas: 3,2,1 Isr: 3,2,1