Kafka集群搭建及操作


  在《kafka在windows上的安裝、運行》一文中,我們已初步對kafka有了一個感性的認識,但在實際應用中都是在Linux系統上通過集群方式來使用的。

  下面我們來進一步進行Kafka集群搭建及操作。

一.環境說明

  這里沒有使用kafka自帶的zookeeper,而是獨立安裝了一個zookeeper集群,zookeeper集群的搭建詳見《zookeeper集群搭建及Leader選舉算法源碼解析》。

  zookeeper集群的機器分別是:10.255.34.78、10.255.34.74、10.255.34.76

  kafka集群的機器分別是:10.255.34.141、10.255.34.145、10.255.34.78

  即10.255.34.78機器上同時搭建了zookeeper和kafka,其它機器均只搭建zookeeper或kafka。

 

二.kafka集群搭建

1.從官網下載kafka

  我這里下載的是kafka_2.12-2.0.0.tgz包,上傳到Linux服務器上,運行tar -xzf kafka_2.12-2.0.0.tgz解壓。

2.修改配置文件kafka/config/server.properties

  這里是在10.255.34.145機器上進行配置。

  a.配置broker的ID

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=12

  10.255.34.78配置broker.id=11,10.255.34.145配置broker.id=12,10.255.34.141配置broker.id=13。

  b.打開監聽端口

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://10.255.34.145:9092

  每台服務配置自已機器的IP。

  c.修改log的目錄

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/data/app/kafka_2.12-2.0.0/kafka-logs

  d.修改zookeeper.connect

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=10.255.34.78:2181,10.255.34.76:2181,10.255.34.74:2181/kafka

  這里配置的是zookeeper群集的IP和端口。

  這里需要說明的是,默認Kafka會使用ZooKeeper默認的/路徑,這樣有關Kafka的ZooKeeper配置就會散落在根路徑下面,如果你有其他的應用也在使用ZooKeeper集群,查看ZooKeeper中數據可能會不直觀,所以強烈建議指定一個chroot路徑,直接在 zookeeper.connect配置項中指定。

zookeeper.connect=10.255.34.78:2181,10.255.34.76:2181,10.255.34.74:2181/kafka

  而且,需要手動在ZooKeeper中創建路徑/kafka,使用如下命令連接到任意一台 ZooKeeper服務器:

cd ~/zookeeper
bin/zkCli.sh
然后輸入:create /kafka ''

  就創建了chroot路徑,這樣,每次連接Kafka集群的時候(使用--zookeeper選項),也必須使用帶chroot路徑的連接字符串,后面會看到。

  這個命令連接到zookeeper的任何一台機器執行,zookeeper會自動同步到其它集群中的機器。【在其它機器上也執行會提示Node already exists】

3.在其它機器上進行配置

  兩種方法:

  a.將配置好的安裝文件同步到其他的節點上,在這里是10.255.34.141、10.255.34.78機器,然后修改一下差異化變量brokerid。

  b.在其它節點10.255.34.141、10.255.34.78機器上按上面步驟進行配置,10.255.34.141配置broker.id=13,10.255.34.78配置broker.id=11。

  因為Kafka集群需要保證各個Broker的id在整個集群中必須唯一,需要調整這個配置項的值,且經測試發現broker.id的值只能為數字(如果在單機上,可以通過建立多個Broker進程來模擬分布式的Kafka集群,也需要Broker的id唯一,還需要修改一些配置目錄的信息)。

 

三. 啟動集群並創建topic

1.啟動集群

  在kafka的三台機器上(10.255.34.141、10.255.34.145、10.255.34.78)分別啟動Kafka,分別執行如下命令:

bin/kafka-server-start.sh config/server.properties &

  可以通過查看日志,或者檢查進程狀態,保證Kafka集群啟動成功,啟動的時候注意看錯誤,jps看到Kafka就表示服務已經啟動了。

2.創建一個名稱為bj-replicated-topic5的Topic,5個分區,並且復制因子為3,執行如下命令:

bin/kafka-topics.sh --create --zookeeper 10.255.34.78:2181,10.255.34.74:2181,10.255.34.76:2181/kafka --replication-factor 3 --partitions 5 --topic bj-replicated-topic5

3.創建一個名稱為bj-replicated-topic5的Topic,5個分區,並且復制因子為3,執行如下命令:

bin/kafka-topics.sh --create --zookeeper 10.255.34.78:2181,10.255.34.74:2181,10.255.34.76:2181/kafka --replication-factor 3 --partitions 5 --topic bj-replicated-topic5

  當然,這里也可以只指定一台zookeeper機器,因為集群之間會自動同步。如下所示:

bin/kafka-topics.sh --create --zookeeper 10.255.34.76:2181/kafka --replication-factor 3 --partitions 5 --topic bj2-replicated-topic5

  a.zookeeper指定其中一個節點即可,集群之間會自動同步。

  b.–replication-factor 3 –partitions 5理論上應該是可選參數,但此腳本必須寫這2個參數。
  c.還可以使用–config 來指定topic的某個具體參數,以代替配置文件中的參數。如:

bin/kafka-topics.sh –create –zookeeper 10.255.34.76:2181/kafka –replication-factor 3 –partitions 5 –topic test_topic retention.bytes=3298534883328

  指定了某個topic最大的保留日志量,單位是字節。

4.查看全部topic

bin/kafka-topics.sh -list -zookeeper 10.255.34.76:2181/kafka

5.查看某個topic的詳細信息

bin/kafka-topics.sh --describe --zookeeper 10.255.34.76:2181/kafka --topic bj-replicated-topic5
bin/kafka-topics.sh --describe --zookeeper 10.255.34.78:2181,10.255.34.74:2181,10.255.34.76:2181/kafka --topic bj-replicated-topic5

  第一行列出了這個topic的總體情況,如topic名稱,分區數量,副本數量等。
  第二行開始,每一行列出了一個分區的信息,如它是第幾個分區,這個分區的leader是哪個broker,副本位於哪些broker,有哪些副本處理同步狀態。

  上面Leader、Replicas、Isr的含義如下:

  a.Partition:分區
  b.Leader:負責讀寫指定分區的節點
  c.Replicas:復制該分區log的節點列表
  d.Isr:"in-sync" replicas,當前活躍的副本列表(是一個子集),並且可能成為Leader
  我們可以通過Kafka自帶的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh腳本,來驗證演示如果發布消息、消費消息。

6.在一個終端,啟動Producer,並向我們上面創建的名稱為bj-replicated-topic5的Topic中生產消息,執行如下腳本:

bin/kafka-console-producer.sh --broker-list 10.255.34.141:9092,10.255.34.145:9092,10.255.34.78:9092 --topic bj-replicated-topic5

7.在另一個終端,啟動Consumer,並訂閱我們上面創建的名稱為bj-replicated-topic5的Topic中生產的消息,執行如下腳本:

bin/kafka-console-consumer.sh --bootstrap-server 10.255.34.141:9092,10.255.34.145:9092,10.255.34.78:9092  --topic bj-replicated-topic5

  可以在Producer終端上輸入字符串消息行,就可以在Consumer終端上看到消費者消費的消息內容。

  也可以參考Kafka的Producer和Consumer的Java API,通過API編碼的方式來實現消息生產和消費的處理邏輯。

8.修改topic

  使用--alert原則上可以修改任何配置,以下列出了一些常用的修改選項:
  a.改變分區數量

bin/kafka-topics.sh --alter --zookeeper 10.255.34.76:2181/kafka --topic bj2-replicated-topic5 --partitions 6

  由上面的截圖實操來看,partitions只能改大,不能改小。改完后再次查下這個topic詳情如下:

  b.增加、修改或者刪除一個配置參數

bin/kafka-topics.sh —alter --zookeeper 10.255.34.76:2181/kafka --topic my_topic_name --config key=value
bin/kafka-topics.sh —alter --zookeeper 10.255.34.76:2181/kafka --topic my_topic_name --deleteConfig key

 9.刪除一個topic

bin/kafka-topics.sh --delete --zookeeper 10.255.34.76:2181/kafka --topic bj2-replicated-topic5

  a.配置文件中必須delete.topic.enable=true,否則只會標記為刪除,而不是真正刪除。

  b.執行此腳本的時候,topic的數據會同時被刪除。如果由於某些原因導致topic的數據不能完全刪除(如其中一個broker down了),此時topic只會被marked for deletion,而不會真正刪除。此時創建同名的topic會有沖突。

  在這里,由於在kafka_2.12-2.0.0/config/server.properties中未設置delete.topic.enable=true配置,執行刪除命令后,只是在對應的topic后面打一個-marked for deletion標識。

  如果在kafka_2.12-2.0.0/config/server.properties中增加了delete.topic.enable=true配置

10.停止集群

  在kafka的三台機器上(10.255.34.141、10.255.34.145、10.255.34.78)分別停止Kafka,分別執行如下命令:

bin/kafka-server-stop.sh

  jps看不到Kafka進程就表示服務已經停掉了。

 

四.高可用

1.某個broker掛掉,本機器可重啟

  如果一個broker掛掉,且可以重啟則處理步驟如下:

a.重啟kafka進程
b.執行rebalance(由於已經設置配置項自動執行balance,因此此步驟一般可忽略)

  詳細分析見下面操作過程:

a.topic的情況

[app@VM_34_145_centos kafka_2.12-2.0.0]$ bin/kafka-topics.sh --describe --zookeeper 10.255.34.76:2181/kafka --topic bj-replicated-topic5
Topic:bj-replicated-topic5      PartitionCount:5        ReplicationFactor:3     Configs:
        Topic: bj-replicated-topic5     Partition: 0    Leader: 11      Replicas: 11,12,13      Isr: 11,13,12
        Topic: bj-replicated-topic5     Partition: 1    Leader: 12      Replicas: 12,13,11      Isr: 13,11,12
        Topic: bj-replicated-topic5     Partition: 2    Leader: 13      Replicas: 13,11,12      Isr: 13,11,12
        Topic: bj-replicated-topic5     Partition: 3    Leader: 11      Replicas: 11,13,12      Isr: 11,13,12
        Topic: bj-replicated-topic5     Partition: 4    Leader: 12      Replicas: 12,11,13      Isr: 11,13,12

  集群中有3台機器,id為【11-13】,topic 有5個分區,每個分區3個副本,leader分別位於11,12,13,11,12中。

b.模擬機器down,kill掉進程
  分區0的leader位於id=11的broker中,kill掉這台機器的kafka進程。

c.再次查看topic的情況

[app@VM_34_145_centos kafka_2.12-2.0.0]$ bin/kafka-topics.sh --describe --zookeeper 10.255.34.76:2181/kafka --topic bj-replicated-topic5
Topic:bj-replicated-topic5      PartitionCount:5        ReplicationFactor:3     Configs:
        Topic: bj-replicated-topic5     Partition: 0    Leader: 12      Replicas: 11,12,13      Isr: 13,12
        Topic: bj-replicated-topic5     Partition: 1    Leader: 12      Replicas: 12,13,11      Isr: 13,12
        Topic: bj-replicated-topic5     Partition: 2    Leader: 13      Replicas: 13,11,12      Isr: 13,12
        Topic: bj-replicated-topic5     Partition: 3    Leader: 13      Replicas: 11,13,12      Isr: 13,12
        Topic: bj-replicated-topic5     Partition: 4    Leader: 12      Replicas: 12,11,13      Isr: 13,12

  可以看到,分區0的leader已經移到id=12的機器上了,它的副本位於11,12,13這3台機器上,但處於同步狀態的只有id=13和id=12這兩台機器。分區3的leader已經移到id=13的機器上了,它的副本位於11,13,12這3台機器上,但處於同步狀態的只有id=13和id=12這兩台機器。

d.重啟kafka進程

bin/kafka-server-start.sh config/server.properties &

e.再次查看狀態

[app@VM_34_145_centos kafka_2.12-2.0.0]$ bin/kafka-topics.sh --describe --zookeeper 10.255.34.76:2181/kafka --topic bj-replicated-topic5
Topic:bj-replicated-topic5      PartitionCount:5        ReplicationFactor:3     Configs:
        Topic: bj-replicated-topic5     Partition: 0    Leader: 12      Replicas: 11,12,13      Isr: 13,12,11
        Topic: bj-replicated-topic5     Partition: 1    Leader: 12      Replicas: 12,13,11      Isr: 13,12,11
        Topic: bj-replicated-topic5     Partition: 2    Leader: 13      Replicas: 13,11,12      Isr: 13,12,11
        Topic: bj-replicated-topic5     Partition: 3    Leader: 13      Replicas: 11,13,12      Isr: 13,12,11
        Topic: bj-replicated-topic5     Partition: 4    Leader: 12      Replicas: 12,11,13      Isr: 13,12,11

  發現分區0的3個副本都已經處於同步狀態,但leader依然為id=12的broker。

f.執行leader平衡

bin/kafka-preferred-replica-election.sh --zookeeper 10.255.34.76:2181/kafka

  如果配置文件中:

auto.leader.rebalance.enable=true

  則此步驟不需要執行。

g.重新查看topic

[app@VM_34_145_centos kafka_2.12-2.0.0]$ bin/kafka-topics.sh --describe --zookeeper 10.255.34.76:2181/kafka --topic bj-replicated-topic5
Topic:bj-replicated-topic5      PartitionCount:5        ReplicationFactor:3     Configs:
        Topic: bj-replicated-topic5     Partition: 0    Leader: 11      Replicas: 11,12,13      Isr: 13,12,11
        Topic: bj-replicated-topic5     Partition: 1    Leader: 12      Replicas: 12,13,11      Isr: 13,12,11
        Topic: bj-replicated-topic5     Partition: 2    Leader: 13      Replicas: 13,11,12      Isr: 13,12,11
        Topic: bj-replicated-topic5     Partition: 3    Leader: 11      Replicas: 11,13,12      Isr: 13,12,11
        Topic: bj-replicated-topic5     Partition: 4    Leader: 12      Replicas: 12,11,13      Isr: 13,12,11

  此時分區0和分區3的leader已經回到了id=11的broker,一切恢復正常。

2.某個broker掛掉且無法重啟,需要其它機器代替

  當一個broker掛掉,需要換機器時,采用以下步驟:

a.將新機器kafka配置文件中的broker.id設置為與原機器一樣
b.啟動kafka,注意kafka保存數據的目錄不會自動創建,需要手工創建

  詳細分析過程如下:

a.初始化機器,主要包括用戶創建,kafka文件的復制等。
b.修改config/server.properties文件
  注意,只需要修改一個配置broker.id,且此配置必須與掛掉的那台機器相同,因為kafka是通過broker.id來區分集群中的機器的。此處設為

broker.id=5

c.查看topic的當前狀態

bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
    Topic:test_topic        PartitionCount:3        ReplicationFactor:2     Configs:
    Topic: test_topic       Partition: 0    Leader: 5       Replicas: 5,2   Isr: 2,5
    Topic: test_topic       Partition: 1    Leader: 2       Replicas: 2,3   Isr: 2,3
    Topic: test_topic       Partition: 2    Leader: 3       Replicas: 3,4   Isr: 3,4

  當前topic有3個分區,其中分區1的leader位於id=5的機器上。

d.關掉id=5的機器
  kill -9 ** 用於模擬機器突然down
  或者:

bin/kafka-server-stop.sh

  用於正常關閉

e.查看topic的狀態

bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
    Topic:test_topic        PartitionCount:3        ReplicationFactor:2     Configs:
    Topic: test_topic       Partition: 0    Leader: 2       Replicas: 5,2   Isr: 2
    Topic: test_topic       Partition: 1    Leader: 2       Replicas: 2,3   Isr: 2,3
    Topic: test_topic       Partition: 2    Leader: 3       Replicas: 3,4   Isr: 3,4

  可見,topic的分區0的leader已經遷移到了id=2的機器上,且處於同步的機器只有一個了。

f.啟動新機器

nohup bin/kafka-server-start.sh config/server.properties

g.再看topic的狀態

bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
    Topic:test_topic        PartitionCount:3        ReplicationFactor:2     Configs:
    Topic: test_topic       Partition: 0    Leader: 2       Replicas: 5,2   Isr: 2,5
    Topic: test_topic       Partition: 1    Leader: 2       Replicas: 2,3   Isr: 2,3
    Topic: test_topic       Partition: 2    Leader: 3       Replicas: 3,4   Isr: 3,4

  id=5的機器也處於同步狀態了,但還需要將leader恢復到這台機器上。

h.執行leader平衡

bin/kafka-preferred-replica-election.sh –zookeeper 192.168.172.98:2181/kafka

  如果配置文件中

auto.leader.rebalance.enable=true

  則此步驟不需要執行。

i.done

 bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
    Topic:test_topic        PartitionCount:3        ReplicationFactor:2     Configs:
    Topic: test_topic       Partition: 0    Leader: 5       Replicas: 5,2   Isr: 2,5
    Topic: test_topic       Partition: 1    Leader: 2       Replicas: 2,3   Isr: 2,3
    Topic: test_topic       Partition: 2    Leader: 3       Replicas: 3,4   Isr: 3,4

  所有內容都恢復了

 

五.擴容

  將一台機器加入kafka集群很容易,只需要為它分配一個獨立的broker id,然后啟動它即可。但是這些新加入的機器上面並沒有任何的分區數據,所以除非將現有數據移動這些機器上,否則它不會做任何工作,直到創建新topic。因此,當你往集群加入機器時,你應該將其它機器上的一部分數據往這台機器遷移。

  數據遷移的工作需要手工初始化,然后自動完成。它的原理如下:當新機器起來后,kafka將其它機器的一些分區復制到這個機器上,並作為follower,當這個新機器完成復制並成為in-sync狀態后,那些被復制的分區的一個副本會被刪除。(都不會成為leader?)

  1.將新機器kafka配置文件中的broker.id設置為與原機器一樣
  2.啟動kafka,注意kafka保存數據的目錄不會自動創建,需要手工創建
    此時新建的topic都會優先分配leader到新增的機器上,但原有的topic不會將分區遷移過來。
  3.數據遷移,請見數據遷移部分。

 

六.數據遷移

  以下步驟用於將現有數據遷移到新的broker中,假設需要將test_topic與streaming_ma30_sdc的全部分區遷移到新的broker中(id 為6和7)
1.創建一個json文件,用於指定哪些topic將被遷移過去
  cat topics-to-move.json

{"topics": [
 {"topic": "test_topic"},
 {"topic": "streaming_ma30_sdc"}
 ],
 "version":1
}

注意全角,半角符號,或者中文引號之類的問題。

2.先generate遷移后的結果,檢查一下是不是你要想的效果

bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --topics-to-move-json-file topics-to-move.json --broker-list "6,7" —generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"streaming_ma30_sdc","partition":2,"replicas":[2]},{"topic":"test_topic","partition":0,"replicas":[5,2]},{"topic":"test_topic","partition":2,"replicas":[3,4]},{"topic":"streaming_ma30_sdc","partition":1,"replicas":[5]},{"topic":"streaming_ma30_sdc","partition":0,"replicas":[4]},{"topic":"test_topic","partition":1,"replicas":[2,3]},{"topic":"streaming_ma30_sdc","partition":3,"replicas":[3]},{"topic":"streaming_ma30_sdc","partition":4,"replicas":[4]}]}
 
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[7,6]},{"topic":"streaming_ma30_sdc","partition":2,"replicas":[7]},{"topic":"test_topic","partition":2,"replicas":[7,6]},{"topic":"streaming_ma30_sdc","partition":1,"replicas":[6]},{"topic":"test_topic","partition":1,"replicas":[6,7]},{"topic":"streaming_ma30_sdc","partition":0,"replicas":[7]},{"topic":"streaming_ma30_sdc","partition":4,"replicas":[7]},{"topic":"streaming_ma30_sdc","partition":3,"replicas":[6]}]}

  分別列出了當前的狀態以及遷移后的狀態。
  把這2個json分別保存下來,第一個用來萬一需要roll back的時候使用,第二個用來執行遷移。

3.執行遷移

bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file expand-cluster-reassignment.json --execute

  其中expand-cluster-reassignment.json為保存上面第二段json的文件。

4.查看遷移過程

bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [streaming_ma30_sdc,0] is still in progress
Reassignment of partition [streaming_ma30_sdc,4] is still in progress
Reassignment of partition [test_topic,2] completed successfully
Reassignment of partition [test_topic,0] completed successfully
Reassignment of partition [streaming_ma30_sdc,3] is still in progress
Reassignment of partition [streaming_ma30_sdc,1] is still in progress
Reassignment of partition [test_topic,1] completed successfully
Reassignment of partition [streaming_ma30_sdc,2] is still in progress

5.當所有遷移的完成后,查看一下結果是不是你想要的

bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
    Topic:test_topic        PartitionCount:3        ReplicationFactor:2     Configs:
    Topic: test_topic       Partition: 0    Leader: 7       Replicas: 7,6   Isr: 6,7
    Topic: test_topic       Partition: 1    Leader: 6       Replicas: 6,7   Isr: 6,7
    Topic: test_topic       Partition: 2    Leader: 7       Replicas: 7,6   Isr: 6,7

  完成

  以上步驟將整個topic遷移,也可以只遷移其中一個或者多個分區。
以下將test_topic的分區1移到broker id為2,3的機器,分區2移到broker id為4,5的機器。
【其實還是整個topic遷移好一點,不然准備遷移文件會很麻煩】

1.准備遷移配置文件
  cat custom-reassignment.json

{"version":1,"partitions":[{"topic":"test_topic","partition":1,"replicas":[2,3]},{"topic":"test_topic","partition":2,"replicas":[4,5]}]}

2.執行遷移

bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file custom-reassignment.json --execute

3.查看遷移過程

bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file custom-reassignment.json --verify

4.查看遷移結果

bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic

 

七.機器下線

  當一個機器下線時,kafka並不會自動將這台機器上的副本遷移到其它機器上,因此,我們需要手工進行遷移。這個過程會相當的無聊,kafka打算在0.8.2版本中添加此特性。
  有了嗎?再找找。如果只是替換機器則不會有這個問題。

 

八.增加副本數量Increasing replication factor

  Increasing the replication factor of an existing partition is easy. Just specify the extra replicas in the custom reassignment json file and use it with the –execute option to increase the replication factor of the specified partitions.
  For instance, the following example increases the replication factor of partition 0 of topic foo from 1 to 3. Before increasing the replication factor, the partition’s only replica existed on broker 5. As part of increasing the replication factor, we will add more replicas on brokers 6 and 7.

  The first step is to hand craft the custom reassignment plan in a json file

cat increase-replication-factor.json
{“version”:1,
“partitions”:[{“topic”:”foo”,”partition”:0,”replicas”:[5,6,7]}]
}

  Then, use the json file with the –execute option to start the reassignment process

bin/kafka-reassign-partitions.sh –zookeeper localhost:2181 –reassignment-json-file increase-replication-factor.json –execute

  Current partition replica assignment

{“version”:1,
“partitions”:[{“topic”:”foo”,”partition”:0,”replicas”:[5]}]

}

  Save this to use as the –reassignment-json-file option during rollback

  Successfully started reassignment of partitions

{“version”:1,
“partitions”:[{“topic”:”foo”,”partition”:0,”replicas”:[5,6,7]}]

}

  The –verify option can be used with the tool to check the status of the partition reassignment. Note that the same increase-replication-factor.json (used with the –execute option) should be used with the –verify option

bin/kafka-reassign-partitions.sh –zookeeper localhost:2181 –reassignment-json-file increase-replication-factor.json –verify

  Status of partition reassignment:

  Reassignment of partition [foo,0] completed successfully
  You can also verify the increase in replication factor with the kafka-topics tool

bin/kafka-topics.sh –zookeeper localhost:2181 –topic foo –describe
Topic:foo PartitionCount:1 ReplicationFactor:3 Configs:
Topic: foo Partition: 0 Leader: 5 Replicas: 5,6,7 Isr: 5,6,7

 

九.leader的平衡

  當一個broker down掉時,所有本來將它作為leader的分區會被將leader轉移到其它broker。這意味着當這個broker重啟時,它將不再擔任何分區的leader,kafka的client也不會從這個broker來讀取消息,導致資源的浪費。
  為了避免這種情況的發生,kafka增加了一個標記:優先副本(preferred replicas)。如果一個分區有3個副本,且這3個副本的優先級別分別為1,5,9,則1會作為leader。為了使kafka集群恢復默認的leader,需要運行以下命令:

bin/kafka-preferred-replica-election.sh --zookeeper 192.168.172.98:2181/kafka

  或者可以設置以下配置項,leader 會自動執行balance:

auto.leader.rebalance.enable=true

  這配置默認即為空,但需要經過一段時間后才會觸發,約半小時。

 

十.其他

1.日志說明

  默認kafka的日志是保存在kafka_2.12-2.0.0/logs目錄下的,這里說幾個需要注意的日志

  a.server.log #kafka的運行日志
  b.state-change.log #kafka他是用zookeeper來保存狀態,所以他可能會進行切換,切換的日志就保存在這里

  c.controller.log #kafka選擇一個節點作為"controller",當發現有節點down掉的時候它負責在游泳分區的所有節點中選擇新的leader,這使得Kafka可以批量的高效的管理所有分區節點的主從關系。如果controller down掉了,活着的節點中的一個會備切換為新的controller.

2.可以登錄zk來查看zk的目錄情況

  a.使用客戶端進入zk

./zkCli.sh
或
./zkCli.sh -server 127.0.0.1:2181 #默認是不用加'-server'參數,如果改了zk端口才需要加

  b.查看目錄情況

  執行“ls /”

[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, brokers, zookeeper, kafka, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]

  上面的顯示結果中:zookeeper原生的,consumers, config, controller, isr_change_notification, admin, brokers, controller_epoch都是Kafka創建的

 

文章來源:http://blog.jobbole.com/99195/

https://www.cnblogs.com/eggplantpro/p/8428932.html

https://www.cnblogs.com/luotianshuai/p/5206662.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM