最近一直在關注阿里的一個開源項目:OpenMessaging
OpenMessaging, which includes the establishment of industry guidelines and messaging, streaming specifications to provide a common framework for finance, e-commerce, IoT and big-data area. The design principles are the cloud-oriented, simplicity, flexibility, and language independent in distributed heterogeneous environments. Conformance to these specifications will make it possible to develop a heterogeneous messaging applications across all major platforms and operating systems.
這是OpenMessaging-Java項目GitHub上的一段介紹,大致是說OpenMessaging項目致力於建立MQ領域的標准。
看了OpenMessaging-Java項目的源碼,定義了:
- Message接口
- Producer接口
- Consumer接口
- 消費方式:Pull、Push
- 各種異常
確實是在朝着建立一套MQ的接口標准。
這引發了我的一個思考:MQ目前確實沒有一套標准的接口,如果我們嘗試從更高的層次看自己的項目,即我們希望它成為行業標准,那么現在項目中接口的定義合適嗎?是否夠通用、簡潔、易用、合理?
帶着這樣的疑問,最近把Kafka Consumer部分的源碼讀了一遍,因為:
- Kafka應該是業界最著名的一個開源MQ了(RocketMQ最初也是參考了Kafka去實現的)
- 希望通過讀Kafka源碼能找到一些定義MQ接口的想法
但是在讀完Kafka Consumer部分的源碼后稍稍有一些失望,因為它並沒有給我代碼我想要的,反而在讀完后覺得接口設計和源碼實現上相對於Kafka的盛名有一些名不副實的感覺。
接口定義
Kafka在消費部分只提供了一個接口,即Consumer接口。
Consumer接口如下:
- Set<TopicPartition> assignment();
- Set<String> subscription();
- void subscribe(Collection<String> topics);
- void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
- void assign(Collection<TopicPartition> partitions);
- void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
- void subscribe(Pattern pattern);
- void unsubscribe();
- ConsumerRecords<k, v=""> poll(long timeout);
- void commitSync();
- void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
- void commitAsync();
- void commitAsync(OffsetCommitCallback callback);
- void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
- void seek(TopicPartition partition, long offset);
- void seekToBeginning(Collection<TopicPartition> partitions);
- void seekToEnd(Collection<TopicPartition> partitions);
- long position(TopicPartition partition);
- OffsetAndMetadata committed(TopicPartition partition);
- Map<MetricName, ? extends Metric> metrics();
- List<PartitionInfo> partitionsFor(String topic);
- Map<String, List> listTopics();
- Set<TopicPartition> paused();
- void pause(Collection<TopicPartition> partitions);
- void resume(Collection<TopicPartition> partitions);
- Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);
- Map<TopicPartition, Long> beginningOffsets(Collection partitions);
- Map<TopicPartition, Long> endOffsets(Collection partitions);
- ...
(讀源碼時光看完這部分接口我就已經暈了)
上面的方法大致可以分為四類:
- 訂閱相關:subscribe、unsubscribe
- 消費相關:assign、poll、commit
- 元數據相關:搜索、設置、獲取offset信息;partition信息
- 生命周期相關:pause、resume、close等
看完這個接口的第一個感覺就是靈活有余易用不足。
Kafka幾乎暴露了所有的操作API,這樣的好處是足夠靈活,但是帶來的問題就是易用性下降,哪怕用戶只是希望簡單的獲取消息並處理也需要關心offset的提交和管理以及commit等等。
另外功能上也並沒有提供用戶更多的選擇,比如只提供了poll模式去獲取消息,而沒有提供類似push的模式。
線程模型部分
看完接口之后,第二步看了Kafka Consumer部分的線程模型,即嘗試將Consumer部分的線程模型梳理清楚:Consumer部分有哪些線程,線程間的交互等。
Consumer部分包含以下幾個模塊:
- Consuming
- Consumer、ConsumerConfig、ConsumerProtocol
- Fetcher
- 分布式協調
- AbstractCoordinator、ConsumerCoordinator
- 分區分配和負載均衡
- Assignor
- ReblanceListener
- 網絡組件
- NetClient
- Future
- FutureListener
- 異常
- NoAvailableBrokersException、CommitFailedExceptin、...
- 元數據和數據
- ConsumerRecord、ConsumerRecords
- TopicPartition
- 統計及其他
通過分布式系統組件及分區分配策略,每個Consumer可以拿到自己消費的分區。之后通過Fetcher來執行獲取消息的操作,而底層通過網絡組件NetworkClient和Broker完成交互。
通過閱讀源碼和注釋發現,Kafka Consumer並沒有去管理線程,而是所有的操作都在用戶線程中完成。
所以線程模型就非常簡單,Consumer非線程安全,同時只能有一個線程執行操作,且所有的操作都在用戶的線程中執行。
Consumer通過一個AtomicLong的CAS操作來保證只能有一個線程操作(多線程的情況下會報出異常)

部分代碼實現解讀
ConsumerRecords<k, v=""> poll(long timeout)
poll應該是Consumer的核心接口了,因為到這里才真正執行了和獲取消息相關的邏輯。

首先是校驗邏輯,在poll之前如果沒有進行topic的訂閱或分區的分配,poll操作將拋出異常。
接着是poll的核心邏輯:
- 在一個循環體中執行獲取數據的邏輯,跳出循環的條件是超時或者獲取到數據
從代碼中可以看出pollOnce應該是真正的執行一次獲取消息的操作。而代碼中注釋的部分是poll的核心:
- fetcher#sendFetches方法給有需要的Server節點發送獲取消息的請求
- 這么做的目的是在用戶下一次進行poll操作之前先將獲取消息的請求發送出去
- 這樣網絡操作和就可以和用戶處理消息的邏輯並行,降低延遲
- client#hasPendingRequests判斷是否還有未從客戶端發送出去的請求
- client#pollNoWakeup執行網絡真正的網絡IO操作
從這段注釋和代碼中可以看出,poll時如果拿到數據了,會將剩余的請求發送出去來實現pipelining的目的。
所以對應的pollOnce內的邏輯必然有從緩存中(即上一次poll請求中獲取的數據)獲取數據的操作。

pollOnce對目標分區執行一路poll請求,大致流程如下:
- coordinator#poll確保Consumer在Coordinator的管理之中
- ensure coordinator
- ensure active group(將Consumer加入到group中)
- 發送heartbeat
- 更新positions
- 從fetcher中獲取消息,如果已經拿到消息則返回結果,調用結束
- 對分區執行poll請求
- 阻塞等待至少一個fetch操作完成
- 判斷是否操作期間元數據進行了變更,如果變更了,丟棄獲取的數據
- 返回獲取結果
讀上面的代碼,第一個感覺就是可讀性比較差,比較難懂。
比如pollOnce中,fetcher#sendFetches從字面上看會理解成發送fetch請求:
- 如果是同步的,那么應該獲取它的結果
- 如果是異步的,應該通過Future獲取最終的結果
而實際上fetcher#sendFetches只是去構建了請求,並且將請求保存在NetworkClient中(NetworkClient會有數據結構保存每個Node對應的請求:類似這樣的數據結構Map<Node,Queue<Request>>)。
在client#poll中才將通過fetcher構造的請求真正的寫出去,並且阻塞的等待fetch的結果,從實現上感覺將代碼變的復雜了。
NetworkClient提供了異步的網絡操作,且是非線程安全的。
NetworkClient只有poll會真正的去執行IO操作,而其中的send只是將send數據保存在channel上,直到執行poll時將它寫到網絡中。
總結
在讀完Kafka Consumer部分的源碼后,稍稍有些失望:
- 只提供了poll模式,沒有提供給用戶更多的選擇,比如push模式
- openmessaging在這塊分別提供了PullConsumer和PushConsumer接口
- 而我們自己的項目則是提供了ListenConsumer、StreamConsumer等(Listen模式用戶只提供回調接口,我們管理線程,而Stream模式將消費線程交給用戶自己管理),繼續還會提供基礎的PullConsumer等
- Consumer接口的靈活性由於,易用性不足
- 暴露了太多的接口,對於一個指向簡單獲取消息處理的使用方來說心智負擔太重
- 代碼的實現上復雜化了,比如提供了Fetcher和NetworkClient的實現非常復雜
總體上Consumer的代碼有一些亂,比如下面是Kafka源碼中Consumer部分的包組織和我自己讀源碼使對它的整理:

右邊是Kafka源碼Consumer部分的包結構,所有的類分了兩塊,內部的在internals中。右邊是自己讀源碼時根據各個模塊對Consumer的類進行划分。
私以為將各個類按照不同的模塊分開會更加清晰,讀起來也會更加舒服。

