Kafka(三)Kafka的高可用與生產消費過程解析


一  Kafka HA設計解析

1.1 為何需要Replication

  在Kafka在0.8以前的版本中,是沒有Replication的,一旦某一個Broker宕機,則其上所有的Partition數據都不可被消費,這與Kafka數據持久性及Delivery Guarantee的設計目標相悖。同時Producer都不能再將數據存於這些Partition中。

  如果Producer使用同步模式則Producer會在嘗試重新發送message.send.max.retries(默認值為3)次后拋出Exception,用戶可以選擇停止發送后續數據也可選擇繼續選擇發送。而前者會造成數據的阻塞,后者會造成本應發往該Broker的數據的丟失。

  如果Producer使用異步模式,則Producer會嘗試重新發送message.send.max.retries(默認值為3)次后記錄該異常並繼續發送后續數據,這會造成數據丟失並且用戶只能通過日志發現該問題。同時,Kafka的Producer並未對異步模式提供callback接口。

  由此可見,在沒有Replication的情況下,一旦某機器宕機或者某個Broker停止工作則會造成整個系統的可用性降低。隨着集群規模的增加,整個集群中出現該類異常的幾率大大增加,因此對於生產系統而言Replication機制的引入非常重要。

1.2 Leader Election

  引入Replication之后,同一個Partition可能會有多個Replica,而這時需要在這些Replication之間選出一個Leader,Producer和Consumer只與這個Leader交互,其它Replica作為Follower從Leader中復制數據。

  因為需要保證同一個Partition的多個Replica之間的數據一致性(其中一個宕機后其它Replica必須要能繼續服務並且即不能造成數據重復也不能造成數據丟失)。如果沒有一個Leader,所有Replica都可同時讀/寫數據,那就需要保證多個Replica之間互相(N×N條通路)同步數據,數據的一致性和有序性非常難保證,大大增加了Replication實現的復雜性,同時也增加了出現異常的幾率。而引入Leader后,只有Leader負責數據讀寫,Follower只向Leader順序Fetch數據(N條通路),系統更加簡單且高效。

1.3 如何將所有Replica均勻分布到整個集群

為了更好的做負載均衡,Kafka盡量將所有的Partition均勻分配到整個集群上。一個典型的部署方式是一個Topic的Partition數量大於Broker的數量。同時為了提高Kafka的容錯能力,也需要將同一個Partition的Replica盡量分散到不同的機器。實際上,如果所有的Replica都在同一個Broker上,那一旦該Broker宕機,該Partition的所有Replica都無法工作,也就達不到HA的效果。同時,如果某個Broker宕機了,需要保證它上面的負載可以被均勻的分配到其它幸存的所有Broker上。

Kafka分配Replica的算法如下:

1.將所有Broker(假設共n個Broker)和待分配的Partition排序

2.將第i個Partition分配到第(i mod n)個Broker上

3.將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上

1.4 Data Replication(副本策略)

Kafka的高可靠性的保障來源於其健壯的副本(replication)策略。

1.4.1 消息傳遞同步策略

Producer在發布消息到某個Partition時,先通過ZooKeeper找到該Partition的Leader,然后無論該Topic的Replication Factor為多少,Producer只將該消息發送到該Partition的Leader。Leader會將該消息寫入其本地Log。每個Follower都從Leader pull數據。這種方式上,Follower存儲的數據順序與Leader保持一致。Follower在收到該消息並寫入其Log后,向Leader發送ACK。一旦Leader收到了ISR中的所有Replica的ACK,該消息就被認為已經commit了,Leader將增加HW並且向Producer發送ACK。

為了提高性能,每個Follower在接收到數據后就立馬向Leader發送ACK,而非等到數據寫入Log中。因此,對於已經commit的消息,Kafka只能保證它被存於多個Replica的內存中,而不能保證它們被持久化到磁盤中,也就不能完全保證異常發生后該條消息一定能被Consumer消費。

Consumer讀消息也是從Leader讀取,只有被commit過的消息才會暴露給Consumer。

Kafka Replication的數據流如下圖所示:

1.4.2 ACK前需要保證有多少個備份

對於Kafka而言,定義一個Broker是否“活着”包含兩個條件:

  • 一是它必須維護與ZooKeeper的session(這個通過ZooKeeper的Heartbeat機制來實現)。
  • 二是Follower必須能夠及時將Leader的消息復制過來,不能“落后太多”。

Leader會跟蹤與其保持同步的Replica列表,該列表稱為ISR(即in-sync Replica)。如果一個Follower宕機,或者落后太多,Leader將把它從ISR中移除。這里所描述的“落后太多”指Follower復制的消息落后於Leader后的條數超過預定值(該值可在$KAFKA_HOME/config/server.properties中通過replica.lag.max.messages配置,其默認值是4000)或者Follower超過一定時間(該值可在$KAFKA_HOME/config/server.properties中通過replica.lag.time.max.ms來配置,其默認值是10000)未向Leader發送fetch請求。

Kafka的復制機制既不是完全的同步復制,也不是單純的異步復制。事實上,完全同步復制要求所有能工作的Follower都復制完,這條消息才會被認為commit,這種復制方式極大的影響了吞吐率(高吞吐率是Kafka非常重要的一個特性)。而異步復制方式下,Follower異步的從Leader復制數據,數據只要被Leader寫入log就被認為已經commit,這種情況下如果Follower都復制完都落后於Leader,而如果Leader突然宕機,則會丟失數據。而Kafka的這種使用ISR的方式則很好的均衡了確保數據不丟失以及吞吐率。Follower可以批量的從Leader復制數據,這樣極大的提高復制性能(批量寫磁盤),極大減少了Follower與Leader的差距。

需要說明的是,Kafka只解決fail/recover,不處理“Byzantine”(“拜占庭”)問題。一條消息只有被ISR里的所有Follower都從Leader復制過去才會被認為已提交。這樣就避免了部分數據被寫進了Leader,還沒來得及被任何Follower復制就宕機了,而造成數據丟失(Consumer無法消費這些數據)。而對於Producer而言,它可以選擇是否等待消息commit,這可以通過request.required.acks來設置。這種機制確保了只要ISR有一個或以上的Follower,一條被commit的消息就不會丟失。

1.4.3 Leader Election算法

Leader選舉本質上是一個分布式鎖,有兩種方式實現基於ZooKeeper的分布式鎖:

  • 節點名稱唯一性:多個客戶端創建一個節點,只有成功創建節點的客戶端才能獲得鎖
  • 臨時順序節點:所有客戶端在某個目錄下創建自己的臨時順序節點,只有序號最小的才獲得鎖

一種非常常用的選舉leader的方式是“Majority Vote”(“少數服從多數”),但Kafka並未采用這種方式。這種模式下,如果我們有2f+1個Replica(包含Leader和Follower),那在commit之前必須保證有f+1個Replica復制完消息,為了保證正確選出新的Leader,fail的Replica不能超過f個。因為在剩下的任意f+1個Replica里,至少有一個Replica包含有最新的所有消息。這種方式有個很大的優勢,系統的latency只取決於最快的幾個Broker,而非最慢那個。Majority Vote也有一些劣勢,為了保證Leader Election的正常進行,它所能容忍的fail的follower個數比較少。如果要容忍1個follower掛掉,必須要有3個以上的Replica,如果要容忍2個Follower掛掉,必須要有5個以上的Replica。也就是說,在生產環境下為了保證較高的容錯程度,必須要有大量的Replica,而大量的Replica又會在大數據量下導致性能的急劇下降。這就是這種算法更多用在ZooKeeper這種共享集群配置的系統中而很少在需要存儲大量數據的系統中使用的原因。例如HDFS的HA Feature是基於majority-vote-based journal,但是它的數據存儲並沒有使用這種方式。

Kafka在ZooKeeper中動態維護了一個ISR(in-sync replicas),這個ISR里的所有Replica都跟上了leader,只有ISR里的成員才有被選為Leader的可能。在這種模式下,對於f+1個Replica,一個Partition能在保證不丟失已經commit的消息的前提下容忍f個Replica的失敗。在大多數使用場景中,這種模式是非常有利的。事實上,為了容忍f個Replica的失敗,Majority Vote和ISR在commit前需要等待的Replica數量是一樣的,但是ISR需要的總的Replica的個數幾乎是Majority Vote的一半。

雖然Majority Vote與ISR相比有不需等待最慢的Broker這一優勢,但是Kafka作者認為Kafka可以通過Producer選擇是否被commit阻塞來改善這一問題,並且節省下來的Replica和磁盤使得ISR模式仍然值得。

1.4.4 如何處理所有Replica都不工作

在ISR中至少有一個follower時,Kafka可以確保已經commit的數據不丟失,但如果某個Partition的所有Replica都宕機了,就無法保證數據不丟失了。這種情況下有兩種可行的方案:

1.等待ISR中的任一個Replica“活”過來,並且選它作為Leader

2.選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader

這就需要在可用性和一致性當中作出一個簡單的折衷。如果一定要等待ISR中的Replica“活”過來,那不可用的時間就可能會相對較長。而且如果ISR中的所有Replica都無法“活”過來了,或者數據都丟失了,這個Partition將永遠不可用。選擇第一個“活”過來的Replica作為Leader,而這個Replica不是ISR中的Replica,那即使它並不保證已經包含了所有已commit的消息,它也會成為Leader而作為consumer的數據源(前文有說明,所有讀寫都由Leader完成)。Kafka0.8.*使用了第二種方式。根據Kafka的文檔,在以后的版本中,Kafka支持用戶通過配置選擇這兩種方式中的一種,從而根據不同的使用場景選擇高可用性還是強一致性。

1.4.5 選舉Leader

最簡單最直觀的方案是,所有Follower都在ZooKeeper上設置一個Watch,一旦Leader宕機,其對應的ephemeral znode會自動刪除,此時所有Follower都嘗試創建該節點,而創建成功者(ZooKeeper保證只有一個能創建成功)即是新的Leader,其它Replica即為Follower。

但是該方法會有3個問題:

1.split-brain 這是由ZooKeeper的特性引起的,雖然ZooKeeper能保證所有Watch按順序觸發,但並不能保證同一時刻所有Replica“看”到的狀態是一樣的,這就可能造成不同Replica的響應不一致

2.herd effect 如果宕機的那個Broker上的Partition比較多,會造成多個Watch被觸發,造成集群內大量的調整

3.ZooKeeper負載過重 每個Replica都要為此在ZooKeeper上注冊一個Watch,當集群規模增加到幾千個Partition時ZooKeeper負載會過重。

Kafka 0.8.*的Leader Election方案解決了上述問題,它在所有broker中選出一個controller,所有Partition的Leader選舉都由controller決定。controller會將Leader的改變直接通過RPC的方式(比ZooKeeper Queue的方式更高效)通知需為為此作為響應的Broker。同時controller也負責增刪Topic以及Replica的重新分配。

二 Kafka生產過程分析

2.1 寫入方式

producer采用推(push)模式將消息發布到broker,每條消息都被追加(append)到分區(patition)中,屬於順序寫磁盤(順序寫磁盤效率比隨機寫內存要高,保障kafka吞吐率)。

2.2 分區(Partition)

Kafka集群有多個消息代理服務器(broker-server)組成,發布到Kafka集群的每條消息都有一個類別,用主題(topic)來表示。通常,不同應用產生不同類型的數據,可以設置不同的主題。一個主題一般會有多個消息的訂閱者,當生產者發布消息到某個主題時,訂閱了這個主題的消費者都可以接收到生成者寫入的新消息。

Kafka集群為每個主題維護了分布式的分區(partition)日志文件,物理意義上可以把主題(topic)看作進行了分區的日志文件(partition log)。主題的每個分區都是一個有序的、不可變的記錄序列,新的消息會不斷追加到日志中。分區中的每條消息都會按照時間順序分配到一個單調遞增的順序編號,叫做偏移量(offset),這個偏移量能夠唯一地定位當前分區中的每一條消息。

消息發送時都被發送到一個topic,其本質就是一個目錄,而topic是由一些Partition Logs(分區日志)組成,其組織結構如下圖所示:

下圖中的topic有3個分區,每個分區的偏移量都從0開始,不同分區之間的偏移量都是獨立的,不會相互影響。 

我們可以看到,每個Partition中的消息都是有序的,生產的消息被不斷追加到Partition log上,其中的每一個消息都被賦予了一個唯一的offset值。

