讀Kafka Consumer源碼


最近一直在關注阿里的一個開源項目:OpenMessaging

OpenMessaging, which includes the establishment of industry guidelines and messaging, streaming specifications to provide a common framework for finance, e-commerce, IoT and big-data area. The design principles are the cloud-oriented, simplicity, flexibility, and language independent in distributed heterogeneous environments. Conformance to these specifications will make it possible to develop a heterogeneous messaging applications across all major platforms and operating systems.

這是OpenMessaging-Java項目GitHub上的一段介紹,大致是說OpenMessaging項目致力於建立MQ領域的標准。

看了OpenMessaging-Java項目的源碼,定義了:

  • Message接口
  • Producer接口
  • Consumer接口
  • 消費方式:Pull、Push
  • 各種異常

確實是在朝着建立一套MQ的接口標准。

這引發了我的一個思考:MQ目前確實沒有一套標准的接口,如果我們嘗試從更高的層次看自己的項目,即我們希望它成為行業標准,那么現在項目中接口的定義合適嗎?是否夠通用、簡潔、易用、合理?

帶着這樣的疑問,最近把Kafka Consumer部分的源碼讀了一遍,因為:

  1. Kafka應該是業界最著名的一個開源MQ了(RocketMQ最初也是參考了Kafka去實現的)
  2. 希望通過讀Kafka源碼能找到一些定義MQ接口的想法

但是在讀完Kafka Consumer部分的源碼后稍稍有一些失望,因為它並沒有給我代碼我想要的,反而在讀完后覺得接口設計和源碼實現上相對於Kafka的盛名有一些名不副實的感覺。

接口定義

Kafka在消費部分只提供了一個接口,即Consumer接口。

Consumer接口如下:

  • Set<TopicPartition> assignment();
  • Set<String> subscription();
  • void subscribe(Collection<String> topics);
  • void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
  • void assign(Collection<TopicPartition> partitions);
  • void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
  • void subscribe(Pattern pattern);
  • void unsubscribe();
  • ConsumerRecords<k, v=""> poll(long timeout);
  • void commitSync();
  • void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
  • void commitAsync();
  • void commitAsync(OffsetCommitCallback callback);
  • void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
  • void seek(TopicPartition partition, long offset);
  • void seekToBeginning(Collection<TopicPartition> partitions);
  • void seekToEnd(Collection<TopicPartition> partitions);
  • long position(TopicPartition partition);
  • OffsetAndMetadata committed(TopicPartition partition);
  • Map<MetricName, ? extends Metric> metrics();
  • List<PartitionInfo> partitionsFor(String topic);
  • Map<String, List> listTopics();
  • Set<TopicPartition> paused();
  • void pause(Collection<TopicPartition> partitions);
  • void resume(Collection<TopicPartition> partitions);
  • Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);
  • Map<TopicPartition, Long> beginningOffsets(Collection partitions);
  • Map<TopicPartition, Long> endOffsets(Collection partitions);
  • ...

(讀源碼時光看完這部分接口我就已經暈了)

上面的方法大致可以分為四類:

  1. 訂閱相關:subscribe、unsubscribe
  2. 消費相關:assign、poll、commit
  3. 元數據相關:搜索、設置、獲取offset信息;partition信息
  4. 生命周期相關:pause、resume、close等

看完這個接口的第一個感覺就是靈活有余易用不足。

Kafka幾乎暴露了所有的操作API,這樣的好處是足夠靈活,但是帶來的問題就是易用性下降,哪怕用戶只是希望簡單的獲取消息並處理也需要關心offset的提交和管理以及commit等等。

另外功能上也並沒有提供用戶更多的選擇,比如只提供了poll模式去獲取消息,而沒有提供類似push的模式。

線程模型部分

看完接口之后,第二步看了Kafka Consumer部分的線程模型,即嘗試將Consumer部分的線程模型梳理清楚:Consumer部分有哪些線程,線程間的交互等。

Consumer部分包含以下幾個模塊:

  1. Consuming
    • Consumer、ConsumerConfig、ConsumerProtocol
    • Fetcher
  2. 分布式協調
    • AbstractCoordinator、ConsumerCoordinator
  3. 分區分配和負載均衡
    • Assignor
    • ReblanceListener
  4. 網絡組件
    • NetClient
    • Future
    • FutureListener
  5. 異常
    • NoAvailableBrokersException、CommitFailedExceptin、...
  6. 元數據和數據
    • ConsumerRecord、ConsumerRecords
    • TopicPartition
  7. 統計及其他

