Kafka-消費者-偏移量的提交方式
每次調用poll()方法,它總是返回由生產者寫入Kafka但還沒有被消費者讀取過的記錄,可以追蹤到哪些記錄是被群組里的哪個消費者讀取的。
更新分區當前位置的操作叫做提交。
消費者往一個叫做 _consumer_offset的特殊主題發送消息,消息里包含每個分區的偏移量。如果消費者一直處於運行狀態,那么偏移量就沒有什么用處。不過,如果消費者發生崩潰或者有新的消費者加入群組,就會觸發再均衡,完成再均衡之后,每個消費者可能分配到新的分區,而不是之前處理的那個。為了能夠繼續之前的工作,消費者需要讀取每個分區最后一個提交的偏移量,然后從偏移量指定的地方繼續處理。
如果提交的偏移量小於客戶端處理的最后一個消息的偏移量,那么處於兩個偏移量之間的消息就會被重復處理。
如果提交的偏移量大於客戶端處理的最后一個消息的偏移量,那么處於兩個偏移量之間的消息將會丟失。
自動提交偏移量
最簡單的方式是讓消費者自動提交偏移量。如果enable.auto.commit被設為true,那么每過5s,消費者會自動把從poll()方法接收到的最大偏移量提交上去。提交時間間隔由auto.commit.interval.ms控制,默認值是5s。自動提交也是在輪詢里進行的,消費者每次在進行輪詢時會檢查是否該提交偏移量了,如果是,那么就會提交從上一次輪詢返回的偏移量。
在使用自動提交時,每次調用輪詢方法都會把上一次調用返回的偏移量提交上去,它並不知道具體哪些消息已經被處理了,所以再次調用之前最好確保所有當前調用返回的消息都已經處理完畢(在調用close()方法之前也會進行自動提交)。
手動提交當前偏移量
開發者可以在必要的時候提交當前偏移量,而不是基於時間間隔。
把enable.auto.commit設為false,讓應用程序決定何時提交偏移量。使用commitSync()提交偏移量最簡單最可靠。這個API會提交由poll()方法返回的最新偏移量,提交成功后馬上返回,如果提交失敗就拋出異常。在處理完所有記錄后要確保調用了commitSync(),否則還是會有丟失消息的風險。如果發生了再均衡,從最近一批消息到發生再均衡之間的所有消息都將被重復處理。
異步提交當前偏移量
手動提交由一個不足之處,在broker對提交請求作出回應之前,應用程序會一直阻塞,這樣會限制應用程序的吞吐量。此時可以使用異步提交API,無需等待broker的相應
Consumer.commitAsync();
異步的缺點:提交失敗之后不會進行重試,因為在收到服務器相應的時候,可能有一個更大的偏移量已經提交成功。可以使用異步提交的回調記錄下錯誤信息和偏移量。
同步和異步組合提交
如果一切正常,使用commitAsync()方法來提交。這樣速度更快,而且即使這次提交失敗,下一次提交很可能會成功。
如果直接關閉消費者,就沒有所謂的下一次提交了。在異常處理后使用commitSync()提交。
提交特定的偏移量
上述幾種方式的提交偏移量的頻率與處理消息批次的頻率是一樣的,如果poll()方法返回一大批數據,為了避免因再均衡引起的重復處理整批消息,可以在調用commitAsync()和commitSync()方法時傳進去希望提交的分區和偏移量的map。
代碼如下
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; /** * @Author FengZhen * @Date 2020-04-06 11:07 * @Description kafka消費者 */ public class KafkaConsumerTest { private static Properties kafkaProps = new Properties(); static { kafkaProps.put("bootstrap.servers", "localhost:9092"); kafkaProps.put("group.id", "test"); kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); } public static void main(String[] args) { commitAuto(); } /** * 自動提交偏移量 */ public static void commitAuto(){ KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProps); //訂閱主題,可傳入一個主題列表,也可以是正則表達式,如果有人創建了與正則表達式匹配的新主題,會立即觸發一次再均衡,消費者就可以讀取新添加的主題。 //如:test.*,訂閱test相關的所有主題 consumer.subscribe(Collections.singleton("test_partition")); System.out.println("==== subscribe success ===="); try { while (true){ //消費者持續對kafka進行輪訓,否則會被認為已經死亡,它的分區會被移交給群組里的其他消費者。 //傳給poll方法的是一個超時時間,用於控制poll()方法的阻塞時間(在消費者的緩沖區里沒有可用數據時會發生阻塞) //如果該參數被設為0,poll會立即返回,否則它會在指定的毫秒數內一直等待broker返回數據 //poll方法返回一個記錄列表。每條記錄包含了記錄所屬主題的信息、記錄所在分區的信息、記錄在分區里的偏移量,以及記錄的鍵值對。 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); System.out.println("==== data get ===="); for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s", record.topic(), record.partition(), record.offset(), record.key(), record.value())); } } } catch(Exception e){ e.printStackTrace(); } finally { //退出應用前使用close方法關閉消費者。 //網絡連接和socket也會隨之關閉,並立即觸發一次再均衡,而不是等待群組協調器發現它不在發送心跳並認定它已死亡,因為那樣需要更長的時間,導致政哥群組在一段時間內無法讀取消息。 consumer.close(); } } /** * 同步提交偏移量 */ public static void commitSelfSync(){ //關閉自動提交偏移量,改用手動提交,與下方consumer.commitSync();一起使用 kafkaProps.put("enable.auto.commit", false); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProps); //訂閱主題,可傳入一個主題列表,也可以是正則表達式,如果有人創建了與正則表達式匹配的新主題,會立即觸發一次再均衡,消費者就可以讀取新添加的主題。 //如:test.*,訂閱test相關的所有主題 consumer.subscribe(Collections.singleton("test_partition")); System.out.println("==== subscribe success ===="); try { while (true){ //消費者持續對kafka進行輪訓,否則會被認為已經死亡,它的分區會被移交給群組里的其他消費者。 //傳給poll方法的是一個超時時間,用於控制poll()方法的阻塞時間(在消費者的緩沖區里沒有可用數據時會發生阻塞) //如果該參數被設為0,poll會立即返回,否則它會在指定的毫秒數內一直等待broker返回數據 //poll方法返回一個記錄列表。每條記錄包含了記錄所屬主題的信息、記錄所在分區的信息、記錄在分區里的偏移量,以及記錄的鍵值對。 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); System.out.println("==== data get ===="); for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s", record.topic(), record.partition(), record.offset(), record.key(), record.value())); } //同步手動提交偏移量,只要沒有發生不可恢復的錯誤,commitSync方法會一直嘗試直至提交成功。 consumer.commitSync(); } } catch(Exception e){ e.printStackTrace(); } finally { //退出應用前使用close方法關閉消費者。 //網絡連接和socket也會隨之關閉,並立即觸發一次再均衡,而不是等待群組協調器發現它不在發送心跳並認定它已死亡,因為那樣需要更長的時間,導致政哥群組在一段時間內無法讀取消息。 consumer.close(); } } /** * 異步提交偏移量 */ public static void commitSelfAsync(){ //關閉自動提交偏移量,改用手動提交,與下方consumer.commitSync();一起使用 kafkaProps.put("enable.auto.commit", false); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProps); //訂閱主題,可傳入一個主題列表,也可以是正則表達式,如果有人創建了與正則表達式匹配的新主題,會立即觸發一次再均衡,消費者就可以讀取新添加的主題。 //如:test.*,訂閱test相關的所有主題 consumer.subscribe(Collections.singleton("test_partition")); System.out.println("==== subscribe success ===="); try { while (true){ //消費者持續對kafka進行輪訓,否則會被認為已經死亡,它的分區會被移交給群組里的其他消費者。 //傳給poll方法的是一個超時時間,用於控制poll()方法的阻塞時間(在消費者的緩沖區里沒有可用數據時會發生阻塞) //如果該參數被設為0,poll會立即返回,否則它會在指定的毫秒數內一直等待broker返回數據 //poll方法返回一個記錄列表。每條記錄包含了記錄所屬主題的信息、記錄所在分區的信息、記錄在分區里的偏移量,以及記錄的鍵值對。 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); System.out.println("==== data get ===="); for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s", record.topic(), record.partition(), record.offset(), record.key(), record.value())); } //異步提交 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (null != exception){ System.out.println(String.format("==== Commit failed for offsets %s, error:%s ====", offsets, ExceptionUtil.getStackTrace(exception))); } } }); } } catch(Exception e){ e.printStackTrace(); } finally { //退出應用前使用close方法關閉消費者。 //網絡連接和socket也會隨之關閉,並立即觸發一次再均衡,而不是等待群組協調器發現它不在發送心跳並認定它已死亡,因為那樣需要更長的時間,導致政哥群組在一段時間內無法讀取消息。 consumer.close(); } } /** * 同步異步結合使用提交偏移量 */ public static void commitSelfSyncAndAsync(){ //關閉自動提交偏移量,改用手動提交,與下方consumer.commitSync();一起使用 kafkaProps.put("enable.auto.commit", false); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProps); //訂閱主題,可傳入一個主題列表,也可以是正則表達式,如果有人創建了與正則表達式匹配的新主題,會立即觸發一次再均衡,消費者就可以讀取新添加的主題。 //如:test.*,訂閱test相關的所有主題 consumer.subscribe(Collections.singleton("test_partition")); System.out.println("==== subscribe success ===="); try { while (true){ //消費者持續對kafka進行輪訓,否則會被認為已經死亡,它的分區會被移交給群組里的其他消費者。 //傳給poll方法的是一個超時時間,用於控制poll()方法的阻塞時間(在消費者的緩沖區里沒有可用數據時會發生阻塞) //如果該參數被設為0,poll會立即返回,否則它會在指定的毫秒數內一直等待broker返回數據 //poll方法返回一個記錄列表。每條記錄包含了記錄所屬主題的信息、記錄所在分區的信息、記錄在分區里的偏移量,以及記錄的鍵值對。 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); System.out.println("==== data get ===="); for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s", record.topic(), record.partition(), record.offset(), record.key(), record.value())); } //異步提交(結合下方同步提交) consumer.commitAsync(); } } catch(Exception e){ e.printStackTrace(); } finally { //同步提交 consumer.commitSync(); //退出應用前使用close方法關閉消費者。 //網絡連接和socket也會隨之關閉,並立即觸發一次再均衡,而不是等待群組協調器發現它不在發送心跳並認定它已死亡,因為那樣需要更長的時間,導致政哥群組在一段時間內無法讀取消息。 consumer.close(); } } /** * 指定偏移量提交 */ public static void commitSelfAppoint(){ Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); int count = 0; //關閉自動提交偏移量,改用手動提交,與下方consumer.commitSync();一起使用 kafkaProps.put("enable.auto.commit", false); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProps); //訂閱主題,可傳入一個主題列表,也可以是正則表達式,如果有人創建了與正則表達式匹配的新主題,會立即觸發一次再均衡,消費者就可以讀取新添加的主題。 //如:test.*,訂閱test相關的所有主題 consumer.subscribe(Collections.singleton("test_partition")); System.out.println("==== subscribe success ===="); try { while (true){ //消費者持續對kafka進行輪訓,否則會被認為已經死亡,它的分區會被移交給群組里的其他消費者。 //傳給poll方法的是一個超時時間,用於控制poll()方法的阻塞時間(在消費者的緩沖區里沒有可用數據時會發生阻塞) //如果該參數被設為0,poll會立即返回,否則它會在指定的毫秒數內一直等待broker返回數據 //poll方法返回一個記錄列表。每條記錄包含了記錄所屬主題的信息、記錄所在分區的信息、記錄在分區里的偏移量,以及記錄的鍵值對。 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); System.out.println("==== data get ===="); for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s", record.topic(), record.partition(), record.offset(), record.key(), record.value())); currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata")); if (count % 2 == 0){ //每2次提交一次,還可以根據時間間隔來提交 consumer.commitAsync(currentOffsets, new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (null != exception){ System.out.println(String.format("==== Commit failed for offsets %s, error:%s ====", offsets, ExceptionUtil.getStackTrace(exception))); } } }); } count++; } //異步提交(結合下方同步提交) consumer.commitAsync(); } } catch(Exception e){ e.printStackTrace(); } finally { //退出應用前使用close方法關閉消費者。 //網絡連接和socket也會隨之關閉,並立即觸發一次再均衡,而不是等待群組協調器發現它不在發送心跳並認定它已死亡,因為那樣需要更長的時間,導致政哥群組在一段時間內無法讀取消息。 consumer.close(); } } }