發布到Kafka主題的每條消息包括鍵值和時間戳。消息到達服務器端的指定分區后,都會分配到一個自增的偏移量。原始的消息內容和分配的偏移量以及其他一些元數據信息最后都會存儲到分區日志文件中。消息的鍵也可以不用設置,這種情況下消息會均衡地分布到不同的分區。

1) 分區的原因

(1)方便在集群中擴展,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數據了;

(2)可以提高並發,因為可以以Partition為單位讀寫了。

傳統消息系統在服務端保持消息的順序,如果有多個消費者消費同一個消息隊列,服務端會以消費存儲的順序依次發送給消費者。但由於消息是異步發送給消費者的,消息到達消費者的順序可能是無序的,這就意味着在並行消費時,傳統消息系統無法很好地保證消息被順序處理。雖然我們可以設置一個專用的消費者只消費一個隊列,以此來解決消息順序的問題,但是這就使得消費處理無法真正執行。

Kafka比傳統消息系統有更強的順序性保證,它使用主題的分區作為消息處理的並行單元。Kafka以分區作為最小的粒度,將每個分區分配給消費者組中不同的而且是唯一的消費者,並確保一個分區只屬於一個消費者,即這個消費者就是這個分區的唯一讀取線程。那么,只要分區的消息是有序的,消費者處理的消息順序就有保證。每個主題有多個分區,不同的消費者處理不同的分區,所以Kafka不僅保證了消息的有序性,也做到了消費者的負載均衡。

2)分區的原則

(1)指定了patition,則直接使用;

(2)未指定patition但指定key,通過對key的value進行hash出一個patition

(3)patition和key都未指定,使用輪詢選出一個patition。

DefaultPartitioner類

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

2.3 副本(Replication)

同一個partition可能會有多個replication(對應 server.properties 配置中的 default.replication.factor=N)。沒有replication的情況下,一旦broker 宕機,其上所有 patition 的數據都不可被消費,同時producer也不能再將數據存於其上的patition。引入replication之后,同一個partition可能會有多個replication,而這時需要在這些replication之間選出一個leader,producer和consumer只與這個leader交互,其它replication作為follower從leader 中復制數據。

2.4  寫入流程

 producer寫入消息流程如下:

 

1)producer先從zookeeper的 "/brokers/.../state"節點找到該partition的leader

2)producer將消息發送給該leader

3)leader將消息寫入本地log

4)followers從leader pull消息,寫入本地log后向leader發送ACK

5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)並向producer發送ACK

三 broker保存消息

3.1 存儲方式

物理上把 topic 分成一個或多個 patition(對應 server.properties 中的 num.partitions=3 配置),每個 patition 物理上對應一個文件夾(該文件夾存儲該 patition 的所有消息和索引文件),如下:

3.2 存儲策略

無論消息是否被消費,kafka 都會保留所有消息。有兩種策略可以刪除舊數據:

1、 基於時間:log.retention.hours=168 
2、 基於大小:log.retention.bytes=1073741824

需要注意的是,因為Kafka讀取特定消息的時間復雜度為O(1),即與文件大小無關,所以這里刪除過期文件與提高 Kafka 性能無關。

3.3Zookeeper存儲結構

admin

該目錄下znode只有在有相關操作時才會存在,操作結束時會將其刪除

/admin/reassign_partitions用於將一些Partition分配到不同的broker集合上。對於每個待重新分配的Partition,Kafka會在該znode上存儲其所有的Replica和相應的Broker id。該znode由管理進程創建並且一旦重新分配成功它將會被自動移除。

broker

即/brokers/ids/[brokerId])存儲“活着”的broker信息。

topic注冊信息(/brokers/topics/[topic]),存儲該topic的所有partition的所有replica所在的broker id,第一個replica即為preferred replica,對一個給定的partition,它在同一個broker上最多只有一個replica,因此broker id可作為replica id。

controller

/controller -> int (broker id of the controller)存儲當前controller的信息

/controller_epoch -> int (epoch)直接以整數形式存儲controller epoch,而非像其它znode一樣以JSON字符串形式存儲。

四 Kafka消費過程分析

kafka提供了兩套consumer API:高級Consumer API和低級API。

4.1消費模型

消息由生產者發布到Kafka集群后,會被消費者消費。消息的消費模型有兩種:推送模型(push)和拉取模型(pull)。

