副本定義
Kafka 是有主題概念的,而每個主題又進一步划分成若干個分區。副本的概念實際上是在分區層級下定義的,每個分區配置有若干個副本。
所謂副本(Replica),本質就是一個只能追加寫消息的提交日志。根據 Kafka 副本機制的定義,同一個分區下的所有副本保存有相同的消息序列,這些副本分散保存在不同的 Broker 上,從而能夠對抗部分 Broker 宕機帶來的數據不可用。
在實際生產環境中,每台 Broker 都可能保存有各個主題下不同分區的不同副本,因此,單個 Broker 上存有成百上千個副本的現象是非常正常的。接下來我們來看一張圖,它展示的是一個有 3 台 Broker 的 Kafka 集群上的副本分布情況。
從這張圖中,我們可以看到,主題 1 分區 0 的 3 個副本分散在 3 台 Broker 上,其他主題分區的副本也都散落在不同的 Broker 上,從而實現數據冗余。
副本角色
既然分區下能夠配置多個副本,而且這些副本的內容還要一致,那么很自然的一個問題就是:我們該如何確保副本中所有的數據都是一致的呢?
特別是對 Kafka 而言,當生產者發送消息到某個主題后,消息是如何同步到對應的所有副本中的呢?針對這個問題,最常見的解決方案就是采用基於領導者(Leader-based)的副本機制。Apache Kafka 就是這樣的設計。
基於領導者的副本機制的工作原理如下圖所示,我來簡單解釋一下這張圖里面的內容。
- 在 Kafka 中,副本分成兩類:領導者副本(Leader Replica)和追隨者副本(Follower Replica)。每個分區在創建時都要選舉一個副本,稱為領導者副本,其余的副本自動稱為追隨者副本。
- Kafka 的副本機制比其他分布式系統要更嚴格一些。在 Kafka 中,追隨者副本是不對外提供服務的。這就是說,任何一個追隨者副本都不能響應消費者和生產者的讀寫請求。所有的請求都必須由領導者副本來處理,或者說,所有的讀寫請求都必須發往領導者副本所在的 Broker,由該 Broker 負責處理。追隨者副本不處理客戶端請求,它唯一的任務就是從領導者副本異步拉取消息,並寫入到自己的提交日志中,從而實現與領導者副本的同步。
- 當領導者副本掛掉了,或者說領導者副本所在的 Broker 宕機時,Kafka 依托於 ZooKeeper 提供的監控功能能夠實時感知到,並立即開啟新一輪的領導者選舉,從追隨者副本中選一個作為新的領導者。老 Leader 副本重啟回來后,只能作為追隨者副本加入到集群中。
你一定要特別注意上面的第二點,即追隨者副本是不對外提供服務的。還記得剛剛我們談到副本機制的好處時,說過 Kafka 沒能提供讀操作橫向擴展以及改善局部性嗎?具體的原因就在於此。
對於客戶端用戶而言,Kafka 的追隨者副本沒有任何作用,它既不能像 MySQL 那樣幫助領導者副本“抗讀”,也不能實現將某些副本放到離客戶端近的地方來改善數據局部性。
既然如此,Kafka 為什么要這樣設計呢?其實這種副本機制有兩個方面的好處。
1.方便實現“Read-your-writes”。
所謂 Read-your-writes,顧名思義就是,當你使用生產者 API 向 Kafka 成功寫入消息后,馬上使用消費者 API 去讀取剛才生產的消息。舉個例子,比如你平時發微博時,你發完一條微博,肯定是希望能立即看到的,這就是典型的 Read-your-writes 場景。如果允許追隨者副本對外提供服務,由於副本同步是異步的,因此有可能出現追隨者副本還沒有從領導者副本那里拉取到最新的消息,從而使得客戶端看不到最新寫入的消息。
2.方便實現單調讀(Monotonic Reads)。
什么是單調讀呢?就是對於一個消費者用戶而言,在多次消費消息時,它不會看到某條消息一會兒存在一會兒不存在。如果允許追隨者副本提供讀服務,那么假設當前有 2 個追隨者副本 F1 和 F2,它們異步地拉取領導者副本數據。倘若 F1 拉取了 Leader 的最新消息而 F2 還未及時拉取,那么,此時如果有一個消費者先從 F1 讀取消息之后又從 F2 拉取消息,它可能會看到這樣的現象:第一次消費時看到的最新消息在第二次消費時不見了,這就不是單調讀一致性。但是,如果所有的讀請求都是由 Leader 來處理,那么 Kafka 就很容易實現單調讀一致性。
AR(Assigned Replicas)
分區中的所有副本統稱為 AR(Assigned Replicas)。
OSR(Outof-sync Replicas)
ISR(In-sync Replicas)
我們剛剛反復說過,追隨者副本不提供服務,只是定期地異步拉取領導者副本中的數據而已。既然是異步的,就存在着不可能與 Leader 實時同步的風險。在探討如何正確應對這種風險之前,我們必須要精確地知道同步的含義是什么。或者說,Kafka 要明確地告訴我們,追隨者副本到底在什么條件下才算與 Leader 同步。
基於這個想法,Kafka 引入了 In-sync Replicas,也就是所謂的 ISR 副本集合。ISR 中的副本都是與 Leader 同步的副本,相反,不在 ISR 中的追隨者副本就被認為是與 Leader 不同步的。那么,到底什么副本能夠進入到 ISR 中呢?
我們首先要明確的是,Leader 副本天然就在 ISR 中。也就是說,ISR 不只是追隨者副本集合,它必然包括 Leader 副本。甚至在某些情況下,ISR 只有 Leader 這一個副本。
另外,能夠進入到 ISR 的追隨者副本要滿足一定的條件。至於是什么條件,我先賣個關子,我們先來一起看看下面這張圖。
圖中有 3 個副本:1 個領導者副本和 2 個追隨者副本。Leader 副本當前寫入了 10 條消息,Follower1 副本同步了其中的 6 條消息,而 Follower2 副本只同步了其中的 3 條消息。
現在,請你思考一下,對於這 2 個追隨者副本,你覺得哪個追隨者副本與 Leader 不同步?
答案是,要根據具體情況來定。換成英文,就是那句著名的“It depends”。看上去好像 Follower2 的消息數比 Leader 少了很多,它是最有可能與 Leader 不同步的。的確是這樣的,但僅僅是可能。
事實上,這張圖中的 2 個 Follower 副本都有可能與 Leader 不同步,但也都有可能與 Leader 同步。也就是說,Kafka 判斷 Follower 是否與 Leader 同步的標准,不是看相差的消息數,而是另有“玄機”。
這個標准就是 Broker 端參數 replica.lag.time.max.ms 參數值。這個參數的含義是 Follower 副本能夠落后 Leader 副本的最長時間間隔,當前默認值是 10 秒。這就是說,只要一個 Follower 副本落后 Leader 副本的時間不連續超過 10 秒,那么 Kafka 就認為該 Follower 副本與 Leader 是同步的,即使此時 Follower 副本中保存的消息明顯少於 Leader 副本中的消息。
我們在前面說過,Follower 副本唯一的工作就是不斷地從 Leader 副本拉取消息,然后寫入到自己的提交日志中。如果這個同步過程的速度持續慢於 Leader 副本的消息寫入速度,那么在 replica.lag.time.max.ms 時間后,此 Follower 副本就會被認為是與 Leader 副本不同步的,因此不能再放入 ISR 中。此時,Kafka 會自動收縮 ISR 集合,將該副本“踢出”ISR。
值得注意的是,倘若該副本后面慢慢地追上了 Leader 的進度,那么它是能夠重新被加回 ISR 的。這也表明,ISR 是一個動態調整的集合,而非靜態不變的。
Kafka的ISR的管理最終都會反饋到Zookeeper節點上。具體位置為:/brokers/topics/[topic]/partitions/[partition]/state。目前有兩個地方會對這個Zookeeper的節點進行維護:
Controller來維護:Kafka集群中的其中一個Broker會被選舉為Controller,主要負責Partition管理和副本狀態管理,也會執行類似於重分配partition之類的管理任務。在符合某些特定條件下,Controller下的LeaderSelector會選舉新的leader,ISR和新的leader_epoch及controller_epoch寫入Zookeeper的相關節點中。同時發起LeaderAndIsrRequest通知所有的replicas。
Leader來維護:leader有單獨的線程定期檢測ISR中follower是否脫離ISR, 如果發現ISR變化,則會將新的ISR的信息返回到Zookeeper的相關節點中。
Unclean 領導者選舉(Unclean Leader Election)
既然 ISR 是可以動態調整的,那么自然就可以出現這樣的情形:ISR 為空。因為 Leader 副本天然就在 ISR 中,如果 ISR 為空了,就說明 Leader 副本也“掛掉”了,Kafka 需要重新選舉一個新的 Leader。可是 ISR 是空,此時該怎么選舉新 Leader 呢?
Kafka 把所有不在 ISR 中的存活副本都稱為非同步副本。通常來說,非同步副本落后 Leader 太多,因此,如果選擇這些副本作為新 Leader,就可能出現數據的丟失。畢竟,這些副本中保存的消息遠遠落后於老 Leader 中的消息。在 Kafka 中,選舉這種副本的過程稱為 Unclean 領導者選舉。Broker 端參數 unclean.leader.election.enable 控制是否允許 Unclean 領導者選舉。
開啟 Unclean 領導者選舉可能會造成數據丟失,但好處是,它使得分區 Leader 副本一直存在,不至於停止對外提供服務,因此提升了高可用性。反之,禁止 Unclean 領導者選舉的好處在於維護了數據的一致性,避免了消息丟失,但犧牲了高可用性。
如果你聽說過 CAP 理論的話,你一定知道,一個分布式系統通常只能同時滿足一致性(Consistency)、可用性(Availability)、分區容錯性(Partition tolerance)中的兩個。顯然,在這個問題上,Kafka 賦予你選擇 C 或 A 的權利。
你可以根據你的實際業務場景決定是否開啟 Unclean 領導者選舉。不過,我強烈建議你不要開啟它,畢竟我們還可以通過其他的方式來提升高可用性。如果為了這點兒高可用性的改善,犧牲了數據一致性,那就非常不值當了。
副本Commit
同步復制: 只有所有的follower把數據拿過去后才commit,一致性好,可用性不高。
異步復制: 只要leader拿到數據立即commit,等follower慢慢去復制,可用性高,立即返回,一致性差一些。
Commit:是指leader告訴客戶端,這條數據寫成功了。kafka盡量保證commit后立即leader掛掉,其他flower都有該條數據。
kafka不是完全同步,也不是完全異步,是一種ISR機制:
1. leader會維護一個與其基本保持同步的Replica列表,該列表稱為ISR(in-sync Replica),每個Partition都會有一個ISR,而且是由leader動態維護
2. 如果一個flower比一個leader落后太多,或者超過一定時間未發起數據復制請求,則leader將其重ISR中移除
3. 當ISR中所有Replica都向Leader發送ACK時,leader才commit
既然所有Replica都向Leader發送ACK時,leader才commit,那么flower怎么會leader落后太多?
producer往kafka中發送數據,不僅可以一次發送一條數據,還可以發送message的數組;批量發送,同步的時候批量發送,異步的時候本身就是就是批量;底層會有隊列緩存起來,批量發送,對應broker而言,就會收到很多數據(假設1000),這時候leader發現自己有1000條數據,flower只有500條數據,落后了500條數據,就把它從ISR中移除出去,這時候發現其他的flower與他的差距都很小,就等待;如果因為內存等原因,差距很大,就把它從ISR中移除出去。
commit策略配置:
server配置
rerplica.lag.time.max.ms=10000
# 如果leader發現flower超過10秒沒有向它發起fech請求,那么leader考慮這個flower是不是程序出了點問題
# 或者資源緊張調度不過來,它太慢了,不希望它拖慢后面的進度,就把它從ISR中移除。
topic配置
min.insync.replicas=1 # 需要保證ISR中至少有多少個replica
producer配置
request.required.asks=0
# 0:相當於異步的,不需要leader給予回復,producer立即返回,發送就是成功, 那么發送消息網絡超時或broker crash(1.Partition的Leader還沒有commit消息 2.Leader與Follower數據不同步), 既有可能丟失也可能會重發 # 1:當leader接收到消息之后發送ack,丟會重發,丟的概率很小 # -1:當所有的follower都同步消息成功后發送ack. 丟失消息可能性比較低
副本處理
Kafka在啟動的時候會開啟兩個任務
一個任務用來定期地檢查是否需要縮減或者擴大ISR集合,這個周期是replica.lag.time.max.ms的一半,默認5000ms。當檢測到ISR集合中有失效副本時,就會收縮ISR集合,當檢查到有Follower的HighWatermark追趕上Leader時,就會擴充ISR。除此之外,當ISR集合發生變更的時候還會將變更后的記錄緩存到isrChangeSet中,
另外一個任務會周期性地檢查這個Set,如果發現這個Set中有ISR集合的變更記錄,那么它會在zk中持久化一個節點。然后因為Controllr在這個節點的路徑上注冊了一個Watcher,所以它就能夠感知到ISR的變化,並向它所管理的broker發送更新元數據的請求。最后刪除該路徑下已經處理過的節點。
此外,在0.9X版本之前,Kafka中還有另外一個參數replica.lag.max.messages,它也是用來判定失效副本的,當一個副本滯后leader副本的消息數超過這個參數的大小時,則判定它處於同步失效的狀態。它與replica.lag.time.max.ms參數判定出的失效副本取並集組成一個失效副本集合。
不過這個參數本身很難給出一個合適的值。以默認的值4000為例,對於消息流入速度很低的主題(比如TPS為10),這個參數就沒什么用;對於消息流入速度很高的主題(比如TPS為2000),這個參數的取值又會引入ISR的頻繁變動。所以從0.9x版本開始,Kafka就徹底移除了這一個參數。
副本異常
- 慢副本:在一定周期時間內follower不能追趕上leader。最常見的原因之一是I / O瓶頸導致follower追加復制消息速度慢於從leader拉取速度。
- 卡住副本:在一定周期時間內follower停止從leader拉取請求。follower replica卡住了是由於GC暫停或follower失效或死亡。
- 新啟動副本:當用戶給主題增加副本因子時,新的follower不在同步副本列表中,直到他們完全趕上了leader日志。
副本恢復到ISR
1、leader掛掉了,從它的follower中選舉一個作為leader,並把掛掉的leader從ISR中移除,繼續處理數據。一段時間后該leader重新啟動了,它知道它之前的數據到哪里了,嘗試獲取它掛掉后leader處理的數據,獲取完成后它就加入了ISR。
2、等待ISR中任一Replica恢復,並選它為Leader
- 等待時間較長,降低可用性
- 或ISR中的所有Replica都無法恢復或者數據丟失,則該Partition將永不可用
3、選擇第一個恢復的Replica為新的Leader,無論它是否在ISR中(即:Unclean 領導者選舉)
- 並未包含所有已被之前Leader Commit過的消息,因此會造成數據丟失
- 可用性較高