摘要
在這篇文章中,我將從消息在Kafka中的物理存儲方式講起,介紹分區-日志段-日志的各個層次。
然后我將接着上一篇文章的內容,把消費者的內容展開講一講,區分消費者與消費者組,以及這么設計有什么用。
根據消費者的消費可能引發的問題,我將介紹Kafka中的位移主題,以及消費者要怎么提交位移到這個位移主題中。
最后,我將聊一聊消費者Rebalance的原因,以及不足之處。
1. log
在上一篇文章中,我們提到了“partition”的概念。
我們那個時候所表達的意思是,消息的生產跟消費是處於topic中的partition這個維度的,而不是位於主題的維度。
也就是說,我們那個時候對Kafka的理解,是處在topic下的每個parititon,都有一個稱為“隊列”的數據結構,所有送往這個主題的消息,會被分配到其中的一個parititon中。
這樣的設計可以避免消息隊列的性能在IO上具有瓶頸。
在這一節中,我們將進一步的解釋Kafka的消息儲存方式。
我們所理解的“消息”,在Kafka中被稱為日志。
在每一個broker中,保存了多個名字為{Topic}-{Parititon}
的文件夾,例如Test-1
、Test-2
。
這里的意思是,這個broker中能夠處理topic為Test,分區為1和2的消息。
但是注意,對於“parititon”這個名詞來說,他也是一個邏輯上的概念,對應在broker中只是一個文件夾,那么什么才是物理意義上的概念呢,我們接着往下看。
在{Topic}-{Parititon}
的文件夾內部,包含了很多很多的文件,里面的文件名都是64位的長整數。
例如:
在這張圖中,一個分區,包含了多個Log Segment
。注意,這里的Log Segment
也是邏輯上的概念,只有具體到具體的日志文件,才是物理上的概念。
我們看圖片最右邊的部分,文件名都是20位的整數,這個數字稱為消息的“基准偏移量”。例如我們第二個Log Segment是從121開始的,那么代表了這個日志段的第一條消息的偏移量是從121開始的,也代表了在這之前有121條日志記錄。
注意,因為我們的偏移量是從0開始的,所以在121這個偏移量之前有121條數據,而不是120條。
然后我們再聊聊文件的格式,我們看到這里有三種類型的文件,*.log
、*.index
、*.timeindex
。
log格式的文件記錄了消息,index是偏移量索引,timeindex是時間戳索引。但是這個我們不展開聊,這篇文章的定位還是偏向於了解各個組件。
如此一來,broker在接收到生產者發過來的消息的時候,需要將消息寫在最后的Log Segment中。這樣還帶來了一個好處,消息的寫入是順序的IO。也因為如此,最后的一個Log Segment,被稱為“active Log Segment”。
2. 消費者與消費者組
在上一篇文章中,我們只提到了“消費者”這個概念。
同樣在本文中,我們將更深入更准確的了解位於Kafka中的“消費者”。
其實在Kafka中,消費者是以消費者組的形式對外消費的。
我們作一個假設,假設沒有消費者組這種概念,我們現在有10個消費者訂閱了同一個主題,那么當這個主題有新的消息之后,我們這10個消費者是不是應該去“搶消息”進行消費呢?
這是一種浪費資源的表現。
所以消費者組,也可以認為是一種更加合理分配資源,進行負載均衡的設計。
假設有5個消費者屬於同一個消費者組,這個消費者組訂閱了一個具有10個分區的主題,那么組內的每一個消費者,都會負責處理2個分區的消息。
這樣,能夠保證當一條消息發送到主題中,只會被一個消費者所消費,不會造成重復消費的情況。
此外,消費者組的設計還能夠令我們很方便的橫向擴展系統的消費能力。設想一下在我們發覺系統中消息堆積越來越多,消費速度跟不上生產速度的時候,只需要新增消費者,並且將這個消費者划入原來的消費者組中,Kafka會自動調整組內消費者對分區的分配,這個過程稱為重平衡,我們在后面會提到。
但是需要注意的是,組內消費者的數量不能超過主題的分區數目。否則,多出的消費者將會空閑。例如一個主題具有10個分區,而組內有11個消費者,那么這多出來的一個消費者將空閑。
Kafka這樣的設計是為了同一個分區只能夠被一個消費者所消費,這個跟位移管理有關,我們在后文會提到。
另外,Kafka還支持多個消費者組訂閱同一個主題,這樣,相同的消息將被發送到所有訂閱了這個主題的消費者組中。
注意:我們說到了同一分區只能被同一個消費者消費,但是這個說法的前提是這些消費者位於同一個消費者組。也就是說,不同消費者組內的消費者,是可以消費同一個主題分區的。
所以,我們也可以認為Kafka的消費者組,是為了實現點對點以及廣播這兩種方式的消息傳遞。
3. 位移主題
我們在上一個小節提到了消費者組內的消費者對分區內的信息進行消費,並且存在了消費者的加入與退出這種情況。
所以在這節我們來聊聊Kafka是怎么做到在消費者有變動的情況下,消息不會丟失或者重復消費。
我們可以很容易的想到,只要記錄下消費過的位移,就能夠實現上述的目標了。
我們直接聊聊位移主題這種方式,不管以前的將位移保存在zk中的實現方式。
在Kafka中有一種特殊的主題,稱為位移主題,在Kafka中的主題名稱是__consumer_offsets
。
因為位移主題也是一個主題,所以也符合Kafka中主題的各種特性,我們可以隨意的發送消息,拉取消息,刪除主題。但是因為這個主題的數據是kafka設計好的,所以不能隨意的發送消息過去,否則在broker端不能解析的話,就會造成崩潰。
然后我們討論一下發往位移主題的消息格式。因為我們是希望保存位移,所以很容易會想到這是一個KV結構。那么Key中應該保存哪些消息呢?
Key中包含了主題名,分區名,消費者組名。
其實在這里是不需要保存消費者id之類的信息的,也就是說只需要具體到是哪個消費者組在哪個主題的哪個分區消費了多少數據,就足夠了。為什么呢?因為我們上文也提到了,消費者是可能發生變動的,我們的目的是讓消費者發生變動后,能知道從哪里繼續消費。因此,位移信息的精確度到消費者組級別,就足夠了。
並且,在Value中,只需要保存消費位移,就足夠了。
說完了位移信息是怎么保存的,我們再來聊聊位移主題本身。因為位移主題也是一個主題,所以必然也會有分區,也會有副本。那么消費者在消費了信息之后,該把位移發送到哪呢?
Kafka中的位移主題會在第一個消費者被創建的時候創建,默認會有50個分區。消費者在提交位移的時候,會根據自己組id的hash值模位移主題的分區數,所得到的結果就是位移信息該提交的分區id,然后找到這個分區id的leader節點,將位移信息提交到這個leader節點所在的broker中。
4. 位移的提交
聊了位移主題,我想你大概明白Kafka關於位移狀態的保存了,那么在這一節中,我們來聊聊位移是怎么被提交的。
在說到位移的提交之前需要明確的是:雖然有了位移主題這樣的設計,但是並不代表了消息一定不會被重復消費,也不代表消息一定不會丟失。
另外,Kafka會嚴格的執行位移主題中所提交的信息。例如已經消費了0-20的消息,如果你提交的位移是100,那么下一次拉取的信息一定是從100開始的,20-99的消息將會丟失。又比如你提交的位移是10,那么10-20的消息將會被重復消費。
在Kafka中,位移的提交有兩種方式,一種是自動提交,一種是手動提交。
4.1 自動提交
位移的自動提交是在POLL操作的時候進行的。
在消費者POLL拉取最新的消息之前,會先判斷目前是否已經到了提交位移的Deadline時間點,如果已經到了這個時間,則先進行位移的提交,然后再拉取信息。
注意,這里可能會發生如下的情況:
在某一時刻提交了位移100,隨后你拉取了100-150的消息,但是還沒有到下一次提交位移的時候,消費者宕機了。可能這個時候只消費了100-120的消息,那么在消費者重啟后,因為120的位移沒有提交,所以這部分的消息會被重復消費一次。
再設想一種情況,你拉取了100-150的消息,這個時候到了自動提交的時間,提交了150的這個位移,而這個時候消費者宕機了,重啟之后會從150開始拉取信息處理,那么在這之前的信息將會丟失。
4.2 手動提交
對於因為自動提交而造成的信息丟失和重復消費,你可以采取手動提交的方式來避免。
手動提交又分為同步提交和異步提交兩種提交方式。
同步提交會直到消息被寫入了位移主題,才會返回,這樣是安全的,但是可能造成的問題是TPS降低。
異步提交是觸發了提交這個操作,就會返回。這樣速度是很快的,但是可能會造成提交失敗的情況。
5. Rebalance
我們在上面的內容中提到過這么一種情況:
消費者組內的成員增減,導致組內的成員需要重新調整他需要負責的消費的分區。
這種情況我們稱為“Rebalance”,或者稱為“重平衡”。
用專業一點的話來下定義就是:某個消費組內的消費者就如何消費某個主題的所有分區達成一個共識的過程。
但是這個過程對Kafka的吞吐率影響是巨大的,因為這個過程有點像GC中的STW(世界停止),在Rebalance的時候,所有的消費者只能去做重平衡這一件事情,不能消費任何的消息。
下面我們來說說哪些情況可能會導致Rebalance:
- 組內成員數量發生了變化
- 訂閱主題的數量發生了變化
- 訂閱主題的分區數量發生了變化
而且在Rebalance的時候,假設有消費者退出了,導致多出了一些分區,Kafka並不是把這幾個多出來的分區分配給原來的那些消費者,而是所有的消費者一起參與重新分配所有的分區。
當有新的消費者加入的時候,也不是原本的每個消費者分出一些分區給新的消費者,而是所有的消費者一起參與重新分配所有的分區。
這樣的分配策略聽起來就很奇怪且影響效率,但是沒有辦法。
不過社區新推出了StickyAssignor(粘性分配)策略,就可以做到我們上面假設的情況,但是目前還存在一些bug。
寫在最后
首先,謝謝你能看到這里!
關於Kafka的前兩篇文章,我認為都是科普性質的,希望可以用比較簡單的方式給你梳理一遍Kafka具有的功能,以及各個功能的運作方式。
在后面的文章中,我也希望能夠比較清晰易懂的給你介紹Kafka的一些原理之類的東西。
因為作者也剛開始研究Kafka,很多地方的理解可能還是不到位的,所以在這期間如果你發現有什么問題,或者有哪些地方是我解釋的不好的,請留言告訴我,謝謝你!
再次感謝你能看到這里!
PS:如果有其他的問題,也可以在公眾號找到我,歡迎來找我玩~