Pulsar-Consumer
“Pulsar is a distributed pub-sub messaging platform with a very flexible messaging model and an intuitive client API.”
Pulsar是pub-sub模式的分布式消息平台,擁有靈活的消息模型和直觀的客戶端API。
Pulsar由雅虎開發並開源的下一代消息系統,目前是Apache軟件基金會的孵化器項目。
本片文章簡單介紹Pulsar的Consumer,包含以下內容:
- Consumer的體系
- 消費邏輯的實現
1. Consumer體系
A consumer is a process that attaches to a topic via a subscription and then receives messages.
Consumer通過訂閱關系綁定到Topic(和Producer類似,都是綁定到一個Topic上),並接收消息。
Consumer支持:
- 同步接收消息:阻塞用戶線程等待消息
- 異步接收消息:異步等待消息(通過Future返回消息)
- 通過MessageListener返回消息:接收消息后回調用戶的MessageListener
Consumer提供了三類獲取消息的方式,其中異步的方式包含通過Future異步等待消息和通過MessageListener被動接收消息。MessageListener和另外兩種方式是互斥的,一旦Consumer注冊了MessageListener接口,則必須通過MessageListener處理消息,主動觸發receive獲取消息將拋出異常。
Consumer的繼承關系:
- Consumer:定義了消費者相關的接口
- ConsumerBase:接口中基礎方法的實現,抽象類
- ConsumerImpl:在ConsumerBase基礎上的Consumer具體實現
- MultiTopicsConsumerImpl:組合多個ConsumerImpl完成對多Topic/Partition的消費
Consumer的設計和Producer是一致的,通過接口定義行為,基礎類實現基本能力,在通過組合的方式來實現消費多個Topic/Partition(Producer則是像多個Topic發送消息)。
1.1 消費進度提交
Consumer處理消息后需要發送acknowledgement到Broker,這樣Broker可以丟棄消息(應該是移動消費offset的操作,類似RocketMQ,並不是真正的刪除消息)。支持單挑消息提交或者批量提交,批量提交則以最后一條消息的offset為准。(只是記錄一個offset比較某個位置之前的消息都已經被Consumer處理,所以批量提交其實只是把最大的offset提交)
1.2 訂閱模型
訂閱模型決定了消息時如何被投遞給Consumer的。在Pulsar中,訂閱模型有: exclusive、shared、 failover。
Exclusive
只能有一個Consumer綁定到訂閱關系上,其他Consumer嘗試綁定到訂閱關系上時會報錯(Exclusive是默認的訂閱模型)。
Shared
在Shared模型中,多個Consumer可以綁定到一個訂閱關系上。消息按照round-robin模式被投遞給各個Consumer。若某個Consumer宕機,被投遞給該Consumer的未被ACK(沒有acknowledgement)的消息將被重新投遞給其他的Consumer進行消費。
Shared模式帶來的限制:
- 消息時按照round-robin模式投遞給各個Consumer的,所以消息順序無法得到保障
- 同樣因為round-robin模式,無法使用批量提交acknowledgement的功能(如上圖Consumer C-3如果提交了m4會導致m3被標記為已經消費,但實際Consumer C1可能還沒處理m3)
failover
在Failover模型中,多個Consumer可以綁定到一個訂閱關系上,但是只有一個稱為Master Consumer的消費者能消費消息。對多個Consumer按照name進行排序,第一個Consumer則為Master Consumer。
在Master Consumer失效(比如斷開連接)后,Master Consumer未提交的消息和后續的消息會提交給后續的Consumer。
2. 消費邏輯的實現
Consumer獲取消息的核心API有以下兩個,分別實現同步獲取消息和異步獲取消息:
/**
* Receives a single message.
* <p>
* This calls blocks until a message is available.
*
* @return the received message
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
* @throws PulsarClientException.InvalidConfigurationException
* if a message listener was defined in the configuration
*/
Message<T> receive() throws PulsarClientException;
/**
* Receive a single message
* <p>
* Retrieves a message when it will be available and completes {@link CompletableFuture} with received message.
* </p>
* <p>
* {@code receiveAsync()} should be called subsequently once returned {@code CompletableFuture} gets complete with
* received message. Else it creates <i> backlog of receive requests </i> in the application.
* </p>
*
* @return {@link CompletableFuture}<{@link Message}> will be completed when message is available
*/
CompletableFuture<Message<T>> receiveAsync();
MessageListener則通過ConsumerBuilder接口進行設置並傳入到Consumer的構造方法中。
這三個API都由ConsumerImpl#messageReceived觸發,即Consumer接收到消息后根據請求的類型來決定:
- 同步獲取消息的,將消息放入內存隊列,被掛起的線程會從隊列中獲取消息
- 異步獲取消息的,執行callback將消息放入future
- 通過MessageListener處理消息的,通過ListenerExecutor執行邏輯
可見Pulsar在消費模式上處理是統一的,即無論客戶端采用何種方式進行消息的接收,消息統一由服務端進行“推送”,而在Consumer內部根據用戶請求的類型進行處理。
通過ConsumerImpl#messageReceived的實現可以發現Pulsar的消息消費是一種“推”的模型,這和RocketMQ的“拉”的模型差異是很大的(RocketMQ采用一種Long-Polling的方式,由Consumer主動發起請求從服務端獲取數據,若服務端有需要處理的消息,請求立即返回;如果沒有消息,這個請求會在服務單阻塞一段時間,直到新消息到達或者請求即將超時,返回給客戶端)。
Consumer獲取消息的模型
具體看Pulsar-Consumer獲取消息的代碼實現會發現它也不是一種純粹的,類似淘寶Notify的推的模式,而是一種推拉結合的方式,示意如下:
- Consumer向Broker發送FLOW請求,通知Broker可以推送消息給Consumer
- Broker將消息通過MESSAGE請求將消息推送給Consumer
這是一個反復的過程,每次Consumer接收消息處理后都會繼續發送FLOW請求給Broker。
這是在RocketMQ或者Kafka的設計中都沒有的一種方式,這種方式進行一定的拓展則可以實現類似akka的Dynamic Push/Pull模式(詳見公眾號歷史文章:《Push or Pull?》)。
在閱讀Pulsar Consumer部分代碼的時候還發現非常有趣的一點,當你搜索“Consumer”時會出現一個Consumer接口和一個Consumer類:
- 接口: org.apache.pulsar.client.api.Consumer
- 類: org.apache.pulsar.broker.service.Consumer
Consumer接口是Client模塊定義Consumer行為的,為什么在Broker模塊會有一個Consumer類?
實際在Broker端會給鏈接上來的Consumer構造一個對應的Consumer對象,維護遠端的Consumer的鏈接等信息。所有對遠端的Consumer的操作會封裝在Broker端的Consumer中。這樣可以更好的實現代碼的可插拔性,降低耦合,提升代碼的可測試性。比如在測試Broker端的邏輯時,只需要Mock一個Consumer類來模擬各種正常和網絡異常的情況,而不需要真正的啟動一個Consumer。
總結
本文主要是介紹一下Pulsar Consumer模塊的相關概念和一些模型,沒有深入的解讀代碼實現。Pulsar Consumer的實現方式還是非常有趣的,和大家比較熟悉的RocketMQ的實現方式差異比較大,值得一讀。