一、偏移量提交
消費者提交偏移量的主要是消費者往一個名為_consumer_offset
的特殊主題發送消息,消息中包含每個分區的偏移量。
如果消費者一直運行,偏移量的提交並不會產生任何影響。但是如果有消費者發生崩潰,或者有新的消費者加入消費者群組的時候,會觸發 Kafka 的再均衡。這使得 Kafka 完成再均衡之后,每個消費者可能被會分到新分區中。為了能夠繼續之前的工作,消費者就需要讀取每一個分區的最后一次提交的偏移量,然后從偏移量指定的地方繼續處理。
但是這樣可能會出現如下的問題。
1.1 提交偏移量小於客戶端處理的偏移量
如果提交的偏移量小於客戶端處理的最后一個消息的偏移量,那么處於兩個偏移量之間的消息就會被重復處理。
1.2 提交偏移量大於客戶端處理的偏移量

如果提交的偏移量大於客戶端處理的最后一個消息的偏移量,那么處於兩個偏移量之間的消息將會丟失。
因此,如果處理偏移量,會對客戶端處理數據產生影響。KafkaConsumer API 提供了很多種方式來提交偏移量。
二、自動提交
自動提交是 Kafka 處理偏移量最簡單的方式。
當 enable.auto.commit 屬性被設為 true,那么每過 5s,消費者會自動把從 poll()方法接收到的最大偏移量提交上去。這是因為提交時間間隔由 auto.commit.interval.ms 控制,默認值是 5s。與消費者里的其他東西一樣,自動提交也是在輪詢里進行的。消費者每次在進行輪詢時會檢查是否該提交偏移量了,如果是,那么就會提交從上一次輪詢返回的偏移量。
但是使用這種方式,容易出現提交的偏移量小於客戶端處理的最后一個消息的偏移量這種情況的問題。假設我們仍然使用默認的 5s 提交時間間隔,在最近一次提交之后的 3s 發生了再均衡,再均衡之后,消費者從最后一次提交的偏移量位置開始讀取消息。這個時候偏移量已經落后了 3s(因為沒有達到5s的時限,並沒有提交偏移量),所以在這 3s 的數據將會被重復處理。
雖然可以通過修改提交時間間隔來更頻繁地提交偏移量,減小可能出現重復消息的時間窗的時間跨度,不過這種情況是無法完全避免的。
在使用自動提交時,每次調用輪詢方法都會把上一次調用返回的偏移量提交上去,它並不知道具體哪些消息已經被處理了,所以在再次調用之前最好確保所有當前調用返回的消息都已經處理完畢(在調用 close() 方法之前也會進行自動提交)。一般情況下不會有什么問題,不過在處理異常或提前退出輪詢時要格外小心。
三、手動提交
大部分開發者通過控制偏移量提交時間來消除丟失消息的可能性,並在發生再均衡時減少重復消息的數量。消費者 API 提供了另一種提交偏移量的方式,開發者可以在必要的時候提交當前偏移量,而不是基於時間間隔。
這是我們需要把把 auto.commit.offset 設為 false,讓應用程序決定何時提交偏移量。
3.1 同步提交
使用 commitSync() 提交偏移量最簡單也最可靠。這個 API 會提交由 poll() 方法返回的最新偏移量,提交成功后馬上返回,如果提交失敗就拋出異常。
代碼示例如下:
1 while (true) { 2 ConsumerRecords<String, String> records = consumer.poll(100); 3 for (ConsumerRecord<String, String> record : records) 4 { 5 System.out.printf("topic = %s, partition = %s, offset = 6 %d, customer = %s, country = %s\n", 7 record.topic(), record.partition(), 8 record.offset(), record.key(), record.value()); 9 } 10 try { 11 consumer.commitSync(); 12 } catch (CommitFailedException e) { 13 log.error("commit failed", e) 14 } 15 }
commitSync() 將會提交由 poll() 返回的最新偏移量,所以在處理完所有記錄后要確保調用了 commitSync(),否則還是會有丟失消息的風險。如果發生了再均衡,從最近一批消息到發生再均衡之間的所有消息都將被重復處理。
同時在這個程序中,只要沒有發生不可恢復的錯誤,commitSync() 方法會一直嘗試直至提交成功。如果提交失敗,我們也只能把異常記錄到錯誤日志里。
3.2 異步提交
同步提交有一個不足之處,在 broker 對提交請求作出回應之前,應用程序會一直阻塞,這樣會限制應用程序的吞吐量。我們可以通過降低提交頻率來提升吞吐量,但如果發生了再均衡,會增加重復消息的數量。
這個時候可以使用異步提交 API。我們只管發送提交請求,無需等待 broker 的響應。
1 while (true) { 2 ConsumerRecords<String, String> records = consumer.poll(100); 3 for (ConsumerRecord<String, String> record : records) 4 { 5 System.out.printf("topic = %s, partition = %s, 6 offset = %d, customer = %s, country = %s\n", 7 record.topic(), record.partition(), record.offset(), 8 record.key(), record.value()); 9 } 10 consumer.commitAsync(); 11 }
在成功提交或碰到無法恢復的錯誤之前,commitSync() 會一直重試,但是 commitAsync() 不會,這也是 commitAsync() 不好的一個地方。
它之所以不進行重試,是因為在它收到服務器響應的時候,可能有一個更大的偏移量已經提交成功。假設我們發出一個請求用於提交偏移量 2000,這個時候發生了短暫的通信問題,服務器收不到請求,自然也不會作出任何響應。與此同時,我們處理了另外一批消息,並成功提交了偏移量 3000。如果 commitAsync() 重新嘗試提交偏移量 2000,它有可能在偏移量 3000 之后提交成功。這個時候如果發生再均衡,就會出現重復消息。
commitAsync() 也支持回調,在 broker 作出響應時會執行回調。回調經常被用於記錄提交錯誤或生成度量指標。如果要用它來進行重試,則一定要注意提交的順序。
1 while (true) { 2 ConsumerRecords<String, String> records = consumer.poll(100); 3 for (ConsumerRecord<String, String> record : records) { 4 System.out.printf("topic = %s, partition = %s, 5 offset = %d, customer = %s, country = %s\n", 6 record.topic(), record.partition(), record.offset(), 7 record.key(), record.value()); 8 } 9 consumer.commitAsync(new OffsetCommitCallback() { 10 public void onComplete(Map<TopicPartition, 11 OffsetAndMetadata> offsets, Exception e) { 12 if (e != null) 13 log.error("Commit failed for offsets {}", offsets, e); 14 } 15 }); 16 }
3.3 同步和異步混合提交
一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那么后續的提交總會有成功的。
但如果這是發生在關閉消費者或再均衡前的最后一次提交,就要確保能夠提交成功。因此在這種情況下,我們應該考慮使用混合提交的方法:
1 try { 2 while (true) { 3 ConsumerRecords<String, String> records = consumer.poll(100); 4 for (ConsumerRecord<String, String> record : records) { 5 System.out.println("topic = %s, partition = %s, offset = %d, 6 customer = %s, country = %s\n", 7 record.topic(), record.partition(), 8 record.offset(), record.key(), record.value()); 9 } 10 consumer.commitAsync(); 11 } 12 } catch (Exception e) { 13 log.error("Unexpected error", e); 14 } finally { 15 try { 16 consumer.commitSync(); 17 } finally { 18 consumer.close(); 19 } 20 }
- 在程序正常運行過程中,我們使用 commitAsync 方法來進行提交,這樣的運行速度更快,而且就算當前提交失敗,下次提交成功也可以。
- 如果直接關閉消費者,就沒有所謂的“下一次提交”了,因為不會再調用
poll()
方法。使用 commitSync() 方法會一直重試,直到提交成功或發生無法恢復的錯誤。
3.4 提交特定的偏移量
如果 poll() 方法返回一大批數據,為了避免因再均衡引起的重復處理整批消息,想要在批次中間提交偏移量該怎么辦?這種情況無法通過調用 commitSync() 或 commitAsync() 來實現,因為它們只會提交最后一個偏移量,而此時該批次里的消息還沒有處理完。
這時候需要使用一下的兩個方法:
1 /** 2 * Commit the specified offsets for the specified list of topics and partitions. 3 */ 4 @Override 5 public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) 6 7 8 /** 9 * Commit the specified offsets for the specified list of topics and partitions to Kafka. 10 */ 11 @Override 12 public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
消費者 API 允許在調用 commitSync() 和 commitAsync() 方法時傳進去希望提交的分區和偏移量的 map。
假設處理了半個批次的消息,最后一個來自主題“customers”分區 3 的消息的偏移量是 5000,你可以調用 commitSync() 方法來提交它。不過,因為消費者可能不只讀取一個分區,你需要跟蹤所有分區的偏移量,所以在這個層面上控制偏移量的提交會讓代碼變復雜。
代碼如下:
1 private Map<TopicPartition, OffsetAndMetadata> currentOffsets = 2 new HashMap<>(); 3 int count = 0; 4 5 ... 6 7 while (true) { 8 ConsumerRecords<String, String> records = consumer.poll(100); 9 for (ConsumerRecord<String, String> record : records) 10 { 11 System.out.printf("topic = %s, partition = %s, offset = %d, 12 customer = %s, country = %s\n", 13 record.topic(), record.partition(), record.offset(), 14 record.key(), record.value()); 15 currentOffsets.put(new TopicPartition(record.topic(), 16 record.partition()), new 17 OffsetAndMetadata(record.offset()+1, "no metadata")); 18 if (count % 1000 == 0) 19 consumer.commitAsync(currentOffsets,null); 20 count++; 21 } 22 }
這里調用的是 commitAsync(),不過調用commitSync()也是完全可以的。在提交特定偏移量時,仍然要處理可能發生的錯誤。
四、監聽再均衡
如果 Kafka 觸發了再均衡,我們需要在消費者失去對一個分區的所有權之前提交最后一個已處理記錄的偏移量。如果消費者准備了一個緩沖區用於處理偶發的事件,那么在失去分區所有權之前,需要處理在緩沖區累積下來的記錄。可能還需要關閉文件句柄、數據庫連接等。
在為消費者分配新分區或移除舊分區時,可以通過消費者 API 執行一些應用程序代碼,在調用 subscribe() 方法時傳進去一個 ConsumerRebalanceListener 實例就可以了。 ConsumerRebalanceListener 有兩個需要實現的方法。
- public void onPartitionsRevoked(Collection partitions) 方法會在再均衡開始之前和消費者停止讀取消息之后被調用。如果在這里提交偏移量,下一個接管分區的消費者就知道該從哪里開始讀取了。
- public void onPartitionsAssigned(Collection partitions) 方法會在重新分配分區之后和消費者開始讀取消息之前被調用。
下面的例子將演示如何在失去分區所有權之前通過 onPartitionsRevoked() 方法來提交偏移量。
1 private Map<TopicPartition, OffsetAndMetadata> currentOffsets= 2 new HashMap<>(); 3 4 private class HandleRebalance implements ConsumerRebalanceListener { 5 public void onPartitionsAssigned(Collection<TopicPartition> 6 partitions) { 7 } 8 9 public void onPartitionsRevoked(Collection<TopicPartition> 10 partitions) { 11 System.out.println("Lost partitions in rebalance. 12 Committing current 13 offsets:" + currentOffsets); 14 consumer.commitSync(currentOffsets); 15 } 16 } 17 18 try { 19 consumer.subscribe(topics, new HandleRebalance()); 20 21 while (true) { 22 ConsumerRecords<String, String> records = 23 consumer.poll(100); 24 for (ConsumerRecord<String, String> record : records) 25 { 26 System.out.println("topic = %s, partition = %s, offset = %d, 27 customer = %s, country = %s\n", 28 record.topic(), record.partition(), record.offset(), 29 record.key(), record.value()); 30 currentOffsets.put(new TopicPartition(record.topic(), 31 record.partition()), new 32 OffsetAndMetadata(record.offset()+1, "no metadata")); 33 } 34 consumer.commitAsync(currentOffsets, null); 35 } 36 } catch (WakeupException e) { 37 // 忽略異常,正在關閉消費者 38 } catch (Exception e) { 39 log.error("Unexpected error", e); 40 } finally { 41 try { 42 consumer.commitSync(currentOffsets); 43 } finally { 44 consumer.close(); 45 System.out.println("Closed consumer and we are done"); 46 } 47 }
如果發生再均衡,我們要在即將失去分區所有權時提交偏移量。要注意,提交的是最近處理過的偏移量,而不是批次中還在處理的最后一個偏移量。因為分區有可能在我們還在處理消息的時候被撤回。我們要提交所有分區的偏移量,而不只是那些即將失去所有權的分區的偏移量——因為提交的偏移量是已經處理過的,所以不會有什么問題。調用 commitSync() 方法,確保在再均衡發生之前提交偏移量。