kafka集群操作指南
@(博客文章)[kafka|大數據]
本系統文章共三篇,分別為
1、kafka集群原理介紹了以下幾個方面的內容:
(1)kafka基礎理論
(2)參數配置
(3)錯誤處理
(4)kafka集群在zookeeper集群中的內容
2、kafka集群操作介紹了kafka集群的安裝與操作
(1)單機版安裝
(2)集群安裝
(3)集群啟停操作
(4)topic相關操作
(5)某個broker掛掉,重啟本機器
(6)某個broker掛掉且無法重啟,使用其它機器代替
(7)擴容
(8)數據遷移
(9)機器下線
(10)增加副本數量
(11)平衡leader
3、kafka集群編程介紹了...
(一)單機版安裝
此部分不可用於生產,但新接觸kafka時,可以先有個感性的認識
Step 1: 下載Kafka
下載最新的版本並解壓.
$ wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz
$ tar -zxvf kafka_2.10-0.8.2.1.tgz
Step 2: 啟動服務
Kafka用到了Zookeeper,所有首先啟動Zookper,下面簡單的啟用一個單實例的Zookkeeper服務。可以在命令的結尾加個&符號,這樣就可以啟動后離開控制台。
> bin/zookeeper-server-start.sh config/zookeeper.properties &
現在啟動Kafka:
bin/kafka-server-start.sh config/server.properties
Step 3: 創建 topic
創建一個叫做“test”的topic,它只有一個分區,一個副本。
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
[2015-06-04 13:17:13,943] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
Created topic "test".
可以通過list命令查看創建的topic:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
> test
除了手動創建topic,還可以配置broker讓它自動創建topic.
Step 4:發送消息.
Kafka 使用一個簡單的命令行producer,從文件中或者從標准輸入中讀取消息並發送到服務端。默認的每條命令將發送一條消息。
運行producer並在控制台中輸一些消息,這些消息將被發送到服務端:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a messageThis is another message
ctrl+c可以退出發送。
默認情況下,日志數據會被放置到/tmp/kafka-logs中,每個分區一個目錄
Step 5: 啟動consumer
Kafka also has a command line consumer that will dump out messages to standard output.
Kafka也有一個命令行consumer可以讀取消息並輸出到標准輸出:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
你在一個終端中運行consumer命令行,另一個終端中運行producer命令行,就可以在一個終端輸入消息,另一個終端讀取消息。
這兩個命令都有自己的可選參數,可以在運行的時候不加任何參數可以看到幫助信息。
(二)集群安裝
注意,必須先搭建zookeeper集群
1、使用3台機器搭建Kafka集群:
192.168.169.92 gdc-dn01-test
192.168.169.93 gdc-dn02-test
192.168.169.94 gdc-dn03-test
2、在安裝Kafka集群之前,這里沒有使用Kafka自帶的Zookeeper,而是獨立安裝了一個Zookeeper集群,也是使用這3台機器,保證Zookeeper集群正常運行。
3、首先,在gdc-dn01-test上准備Kafka安裝文件,執行如下命令:
wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz
tar xvzf kafka_2.10-0.8.2.1.tgz
mv kafka_2.10-0.8.2.1 kafka
4、修改配置文件kafka/config/server.properties,修改如下內容:
broker.id=0
zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka
這里需要說明的是,默認Kafka會使用ZooKeeper默認的/路徑,這樣有關Kafka的ZooKeeper配置就會散落在根路徑下面,如果 你有其他的應用也在使用ZooKeeper集群,查看ZooKeeper中數據可能會不直觀,所以強烈建議指定一個chroot路徑,直接在 zookeeper.connect配置項中指定:
zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka
而且,需要手動在ZooKeeper中創建路徑/kafka,使用如下命令連接到任意一台ZooKeeper服務器:
cd ~/zookeeper
bin/zkCli.sh
在ZooKeeper執行如下命令創建chroot路徑:
create /kafka ''
這樣,每次連接Kafka集群的時候(使用--zookeeper選項),也必須使用帶chroot路徑的連接字符串,后面會看到。
5、然后,將配置好的安裝文件同步到其他的dn02、dn03節點上:
scp -r /usr/local/kafka_2.10-0.8.2.1/ 192.168.169.92:/home/hadoop
scp -r /usr/local/kafka_2.10-0.8.2.1/ 192.168.169.93:/home/hadoop
6、最后,在dn02、dn03節點上配置修改配置文件kafka/config/server.properties內容如下所示:
broker.id=1 # 在dn02修改
broker.id=2 # 在dn03修改
因為Kafka集群需要保證各個Broker的id在整個集群中必須唯一,需要調整這個配置項的值(如果在單機上,可以通過建立多個Broker進程來模擬分布式的Kafka集群,也需要Broker的id唯一,還需要修改一些配置目錄的信息)。
7、在集群中的dn01、dn02、dn03這三個節點上分別啟動Kafka,分別執行如下命令:
bin/kafka-server-start.sh config/server.properties &
可以通過查看日志,或者檢查進程狀態,保證Kafka集群啟動成功。
8、創建一個名稱為my-replicated-topic5的Topic,5個分區,並且復制因子為3,執行如下命令:
bin/kafka-topics.sh --create --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
9、查看創建的Topic,執行如下命令:
bin/kafka-topics.sh --describe --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --topic my-replicated-topic5
結果信息如下所示:
Topic:my-replicated-topic5 PartitionCount:5 ReplicationFactor:3 Configs:
Topic: my-replicated-topic5 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: my-replicated-topic5 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: my-replicated-topic5 Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: my-replicated-topic5 Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: my-replicated-topic5 Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
上面Leader、Replicas、Isr的含義如下:
1 Partition: 分區
2 Leader : 負責讀寫指定分區的節點
3 Replicas : 復制該分區log的節點列表
4 Isr : "in-sync" replicas,當前活躍的副本列表(是一個子集),並且可能成為Leader
我們可以通過Kafka自帶的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh腳本,來驗證演示如果發布消息、消費消息。
11、在一個終端,啟動Producer,並向我們上面創建的名稱為my-replicated-topic5的Topic中生產消息,執行如下腳本:
bin/kafka-console-producer.sh --broker-list 192.168.169.92:9092, 192.168.169.93:9092, 192.168.169.94:9092 --topic my-replicated-topic5
12、在另一個終端,啟動Consumer,並訂閱我們上面創建的名稱為my-replicated-topic5的Topic中生產的消息,執行如下腳本:
bin/kafka-console-consumer.sh --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --from-beginning --topic my-replicated-topic5
可以在Producer終端上輸入字符串消息行,就可以在Consumer終端上看到消費者消費的消息內容。
也可以參考Kafka的Producer和Consumer的Java API,通過API編碼的方式來實現消息生產和消費的處理邏輯。
(三)集群啟停操作
1、啟動集群
bin/kafka-server-start.sh config/server.properties &
2、停止集群
bin/kafka-server-stop.sh
3、重啟
沒有專用腳本,先停后啟即可
注:當然也可以使用kill命令來關閉,但使用腳本有以下好處:
(1)It will sync all its logs to disk to avoid needing to do any log recovery when it restarts (i.e. validating the checksum for all messages in the tail of the log). Log recovery takes time so this speeds up intentional restarts.
(2)It will migrate any partitions the server is the leader for to other replicas prior to shutting down. This will make the leadership transfer faster and minimize the time each partition is unavailable to a few milliseconds.
(四)topic相關的操作
1、創建topic
bin/kafka-topics.sh --create --zookeeper 192.168.172.98:2181/kafka --replication-factor 2 --partitions 3 --topic test_topic
(1)zookeeper指定其中一個節點即可,集群之間會自動同步。
(2)--replication-factor 2 --partitions 3理論上應該是可選參數,但此腳本必須寫這2個參數。
(3)還可以使用--config <name=value>來指定topic的某個具體參數,以代替配置文件中的參數。如:
bin/kafka-topics.sh --create --zookeeper 192.168.172.98:2181/kafka --replication-factor 2 --partitions 3 --topic test_topic retention.bytes=3298534883328
指定了某個topic最大的保留日志量,單位是字節。
2、查看全部topic
bin/kafka-topics.sh --list --zookeeper 192.168.172.98:2181/kafka
3、查看某個topic的詳細信息
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 3 Replicas: 3,4 Isr: 3,4
Topic: test_topic Partition: 1 Leader: 4 Replicas: 4,5 Isr: 4,5
Topic: test_topic Partition: 2 Leader: 5 Replicas: 5,2 Isr: 5,2
(1)第一行列出了這個topic的總體情況,如topic名稱,分區數量,副本數量等。
(2)第二行開始,每一行列出了一個分區的信息,如它是第幾個分區,這個分區的leader是哪個broker,副本位於哪些broker,有哪些副本處理同步狀態。
4、啟動一個console producer,用於在console中模擬輸入消息
bin/kafka-console-producer.sh --broker-list 192.168.172.111:9092 --topic test_topic
5、啟動一個console consumer,用於模擬接收消息,並在console中輸出
bin/kafka-console-consumer.sh --zookeeper 192.168.172.111:2181/kafka --topic test_topic
此腳本可以用於驗證一個topic的數據情況,看消息是否正常流入等。
6、刪除一個topic
bin/kafka-topics.sh --delete --zookeeper 192.168.172.98:2181/kafka --topic test_topic
(1)配置文件中必須delete.topic.enable=true,否則只會標記為刪除,而不是真正刪除。
(2)執行此腳本的時候,topic的數據會同時被刪除。如果由於某些原因導致topic的數據不能完全刪除(如其中一個broker down了),此時topic只會被marked for deletion,而不會真正刪除。此時創建同名的topic會有沖突。
7、修改topic
使用—-alert原則上可以修改任何配置,以下列出了一些常用的修改選項:
(1)改變分區數量
bin/kafka-topics.sh --alter --zookeeper 192.168.172.98:2181/kafka --topic test_topic --partitions 4
(2)增加、修改或者刪除一個配置參數
bin/kafka-topics.sh —alter --zookeeper 192.168.172.98:2181/kafka --topic my_topic_name --config key=value
bin/kafka-topics.sh —alter --zookeeper 192.168.172.98:2181/kafka --topic my_topic_name --deleteConfig key
(五)某個broker掛掉,本機器可重啟
【結論】如果一個broker掛掉,且可以重啟則處理步驟如下:
(1)重啟kafka進程
(2)執行rebalance(由於已經設置配置項自動執行balance,因此此步驟一般可忽略)
詳細分析見下面操作過程。
1、topic的情況
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 5 Replicas: 5,2 Isr: 5,2
Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
集群中有4台機器,id為【2-5】,topic 有3個分區,每個分區2個副本,leader分別位於2,3,5中。
2、模擬機器down,kill掉進程
分區0的leader位於id=5的broker中,kill掉這台機器的kafka進程
kill -9 ****
3、再次查看topic的情況
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 2 Replicas: 5,2 Isr: 2
Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
可以看到,分區0的leader已經移到id=2的機器上了,它的副本位於2,5這2台機器上,但處於同步狀態的只有id=2這台機器。
4、重啟kafka進程
bin/kafka-server-start.sh config/server.properties &
5、再次查看狀態
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 2 Replicas: 5,2 Isr: 2,5
Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
發現分區0的2個副本都已經處於同步狀態,但leader依然為id=2的broker。
6、執行leader平衡
詳見leader的平衡部分。
bin/kafka-preferred-replica-election.sh --zookeeper 192.168.172.98:2181/kafka
如果配置文件中
auto.leader.rebalance.enable=true
則此步驟不需要執行。
7、重新查看topic
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 5 Replicas: 5,2 Isr: 2,5
Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
此時leader已經回到了id=5的broker,一切恢復正常。
(六)某個broker掛掉且無法重啟,需要其它機器代替
【結論】當一個broker掛掉,需要換機器時,采用以下步驟:
1、將新機器kafka配置文件中的broker.id設置為與原機器一樣
2、啟動kafka,注意kafka保存數據的目錄不會自動創建,需要手工創建
詳細分析過程如下:
1、初始化機器,主要包括用戶創建,kafka文件的復制等。
2、修改config/server.properties文件
注意,只需要修改一個配置broker.id,且此配置必須與掛掉的那台機器相同,因為kafka是通過broker.id來區分集群中的機器的。此處設為
broker.id=5
3、查看topic的當前狀態
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 5 Replicas: 5,2 Isr: 2,5
Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
當前topic有3個分區,其中分區1的leader位於id=5的機器上。
4、關掉id=5的機器
kill -9 ** 用於模擬機器突然down
或者:
bin/kafka-server-stop.sh
用於正常關閉
5、查看topic的狀態
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 2 Replicas: 5,2 Isr: 2
Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
可見,topic的分區0的leader已經遷移到了id=2的機器上,且處於同步的機器只有一個了。
6、啟動新機器
nohup bin/kafka-server-start.sh config/server.properties &
7、再看topic的狀態
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 2 Replicas: 5,2 Isr: 2,5
Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
id=5的機器也處於同步狀態了,但還需要將leader恢復到這台機器上。
8、執行leader平衡
詳見leader的平衡部分。
bin/kafka-preferred-replica-election.sh --zookeeper 192.168.172.98:2181/kafka
如果配置文件中
auto.leader.rebalance.enable=true
則此步驟不需要執行。
9、done
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 5 Replicas: 5,2 Isr: 2,5
Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
所有內容都恢復了
(七)擴容
將一台機器加入kafka集群很容易,只需要為它分配一個獨立的broker id,然后啟動它即可。但是這些新加入的機器上面並沒有任何的分區數據,所以除非將現有數據移動這些機器上,否則它不會做任何工作,直到創建新topic。因此,當你往集群加入機器時,你應該將其它機器上的一部分數據往這台機器遷移。
數據遷移的工作需要手工初始化,然后自動完成。它的原理如下:當新機器起來后,kafka將其它機器的一些分區復制到這個機器上,並作為follower,當這個新機器完成復制並成為in-sync狀態后,那些被復制的分區的一個副本會被刪除。(都不會成為leader?)
1、將新機器kafka配置文件中的broker.id設置為與原機器一樣
2、啟動kafka,注意kafka保存數據的目錄不會自動創建,需要手工創建
此時新建的topic都會優先分配leader到新增的機器上,但原有的topic不會將分區遷移過來。
3、數據遷移,請見數據遷移部分。
(八)數據遷移
以下步驟用於將現有數據遷移到新的broker中,假設需要將test_topic與streaming_ma30_sdc的全部分區遷移到新的broker中(id 為6和7)
1、創建一個json文件,用於指定哪些topic將被遷移過去
cat topics-to-move.json
{"topics": [
{"topic": "test_topic"},
{"topic": "streaming_ma30_sdc"}
],
"version":1
}
注意全角,半角符號,或者中文引號之類的問題。
2、先generate遷移后的結果,檢查一下是不是你要想的效果
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --topics-to-move-json-file topics-to-move.json --broker-list "6,7" —generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"streaming_ma30_sdc","partition":2,"replicas":[2]},{"topic":"test_topic","partition":0,"replicas":[5,2]},{"topic":"test_topic","partition":2,"replicas":[3,4]},{"topic":"streaming_ma30_sdc","partition":1,"replicas":[5]},{"topic":"streaming_ma30_sdc","partition":0,"replicas":[4]},{"topic":"test_topic","partition":1,"replicas":[2,3]},{"topic":"streaming_ma30_sdc","partition":3,"replicas":[3]},{"topic":"streaming_ma30_sdc","partition":4,"replicas":[4]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[7,6]},{"topic":"streaming_ma30_sdc","partition":2,"replicas":[7]},{"topic":"test_topic","partition":2,"replicas":[7,6]},{"topic":"streaming_ma30_sdc","partition":1,"replicas":[6]},{"topic":"test_topic","partition":1,"replicas":[6,7]},{"topic":"streaming_ma30_sdc","partition":0,"replicas":[7]},{"topic":"streaming_ma30_sdc","partition":4,"replicas":[7]},{"topic":"streaming_ma30_sdc","partition":3,"replicas":[6]}]}
分別列出了當前的狀態以及遷移后的狀態。
把這2個json分別保存下來,第一個用來萬一需要roll back的時候使用,第二個用來執行遷移。
3、執行遷移
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file expand-cluster-reassignment.json --execute
其中expand-cluster-reassignment.json為保存上面第二段json的文件。
4、查看遷移過程
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [streaming_ma30_sdc,0] is still in progress
Reassignment of partition [streaming_ma30_sdc,4] is still in progress
Reassignment of partition [test_topic,2] completed successfully
Reassignment of partition [test_topic,0] completed successfully
Reassignment of partition [streaming_ma30_sdc,3] is still in progress
Reassignment of partition [streaming_ma30_sdc,1] is still in progress
Reassignment of partition [test_topic,1] completed successfully
Reassignment of partition [streaming_ma30_sdc,2] is still in progress
5、當所有遷移的完成后,查看一下結果是不是你想要的
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 7 Replicas: 7,6 Isr: 6,7
Topic: test_topic Partition: 1 Leader: 6 Replicas: 6,7 Isr: 6,7
Topic: test_topic Partition: 2 Leader: 7 Replicas: 7,6 Isr: 6,7
完成
以上步驟將整個topic遷移,也可以只遷移其中一個或者多個分區。
以下將test_topic的分區1移到broker id為2,3的機器,分區2移到broker id為4,5的機器.
【其實還是整個topic遷移好一點,不然准備遷移文件會很麻煩】
1、准備遷移配置文件
cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"test_topic","partition":1,"replicas":[2,3]},{"topic":"test_topic","partition":2,"replicas":[4,5]}]}
3、執行遷移
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file custom-reassignment.json --execute
4、查看遷移過程
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file custom-reassignment.json --verify
5、查看遷移結果
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
(九)機器下線
當一個機器下線時,kafka並不會自動將這台機器上的副本遷移到其它機器上,因此,我們需要手工進行遷移。這個過程會相當的無聊,kafka打算在0.8.2版本中添加此特性。
有了嗎?再找找。如果只是替換機器則不會有這個問題。
(十)增加副本數量
Increasing replication factor
Increasing the replication factor of an existing partition is easy. Just specify the extra replicas in the custom reassignment json file and use it with the --execute option to increase the replication factor of the specified partitions.
For instance, the following example increases the replication factor of partition 0 of topic foo from 1 to 3. Before increasing the replication factor, the partition's only replica existed on broker 5. As part of increasing the replication factor, we will add more replicas on brokers 6 and 7.
The first step is to hand craft the custom reassignment plan in a json file-
cat increase-replication-factor.json
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
Then, use the json file with the --execute option to start the reassignment process-
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same increase-replication-factor.json (used with the --execute option) should be used with the --verify option
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [foo,0] completed successfully
You can also verify the increase in replication factor with the kafka-topics tool-
bin/kafka-topics.sh --zookeeper localhost:2181 --topic foo --describe
Topic:foo PartitionCount:1 ReplicationFactor:3 Configs:
Topic: foo Partition: 0 Leader: 5 Replicas: 5,6,7 Isr: 5,6,7
(十一)leader的平衡
當一個broker down掉時,所有本來將它作為leader的分區會被將leader轉移到其它broker。這意味着當這個broker重啟時,它將不再擔任何分區的leader,kafka的client也不會從這個broker來讀取消息,導致資源的浪費。
為了避免這種情況的發生,kafka增加了一個標記:優先副本(preferred replicas)。如果一個分區有3個副本,且這3個副本的優先級別分別為1,5,9,則1會作為leader。為了使kafka集群恢復默認的leader,需要運行以下命令:
bin/kafka-preferred-replica-election.sh --zookeeper 192.168.172.98:2181/kafka
或者可以設置以下配置項,leader 會自動執行balance:
auto.leader.rebalance.enable=true
這配置默認即為空,但需要經過一段時間后才會觸發,約半小時。