在《kafka在windows上的安装、运行》一文中,我们已初步对kafka有了一个感性的认识,但在实际应用中都是在Linux系统上通过集群方式来使用的。
下面我们来进一步进行Kafka集群搭建及操作。
一.环境说明
这里没有使用kafka自带的zookeeper,而是独立安装了一个zookeeper集群,zookeeper集群的搭建详见《zookeeper集群搭建及Leader选举算法源码解析》。
zookeeper集群的机器分别是:10.255.34.78、10.255.34.74、10.255.34.76
kafka集群的机器分别是:10.255.34.141、10.255.34.145、10.255.34.78
即10.255.34.78机器上同时搭建了zookeeper和kafka,其它机器均只搭建zookeeper或kafka。
二.kafka集群搭建
1.从官网下载kafka
我这里下载的是kafka_2.12-2.0.0.tgz包,上传到Linux服务器上,运行tar -xzf kafka_2.12-2.0.0.tgz解压。

2.修改配置文件kafka/config/server.properties
这里是在10.255.34.145机器上进行配置。
a.配置broker的ID
############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. broker.id=12
10.255.34.78配置broker.id=11,10.255.34.145配置broker.id=12,10.255.34.141配置broker.id=13。
b.打开监听端口
############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://10.255.34.145:9092
每台服务配置自已机器的IP。
c.修改log的目录
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/data/app/kafka_2.12-2.0.0/kafka-logs
d.修改zookeeper.connect
############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=10.255.34.78:2181,10.255.34.76:2181,10.255.34.74:2181/kafka
这里配置的是zookeeper群集的IP和端口。
这里需要说明的是,默认Kafka会使用ZooKeeper默认的/路径,这样有关Kafka的ZooKeeper配置就会散落在根路径下面,如果你有其他的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,所以强烈建议指定一个chroot路径,直接在 zookeeper.connect配置项中指定。
zookeeper.connect=10.255.34.78:2181,10.255.34.76:2181,10.255.34.74:2181/kafka
而且,需要手动在ZooKeeper中创建路径/kafka,使用如下命令连接到任意一台 ZooKeeper服务器:
cd ~/zookeeper bin/zkCli.sh 然后输入:create /kafka ''
就创建了chroot路径,这样,每次连接Kafka集群的时候(使用--zookeeper选项),也必须使用带chroot路径的连接字符串,后面会看到。

这个命令连接到zookeeper的任何一台机器执行,zookeeper会自动同步到其它集群中的机器。【在其它机器上也执行会提示Node already exists】
3.在其它机器上进行配置
两种方法:
a.将配置好的安装文件同步到其他的节点上,在这里是10.255.34.141、10.255.34.78机器,然后修改一下差异化变量brokerid。
b.在其它节点10.255.34.141、10.255.34.78机器上按上面步骤进行配置,10.255.34.141配置broker.id=13,10.255.34.78配置broker.id=11。
因为Kafka集群需要保证各个Broker的id在整个集群中必须唯一,需要调整这个配置项的值,且经测试发现broker.id的值只能为数字(如果在单机上,可以通过建立多个Broker进程来模拟分布式的Kafka集群,也需要Broker的id唯一,还需要修改一些配置目录的信息)。
三. 启动集群并创建topic
1.启动集群
在kafka的三台机器上(10.255.34.141、10.255.34.145、10.255.34.78)分别启动Kafka,分别执行如下命令:
bin/kafka-server-start.sh config/server.properties &
可以通过查看日志,或者检查进程状态,保证Kafka集群启动成功,启动的时候注意看错误,jps看到Kafka就表示服务已经启动了。

2.创建一个名称为bj-replicated-topic5的Topic,5个分区,并且复制因子为3,执行如下命令:
bin/kafka-topics.sh --create --zookeeper 10.255.34.78:2181,10.255.34.74:2181,10.255.34.76:2181/kafka --replication-factor 3 --partitions 5 --topic bj-replicated-topic5
3.创建一个名称为bj-replicated-topic5的Topic,5个分区,并且复制因子为3,执行如下命令:
bin/kafka-topics.sh --create --zookeeper 10.255.34.78:2181,10.255.34.74:2181,10.255.34.76:2181/kafka --replication-factor 3 --partitions 5 --topic bj-replicated-topic5

当然,这里也可以只指定一台zookeeper机器,因为集群之间会自动同步。如下所示:
bin/kafka-topics.sh --create --zookeeper 10.255.34.76:2181/kafka --replication-factor 3 --partitions 5 --topic bj2-replicated-topic5

a.zookeeper指定其中一个节点即可,集群之间会自动同步。
b.–replication-factor 3 –partitions 5理论上应该是可选参数,但此脚本必须写这2个参数。
c.还可以使用–config 来指定topic的某个具体参数,以代替配置文件中的参数。如:
bin/kafka-topics.sh –create –zookeeper 10.255.34.76:2181/kafka –replication-factor 3 –partitions 5 –topic test_topic retention.bytes=3298534883328
指定了某个topic最大的保留日志量,单位是字节。
4.查看全部topic
bin/kafka-topics.sh -list -zookeeper 10.255.34.76:2181/kafka

5.查看某个topic的详细信息
bin/kafka-topics.sh --describe --zookeeper 10.255.34.76:2181/kafka --topic bj-replicated-topic5
bin/kafka-topics.sh --describe --zookeeper 10.255.34.78:2181,10.255.34.74:2181,10.255.34.76:2181/kafka --topic bj-replicated-topic5

第一行列出了这个topic的总体情况,如topic名称,分区数量,副本数量等。
第二行开始,每一行列出了一个分区的信息,如它是第几个分区,这个分区的leader是哪个broker,副本位于哪些broker,有哪些副本处理同步状态。
上面Leader、Replicas、Isr的含义如下:
a.Partition:分区
b.Leader:负责读写指定分区的节点
c.Replicas:复制该分区log的节点列表
d.Isr:"in-sync" replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader
我们可以通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
6.在一个终端,启动Producer,并向我们上面创建的名称为bj-replicated-topic5的Topic中生产消息,执行如下脚本:
bin/kafka-console-producer.sh --broker-list 10.255.34.141:9092,10.255.34.145:9092,10.255.34.78:9092 --topic bj-replicated-topic5
7.在另一个终端,启动Consumer,并订阅我们上面创建的名称为bj-replicated-topic5的Topic中生产的消息,执行如下脚本:
bin/kafka-console-consumer.sh --bootstrap-server 10.255.34.141:9092,10.255.34.145:9092,10.255.34.78:9092 --topic bj-replicated-topic5
可以在Producer终端上输入字符串消息行,就可以在Consumer终端上看到消费者消费的消息内容。
也可以参考Kafka的Producer和Consumer的Java API,通过API编码的方式来实现消息生产和消费的处理逻辑。
8.修改topic
使用--alert原则上可以修改任何配置,以下列出了一些常用的修改选项:
a.改变分区数量
bin/kafka-topics.sh --alter --zookeeper 10.255.34.76:2181/kafka --topic bj2-replicated-topic5 --partitions 6

由上面的截图实操来看,partitions只能改大,不能改小。改完后再次查下这个topic详情如下:

b.增加、修改或者删除一个配置参数
bin/kafka-topics.sh —alter --zookeeper 10.255.34.76:2181/kafka --topic my_topic_name --config key=value
bin/kafka-topics.sh —alter --zookeeper 10.255.34.76:2181/kafka --topic my_topic_name --deleteConfig key
9.删除一个topic
bin/kafka-topics.sh --delete --zookeeper 10.255.34.76:2181/kafka --topic bj2-replicated-topic5
a.配置文件中必须delete.topic.enable=true,否则只会标记为删除,而不是真正删除。
b.执行此脚本的时候,topic的数据会同时被删除。如果由于某些原因导致topic的数据不能完全删除(如其中一个broker down了),此时topic只会被marked for deletion,而不会真正删除。此时创建同名的topic会有冲突。

