文章更新時間:2020/06/14
一、生產者
當我們發送消息之前,先問幾個問題:每條消息都是很關鍵且不能容忍丟失么?偶爾重復消息可以么?我們關注的是消息延遲還是寫入消息的吞吐量?
舉個例子,有一個信用卡交易處理系統,當交易發生時會發送一條消息到 Kafka,另一個服務來讀取消息並根據規則引擎來檢查交易是否通過,將結果通過 Kafka 返回。對於這樣的業務,消息既不能丟失也不能重復,由於交易量大因此吞吐量需要盡可能大,延遲可以稍微高一點。
再舉個例子,假如我們需要收集用戶在網頁上的點擊數據,對於這樣的場景,少量消息丟失或者重復是可以容忍的,延遲多大都不重要只要不影響用戶體驗,吞吐則根據實時用戶數來決定。
不同的業務需要使用不同的寫入方式和配置。具體的方式我們在這里不做討論,現在先看下生產者寫消息的基本流程:

流程如下:
-
- 首先,我們需要創建一個ProducerRecord,這個對象需要包含消息的主題(topic)和值(value),可以選擇性指定一個鍵值(key)或者分區(partition)。
- 發送消息時,生產者會對鍵值和值序列化成字節數組,然后發送到分配器(partitioner)。
- 如果我們指定了分區,那么分配器返回該分區即可;否則,分配器將會基於鍵值來選擇一個分區並返回。
- 選擇完分區后,生產者知道了消息所屬的主題和分區,它將這條記錄添加到相同主題和分區的批量消息中,另一個線程負責發送這些批量消息到對應的Kafka broker。
- 當broker接收到消息后,如果成功寫入則返回一個包含消息的主題、分區及位移的RecordMetadata對象,否則返回異常。
- 生產者接收到結果后,對於異常可能會進行重試。
一句話流程:
- 【創建包含topic和value的對象】
- 【消息到分配器】
- 【分配器返回分區信息】
- 【消息添加到相同topic中,並根據分區策略添加到對應的分區里面】
- 【成功寫入返回RecordMetadata,否則返回異常】
- 【異常可能重試】
二、消費者
消費者與消費組
假設這么個場景:我們從Kafka中讀取消息,並且進行檢查,最后產生結果數據。我們可以創建一個消費者實例去做這件事情,但如果生產者寫入消息的速度比消費者讀取的速度快怎么辦呢?這樣隨着時間增長,消息堆積越來越嚴重。對於這種場景,我們需要增加多個消費者來進行水平擴展。
Kafka消費者是消費組的一部分,當多個消費者形成一個消費組來消費主題時,每個消費者會收到不同分區的消息。假設有一個T1主題,該主題有4個分區;同時我們有一個消費組G1,這個消費組只有一個消費者C1。那么消費者C1將會收到這4個分區的消息,如下所示:

如果我們增加新的消費者C2到消費組G1,那么每個消費者將會分別收到兩個分區的消息,如下所示:

如果增加到4個消費者,那么每個消費者將會分別收到一個分區的消息,如下所示:

但如果我們繼續增加消費者到這個消費組,剩余的消費者將會空閑,不會收到任何消息:

總而言之,我們可以通過增加消費組的消費者來進行水平擴展提升消費能力。這也是為什么建議創建主題時使用比較多的分區數,這樣可以在消費負載高的情況下增加消費者來提升性能。另外,消費者的數量不應該比分區數多,因為多出來的消費者是空閑的,沒有任何幫助。
Kafka一個很重要的特性就是,只需寫入一次消息,可以支持任意多的應用讀取這個消息。換句話說,每個應用都可以讀到全量的消息。為了使得每個應用都能讀到全量消息,應用需要有不同的消費組。對於上面的例子,假如我們新增了一個新的消費組G2,而這個消費組有兩個消費者,那么會是這樣的:

在這個場景中,消費組G1和消費組G2都能收到T1主題的全量消息,在邏輯意義上來說它們屬於不同的應用。
小結:如果應用需要讀取全量消息,那么請為該應用設置一個消費組;如果該應用消費能力不足,那么可以考慮在這個消費組里增加消費者。
消費組與分區重平衡
可以看到,當新的消費者加入消費組,它會消費一個或多個分區,而這些分區之前是由其他消費者負責的;另外,當消費者離開消費組(比如重啟、宕機等)時,它所消費的分區會分配給其他分區。這種現象稱為重平衡(rebalance)。重平衡是 Kafka 一個很重要的性質,這個性質保證了高可用和水平擴展。不過也需要注意到,在重平衡期間,所有消費者都不能消費消息,因此會造成整個消費組短暫的不可用。而且,將分區進行重平衡也會導致原來的消費者狀態過期,從而導致消費者需要重新更新狀態,這段期間也會降低消費性能。后面我們會討論如何安全的進行重平衡以及如何盡可能避免。
消費者通過定期發送心跳(hearbeat)到一個作為組協調者(group coordinator)的 broker 來保持在消費組內存活。這個 broker 不是固定的,每個消費組都可能不同。當消費者拉取消息或者提交時,便會發送心跳。
如果消費者超過一定時間沒有發送心跳,那么它的會話(session)就會過期,組協調者會認為該消費者已經宕機,然后觸發重平衡。可以看到,從消費者宕機到會話過期是有一定時間的,這段時間內該消費者的分區都不能進行消息消費;通常情況下,我們可以進行優雅關閉,這樣消費者會發送離開的消息到組協調者,這樣組協調者可以立即進行重平衡而不需要等待會話過期。
在 0.10.1 版本,Kafka 對心跳機制進行了修改,將發送心跳與拉取消息進行分離,這樣使得發送心跳的頻率不受拉取的頻率影響。另外更高版本的 Kafka 支持配置一個消費者多長時間不拉取消息但仍然保持存活,這個配置可以避免活鎖(livelock)。活鎖,是指應用沒有故障但是由於某些原因不能進一步消費。
Partition 與消費模型
Q:Kafka 中一個 topic 中的消息是被打散分配在多個 Partition(分區) 中存儲的, Consumer Group 在消費時需要從不同的 Partition 獲取消息,那最終如何重建出 Topic 中消息的順序呢?
A:沒有辦法。Kafka 只會保證在 Partition 內消息是有序的,而不管全局的情況。
Q:Partition 中的消息可以被(不同的 Consumer Group)多次消費,那 Partition中被消費的消息是何時刪除的? Partition 又是如何知道一個 Consumer Group 當前消費的位置呢?
A:無論消息是否被消費,除非消息到期 Partition 從不刪除消息。例如設置保留時間為 2 天,則消息發布 2 天內任何 Group 都可以消費,2 天后,消息自動被刪除。 Partition 會為每個 Consumer Group 保存一個偏移量,記錄 Group 消費到的位置。 如下圖:

Q:多個 partitions 的好處?
A:對 broker 上的數據進行分片,有效減少了消息的容量從而提升 io 性能,且不同消費者組可以消費不同的分區數據,也提高消費端的消費能力。
為什么 Kafka 是 pull 模型
Q:消費者應該向 Broker 要數據(pull)還是 Broker 向消費者推送數據(push)?
A:作為一個消息系統,Kafka 遵循了傳統的方式,選擇由 Producer 向 broker push 消息並由 Consumer 從 broker pull 消息。一些 logging-centric system,比如 Facebook 的Scribe和 Cloudera 的Flume,采用 push 模式。事實上,push 模式和 pull 模式各有優劣。
push 模式很難適應消費速率不同的消費者,因為消息發送速率是由 broker 決定的。push 模式的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成 Consumer 來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而 pull 模式則可以根據 Consumer 的消費能力以適當的速率消費消息。
對於 Kafka 而言,pull 模式更合適。pull 模式可簡化 broker 的設計,Consumer 可自主控制消費消息的速率,同時 Consumer 可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。
三、可靠性問題
當我們討論可靠性的時候,我們總會提到*保證**這個詞語。可靠性保證是基礎,我們基於這些基礎之上構建我們的應用。比如關系型數據庫的可靠性保證是ACID,也就是原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)和持久性(Durability)。
Kafka 中的可靠性保證有如下四點:
- 對於一個分區來說,它的消息是有序的。如果一個生產者向一個分區先寫入消息A,然后寫入消息B,那么消費者會先讀取消息A再讀取消息B。
- 當消息寫入所有in-sync狀態的副本后,消息才會認為已提交(committed)。這里的寫入有可能只是寫入到文件系統的緩存,不一定刷新到磁盤。生產者可以等待不同時機的確認,比如等待分區主副本寫入即返回,后者等待所有in-sync狀態副本寫入才返回。【參見第二篇的ACK應答機制:戳這里】
- 一旦消息已提交,那么只要有一個副本存活,數據不會丟失。
- 消費者只能讀取到已提交的消息。
使用這些基礎保證,我們構建一個可靠的系統,這時候需要考慮一個問題:究竟我們的應用需要多大程度的可靠性?可靠性不是無償的,它與系統可用性、吞吐量、延遲和硬件價格息息相關,得此失彼。因此,我們往往需要做權衡,一味的追求可靠性並不實際。
四、分區策略
Range strategy(范圍分區)
Range 策略是對每個主題而言的,首先對同一個主題里面的分區按照序號進行排序,並對消費者按照字母順序進行排序。假設我們有 10 個分區,3 個消費者,排完序的分區 將會是 0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消費者線程排完序將會是 C1-0, C2-0, C3-0。然后將 partitions 的個數除於消費者線程的總數來決定每個消費者線程消費幾個分區。如果除不盡,那么前面幾個消費者線程將會多消費一個分區。在例子里面,我們有 10 個分區,3 個消費者線程, 10 / 3=3,而且除不盡,那么消費者線程 C1-0 將會多消費一個分區,所以最后分區分配的結果看起來是這樣的:
- C1-0 將消費 0, 1, 2, 3 分區
- C2-0 將消費 4, 5, 6 分區
- C3-0 將消費 7, 8, 9 分區
假如我們有 11 個分區,那么最后分區分配的結果看起來是這樣的:
- C1-0 將消費 0, 1, 2, 3 分區
- C2-0 將消費 4, 5, 6, 7 分區
- C3-0 將消費 8, 9, 10 分區
假如我們有 2 個主題(T1 和 T2),分別有 10 個分區,那么最后分區分配的結果看起來是這樣的:
- C1-0 將消費 T1主題的 0, 1, 2, 3 分區以及 T2主題的 0, 1, 2, 3 分區
- C2-0 將消費 T1主題的 4,5,6 分區以及 T2主題的 4,5, 6 分區
- C3-0 將消費 T1主題的 7,8,9 分區以及 T2主題的 7,8, 9 分區
可以看出,C1-0 消費者線程比其他消費者線程多消費了 2 個 分區,這就是 Range strategy 的一個很明顯的弊端。
RoundRobin strategy(輪詢分區)
輪詢分區策略是把所有 partition 和所有 consumer 線程都列出來,然后按照 hashcode 進行排序。最后通過輪詢算法分配 partition 給消費線程。如果所有 consumer 實例的訂閱是相同的,那么 partition 會均勻分布。
在例子里面,假如按照 hashCode 排序完的 topic- partitions 組依次為 T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4,T1-7, T1-6, T1-9,我們的消費者線程排序為 C1-0, C1-1, C2- 0, C2-1,最后分區分配的結果為:
- C1-0 將消費 T1-5, T1-2, T1-6 分區
- C1-1 將消費 T1-3, T1-1, T1-9 分區
- C2-0 將消費 T1-0, T1-4 分區
- C2-1 將消費 T1-8, T1-7 分區
使用輪詢分區策略必須滿足兩個條件
- 1. 每個主題的消費者實例具有相同數量的流
- 2. 每個消費者訂閱的主題必須是相同的
Q:什么時候會觸發這個策略呢?
A:當出現以下幾種情況時,kafka 會進行一次分區分配操作,也就是 kafka consumer 的 rebalance:
- 同一個 consumer group 內新增了消費者
- 消費者離開當前所屬的 consumer group,比如主動停機或者宕機
- topic 新增了分區(也就是分區數量發生了變化)
PS:kafka consuemr 的 rebalance 機制規定了一個 consumer group 下的所有 consumer 如何達成一致來分配訂閱 topic 的每個分區。而具體如何執行分區策略,就是前面提到過的兩種內置的分區策略。而 kafka 對於分配策略這塊,提供了可插拔的實現方式,也就是說,除了這兩種之外,我們還可以創建自己的分配機制。
Q:誰來執行 Rebalance 以及管理 consumer 的 group 呢?
A:Kafka 提供了一個角色:coordinator【調配器】 來執行對於 consumer group 的管理,當 consumer group 的第一個 consumer 啟動的時候,它會去和 kafka server 確定誰是它們組的 coordinator。之后該 group 內的所有成員都會和該 coordinator 進行協調通信。
Q:如何確定 coordinator 【調配器】呢?
A:consumer group 如何確定自己的 coordinator 是誰呢, 消費者向 kafka 集群中的任意一個 broker 發送一個 GroupCoordinatorRequest 請求,服務端會返回一個負載最小的 broker 節點的 id,並將該 broker 設置為 coordinator。
參考資料
- kafka入門就這一篇(特此感謝!)
