1.概述
目前,Kafka 官網最新版[0.10.1.1],已默認將消費的 offset 遷入到了 Kafka 一個名為 __consumer_offsets 的Topic中。其實,早在 0.8.2.2 版本,已支持存入消費的 offset 到Topic中,只是那時候默認是將消費的 offset 存放在 Zookeeper 集群中。那現在,官方默認將消費的offset存儲在 Kafka 的Topic中,同時,也保留了存儲在 Zookeeper 的接口,通過 offsets.storage 屬性來進行設置。
2.內容
其實,官方這樣推薦,也是有其道理的。之前版本,Kafka其實存在一個比較大的隱患,就是利用 Zookeeper 來存儲記錄每個消費者/組的消費進度。雖然,在使用過程當中,JVM幫助我們完成了自一些優化,但是消費者需要頻繁的去與 Zookeeper 進行交互,而利用ZKClient的API操作Zookeeper頻繁的Write其本身就是一個比較低效的Action,對於后期水平擴展也是一個比較頭疼的問題。如果期間 Zookeeper 集群發生變化,那 Kafka 集群的吞吐量也跟着受影響。
在此之后,官方其實很早就提出了遷移到 Kafka 的概念,只是,之前是一直默認存儲在 Zookeeper集群中,需要手動的設置,如果,對 Kafka 的使用不是很熟悉的話,一般我們就接受了默認的存儲(即:存在 ZK 中)。在新版 Kafka 以及之后的版本,Kafka 消費的offset都會默認存放在 Kafka 集群中的一個叫 __consumer_offsets 的topic中。
當然,其實她實現的原理也讓我們很熟悉,利用 Kafka 自身的 Topic,以消費的Group,Topic,以及Partition做為組合 Key。所有的消費offset都提交寫入到上述的Topic中。因為這部分消息是非常重要,以至於是不能容忍丟數據的,所以消息的 acking 級別設置為了 -1,生產者等到所有的 ISR 都收到消息后才會得到 ack(數據安全性極好,當然,其速度會有所影響)。所以 Kafka 又在內存中維護了一個關於 Group,Topic 和 Partition 的三元組來維護最新的 offset 信息,消費者獲取最新的offset的時候會直接從內存中獲取。
3.實現
那我們如何實現獲取這部分消費的 offset,我們可以在內存中定義一個Map集合,來維護消費中所捕捉到 offset,如下所示:
protected static Map<GroupTopicPartition, OffsetAndMetadata> offsetMap = new ConcurrentHashMap<>();
然后,我們通過一個監聽線程來更新內存中的Map,代碼如下所示:
private static synchronized void startOffsetListener(ConsumerConnector consumerConnector) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(consumerOffsetTopic, new Integer(1)); KafkaStream<byte[], byte[]> offsetMsgStream = consumerConnector.createMessageStreams(topicCountMap).get(consumerOffsetTopic).get(0); ConsumerIterator<byte[], byte[]> it = offsetMsgStream.iterator(); while (true) { MessageAndMetadata<byte[], byte[]> offsetMsg = it.next(); if (ByteBuffer.wrap(offsetMsg.key()).getShort() < 2) { try { GroupTopicPartition commitKey = readMessageKey(ByteBuffer.wrap(offsetMsg.key())); if (offsetMsg.message() == null) { continue; } OffsetAndMetadata commitValue = readMessageValue(ByteBuffer.wrap(offsetMsg.message())); offsetMap.put(commitKey, commitValue); } catch (Exception e) { e.printStackTrace(); } } } }
在拿到這部分更新后的offset數據,我們可以通過 RPC 將這部分數據共享出去,讓客戶端獲取這部分數據並可視化。RPC 接口如下所示:
namespace java org.smartloli.kafka.eagle.ipc service KafkaOffsetServer{ string query(1:string group,2:string topic,3:i32 partition), string getOffset(), string sql(1:string sql), string getConsumer(), string getActiverConsumer() }
這里,如果我們不想寫接口來操作 offset,可以通過 SQL 來操作消費的 offset 數組,使用方式如下所示:
- 引入依賴JAR
<dependency> <groupId>org.smartloli</groupId> <artifactId>jsql-client</artifactId> <version>1.0.0</version> </dependency>
- 使用接口
JSqlUtils.query(tabSchema, tableName, dataSets, sql);
tabSchema:表結構;tableName:表名;dataSets:數據集;sql:操作的SQL語句。
4.預覽
消費者預覽如下圖所示:
正在消費的關系圖如下所示:
消費詳細 offset 如下所示:
消費和生產的速率圖,如下所示:
5.總結
這里,說明一下,當 offset 存入到 Kafka 的topic中后,消費線程ID信息並沒有記錄,不過,我們通過閱讀Kafka消費線程ID的組成規則后,可以手動生成,其消費線程ID由:Group+ConsumerLocalAddress+Timespan+UUID(8bit)+PartitionId,由於消費者在其他節點,我們暫時無法確定ConsumerLocalAddress。最后,歡迎大家使用 Kafka 集群監控 ——[ Kafka Eagle ],[ 操作手冊 ]。
6.結束語
這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!