基於推送模型(push)的消息系統,由消息代理記錄消費者的消費狀態。消息代理在將消息推送到消費者后,標記這條消息為已消費,但這種方式無法很好地保證消息被處理。比如,消息代理把消息發送出去后,當消費進程掛掉或者由於網絡原因沒有收到這條消息時,就有可能造成消息丟失(因為消息代理已經把這條消息標記為已消費了,但實際上這條消息並沒有被實際處理)。如果要保證消息被處理,消息代理發送完消息后,要設置狀態為“已發送”,只有收到消費者的確認請求后才更新為“已消費”,這就需要消息代理中記錄所有的消費狀態,這種做法顯然是不可取的。

Kafka采用拉取模型,由消費者自己記錄消費狀態,每個消費者互相獨立地順序讀取每個分區的消息。如下圖所示,有兩個消費者(不同消費者組)拉取同一個主題的消息,消費者A的消費進度是3,消費者B的消費進度是6。消費者拉取的最大上限通過最高水位(watermark)控制,生產者最新寫入的消息如果還沒有達到備份數量,對消費者是不可見的。這種由消費者控制偏移量的優點是:消費者可以按照任意的順序消費消息。比如,消費者可以重置到舊的偏移量,重新處理之前已經消費過的消息;或者直接跳到最近的位置,從當前的時刻開始消費。

在一些消息系統中,消息代理會在消息被消費之后立即刪除消息。如果有不同類型的消費者訂閱同一個主題,消息代理可能需要冗余地存儲同一消息;或者等所有消費者都消費完才刪除,這就需要消息代理跟蹤每個消費者的消費狀態,這種設計很大程度上限制了消息系統的整體吞吐量和處理延遲。Kafka的做法是生產者發布的所有消息會一致保存在Kafka集群中,不管消息有沒有被消費。用戶可以通過設置保留時間來清理過期的數據,比如,設置保留策略為兩天。那么,在消息發布之后,它可以被不同的消費者消費,在兩天之后,過期的消息就會自動清理掉。

4.2高級API

1)高級API優點

高級API 寫起來簡單

不需要自行去管理offset,系統通過zookeeper自行管理。

不需要管理分區,副本等情況,.系統自動管理。

消費者斷線會自動根據上一次記錄在zookeeper中的offset去接着獲取數據(默認設置1分鍾更新一下zookeeper中存的offset)

可以使用group來區分對同一個topic 的不同程序訪問分離開來(不同的group記錄不同的offset,這樣不同程序讀取同一個topic才不會因為offset互相影響)

2)高級API缺點

不能自行控制offset(對於某些特殊需求來說)

不能細化控制如分區、副本、zk等

4.3低級API

1)低級 API 優點

能夠讓開發者自己控制offset,想從哪里讀取就從哪里讀取。

自行控制連接分區,對分區自定義進行負載均衡

對zookeeper的依賴性降低(如:offset不一定非要靠zk存儲,自行存儲offset即可,比如存在文件或者內存中)

2)低級API缺點

太過復雜,需要自行控制offset,連接哪個分區,找到分區leader 等。

4.4消費者組

消費者是以consumer group消費者組的方式工作,由一個或者多個消費者組成一個組,共同消費一個topic。每個分區在同一時間只能由group中的一個消費者讀取,但是多個group可以同時消費這個partition。在圖中,有一個由三個消費者組成的group,有一個消費者讀取主題中的兩個分區,另外兩個分別讀取一個分區。某個消費者讀取某個分區,也可以叫做某個消費者是某個分區的擁有者。

在這種情況下,消費者可以通過水平擴展的方式同時讀取大量的消息。另外,如果一個消費者失敗了,那么其他的group成員會自動負載均衡讀取之前失敗的消費者讀取的分區。

4.5 消費方式

consumer采用pull(拉)模式從broker中讀取數據。

push(推)模式很難適應消費速率不同的消費者,因為消息發送速率是由broker決定的。它的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則可以根據consumer的消費能力以適當的速率消費消息。

對於Kafka而言,pull模式更合適,它可簡化broker的設計,consumer可自主控制消費消息的速率,同時consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。