在这里,由于在kafka_2.12-2.0.0/config/server.properties中未设置delete.topic.enable=true配置,执行删除命令后,只是在对应的topic后面打一个-marked for deletion标识。

如果在kafka_2.12-2.0.0/config/server.properties中增加了delete.topic.enable=true配置

10.停止集群
在kafka的三台机器上(10.255.34.141、10.255.34.145、10.255.34.78)分别停止Kafka,分别执行如下命令:
bin/kafka-server-stop.sh
jps看不到Kafka进程就表示服务已经停掉了。
四.高可用
1.某个broker挂掉,本机器可重启
如果一个broker挂掉,且可以重启则处理步骤如下:
a.重启kafka进程
b.执行rebalance(由于已经设置配置项自动执行balance,因此此步骤一般可忽略)
详细分析见下面操作过程:
a.topic的情况
[app@VM_34_145_centos kafka_2.12-2.0.0]$ bin/kafka-topics.sh --describe --zookeeper 10.255.34.76:2181/kafka --topic bj-replicated-topic5 Topic:bj-replicated-topic5 PartitionCount:5 ReplicationFactor:3 Configs: Topic: bj-replicated-topic5 Partition: 0 Leader: 11 Replicas: 11,12,13 Isr: 11,13,12 Topic: bj-replicated-topic5 Partition: 1 Leader: 12 Replicas: 12,13,11 Isr: 13,11,12 Topic: bj-replicated-topic5 Partition: 2 Leader: 13 Replicas: 13,11,12 Isr: 13,11,12 Topic: bj-replicated-topic5 Partition: 3 Leader: 11 Replicas: 11,13,12 Isr: 11,13,12 Topic: bj-replicated-topic5 Partition: 4 Leader: 12 Replicas: 12,11,13 Isr: 11,13,12
集群中有3台机器,id为【11-13】,topic 有5个分区,每个分区3个副本,leader分别位于11,12,13,11,12中。
b.模拟机器down,kill掉进程
分区0的leader位于id=11的broker中,kill掉这台机器的kafka进程。

