Apche Kafka 的生與死 – failover 機制詳解


Kafka 作為 high throughput 的消息中間件,以其性能,簡單和穩定性,成為當前實時流處理框架中的主流的基礎組件。

當然在使用 Kafka 中也碰到不少問題,尤其是 failover 的問題,常常給大家帶來不少困擾和麻煩。
所以在梳理完 kafka 源碼的基礎上,盡量用通俗易懂的方式,把 Kafka 發生 failover 時的機制解釋清楚,讓大家在使用和運維中,做到心中有數。

如果對 kafka 不了解的,可以先參考https://kafka.apache.org/08/design.html,有個大致的概念。

 

0 背景

這里討論 kafka 的 failover 的前提是在0.8版本后, kafka 提供了 replica 機制。
對於0.7版本不存在 failover 的說法,因為任意一個 broker dead 都會導致上面的數據不可讀,從而導致服務中斷。

下面簡單的介紹一下 0.8中加入的 replica 機制和相應的組件,

Replica 機制

基本思想大同小異,如下圖 (Ref.2):

image_thumb3

 

圖中有4個 kafka brokers,並且Topic1有四個 partition(用藍色表示)分布在4個 brokers 上,為 leader replica;
且每個 partition 都有兩個 follower replicas(用橘色表示),分布在和 leader replica 不同的 brokers。
這個分配算法很簡單,有興趣的可以參考kafka的design。

 

Replica 組件

為了支持replica機制,主要增加的兩個組件是,Replica Manager和Controller, 如下圖:

image_thumb1

 

Replica Manager

每個 broker server 都會創建一個 Replica Manager,所有的數據的讀寫都需要經過它 ,
0.7版本,kafka 會直接從 LogManager 中讀數據,但在增加 replica 機制后,只有 leader replica 可以響應數據的讀寫請求 。
所以,Replica Manager 需要管理所有 partition 的 replica 狀態,並響應讀寫請求,以及其他和 replica 相關的操作。

 

Controller

大家可以看到,每個 partition 都有一個 leader replica,和若干的 follower replica,那么誰來決定誰是leader?
你說有 zookeeper,但用 zk 為每個 partition 做 elect,效率太低,而且 zk 會不堪重負;
所以現在的通用做法是,只用 zk 選一個 master 節點,然后由這個 master 節點來做其他的所有仲裁工作。
kafka 的做法就是在 brokers 中選出一個作為 controller,來做為 master 節點,從而仲裁所有的 partition 的 leader 選舉。

 

下面我們會從如下幾個方面來解釋 failover 機制,
先從 client 的角度看看當 kafka 發生 failover 時,數據一致性問題。
然后從 Kafka 的各個重要組件,Zookeeper,Broker, Controller 發生 failover 會造成什么樣的影響?
最后給出一些判斷 kafka 狀態的 tips。

 

1 從 Client 的角度

從 producer 的角度, 發的數據是否會丟?

除了要打開 replica 機制,還取決於 produce 的 request.required.acks 的設置,

  • acks = 0,發就發了,不需要 ack,無論成功與否 ;
  • acks = 1,當寫 leader replica 成功后就返回,其他的 replica 都是通過fetcher去異步更新的,當然這樣會有數據丟失的風險,如果leader的數據沒有來得及同步,leader掛了,那么會丟失數據;
  • acks = –1, 要等待所有的replicas都成功后,才能返回;這種純同步寫的延遲會比較高。

所以,一般的情況下,thoughput 優先,設成1,在極端情況下,是有可能丟失數據的;
如果可以接受較長的寫延遲,可以選擇將 acks 設為 –1。

 

從 consumer 的角度, 是否會讀到不一致的數據?

首先無論是 high-level 或 low-level consumer,我們要知道他是怎么從 kafka 讀數據的?

image_thumb12

kafka 的 log patition 存在文件中,並以 offset 作為索引,所以 consumer 需要對於每個 partition 記錄上次讀到的 offset (high-level和low-level的區別在於是 kafka 幫你記,還是你自己記);

