Kafka 作為 high throughput 的消息中間件,以其性能,簡單和穩定性,成為當前實時流處理框架中的主流的基礎組件。
當然在使用 Kafka 中也碰到不少問題,尤其是 failover 的問題,常常給大家帶來不少困擾和麻煩。
所以在梳理完 kafka 源碼的基礎上,盡量用通俗易懂的方式,把 Kafka 發生 failover 時的機制解釋清楚,讓大家在使用和運維中,做到心中有數。
如果對 kafka 不了解的,可以先參考https://kafka.apache.org/08/design.html,有個大致的概念。
0 背景
這里討論 kafka 的 failover 的前提是在0.8版本后, kafka 提供了 replica 機制。
對於0.7版本不存在 failover 的說法,因為任意一個 broker dead 都會導致上面的數據不可讀,從而導致服務中斷。
下面簡單的介紹一下 0.8中加入的 replica 機制和相應的組件,
Replica 機制
基本思想大同小異,如下圖 (Ref.2):
圖中有4個 kafka brokers,並且Topic1有四個 partition(用藍色表示)分布在4個 brokers 上,為 leader replica;
且每個 partition 都有兩個 follower replicas(用橘色表示),分布在和 leader replica 不同的 brokers。
這個分配算法很簡單,有興趣的可以參考kafka的design。
Replica 組件
為了支持replica機制,主要增加的兩個組件是,Replica Manager和Controller, 如下圖:
Replica Manager
每個 broker server 都會創建一個 Replica Manager,所有的數據的讀寫都需要經過它 ,
0.7版本,kafka 會直接從 LogManager 中讀數據,但在增加 replica 機制后,只有 leader replica 可以響應數據的讀寫請求 。
所以,Replica Manager 需要管理所有 partition 的 replica 狀態,並響應讀寫請求,以及其他和 replica 相關的操作。
Controller
大家可以看到,每個 partition 都有一個 leader replica,和若干的 follower replica,那么誰來決定誰是leader?
你說有 zookeeper,但用 zk 為每個 partition 做 elect,效率太低,而且 zk 會不堪重負;
所以現在的通用做法是,只用 zk 選一個 master 節點,然后由這個 master 節點來做其他的所有仲裁工作。
kafka 的做法就是在 brokers 中選出一個作為 controller,來做為 master 節點,從而仲裁所有的 partition 的 leader 選舉。
下面我們會從如下幾個方面來解釋 failover 機制,
先從 client 的角度看看當 kafka 發生 failover 時,數據一致性問題。
然后從 Kafka 的各個重要組件,Zookeeper,Broker, Controller 發生 failover 會造成什么樣的影響?
最后給出一些判斷 kafka 狀態的 tips。
1 從 Client 的角度
從 producer 的角度, 發的數據是否會丟?
除了要打開 replica 機制,還取決於 produce 的 request.required.acks 的設置,
- acks = 0,發就發了,不需要 ack,無論成功與否 ;
- acks = 1,當寫 leader replica 成功后就返回,其他的 replica 都是通過fetcher去異步更新的,當然這樣會有數據丟失的風險,如果leader的數據沒有來得及同步,leader掛了,那么會丟失數據;
- acks = –1, 要等待所有的replicas都成功后,才能返回;這種純同步寫的延遲會比較高。
所以,一般的情況下,thoughput 優先,設成1,在極端情況下,是有可能丟失數據的;
如果可以接受較長的寫延遲,可以選擇將 acks 設為 –1。
從 consumer 的角度, 是否會讀到不一致的數據?
首先無論是 high-level 或 low-level consumer,我們要知道他是怎么從 kafka 讀數據的?
kafka 的 log patition 存在文件中,並以 offset 作為索引,所以 consumer 需要對於每個 partition 記錄上次讀到的 offset (high-level和low-level的區別在於是 kafka 幫你記,還是你自己記);
所以如果 consumer dead,重啟后只需要繼續從上次的 offset 開始讀,那就不會有不一致的問題。
但如果是 Kafka broker dead,並發生 partition leader 切換,如何保證在新的 leader 上這個 offset 仍然有效?
Kafka 用一種機制,即 committed offset,來保證這種一致性,如下圖(Ref.2)
log 除了有 log end offset 來表示 log 的末端,還有一個 committed offset, 表示有效的 offset;
committed offset 只有在所有 replica 都同步完該 offset 后,才會被置為該offset;
所以圖中 committed 置為2, 因為 broker3 上的 replica 還沒有完成 offset 3 的同步;
所以這時,offset 3 的 message 對 consumer 是不可見的,consumer最多只能讀到 offset 2。
如果此時,leader dead,無論哪個 follower 重新選舉成 leader,都不會影響數據的一致性,因為consumer可見的offset最多為2,而這個offset在所有的replica上都是一致的。
所以在一般正常情況下,當 kafka 發生 failover 的時候,consumer 是不會讀到不一致數據的。特例的情況就是,當前 leader 是唯一有效的 replica,其他replica都處在完全不同步狀態,這樣發生 leader 切換,一定是會丟數據的,並會發生 offset 不一致。
2 Zookeeper Failover
Kafka 首先對於 zookeeper 是強依賴,所以 zookeeper 發生異常時,會對數據造成如何的影響?
Zookeeper Dead
如果 zookeeper dead,broker 是無法啟動的,報如下的異常:
這種異常,有可能是 zookeeper dead,也有可能是網絡不通,總之就是連不上 zookeeper。
這種 case,kafka完全不工作,直到可以連上 zookeeper 為止。
Zookeeper Hang
其實上面這種情況比較簡單,比較麻煩的是 zookeeper hang,可以說 kafka 的80%以上問題都是由於這個原因
zookeeper hang 的原因有很多,主要是 zk 負載過重,zk 所在主機 cpu,memeory 或網絡資源不夠等
zookeeper hang 帶來的主要問題就是 session timeout,這樣會觸發如下的問題,
a. Controller Fail,Controller 發生重新選舉和切換,具體過程參考下文。
b. Broker Fail,導致partition的leader發生切換或partition offline,具體過程參考下文。
c. Broker 被 hang 住 。
這是一種比較特殊的 case,出現時在 server.log 會出現如下的log,
server.log:
“INFO I wrote this conflicted ephemeral node [{"jmx_port":9999,"timestamp":"1444709 63049","host":"10.151.4.136","version":1,"port":9092}] at /brokers/ids/1 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)”
這個問題本身是由於 zookeeper 的一個 bug,參考:https://issues.apache.org/jira/browse/ZOOKEEPER-1740
問題在於“The current behavior of zookeeper for ephemeral nodes is that session expiration and ephemeral node deletion is not an atomic operation.”
即 zk 的 session 過期和 ephemeral node 刪除並不是一個原子操作;
出現的case如下:
- 在極端case下,zk 觸發了 session timeout,但還沒來得及完成 /brokers/ids/1 節點的刪除,就被 hang 住了,比如是去做很耗時的 fsync 操作 。
- 但是 broker 1 收到 session timeout 事件后,會嘗試重新去 zk 上創建 /brokers/ids/1 節點,可這時舊的節點仍然存在,所以會得到 NodeExists,其實這個是不合理的,因為既然 session timeout,這個節點就應該不存在。
- 通常的做法,既然已經存在,我就不管了,該干啥干啥去;問題是一會 zk 從 fsync hang 中恢復了,他會記得還有一個節點沒有刪除,這時會去把 /brokers/ids/1 節點刪除。
- 結果就是對於client,雖然沒有再次收到 session 過期的事件,但是 /brokers/ids/1 節點卻不存在了。
所以這里做的處理是,在前面發現 NodeExists 時,while true 等待,一直等到 zk 從 hang 中恢復刪除該節點,然后創建新節點成功,才算完;
這樣做的結果是這個broker也會被一直卡在這兒,等待該節點被成功創建。
3 Broker Failover
Broker 的 Failover,可以分為兩個過程,一個是 broker failure, 一個是 broker startup。
新加 broker
在談failover之前,我們先看一個更簡單的過程,就是新加一個全新的 broker:
首先明確,新加的 broker 對現存所有的 topic 和 partition,不會有任何影響;
因為一個 topic 的 partition 的所有 replica 的 assignment 情況,在創建時就決定了,並不會自動發生變化,除非你手動的去做 reassignment。
所以新加一個 broker,所需要做的只是大家同步一下元數據,大家都知道來了一個新的 broker,當你創建新的 topic 或 partition 的時候,它會被用上。
Broker Failure
首先明確,這里的 broker failure,並不一定是 broker server 真正的 dead了, 只是指該 broker 所對應的 zk ephemeral node ,比如/brokers/ids/1,發生 session timeout;
當然發生這個的原因,除了server dead,還有很多,比如網絡不通;但是我們不關心,只要出現 sessioin timeout,我們就認為這個 broker 不工作了;
會出現如下的log,
controller.log:
“INFO [BrokerChangeListener on Controller 1]: Newly added brokers: 3, deleted brokers: 4, all live brokers: 3,2,1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)”
“INFO [Controller 1]: Broker failure callback for 4 (kafka.controller.KafkaController)”
當一個 broker failure 會影響什么,其實對於多 replicas 場景,一般對最終客戶沒啥影響。
只會影響哪些 leader replica 在該 broker 的 partitions; 需要重新做 leader election,如果無法選出一個新的 leader,會導致 partition offline。
因為如果只是 follow replica failure,不會影響 partition 的狀態,還是可以服務的,只是可用 replica 少了一個;需要注意的是,kafka 是不會自動補齊失敗的replica的,即壞一個少一個;
但是對於 leader replica failure,就需要重新再 elect leader,前面已經討論過,新選取出的 leader 是可以保證 offset 一致性的;
Note: 其實這里的一致性是有前提的,即除了 fail 的 leader,在 ISR(in-sync replicas) 里面還存在其他的 replica;顧名思義,ISR,就是能 catch up with leader 的 replica。
雖然 partition 在創建的時候,會分配一個 AR(assigned replicas),但是在運行的過程中,可能會有一些 replica 由於各種原因無法跟上 leader,這樣的 replica 會被從 ISR 中去除。
所以 ISR <= AR;
如果,ISR 中 沒有其他的 replica,並且允許 unclean election,那么可以從 AR 中選取一個 leader,但這樣一定是丟數據的,無法保證 offset 的一致性。
Broker Startup
這里的 startup,就是指 failover 中的 startup,會出現如下的log,
controller.log:
“INFO [BrokerChangeListener on Controller 1]: Newly added brokers: 3, deleted brokers: 4, all live brokers: 3,2,1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)”
“INFO [Controller 1]: New broker startup callback for 3 (kafka.controller.KafkaController)”
過程也不復雜,先將該 broker 上的所有的 replica 設為 online,然后觸發 offline partition 或 new partition 的 state 轉變為 online;
所以 broker startup,只會影響 offline partition 或 new partition,讓他們有可能成為 online。
那么對於普通的已經 online partition,影響只是多一個可用的 replica,那還是在它完成catch up,被加入 ISR 后的事。
Note: Partition 的 leader 在 broker failover 后,不會馬上自動切換回來,這樣會產生的問題是,broker間負載不均衡,因為所有的讀寫都需要通過 leader。
為了解決這個問題,在server的配置中有個配置,auto.leader.rebalance.enable,將其設為true;
這樣 Controller 會啟動一個 scheduler 線程,定期去為每個 broker 做 rebalance,即發現如果該 broker 上的 imbalance ratio 達到一定比例,就會將其中的某些 partition 的 leader,進行重新 elect 到原先的 broker 上。
4 Controller Failover
前面說明過,某個 broker server 會被選出作為 Controller,這個選舉的過程就是依賴於 zookeeper 的 ephemeral node,誰可以先在"/controller"目錄創建節點,誰就是 controller;
所以反之,我們也是 watch 這個目錄來判斷 Controller 是否發生 failover 或 變化。Controller 發生 failover 時,會出現如下 log:
controller.log:
“INFO [SessionExpirationListener on 1], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)”
Controller 主要是作為 master 來仲裁 partition 的 leader 的,並維護 partition 和 replicas 的狀態機,以及相應的 zk 的 watcher 注冊;
Controller 的 failover 過程如下:
- 試圖去在“/controller” 目錄搶占創建 ephemeral node;
- 如果已經有其他的 broker 先創建成功,那么說明新的 controller 已經誕生,更新當前的元數據即可;
- 如果自己創建成功,說明我已經成為新的 controller,下面就要開始做初始化工作,
- 初始化主要就是創建和初始化 partition 和 replicas 的狀態機,並對 partitions 和 brokers 的目錄的變化設置 watcher。
可以看到,單純 Controller 發生 failover,是不會影響正常數據讀寫的,只是 partition 的 leader 無法被重新選舉,如果此時有 partition 的 leader fail,會導致 partition offline;
但是 Controller 的 dead,往往是伴隨着 broker 的 dead,所以在 Controller 發生 failover 的過程中,往往會出現 partition offline, 導致數據暫時不可用。
5 Tips
Kafka 提供一些工具來方便的查看信息,參考:Kafka Tools
a, 驗證topic 是否work?
最簡單的方式,就是用 producer 和 consumer console 來測試
Producer console,如下可以往 localhost 的 topic test,插入兩條 message,
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
Consumer console,如下就可以把剛寫入的 message 讀出,
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
如果整個過程沒有報錯,ok,說明你的topic是可以工作的
b, 再看看topic是否健康?
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
這樣會打印出 topic test 的 detail 信息,如圖,
從這個圖可以說明幾個問題:
首先,topic 有幾個 partitions,並且 replicas factor 是多少,即有幾個 replica?
圖中分別有32個 partitions,並且每個 partition 有兩個 replica。
再者,每個 partition 的 replicas 都被分配到哪些 brokers 上,並且該 partition 的 leader 是誰?
比如,圖中的 partition0,replicas 被分配到 brokers 4和1上面,其中 leader replica 在 broker 1 上。
最后,是否健康?
從以下幾個方面依次表明健康程度,
- Isr 為空,說明這個 partition 已經 offline 無法提供服務了,這種 case 在我們的圖中沒有出現;
- Isr 有數據,但是 Isr < Replicas,這種情況下對於用戶是沒有感知的,但是說明有部分 replicas 已經出問題了,至少是暫時無法和 leader 同步;比如,圖中的 partition0,Isr 只有1,說明 replica 4 已經 offline
- Isr = Replicas,但是 leader 不是 Replicas 中的第一個 replica,這個說明 leader 是發生過重新選取的,這樣可能會導致 brokers 負載不均衡;比如,圖中的 partition9,leader是2,而不是3,說明雖然當前它的所有 replica 都是正常的,但之前發生過重新選舉。
c,最后就是看kafka的日志,kafka/logs
主要是看 controller.log 和 server.log,分別記錄 controller 和 broker server 的日志。
然后根據前面我給的每種異常的日志,你可以看出來到底是出現什么問題。
Reference
1. https://kafka.apache.org/08/design.html
2. Neha Narkhede,Hands-free Kafka Replication: A lesson in operational simplicity
3. Kafka Tools