一 消息發布時的權衡
失敗確認
在生產者通過channel的basicPublish方法發布消息時,通常有幾個參數需要設置,為此我們有必要了解清楚這些參數代表的具體含義及其作用,查看Channel接口,會發現存在3個重載的basicPublish方法
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
當mandatory標志位設置為true時,如果exchange根據自身類型和消息routingKey無法找到一個合適的queue存儲消息,那么broker會調用basic.return方法將消息返還給生產者;當mandatory設置為false時,出現上述情況broker會直接將消息丟棄;通俗的講,mandatory標志告訴broker代理服務器至少將消息route到一個隊列中,否則就將消息return給發送者;
可以這樣認為,開啟mandatory是開啟故障檢測模式。
注意:它只會讓RabbitMQ向你通知失敗,而不會通知成功。如果消息正確路由到隊列,則發布者不會受到任何通知。帶來的問題是無法確保發布消息一定是成功的,因為通知失敗的消息可能會丟失。

channel.addConfirmListener則用來監聽RabbitMQ發回的失敗信息。
package cn.enjoyedu.mandatory; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** *類說明: */ public class ProducerMandatory { //交換器名字 public final static String EXCHANGE_NAME = "mandatory_test"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { /** * 創建連接連接到RabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); // 設置MabbitMQ所在主機ip或者主機名 factory.setHost("127.0.0.1"); // 創建一個連接 Connection connection = factory.newConnection(); // 創建一個信道 Channel channel = connection.createChannel(); // 指定轉發 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //失敗消息通知 channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("返回的replyText :"+replyText); System.out.println("返回的exchange :"+exchange); System.out.println("返回的routingKey :"+routingKey); System.out.println("返回的message :"+message); } }); String[] severities={"error","info","warning"}; for(int i=0;i<3;i++){ String severity = severities[i%3]; // 發送的消息 String message = "Hello World_"+(i+1) +("_"+System.currentTimeMillis()); //向MQ發送消息 mandatory=true channel.basicPublish(EXCHANGE_NAME,severity,true, null,message.getBytes()); System.out.println("----------------------------------"); System.out.println(" Sent Message: [" + severity +"]:'" + message + "'"); Thread.sleep(200); } // 關閉頻道和連接 channel.close(); connection.close(); } }
監聽器的小甜點
在信道關閉和連接關閉時,還有兩個監聽器可以使用
//連接關閉時執行 connection.addShutdownListener(new ShutdownListener() { public void shutdownCompleted(ShutdownSignalException cause) { System.out.println(cause.getMessage()); } }); //信道關閉時執行 channel.addShutdownListener(new ShutdownListener() { public void shutdownCompleted(ShutdownSignalException cause) { System.out.println(cause.getMessage()); } });
事務
事務的實現主要是對信道(Channel)的設置,主要的方法有三個:
- channel.txSelect()聲明啟動事務模式;
- channel.txComment()提交事務;
- channel.txRollback()回滾事務;
在發送消息之前,需要聲明channel為事務模式,提交或者回滾事務即可。
開啟事務后,客戶端和RabbitMQ之間的通訊交互流程:
- · 客戶端發送給服務器Tx.Select(開啟事務模式)
- · 服務器端返回Tx.Select-Ok(開啟事務模式ok)
- · 推送消息
- · 客戶端發送給事務提交Tx.Commit
- · 服務器端返回Tx.Commit-Ok
以上就完成了事務的交互流程,如果其中任意一個環節出現問題,就會拋出IoException移除,這樣用戶就可以攔截異常進行事務回滾,或決定要不要重復消息。
那么,既然已經有事務了,為何還要使用發送方確認模式呢,原因是因為事務的性能是非常差的。根據相關資料,事務會降低2~10倍的性能。
發送方確認模式
基於事務的性能問題,RabbitMQ團隊為我們拿出了更好的方案,即采用發送方確認模式,該模式比事務更輕量,性能影響幾乎可以忽略不計。
原理:生產者將信道設置成confirm模式,一旦信道進入confirm模式,所有在該信道上面發布的消息都將會被指派一個唯一的ID(從1開始),由這個id在生產者和RabbitMQ之間進行消息的確認。
不可路由的消息,當交換器發現,消息不能路由到任何隊列,會進行確認操作,表示收到了消息。如果發送方設置了mandatory模式,則會先調用addReturnListener監聽器。

可路由的消息,要等到消息被投遞到所有匹配的隊列之后,broker會發送一個確認給生產者(包含消息的唯一ID),這就使得生產者知道消息已經正確到達目的隊列了,如果消息和隊列是可持久化的,那么確認消息會在將消息寫入磁盤之后發出,broker回傳給生產者的確認消息中delivery-tag域包含了確認消息的序列號。

confirm模式最大的好處在於他可以是異步的,一旦發布一條消息,生產者應用程序就可以在等信道返回確認的同時繼續發送下一條消息,當消息最終得到確認之后,生產者應用便可以通過回調方法來處理該確認消息,如果RabbitMQ因為自身內部錯誤導致消息丟失,就會發送一條nack消息,生產者應用程序同樣可以在回調方法中處理該nack消息決定下一步的處理。
Confirm的三種實現方式:
方式一:channel.waitForConfirms()普通發送方確認模式;消息到達交換器,就會返回true。
方式二:channel.waitForConfirmsOrDie()批量確認模式;使用同步方式等所有的消息發送之后才會執行后面代碼,只要有一個消息未到達交換器就會拋出IOException異常。
方式三:channel.addConfirmListener()異步監聽發送方確認模式;
如何使用,參見代碼no-spring模塊包cn.enjoyedu. producerconfirm中。
備用交換器
在第一次聲明交換器時被指定,用來提供一種預先存在的交換器,如果主交換器無法路由消息,那么消息將被路由到這個新的備用交換器。
如果發布消息時同時設置了mandatory會發生什么?如果主交換器無法路由消息,RabbitMQ並不會通知發布者,因為,向備用交換器發送消息,表示消息已經被路由了。注意,新的備用交換器就是普通的交換器,沒有任何特殊的地方。
使用備用交換器,向往常一樣,聲明Queue和備用交換器,把Queue綁定到備用交換器上。然后在聲明主交換器時,通過交換器的參數,alternate-exchange,,將備用交換器設置給主交換器。
建議備用交換器設置為faout類型,Queue綁定時的路由鍵設置為“#”
package cn.enjoyedu.backupexchange; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; /** *類說明:存放到延遲隊列的元素,對業務數據進行了包裝 */ public class BackupExProducer { public final static String EXCHANGE_NAME = "main-exchange"; public final static String BAK_EXCHANGE_NAME = "ae"; public static void main(String[] args) throws IOException, TimeoutException { /** * 創建連接連接到RabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 創建一個連接 Connection connection = factory.newConnection(); // 創建一個信道 Channel channel = connection.createChannel(); // 聲明備用交換器 Map<String,Object> argsMap = new HashMap<String,Object>(); argsMap.put("alternate-exchange",BAK_EXCHANGE_NAME); //主交換器 channel.exchangeDeclare(EXCHANGE_NAME,"direct", false,false,argsMap); //備用交換器 channel.exchangeDeclare(BAK_EXCHANGE_NAME,BuiltinExchangeType.FANOUT, true,false,null); //所有日志嚴重性級別 String[] severities={"error","info","warning"}; for(int i=0;i<3;i++){ //每一次發送一條不同嚴重性的日志 String severity = severities[i%3]; // 發送的消息 String message = "Hello World_"+(i+1); //參數1:exchange name //參數2:routing key channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity +"':'" + message + "'"); } // 關閉頻道和連接 channel.close(); connection.close(); } }
二 消息的消費
消息的獲得方式
拉取Get
屬於一種輪詢模型,發送一次get請求,獲得一個消息。如果此時RabbitMQ中沒有消息,會獲得一個表示空的回復。總的來說,這種方式性能比較差,很明顯,每獲得一條消息,都要和RabbitMQ進行網絡通信發出請求。而且對RabbitMQ來說,RabbitMQ無法進行任何優化,因為它永遠不知道應用程序何時會發出請求。具體使用,參見代碼no-spring模塊包cn.enjoyedu.GetMessage中。對我們實現者來說,要在一個循環里,不斷去服務器get消息。
推送Consume
屬於一種推送模型。注冊一個消費者后,RabbitMQ會在消息可用時,自動將消息進行推送給消費者。這種模式我們已經使用過很多次了,
消息的應答
前面說過,消費者收到的每一條消息都必須進行確認。消息確認后,RabbitMQ才會從隊列刪除這條消息,RabbitMQ不會為未確認的消息設置超時時間,它判斷此消息是否需要重新投遞給消費者的唯一依據是消費該消息的消費者連接是否已經斷開。這么設計的原因是RabbitMQ允許消費者消費一條消息的時間可以很久很久。
自動確認
消費者在聲明隊列時,可以指定autoAck參數,當autoAck=true時,一旦消費者接收到了消息,就視為自動確認了消息。如果消費者在處理消息的過程中,出了錯,就沒有什么辦法重新處理這條消息,所以我們很多時候,需要在消息處理成功后,再確認消息,這就需要手動確認。
自行手動確認
當autoAck=false時,RabbitMQ會等待消費者顯式發回ack信號后才從內存(和磁盤,如果是持久化消息的話)中移去消息。否則,RabbitMQ會在隊列中消息被消費后立即刪除它。
采用消息確認機制后,只要令autoAck=false,消費者就有足夠的時間處理消息(任務),不用擔心處理消息過程中消費者進程掛掉后消息丟失的問題,因為RabbitMQ會一直持有消息直到消費者顯式調用basicAck為止。
當autoAck=false時,對於RabbitMQ服務器端而言,隊列中的消息分成了兩部分:一部分是等待投遞給消費者的消息;一部分是已經投遞給消費者,但是還沒有收到消費者ack信號的消息。如果服務器端一直沒有收到消費者的ack信號,並且消費此消息的消費者已經斷開連接,則服務器端會安排該消息重新進入隊列,等待投遞給下一個消費者(也可能還是原來的那個消費者)。
如何使用,參見代碼no-spring模塊包cn.enjoyedu. ackfalse中。
通過運行程序,啟動兩個消費者A、B,都可以收到消息,但是其中有一個消費者A不會對消息進行確認,當把這個消費者A關閉后,消費者B又會收到本來發送給消費者A的消息。所以我們一般使用手動確認的方法是,將消息的處理放在try/catch語句塊中,成功處理了,就給RabbitMQ一個確認應答,如果處理異常了,就在catch中,進行消息的拒絕,如何拒絕,參考《消息的拒絕》章節。
QoS預取模式
在確認消息被接收之前,消費者可以預先要求接收一定數量的消息,在處理完一定數量的消息后,批量進行確認。如果消費者應用程序在確認消息之前崩潰,則所有未確認的消息將被重新發送給其他消費者。所以這里存在着一定程度上的可靠性風險。
這種機制一方面可以實現限速(將消息暫存到RabbitMQ內存中)的作用,一方面可以保證消息確認質量(比如確認了但是處理有異常的情況)。
注意:消費確認模式必須是非自動ACK機制(這個是使用baseQos的前提條件,否則會Qos不生效),然后設置basicQos的值;另外,還可以基於consume和channel的粒度進行設置(global)。
具體使用,參見代碼no-spring模塊包cn.enjoyedu. qos中。我們可以進行批量確認,也可以進行單條確認。
basicQos方法參數詳細解釋:
prefetchSize:最多傳輸的內容的大小的限制,0為不限制,但據說prefetchSize參數,rabbitmq沒有實現。
prefetchCount:會告訴RabbitMQ不要同時給一個消費者推送多於N個消息,即一旦有N個消息還沒有ack,則該consumer將block掉,直到有消息ack
global:true\false 是否將上面設置應用於channel,簡單點說,就是上面限制是channel級別的還是consumer級別。
如果同時設置channel和消費者,會怎么樣?AMQP規范沒有解釋如果使用不同的全局值多次調用basic.qos會發生什么。 RabbitMQ將此解釋為意味着兩個預取限制應該彼此獨立地強制執行; 消費者只有在未達到未確認消息限制時才會收到新消息。
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true); // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
也就是說,整個通道加起來最多允許15條未確認的消息,每個消費者則最多有10條消息。
消費者中的事務
使用方法和生產者一致
假設消費者模式中使用了事務,並且在消息確認之后進行了事務回滾,會是什么樣的結果?
結果分為兩種情況:
- autoAck=false手動應對的時候是支持事務的,也就是說即使你已經手動確認了消息已經收到了,但RabbitMQ對消息的確認會等事務的返回結果,再做最終決定是確認消息還是重新放回隊列,如果你手動確認之后,又回滾了事務,那么以事務回滾為准,此條消息會重新放回隊列;
- autoAck=true如果自動確認為true的情況是不支持事務的,也就是說你即使在收到消息之后在回滾事務也是於事無補的,隊列已經把消息移除了。
可靠性和性能的權衡

消息的拒絕
Reject和Nack
消息確認可以讓RabbitMQ知道消費者已經接受並處理完消息。但是如果消息本身或者消息的處理過程出現問題怎么辦?需要一種機制,通知RabbitMQ,這個消息,我無法處理,請讓別的消費者處理。這里就有兩種機制,Reject和Nack。
Reject在拒絕消息時,可以使用requeue標識,告訴RabbitMQ是否需要重新發送給別的消費者。不重新發送,一般這個消息就會被RabbitMQ丟棄。Reject一次只能拒絕一條消息。
channel.basicReject(envelope.getDeliveryTag(),false);
Nack則可以一次性拒絕多個消息。這是RabbitMQ對AMQP規范的一個擴展。
// 此處false 代表非批量, true代表是否重寫投遞(reQueue)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
死信交換器DLX
RabbitMQ對AMQP規范的一個擴展。被投遞消息被拒絕后的一個可選行為,往往用在對問題消息的診斷上。
消息變成死信一般是以下幾種情況:
- · 消息被拒絕,並且設置 requeue 參數為 false
- · 消息過期
- · 隊列達到最大長度
死信交換器仍然只是一個普通的交換器,創建時並沒有特別要求和操作。在創建隊列的時候,聲明該交換器將用作保存被拒絕的消息即可,相關的參數是x-dead-letter-exchange。
消息生產者 生產者產生了3條消息,分別是error,info,warn,
package cn.enjoyedu.dlx; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** *@author Mark老師 享學課堂 https://enjoy.ke.qq.com *往期視頻咨詢芊芊老師 QQ:2130753077 VIP課程咨詢 依娜老師 QQ:2470523467 *類說明: */ public class DlxProducer { public final static String EXCHANGE_NAME = "dlx_make"; public static void main(String[] args) throws IOException, TimeoutException { /** * 創建連接連接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); // 設置MabbitMQ所在主機ip或者主機名 factory.setHost("127.0.0.1"); // 創建一個連接 Connection connection = factory.newConnection(); // 創建一個信道 Channel channel = connection.createChannel(); // 指定轉發 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); /*日志消息級別,作為路由鍵使用*/ String[] serverities = {"error","info","warning"}; for(int i=0;i<3;i++){ String severity = serverities[i%3]; String msg = "Hellol,RabbitMq"+(i+1); /*發布消息,需要參數:交換器,路由鍵,其中以日志消息級別為路由鍵*/ channel.basicPublish(EXCHANGE_NAME,severity,null, msg.getBytes()); System.out.println("Sent "+severity+":"+msg); } // 關閉頻道和連接 channel.close(); connection.close(); } }
WillMakeWarnDlxConsumer
package cn.enjoyedu.dlx; import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; /** *類說明:普通的消費者,但是自己無法消費的消息,通過路由鍵將投入死信隊列 */ public class WillMakeWarnDlxConsumer { public static void main(String[] argv) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打開連接和創建頻道,與發送端一樣 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(DlxProducer.EXCHANGE_NAME, BuiltinExchangeType.TOPIC); /*聲明一個隊列,並綁定死信交換器*/ String queueName = "dlx_warn_make"; Map<String, Object> arguments = new HashMap<String, Object>(); //死信交換器 arguments.put("x-dead-letter-exchange", DlxProcessWarnConsumer.DLX_EXCHANGE_NAME); //死信路由鍵,會替換消息原來的路由鍵 arguments.put("x-dead-letter-routing-key", DlxProcessWarnConsumer.DLX_ROUTE_KEY); channel.queueDeclare(queueName,false,false, false,arguments); /*綁定,將隊列和交換器通過路由鍵進行綁定*/ channel.queueBind(queueName, DlxProducer.EXCHANGE_NAME,"#"); System.out.println("waiting for message........"); /*聲明了一個消費者*/ final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); if(envelope.getRoutingKey().equals("error")){ System.out.println("Received[" +envelope.getRoutingKey() +"]"+message); channel.basicAck(envelope.getDeliveryTag(), false); }else{ System.out.println("Will reject[" +envelope.getRoutingKey() +"]"+message); channel.basicReject(envelope.getDeliveryTag(), false); } } }; /*消費者正式開始在指定隊列上消費消息*/ channel.basicConsume(queueName,false,consumer); } }
WillMakeDlxConsumer
package cn.enjoyedu.dlx; import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; /** *類說明:普通的消費者,但是自己無法消費的消息,將投入死信隊列 */ public class WillMakeDlxConsumer { public static void main(String[] argv) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打開連接和創建頻道,與發送端一樣 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(DlxProducer.EXCHANGE_NAME, BuiltinExchangeType.TOPIC); /*聲明一個隊列,並綁定死信交換器*/ String queueName = "dlx_make"; Map<String,Object> args = new HashMap<String,Object>(); args.put("x-dead-letter-exchange", DlxProcessConsumer.DLX_EXCHANGE_NAME); channel.queueDeclare(queueName,false,true, false, args); /*綁定,將隊列和交換器通過路由鍵進行綁定*/ channel.queueBind(queueName, DlxProducer.EXCHANGE_NAME,"#"); System.out.println("waiting for message........"); /*聲明了一個消費者*/ final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); if(envelope.getRoutingKey().equals("error")){ System.out.println("Received[" +envelope.getRoutingKey() +"]"+message); channel.basicAck(envelope.getDeliveryTag(), false); }else{ System.out.println("Will reject[" +envelope.getRoutingKey() +"]"+message); channel.basicReject(envelope.getDeliveryTag(), false); } } }; /*消費者正式開始在指定隊列上消費消息*/ channel.basicConsume(queueName,false,consumer); } }
兩個消費者WillMakeDlxConsumer和WillMakeWarnDlxConsumer都拒絕了兩條消息,而投送到死信隊列后,可以發現根據投送死信時的路由鍵,
不同的消費者有些可以接受到消息,有些則不行
和備用交換器的區別
1、備用交換器是主交換器無法路由消息,那么消息將被路由到這個新的備用交換器,而死信交換器則是接收過期或者被拒絕的消息。
2、備用交換器是在聲明主交換器時發生聯系,而死信交換器則聲明隊列時發生聯系。
自動刪除隊列
自動刪除隊列和普通隊列在使用上沒有什么區別,唯一的區別是,當消費者斷開連接時,隊列將會被刪除。自動刪除隊列允許的消費者沒有限制,也就是說當這個隊列上最后一個消費者斷開連接才會執行刪除。
自動刪除隊列只需要在聲明隊列時,設置屬性auto-delete標識為true即可。系統聲明的隨機隊列,缺省就是自動刪除的。

單消費者隊列
普通隊列允許的消費者沒有限制,多個消費者綁定到多個隊列時,RabbitMQ會采用輪詢進行投遞。如果需要消費者獨占隊列,在隊列創建的時候,設定屬性exclusive為true。

自動過期隊列
指隊列在超過一定時間沒使用,會被從RabbitMQ中被刪除。什么是沒使用?
一定時間內沒有Get操作發生
沒有Consumer連接在隊列上
特別的:就算一直有消息進入隊列,也不算隊列在被使用。
通過聲明隊列時,設定x-expires參數即可,單位毫秒。

永久隊列
隊列的持久性
持久化隊列和非持久化隊列的區別是,持久化隊列會被保存在磁盤中,固定並持久的存儲,當Rabbit服務重啟后,該隊列會保持原來的狀態在RabbitMQ中被管理,而非持久化隊列不會被保存在磁盤中,Rabbit服務重啟后隊列就會消失。
非持久化比持久化的優勢就是,由於非持久化不需要保存在磁盤中,所以使用速度就比持久化隊列快。即是非持久化的性能要高於持久化。而持久化的優點就是會一直存在,不會隨服務的重啟或服務器的宕機而消失。
在聲明隊列時,將屬性durable設置為“false”,則該隊列為非持久化隊列,設置成“true”時,該隊列就為持久化隊列

隊列級別消息過期
就是為每個隊列設置消息的超時時間。只要給隊列設置x-message-ttl 參數,就設定了該隊列所有消息的存活時間,時間單位是毫秒。如果聲明隊列時指定了死信交換器,則過期消息會成為死信消息。

隊列保留參數列表

消息的屬性

在發送消息時,我們還可以對消息的屬性做更細微的控制,比如構建Request-Response模式
package cn.enjoyedu.setmsg; import com.rabbitmq.client.*; import java.io.IOException; import java.util.UUID; import java.util.concurrent.TimeoutException; /** *類說明:消息的屬性的控制 消息發送者 */ public class ReplyToProducer { public final static String EXCHANGE_NAME = "replyto"; public static void main(String[] args) throws IOException, TimeoutException { /* 創建連接,連接到RabbitMQ*/ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); Connection connection = connectionFactory.newConnection(); /*創建信道*/ Channel channel = connection.createChannel(); /*創建持久化交換器*/ channel.exchangeDeclare(EXCHANGE_NAME,"direct",false); //響應QueueName ,消費者將會把要返回的信息發送到該Queue String responseQueue = channel.queueDeclare().getQueue(); //消息的唯一id String msgId = UUID.randomUUID().toString(); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .replyTo(responseQueue) .messageId(msgId) .build(); /*聲明了一個消費者*/ final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Received["+envelope.getRoutingKey() +"]"+message); } }; /*消費者正式開始在指定隊列上消費消息*/ channel.basicConsume(responseQueue,true,consumer); String msg = "Hellol,RabbitMq"; channel.basicPublish(EXCHANGE_NAME,"error", properties, msg.getBytes()); System.out.println("Sent error:"+msg); } }
消費者
package cn.enjoyedu.setmsg; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** *類說明:消息的屬性的控制 */ public class ReplyToConsumer { public static void main(String[] argv) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打開連接和創建頻道,與發送端一樣 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); /*創建交換器*/ channel.exchangeDeclare(ReplyToProducer.EXCHANGE_NAME, "direct",false); /*聲明一個隊列*/ String queueName = "replyto"; channel.queueDeclare(queueName,false,false, false,null); /*綁定,將隊列和交換器通過路由鍵進行綁定*/ String routekey = "error";/*表示只關注error級別的日志消息*/ channel.queueBind(queueName,ReplyToProducer.EXCHANGE_NAME,routekey); System.out.println("waiting for message........"); /*聲明了一個消費者*/ final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Received["+envelope.getRoutingKey() +"]"+message); AMQP.BasicProperties respProp = new AMQP.BasicProperties.Builder() .replyTo(properties.getReplyTo()) .correlationId(properties.getMessageId()) .build(); //應答 channel.basicPublish("", respProp.getReplyTo() , respProp , ("Hi,"+message).getBytes("UTF-8")); } }; /*消費者正式開始在指定隊列上消費消息*/ channel.basicConsume(queueName,true,consumer); } }
甚至可以消息超時時間和死信交換器配合使用,實現延時隊列功能
消息存活時間
當隊列消息的TTL(過期時間) 和消息TTL都被設置,時間短的TTL設置生效。如果將一個過期消息發送給RabbitMQ,該消息不會路由到任何隊列,而是直接丟棄。
為消息設置TTL有一個問題:RabbitMQ只對處於隊頭的消息判斷是否過期(即不會掃描隊列),所以,很可能隊列中已存在死消息,但是隊列並不知情。這會影響隊列統計數據的正確性,妨礙隊列及時釋放資源。
消息的持久化
默認情況下,隊列和交換器在服務器重啟后都會消失,消息當然也是。將隊列和交換器的durable屬性設為true,缺省為false,但是消息要持久化還不夠,還需要將消息在發布前,將投遞模式設置為2(MessageProperties.PERSISTENT_TEXT_PLAIN)。消息要持久化,必須要有持久化的隊列、交換器。
package cn.enjoyedu.msgdurable; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import java.io.IOException; import java.util.concurrent.TimeoutException; /** *類說明:消息的持久化生產者 */ public class MsgAttrProducer { public final static String EXCHANGE_NAME = "msg_durable"; public static void main(String[] args) throws IOException, TimeoutException { /* 創建連接,連接到RabbitMQ*/ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); Connection connection = connectionFactory.newConnection(); /*創建信道*/ Channel channel = connection.createChannel(); /*創建持久化交換器*/ channel.exchangeDeclare(EXCHANGE_NAME,"direct",true); /*日志消息級別,作為路由鍵使用*/ String[] serverities = {"error","info","warning"}; for(int i=0;i<3;i++){ String severity = serverities[i%3]; String msg = "Hellol,RabbitMq"+(i+1); /*發布持久化消息*/ channel.basicPublish(EXCHANGE_NAME,severity, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); } channel.close(); connection.close(); } }
消費者
package cn.enjoyedu.msgdurable; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** *類說明:消息持久化的消費者 */ public class MsgAttrConsumer { public static void main(String[] argv) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打開連接和創建頻道,與發送端一樣 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); /*創建持久化交換器*/ channel.exchangeDeclare(MsgAttrProducer.EXCHANGE_NAME, "direct",true); /*聲明一個持久化隊列*/ String queueName = "msgdurable"; channel.queueDeclare(queueName,true,false, false,null); /*綁定,將隊列和交換器通過路由鍵進行綁定*/ String routekey = "error";/*表示只關注error級別的日志消息*/ channel.queueBind(queueName,MsgAttrProducer.EXCHANGE_NAME,routekey); System.out.println("waiting for message........"); /*聲明了一個消費者*/ final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Received["+envelope.getRoutingKey() +"]"+message); } }; /*消費者正式開始在指定隊列上消費消息*/ channel.basicConsume(queueName,true,consumer); } }