通過分布式系統組件及分區分配策略,每個Consumer可以拿到自己消費的分區。之后通過Fetcher來執行獲取消息的操作,而底層通過網絡組件NetworkClient和Broker完成交互。

通過閱讀源碼和注釋發現,Kafka Consumer並沒有去管理線程,而是所有的操作都在用戶線程中完成。

所以線程模型就非常簡單,Consumer非線程安全,同時只能有一個線程執行操作,且所有的操作都在用戶的線程中執行。

Consumer通過一個AtomicLong的CAS操作來保證只能有一個線程操作(多線程的情況下會報出異常)

部分代碼實現解讀

ConsumerRecords<k, v=""> poll(long timeout)

poll應該是Consumer的核心接口了,因為到這里才真正執行了和獲取消息相關的邏輯。

首先是校驗邏輯,在poll之前如果沒有進行topic的訂閱或分區的分配,poll操作將拋出異常。

接着是poll的核心邏輯:

  • 在一個循環體中執行獲取數據的邏輯,跳出循環的條件是超時或者獲取到數據

從代碼中可以看出pollOnce應該是真正的執行一次獲取消息的操作。而代碼中注釋的部分是poll的核心:

  • fetcher#sendFetches方法給有需要的Server節點發送獲取消息的請求
    • 這么做的目的是在用戶下一次進行poll操作之前先將獲取消息的請求發送出去
    • 這樣網絡操作和就可以和用戶處理消息的邏輯並行,降低延遲
  • client#hasPendingRequests判斷是否還有未從客戶端發送出去的請求
  • client#pollNoWakeup執行網絡真正的網絡IO操作

從這段注釋和代碼中可以看出,poll時如果拿到數據了,會將剩余的請求發送出去來實現pipelining的目的。

所以對應的pollOnce內的邏輯必然有從緩存中(即上一次poll請求中獲取的數據)獲取數據的操作。

pollOnce對目標分區執行一路poll請求,大致流程如下:

  1. coordinator#poll確保Consumer在Coordinator的管理之中
    • ensure coordinator
    • ensure active group(將Consumer加入到group中)
    • 發送heartbeat
  2. 更新positions
  3. 從fetcher中獲取消息,如果已經拿到消息則返回結果,調用結束
  4. 對分區執行poll請求
  5. 阻塞等待至少一個fetch操作完成
  6. 判斷是否操作期間元數據進行了變更,如果變更了,丟棄獲取的數據
  7. 返回獲取結果

讀上面的代碼,第一個感覺就是可讀性比較差,比較難懂。

比如pollOnce中,fetcher#sendFetches從字面上看會理解成發送fetch請求:

  • 如果是同步的,那么應該獲取它的結果
  • 如果是異步的,應該通過Future獲取最終的結果

而實際上fetcher#sendFetches只是去構建了請求,並且將請求保存在NetworkClient中(NetworkClient會有數據結構保存每個Node對應的請求:類似這樣的數據結構Map<Node,Queue<Request>>)。

在client#poll中才將通過fetcher構造的請求真正的寫出去,並且阻塞的等待fetch的結果,從實現上感覺將代碼變的復雜了。

NetworkClient提供了異步的網絡操作,且是非線程安全的。

NetworkClient只有poll會真正的去執行IO操作,而其中的send只是將send數據保存在channel上,直到執行poll時將它寫到網絡中。

總結

在讀完Kafka Consumer部分的源碼后,稍稍有些失望:

  1. 只提供了poll模式,沒有提供給用戶更多的選擇,比如push模式
    • openmessaging在這塊分別提供了PullConsumer和PushConsumer接口
    • 而我們自己的項目則是提供了ListenConsumer、StreamConsumer等(Listen模式用戶只提供回調接口,我們管理線程,而Stream模式將消費線程交給用戶自己管理),繼續還會提供基礎的PullConsumer等
  2. Consumer接口的靈活性由於,易用性不足
    • 暴露了太多的接口,對於一個指向簡單獲取消息處理的使用方來說心智負擔太重
  3. 代碼的實現上復雜化了,比如提供了Fetcher和NetworkClient的實現非常復雜

總體上Consumer的代碼有一些亂,比如下面是Kafka源碼中Consumer部分的包組織和我自己讀源碼使對它的整理:

右邊是Kafka源碼Consumer部分的包結構,所有的類分了兩塊,內部的在internals中。右邊是自己讀源碼時根據各個模塊對Consumer的類進行划分。

私以為將各個類按照不同的模塊分開會更加清晰,讀起來也會更加舒服。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM