replica副本同步機制


replica副本同步機制

1 前言

 Kafka的流行歸功於它設計和操作簡單、存儲系統高效、充分利用磁盤順序讀寫等特性、非常適合在線日志收集等高吞吐場景。

 Kafka特性之一是它的復制協議。復制協議是保障kafka高可靠性的關鍵。對於單個集群中每個Broker不同工作負載情況下,如何自動調優Kafka副本的工作方式是比較有挑戰的。它的挑戰之一是要知道如何避免follower進入和退出同步副本列表(即ISR)。從用戶的角度來看,如果生產者發送一大批海量消息,可能會引起Kafka Broker很多警告。這些警報表明一些topics處於“under replicated”狀態,這些副本處於同步失敗或失效狀態,更意味着數據沒有被復制到足夠數量Broker從而增加數據丟失的概率。因此Kafka集群中處於“under replicated”中Partition數要密切監控。這個警告應該來自於Broker失效,減慢或暫停等狀態而不是生產者寫不同大小消息引起的。

2 Kafka的副本機制

Kafka中主題的每個Partition有一個預寫式日志文件,每個Partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到Partition中,Partition中的每個消息都有一個連續的序列號叫做offset, 確定它在分區日志中唯一的位置。

 

 

Kafka每個topic的partition有N個副本,其中N是topic的復制因子。Kafka通過多副本機制實現故障自動轉移,當Kafka集群中一個Broker失效情況下仍然保證服務可用。在Kafka中發生復制時確保partition的預寫式日志有序地寫到其他節點上。N個replicas中。其中一個replica為leader,其他都為follower,leader處理partition的所有讀寫請求,與此同時,follower會被動定期地去復制leader上的數據。

如下圖所示,Kafka集群中有4個broker, 某topic有3個partition,且復制因子即副本個數也為3:

 

 

Kafka提供了數據復制算法保證,如果leader發生故障或掛掉,一個新leader被選舉並被接受客戶端的消息成功寫入。Kafka確保從同步副本列表中選舉一個副本為leader,或者說follower追趕leader數據。leader負責維護和跟蹤ISR(In-Sync Replicas的縮寫,表示副本同步隊列,具體可參考下節)中所有follower滯后的狀態。當producer發送一條消息到broker后,leader寫入消息並復制到所有follower。消息提交之后才被成功復制到所有的同步副本。消息復制延遲受最慢的follower限制,重要的是快速檢測慢副本,如果follower“落后”太多或者失效,leader將會把它從ISR中刪除。

副本同步隊列(ISR)
所謂同步,必須滿足如下兩個條件:

  • 副本節點必須能與zookeeper保持會話(心跳機制)
  • 副本能復制leader上的所有寫操作,並且不能落后太多。(卡住或滯后的副本控制是由 replica.lag.time.max.ms 配置)

默認情況下Kafka對應的topic的replica數量為1,即每個partition都有一個唯一的leader,為了確保消息的可靠性,通常應用中將其值(由broker的參數offsets.topic.replication.factor指定)大小設置為大於1,比如3。 所有的副本(replicas)統稱為Assigned Replicas,即AR。ISR是AR中的一個子集,由leader維護ISR列表,follower從leader同步數據有一些延遲。任意一個超過閾值都會把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也會先存放在OSR中。AR=ISR+OSR。

上一節中的HW俗稱高水位,是HighWatermark的縮寫,取一個partition對應的ISR中最小的LEO作為HW,consumer最多只能消費到HW所在的位置。另外每個replica都有HW,leader和follower各自負責更新自己的HW的狀態。對於leader新寫入的消息,consumer不能立刻消費,leader會等待該消息被所有ISR中的replicas同步后更新HW,此時消息才能被consumer消費。這樣就保證了如果leader所在的broker失效,該消息仍然可以從新選舉的leader中獲取。對於來自內部broKer的讀取請求,沒有HW的限制。
下圖詳細的說明了當producer生產消息至broker后,ISR以及HW和LEO的流轉過程:

 

 

由此可見,Kafka的復制機制既不是完全的同步復制,也不是單純的異步復制。事實上,同步復制要求所有能工作的follower都復制完,這條消息才會被commit,這種復制方式極大的影響了吞吐率。而異步復制方式下,follower異步的從leader復制數據,數據只要被leader寫入log就被認為已經commit,這種情況下如果follower都還沒有復制完,落后於leader時,突然leader宕機,則會丟失數據。而Kafka的這種使用ISR的方式則很好的均衡了確保數據不丟失以及吞吐率。

  • Controller來維護:Kafka集群中的其中一個Broker會被選舉為Controller,主要負責Partition管理和副本狀態管理,也會執行類似於重分配partition之類的管理任務。在符合某些特定條件下,Controller下的LeaderSelector會選舉新的leader,ISR和新的leader_epoch及controller_epoch寫入Zookeeper的相關節點中。同時發起LeaderAndIsrRequest通知所有的replicas。
  • leader來維護:leader有單獨的線程定期檢測ISR中follower是否脫離ISR, 如果發現ISR變化,則會將新的ISR的信息返回到Zookeeper的相關節點中。

副本不同步的異常情況

  • 慢副本:在一定周期時間內follower不能追趕上leader。最常見的原因之一是I / O瓶頸導致follower追加復制消息速度慢於從leader拉取速度。
  • 卡住副本:在一定周期時間內follower停止從leader拉取請求。follower replica卡住了是由於GC暫停或follower失效或死亡。
  • 新啟動副本:當用戶給主題增加副本因子時,新的follower不在同步副本列表中,直到他們完全趕上了leader日志。

 

3 Follower向leader拉取數據的過程

3.1 replica fetcher 線程何時啟動

broker 分配的任何一個 partition 都是以 Replica 對象實例的形式存在,而 Replica 在 Kafka 上是有兩個角色: leader 和 follower,只要這個 Replica 是 follower,它便會向 leader 進行數據同步。

反映在 ReplicaManager 上就是如果 Broker 的本地副本被選舉為 follower,那么它將會啟動副本同步線程,其具體實現如下所示:

 

 

簡單來說,makeFollowers() 的處理過程如下:

  1. 先從本地記錄 leader partition 的集合中將這些 partition 移除,因為這些 partition 已經被選舉為了 follower;
  2. 將這些 partition 的本地副本設置為 follower,后面就不會接收關於這個 partition 的 Produce 請求了,如果依然有 client 在向這台 broker 發送數據,那么它將會返回相應的錯誤;
  3. 先停止關於這些 partition 的副本同步線程(如果本地副本之前是 follower 現在還是 follower,先關閉的原因是:這個 partition 的 leader 發生了變化,如果 leader 沒有發生變化,那么 makeFollower方法返回的是 False,這個 Partition 就不會被添加到 partitionsToMakeFollower 集合中),這樣的話可以保證這些 partition 的本地副本將不會再有新的數據追加;
  4. 對這些 partition 本地副本日志文件進行截斷操作並進行 checkpoint 操作;
  5. 完成那些延遲處理的 Produce 和 Fetch 請求;
  6. 如果本地的 broker 沒有掉線,那么向這些 partition 新選舉出來的 leader 啟動副本同步線程。

關於第6步,並不一定會為每一個 partition 都啟動一個 fetcher 線程,對於一個目的 broker,只會啟動 num.replica.fetchers 個線程,具體這個 topic-partition 會分配到哪個 fetcher 線程上,是根據 topic 名和 partition id 進行計算得到,實現所示:

 

 

3.2 replica fetcher 線程啟動

如上所示,在 ReplicaManager 調用 makeFollowers() 啟動 replica fetcher 線程后,它實際上是通過 ReplicaFetcherManager 實例進行相關 topic-partition 同步線程的啟動和關閉,其啟動過程分為下面兩步:

  1. ReplicaFetcherManager 調用 addFetcherForPartitions() 添加對這些 topic-partition 的數據同步流程;
  2. ReplicaFetcherManager 調用 createFetcherThread() 初始化相應的 ReplicaFetcherThread 線程。

 

addFetcherForPartitions() 的具體實現如下所示:

 

 

這個方法其實是做了下面這幾件事:

  1. 先計算這個 topic-partition 對應的 fetcher id;
  2. 根據 leader 和 fetcher id 獲取對應的 replica fetcher 線程,如果沒有找到,就調用 createFetcherThread() 創建一個新的 fetcher 線程;
  3. 如果是新啟動的 replica fetcher 線程,那么就啟動這個線程;
  4. 將 topic-partition 記錄到 fetcherThreadMap 中,這個變量記錄每個 replica fetcher 線程要同步的 topic-partition 列表。

ReplicaFetcherManager 創建 replica Fetcher 線程的實現如下:

 

3.3 replica fetcher 線程處理過程

replica fetcher 線程在啟動之后就開始進行正常數據同步流程了,這個過程都是在 ReplicaFetcherThread 線程中實現的。

ReplicaFetcherThread 的 doWork() 方法是一直在這個線程中的 run() 中調用的,實現方法如下:

 

 

在 doWork() 方法中主要做了兩件事:

  1. 構造相應的 Fetch 請求(buildFetchRequest());
  2. 通過 processFetchRequest() 方法發送 Fetch 請求,並對其結果進行相應的處理。

 

processFetchRequest() 這個方法的作用是發送 Fetch 請求,並對返回的結果進行處理,最終寫入到本地副本的 Log 實例中,其具體實現:

 

 

其處理過程簡單總結一下:

  1. 通過 fetch() 方法,發送 Fetch 請求,獲取相應的 response(如果遇到異常,那么在下次發送 Fetch 請求之前,會 sleep 一段時間再發);
  2. 如果返回的結果 不為空,並且 Fetch 請求的 offset 信息與返回結果的 offset 信息對得上,那么就會調用 processPartitionData() 方法將拉取到的數據追加本地副本的日志文件中,如果返回結果有錯誤信息,那么就對相應錯誤進行相應的處理;
  3. 對在 Fetch 過程中遇到異常或返回錯誤的 topic-partition,會進行 delay 操作,下次 Fetch 請求的發生至少要間隔 replica.fetch.backoff.ms 時間。

 

fetch() 方法作用是發送 Fetch 請求,並返回相應的結果,其具體的實現,如下:

 

 

processPartitionData

這個方法的作用是,處理 Fetch 請求的具體數據內容,簡單來說就是:檢查一下數據大小是否超過限制、將數據追加到本地副本的日志文件中、更新本地副本的 hw 值。

 

 

3.3 副本同步異常情況的處理

在副本同步的過程中,會遇到哪些異常情況呢?

大家一定會想到關於 offset 的問題,在 Kafka 中,關於 offset 的處理,無論是 producer 端、consumer 端還是其他地方,offset 似乎都是一個形影不離的問題。在副本同步時,關於 offset,會遇到什么問題呢?下面舉兩個異常的場景:

  1. 假如當前本地(id:1)的副本現在是 leader,其 LEO 假設為1000,而另一個在 isr 中的副本(id:2)其 LEO 為800,此時出現網絡抖動,id 為1 的機器掉線后又上線了,但是此時副本的 leader 實際上已經變成了 2,而2的 LEO 為800,這時候1啟動副本同步線程去2上拉取數據,希望從 offset=1000 的地方開始拉取,但是2上最大的 offset 才是800,這種情況該如何處理呢?
  2. 假設一個 replica (id:1)其 LEO 是10,它已經掉線好幾天,這個 partition leader 的 offset 范圍是 [100, 800],那么 1 重啟啟動時,它希望從 offset=10 的地方開始拉取數據時,這時候發生了 OutOfRange,不過跟上面不同的是這里是小於了 leader offset 的范圍,這種情況又該怎么處理?

以上兩種情況都是 offset OutOfRange 的情況,只不過:一是 Fetch Offset 超過了 leader 的 LEO,二是 Fetch Offset 小於 leader 最小的 offset

在介紹 Kafka 解決方案之前,我們先來自己思考一下這兩種情況應該怎么處理?

  1. 如果 fetch offset 超過 leader 的 offset,這時候副本應該是回溯到 leader 的 LEO 位置(超過這個值的數據刪除),然后再去進行副本同步,當然這種解決方案其實是無法保證 leader 與 follower 數據的完全一致,再次發生 leader 切換時,可能會導致數據的可見性不一致,但既然用戶允許了臟選舉的發生,其實我們是可以認為用戶是可以接收這種情況發生的;
  2. 這種就比較容易處理,首先清空本地的數據,因為本地的數據都已經過期了,然后從 leader 的最小 offset 位置開始拉取數據。

上面是我們比較容易想出的解決方案,而在 Kafka 中,其解決方案也很類似,不過遇到情況比上面我們列出的兩種情況多了一些復雜,其解決方案如下:

 

 

針對第一種情況,在 Kafka 中,實際上還會發生這樣一種情況,1 在收到 OutOfRange 錯誤時,這時去 leader 上獲取的 LEO 值與最小的 offset 值,這時候卻發現 leader 的 LEO 已經從 800 變成了 1100(這個 topic-partition 的數據量增長得比較快),再按照上面的解決方案就不太合理,Kafka 這邊的解決方案是:遇到這種情況,進行重試就可以了,下次同步時就會正常了,但是依然會有上面說的那個問題。

3.4 replica fetcher 線程的關閉

 replica fetcher 線程關閉的條件,在三種情況下會關閉對這個 topic-partition 的拉取操作:

  1. stopReplica():broker 收到了 controller 發來的 StopReplica 請求,這時會開始關閉對指定 topic-partition 的同步線程;
  2. makeLeaders:這些 partition 的本地副本被選舉成了 leader,這時候就會先停止對這些 topic-partition 副本同步線程;
  3. makeFollowers():前面已經介紹過,這里實際上停止副本同步,然后再開啟副本同步線程,因為這些 topic-partition 的 leader 可能發生了切換。

這里直接說線程關閉,其實不是很准確,因為每個 replica fetcher 線程操作的是多個 topic-partition,而在關閉的粒度是 partition 級別,只有這個線程分配的 partition 全部關閉后,這個線程才會真正被關閉。

stopReplica

StopReplica 的請求實際上是 Controller 發送過來的,這個在 controller 部分會講述,它觸發的條件有多種,比如:broker 下線、partition replica 遷移等等。

makeLeaders

makeLeaders() 方法的調用是在 broker 上這個 partition 的副本被設置為 leader 時觸發的,其實現如下:

 

 

調用 ReplicaFetcherManager 的 removeFetcherForPartitions() 刪除對這些 topic-partition 的副本同步設置,這里在實現時,會遍歷所有的 replica fetcher 線程,都執行 removePartitions() 方法來移除對應的 topic-partition 集合。

 

 

removePartitions

這個方法的作用是:ReplicaFetcherThread 將這些 topic-partition 從自己要拉取的 partition 列表中移除。

 

 

 

ReplicaFetcherThread 的關閉

前面介紹那么多,似乎還是沒有真正去關閉,那么 ReplicaFetcherThread 真正關閉是哪里操作的呢?

實際上 ReplicaManager 每次處理完 LeaderAndIsr 請求后,都會調用 ReplicaFetcherManager 的 shutdownIdleFetcherThreads() 方法,如果 fetcher 線程要拉取的 topic-partition 集合為空,那么就會關閉掉對應的 fetcher 線程。

 

 

參考資料:

https://www.cnblogs.com/aidodoo/p/8873163.html

https://juejin.im/post/5bf6b0acf265da612d18e931

https://www.infoq.cn/article/depth-interpretation-of-kafka-data-reliability

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM