RabbitMQ是一種使用Erlang語言編寫的開源的消息中間件,最初實現了AMQP(高級消息隊列協議),后來通過插件架構進行了擴展,支持STOMP(面向流文本的消息傳遞協議)、MQTT(MQ遙控傳輸)等協議。
1. RabbitMQ優勢
RabbitMQ支持多種客戶端,如Java、.NET、C、Python等,在易用性、擴展性、高可用性等方面表現都不錯,並且可以與Spring AMQP完美整合,API豐富易用。
RabbitMQ程序健壯、穩定、易用,跨平台、支持多種語言,管理界面簡單易用,功能全面,文檔相對比較齊全,社區活躍。
2. AMQP簡介
AMQP,全稱為:Advanced Message Queuing Protocol,高級消息隊列協議,是面向消息中間件的開放標准的二進制應用層協議。AMQP的核心特性是:面向消息、排隊、路由(包括點對點和發布訂閱)、可靠性和安全性。這些功能使其非常適合在應用程序之間傳遞業務消息,AMQP還可以用作物聯網IoT協議。
目前,AMQP 1.0已經被批准為國際標准,具體規范文檔,可以進一步閱讀:OASIS Advanced Message Queuing Protocol (AMQP) Version 1.0
而RabbitMQ最初是為了支持AMQP 0-9-1而開發的,因此,該協議是RabbitMQ Broker支持的核心協議。
下面我們就簡要介紹下AMQP 0-9-1協議。這部分內容,概念會比較多,稍微有點枯燥,但是可以說RabbitMQ就是按照這個協議去實現的,所以熟悉這個協議很重要。
完整的AMQP文檔可以從這里下載:AMQP Working Group 0-9-1
2.1 AMQP模型概述
2.1.1 AMQP 0-9-1
AMQP 0-9-1 是一個二進制協議,定義了非常強大的消息傳遞語義。對於客戶端來說,這是一個相對容易實現的協議,因此有大量客戶端庫可用於許多不同的編程語言和環境。
AMQP 0-9-1通常划分為兩層:
功能層(Functional Layer)
:定義了一組命令(按功能做不同的分類),提供給應用程序,用於支撐消息相關的工作;傳輸層(Transport Layer)
:傳輸層將這些方法從應用程序傳送到服務器並返回,並處理通道多路復用、成幀、內容編碼、心跳、數據表示和錯誤處理。
可以在不改變協議的應用程序相關功能的情況下用任意的傳輸協議來替換傳輸層,也可以將傳輸層用於不同的高級協議。
2.1.2 AMQP 0-9-1模型簡介
如下圖,消息Broker(代理)
從消息發布者
(發布消息的應用程序,也稱為生產者)接收消息並將它們路由到消費者
(處理消息的應用程序)。
由於AMQP是一個網絡協議,因此,生產者
、消費者
、代理
都可以部署在不同的機器上。AMQP模型如下圖所示:
消息發布到交換機(exchanges)
(通常將其比作郵局或郵箱),然后使用稱為綁定(Bindings)
的規則將消息副本分發到隊列(queues)
。然后代理(brokers)
要么將消息傳遞(deliver)
給訂閱隊列的消費者(consumers)
,要么消費者主動按需從隊列中獲取(fetch)
/拉取(pull)
消息。
消息元數據:發布消息的時候,發布者可以指定各種消息元數據(消息屬性)
,其中一些元數據可能由代理使用,其余的元數據僅由接收消息的應用程序使用。
消息確認:由於網絡是不可靠的,並且應用程序可能無法正確處理消息,因此AMQP 0-9-1模型有一個消息確認的概念:當消息傳遞給消費者時,消費者會自動或者由開發人員在應用程序中手動指定通知代理Broker,代理只會在收到消息(或消息組)的通知時從隊列中完全刪除該消息。
死信隊列:在某些情況下,例如,當消息無法路由時,消息可能會返回給發布者、或者丟棄掉、或者將其放入所謂的死信隊列
(如果代理擴展支持),發布者通過使用某些參數來選擇如何處理此類情況。
隊列(queues)/交換機(exchanges)和綁定(bindings)統稱為AMQP實體。
2.1.3 AMQP 0-9-1 是一個可編程的協議
AMQP 0-9-1是一種可編程的協議,因為AMQP 0-9-1實體
和路由方案
由應用程序本身定義,而不是代理管理員。因此AMQP制定了一些規定來實現這些協議操作:
- 聲明隊列和交換機;
- 定義他們之間的綁定;
- 訂閱隊列等。
這為應用程序開發人員提供了很大的自由,但也要求他們了解潛在的定義沖突。在實踐中,定義沖突很少見,通常表示為配置錯誤。
應用程序聲明它們需要的AMQP 0-9-1實體
,定義必要的路由方案
,並在不需要使用它們時進行刪除。
2.2 交換機(Exchanges)和交換機類型
交換機
是發送消息的AMQP 0-9-1實體。交換機收到一條消息,並將其路由到零個或者多個隊列
中。咳咳,Java架構雜談
提醒大家,不要聯想到了網絡的交換機(Network switch),只是中文名稱一樣而已。
使用的路由算法
取決於交換機類型
和稱為綁定
的規則。以下是AMQP 0-9-1 Broker提供的四種交換機類型:
交換類型 | 默認的預定義名稱 |
---|---|
直連交換機(Direct exchange) | 空字符串和amq.direct |
扇形交換機(Fanout exchange) | amq.fanout |
主題交換機(Topic exchange) | amq.topic |
頭信息交換機(Headers exchange) | amq.match 和RabbitMQ中的 amq.headers |
除了交換類型之外,交換機還聲明了許多屬性,關鍵屬性有:
Name
,交換機的名稱持久性
,保證交換機在Broker重啟后仍然存在,如果沒有指定持久,那么交換機在Broker重啟后就不存在了,需要重新聲明,並非所有場景和用例都要求交換機是持久的;自動刪除
,當最后一個隊列解除綁定時,交換機被刪除;參數
,可選,由插件和特定於代理的功能使用。
2.2.1 默認交換機
默認交換機是由Broker預先聲明的匿名直連交換機。使用默認交換機的時候,每個新建隊列都會自動綁定到默認交換機上,綁定的路由鍵與隊列名稱相同,默認交換機看起來可以將消息直接傳遞到隊列。
2.2.2 直連交換機
交換機根據消息路由鍵(router_key)
將消息傳遞到隊列,消息將會投遞到與路由鍵名稱和隊列名稱相同的隊列上。直接交換機是消息單播路由
的理想選擇(盡管它們也可以用於多播路由)。
直連交換機如下圖所示:
- 一個隊列N使用路由鍵 K 綁定到交換機;
- 當具有路由鍵 M 的新消息到達直連交換機時,如果 K = M,則交換機將其路由到隊列N。
如上圖,具有路由鍵"itzhai.com"的消息達到交換機之后,則會路由到Queue1中。
直連交換機通常用於以循環的方式在多個消費者之間分配任務,也就是說,消息的負載均衡是發生在消費者之間而不是隊列之間。
2.2.3 扇形交換機
扇形交換機將消息路由到綁定到它的所有隊列,並且忽略路由鍵。也就是說,當新消息發布到該交換機時,該消息的副本將投遞到所有綁定該交換機的隊列。扇形交換機是消息廣播
路由的理想選擇。
扇形交換機如下圖所示:
使用扇形交換機的案例都非常相似:
- 大型多人在線游戲(MMO)可以將其用於排行榜更新或其他全局事件;
- 體育新聞網站可以使用扇形交換機向客戶端近乎實時的分發比分信息;
- 分布式系統使用它來廣播各種狀態和配置更新;
- 群聊可以使用它在參與者之間分發消息(AMQP沒有內置presence的概念,因此XMPP可能會是更好的選擇)。
2.2.4 主題交換機
主題交換機根據消息路由鍵和和用於將隊列綁定到交換機的模式匹配字符串之間的匹配將消息路由到一個或者多個隊列。
也就是說通過消息的路由鍵去匹配到綁定到交換機的路由鍵匹配字符串,如果匹配上了,就進行投遞消息。
routing key模糊匹配的通配符如下:
*
:用於匹配一個單詞,比如itzhai.com.*
,可以匹配:itzhai.com.a
,itzhai.com.b
#
:用於匹配0個或者多個單詞,比如itzhai.com.#
,可以匹配:itzhai.com.a
,itzhai.com.a.b
routing key通過.
分隔字符串。
主題交換機如下圖所示:
當生產者發送的routing_key=itzhai.com
的時候,會把消息路由投遞到Queue1和Queue2。
當生產者發送的routing_key=www.itzhai.com
的時候,會把消息路由投遞到Queue3。
當需要有針對性的選擇多個接收消息的消費者或者應用的時候,主題交換機都可以被列入考慮的范圍。常見的使用場景:
- 后台任務由多個工作線程完成,每個工作線程負責處理某些特定的任務,這個時候可以通過主題交換機訂閱感興趣的消息;
- 分發特定地理位置的信息,每個工作線程只訂閱感興趣的地理位置的信息;
- ...
2.2.5 頭交換機
頭交換機不依賴路由鍵的匹配規則來路由消息,而是根據發送消息內容中的請求頭屬性進行匹配。
頭交換機類似於直連交換機,但是直連交換機的路由鍵必須是一個字符串,而請求頭信息則沒有這個約束,它們甚至可以是整數或者字典。因此可以用作路由鍵不必是字符串的直連交換。
綁定一個隊列到頭交換機上的時候,會同時綁定多個用於匹配的頭信息。
投遞消息的時候,可以攜帶一個x-match
參數,指定是否要求必須所有的頭信息都匹配(all)才能進行投遞,還是只要匹配任何一個就可以了(any)。
注意:以字符串
x-
打頭的頭屬性,不會作為匹配項。
2.3 隊列(Queues)
AMQP 0-9-1 中的隊列與其他消息隊列和任務隊列系統中的隊列類似,用於存儲即將被消費的消息。一般地,隊列與交換機共享一些屬性,但隊列也有一些特定的屬性:
Name
:隊列名稱;Durable
:隊列持久化,隊列在Broker重啟之后是否繼續存在;Exclusive
:隊列是否僅由一個連接使用,如果是,在連接關閉的時候,隊列將被刪除;Auto-delete
:當最后一個消費者取消訂閱的時候,立即刪除;Arguments
:可選,一些特定的插件和Broker功能使用,例如實現消息的TTL,隊列長度限制等。
關於隊列的聲明:
在使用隊列之前,必須先聲明它。聲明隊列的時候,如果隊列尚不存在,則將創建一個隊列;如果隊列已存在,並且屬性與聲明中的屬性相同,則不用重新創建一個;如果現有隊列屬性與聲明的隊列屬性不同,將會引發
406(PRECONDITION_FAILED)
的通道級異常。
2.3.1 隊列名稱
應用程序可以設置隊列名稱,如果設置為空字符串,Broker會為它們生成一個唯一的隊列名稱,在隊列聲明響應體中一起返回給客戶端。隊列名稱為255個字節以內的UTF-8字符。
以
amq
開頭的隊列名稱,保留給Broker內部使用,如果嘗試使用此類名稱聲明一個隊列將導致通道級別異常:403(ACCESS_REFUSED)
。
2.3.2 隊列持久化
持久化的隊列的元數據會存儲在磁盤上,當Broker重啟之后,隊列依然存在。沒有被持久化的隊列稱為暫存隊列。發布的消息也有同樣的區分,也就是說,持久化的隊列並不會使得路由到它的消息也具有持久性,需要手動把消息也標記為持久化才能保證消息的持久性。
2.4 綁定(Bindings)
綁定是交換機將消息路由到隊列的規則
。為了讓交換機能夠正確的把消息投遞到對應的隊列,需要把交換機和隊列通過路由鍵綁定起來,路由鍵就像是一個過濾器,決定了消息是否可以投遞給消息隊列。
如果一條消息不能被路由到任何隊列(例如,因為它被發布到的交換機沒有綁定),它要么被丟棄,要么返回給發布者,這取決於發布者設置的消息屬性。
2.5 消費者
如果消息只是存儲在隊列里沒有被消費,是沒有什么實際作用的。在AMQP 0-9-1中,有兩種途徑可以進行消息的消費:
- 訂閱消息隊列,以將消息投遞給應用(
push API
),這是推薦的做法; - 應用根據需要主動的輪訓獲取消息(
pull API
),這種方式非常低效,在大多數情況下應該避免。
如果應用程序對某一個特定隊列的消息感興趣,則可以注冊一個消費者,對隊列進行訂閱。每個隊列可以有多個消費者,當然也可以注冊一個獨享的消費者,這個時候其他消費者會被排除在外。
每個消費者(訂閱)都有一個稱為消費者標簽的字符串類型的標識符,可以用它來退訂消息。
2.5.1 消息確認
消費者應用程序可能偶爾無法處理單個消息或有時會崩潰,另外網絡問題也有可能導致問題。這就提出了一個問題:Broker何時應該從隊列中刪除消息?AMQP 0-9-1 規范中約定讓消費者對此進行控制,有兩種確認模式:
- 自動確認模式:在Broker向應用程序發送消息之后(使用basic.deliver或basic.get-ok方法),將消息從消息隊列中刪除;
- 顯示確認模式:在應用程序向broker發回確認之后(使用basic.ack方法),將消息從消息隊列中刪除。
在顯示模式下,應用程序選擇何時發送確認消息。如果消費者在沒有發送確認的情況下就掛掉了,那么Broker會將其重新投遞給另一個消費者,如果此時沒有可用的消費者,那么Broker將等到至少有一個消費者注冊到該隊列時,再嘗試重新投遞消息。
另外,如果應用程序崩潰(當連接關閉時 AMQP Broker會感知到這一點),並且AMQP Broker在預期的時間內未收到消息確認,則消息將重新入隊,如果此時有其他消費者,可能立即傳遞給另一個消費者。為此,我們的消費者做好業務的冪等處理也是非常重要的。
2.5.2 拒絕消息
當消費者接收到消息之后,可能處理成功或者失敗。應用程序可以通過拒絕消息向Broker表明消息處理失敗了(或者當時無法完成)。拒絕消息的時候,應用程序可以要求Broker丟棄消息或者重新入隊。
當隊列中只有一個消費者的時候,請確保您不會通過不斷地拒絕消息和重新入隊導致消息在同一個消費者身上無限循環的情況發生。
在AMQP中,basic.reject
方法用來執行拒絕消息的操作。
2.5.3 預取消息
在多個消費者共享一個隊列的情況,能夠制定每個消費者在發送下一個ack之前可以一次性接收多少條消息,這是非常有用的特性。這可以在試圖批量發布消息的時候,起到簡單的負載均衡和提高消息吞吐量的作用。
請注意:RabbitMQ僅支持通道級預取計數,不支持基於連接或者大小的預取。
2.6 消息屬性和有效負載
AMQP 0-9-1模型中的消息是具有屬性的,有些屬性非常常見,以至於AMQP 0-9-1明確定義了它們,例如:
Content type
內容類型Content encoding
內容編碼Routing key
路由鍵Delivery mode (persistent or not)
投遞模式,是否持久化Message priority
消息優先級Message publishing timestamp
消息發布的時間戳Expiration period
消息有效期Publisher application id
發布消息的應用id
有些屬性是被AMQP的Broker所使用的,但是大多數是開放給接收它們的應用程序用的。有些屬性是可選的,稱為消息頭(headers),它們類似於HTTP協議的X-Headers,消息屬性需要在消息被發布時定義。
消息體:AMQP消息除了屬性之外,還包括一個有效載荷Payload(消息實際攜帶的數據),AMQP Broker視其為一個透明的字節數組來對待。Broker不會修改payload。消息可能只包含屬性而沒有payload。payload通常使用JSON、Thrift、Protocol Buffers和MessagePack等序列化格式來序列化成結構化的數據,以便進行發布,協議對等方通常使用Content type
和Content encoding
字段來傳達此信息。
消息持久化:消息可以作為持久性發布,這使得Broker將他們持久化到磁盤。如果服務器重啟之后,系統可以確保接收到的持久化消息不會丟失。簡單的將消息發布到持久化的交換機或者被路由到持久化的隊列中,是不會讓消息也持久化的,消息是否持久化完全取決於消息本身的持久模式。將消息發布為持久性會影響性能,就像數據存儲一樣,持久性以一定的性能成本作為代價。
2.7 AMQP 0-9-1 方法
AMQP 0-9-1中定義了許多操作方法,詳細參考:AMQP 0-9-1參考。
很多方法都有對應的響應方法,有些甚至有不止一種可能的響應,如basic.get,響應可能為:get-ok或者get-empty。
如下是聲明一個交換機和響應成功的方法:
2.8 連接(Connections)
AMQP 0-9-1 連接通常是長連接,AMQP 0-9-1 是一種使用TCP提供可靠投遞的應用層協議。AMQP 0-9-1連接使用身份認證機制並提供TLS (SSL)保護。當應用程序不再需要連接到Broker時,它應該優雅地關閉其 AMQP 0-9-1 連接,而不是突然關閉底層 TCP 連接。
2.9 通道(Channels)
某些應用程序需要同時開啟連接到Broker,但是,同時保持許多TCP連接是不可取的,這樣會消耗系統資源並且使得配置防火牆更加困難。
AMQP 0-9-1通過通道復用技術通過通道的形式實現在一個TCP連接上面支持多個連接(虛擬的鏈接)。同一個TCP連接中有多個通道,通道之間的通信是完全隔離的。客戶端的每個協議操作都攜帶了一個通道ID,代理和客戶端都是用它來確定該操作所走的通道。
通道僅存在於TCP連接上下文中,一旦TCP連接關閉,其上所有通道也跟着關閉。
一般的,我們會給每個線程打開一個新的通道進行通信。
2.10 虛擬主機
為了讓單個代理可以托管多個隔離的環境(用戶組、交換機、隊列等),AMQP中提供了虛擬主機,這類似於許多流行的Web服務器使用的虛擬主機。協議客戶端在連接協商期間需要指定想要使用的虛擬主機。
2.11. AMQP Client架構
推薦的AQMP Client架構須由下面多個抽象層組成:
成幀層
:此層接收AMQP協議方法,並按某種語言格式(結構、類等)來序列化成線級幀,成幀層可以根據AMQP協議規范來實現;連接管理層
:此層用於讀寫AMQP幀,並管理所有連接、會話邏輯。在此層中,我們可以封裝開啟連接和會話、錯誤處理、內容傳輸和接收數據的全部邏輯;API層
:此層暴露了應用程序工作的特定API。API層可能會反映一些現有的標准,或暴露高層AMQP的方法。API層本身可能是由多個層組成的,如構建於AMQP方法API之上的高級API;IO層
:此外,通常還會有一些I/O層,這此可以是非常簡單的同步套接字讀取和寫入或復雜的異步多線程IO。
AMQP就介紹到這里了,接下來Java架構雜談
帶大家詳細看看RabbitMQ。
3. RabbitMQ架構
RabbitMQ的整體架構如下圖所示:
Broker
:Broker中按虛擬主機(virtual host
)划分,每個虛擬主機下面有自己的交換機(exchange
)和消息隊列(queue
),以及交換機和隊列的綁定routing_key
(有些人會把這個key稱為binding_key
);
生產端
:一般地,同一個客戶端(client
)里面的每個生產者(producer
)創建一個專門的通道(channel
),復用同一個TCP連接(connection
),每個生產者可以往Broker發布消息,發布消息的時候,需指定虛擬主機,以及虛擬主機上的交換機,並且消息需要帶上routing_key;
消費端
:一般地,同一個客戶端(client
)里面的每個消費者(consumer
)創建一個專門的通道(channel
),復用同一個TCP連接,每個消費者指定一個消息隊列進行消費。同一個消息隊列,可以有多個消費者共同消費,但消息隊列里面的同一條消息,只會由一個消費者消費,多個消費者相當於給消息隊列做了負載均衡。
針對默認交換機
、直連交換機
和主題交換機
,生產端帶入的routing_key
和交換機與隊列之間綁定的routing_key(binding_key)
進行匹配,匹配上了,就把消息投遞給對應的消息隊列。
針對扇形交換機,直接把消息投遞給所有與扇形交換機綁定的隊列。
rabbitmqctl
是管理RabbitMQ服務器節點的主要命令行工具,相關完成命令介紹參考:rabbitmqctl(8)
4. RabbitMQ特性
4.1 消息ACK機制
ACK (Acknowledge character),即是確認字符,消息的接收方需要告訴發送方已確認接收消息,這是實現可靠消息投遞的必備特性。
MQ系統中,涉及到ACK的流程如下圖所示:
4.1.1 生產端ACK之CONFIRM消息機制
如上圖所示:
Producer
發布消息到Broker
;Broker
將消息落地;Broker
發送ack給Producer
。
如果Producer
沒有收到ack,那么可以重發消息,直到收到ack為止。為了避免無限的給Broker
投遞消息,應該設置一個重試上限,並記錄下發送失敗的消息。在這個過程中,MQ Server可能會收到重復消息。
在RabbitMQ中,生產端的ACK通過ConfirmListener機制來實現:
在channel中開啟確認模式confirmSelect()
,然后在channel中添加監聽,用來監聽Broker返回的應答。
Broker何時給生產端發送ACK?
對於不可路由的消息,一旦交換機驗證消息不會路由到任何隊列,Broker將發出ack,如果開啟了Return消息機制(下一小節講解),那么Broker會先發送basic.return
消息給客戶端,再發送basic.ack
消息。示例代碼如下:
String message = "Hello ...."; // Confirm消息機制 channel.addConfirmListener(new TestConfirmListener()); // Return消息機制 channel.addReturnListener(new TestRetrunListener()); // 錯誤的路由鍵,但交換機的名稱正確 String errorRoutingKey = "itzhai.com.test1"; boolean mandatory = true; channel.basicPublish(exchangeName, errorRoutingKey, mandatory, basicProperties, message.getBytes());
執行以上代碼,生產者將依次收到basic.return(Return消息),basic.ack(Confirm消息)。
對於可路由的消息,當所有隊列都接收到消息的之后,Broker向生產端發送ACK。如果路由到的是持久隊列,並且是持久消息,那么這個ACK就意味着消息持久化到了磁盤。
也就是說,路由到持久隊列
的持久消息
的ACK將在將消息持久化到磁盤后發送。
RabbitMQ消息持久化的性能如何?
RabbitMQ持久化消息的刷盤策略:為了盡可能減少fsync(2)的調用次數,RabbitMQ在間隔一段時間(幾百毫秒)或者在隊列空閑的時候將消息分批保存到磁盤中。
這就意味着,在正常的負載下,生產端接收Broker的ACK時延可達幾百毫秒。為了提高吞吐量,強烈建議生產端應用程序異步處理ACK,或者批量發布消息,並等待ACK。
4.1.2 生產端ACK之RETURN消息機制
Return消息機制用於處理一些不可路由的消息。發送消息的時候,如果指定的routing_key路由不到隊列,這個時候就可以通過ReturnListener監聽這種異常情況。
4.1.3 消費端ACK
如上圖所示:
- 消息服務器將消息投遞給消費者;
- 消費者消費消息,並向消息服務器發送ack;
- 消息服務器收到消費者的ack之后,將已落地的消息刪除掉。
當Broker一直沒有收到消費端的ACK,則會重發消息,這個過程一般采用指數退避策略
,時間間隔按指數增長。
Rabbit中的消費端ACK
在RabbitMQ中,消費端的ACK可以是自動的,或者手動的。
手動ACK簽收
通過以下方法關閉自動ack簽收(入參autoAck設置為false):
Channel.java String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
然后自定義一個支持ack的Consumer:
public class TestAckConsumer extends DefaultConsumer { ... public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try{ ... // 成功消費的ack boolean multiple = false; channel.basicAck(envelope.getDeliveryTag(), multiple); }catch (Exception e) { // 未成功消費的ack,設置為不重回隊列,即立刻刪除消息 boolean multiple = false; boolean requeue = false; channel.basicNack(envelope.getDeliveryTag(), multiple, requeue); } } }
channel中有三種ack相關的方法:
- basic.ack:用於肯定確認,指示RabbitMQ消息已經
處理成功
,可以丟棄
消息了; - basic.nack:用於否定確認,指示RabbitMQ消息
未處理成功
,可以通過參數指定是否需要丟棄消息
還是重回隊列
。 - basic.reject:用於否定確認,指示RabbitMQ消息
未處理成功
,可以通過參數指定是否需要丟棄消息
還是重回隊列
。
basic.nack與basic.reject的區別就是,basic.nack支持批量手動確認,basic.nack是RabbitMQ對AMQP 0-9-1協議的擴展。
自動ACK簽收
使用自動確認模式,消息在發送之后就立刻被標記為投遞消費成功。如果消費者的TCP連接或者通道在真正投遞成功之前就關閉了,那么Broker發送的消息將會丟失。自動確認模式是以降低消息投遞的可靠性來換取更高的消費端吞吐量(只要消費端處理速度能夠跟上)。
如何避免消費過載的問題(消費端限流)?
使用自動模式可以提高吞吐量,但是前提是消費端要能夠處理得過來,如果處理不過來,就會在消費端的內存中積壓消息,直至把內存耗盡。因此,自動確認模式僅推薦用於能夠以穩定的速度高效地處理消息的消費者。
為了避免消費過載問題,我們一般使用手動確認模式
,配合通道預取限制
一起使用:
// 每條消息的大小限制,0表示不限制 int prefetchSize = 0; // MQ Server每次推送的消息的最大條數,0表示不限制 int prefetchCount = 1; // true 表示配置應用於整個通道,false表示只應用於消費級別 boolean global = false; channel.basicQos(prefetchSize, prefetchCount, global); // 隊列名稱 String queueName = "com.itzhai.queue"; // 設置為手動確認模式 boolean autoAck = false; // 消費者對象實例 Consumer consumer = new ItzhaiTestConsumer(channel); channel.basicConsume(queueName, autoAck, consumer);
如何提高手動ACK簽收的效率
如果不需要嚴格控制發送消費端ACK的時間,即,只要消費者成功接收到消息,不管有沒有消費成功,都允許進行ACK回復,那么就可以通過批量ACK簽收
的功能更來提高簽收的消效率。做法如下:
// 手動簽收模式 boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // 注意,這里設置為批量簽收 boolean mutiple = true; // 簽收deliveryTag以及deliveryTag之前的所有消息 channel.basicAck(deliveryTag, mutiple); } });
這樣執行basicAck
,deliveryTag以及deliveryTag之前的所有消息都將會被簽收。
什么時候需要讓消息重回隊列?
有時候消費者太繁忙導致無法立即處理接收到的消息,但是其他實例可能可以處理。這種情況,就可以拒絕消息,並且讓消息重回隊列。
另外,可以使用channel.basicNack
方法一次拒絕或者重新排隊多條消息:
// 指定批量拒絕策略 boolean multiple = true; // 指定拒絕之后重新入隊 boolean requeue = true; channel.basicNack(envelope.getDeliveryTag(), multiple, requeue);
極端情況下,如果所有消費者因為暫時無法處理接收的消息,會導致消息不斷的循環重回入隊,導致消耗網絡帶寬和CPU資源。為了避免這種情況,可以跟蹤重回隊列的消息數量,決定是否需要永久拒絕消息(丟棄消息)還是延遲重回隊列的時間。
4.2 消息的順序性能夠得到保證嗎?
一般情況下,在單個通道上發布的消息,Rabbit會按照消息發布的相通順序向生產端發送ACK消息,但也不是絕對的。發布ACK的確切時刻取決於消息的傳遞模式(持久化或瞬時),以及消息路由到的隊列的屬性。也就是說,不同的消息在不同的時間准備好進行確認,確認消息可以以不同的順序達到。所以,應用程序盡可能不要依賴於消息的順序性。
4.3 消息處理的冪等性如何處理?
無論是生產端還是消費端的ACK,都有可能因為網絡或者程序問題導致ACK消息沒有及時送達,這個時候會導致重復的消息投遞。如何保證消費同一條消息的情況下不影響業務,這就需要保證消息處理的冪等性。
也就是說,針對同一條消息,無論消費者消費多少次,產生的效果始終應該跟消費一次的保持一致,並且返回的ACK結果也是一致的。
常用的實現消息處理冪等性的方法:
- 每條消息生成唯一ID,消費端根據唯一ID判斷是否已經消費過,如果消費過,則直接返回消費成功的ACK。
- 針對入庫的業務操作可以通過數據庫的唯一索引來實現避免重復業務數據入庫;
- 針對修改數據類的操作,可以先判斷數據是否已經是目標狀態了,如果是目標狀態,直接返回再進行更新。
- 針對並發的場景,我們需要給業務消費程序添加分布式鎖,避免並發執行導致觸發業務重復處理。
4.4 死信隊列
如果消息隊列中的消息沒有被正常消費掉,那么該消息就會成為一個死信(Dead Letter)
,這條消息可以被重新發送到另一個交換機上,后面這個交換機就是死信交換機(DLX)
,死信交換機綁定的隊列就是死信隊列
。在以下情況下導致的消息未被正常消費,均會使消息變為死信:
- 消費者使用
basic.reject
或者basic.nack
來拒絕消息,同時設置requeue
參數為false
,表示消息不需要重回隊列; - 消息設置了TTL,並且過期了,或者隊列設置了消息的過期時間
x-message-ttl
; - 由於消息隊列超過了長度限制導致消息被丟棄了。
死信隊列也是一個正常的交換機,它可以是任何常見的交換機類型,與常規交換機聲明沒有區別。
DLX可以有客戶端使用隊列參數(arguments)進行定義,或者在服務器中使用策略(policy)進行定義,在policy和arguments都定義了的情況下,arguments中指定的那個會否決policy中指定的那個。
通過policy啟用死信隊列:
rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues
通過arguments啟用死信隊列:
// 聲明一個交換機,作為死信交換機 channel.exchangeDeclare("some.exchange.name", "direct"); Map<String, Object> args = new HashMap<String, Object>(); args.put("x-dead-letter-exchange", "some.exchange.name"); channel.queueDeclare("myqueue", false, false, false, args);
5. 持久化消息就意味着消息的可靠性嗎?如何實現可靠性投遞?
消息可靠性需要考慮生產端投遞消息的可靠性以及保證消費端最終成功地消費消息。
雖然通過生產端的ACK機制,可以確保消息成功的投遞到了RabbitMQ中,保證投遞的消息不丟失。但是如果生產端不知道消費者究竟有沒有成功的消費了消息,那也就無法實現可靠性投遞了。
而生產端投遞消息的過程中,通常會涉及到生產端的事務提交,要保證消息跟隨事務提交而發送,也是需要考慮的問題。
如何實現可靠投遞呢?這里留給大家思考,關鍵設計要點:
- 是否要發消息跟隨生產端
事務
一起保存到發送日志表
,提交事務之后立刻向消息隊列投遞一次消息;- 生產端發送日志表
消息狀態
:1 發送中,2 Broker簽收成功,3 Broker簽收失敗,4 消費端簽收成功,5 消費端簽收失敗
- 生產端發送日志表
- 使用消息隊列模擬
RPC調用
,在消費者成功處理消息之后,向生產者投遞成功消費的消息,以便讓生產端知道消息已經處理成功了; 定時任務
定時掃描生產端發送日志表,對於超過固定時間之內,還未處理成功的消息,進行重試投遞,重試可以使用指數退避策略
,並設置投遞上限次數。如果達到上限
次數還未成功,則預警人工介入
排查;- 消費端一定要做好
冪等
處理,避免重復消費導致業務異常。
提示的還不夠具體?再上一張圖:
6. RabbitMQ更多使用場景
通過給消息設置TTL,超時時候放入死信隊列進行處理,可以實現延遲隊列,當然,RabbitMQ也有專門的延遲隊列插件可以使用;
另外,也可以使用RabbitMQ模擬RPC調用,參考上一節實現消息可靠性投遞的例子。