所以如果 consumer dead,重啟后只需要繼續從上次的 offset 開始讀,那就不會有不一致的問題。

但如果是 Kafka broker dead,並發生 partition leader 切換,如何保證在新的 leader 上這個 offset 仍然有效? 
Kafka 用一種機制,即 committed offset,來保證這種一致性,如下圖(Ref.2)

image_thumb13

log 除了有 log end offset 來表示 log 的末端,還有一個 committed offset, 表示有效的 offset;
committed offset 只有在所有 replica 都同步完該 offset 后,才會被置為該offset;
所以圖中 committed 置為2, 因為 broker3 上的 replica 還沒有完成 offset 3 的同步;
所以這時,offset 3 的 message 對 consumer 是不可見的,consumer最多只能讀到 offset 2。
如果此時,leader dead,無論哪個 follower 重新選舉成 leader,都不會影響數據的一致性,因為consumer可見的offset最多為2,而這個offset在所有的replica上都是一致的。

所以在一般正常情況下,當 kafka 發生 failover 的時候,consumer 是不會讀到不一致數據的。特例的情況就是,當前 leader 是唯一有效的 replica,其他replica都處在完全不同步狀態,這樣發生 leader 切換,一定是會丟數據的,並會發生 offset 不一致。

 

2 Zookeeper Failover

Kafka 首先對於 zookeeper 是強依賴,所以 zookeeper 發生異常時,會對數據造成如何的影響?

Zookeeper Dead

如果 zookeeper dead,broker 是無法啟動的,報如下的異常:

image_thumb16

這種異常,有可能是 zookeeper dead,也有可能是網絡不通,總之就是連不上 zookeeper。
這種 case,kafka完全不工作,直到可以連上 zookeeper 為止。

 

Zookeeper Hang

其實上面這種情況比較簡單,比較麻煩的是 zookeeper hang,可以說 kafka 的80%以上問題都是由於這個原因
zookeeper hang 的原因有很多,主要是 zk 負載過重,zk 所在主機 cpu,memeory 或網絡資源不夠等

zookeeper hang 帶來的主要問題就是 session timeout,這樣會觸發如下的問題,

a. Controller Fail,Controller 發生重新選舉和切換,具體過程參考下文。

b. Broker Fail,導致partition的leader發生切換或partition offline,具體過程參考下文。

c. Broker 被 hang 住 。
這是一種比較特殊的 case,出現時在 server.log 會出現如下的log,

server.log:
“INFO I wrote this conflicted ephemeral node [{"jmx_port":9999,"timestamp":"1444709  63049","host":"10.151.4.136","version":1,"port":9092}] at /brokers/ids/1 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)”

這個問題本身是由於 zookeeper 的一個 bug,參考:https://issues.apache.org/jira/browse/ZOOKEEPER-1740

問題在於“The current behavior of zookeeper for ephemeral nodes is that session expiration and ephemeral node deletion is not an atomic operation.”
zk 的 session 過期和 ephemeral node 刪除並不是一個原子操作;
出現的case如下:

  • 在極端case下,zk 觸發了 session timeout,但還沒來得及完成 /brokers/ids/1 節點的刪除,就被 hang 住了,比如是去做很耗時的 fsync 操作 。
  • 但是 broker 1 收到 session timeout 事件后,會嘗試重新去 zk 上創建 /brokers/ids/1 節點,可這時舊的節點仍然存在,所以會得到 NodeExists,其實這個是不合理的,因為既然 session timeout,這個節點就應該不存在。
  • 通常的做法,既然已經存在,我就不管了,該干啥干啥去;問題是一會 zk 從 fsync hang 中恢復了,他會記得還有一個節點沒有刪除,這時會去把 /brokers/ids/1 節點刪除。
  • 結果就是對於client,雖然沒有再次收到 session 過期的事件,但是 /brokers/ids/1 節點卻不存在了。

所以這里做的處理是,在前面發現 NodeExists 時,while true 等待,一直等到 zk 從 hang 中恢復刪除該節點,然后創建新節點成功,才算完;
這樣做的結果是這個broker也會被一直卡在這兒,等待該節點被成功創建。

 

3 Broker Failover

Broker 的 Failover,可以分為兩個過程,一個是 broker failure, 一個是 broker startup。

新加 broker

在談failover之前,我們先看一個更簡單的過程,就是新加一個全新的 broker:
首先明確,新加的 broker 對現存所有的 topic 和 partition,不會有任何影響
因為一個 topic 的 partition 的所有 replica 的 assignment 情況,在創建時就決定了,並不會自動發生變化,除非你手動的去做 reassignment。
所以新加一個 broker,所需要做的只是大家同步一下元數據,大家都知道來了一個新的 broker,當你創建新的 topic 或 partition 的時候,它會被用上。

 

Broker Failure

首先明確,這里的 broker failure,並不一定是 broker server 真正的 dead了, 只是指該 broker 所對應的 zk ephemeral node ,比如/brokers/ids/1,發生 session timeout;
當然發生這個的原因,除了server dead,還有很多,比如網絡不通;但是我們不關心,只要出現 sessioin timeout,我們就認為這個 broker 不工作了;
會出現如下的log,

controller.log:
“INFO [BrokerChangeListener on Controller 1]: Newly added brokers: 3, deleted brokers: 4, all live brokers: 3,2,1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)”
“INFO [Controller 1]: Broker failure callback for 4 (kafka.controller.KafkaController)”

當一個 broker failure 會影響什么,其實對於多 replicas 場景,一般對最終客戶沒啥影響。
只會影響哪些 leader replica 在該 broker 的 partitions; 需要重新做 leader election,如果無法選出一個新的 leader,會導致 partition offline。
因為如果只是 follow replica failure,不會影響 partition 的狀態,還是可以服務的,只是可用 replica 少了一個;需要注意的是,kafka 是不會自動補齊失敗的replica的,即壞一個少一個;
但是對於 leader replica failure,就需要重新再 elect leader,前面已經討論過,新選取出的 leader 是可以保證 offset 一致性的;

Note: 其實這里的一致性是有前提的,即除了 fail 的 leader,在 ISR(in-sync replicas) 里面還存在其他的 replica;顧名思義,ISR,就是能 catch up with leader 的 replica。
雖然 partition 在創建的時候,會分配一個 AR(assigned replicas),但是在運行的過程中,可能會有一些 replica 由於各種原因無法跟上 leader,這樣的 replica 會被從 ISR 中去除。
所以 ISR <= AR;
如果,ISR 中 沒有其他的 replica,並且允許 unclean election,那么可以從 AR 中選取一個 leader,但這樣一定是丟數據的,無法保證 offset 的一致性。

 

 

Broker Startup

這里的 startup,就是指 failover 中的 startup,會出現如下的log,

controller.log:
“INFO [BrokerChangeListener on Controller 1]: Newly added brokers: 3, deleted brokers: 4, all live brokers: 3,2,1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)”
“INFO [Controller 1]: New broker startup callback for
3 (kafka.controller.KafkaController)”

過程也不復雜,先將該 broker 上的所有的 replica 設為 online,然后觸發 offline partition 或 new partition 的 state 轉變為 online;
所以 broker startup,只會影響 offline partition 或 new partition,讓他們有可能成為 online
那么對於普通的已經 online partition,影響只是多一個可用的 replica,那還是在它完成catch up,被加入 ISR 后的事。

Note: Partition 的 leader 在 broker failover 后,不會馬上自動切換回來,這樣會產生的問題是,broker間負載不均衡,因為所有的讀寫都需要通過 leader。
為了解決這個問題,在server的配置中有個配置,auto.leader.rebalance.enable,將其設為true;
這樣 Controller 會啟動一個 scheduler 線程,定期去為每個 broker 做 rebalance,即發現如果該 broker 上的 imbalance ratio 達到一定比例,就會將其中的某些 partition 的 leader,進行重新 elect 到原先的 broker 上。

 

4 Controller Failover

前面說明過,某個 broker server 會被選出作為 Controller,這個選舉的過程就是依賴於 zookeeper 的 ephemeral node,誰可以先在"/controller"目錄創建節點,誰就是 controller;
所以反之,我們也是 watch 這個目錄來判斷 Controller 是否發生 failover 或 變化。Controller 發生 failover 時,會出現如下 log:

controller.log:
“INFO [SessionExpirationListener on 1], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)”

Controller 主要是作為 master 來仲裁 partition 的 leader 的,並維護 partition 和 replicas 的狀態機,以及相應的 zk 的 watcher 注冊;

Controller 的 failover 過程如下:

  • 試圖去在“/controller” 目錄搶占創建 ephemeral node;
  • 如果已經有其他的 broker 先創建成功,那么說明新的 controller 已經誕生,更新當前的元數據即可;
  • 如果自己創建成功,說明我已經成為新的 controller,下面就要開始做初始化工作,
  • 初始化主要就是創建和初始化 partition 和 replicas 的狀態機,並對 partitions 和 brokers 的目錄的變化設置 watcher。

可以看到,單純 Controller 發生 failover,是不會影響正常數據讀寫的,只是 partition 的 leader 無法被重新選舉,如果此時有 partition 的 leader fail,會導致 partition offline;
但是 Controller 的 dead,往往是伴隨着 broker 的 dead,所以在 Controller 發生 failover 的過程中,往往會出現 partition offline, 導致數據暫時不可用。

 

5 Tips

Kafka 提供一些工具來方便的查看信息,參考:Kafka Tools

a, 驗證topic 是否work?

最簡單的方式,就是用 producer 和 consumer console 來測試

Producer console,如下可以往 localhost 的 topic test,插入兩條 message,

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is a message
This is another message

Consumer console,如下就可以把剛寫入的 message 讀出,

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

如果整個過程沒有報錯,ok,說明你的topic是可以工作的

 

b, 再看看topic是否健康?

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

這樣會打印出 topic test 的 detail 信息,如圖,

image_thumb2

從這個圖可以說明幾個問題:

首先,topic 有幾個 partitions,並且 replicas factor 是多少,即有幾個 replica?
圖中分別有32個 partitions,並且每個 partition 有兩個 replica。

再者,每個 partition 的 replicas 都被分配到哪些 brokers 上,並且該 partition 的 leader 是誰
比如,圖中的 partition0,replicas 被分配到 brokers 4和1上面,其中 leader replica 在 broker 1 上。

最后,是否健康
從以下幾個方面依次表明健康程度,

  • Isr 為空,說明這個 partition 已經 offline 無法提供服務了,這種 case 在我們的圖中沒有出現;
  • Isr 有數據,但是 Isr < Replicas,這種情況下對於用戶是沒有感知的,但是說明有部分 replicas 已經出問題了,至少是暫時無法和 leader 同步;比如,圖中的 partition0,Isr 只有1,說明 replica 4 已經 offline
  • Isr = Replicas,但是 leader 不是 Replicas 中的第一個 replica,這個說明 leader 是發生過重新選取的,這樣可能會導致 brokers 負載不均衡;比如,圖中的 partition9,leader是2,而不是3,說明雖然當前它的所有 replica 都是正常的,但之前發生過重新選舉。

 

c,最后就是看kafka的日志,kafka/logs

主要是看 controller.log 和 server.log,分別記錄 controller 和 broker server 的日志。
然后根據前面我給的每種異常的日志,你可以看出來到底是出現什么問題。

 

Reference

1. https://kafka.apache.org/08/design.html

2. Neha NarkhedeHands-free Kafka Replication: A lesson in operational simplicity

3. Kafka Tools


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM