Kafka設計原理總結


前言

好長時間沒有更新過博客了,這一段時間工作和私人生活的時間擠壓,也很少有時間可以比較詳細的梳理一篇文章。梳理一篇文章讓遇到相同問題或學習的同學理解我認為是很重要的,隨便搞一搞會誤人子弟的(我盡量不誤人子弟哈,捂臉),哈哈。

在自己的學習過程中,每次深入學習一個技術時也都會將思路以及核心點按自己理解的方式總結筆記,但是發現這樣的總結可能不適合以博客的形式發出來,詳細的寫一篇文章的成本非常高,為了加快自己的學習效率,所總結的筆記更多的是記錄自己理解時疑點或是比較精華的地方,記錄形式也會稍有粗糙。后面有時間我會盡量將總結的內容詳細整理后發出來,大家一起相互學習,互相進步 ~

本片文章適合對kafka基礎有一定了解的同學參考、學習。

場景分析

觀察者模式應該都不陌生,看過zk源碼應該都知道,里面大量使用了 阻塞隊列+線程池的方式來實現觀察者模式,來做異步處理。這種方式只能解決單進程下的異步處理,假設是兩個服務之間采用異步的方式來處理任務,單進程隊列就無法處理了。那么就需要一個中間層來存儲消息分發消息。

應用場景

Kafka是一款分布式消息發布與訂閱系統,特點是高性能、高吞吐量。

Kafka最開始的應用場景就是針對流數據、運營數據等做日志收集,用戶畫像、日志收集監控。適用於大數據的場景。

架構設計

一個典型的 Kafka 體系架構包括若干 Producer(可以是服務器日志,業務數據,頁面前端產生的 page view 等等),若干 broker(Kafka 支持水平擴展,一般 broker 數量越多,集群吞吐率越高),若干 Consumer (Group),以及一個 Zookeeper 集群。Kafka 通過 Zookeeper 管理集群配置,選舉 leader,以及在 consumer group 發生變化時進行 rebalance。Producer 使用 push(推) 模式將消息發布到 broker,Consumer 使用 pull(拉) 模式從 broker 訂閱並消費消息。

分區(Partition)

針對一個主題划分出多個區域部署在多台節點中,每個分區都是一個獨立,都保存着一個topic的部分數據,這樣的做法為了提升處理性能以及存儲容量,並可以線性拓展提高吞吐量。

在很多場景都存在這種思想,網絡通信TCP數據包、數據庫分庫分表等等。

消費者集群可以按策略均勻的訂閱不同的分區,提高處理性能。

每一個分區只能有一個消費者消費(同一個消費組下),這樣減少了並發爭搶帶來的開銷。如果consumer比partition多,那么多余的consumer是浪費的。

分區的建議:

1、consumer group下的consumer的數量不要大於partition

上面提到了,kafka的設計是在同一個partition上是不允許並發的。consumer比partition多則浪費。

2、建議partition的數量是consumer group下的consumer數量的整數倍

當partition比consumer多時,一個consumer可能會消費多個partition,如果不是整數倍,則會有的consumer消費的partition多,有的少,消費不均勻。

Rebalance

每一個消費者都會訂閱屬於自己負責的topic分區,但是如果出現Broker宕機或者是消費者宕機,某些topic分區的數據就沒有消費者來消費了,造成消息丟失的錯覺。那么Rebalance就是一種動態來分配分區與消息者之間關系的策略。有以下幾種情況會觸發Rebalance:

  • 當消費組消費者數量發生變化(宕機、擴縮容等)
  • topic的分區數量發生變化

那么當出現以上情況的任意一種時,kafka就會重新分配每個consumer和partition之間的關系。Rebalance制定了一些分區策略,讓消費組下的所有consumer達成一致,讓每個消費者均衡的訂閱不同的分區。

  • 范圍分區
1、對partition和consumer個數相除、取模划分消費的范圍。

公式:
n = partition個數 / consumer 個數
m = partition個數 % consumer 個數
前m個consumer分配n+1,后面的消費者分配n個分區。

example:
partition:0,1,2,3,4,5,6,7,8,9
consumer:0,1,2

consumer0:0,1,2,3
consumer1:4,5,6
consumer2:7,8,9
  • 輪詢分區
1、將partition和consumer分別通過hashcode進行排序,然后consumer組按順序依次輪詢一人消費一個分區,直到沒有分區。
example:
partition:0,1,2,3,4,5,6,7
consumer:0,1,2

consumer0:0,3,6
consumer1:1,4,6
consumer2:2,5
  • 粘性分區
1、分區的分配盡可能的均勻
2、分區的分配盡可能和上次分配保持相同

Rebalance策略總結:范圍分區、輪詢分區兩個策略相對分配策略比較簡單,但當觸發Rebalance時可能每個消費者Rebalance后的與之前的分區映射很可能有很大的差別,或不均勻的情況。粘性分區可以進可能的保持這兩點,但分配計算策略要復雜很多,相應的Rebalance計算時間也會更長些。

Coordinator

上面介紹了Rebalance策略對partition和consumer之間關系的概念,但這個概念最終的如何做的,如何交互的。這時就引入了Coordinator。

核心分為三個階段:

find

consumer group下所有節點選擇kafka cluster中負載最小的節點發送請求,這個kafka node會根據consumer group id對_consumer_offsets分區數量進行取模找到存儲當前consumer group offset的kafka node信息返回給consumer,consumer以它作為當前分組的coordinator,並向coordinator建立連接。

__consumer_offsets是記錄consumer消費partition的消費位置的主題(默認有50個分區)

join

1、consumer發送當前自己的信息,比如自己的分區策略、訂閱的topic信息、consumer groupid等等。

2、coordinator根據consumer group下的所有consumer上報的分區策略,來確定分區策略(哪個策略多就選哪個)。

3、coordinator選舉一個consumer作為當前consumer group的leader節點,將選好的分區策略和同consumer group的其他consumer信息告訴leader(步驟1的response),由leader來進行具體的策略執行。

1、正常情況下同一個consumer group下consumer分區策略都是一樣,我們不會部署一個應用集群,每台機器還單獨選擇不同的kafka 分區策略。

2、將計算分區的邏輯下發給客戶端做,減少kafka cluster的計算資源。

sync

1、當連接建立成功后,所有consumer會想coordinator發送同步請求,只有leader的sync請求會將計算好的分區規則發送給coordinator,coordinator會等待leader的sync請求,收到leader的分區規則后,將規則response所有consumer分區的規則。

分區副本(Replication)

每個分區都是一個獨立,都保存着一個topic的部分數據,那么如果其中分區的機器發生宕機,將丟失部分數據。分區副本就是來保證每個分區的高可用,避免數據丟失。

副本分區主要采用leader/follower類似主從的方式,leader副本處理所有請求,follower同步leader數據。

Kafka副本選舉機制不采用Quorum多數通過的策略是因為在大多數投票中,多數節點掛掉會讓你不能選舉leader,要冗余單點故障需要三份數據(三個節點),並且要冗余兩個故障需要五份的數據。根據我們的經驗,在一個系統中,僅僅靠冗余來避免單點故障是不夠的,但是每寫5次,對磁盤空間需求是5倍, 吞吐量下降到 1/5,這對於處理海量數據問題是不切實際的。quorum算法更適用於做一些集群配置等存儲,並不適合大數據原始數據的存儲。kafka采用一種自研的ISR的做法。

分區副本數量不能大於broker數量,每個台broker上同一個分區副本只能存在一份。因為放多份也沒什么意義,掛掉是整個broker都掛掉了。

ISR

ISR是一個分區副本集合,這個集合里存放的副本是與leader副本的延遲”差不多“的副本,與Quorum機制不同的是ISR是一個可以靈活調整的高可用機制。

follower副本會同步leader副本的所有數據,那么這個同步過程肯定是有延遲的,這個延遲每個follower副本肯定都是不一樣的,中間摻雜網絡問題、宕機的副本重新恢復了等等。

ISR集合維護是與leader節點保持密切聯系(或者說性能比較好)的follower副本,每次提交數據只用和這些性能較好的高度一致的副本(ISR集合中的副本)完成同步就可以認為這條數據已經提交了成功了,然后給到producer響應。在一致性和性能上做了權衡,也可以根據配置來適配業務。

加入ISR集合條件

ISR的數據會保存在zookeeper中由leader副本負責維護,滿足ISR集合的條件:

1、副本節點必須與zookeeper保持連接。

2、副本節點的最后一條offset值與leader副本節點的offset值的相差不能超過一個時間閾值,如果follower副本在此閾值時間內一直趕不上leader副本,則被踢出ISR集合。

生產者確認機制

這塊主要就是開發給業務人員來根據自身的業務來權衡一致性和性能的策略

0 代表不需要等待broker確認

效率最高,但風險最大,生產者都不知道消息是否發送到了broker

1 代表只需要kafka cluster中leader副本確認

延遲較小,但也容易出現leader副本確認后宕機消息沒有同步到follower副本的情況

-1 代表需要ISR集合中所有的節點確認

效率最低,但安全性最高。但也不能完全避免數據丟失,ISR集合極端情況也有可能縮小到只有一個副本。

所有副本不工作情況

當partition下的所有分區都掛掉時的處理策略,在可用性或一致性之間的衡量。

1、只等待ISR集合中有副本活過來選他作為leader

保證消息的一致性,但不可用的時間會相對拉長,如果ISR集合一直沒有副本活過來則一直不可用

2、任意一個副本活過來選他作為leader

復活的副本可以是不在ISR集合中的副本,但他不一定包含了全部已經commit了的消息。

LEO & HW

在副本同步中,LEO和HW用來記錄數據同步處理過程的狀態。

LEO(log end offset):表示消息日志的下一條offset位置(是當前末尾日志的下一個即將被消費的位置)
HW(hight water):表示當前位置之前的消息都是可以消費的

表示已經被ISR集合中所有副本同步過的值,可以理解為這些值是都已經備份過的,代表數據已經提交成功了的。

例如下圖:

leader副本收到了生產者發來的6條消息,followerB同步了5條,followerC同步了4條。那么leader的HW等於2,證明只有0-2的數據已經被所有follower(根據配置的策略定的)備份過了。

副本同步

同步流程

同步流程:

1、follower副本與leader副本建立一個長輪訓模型(pull)建立時會發送自己的下一個的消費位置LEO。
2、當leader副本收到一條消息時,leader副本保持消息並更新LEO,響應follower副本請求,將消息和HW response給follower副本,follower副本保持消息並更新LEO。
3、follower處理完response后,會再次向leader副本發起fetch(長輪訓),同樣告訴leader副本自己的LEO信息,leader將會更新remote LEO值(leader記錄follower副本LEO的屬性),根據remote LEO取所有follower副本(ISR集合中)發送的LEO值的最小值更新leader的HW值。

Kafka使用HW值來決定副本備份的進度,而HW值的更新通常需要額外一輪FETCH RPC才能完成

日志截斷

如果發生當leader副本收到producer提交消息后,消息沒有完全同步(有可能同步了一部分,不同follower之間同步的進度也不一致)到follower副本時出現宕機后又恢復,follower與leader副本的數據如何保持一致。如下圖:

1、leader收到了6條數據,followerB少同步了5,followerC少同步了4、5.
2、當leader出現宕機時,followerB擔任leader
3、當老的leader恢復時變成followerA,會將日志截斷到HW時的位置,將LEO指向HW。然后向新leader進行FETCH數據。

老的leader恢復時必須要放棄之前提交消息,如果不進行日志截斷,那么新leaderB如果收到又一個producer的消息那么他5這個位置和老leader5這個位置就產生數據不一致了。所以將LEO恢復到HW位置,因為只有HW位置之前的數據都是所有副本已備份並且認同的,3、4、5數據並沒有與所有副本(ISR集合)確認,需要拋棄這些數據然后重新和新的leader進行同步。

如果ISR副本同步策略等於-1,那么證明其實kafka server還並沒有響應producer告訴他這條消息發生成功了,那么這時如果leader宕機producer那邊收到異常情況就會嘗試重新發送消息(kafka默認保障At least once策略,可能會出現重復消息)。

follower副本如果重啟是一樣的,同樣也會截斷到follower的HW位置。因為不知道在重啟過過程中,自己之前備份的數據是否最終被“提交了”,或者經過了多輪leader選舉,leader都換了不知道多少人了,那HW之后位置的消息誰知道還是不是一致的了。

數據丟失風險

了解完了上面副本同步流程,LEO&HW本身是一個好的設計,但是只按上面的設計會存在數據丟失的風險,核心就點在於在第二輪fetch時follower的HW才可以被更新(是一個異步延遲更新),一旦出現崩潰就會被作為日志截斷的依據,導致HW過期。如下圖:

如上圖描述,producer端已經收到消息確認的通知了,但經過這樣的極端情況,最終導致已經確認的消息丟失。

數據不一致風險

如上圖描述,前三步驟和數據丟失情況一致,在老leader沒有恢復之前,新leader又收到了生產者發來的一條消息。當老leader恢復時變成follower節點,發生自己的HW和LEO相等,就不用日志截斷了。這樣就發生了同一個offset位置的數據不一致情況。

Leader Epoch

核心問題在於依據HW截斷做日志截斷的依據,而且HW的同步是異步的,任何異常崩潰都可能導致HW是一個過期的值。在kafka0.11.x版本引入了leader epoch的概念來規避此問題。leader epoch由一對二元組組成(epoch,startOffset)。Kafka Broker 會在內存中為每個分區都緩存 Leader Epoch 數據,同時它還會定期地將這些信息持久化到一個 checkpoint 文件中。當 Leader 副本寫入消息到磁盤時,Broker 會嘗試更新這部分緩存。如果該 Leader 是首次寫入消息,那么 Broker 會向緩存中增加一個 Leader Epoch 條目,否則就不做更新。

epoch區別leader的朝代,當leader更換時epoch會+1

startOffset代表當前朝代的leader時從哪個offset位置開始的

follower當重啟后並不會直接進行日志截斷,先向現任leader發起OffsetsForLeaderEpochRequest請求攜帶follower副本當前的epoch。有如下幾種情況:

  • leader收到了請求
    • 如果follower的epoch與leader相等,leader返回當前LEO,follower leo不會大於leader leo所以不會發生截斷,繼續后續的fetch數據同步流程。
    • 如果follower的epoch與leader不等,leader根據follower的epoch+1去本地epoch文件找到對應的startOffset返回給follower,follower會根據leader返回的startOffset來判斷,如果自己當前的LEO大於則截斷,小於不會發生截斷,繼續后續的fetch數據同步流程。
  • leader掛了收不到請求
    • 那么follower會成為leader更新epoch+startOffset,並不會發生截斷。老leader復活后與新leader會走上面epoch不一致時的流程。

對應剛剛上面的場景,如下圖:

消息存儲

kafka采用日志的存儲格式並將消息持久化到磁盤中,在每台broker下都能看到不同的分區下存儲的消息信息。

磁盤給大家的印象都是比較慢的,如何支撐kafka的高吞量的呢,kafka官網也給出了答案:傳送門 。大概總結下就是,磁盤並不是想象中的那么慢,而是看如何去使用。借助磁盤順序讀寫、以及操作系統頁緩存從而達到高性能高吞吐。

kafka采用LogSegment的思想來對日志進行分段存儲,一個LogSegment中存在一個日志文件和一個索引文件,日志文件是用來存儲消息的,索引文件的保持消息的索引用來提高查找速度的。

為什么要采用LogSegment分段的思想來構建,單體文件會隨着消息的增加而無線擴張,形成一個巨型文件,對於消息的查找、清理、以及維護都會帶來更大的成本。LogSegment的思想就是將一個巨型文件按規則拆分成多個大小均勻的小文件,來提高磁盤的利用率。

消息文件

kafka通過文件名+offset的方式來進行日志文件的分段,每一個文件名代表開始存儲的offset。例如:

LogSegment1保持是消息范圍 (0,19282)
LogSegment2保持是消息范圍 (19283,48473)
LogSegment3保持是消息范圍 (19283,48473)

LogSegment的增長是取決於當前offset的LogSegment的大小,如果LogSegment的文件大小超過閾值(log.segment.bytes默認1GB)就會向前新增一個LogSegment段。

索引文件

.index文件是用來加速定位消息的物理位置的,kafka中索引屬於稀疏索引,它們不會保存每一條記錄的索引而且一個范圍或者說是N個記錄的索引區間。存儲的格式為:

offset:10331  position: 3293
offset:20331  position: 4270
offset:32331  position: 6389

第一條記錄對應offset范圍(0,10331)
第二條記錄對應offset范圍(10332,20331)
第三條記錄對應offset范圍(20332,32331)

offset為消息的偏移量,position為當前offset消息在日志文件中的物理偏移量地址。

index中數據的增長速率根據log.index.interval.bytes(默認4kb)來決定,也就是日志文件記錄多少kb大小的消息后記錄一條索引

如何查找消息流程

1、根據offset值采用二分查找法,快速找到Logegment段對應的index文件。
2、在索引文件中根據offset找到對應范圍的offset值映射的position地址
3、打開日志文件,從獲得position地址按順序向前查找,比較offset值最終找到offset對應的消息。

日志清理策略

kafka會定期的對日志清理,因為消費過的日志本質上已經沒有用了,不可能然消息一直占用磁盤空間。kafka提供了二中清理策略:清除單位是LogSegment

1、根據時間清理,默認會清除7天之前的日志文件包括索引文件。
2、根據所有LogSegment大小清理,超過一定閾值后(默認為不限制大小),會刪除舊的數據(根據總大小和閾值計算)。

日志壓縮

日志壓縮這個是一種日志去重策略,可以有效的減少日志文件的大小,根據日志的key進行去重,一個key只保留最新的一條記錄。

例如:修改名稱的場景,一個用戶可能修改了多次名稱,只有最后一次的名稱是有效的,前面的都是不生效沒用的。只保留最后一次的名稱即可。

高可靠保障

任何消息中間件都不能保證百分之百可靠,只能說無限接近百分之百。那么在kafka中有哪些保障策略我大概梳理了一下:

存儲層面

分區副本的概念,創建多個分區來保障數據存儲的可靠性以及吞吐量,但同樣也會帶來性能開銷。

生產者層面

ack機制,生產者發生消息到broker,broker要響應producer消息提交成功。但是也要保證消息到了broker並完成了多個副本的同步,這里可能會造成性能的犧牲。kafka這里開放了策略讓開發者進行選擇:生產者確認機制

消費者層面

消費者可以根據場景選擇自動批量提交以及手動提交等方式來進行消費確認機制。

消息傳輸層面

1、At most once: 消息可能會丟,但絕不會重復傳輸
2、At least once:消息絕不會丟,但可能會重復傳輸

producer端:

在消息傳輸過程中,所有消息中間件都會面臨一個問題,當producer消息發出去后,由於網絡問題導致通信的中斷,producer無法知道自己的數據是否已經提交,那么就面臨如果不care,那么可能造成消息丟失,如果retry多次這條消息,那么就有可能出現同樣的消息發送了多次,導致消費者可能消費多次出現重復消費的情況。

consumer端:

當 consumer 讀完消息之后先 commit 再處理消息,在這種模式下,如果 consumer 在 commit 后還沒來得及處理消息就 crash 了,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息,這就對應於 at most once 了。

讀完消息先處理再 commit。這種模式下,如果處理完了消息在 commit 之前 consumer crash 了,下次重新開始工作時還會處理剛剛未 commit 的消息,實際上該消息已經被處理過了,這就對應於 at least once。

在各個商業級的消息中間件領域也都是寧願重復消費,也不能導致消息丟失的場景發生,kafka也是一樣。由開發者來根據自己業務保證消費端的冪等性。

如果必須保證業務消息去重,kafka提出了全局消息唯一id的概念,保證發送端冪等性,消費端引入第三方緩存,根據消息全局唯一id來存儲,保證consumer crash時消息會被持久化。這也是kafka提出的一些思路,但這樣必然大大增加的系統復雜度。

總結

作為一款主流的分布式中間件,核心要保證三大要素,水平擴容、高可用、高性能,針對這三個要素我們簡單總結下kafka中的一些值得學習的設計思想:

水平擴容

采用partition的思想將topic按需拆分均勻的分布在各個節點中,提升處理性能以及存儲容量,並可以進行水平擴容。解決了存儲問題同時也加強了消息處理性能。

高可用

采用分區副本機制,來保證topic中的每個分區消息不會出現單點問題,副本的同步這個機制又延伸到 ISR 機制、LEO&HW機制,這些都是一些很好的設計思想很值得學習。

ISR機制與Quorum機制兩種都是在數據備份場景中很好的設計思想

高性能

  • 消息存儲與檢索,采用LogSegment思想將大文件拆分、稀疏索引的設計。

  • 大量利用磁盤順序IO,操作系統層面 page cache(頁緩存)、IO 0拷貝機制大幅提高IO處理性能。

  • offset消費偏移量代替消息消費狀態,降低存儲效率、以及磁盤隨機IO帶來的性能開銷。詳細描述:傳送門

  • 一個consumer對應一個分區(同一個topic下),點對點訂閱,不用處理並發消費問題。

以上這些設計都是相對比較突出的好的設計思想點,在學習的過程中也遇到了很多比較巧妙的小設計值得我們學習,有興趣的同學可以繼續探索哈。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM