“Pulsar is a distributed pub-sub messaging platform with a very flexible messaging model and an intuitive client API.”
Pulsar是pub-sub模式的分布式消息平台,擁有靈活的消息模型和直觀的客戶端API。
Pulsar由雅虎開發並開源的下一代消息系統,目前是Apache軟件基金會的孵化器項目。
本片文章簡單介紹Pulsar的Producer,包含以下內容:
- Producer的設計
- 消息發送的實現
1. Producer設計
1.1 創建Producer
以上是Pulsar官網上創建一個Producer的示例代碼。
創建的過程如下:
- 指定serviceUrl創建PulsarClient
- 指定Producer發送消息的Topic,通過PulsarClient創建Producer
通過上述的創建代碼可以推測:
- serviceUrl應該是用於做服務發現的,通過serviceUrl查找Broker的信息
- Producer指定了Topic,那么一個Producer只能往特定的Topic發送消息
1.2 Producer API
Pulsar中,發送相關的接口為Producer,如上圖所示:
- Producer定義了發送接口
- ProducerBase作為抽象類,提供了基礎實現
- ProducerImpl則是真正的實現類
- PartitionedProducerImpl看着和分區相關,這個之后再看
Producer 接口具體如下:
public interface Producer<T> extends Closeable {
/**
* 返回Producer發送消息的Topic
*/
String getTopic();
/**
* Producer的名稱
*/
String getProducerName();
/**
* 同步發送消息
*/
MessageId send(T message) throws PulsarClientException;
/**
* 有發送消息
*/
CompletableFuture<MessageId> sendAsync(T message);
/**
* Flush客戶端完成中的消息並等待所有消息成功持久化
* @since 2.1.0
* @see #flushAsync()
*/
void flush() throws PulsarClientException;
/**
* 異步Flush客戶端完成中的消息並等待所有消息成功持久化
* @since 2.1.0
* @see #flush()
*/
CompletableFuture<Void> flushAsync();
/**
* 創建TypedMessageBuilder,用於構建消息
*/
TypedMessageBuilder<T> newMessage();
/**
* 同步發送消息,已經被棄用
*/
@Deprecated
MessageId send(Message<T> message) throws PulsarClientException;
/**
* 異步發送消息,已經被棄用
*/
@Deprecated
CompletableFuture<MessageId> sendAsync(Message<T> message);
/**
* 獲取Producer發送的最后一個序列號
*/
long getLastSequenceId();
/**
* 獲取Producer的統計信息
*/
ProducerStats getStats();
/**
* 異步關閉Producer並且釋放資源
*/
CompletableFuture<Void> closeAsync();
/**
* 返回Producer是否連接到Broker上
*/
boolean isConnected();
}
通過Producer接口可以看出Pulsar Producer提供的能力:
- 同步發送消息
- 異步發送消息
- 一個Producer只能向一個特定的Topic發送消息(Producer#topic()返回了一個Topic,說明Producer會綁定到一個Topic上)
- 批量發送(flush方法說明了應該是支持批量的,消息會在客戶端內存中保存)
- 包含了sequenceId是否可以做冪等之類的事情?
- 統計能力
1.3 ProducerBase
ProducerBase作為抽象類,實現了Producer接口。
ProducerBase包含四個屬性:
- producerCreatedFuture:異步創建Producer的Future
- conf:Producer的配置
- schema:消息相關的Schema信息
- interceptors:Producer的攔截器,在發送前后插入一些操作
producerCreatedFuture
重命名上看這個屬性是用於異步創建Producer。
但是在一個基類中提供異步創建實體的Future顯得比較難理解。一般的編程思路會在基類中定義一些基礎的公共的屬性,用於保存狀態或者配置,比如conf。這里的producerCreatedFuture實際用於PartitionedProducerImpl異步創建多個Producer,這個后續再看。
conf
ProducerConfigurationData提供了Producer相關的配置信息,包含是否批量發送、內存緩存消息的大小、發送的Timeout等。
schema
Schema指明了消息的格式,通過Schema完成對消息的encode和decode。
interceptors
ProducerInterceptor是Producer提供的攔截器,包含兩個方法:beforeSend、onSendAcknowledgement,分別用於在發送前和發送后插入行為。
1.4 ProducerImpl
ProducerImpl繼承了ProducerBase,是Producer接口的核心實現。
ProducerImpl在ProducerBase的基礎上增加了大量的屬性,包含:
- producerId:通過AtomicLong生成的進程內唯一的Producer ID
- msgIdGenerator:消息ID
- pendingMessages:內存中緩存的消息
- pendingCallbacks:內存中緩存的消息對應的Callback
- timeout:發送的超時配置
- batchMessageContainer:批量消息的容器
- producerName:全局唯一的Producer名稱
- 等等...(在后續發送實現中介紹相關的屬性)
ProducerImpl實現了具體的發送行為,比如同步發送、異步發送(后續在消息發送的實現部分介紹)。
1.5 PartitionedProducerImpl
Producer提供的發送相關的API定義,ProducerBase提供了基礎實現,ProducerImpl提供了具體的實現,那么PartitionedProducerImpl做什么?
通過PartitionedProducerImpl的屬性可以看到內部包含了一個ProducerImpl列表,那么可以PartitionedProducerImpl和ProducerImpl是一個組合的關系。
通過start方法可以看出,PartitionedProducerImpl根據對應的topicMetadata的分區數創建了對應數量的ProducerImpl實例(這里也說明了ProducerBase中producerCreatedFuture的用途)。
為什么在PartitionedProducerImpl中需要創建一組ProducerImpl實例?
PartitionedProducerImpl另外增加了一個routerPolicy屬性,其接口為:
public interface MessageRouter extends Serializable {
@Deprecated
default int choosePartition(Message<?> msg) {
throw new UnsupportedOperationException("Use #choosePartition(Message, TopicMetadata) instead");
}
default int choosePartition(Message<?> msg, TopicMetadata metadata) {
return choosePartition(msg);
}
}
通過接口的定義不難理解MessageRouter接口實現Message和Partition的映射。
通過internalSendAsync方法的實現可以看出,發送消息時通過routerPolicy將消息映射到Partition,通過Partition選擇對應的Producer執行發送,那么久解釋了為什么在PartitionedProducerImpl會創建和對應Topic的分區數相同的ProducerImpl。
通過以上內容,能總結出Producer模塊的各個類的職責:
- Producer:定義發送接口,用戶使用的核心API
- ProducerBase:Producer接口的基礎實現
- ProducerImpl:實現具體的發送行為,一個ProducerImpl只能向一個Topic寫入消息
- PartitionedProducerImpl:整合多個ProducerImpl,用於向多分區發送消息的場景
2. 消息發送的實現
在對Producer模塊有個整體的認識后,后續內容具體闡述一條消息的發送流程。
在消息系統中,從Producer的視角看,一條消息寫入過程一般包含:
- 消息校驗
- 消息屬性增強(添加一些必要的系統屬性)
- 消息路由(選擇目標分區)
- 消息序列化
- 消息數據寫入網絡
- 等待寫入結果響應
- 返回寫入結果
下面將通過ProducerImpl的實現來了解Pulsar的Producer發送消息的過程。
2.1 尋址
要發送一條消息,除了校驗消息是否合法,首先要這條消息的寫入目標(通過路由找到消息目標的Partition)。
在ProducerImpl的構造方法最后一行調用了grabCnx()方法創建了鏈接(構建了鏈接的上下文)。
grabCnx方法通過PulsarClient創建Connection,而PubsarClient內部則通過LookupService接口來完成Topic到Broker的映射並建立鏈接。
LookupService接口提供了BinaryProtoLookupService和HttpLookupService實現,通過LookupService用戶也可以實現自己的服務發現模塊。
2.2 消息發送
發送消息的調用鏈如上圖所示,最終通過ProducerImpl的internalSendAsync將消息發送出去。無論同步發送還是異步發送,最終都會通過異步的方式執行發送(同時只是在異步的基礎上等待發送結果),這里可以看到Pulsar Producer在API實現上比較注重代碼的復用性即API的最小功能原則。
以單挑消息發送為例,sendAsync的具體實現如下:
- 在必要的校驗后,將消息包裝成OpSendMsg對象(包含異步發送完成后的Callback)
- 將消息添加到pendingMessages
- 通過Connection的EventLoop執行發送操作
ProducerImpl將在ackReceived方法中處理服務端對寫入消息的響應,通過消息的sequenceId來識別對應的OpSendMsg,並調用對應Callback來執行回調邏輯。實際在Callback完成了響應用戶的操作及發送行為的一些統計。
ProducerImpl只會建立一個鏈接,且發送和ACK都是通過synchronized執行的,所以中間通過pendingMessages來完成消息發送和響應的對應,以及超時的處理。這塊具體可以看一下代碼實現。
總結
本文介紹了Pulsar Producer模塊的設計,包含各個類的職責,並簡單介紹了消息的發送過程。Puslar Producer在設計上和RocketMQ的思想差異還是比較大的,比如Puslar Producer會將Producer對應到分區上,每個分區有自己的Producer,這樣可以比較容易完成一些冪等之類的操作。