Kafka學習筆記(五、Kafka偏移量)


目錄:

  • MetaData信息
  • Kafka偏移量
  • 客戶端負載均衡

MetaData信息

客戶端如何知道該往哪個節點發送請求來獲取數據:通過元數據。

元數據(MetaData)是什么:topic、topic的分區、每個分區有哪些副本、哪個副本是leader等信息。

一般情況下客戶端會緩存元數據,並直接往目標broker上發送生產和獲取請求,並且客戶端還會定時的刷新自己的元數據

Kafka偏移量

1、Kafka GUI

說偏移量之前先介紹下Kafka GUI(Kafka graphical user interface),因官方沒有提供,所以我們使用社區較為活躍的工具。

  • Kafka Tool地址: http://www.kafkatool.com/download.html
  • Kafka Manager地址: https://github.com/yahoo/kafka-manager
  • KafkaOffsetMonitor地址: https://github.com/Morningstar/kafka-offset-monitor

KafkaOffsetMonitor配置:

KafkaOffsetMonitor啟動腳本(如: kafkaoffset_monitor.sh,記得給腳本賦執行權限)

java -cp KafkaOffsetMonitor-assembly 0.46-SNAPSHOT.jar \
         com.quantifind.kafka.offsetapp.OffsetGetterWeb\
--offsetStorage kafka \
--kafkaBrokers ip1:port1,ip2:port2,ip3:port3 \
--zk ip1:port1,ip2:port2,ip3:port3 \
--port 8088\
--refresh 10.seconds \
--retain 2.days
  • --offsetStorage:指明offset信息由kafka來保存,而非zookeeper
  • --refresh:多少秒刷新一次信息
  • --retain:信息保存到數據庫多少天

——————————————————————————————————————————————————————

2、消費指定偏移量的消息

// 指定分區信息
consumer.assign(Collections.singletonList(new TopicPartition("Topic-02", 0)));
// 從頭開始消費消息
consumer.seekToBeginning(Collections.singletonList(new TopicPartition("Topic-02", 0)));
// 按照指定的偏移量消費消息
consumer.seek(new TopicPartition("Topic-05", 1), 9);

——————————————————————————————————————————————————————

客戶端負載均衡

消費者發生變化(加入新的消費者或原有消費者宕機)或topic發生變化時會出現再均衡現象(分區的所有權從一個消費者轉到另一個消費者)。

再均衡現象會導致消息的重復處理丟失

  • 當提交的偏移量小於客戶端處理的偏移量時重復處理消息。
  • 當提交的偏移量大於客戶端處理的偏移量時會丟失消息。

——————————————————————————————————————————————————————

為了解決這一問題Kafka提供了再均衡監聽器:ConsumerRebalanceListener

private static class CustomerRebalancer implements ConsumerRebalanceListener {
    /**
     * 再均衡開始之前和消費者停止讀取消息之后被調用
     * 如果在這里提交偏移量,下一個接管分區的消費者就知道該從哪里開始讀取了
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 如果發生再均衡,我們要在即將失去分區所有權時提交偏移量
        // 要注意,提交的是最近處理過的偏移量,而不是批次中還在處理的最后一個偏移量
        System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets);
    }

    /**
     * 在重新分配分區之后和新的消費者開始讀取消息之前被調用
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        long committedOffset = -1;
        for (TopicPartition topicPartition : partitions) {
            // 獲取該分區已經消費的偏移量
            committedOffset = consumer.committed(topicPartition).offset();
            // 重置偏移量到上一次提交的偏移量的下一個位置處開始消費
            consumer.seek(topicPartition, committedOffset + 1);
        }
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM