Kafka-再均衡監聽器


Kafka-再均衡監聽器

在為消費者分配新分區或移除舊分區時,可以通過消費者API執行一些應用程序代碼,在調用subscribe()方法時傳進去一個ConsumerRebalanceListener實例就可以了。

public void onPartitionsRevoked(Collection<TopicPartition> partitions)

方法會在再均衡開始之前和消費者停止讀取消息之后被調用。如果在這里提交偏移量,下一個接管分區的消費者就知道該從哪里讀取了。

public void onPartitionsAssigned(Collection<TopicPartition> partitions)

方法會在重新分配分區之后和消費者開始讀取消息之前被調用。

 代碼如下

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;

/**
 * @Author FengZhen
 * @Date 2020-04-06 11:07
 * @Description kafka消費者
 */
public class KafkaConsumerTest {
    private static Properties kafkaProps = new Properties();
    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("group.id", "test");
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    private static KafkaConsumer<String, String> consumer;/**
     * 指定偏移量提交
     */
    public static void commitSelfAppointWithRebalanceListener(){
        int count = 0;

        //關閉自動提交偏移量,改用手動提交,與下方consumer.commitSync();一起使用
        kafkaProps.put("enable.auto.commit", false);
        consumer = new KafkaConsumer<String, String>(kafkaProps);
        //訂閱主題,可傳入一個主題列表,也可以是正則表達式,如果有人創建了與正則表達式匹配的新主題,會立即觸發一次再均衡,消費者就可以讀取新添加的主題。
        //如:test.*,訂閱test相關的所有主題
        consumer.subscribe(Collections.singleton("test_partition"), new HandleRebalance());
        System.out.println("==== subscribe success ====");
        try {
            while (true){
                //消費者持續對kafka進行輪訓,否則會被認為已經死亡,它的分區會被移交給群組里的其他消費者。
                //傳給poll方法的是一個超時時間,用於控制poll()方法的阻塞時間(在消費者的緩沖區里沒有可用數據時會發生阻塞)
                //如果該參數被設為0,poll會立即返回,否則它會在指定的毫秒數內一直等待broker返回數據
                //poll方法返回一個記錄列表。每條記錄包含了記錄所屬主題的信息、記錄所在分區的信息、記錄在分區里的偏移量,以及記錄的鍵值對。
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                System.out.println("==== data get ====");
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                    currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata"));
                    if (count % 2 == 0){
                        //每2次提交一次,還可以根據時間間隔來提交
                        consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
                            @Override
                            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                                if (null != exception){
                                    System.out.println(String.format("==== Commit failed for offsets %s, error:%s ====", offsets, ExceptionUtil.getStackTrace(exception)));
                                }
                            }
                        });
                    }
                    count++;
                }
                //異步提交(結合下方同步提交)
                consumer.commitAsync();
            }
        } catch(Exception e){
            e.printStackTrace();
        } finally {
            //退出應用前使用close方法關閉消費者。
            //網絡連接和socket也會隨之關閉,並立即觸發一次再均衡,而不是等待群組協調器發現它不在發送心跳並認定它已死亡,因為那樣需要更長的時間,導致政哥群組在一段時間內無法讀取消息。
            consumer.close();
        }
    }


    /**
     * 再均衡監聽器
     */
    private static class HandleRebalance implements ConsumerRebalanceListener{

        /**
         * 方法會在再均衡開始之前和消費者停止讀取消息之后被調用。如果在這里提交偏移量,下一個接管分區的消費者就知道該從哪里讀取了。
         * @param partitions
         */
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println("Lost partitions in rebalance.Committing current offsets:" + currentOffsets);
            consumer.commitSync(currentOffsets);
        }

        /**
         * 方法會在重新分配分區之后和消費者開始讀取消息之前被調用。
         * @param partitions
         */
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

        }
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM