使用消息隊列,必須要考慮的問題就是生產者消息發送失敗和消費者消息處理失敗,這兩種情況怎么處理.
生產者發送消息,成功,則確認消息發送成功;失敗,則返回消息發送失敗信息,再做處理.
消費者處理消息,成功,則消息隊列自動刪除消息;失敗,則消息重新返回隊列,等待處理.
對於消費者處理失敗的情況,如果僅僅只是讓消息重新返回隊列,等待處理,那么久有可能會出現很多消息一直無法處理的情況;因此,是否讓消息返回隊列,還有待商榷.
現在,我一步步來分析RabbitMQ的消息確認(這次的代碼同樣是在上次的代碼基礎上做修改,修改后的代碼會上傳到百度雲,后面會有鏈接地址):
生產者端,修改spring-config.xml,rabbitTemplate增加消息確認和返回錯誤信息的監聽器:
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" confirm-callback="confirmCallBackListener" return-callback="returnCallBackListener" mandatory="true" />
消息確認監聽器confirmCallBackListener:
@Service("confirmCallBackListener") public class ConfirmCallBackListener implements ConfirmCallback { @Override public void confirm(CorrelationData arg0, boolean arg1) { System.out.println("確認消息完成..."); // 只確認生產者消息發送成功,消費者是否處理成功不做保證 } }
消息發送失敗返回監聽器returnCallBackListener:
@Service("returnCallBackListener") public class ReturnCallBackListener implements ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("消息返回處理中..."); } }
消費者端,修改spring-config.xml,設置ack方式為手動,增加對應隊列的監聽器。
<!-- ========================================RabbitMQ========================================= -->
<!-- 連接工廠 -->
<rabbit:connection-factory id="connectionFactory" host="localhost" publisher-confirms="true" virtual-host="/" username="guest" password="guest" />
<!-- 監聽器 設置acknowledge="manual" 則開啟ack機制 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<!-- queues是隊列名稱,可填多個,用逗號隔開, method是ref指定的Bean調用Invoke方法執行的方法名稱 -->
<rabbit:listener queues="test" ref="receiveConfirmTestListener" />
</rabbit:listener-container>
<!-- 隊列聲明 -->
<rabbit:queue name="test" durable="true" />
<!-- 測試監聽處理器 -->
<bean id="receiveConfirmTestListener" class="com.aitongyi.customer.ReceiveConfirmTestListener" />
配置的receiveConfirmTestListener沒有指定方法,是因為它實現了接口ChannelAwareMessageListener,代碼如下:
public class ReceiveConfirmTestListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { try { System.out.println("consumer--:" + message.getMessageProperties() + ":" + new String(message.getBody())); // deliveryTag是消息傳送的次數,我這里是為了讓消息隊列的第一個消息到達的時候拋出異常,處理異常讓消息重新回到隊列,然后再次拋出異常,處理異常拒絕讓消息重回隊列 if (message.getMessageProperties().getDeliveryTag() == 1 || message.getMessageProperties().getDeliveryTag() == 2) { throw new Exception(); } channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // false只確認當前一個消息收到,true確認所有consumer獲得的消息 } catch (Exception e) { e.printStackTrace(); if (message.getMessageProperties().getRedelivered()) { System.out.println("消息已重復處理失敗,拒絕再次接收..."); channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒絕消息 } else { System.out.println("消息即將再次返回隊列處理..."); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // requeue為是否重新回到隊列 } } } }
啟動項目,發送消息,查看測試結果:
打開發送消息頁面,點擊發送:
在控制台查看結果(拋出的異常信息我沒有粘貼出來):
2017-05-17 14:56:21 532 [INFO] c.a.p.c.RabbitController - rabbitmq--收到待發送消息: type[test]-msg[hello world test rabbit!] 2017-05-17 14:56:21 819 [INFO] c.a.p.s.RabbitServiceImpl - rabbitmq--發送消息完成: routingKey[test]-msg[hello world test rabbit!] consumer--:MessageProperties [headers={spring_return_correlation=776f57cd-1e22-44de-bc11-eb82bc043562}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=test, deliveryTag=1, messageCount=0]:hello world test rabbit! 消息即將再次返回隊列處理... consumer--:MessageProperties [headers={spring_return_correlation=776f57cd-1e22-44de-bc11-eb82bc043562}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=test, deliveryTag=2, messageCount=0]:hello world test rabbit! 確認消息完成... 消息已重復處理失敗,拒絕再次接收... consumer--:MessageProperties [headers={spring_return_correlation=776f57cd-1e22-44de-bc11-eb82bc043562}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=test, deliveryTag=3, messageCount=0]:hello world test rabbit!
上面代碼的處理方式是消費者接收到消息后處理消息,第一次處理失敗,讓消息重回隊列,如果重回隊列后仍然失敗,則拒絕接收消息。這個做法只是一個參考,具體如何做,要根據業務來做改變。
消息發送失敗情形展示:將發送的消息隊列routingKey,也就是配置的隊列名改成一個在RabbitMQ Server不存在的。然后打開發送消息頁面,點擊發送,結果如下:
2017-05-17 15:12:54 437 [INFO] c.a.p.c.RabbitController - rabbitmq--收到待發送消息: type[test]-msg[hello world test rabbit!] 2017-05-17 15:12:54 439 [INFO] c.a.p.s.RabbitServiceImpl - rabbitmq--發送消息完成: routingKey[test]-msg[hello world test rabbit!] 消息返回處理中... 確認消息完成...
至於為什么還是會打印"確認消息完成...",有興趣的可以看下源碼。
源代碼已上傳至百度雲網盤,歡迎下載閱讀,地址:http://pan.baidu.com/s/1eSL2w8M