pull模式不足之處是,如果kafka沒有數據,消費者可能會陷入循環中,一直等待數據到達。為了避免這種情況,我們在我們的拉請求中有參數,允許消費者請求在等待數據到達的“長輪詢”中進行阻塞(並且可選地等待到給定的字節數,以確保大的傳輸大小)。

4.6消費者組案例

1)需求:測試同一個消費者組中的消費者,同一時刻只能有一個消費者消費。

2)案例實操

(1)在node21、node22上修改/opt/module/kafka/config/consumer.properties配置文件中的group.id屬性為任意組名。

[root@node22 config]$ vi consumer.properties

group.id=admin

(2)在node21、node22上分別啟動消費者

[root@node21 kafka]$ bin/kafka-console-consumer.sh --zookeeper node21:2181,node22:2181,node23:2181  --topic first --consumer.config config/consumer.properties

[root@node22 kafka]$ bin/kafka-console-consumer.sh --zookeeper node21:2181,node22:2181,node23:2181  --topic first --consumer.config config/consumer.properties

(3)在node23上啟動生產者

[root@node23 kafka]$ bin/kafka-console-producer.sh --broker-list node21:9092 --topic first

>hello world

(4)查看node21和node22的接收者。

同一時刻只有一個消費者接收到消息。

五 Topic的創建和刪除

5.1 創建topic

創建 topic 的序列圖如下所示:

流程說明:

1、 controller 在 ZooKeeper 的 /brokers/topics 節點上注冊 watcher,當 topic 被創建,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配。
2、 controller從 /brokers/ids 讀取當前所有可用的 broker 列表,對於 set_p 中的每一個 partition:
     2.1、 從分配給該 partition 的所有 replica(稱為AR)中任選一個可用的 broker 作為新的 leader,並將AR設置為新的 ISR 
     2.2、 將新的 leader 和 ISR 寫入 /brokers/topics/[topic]/partitions/[partition]/state 
3、 controller 通過 RPC 向相關的 broker 發送 LeaderAndISRRequest。

5.2 刪除topic

刪除 topic 的序列圖如下所示:

流程說明:

1、 controller 在 zooKeeper 的 /brokers/topics 節點上注冊 watcher,當 topic 被刪除,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配。 
2、 若 delete.topic.enable=false,結束;否則 controller 注冊在 /admin/delete_topics 上的 watch 被 fire,controller 通過回調向對應的 broker 發送 StopReplicaRequest。

六 broker failover

kafka broker failover 序列圖如下所示:

流程說明:

1、 controller 在 zookeeper 的 /brokers/ids/[brokerId] 節點注冊 Watcher,當 broker 宕機時 zookeeper 會 fire watch
2、 controller 從 /brokers/ids 節點讀取可用broker 
3、 controller決定set_p,該集合包含宕機 broker 上的所有 partition 
4、 對 set_p 中的每一個 partition 
    4.1、 從/brokers/topics/[topic]/partitions/[partition]/state 節點讀取 ISR 
    4.2、 決定新 leader 
    4.3、 將新 leader、ISR、controller_epoch 和 leader_epoch 等信息寫入 state 節點
5、 通過 RPC 向相關 broker 發送 leaderAndISRRequest 命令

七 controller failover

當 controller 宕機時會觸發 controller failover。每個 broker 都會在 zookeeper 的 "/controller" 節點注冊 watcher,當 controller 宕機時 zookeeper 中的臨時節點消失,所有存活的 broker 收到 fire 的通知,每個 broker 都嘗試創建新的 controller path,只有一個競選成功並當選為 controller。

當新的 controller 當選時,會觸發 KafkaController.onControllerFailover 方法,在該方法中完成如下操作:

1、 讀取並增加 Controller Epoch。 
2、 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注冊 watcher。 
3、 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注冊 watcher。 
4、 通過 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注冊 watcher。 
5、 若 delete.topic.enable=true(默認值是 false),則 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注冊 watcher。 
6、 通過 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注冊Watch。 
7、 初始化 ControllerContext 對象,設置當前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。 
8、 啟動 replicaStateMachine 和 partitionStateMachine。 
9、 將 brokerState 狀態設置為 RunningAsController。 
10、 將每個 partition 的 Leadership 信息發送給所有“活”着的 broker。 
11、 若 auto.leader.rebalance.enable=true(默認值是true),則啟動 partition-rebalance 線程。 
12、 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應的Topic。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM