原文作者為 Luk Perkins,來自 Splunk 團隊。
文章翻譯已獲得原作者授權。
消息隊列是大多數大規模數據架構的主要組件。如果必須對數據進行實時處理,那么使用消息隊列是很好的選擇。
數據處理管道會發生各種故障,數據 consumer 可能會受到延遲或完全不能工作,網絡分區可能會暫時切斷整個 consumer 組與數據管道的連接等。
有些情況必須使用消息隊列,例如:
- 開發拼車應用程序,不考慮高峰時段的使用峰值,需要確保每個乘車請求最終只匹配到一位司機
- 金融級事務交易管道需要同步請求處理,以防止數據丟失
- 搭建基於微服務的處理管道,前端為具有多個寫入端點的 REST API(每秒進行數千次運算),需要確保即使后端微服務出現故障,所有的工作對象都保留在系統中
消息隊列如何工作
下圖為消息隊列常見****的工作方式(並對故障做出響應)的示意圖:
在上圖中,producer 1、2、3 和 4 通過消息 broker 將消息發送到管道,而 consumer 1、2、3 和 4 處理(然后確認)這些消息。在本示例中,當 consumer 1 出現故障時,會出現非常嚴重的問題。Producer 會繼續將數據傳送到系統中,但 consumer 1 不能繼續處理消息。Broker 應該****開始存儲所有原本將會用於 consumer 1 的消息數據,直到 consumer 1 能夠繼續處理消息。
從這個示例可以看出,對於堆棧中任何重要的消息隊列而言,穩定的存儲組件都必不可少。幸運的是,消息隊列與支持消息隊列的存儲系統一樣性能良好。如果存儲組件易發故障、受到損壞,或運行緩慢,因而即便僅有一個組件出現故障,也不能很好地應對,那么強烈建議大家更換存儲部件。
引入 Apache Pulsar
一般而言,由不同的系統處理訂閱-發布消息和消息隊列。例如,典型的技術棧可能使用 Apache Kafka 處理發布-訂閱消息,使用 RabbitMQ 處理消息隊列。在這種情況下,雖然系統工作良好,但是你需要同時部署、管理多個消息系統。
我最喜歡 Apache Pulsar 的一點就是,它可以輕松連接訂閱-發布消息和消息隊列。Pulsar 是第一個為了同時處理訂閱-發布消息和消息隊列而開源的消息系統。
因為使用 Apache BookKeeper 分布式日志存儲數據庫作為存儲組件,Pulsar 可以輕松地同時支持訂閱-發布消息和消息隊列。BookKeeper 作為日志存儲系統,基於消息 topic 數據結構而構建,支持水平擴展(增加 “bookie” 數量即可擴展容量),且運行迅速。
Pulsar 支持兩種基本的 topic 類型:持久 topic 與非持久 topic。用戶可以根據名稱辨別 topic 類型,因為類型即為 topic 名稱的“schema”(類似於 https 是 URL https://google.com 的 schema)。
持久 topic 的名稱格式為:persistent://public/default/some-topic,而非持久 topic 的名稱格式為:non-persistent://public/default/some-topic。
用戶使用持久 topic 時,Pulsar 將所有未確認消息(即未處理消息)存儲在 BookKeeper 中的多個“bookie”服務器上。
Pulsar 的確支持非持久 topic,但是我們建議用戶只在可以接受丟失消息的用例中,使用非持久消息。對於具有消息隊列功能的 topic,絕不應該使用非持久 topic。與將消息數據存儲在內存中相比,這種存儲方式具有很多優勢。
如何將 Apache Pulsar 用作消息隊列
Pulsar 無需特殊配置或調整,即可支持兩種用例,因此在使用方面具有一定的優勢。重點在於如何使用 Pulsar,如下圖所示:
發布-訂閱 producer 和 consumer 通過發布-訂閱 topic 進行通信,而隊列 producer 和 consumer 通過隊列 topic 進行通信。不需要“標記”topic,也不需要預先指定 topic 為實時 topic 或隊列 topic。
消息隊列 topic 需要 consumer 使用共享訂閱,而不能是獨占訂閱(exclusive)或災備訂閱(failover)。另外,所有 consumer 必須使用相同的訂閱名稱,否則就不是同一訂閱。當 consumer 在 topic 上創建共享訂閱后,Pulsar 會自動在接收消息的 consumer 之間進行負載平衡,對於消息隊列來說,這是最理想的狀態。
以下代碼展示了五個 Java consumer 使用共享訂閱監聽同一 topic 的場景:
String PULSAR_SERVICE_URL = "pulsar://localhost:6650";
String MQ_TOPIC = "persistent://public/default/message-queue-topic";
String SUBSCRIPTION = "sub-1";
// Pulsar client
PulsarClient client = PulsarClient.builder()
.serviceUrl(PULSAR_SERVICE_URL)
.build();
// Base consumer builder for instantiating multiple consumers
ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer()
.topic(MQ_TOPIC)
.subscriptionName(SUBSCRIPTION)
.subscriptionType(SubscriptionType.Shared)
.messageListener(messageCallback);
// Create five consumers (mq-consumer-0, mq-consumer-1, etc.)
IntStream.range(0, 4).forEach(i -> {
String name = String.format("mq-consumer-%d", i);
consumerBuilder
.consumerName(name)
.subscribe();
});
控制消息調度
吞吐量在消息隊列中尤為重要。如果消息隊列沒有足夠的吞吐量來處理周圍數據管道所需要的內容,那么消息隊列可能不僅性能不夠好,甚至會產生一些負面影響。如果使用 Pulsar 作為消息隊列,則可以通過調整 consumer 的配置來微調處理吞吐量。
默認情況下,Apache Pulsar consumer 有一個接收隊列,用於一次處理多條消息。用戶可以自行配置單個 consumer 接收隊列的大小(默認值為 1000 條消息)。
理想情況下,應該根據 consumer 處理消息的速度來設置接收隊列的大小。如果可以非常快速地處理消息(只需幾毫秒),那么建議將接收隊列的大小設置為較大的值,因為這樣有助於最大化 consumer 的處理吞吐量。
但是如果處理消息需要較長時間,最好將接收隊列的大小設置為較小的值。如果 consumer 正在執行的任務屬於 CPU 密集型,也就是說任務處理需要幾秒鍾甚至更久,則建議將接收隊列的大小設置為個位數或 1,這樣負載平衡器能夠在 consumer 之間合理地分發消息。
在下面這段代碼中,consumer 接收隊列比較小(Java):
Consumer<byte[]> consumer = client.newConsumer()
.topic("slow-processing-topic")
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("sub-1")
.receiverQueueSize(5)
.messageListener(messageCallback)
.subscribe();
接收隊列的默認值適用於很多用例。但是建議用戶稍微留意一下接收隊列,以免在后續工作中需要進行調優。
一個消息平台,兩種用例場景
如果想在不同用例場景中同時運行多個消息平台,大家可以考慮使用 Pulsar。Pulsar 同時支持兩種主要的消息用例——發布-訂閱消息(尤其是持久消息)和消息隊列,並且運行速度快、可擴展,還可以減輕運維管理負擔。