replica副本同步機制
1 前言
Kafka的流行歸功於它設計和操作簡單、存儲系統高效、充分利用磁盤順序讀寫等特性、非常適合在線日志收集等高吞吐場景。
Kafka特性之一是它的復制協議。復制協議是保障kafka高可靠性的關鍵。對於單個集群中每個Broker不同工作負載情況下,如何自動調優Kafka副本的工作方式是比較有挑戰的。它的挑戰之一是要知道如何避免follower進入和退出同步副本列表(即ISR)。從用戶的角度來看,如果生產者發送一大批海量消息,可能會引起Kafka Broker很多警告。這些警報表明一些topics處於“under replicated”狀態,這些副本處於同步失敗或失效狀態,更意味着數據沒有被復制到足夠數量Broker從而增加數據丟失的概率。因此Kafka集群中處於“under replicated”中Partition數要密切監控。這個警告應該來自於Broker失效,減慢或暫停等狀態而不是生產者寫不同大小消息引起的。
2 Kafka的副本機制
Kafka中主題的每個Partition有一個預寫式日志文件,每個Partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到Partition中,Partition中的每個消息都有一個連續的序列號叫做offset, 確定它在分區日志中唯一的位置。
Kafka每個topic的partition有N個副本,其中N是topic的復制因子。Kafka通過多副本機制實現故障自動轉移,當Kafka集群中一個Broker失效情況下仍然保證服務可用。在Kafka中發生復制時確保partition的預寫式日志有序地寫到其他節點上。N個replicas中。其中一個replica為leader,其他都為follower,leader處理partition的所有讀寫請求,與此同時,follower會被動定期地去復制leader上的數據。
如下圖所示,Kafka集群中有4個broker, 某topic有3個partition,且復制因子即副本個數也為3:
Kafka提供了數據復制算法保證,如果leader發生故障或掛掉,一個新leader被選舉並被接受客戶端的消息成功寫入。Kafka確保從同步副本列表中選舉一個副本為leader,或者說follower追趕leader數據。leader負責維護和跟蹤ISR(In-Sync Replicas的縮寫,表示副本同步隊列,具體可參考下節)中所有follower滯后的狀態。當producer發送一條消息到broker后,leader寫入消息並復制到所有follower。消息提交之后才被成功復制到所有的同步副本。消息復制延遲受最慢的follower限制,重要的是快速檢測慢副本,如果follower“落后”太多或者失效,leader將會把它從ISR中刪除。
副本同步隊列(ISR)
所謂同步,必須滿足如下兩個條件:
- 副本節點必須能與zookeeper保持會話(心跳機制)
- 副本能復制leader上的所有寫操作,並且不能落后太多。(卡住或滯后的副本控制是由 replica.lag.time.max.ms 配置)
默認情況下Kafka對應的topic的replica數量為1,即每個partition都有一個唯一的leader,為了確保消息的可靠性,通常應用中將其值(由broker的參數offsets.topic.replication.factor指定)大小設置為大於1,比如3。 所有的副本(replicas)統稱為Assigned Replicas,即AR。ISR是AR中的一個子集,由leader維護ISR列表,follower從leader同步數據有一些延遲。任意一個超過閾值都會把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也會先存放在OSR中。AR=ISR+OSR。
上一節中的HW俗稱高水位,是HighWatermark的縮寫,取一個partition對應的ISR中最小的LEO作為HW,consumer最多只能消費到HW所在的位置。另外每個replica都有HW,leader和follower各自負責更新自己的HW的狀態。對於leader新寫入的消息,consumer不能立刻消費,leader會等待該消息被所有ISR中的replicas同步后更新HW,此時消息才能被consumer消費。這樣就保證了如果leader所在的broker失效,該消息仍然可以從新選舉的leader中獲取。對於來自內部broKer的讀取請求,沒有HW的限制。
下圖詳細的說明了當producer生產消息至broker后,ISR以及HW和LEO的流轉過程:
由此可見,Kafka的復制機制既不是完全的同步復制,也不是單純的異步復制。事實上,同步復制要求所有能工作的follower都復制完,這條消息才會被commit,這種復制方式極大的影響了吞吐率。而異步復制方式下,follower異步的從leader復制數據,數據只要被leader寫入log就被認為已經commit,這種情況下如果follower都還沒有復制完,落后於leader時,突然leader宕機,則會丟失數據。而Kafka的這種使用ISR的方式則很好的均衡了確保數據不丟失以及吞吐率。
- Controller來維護:Kafka集群中的其中一個Broker會被選舉為Controller,主要負責Partition管理和副本狀態管理,也會執行類似於重分配partition之類的管理任務。在符合某些特定條件下,Controller下的LeaderSelector會選舉新的leader,ISR和新的leader_epoch及controller_epoch寫入Zookeeper的相關節點中。同時發起LeaderAndIsrRequest通知所有的replicas。
- leader來維護:leader有單獨的線程定期檢測ISR中follower是否脫離ISR, 如果發現ISR變化,則會將新的ISR的信息返回到Zookeeper的相關節點中。
副本不同步的異常情況
- 慢副本:在一定周期時間內follower不能追趕上leader。最常見的原因之一是I / O瓶頸導致follower追加復制消息速度慢於從leader拉取速度。
- 卡住副本:在一定周期時間內follower停止從leader拉取請求。follower replica卡住了是由於GC暫停或follower失效或死亡。
- 新啟動副本:當用戶給主題增加副本因子時,新的follower不在同步副本列表中,直到他們完全趕上了leader日志。
3 Follower向leader拉取數據的過程
3.1 replica fetcher 線程何時啟動
broker 分配的任何一個 partition 都是以 Replica 對象實例的形式存在,而 Replica 在 Kafka 上是有兩個角色: leader 和 follower,只要這個 Replica 是 follower,它便會向 leader 進行數據同步。
反映在 ReplicaManager 上就是如果 Broker 的本地副本被選舉為 follower,那么它將會啟動副本同步線程,其具體實現如下所示:
簡單來說,makeFollowers() 的處理過程如下:
- 先從本地記錄 leader partition 的集合中將這些 partition 移除,因為這些 partition 已經被選舉為了 follower;
- 將這些 partition 的本地副本設置為 follower,后面就不會接收關於這個 partition 的 Produce 請求了,如果依然有 client 在向這台 broker 發送數據,那么它將會返回相應的錯誤;
- 先停止關於這些 partition 的副本同步線程(如果本地副本之前是 follower 現在還是 follower,先關閉的原因是:這個 partition 的 leader 發生了變化,如果 leader 沒有發生變化,那么 makeFollower方法返回的是 False,這個 Partition 就不會被添加到 partitionsToMakeFollower 集合中),這樣的話可以保證這些 partition 的本地副本將不會再有新的數據追加;
- 對這些 partition 本地副本日志文件進行截斷操作並進行 checkpoint 操作;
- 完成那些延遲處理的 Produce 和 Fetch 請求;
- 如果本地的 broker 沒有掉線,那么向這些 partition 新選舉出來的 leader 啟動副本同步線程。
關於第6步,並不一定會為每一個 partition 都啟動一個 fetcher 線程,對於一個目的 broker,只會啟動 num.replica.fetchers
個線程,具體這個 topic-partition 會分配到哪個 fetcher 線程上,是根據 topic 名和 partition id 進行計算得到,實現所示:
3.2 replica fetcher 線程啟動
如上所示,在 ReplicaManager 調用 makeFollowers() 啟動 replica fetcher 線程后,它實際上是通過 ReplicaFetcherManager 實例進行相關 topic-partition 同步線程的啟動和關閉,其啟動過程分為下面兩步:
- ReplicaFetcherManager 調用 addFetcherForPartitions() 添加對這些 topic-partition 的數據同步流程;
- ReplicaFetcherManager 調用 createFetcherThread() 初始化相應的 ReplicaFetcherThread 線程。
addFetcherForPartitions()
的具體實現如下所示:
這個方法其實是做了下面這幾件事:
- 先計算這個 topic-partition 對應的 fetcher id;
- 根據 leader 和 fetcher id 獲取對應的 replica fetcher 線程,如果沒有找到,就調用 createFetcherThread() 創建一個新的 fetcher 線程;
- 如果是新啟動的 replica fetcher 線程,那么就啟動這個線程;
- 將 topic-partition 記錄到 fetcherThreadMap 中,這個變量記錄每個 replica fetcher 線程要同步的 topic-partition 列表。
ReplicaFetcherManager 創建 replica Fetcher 線程的實現如下:
3.3 replica fetcher 線程處理過程
replica fetcher 線程在啟動之后就開始進行正常數據同步流程了,這個過程都是在 ReplicaFetcherThread 線程中實現的。
ReplicaFetcherThread 的 doWork()
方法是一直在這個線程中的 run()
中調用的,實現方法如下:
在 doWork() 方法中主要做了兩件事:
- 構造相應的 Fetch 請求(buildFetchRequest());
- 通過 processFetchRequest() 方法發送 Fetch 請求,並對其結果進行相應的處理。
processFetchRequest()
這個方法的作用是發送 Fetch 請求,並對返回的結果進行處理,最終寫入到本地副本的 Log 實例中,其具體實現:
其處理過程簡單總結一下:
- 通過 fetch() 方法,發送 Fetch 請求,獲取相應的 response(如果遇到異常,那么在下次發送 Fetch 請求之前,會 sleep 一段時間再發);
- 如果返回的結果 不為空,並且 Fetch 請求的 offset 信息與返回結果的 offset 信息對得上,那么就會調用 processPartitionData() 方法將拉取到的數據追加本地副本的日志文件中,如果返回結果有錯誤信息,那么就對相應錯誤進行相應的處理;
- 對在 Fetch 過程中遇到異常或返回錯誤的 topic-partition,會進行 delay 操作,下次 Fetch 請求的發生至少要間隔 replica.fetch.backoff.ms 時間。
fetch()
方法作用是發送 Fetch 請求,並返回相應的結果,其具體的實現,如下:
processPartitionData
這個方法的作用是,處理 Fetch 請求的具體數據內容,簡單來說就是:檢查一下數據大小是否超過限制、將數據追加到本地副本的日志文件中、更新本地副本的 hw 值。
3.3 副本同步異常情況的處理
在副本同步的過程中,會遇到哪些異常情況呢?
大家一定會想到關於 offset 的問題,在 Kafka 中,關於 offset 的處理,無論是 producer 端、consumer 端還是其他地方,offset 似乎都是一個形影不離的問題。在副本同步時,關於 offset,會遇到什么問題呢?下面舉兩個異常的場景:
- 假如當前本地(id:1)的副本現在是 leader,其 LEO 假設為1000,而另一個在 isr 中的副本(id:2)其 LEO 為800,此時出現網絡抖動,id 為1 的機器掉線后又上線了,但是此時副本的 leader 實際上已經變成了 2,而2的 LEO 為800,這時候1啟動副本同步線程去2上拉取數據,希望從 offset=1000 的地方開始拉取,但是2上最大的 offset 才是800,這種情況該如何處理呢?
- 假設一個 replica (id:1)其 LEO 是10,它已經掉線好幾天,這個 partition leader 的 offset 范圍是 [100, 800],那么 1 重啟啟動時,它希望從 offset=10 的地方開始拉取數據時,這時候發生了 OutOfRange,不過跟上面不同的是這里是小於了 leader offset 的范圍,這種情況又該怎么處理?
以上兩種情況都是 offset OutOfRange 的情況,只不過:一是 Fetch Offset 超過了 leader 的 LEO,二是 Fetch Offset 小於 leader 最小的 offset
在介紹 Kafka 解決方案之前,我們先來自己思考一下這兩種情況應該怎么處理?
- 如果 fetch offset 超過 leader 的 offset,這時候副本應該是回溯到 leader 的 LEO 位置(超過這個值的數據刪除),然后再去進行副本同步,當然這種解決方案其實是無法保證 leader 與 follower 數據的完全一致,再次發生 leader 切換時,可能會導致數據的可見性不一致,但既然用戶允許了臟選舉的發生,其實我們是可以認為用戶是可以接收這種情況發生的;
- 這種就比較容易處理,首先清空本地的數據,因為本地的數據都已經過期了,然后從 leader 的最小 offset 位置開始拉取數據。
上面是我們比較容易想出的解決方案,而在 Kafka 中,其解決方案也很類似,不過遇到情況比上面我們列出的兩種情況多了一些復雜,其解決方案如下:
針對第一種情況,在 Kafka 中,實際上還會發生這樣一種情況,1 在收到 OutOfRange 錯誤時,這時去 leader 上獲取的 LEO 值與最小的 offset 值,這時候卻發現 leader 的 LEO 已經從 800 變成了 1100(這個 topic-partition 的數據量增長得比較快),再按照上面的解決方案就不太合理,Kafka 這邊的解決方案是:遇到這種情況,進行重試就可以了,下次同步時就會正常了,但是依然會有上面說的那個問題。
3.4 replica fetcher 線程的關閉
replica fetcher 線程關閉的條件,在三種情況下會關閉對這個 topic-partition 的拉取操作:
- stopReplica():broker 收到了 controller 發來的 StopReplica 請求,這時會開始關閉對指定 topic-partition 的同步線程;
- makeLeaders:這些 partition 的本地副本被選舉成了 leader,這時候就會先停止對這些 topic-partition 副本同步線程;
- makeFollowers():前面已經介紹過,這里實際上停止副本同步,然后再開啟副本同步線程,因為這些 topic-partition 的 leader 可能發生了切換。
這里直接說線程關閉,其實不是很准確,因為每個 replica fetcher 線程操作的是多個 topic-partition,而在關閉的粒度是 partition 級別,只有這個線程分配的 partition 全部關閉后,這個線程才會真正被關閉。
stopReplica
StopReplica 的請求實際上是 Controller 發送過來的,這個在 controller 部分會講述,它觸發的條件有多種,比如:broker 下線、partition replica 遷移等等。
makeLeaders
makeLeaders()
方法的調用是在 broker 上這個 partition 的副本被設置為 leader 時觸發的,其實現如下:
調用 ReplicaFetcherManager 的 removeFetcherForPartitions()
刪除對這些 topic-partition 的副本同步設置,這里在實現時,會遍歷所有的 replica fetcher 線程,都執行 removePartitions()
方法來移除對應的 topic-partition 集合。
removePartitions
這個方法的作用是:ReplicaFetcherThread 將這些 topic-partition 從自己要拉取的 partition 列表中移除。
ReplicaFetcherThread 的關閉
前面介紹那么多,似乎還是沒有真正去關閉,那么 ReplicaFetcherThread 真正關閉是哪里操作的呢?
實際上 ReplicaManager 每次處理完 LeaderAndIsr 請求后,都會調用 ReplicaFetcherManager 的 shutdownIdleFetcherThreads()
方法,如果 fetcher 線程要拉取的 topic-partition 集合為空,那么就會關閉掉對應的 fetcher 線程。
參考資料:
https://www.cnblogs.com/aidodoo/p/8873163.html
https://juejin.im/post/5bf6b0acf265da612d18e931
https://www.infoq.cn/article/depth-interpretation-of-kafka-data-reliability