channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
如果用空字符串去申明一個exchange,那么系統就會使用"amq.direct"這個exchange。我們在創建一個queue的時候,默認的都會有一個和新建queue同名的routingKey綁定到這個默認的exchange上去
在方法中的第一個參數是需要輸入一個exchange。在RabbitMQ中,所有的消息都必須要通過exchange發送到各個queue里面去。發送者發送消息,其實也就是把消息放到exchange中去。而exchange知道應該把消息放到哪里去。在這個方法中,我們沒有輸入exchange的名稱,只是定義了一個空的echange,而在第二個參數routeKey中輸入了我們目標隊列的名稱。RabbitMQ會幫我定義一個默認的exchange,這個exchange會把消息直接投遞到我們輸入的隊列中,這樣服務端只需要直接去這個定義了的隊列中獲取消息就可以了
信息確認
RabbitMQ有兩種應答模式,自動和手動。這也是AMQP協議所推薦的。這在point-to-point和broadcast都是一樣的。
自動應答-當RabbitMQ把消息發送到接收端,接收端把消息出隊列的時候就自動幫你發應答消息給服務。
手動應答-需要我們開發人員手動去調用ack方法去告訴服務已經收到。
文檔推薦在大數據傳輸中,如果對個別消息的丟失不是很敏感的話選用自動應答比較理想,而對於那些一個消息都不能丟的場景,需要選用手動應答,也就是說在正確處理完以后才應答。如果選擇了自動應答,那么消息重發這個功能就沒有了。
點對點模式
也就是一發一接的模式,不適用發布/訂閱這種廣播模式
//autoAck 設置false,消費端掛掉,信息不會丟失,server會re-queue channel.basicConsume(TASK_QUEUE_NAME, false, consumer); //向服務器發送應答 channel.basicAck(envelope.getDeliveryTag(), false);
在RabbitMQ中,為了不讓消息丟失,它提供了消息應答的概念。當消費者獲取到了一個消息以后,需要給RabbitMQ服務一個應答的消息,告知服務我已經收到或正確處理了該消息。那么RabbitMQ可以放心的在隊列中刪除該消息
隊列持久化
//durable 設置true,queue持久化,server重啟,此queue不丟失
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
方法的第四的參數autoDelete,一般都會輸入false。文檔描述這個參數如果是true的話,意思是:如果這個queue不再使用(沒有被訂閱)的話,server就會刪除它。在我的測試過程中,只要是連接改queue的所有接收者都斷開連接的話,該queue就會被刪除,即使里面還有沒有處理的消息。RabbitMQ的重啟也同樣會刪除他們。如果輸入的是false,那與之相連的客戶端都斷開連接的話,服務是不會刪除這個隊列的,隊列中的消息也就會存在。發送端在沒有客戶端連接的時候也可以把消息放入改隊列,客戶端起來的時候,就會得到這些消息。但是如果RabbitMQ服務重啟的話,該隊列就沒有了,里面的消息自然也就沒有了。
第三個參數是exclusive,文檔描述說,如果是true,那么申明這個queue的connection斷了,那么這個隊列就被刪除了,包括里面的消息。
第二個參數durable,文檔描述說,如果是true,則代表是一個持久的隊列,那么在服務重啟后,也會存在。因為服務會把持久化的queue存放在硬盤上,放服務重啟的時候,會重新申明這個queue。當然必須是在autoDelete和exclusive都為false的時候。隊列是可以被持久化,但是里面的消息是否為持久化那還要看消息的持久化設置。也就是說,如果重啟之前那個queue里面還有沒有發出去的消息的話,重啟之后那隊列里面是不是還存在原來的消息,這個就要取決於發送者在發送消息時對消息的設置了(信息持久化)。
信息持久化
//BasicProperties設置MessageProperties.PERSISTENT_TEXT_PLAIN,信息持久化 channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8")); IBasicProperties properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; channel.BasicPublish("", "TaskQueue", properties, bytes);
DeliveryMode等於2就說明這個消息是persistent的。1是默認是,不是持久的。在接收者接收消息並處理的時候會出現各種各樣的問題:拋出異常導致與RabbitMQ連接斷開,程序掛掉,網絡問題等等。往往在出現這些問題的時候我們通常都希望隊列能保存這些消息,並在程序再次起來的時候能夠重新處理,或如果是負載均衡的模式下,能夠把這個消息重新分配給其他的同等的接受者來處理。這同樣也是RabbitMQ對消息持久化的一種功能。這我們在消息的傳輸控制中做詳細的說明
消息的拒收
拒收,是接收端在收到消息的時候響應給RabbitMQ服務的一種命令,告訴服務器不應該由我處理,或者拒絕處理,扔掉。接收端在發送reject命令的時候可以選擇是否要重新放回queue中。如果沒有其他接收者監控這個queue的話,要注意一直無限循環發送的危險。
BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
channel.BasicReject(ea.DeliveryTag, false);
BasicReject方法第一個參數是消息的DeliveryTag,對於每個Channel來說,每個消息都會有一個DeliveryTag,一般用接收消息的順序來表示:1,2,3,4 等等。第二個參數是是否放回queue中,requeue。
BasicReject一次只能拒絕接收一個消息,而BasicNack方法可以支持一次0個或多個消息的拒收,並且也可以設置是否requeue。
channel.BasicNack(3, true, false);
在第一個參數DeliveryTag中如果輸入3,則消息DeliveryTag小於等於3的,這個Channel的,都會被拒收。
消息的QoS
QoS = quality-of-service, 顧名思義,服務的質量。通常我們設計系統的時候不能完全排除故障或保證說沒有故障,而應該設計有完善的異常處理機制。在出現錯誤的時候知道在哪里出現什么樣子的錯誤,原因是什么,怎么去恢復或者處理才是真正應該去做的。在接收消息出現故障的時候我們可以通過RabbitMQ重發機制來處理。重發就有重發次數的限制,有些時候你不可能不限次數的重發,這取決於消息的大小,重要程度和處理方式。
甚至QoS是在接收端設置的。發送端沒有任何變化,接收端的代碼也比較簡單,只需要加如下代碼:
channel.BasicQos(0, 1, false);
代碼第一個參數是可接收消息的大小的,但是似乎在客戶端2.8.6版本中它必須為0,即使:不受限制。如果不輸0,程序會在運行到這一行的時候報錯,說還沒有實現不為0的情況。第二個參數是處理消息最大的數量。舉個例子,如果輸入1,那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息,消息只會在隊列中阻塞。如果輸入3,那么可以最多有3個消息不應答,如果到達了3個,則發送端發給這個接收方得消息只會在隊列中,而接收方不會有接收到消息的事件產生。總結說,就是在下一次發送應答消息前,客戶端可以收到的消息最大數量。第三個參數則設置了是不是針對整個Connection的,因為一個Connection可以有多個Channel,如果是false則說明只是針對於這個Channel的。
這種數量的設置,也為我們在多個客戶端監控同一個queue的這種負載均衡環境下提供了更多的選擇。
//對服務器確認之前,一次只接受一條信息
channel.basicQos(1);
mandatory標志的作用
在生產者通過channel的basicPublish方法發布消息時,通常有幾個參數需要設置,為此我們有必要了解清楚這些參數代表的具體含義及其作用,查看Channel接口,會發現存在3個重載的basicPublish方法
1 void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; 2 3 void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) 4 throws IOException; 5 6 void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) 7 throws IOException;
他們共有的參數分別是: exchange:交換機名稱 routingKey:路由鍵 props:消息屬性字段,比如消息頭部信息等等 body:消息主體部分
mandatory和immediate是AMQP協議中basic.pulish方法中的兩個標志位,它們都有當消息傳遞過程中不可達目的地時將消息返回給生產者的功能。具體區別在於:
mandatory標志位
當mandatory標志位設置為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,那么會調用basic.return方法將消息返還給生產者;當mandatory設為false時,出現上述情形broker會直接將消息扔掉。
immediate標志位
當immediate標志位設置為true時,如果exchange在將消息route到queue(s)時發現對應的queue上沒有消費者,那么這條消息不會放入隊列中。當與消息routeKey關聯的所有queue(一個或多個)都沒有消費者時,該消息會通過basic.return方法返還給生產者。
概括來說,mandatory標志告訴服務器至少將該消息route到一個隊列中,否則將消息返還給生產者;immediate標志告訴服務器如果該消息關聯的queue上有消費者,則馬上將消息投遞給它,如果所有queue都沒有消費者,直接把消息返還給生產者,不用將消息入隊列等待消費者了。