c.再次查看topic的情况
[app@VM_34_145_centos kafka_2.12-2.0.0]$ bin/kafka-topics.sh --describe --zookeeper 10.255.34.76:2181/kafka --topic bj-replicated-topic5 Topic:bj-replicated-topic5 PartitionCount:5 ReplicationFactor:3 Configs: Topic: bj-replicated-topic5 Partition: 0 Leader: 12 Replicas: 11,12,13 Isr: 13,12 Topic: bj-replicated-topic5 Partition: 1 Leader: 12 Replicas: 12,13,11 Isr: 13,12 Topic: bj-replicated-topic5 Partition: 2 Leader: 13 Replicas: 13,11,12 Isr: 13,12 Topic: bj-replicated-topic5 Partition: 3 Leader: 13 Replicas: 11,13,12 Isr: 13,12 Topic: bj-replicated-topic5 Partition: 4 Leader: 12 Replicas: 12,11,13 Isr: 13,12
可以看到,分区0的leader已经移到id=12的机器上了,它的副本位于11,12,13这3台机器上,但处于同步状态的只有id=13和id=12这两台机器。分区3的leader已经移到id=13的机器上了,它的副本位于11,13,12这3台机器上,但处于同步状态的只有id=13和id=12这两台机器。
d.重启kafka进程
bin/kafka-server-start.sh config/server.properties &
e.再次查看状态
[app@VM_34_145_centos kafka_2.12-2.0.0]$ bin/kafka-topics.sh --describe --zookeeper 10.255.34.76:2181/kafka --topic bj-replicated-topic5 Topic:bj-replicated-topic5 PartitionCount:5 ReplicationFactor:3 Configs: Topic: bj-replicated-topic5 Partition: 0 Leader: 12 Replicas: 11,12,13 Isr: 13,12,11 Topic: bj-replicated-topic5 Partition: 1 Leader: 12 Replicas: 12,13,11 Isr: 13,12,11 Topic: bj-replicated-topic5 Partition: 2 Leader: 13 Replicas: 13,11,12 Isr: 13,12,11 Topic: bj-replicated-topic5 Partition: 3 Leader: 13 Replicas: 11,13,12 Isr: 13,12,11 Topic: bj-replicated-topic5 Partition: 4 Leader: 12 Replicas: 12,11,13 Isr: 13,12,11
发现分区0的3个副本都已经处于同步状态,但leader依然为id=12的broker。
f.执行leader平衡
bin/kafka-preferred-replica-election.sh --zookeeper 10.255.34.76:2181/kafka
如果配置文件中:
auto.leader.rebalance.enable=true
则此步骤不需要执行。
g.重新查看topic
[app@VM_34_145_centos kafka_2.12-2.0.0]$ bin/kafka-topics.sh --describe --zookeeper 10.255.34.76:2181/kafka --topic bj-replicated-topic5 Topic:bj-replicated-topic5 PartitionCount:5 ReplicationFactor:3 Configs: Topic: bj-replicated-topic5 Partition: 0 Leader: 11 Replicas: 11,12,13 Isr: 13,12,11 Topic: bj-replicated-topic5 Partition: 1 Leader: 12 Replicas: 12,13,11 Isr: 13,12,11 Topic: bj-replicated-topic5 Partition: 2 Leader: 13 Replicas: 13,11,12 Isr: 13,12,11 Topic: bj-replicated-topic5 Partition: 3 Leader: 11 Replicas: 11,13,12 Isr: 13,12,11 Topic: bj-replicated-topic5 Partition: 4 Leader: 12 Replicas: 12,11,13 Isr: 13,12,11
此时分区0和分区3的leader已经回到了id=11的broker,一切恢复正常。
2.某个broker挂掉且无法重启,需要其它机器代替
当一个broker挂掉,需要换机器时,采用以下步骤:
a.将新机器kafka配置文件中的broker.id设置为与原机器一样
b.启动kafka,注意kafka保存数据的目录不会自动创建,需要手工创建
详细分析过程如下:
a.初始化机器,主要包括用户创建,kafka文件的复制等。
b.修改config/server.properties文件
注意,只需要修改一个配置broker.id,且此配置必须与挂掉的那台机器相同,因为kafka是通过broker.id来区分集群中的机器的。此处设为
broker.id=5
c.查看topic的当前状态
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 5 Replicas: 5,2 Isr: 2,5 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
当前topic有3个分区,其中分区1的leader位于id=5的机器上。
d.关掉id=5的机器
kill -9 ** 用于模拟机器突然down
或者:
bin/kafka-server-stop.sh
用于正常关闭
e.查看topic的状态
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 2 Replicas: 5,2 Isr: 2 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
可见,topic的分区0的leader已经迁移到了id=2的机器上,且处于同步的机器只有一个了。
f.启动新机器
nohup bin/kafka-server-start.sh config/server.properties
g.再看topic的状态
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 2 Replicas: 5,2 Isr: 2,5 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
id=5的机器也处于同步状态了,但还需要将leader恢复到这台机器上。
h.执行leader平衡
bin/kafka-preferred-replica-election.sh –zookeeper 192.168.172.98:2181/kafka
如果配置文件中
auto.leader.rebalance.enable=true
则此步骤不需要执行。
i.done
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 5 Replicas: 5,2 Isr: 2,5 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
所有内容都恢复了
五.扩容
将一台机器加入kafka集群很容易,只需要为它分配一个独立的broker id,然后启动它即可。但是这些新加入的机器上面并没有任何的分区数据,所以除非将现有数据移动这些机器上,否则它不会做任何工作,直到创建新topic。因此,当你往集群加入机器时,你应该将其它机器上的一部分数据往这台机器迁移。
数据迁移的工作需要手工初始化,然后自动完成。它的原理如下:当新机器起来后,kafka将其它机器的一些分区复制到这个机器上,并作为follower,当这个新机器完成复制并成为in-sync状态后,那些被复制的分区的一个副本会被删除。(都不会成为leader?)
1.将新机器kafka配置文件中的broker.id设置为与原机器一样
2.启动kafka,注意kafka保存数据的目录不会自动创建,需要手工创建
此时新建的topic都会优先分配leader到新增的机器上,但原有的topic不会将分区迁移过来。
3.数据迁移,请见数据迁移部分。
六.数据迁移
以下步骤用于将现有数据迁移到新的broker中,假设需要将test_topic与streaming_ma30_sdc的全部分区迁移到新的broker中(id 为6和7)
1.创建一个json文件,用于指定哪些topic将被迁移过去
cat topics-to-move.json
{"topics": [
{"topic": "test_topic"},
{"topic": "streaming_ma30_sdc"}
],
"version":1
}
注意全角,半角符号,或者中文引号之类的问题。
2.先generate迁移后的结果,检查一下是不是你要想的效果
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --topics-to-move-json-file topics-to-move.json --broker-list "6,7" —generate Current partition replica assignment {"version":1,"partitions":[{"topic":"streaming_ma30_sdc","partition":2,"replicas":[2]},{"topic":"test_topic","partition":0,"replicas":[5,2]},{"topic":"test_topic","partition":2,"replicas":[3,4]},{"topic":"streaming_ma30_sdc","partition":1,"replicas":[5]},{"topic":"streaming_ma30_sdc","partition":0,"replicas":[4]},{"topic":"test_topic","partition":1,"replicas":[2,3]},{"topic":"streaming_ma30_sdc","partition":3,"replicas":[3]},{"topic":"streaming_ma30_sdc","partition":4,"replicas":[4]}]} Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[7,6]},{"topic":"streaming_ma30_sdc","partition":2,"replicas":[7]},{"topic":"test_topic","partition":2,"replicas":[7,6]},{"topic":"streaming_ma30_sdc","partition":1,"replicas":[6]},{"topic":"test_topic","partition":1,"replicas":[6,7]},{"topic":"streaming_ma30_sdc","partition":0,"replicas":[7]},{"topic":"streaming_ma30_sdc","partition":4,"replicas":[7]},{"topic":"streaming_ma30_sdc","partition":3,"replicas":[6]}]}
分别列出了当前的状态以及迁移后的状态。
把这2个json分别保存下来,第一个用来万一需要roll back的时候使用,第二个用来执行迁移。
3.执行迁移
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file expand-cluster-reassignment.json --execute
其中expand-cluster-reassignment.json为保存上面第二段json的文件。
4.查看迁移过程
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file expand-cluster-reassignment.json --verify Status of partition reassignment: Reassignment of partition [streaming_ma30_sdc,0] is still in progress Reassignment of partition [streaming_ma30_sdc,4] is still in progress Reassignment of partition [test_topic,2] completed successfully Reassignment of partition [test_topic,0] completed successfully Reassignment of partition [streaming_ma30_sdc,3] is still in progress Reassignment of partition [streaming_ma30_sdc,1] is still in progress Reassignment of partition [test_topic,1] completed successfully Reassignment of partition [streaming_ma30_sdc,2] is still in progress
5.当所有迁移的完成后,查看一下结果是不是你想要的
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 7 Replicas: 7,6 Isr: 6,7 Topic: test_topic Partition: 1 Leader: 6 Replicas: 6,7 Isr: 6,7 Topic: test_topic Partition: 2 Leader: 7 Replicas: 7,6 Isr: 6,7
完成
以上步骤将整个topic迁移,也可以只迁移其中一个或者多个分区。
以下将test_topic的分区1移到broker id为2,3的机器,分区2移到broker id为4,5的机器。
【其实还是整个topic迁移好一点,不然准备迁移文件会很麻烦】
1.准备迁移配置文件
cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"test_topic","partition":1,"replicas":[2,3]},{"topic":"test_topic","partition":2,"replicas":[4,5]}]}
2.执行迁移
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file custom-reassignment.json --execute
3.查看迁移过程
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file custom-reassignment.json --verify
4.查看迁移结果
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
七.机器下线
当一个机器下线时,kafka并不会自动将这台机器上的副本迁移到其它机器上,因此,我们需要手工进行迁移。这个过程会相当的无聊,kafka打算在0.8.2版本中添加此特性。
有了吗?再找找。如果只是替换机器则不会有这个问题。
八.增加副本数量Increasing replication factor
Increasing the replication factor of an existing partition is easy. Just specify the extra replicas in the custom reassignment json file and use it with the –execute option to increase the replication factor of the specified partitions.
For instance, the following example increases the replication factor of partition 0 of topic foo from 1 to 3. Before increasing the replication factor, the partition’s only replica existed on broker 5. As part of increasing the replication factor, we will add more replicas on brokers 6 and 7.
The first step is to hand craft the custom reassignment plan in a json file
cat increase-replication-factor.json {“version”:1, “partitions”:[{“topic”:”foo”,”partition”:0,”replicas”:[5,6,7]}]
}
Then, use the json file with the –execute option to start the reassignment process
bin/kafka-reassign-partitions.sh –zookeeper localhost:2181 –reassignment-json-file increase-replication-factor.json –execute
Current partition replica assignment
{“version”:1,
“partitions”:[{“topic”:”foo”,”partition”:0,”replicas”:[5]}]
}
Save this to use as the –reassignment-json-file option during rollback
Successfully started reassignment of partitions
{“version”:1,
“partitions”:[{“topic”:”foo”,”partition”:0,”replicas”:[5,6,7]}]
}
The –verify option can be used with the tool to check the status of the partition reassignment. Note that the same increase-replication-factor.json (used with the –execute option) should be used with the –verify option
bin/kafka-reassign-partitions.sh –zookeeper localhost:2181 –reassignment-json-file increase-replication-factor.json –verify
Status of partition reassignment:
Reassignment of partition [foo,0] completed successfully
You can also verify the increase in replication factor with the kafka-topics tool
bin/kafka-topics.sh –zookeeper localhost:2181 –topic foo –describe Topic:foo PartitionCount:1 ReplicationFactor:3 Configs: Topic: foo Partition: 0 Leader: 5 Replicas: 5,6,7 Isr: 5,6,7
九.leader的平衡
当一个broker down掉时,所有本来将它作为leader的分区会被将leader转移到其它broker。这意味着当这个broker重启时,它将不再担任何分区的leader,kafka的client也不会从这个broker来读取消息,导致资源的浪费。
为了避免这种情况的发生,kafka增加了一个标记:优先副本(preferred replicas)。如果一个分区有3个副本,且这3个副本的优先级别分别为1,5,9,则1会作为leader。为了使kafka集群恢复默认的leader,需要运行以下命令:
bin/kafka-preferred-replica-election.sh --zookeeper 192.168.172.98:2181/kafka
或者可以设置以下配置项,leader 会自动执行balance:
auto.leader.rebalance.enable=true
这配置默认即为空,但需要经过一段时间后才会触发,约半小时。
十.其他
1.日志说明
默认kafka的日志是保存在kafka_2.12-2.0.0/logs目录下的,这里说几个需要注意的日志
a.server.log #kafka的运行日志
b.state-change.log #kafka他是用zookeeper来保存状态,所以他可能会进行切换,切换的日志就保存在这里
c.controller.log #kafka选择一个节点作为"controller",当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.
2.可以登录zk来查看zk的目录情况
a.使用客户端进入zk
./zkCli.sh
或
./zkCli.sh -server 127.0.0.1:2181 #默认是不用加'-server'参数,如果改了zk端口才需要加
b.查看目录情况
执行“ls /”
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, brokers, zookeeper, kafka, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
上面的显示结果中:zookeeper原生的,consumers, config, controller, isr_change_notification, admin, brokers, controller_epoch都是Kafka创建的

文章来源:http://blog.jobbole.com/99195/
