Kafka-再均衡監聽器
在為消費者分配新分區或移除舊分區時,可以通過消費者API執行一些應用程序代碼,在調用subscribe()方法時傳進去一個ConsumerRebalanceListener實例就可以了。
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
方法會在再均衡開始之前和消費者停止讀取消息之后被調用。如果在這里提交偏移量,下一個接管分區的消費者就知道該從哪里讀取了。
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
方法會在重新分配分區之后和消費者開始讀取消息之前被調用。
代碼如下
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.*; /** * @Author FengZhen * @Date 2020-04-06 11:07 * @Description kafka消費者 */ public class KafkaConsumerTest { private static Properties kafkaProps = new Properties(); static { kafkaProps.put("bootstrap.servers", "localhost:9092"); kafkaProps.put("group.id", "test"); kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); } private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); private static KafkaConsumer<String, String> consumer;/** * 指定偏移量提交 */ public static void commitSelfAppointWithRebalanceListener(){ int count = 0; //關閉自動提交偏移量,改用手動提交,與下方consumer.commitSync();一起使用 kafkaProps.put("enable.auto.commit", false); consumer = new KafkaConsumer<String, String>(kafkaProps); //訂閱主題,可傳入一個主題列表,也可以是正則表達式,如果有人創建了與正則表達式匹配的新主題,會立即觸發一次再均衡,消費者就可以讀取新添加的主題。 //如:test.*,訂閱test相關的所有主題 consumer.subscribe(Collections.singleton("test_partition"), new HandleRebalance()); System.out.println("==== subscribe success ===="); try { while (true){ //消費者持續對kafka進行輪訓,否則會被認為已經死亡,它的分區會被移交給群組里的其他消費者。 //傳給poll方法的是一個超時時間,用於控制poll()方法的阻塞時間(在消費者的緩沖區里沒有可用數據時會發生阻塞) //如果該參數被設為0,poll會立即返回,否則它會在指定的毫秒數內一直等待broker返回數據 //poll方法返回一個記錄列表。每條記錄包含了記錄所屬主題的信息、記錄所在分區的信息、記錄在分區里的偏移量,以及記錄的鍵值對。 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); System.out.println("==== data get ===="); for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s", record.topic(), record.partition(), record.offset(), record.key(), record.value())); currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata")); if (count % 2 == 0){ //每2次提交一次,還可以根據時間間隔來提交 consumer.commitAsync(currentOffsets, new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (null != exception){ System.out.println(String.format("==== Commit failed for offsets %s, error:%s ====", offsets, ExceptionUtil.getStackTrace(exception))); } } }); } count++; } //異步提交(結合下方同步提交) consumer.commitAsync(); } } catch(Exception e){ e.printStackTrace(); } finally { //退出應用前使用close方法關閉消費者。 //網絡連接和socket也會隨之關閉,並立即觸發一次再均衡,而不是等待群組協調器發現它不在發送心跳並認定它已死亡,因為那樣需要更長的時間,導致政哥群組在一段時間內無法讀取消息。 consumer.close(); } } /** * 再均衡監聽器 */ private static class HandleRebalance implements ConsumerRebalanceListener{ /** * 方法會在再均衡開始之前和消費者停止讀取消息之后被調用。如果在這里提交偏移量,下一個接管分區的消費者就知道該從哪里讀取了。 * @param partitions */ @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.println("Lost partitions in rebalance.Committing current offsets:" + currentOffsets); consumer.commitSync(currentOffsets); } /** * 方法會在重新分配分區之后和消費者開始讀取消息之前被調用。 * @param partitions */ @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { } } }
