前文中也多次提到消息傳輸的一些概念,這一篇比較全面的介紹一下,然后補充一些內容。
消息的應答
RabbitMQ有兩種應答模式,自動和手動。這也是AMQP協議所推薦的。這在point-to-point和broadcast都是一樣的。
自動應答-當RabbitMQ把消息發送到接收端,接收端把消息出隊列的時候就自動幫你發應答消息給服務。
手動應答-需要我們開發人員手動去調用ack方法去告訴服務已經收到。
文檔推薦在大數據傳輸中,如果對個別消息的丟失不是很敏感的話選用自動應答比較理想,而對於那些一個消息都不能丟的場景,需要選用手動應答,也就是說在正確處理完以后才應答。如果選擇了自動應答,那么消息重發這個功能就沒有了。
消息的拒收
拒收,是接收端在收到消息的時候響應給RabbitMQ服務的一種命令,告訴服務器不應該由我處理,或者拒絕處理,扔掉。接收端在發送reject命令的時候可以選擇是否要重新放回queue中。如果沒有其他接收者監控這個queue的話,要注意一直無限循環發送的危險。
BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
channel.BasicReject(ea.DeliveryTag, false);
BasicReject方法第一個參數是消息的DeliveryTag,對於每個Channel來說,每個消息都會有一個DeliveryTag,一般用接收消息的順序來表示:1,2,3,4 等等。第二個參數是是否放回queue中,requeue。
BasicReject一次只能拒絕接收一個消息,而BasicNack方法可以支持一次0個或多個消息的拒收,並且也可以設置是否requeue。
channel.BasicNack(3, true, false);
在第一個參數DeliveryTag中如果輸入3,則消息DeliveryTag小於等於3的,這個Channel的,都會被拒收。
消息的QoS
QoS = quality-of-service, 顧名思義,服務的質量。通常我們設計系統的時候不能完全排除故障或保證說沒有故障,而應該設計有完善的異常處理機制。在出現錯誤的時候知道在哪里出現什么樣子的錯誤,原因是什么,怎么去恢復或者處理才是真正應該去做的。在接收消息出現故障的時候我們可以通過RabbitMQ重發機制來處理。重發就有重發次數的限制,有些時候你不可能不限次數的重發,這取決於消息的大小,重要程度和處理方式。
甚至QoS是在接收端設置的。發送端沒有任何變化,接收端的代碼也比較簡單,只需要加如下代碼:
channel.BasicQos(0, 1, false);
代碼第一個參數是可接收消息的大小的,但是似乎在客戶端2.8.6版本中它必須為0,即使:不受限制。如果不輸0,程序會在運行到這一行的時候報錯,說還沒有實現不為0的情況。第二個參數是處理消息最大的數量。舉個例子,如果輸入1,那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息,消息只會在隊列中阻塞。如果輸入3,那么可以最多有3個消息不應答,如果到達了3個,則發送端發給這個接收方得消息只會在隊列中,而接收方不會有接收到消息的事件產生。總結說,就是在下一次發送應答消息前,客戶端可以收到的消息最大數量。第三個參數則設置了是不是針對整個Connection的,因為一個Connection可以有多個Channel,如果是false則說明只是針對於這個Channel的。
這種數量的設置,也為我們在多個客戶端監控同一個queue的這種負載均衡環境下提供了更多的選擇。