前言:
在上篇中我們了解了Kafka是什么,為什么需要Kafka,以及Kafka的基本架構和各自的作用是什么,這篇文章中我們將從kafka內部每一個組成部分去看kafka 是如何保證數據的可靠性以及工作機制。因為時間問題,或許排版多有瑕疵,有些內容未能做到詳盡。待之后有空會前來填坑。話不多說,正片開始:
4.Kafka工作流程
Kafka中消息是以topic進行分類的,生產者生產消息,消費者消費消息,都是面向topic的。
topic是邏輯上的概念,而partition是物理上的概念,每個partition對應於一個log文件,該log文件中存儲的就是producer生產的數據。Producer生產的數據會被不斷追加到該log文件末端,且每條數據都有自己的offset。消費者組中的每個消費者,都會實時記錄自己消費到了哪個offset,以便出錯恢復時,從上次的位置繼續消費。
為什么分區?
1)方便在集群中擴展,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數據了;
2)可以提高並發,因為可以以Partition為單位讀寫了。
Kafka 使用 Zookeeper 來維護集群成員的信息。每個 broker (每一個節點就是一個broker)都有一個唯一標識符,這個標識符可以自動生成,也可以在配置文件里指定(我們一般也這樣做,常見的做法是通過kafka安裝目錄下conf/server.properties 文件進行配置) 。配置如下:
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
# 這個id值 集群全局唯一
broker.id=2
工作流程
在 broker 啟動的時候,它通過創建臨時節點把自己的 ID 注冊到 Zookeeper, Kafka 組件訂閱 Zookeeper 的/brokers/ids 路徑 (broker在Zookeeper 上的注冊路徑),當有 broker 加入集群或退出集群時,這些組件就可以獲得通知。
在broker 停機、出現網絡分區或長時間垃圾回收停頓時,broker 會從 Zookeeper 上斷開連 接,此時 broker 在啟動時創建的臨時節點會自動從 Zookeeper 上移除。監聽 broker 列表的 Kafka 組件會被告知該 broker 已移除。
當集群啟動之后,Kafka集群開始工作了,如上圖所示:
首先,集群啟動后,集群中的broker會通過選舉機制選出一個控制器Controller,具體的選舉細節在后邊在進行細說。控制器除了具有一般的broker的功能之處,還負責分區leader的選舉。我們到現在已經知道:
-
kafka使用主題Topic來進行組織數據,
-
每個主題被分成若干個分區(分區一般在我們創建topic的時候指定,默認是1個分區);
-
每個分區有多個副本(副本數量一般同樣是我們創建的時候指定,默認為1 ,但是其值不能超過節點的個數,因為副本是均衡分布的)。
- 副本有兩種類型:leader 分區和follower分區。
- 所有的生產者和消費者請求都經過leader分區進行處理,
- follower分區不處理客戶端請求,只是從對應的leader分區同步消息,保持與leader分區一致的狀態。當有leader分區崩潰,其中一個follower分區會被提升為新的leader分區。(同樣,具體的選舉細節我們稍后在展開)
1.生產者寫入分區策略
生產者會創建一個ProducerRecord對象通過指定的主題向集群發送消息,ProducerRecord對象需要將消息的鍵值序列化才能在網絡中傳輸。數據被發送到集群中的某個broker的時候,這個時候會先經過分區器確認數據要寫入在那個分區。這時候分區器對於數據的鍵key進行檢測,會有如下三種情況:
- 1.如果指定了分區,分區器不會做任何事,直接返回指定的分區;
- 2.如果沒有指定分區,分區器會查看ProducerRecord對象的鍵key,當鍵值存在的時候,會將鍵的hash值與topic的partition數進行取模得到對應的分區信息:\(partition = Mod(hash(key),partitionNums)\)
- 3.沒有指定分區,且對應的key不存在的情況下,
- 【舊版本0.9x以前】:對每個連接,第一次會生成一個隨機數,分區信息=隨機數對分區數取余的值\(partition=Mod(round(),partitionNums)\),之后的數據對應的分區信息=(隨機數+(N-1))取余分區數;其中N為第幾次發送數據:比如第二次,N=2,第三次,N=3 。。。依次類推
- 【新版本】:第一次還是會生成一個隨機數,分區信息=隨機數對分區數取余的值\(partition=Mod(round(),partitionNums)\),但之后的數據會排除上次選擇的分區,在剩下的分區中隨機選擇一個:比如:TopicA有三個分區P0,P1,P2。第一次發送數據到P1,那第二次就是在P0、P2中隨機選擇一個分區,假設選擇了P2,第三次發送數據的時候則在P0、P1中隨機選擇一個作為分區信息。。。依次處理
確定好分區信息后,生產者就知道該往那個主題和那個分區發送該條記錄了。但是這個消息不會立即發送,而是將這條記錄添加到一個記錄批次中,這個批次的所有消息都是發送同一主題和分區的。批次發送有兩個參數:設定的時間和批次的容量,只要滿足其一就發送。發送是由一個獨立的線程負責處理。服務器也就是broker收到消息返回是否寫入成功,
- 寫入成功則返回消息對應的主題、分區信息以及記錄在分區中的偏移量也就是offset值。
- 如果寫入失敗,生產者有重試機制,再重試機制下都沒有發送成功,則返回錯誤信息。
2.Acks應答機制
當對應主題和分區的broker接收到一批數據寫入請求時,broker先進行一些驗證:
- 生產者是否有權限向主題寫入的權限?
- 請求里的acks值是否有效(只允許出現0、1 或all[之后-1 等同於all])
- 如果acks=all/-1,是否有足夠多的同步副本保證消息安全寫入。
我們知道kafka是分布式消息隊列,如何保證數據的可靠性和不丟失是重中之重。生產者寫入過程(Kafka 還從broker內部也就是分區的方面進行了可靠性的保障機制,稍后展開)如何進行可靠性的保證,Kafka采用了Acks應答機制。topic的每個partition收到producer發送的數據后,都需要向producer發送ack(acknowledgement確認收到),如果producer收到ack,就會進行下一輪的發送,否則重新發送數據。
Acks機制提供了三種可靠性級別:
- acks=0 意味着如果生產者能夠通過網絡把消息發送出去,那么就認為消息已成功寫入Kafka 。producer不等待broker的ack,這一操作提供了一個最低的延遲,broker一接收到消息還沒有進行寫入磁盤的操作,Ack就已經返回,當broker故障時有可能丟失數據;
- acks=1 意味着leader在收到消息並把它寫入到分區數據文件(不 定同步到磁盤上)時,會返回確認或錯誤響應。這種模式下依然可能存在丟失數據的可能。如果生產者消息已經被leader分區寫入了,ack返回成功,但是此時,leader分區的broker掛了或者崩潰了,之前有簡單提到,Controller會從follower分區中選擇一個follower作為新的leader分區,但是此時新的leader並沒有同步到剛剛的消息,此時消息就發生丟失。
- acks=all/-1 意味着leader在返回確認或錯誤響應之前,會等待所有同步副本都收到並同步消息。當前模式下,如果leader在所有follower都同步完消息,發送ack的時候,leader網絡問題,導致超時沒有發送成功ack,這時候producer會繼續發送同樣的數據,或者leader掛了,新的leader已經有了這批數據,對於producer發送的數據還要重新寫入。就導致數據重復了。
3.文件存儲機制
這時候,broker開始寫入producer發送的一批數據。Kafka是順序寫磁盤的方式持久化數據,在我們的認知中,是不是覺得寫磁盤很慢,但有大量數據請求寫入,怕是寫的黃花菜都涼了哦。別急,kafka 能夠如此火熱自然有其特殊之處,正所謂:沒有金剛鑽,不攬瓷器活嘛。Kafka是對於數據進行追加的方式順序寫入,這樣就減少了大量的磁頭尋址的時間。官網數據表明:同樣的磁盤,順序寫能到600M/s,而隨機寫只有100K/s。
由於生產者生產的消息會不斷追加到log文件末尾,為防止log文件過大導致數據定位效率低下,Kafka采取了分片和索引機制,將每個partition分為多個segment。每個segment對應三個文件——“.index”文件、“.timeindex”文件和“.log”文件。這些文件位於一個文件夾下,該文件夾的命名規則為:topic名稱+分區序號。
- index:log文件的索引
- log: 數據存儲文件
- timeindex: log文件數據的時間索引
segment的命名規則:
1、每個分區第一個segment的文件名= 0000000000000000000
2、后續第N個segment文件名 = 第N-1個segment中最后一個offset+1
segment給log文件建索引的時候是每個一段范圍[4k]建一個索引,是為稀疏索引。
“.index”文件存儲大量的索引信息,“.log”文件存儲大量的數據,索引文件中的元數據指向對應數據文件中message的物理偏移地址。如果我們現在查詢第三條數據即offset=2的數據:則其索引為000000000000000002,通過在index文件中確認其索引,找到對應的數據記錄的log中的地址,然后再找到對應的數據位置,讀取出來。
producer向leader寫入數據之后,follower需要向leader同步消息信息也就是需要復制多少份數據,但是我們應該配置多少個副本呢?又因為副本的均衡分布,就是一個broker只會有同主題同分區的一個副本。那配置副本就是配置broker,也就是需要多少個節點可以滿足我們數據的可靠性保證呢?在Kafka中,每個分區的默認副本數=3。就是說最小集群的配置數,HDFS的默認副本也是3,所以一般3副本就足以保證數據不會丟失,當然也要考慮機架配置的哦。如果復制系數為N ,那么在N-1個broker 失效的情況下,仍然能夠從主題讀取數據或向主題寫入數據。所以,更高的復制系數會帶來更高的可用性、可靠性和更少的故障。另一方面,復制系數N需要至少N個broker ,而且會有N個數據副本。我們可以根據自身需求來確認,比如:銀行為了保證數據更高的可靠性,就可以將復制系數設置為5。如果我們可以接受主題偶爾的不可用,也可以配置為2,當一台broker崩潰,另一台broker作為新的controller繼續進行后續的工作。
在前邊我們了解Ack應答機制有三種應答級別。最為可靠的設置為all。在當前應答級別下,假設leader收到數據,所有follower都開始同步數據,但有一個follower,因為某種故障,遲遲不能與leader進行同步,那leader就要一直等下去,直到它完成同步,才能發送ack。這個問題怎么解決呢?
4.ISR
Leader維護了一個動態的in-sync replica set (ISR),意為和leader保持同步的follower集合。當ISR中的follower完成數據的同步之后,leader就會給producer發送ack。如果follower長時間未向leader同步數據,則該follower將被踢出ISR,該時間閾值由replica.lag.time.max.ms參數設定。Leader發生故障之后,就會從ISR中選舉新的leader。ISR隊列中follower的選擇標准:
- 舊版本【0.9x之前】:follower分區的通信速率和副本的完整性,就是數據和leader相差越少,則完整性越高。這個很容易理解的
- 新版本:follower分區與leader的通信速率。
我們知道消費者也是只和leader分區進行通訊進行消費數據,如下:
- 當前消費者消費leader數據到offset=14這條,這時候leader崩潰了,需要重新選擇分區leader,假設follower1選為新的leader,那這個時候消費者向新leader消費offset=14的這個消息,leader分區沒有這個信息,如果生產者開始寫入數據,如果ack=1,那寫入的數據就是0ffset=16及之后的數據了,那么就會導致新的leader丟失數據。如果是ack=all,那么這時候生產者根據之前leader的返回信息,假定在offset=10這筆寫完,leader給生產者返回了寫入成功的消息,也就是說offset=11之后的數據是新的一批,這時候producer向新leader重新發送數據,就會產生重復數據。
- 針對於消費者,如果副本沒有同步的消息,其實是“不安全”的,如果我們允許消費者消費leader分區中其他副本沒有完全同步的消息就會破壞一致性,因此我們只能允許消費者消費全部副本都已經同步了的數據。Kafka在Log文件中引入了HW和LEO
- LEO:指的是每個副本最大的offset
- HW:指的是消費者能見到的最大的offset,ISR隊列中最小的LEO。
1)follower故障
follower發生故障后會被臨時踢出ISR,待該follower恢復后,follower會讀取本地磁盤記錄的上次的HW,並將log文件高於HW的部分截取掉,從HW開始向leader進行同步。等該follower的LEO大於等於該Partition的HW,即follower追上leader之后,就可以重新加入ISR了。
2)leader故障
leader發生故障之后,會從ISR中選出一個新的leader,之后,為保證多個副本之間的數據一致性,其余的follower會先將各自的log文件高於HW的部分截掉,然后從新的leader同步數據。
注意:這只能保證副本之間的數據一致性,並不能保證數據不丟失或者不重復。
Kafka消費者從屬於消費者群組。一個群組里的消費者訂閱的是同一個主題,每個消費者接收主題一部分分區的消息。假設主題T1有4個分區,我們創建了消費者C1,他是群組G1中唯一的消費者,我們用它訂閱主題T1,消費者C1將收到主題T1全部4個分區的消息:
如果群組G1新增一個消費者C2,那么每個消費者將分別從兩個分區接受消息。我們假設消費者C1消費分區0和分區2的消息,消費者C2接收消費分區1和分區3的消息,如圖:
如果群組G1有4個消費者,那么每個消費者都分配到一個分區:
如果我們繼續往群組里添加更多消費者,超過主題的分區數量,那么有一部分消費者就會被閑置,不會接收到任何的消息:
往群組里增加消費者是橫向伸縮消費能力的主要方式。 Kafka 消費者經常會做一些高延遲 的操作,比如把數據寫到數據庫或 HDFS ,或者使用數據進行比較耗時的計算。在這些情況下,單個消費者無法跟上數據生成的速度,所以可以增加更多的消費者,讓它們分擔負載,每個消費者只處理部分分區的消息,這就是橫向伸縮的主要手段。我們有必要為主題創建大量的分區,在負載增長時可以加入更多的消費者。不過要注意,不要讓消費者的數量超過主題分區的數量,多余的消費者只會被閑置。
除了通過增加消費者來橫向伸縮單個應用程序外,還經常出現多個應用程序從同 主題 讀取數據的情況。實際上, Kafka 設計的主要目標之 ,就是要讓 Kafka 主題里的數據能 夠滿足企業各種應用場景的需求。在這些場景里,每個應用程序可以獲取到所有的消息, 而不只是其中的部分。只要保證每個應用程序有自己的消費者群組,就可以讓它們獲取到主題所有的消息。不同於傳統的消息系統,橫向伸縮 Kafka 消費者和消費者群組並不 對性能造成負面影響。
在上面的例子里,如果新增 個只包含 個消費者的群組 G2 ,那么這個消費者將從主題 Tl 上接收所有的消息,與群組 Gl 之間互不影響。群組 G2 可以增加更多的消費者,每個 消費者可以悄費若干個分區,就像群組 Gl 那樣,如圖所示。總的來說,群組 G2 還是 會接收到所有消息,不管有沒有其他群組存在。
5.分區再均衡
我們通過上邊的例子知道,群組里的消費者共同讀取主題的分區。一個新的悄費者加 入群組時,它讀取的是原本由其他消費者讀取的消息。當一個消費者被關閉或發生崩憤時,它就離開群組,原本由它讀取的分區將由群組里的其他消費者來讀取。在主題發生變化時 比如管理員添加了新的分區,會發生分區重分配。 分區的所有權從 個消費者轉移到另 個消費者,這樣的行為被稱為再均衡。再均衡非常 重要, 它為消費者群組帶來了高可用性和伸縮性(我們可以放心地添加或移除梢費者), 不過在正常情況下,我們並不希望發生這樣的行為。在再均衡期間,消費者無法讀取消息,造成整個群組一小段時間的不可用。另外,當分區被重新分配給另一個消費者時,消費者當前的讀取狀態會丟失,它有可能還需要去刷新緩存,在它重新恢復狀態之前會拖慢應用程序。如何進行安全的再均衡,以及如何避免不必要的再均衡。
消費者通過向被指派為群組協調器的 broker (不同的群組可以有不同的協調器)發送心跳來維持它們和群組的從屬關系以及它們對分區的所有權關系。只要消費者以正常的時間間隔發送心跳,就被認為是活躍的,說明它還在讀取分區里的消息。消費者會在輪詢消息 (為了獲取消息)或提交偏移量時發送心跳。如果消費者停止發送心跳的時間足夠長,會話就會過期,群組協調器認為它已經死亡,就會觸發一次再均衡。 如果一個消費者發生崩潰,井停止讀取消息,群組協調器會等待幾秒鍾,確認它死亡了才 觸發再均衡。在這幾秒鍾時間里,死掉的消費者不會讀取分區里的消息。在清理消費者時,消費者會通知協調器它將要離開群組,協調器會立即觸發一次再均衡,盡量降低處理停頓。
6.分配分區的過程
當消費者要加入群組時,它會向群組協調器發送 Join Group 請求。第1個加入群組的消費者將成為“群主”。群主從協調器那里獲得群組的成員列表(列表中包含了所有最近發送過心跳的消費者,它們被認為是活躍的),並負責給每一個消費者分配分區。它使用 個實現了 PartitionAssignor接口的類來決定哪些分區應該被分配給哪個消費者。分配完畢之后,群主把分配情況列表發送給群組協調器,協調器再把這些信息發 送給所有消費者。每個消費者只能看到自己的分配信息,只有群主知道群組里所有消費者的分配信息。這個過程會在每次再均衡時重復發生。
上邊我們知道,分區會被分配個群組里的消費者。PartitionAssignor 根據給定的消費者和主題,決定哪些分區應該被分配給哪個消費者。Kafka 有兩個默認的分配策:Range 和RoundRobin。
Range:
該策略會把主題的若干個連續的分區分配給消費者。假設消費者C1和消費者 C2 同時 訂閱了主題 T1 和主題 T2 ,井且每個主題有3個分區。那么消費者 C1有可能分配到這兩個主題的分區1和分區3,而消費者 C2 分配到這兩個主題的分區2 。因為每個主題擁有奇數個分區,而分配是在主題內獨立完成的,第一個消費者最后分配到比第二個消費者更多的分區。只要使用了Range策略,而且分區數量無法被消費者數量整除,就會出現這種情況。
- [x] Range分區是對於主題而言,對於每一個主題都會與消費者組進行一次分配:
如圖中,第一次先分配Topic1的三個分區,對於消費者,就是C1先分第一塊,然后C2分得第二塊,C1接着分配到第三塊。同樣對於第二個Topic,同樣的順序進行分配分區,C1分得第一塊分區,C2分得第二塊分區,C1接着分得第三塊。最后就是C1分配到4個分區進行消費,而C2只分得兩個分區。
RoundRobin
該策略把主題的所有分區逐個分配給消費者。如果使用 RoundRobin 策略來給消費者 C1和消費者 C2 分配分區,那么消費者C1將分到主題 T1的分區1和分區3以及主題 T2 的分區2 ,消費者 C2 將分配到主題 T1分區2 以及主題T2的分區1和分區3。一般 來說 ,如果所有消費者都訂閱相同的主題(這種情況很常見), RoundRobin 策略會給所有消費者分配相同數量的分區(或最多就差1個分區)。
- [x] RoundRobin 是對消費者組而言,把消費者訂閱的主題的所有分區都看做是統一的整體進行分配:
如圖:RoundRobin策略相當於把當前消費者組訂閱的主題中所有分區看做統一的整體,然后對消費者群組中的每一個活躍者進行輪流分配。
7.提交和偏移量
7.1消費者消費流程
在此,我們有必要明白Consumer是如何消費的。調用了那些方法,做了那些行為來完成一次消費;
對於輪詢階段我們進行詳細分析說明:
- 1.while(true) 使用無限循環,是因為消費者實際上是 個長期運行的應用程序,它通過持續輪詢向Kafka 請求數據。
- 2.kafkaConsumer.poll(Duration.ofSeconds(1)):消費者必須持續對 Kafka進行輪詢,否則會被認為己經死亡,它的分區會被移交給群組里的其他消費者。傳給 poll()方法的參數是一個超時時間,用於控制poll()方法的阻塞時間(在消費者的緩沖區里沒有可用數據時會發生阻塞)。如果該參數被設為 0, poll()會立即返回 ,否則 它會在指定的時間內一直等待broker 返回數據。
- 3.poll ()方法能返回一個記錄列表。每條記錄都包含了記錄所屬主題的信息、記錄所在分區的信息。記錄在分區里的偏移量 ,以及記錄的鍵值對。我們一般會遍歷這個列表,逐條處理這些記錄。poll ()方法有一個超時參數,它指定了方法在多久之后可以返回, 不管有沒有可用數據都要返回。 超時時間的設置取決於應用程序對響應速度的要求, 比如要在多長時間內把控制權歸還給執行輪詢的線程。
- 4.在退出應用程序之前使用 close()方法關閉消費者。網絡連接和 socket 也會隨之關閉,並立即觸發一次再均衡,而不是等待群組協調器發現它不再發送心跳井認定它已死亡, 因為那樣需要更長的時間,導致整個群組在一段時間內無法讀取消息。
輪詢不只是獲取數據那么簡單。在第一次調用新消費者的 poll()方法時,它會負責查找 GroupCoordinator 然后加入群組,接受分配的分區。如果發生了再均衡,整個過程也 在輪詢期間進行 。當然,心跳也是從輪詢里發送出去的。所以,我們要確保在輪詢期間,所做的任何處理工作都應該盡快完成。
7.2偏移量維護
每次調用 poll ()方法,它總是返回由生產者寫入 Kafka 但還沒有被消費者讀取過的記錄 我們因此可以追蹤到哪些記錄是被群組里的哪個消費者讀取的。這是 Kafka 個獨特之處。消費者可以使用 Kafka 來追蹤消息在分區里的位置(偏移量)。
我們把更新分區當前位置的操作叫作提交。消費者消費消息是按照批次進行的。
那么消費者是如何提交偏移量的呢?消費者往一個叫作 __consumer_offset 特殊主題發送消息,消息里包含每個分區的偏移量。如果消費者一直處於運行狀態,那么偏移量就沒有什么用處。不過,如果悄費者發生崩潰或者有新的消費者加入群組,就會觸發再均衡,完成再均衡之后,每個消費者可能分配到新的分區,而不是之前處理的那個。為了能夠繼續之前的工作,消費者需要讀取每個分區最后一次提交的偏移量,然后從偏移量指定的地方繼續處理。說人話也就是說消費者群組發生變化的時候或者消費者組重啟之后,要能從上一次消費的地方接着消費數據。消費者組會知道並記錄每一次消費的時候消費者消費主題分區的最后一個消息的offset,這樣,下一次當前消費者組再開始消費的時候,就能從具體分區的最后一次消費的地方接着消費。
如果提交的偏移量小於客戶端處理的最后一個消息的偏移量 ,那么處於兩個偏移量之間的消息就會被重復處理,如圖:
如果提交的偏移量大於客戶端處理的最后 個消息的偏移量,那么處於兩個偏移量之間的 消息將會丟失:
所以,處理偏移量的方式對客戶端會有很大的影響。
7.3自動提交
最簡單的提交方式是讓悄費者自動提交偏移量。如果 enable .auto.commit 被設為 true ,那 么每過5s,消費者會自動把從 poll()方法接收到的最大偏移量提交上去。提交時間間隔 auto.commit.interval.ms 控制,默認值是 5s 。與梢費者里的其他東西一樣,自動提交也是在輪詢里進行的。消費者每次在進行輪詢時會檢查是否該提交偏移量了,如果是,那么就會提交從上一次輪詢返回的偏移量。
不過當前策略有什么缺陷呢?可以想想。
假設我們仍然使用默認的 5s 提交時間間隔,在最近一次提交之后的 3s 發生了再均衡,再均衡之后,消費者從最后一次提交的偏移量位置開始讀取消息。這個時候偏移量已經落后 3s ,所以在這 3s 內到達的消息會被重復處理。可以通過修改提交時間間隔來更頻繁地提交偏移量,減小可能出現重復悄息的時間窗,不過這種情況是無法完全避免的。
在使用自動提交 ,每次調用輪詢方法上一次調用返回的偏移量提交上去,它並不知道具體哪些消息已經被處理了,所以在再次調用之前最好確保所有當前調用返回的消息都已經處理完畢(在調用 close()方位之前也 行自動提交)。
7.4手動提交
我們可以通過控制提交偏移量的時間盡可能消除丟失消息的可能性和再均衡時重復消費數據的數量。此外消費者API 提供了另一種提交偏移量的方式 ,讓我們可以基於處理消息的時候需要提交的去提交當前偏移盤,而不是基於時間間隔。
首先我們需要在消費者的配置中關閉自動提交參數:auto.commit.offset 設為false,讓應用程序決定何時提交偏移量。
7.5同步提交
使用 commitSync() 提交偏移量最簡單最可靠,因為這個方法是同步方法。這個方法會提交由 poll()方法返回的最新偏移量,提交成功后馬上返回,如果提交失敗就拋出異常。
要記住, commitSync() 將會提交由 poll ()返回的最新偏移量,所以在處理完所有記錄后要確保調用了 commitSync() ,否則還是會有丟失消息的風險。如果發生了再均衡,從最近一批消息到發生再均衡之間的所有消息都將被重復處理。
7.6異步提交
同步提交有一個不足之處, 在broker對提交請求作出回應之前,應用程序會一直阻塞,這樣會限制應用程序的吞吐量。我們可以通過降低提交頻率來提升吞吐量,但如果發生了再均衡, 會增加重復消息的數量。這時候我們可以使用異步提交方式進行提交:commitASync()。
這時候我們只發送提交請求,不用等待broker的響應.
總結
我們從kafka 的工作機制,從生產者 到broker集群到消費者全套的工作流程和內部的細節都有一個比較清晰的認知了。今天匆匆就結束這個篇章,待之后再來細化排布。
怕什么真理無窮,進一寸有一寸的歡喜。我是清風,希望這篇文章對你有幫助。如有不准確之處,還請評論區留言討論。