目錄:
- MetaData信息
- Kafka偏移量
- 客戶端負載均衡
MetaData信息
客戶端如何知道該往哪個節點發送請求來獲取數據:通過元數據。
元數據(MetaData)是什么:topic、topic的分區、每個分區有哪些副本、哪個副本是leader等信息。
一般情況下客戶端會緩存元數據,並直接往目標broker上發送生產和獲取請求,並且客戶端還會定時的刷新自己的元數據。
Kafka偏移量
1、Kafka GUI
說偏移量之前先介紹下Kafka GUI(Kafka graphical user interface),因官方沒有提供,所以我們使用社區較為活躍的工具。
- Kafka Tool地址: http://www.kafkatool.com/download.html
- Kafka Manager地址: https://github.com/yahoo/kafka-manager
- KafkaOffsetMonitor地址: https://github.com/Morningstar/kafka-offset-monitor
KafkaOffsetMonitor配置:
KafkaOffsetMonitor啟動腳本(如: kafkaoffset_monitor.sh,記得給腳本賦執行權限)
java -cp KafkaOffsetMonitor-assembly 0.46-SNAPSHOT.jar \ com.quantifind.kafka.offsetapp.OffsetGetterWeb\ --offsetStorage kafka \ --kafkaBrokers ip1:port1,ip2:port2,ip3:port3 \ --zk ip1:port1,ip2:port2,ip3:port3 \ --port 8088\ --refresh 10.seconds \ --retain 2.days
- --offsetStorage:指明offset信息由kafka來保存,而非zookeeper
- --refresh:多少秒刷新一次信息
- --retain:信息保存到數據庫多少天
——————————————————————————————————————————————————————
2、消費指定偏移量的消息
// 指定分區信息 consumer.assign(Collections.singletonList(new TopicPartition("Topic-02", 0))); // 從頭開始消費消息 consumer.seekToBeginning(Collections.singletonList(new TopicPartition("Topic-02", 0))); // 按照指定的偏移量消費消息 consumer.seek(new TopicPartition("Topic-05", 1), 9);
——————————————————————————————————————————————————————
客戶端負載均衡
當消費者發生變化(加入新的消費者或原有消費者宕機)或topic發生變化時會出現再均衡現象(分區的所有權從一個消費者轉到另一個消費者)。
再均衡現象會導致消息的重復處理和丟失。
- 當提交的偏移量小於客戶端處理的偏移量時重復處理消息。
- 當提交的偏移量大於客戶端處理的偏移量時會丟失消息。
——————————————————————————————————————————————————————
為了解決這一問題Kafka提供了再均衡監聽器:ConsumerRebalanceListener
private static class CustomerRebalancer implements ConsumerRebalanceListener { /** * 再均衡開始之前和消費者停止讀取消息之后被調用 * 如果在這里提交偏移量,下一個接管分區的消費者就知道該從哪里開始讀取了 */ @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 如果發生再均衡,我們要在即將失去分區所有權時提交偏移量 // 要注意,提交的是最近處理過的偏移量,而不是批次中還在處理的最后一個偏移量 System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets); consumer.commitSync(currentOffsets); } /** * 在重新分配分區之后和新的消費者開始讀取消息之前被調用 */ @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { long committedOffset = -1; for (TopicPartition topicPartition : partitions) { // 獲取該分區已經消費的偏移量 committedOffset = consumer.committed(topicPartition).offset(); // 重置偏移量到上一次提交的偏移量的下一個位置處開始消費 consumer.seek(topicPartition, committedOffset + 1); } } }