摘要
在這一篇文章中,我將向你介紹消費者的一些參數。
這些參數影響了每次poll()請求的數據量,以及等待時間。
在這之后,我將向你介紹Kafka用來保證消費者擴展性以及可用性的設計——消費者組。
在消費者組的介紹中,我將重點放在了Rebalance的過程上,因為這是一個很重要又經常發生,還會導致消費者組不可用的操作。
1 消費者參數配置
對於一個消費者來說,他要做的事情只有一件,那就是使用poll()來拉取消息。
至於他是從哪個分區拉取,則是靠消費者組來動態的調整這個消費者所消費的分區,又或者是由開發者來自定義。
但無論如何,這個消費者都需要通過poll()來拉取消息。
這也是這一節的內容:通過參數配置能夠影響poll操作的哪些內容。
首先需要確定一點,當消費者使用poll()拉取消息的時候,他只能拉到HW水位線及以下的消息。
1.1 分區配置
我們可以讓消費者針對於某一個分區進行消費。
為了實現這個目標,我們可以用assign()方法。
但是注意,當這個消費者不是單獨的一個消費者,而是屬於某個消費者組的時候,將不允許使用自定義的分區分配。
1.2 POLL操作拉取的字節數目
對應的配置分別是:
fetch.min.bytes
對於每次拉取的最小字節數,默認是1。當拉取的消息大小小於設定的這個限度時,將會等待,直到這次被拉取的消息大小大於這個值。
於是我們可以得知,當我們即將要消費的消息比較小時,可以適當的調大這個參數的值,以提高吞吐量。
但是注意,這也可能造成消息的額外延遲。
fetch.max.bytes
這個參數跟上面的一樣,只不過他代表的意義是最大的字節數。
但是這存在一個問題,如果我們的消息大小全都大於這個參數的值,會發生什么情況呢?
答案是會返回即將拉取分區的第一條消息。
也就是說在這個參數中,不存在“不符合條件就不返回數據”的情況。
還有一個參數,叫做max.partition.fetch.bytes
這個參數跟上面提到的每次拉取的最大字節數工作原理是一樣的,也是會保證當消息大於設定的值的時候,一定會返回數據。
而不同的地方在於,這個參數代表的是分區。也就是說,一個參數代表的是一次拉取請求,而另外一個參數代表的是針對於每一個分區的拉取請求。
1.3 拉取消息的超時時間
fetch.max.wait.ms
這個參數的意義在於:如果拉取消息的時間達到了這個參數設定的值,那么無論符不符合其他條件,都會返回數據。
那么你很容易可以猜到,這個參數跟fetch.min.bytes是有關系的,這是為了防止當fetch.min.bytes參數設置的過大,導致無法返回消息的情況。
當然了,這個參數還有一個意義,如果你的業務需要更小的延遲,那么應該調小這個參數。
1.4 最大拉取消息數
如果我們的最大拉取字節數設置成了非常大,那么是不是代表我們每一次的poll(),都能直接拉到HW水位呢?
答案是否定的。
還存在一個參數:
max.poll.records
這個參數的意義在於,每次拉取消息的最大數量。
同樣的,如果消息的大小都比較小,那么可以調大這個參數,以提高消費速度。
1.5 消費者組相關的參數
另外,還存在一些消費者組相關的參數,我在這里先提一下,具體更詳細的解釋,將在后文給出。
heartbeat.interval.ms
這個參數是設置消費者與消費者組對應的Coordinator發送心跳響應的間隔時間。
session.timeout.ms
這個參數是用於Coordinator判斷多長時間沒收到消費者的心跳響應而認為這個消費者已經下線的時間。
max.poll.interval.ms
這個參數用於Coordinator判斷多長時間內消費者都沒有拉取消息,而認為這個消費者已經下線的時間。
auto.offset.reset
這個參數其實跟消費者組的聯系不是很大,但是我認為可以寫在這里。
因為有這么一個場景,當消費者Rebalance之后,如果位移主題之前保存的位移已經被刪除了,那么這個參數就決定了消費者該從哪里開始消費。
當然了,關於消費者還有許多的參數,不僅僅是上文提到的這些。
而上文提到的這些參數,是我認為可以讓初學者更好的理解消費者的工作原理。
2 Rebalance原理
在解釋Rebalance的原理之前,我想先跟你說一下我的思路,免得你看的一頭霧水。
當然了,這個思路是我認為更適合我自己去理解的。你也可以先看第三大節,再有了一個大概的認識后,再來看這一節的內容。
我希望先告訴你Rebalance的過程是怎么樣的,這里說的過程指的是Rebalance已經發生了,那么在Rebalance的過程中,會發生哪些事情。
在這之后,我再跟你說說Rebalance的五種狀態。
那么,我們開始。
2.1 尋找Coordinator
首先,應該有一個認識。Rebalance的所有操作都是通過Coordinator的協調下完成的,組內的消費者之間並不會進行相關的通信與交流。
Coordinator你可以理解為是一個服務,位於某個broker節點上。
假設當前的消費者已經保存了這個這個節點的信息,那么將會直接進入第二步。
如果當前的消費者沒有保存這個信息(比如這是一個新加入這個消費者組的消費者),那么他需要先找到這個Coordinator所在的broker節點。
這里的broker節點,是這個消費者對應的消費者組對應的位移主題的分區的leader節點。
聽起來有點繞,讓我來再解釋一下。
消費者 -> 消費者組 -> __consumer_offsets -> partition -> leader
關於位移主題,我已經在第二篇文章中提到過了,在這里不再贅述。
但是在這里,讓我們來再來回憶一遍消費者組對應的partition是怎么找到的。
- 先獲取
Group ID的hash值 - 將這個hash值,對
__consumer_offsets的分區數取模 - 獲得的數字,就是這個消費者組提交位移的分區
- 找到這個分區對應的leader副本,即為Coordinator對應的broker節點
2.2 Join Group
在找到了對應的broker節點后,第二步是發送加入Group的請求。
在這一步中,無論是之前已經在Group內的成員,還是准備加入Group的成員,都需要發送Join Group的申請。
在發起的JoinGroupRequest中,需要包含如下的數據:
-
Group id
-
Session_timeout
-
Rebalance_timeout
-
Menber_id
-
Partition assignor
需要事先說明的是,這里的名稱並不嚴格,是為了更好的理解而這樣寫的。如果你想要知道更加嚴謹的請求內容,可以去看廝大的《深入理解Kafka》。
下面我們挨個解釋:
Group ID,消費者組ID,代表了即將加入的消費者組。
Session_timeout,上文中提到過這個參數,用於Coordinator判斷多長時間內沒收到客戶端的心跳包而認為這個客戶端已經下線。
Rebalance_timeout,值等同於max.poll.interval.ms,意義在於告知Coordinator用多長的時間來等待其他消費者加入這個消費者組。
我們在上文中提到,無論之前是不是這個消費者組的成員,只要開啟了Rebalance,就需要重新加入這個消費者組。因此,Coordinator需要一段時間來接受JoinGroupRequest的請求。
至於為什么需要一段時間來接受請求,以及這段時間發生了什么,我將在后面給你解釋。
menber_id,作為組內消費者的識別編號,如果是新加入組的消費者,這個字段留空。
Partition assignor,指的是分區分配方式。因為Rebalance這個過程,就是分區分配的一個過程。每個消費者將其接受的分配方式放在這個字段中,隨后由Coordinator選出每個消費者都認可的分區分配方式。
然后我們來聊聊在這個階段,Coordinator需要做什么。
Coordinator需要一段時間來接收來自客戶端的JoinGroupRequest請求,是因為Coordinator需要收集每一個成員的信息,選出leader和分區分配方式,因此,Coordinator需要足夠的時間來“收集信息”。這就回答了上文說到的為什么“Coordinator需要一段時間來接受JoinGroupRequest的請求”。
選舉leader的算法很簡單,第一個發送請求的consumer,就是leader。
選出分區分配策略的算法也很簡單,首先Coordinator會收集所有消費者都支持的分區分配方式,然后每個消費者為它支持的分配方式投上一票。注意,這里的投票行為沒有經過多一次的交互,而是在候選集中找到第一個消費者支持的分區分配方式,作為這個消費者所投的票。
當Coordinator選取好Leader和分區分配方式后,將返回JoinGroupResponse給各個消費者。
在返回給各個消費者的JoinGroupResponse中,包含了menber_id,分區分配方式等。而對於leader消費者來說,還將獲得組內其他消費者的元數據,包含了各個消費者的menber_id,分區分配方式。
至此,JoinGroup階段完成。
注意,每個消費者從發送JoinGroupRequest到接收到JoinGroupResponse請求這段時間,是阻塞的。
2.3 分配分區
在第二步結束之后,每個消費者已經知道了自己的menber_id,以及Coordinator所選擇的分區分配方式。
但是此時每個消費者還不知道自己應該消費哪個分區。
這個分區分配的過程,是交給Leader消費者來完成的。
但是注意,雖然說這個過程是Leader消費者完成的,但是Leader消費者並不會跟其他消費者直接通信,而是將分配方式告知Coordinator,由Coordinator來告知各個消費者。
這個過程,稱為Sync_Group。
在這個過程中,每一個消費者都會發送SyncGroupRequest給Coordinator。要注意的是,Leader消費者在這個Request中還附帶了其他消費者的分區分配信息。
在Coordinator收到了這些請求后,會將這個分區分配方案等元數據保存在__consumer_offsets主題中。
隨后,Coordinator將發送響應給各個消費者。
在這個響應中,包含了各個消費者應該負責消費的分區編號。
至此,每個消費者都了解了自己應該消費的分區是哪些了。
2.4 消費並發送心跳包
在上一個階段中,組內各個消費者已經知道了自己負責的是哪些分區。
但是還存在一個問題,消費者應該從分區的哪個位置開始消費呢?
這就用到了__consumer_offsets主題了,這個主題保存了某個消費者組的各個分區的消費位移。
此外,每個消費者還需要不斷地發送心跳包給Coordinator,以告知Coordinator自己沒有下線。
這個發送心跳包的時間,就是我們設置的heartbeat.interval.ms參數。
在每個心跳包的響應中,Coordinator就會告知這個消費者,需不需要Rebalance。
那么也就說明了,這個參數設置的越小,消費者就越早能夠得知是否需要Rebalance。
而對應的session.timeout.ms,指的就是Coordinator在這么長的時間內沒收到消費者的心跳包,而認為這個消費者過期的參數。
3 消費者組的狀態轉移
在上面說完了Rebalance的核心原理后,我們再來聊聊消費者組的各個狀態。
先來介紹一下消費者組有哪幾種狀態:
- Empty:組內沒有任何的成員,但是保留着這些成員的元數據,比如在發生Rebalance的時候,Coordinator在心跳包的響應中告知消費者應該要進行Rebalance了,這個時候所有的消費者都離開了消費者組,那么這個消費者組就會處於Empty狀態。注意,一個新創建的消費者組,也處於這個狀態。
- Dead:組內沒有任何的成員,並且在
__consumer_offsets中也沒有保存這個消費者組的元數據。通常發生在這個消費者組被刪除了,或者__consumer_offsets分區leader發生了改變。(至於這個狀態我了解的也不是很多,如果可以的話,麻煩你評論區告訴我。) - PreparingRebalance:這個狀態為Coordinator正在等待Consumer加入。這個狀態對應於JoinGroup階段,會持續
Rebalance_timeout這么長的時間。 - CompletingRebalance:也被稱為AwaitingSync,為Coordinator正在等待Leader消費者的分區分配方案。對應於SyncGroup階段。
- Stable:到了這個階段,消費者組已經在正常工作了。
消費者組的狀態介紹大概就是這樣的。
簡單的來講,當一個消費者組需要Rebalance的時候,他就會進入PreparingRebalance階段,然后一直流轉到Stable階段。
在這個期間,如果有任何的成員變動,就會回到PreparingRebalance階段。
在這個期間,如果Coordinator改變,或者消費者組被刪除等,就會進入Dead階段。
下面是狀態轉移圖,你可以結合上面的文字解釋來查閱:

寫到最后
首先,謝謝你能看到這里!
在這一篇文章中,我沒有像介紹生產者那樣介紹一遍源碼。
因為對於生產者來說,他只需要將消息發送到broker中,而對於消費者來說,這個過程復雜得多,我希望能夠用比較淺顯易懂的方式,讓你能夠了解消費者組的工作方式。
在有了這樣的一個認識之后,無論使用什么客戶端,我認為都不會有太大的問題。
此外,在這一篇中我花了較大的筆墨去介紹Rebalance的過程,是因為Rebalance是一個很常見的現象,而且在這期間會導致Kafka消費者的不可用,所以我希望了解了Rebalance的工作原理,能夠讓你更容易的避免不必要的Rebalance。
當然了,因為作者才疏學淺能力有限,可能在這個過程中忽略了一些很重要的細節,又或者有一些錯誤的理解。如果你發現了,還請不吝指教,謝謝你!
再次謝謝你能看到這里,感恩~
PS:如果有任何的問題,可以在公眾號找到我,歡迎來找我玩!

