RabbitMQ學習筆記六:RabbitMQ之消息確認


使用消息隊列,必須要考慮的問題就是生產者消息發送失敗和消費者消息處理失敗,這兩種情況怎么處理.

生產者發送消息,成功,則確認消息發送成功;失敗,則返回消息發送失敗信息,再做處理.

消費者處理消息,成功,則消息隊列自動刪除消息;失敗,則消息重新返回隊列,等待處理.

對於消費者處理失敗的情況,如果僅僅只是讓消息重新返回隊列,等待處理,那么久有可能會出現很多消息一直無法處理的情況;因此,是否讓消息返回隊列,還有待商榷.

現在,我一步步來分析RabbitMQ的消息確認(這次的代碼同樣是在上次的代碼基礎上做修改,修改后的代碼會上傳到百度雲,后面會有鏈接地址):

生產者端,修改spring-config.xml,rabbitTemplate增加消息確認和返回錯誤信息的監聽器:

    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
        confirm-callback="confirmCallBackListener"  
        return-callback="returnCallBackListener"   
        mandatory="true"
    />

消息確認監聽器confirmCallBackListener:

@Service("confirmCallBackListener")
public class ConfirmCallBackListener implements ConfirmCallback {

    @Override
    public void confirm(CorrelationData arg0, boolean arg1)
    {
        System.out.println("確認消息完成..."); // 只確生產者消息發送成功,消費者是否處理成功不做保證
    }
}

消息發送失敗返回監聽器returnCallBackListener:

@Service("returnCallBackListener")
public class ReturnCallBackListener implements ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)
    {
        System.out.println("消息返回處理中...");
    }
}

消費者端,修改spring-config.xml,設置ack方式為手動,增加對應隊列的監聽器。

<!-- ========================================RabbitMQ========================================= -->
    <!-- 連接工廠 -->
    <rabbit:connection-factory id="connectionFactory" host="localhost" publisher-confirms="true" virtual-host="/" username="guest" password="guest" />
    <!-- 監聽器 設置acknowledge="manual" 則開啟ack機制 -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
        <!-- queues是隊列名稱,可填多個,用逗號隔開, method是ref指定的Bean調用Invoke方法執行的方法名稱 -->
        <rabbit:listener queues="test" ref="receiveConfirmTestListener" />
    </rabbit:listener-container>
    <!-- 隊列聲明 -->
    <rabbit:queue name="test" durable="true" />
    <!-- 測試監聽處理器 -->
    <bean id="receiveConfirmTestListener" class="com.aitongyi.customer.ReceiveConfirmTestListener" />

配置的receiveConfirmTestListener沒有指定方法,是因為它實現了接口ChannelAwareMessageListener,代碼如下:

public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception
    {
        try
        {
            System.out.println("consumer--:" + message.getMessageProperties() + ":" + new String(message.getBody()));

            // deliveryTag是消息傳送的次數,我這里是為了讓消息隊列的第一個消息到達的時候拋出異常,處理異常讓消息重新回到隊列,然后再次拋出異常,處理異常拒絕讓消息重回隊列
            if (message.getMessageProperties().getDeliveryTag() == 1 || message.getMessageProperties().getDeliveryTag() == 2)
            {
                throw new Exception();
            }

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // false只確認當前一個消息收到,true確認所有consumer獲得的消息
        }
        catch (Exception e)
        {
            e.printStackTrace();

            if (message.getMessageProperties().getRedelivered())
            {
                System.out.println("消息已重復處理失敗,拒絕再次接收...");
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒絕消息
            }
            else
            {
                System.out.println("消息即將再次返回隊列處理...");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // requeue為是否重新回到隊列
            }
        }
    }
}

啟動項目,發送消息,查看測試結果:

打開發送消息頁面,點擊發送:

在控制台查看結果(拋出的異常信息我沒有粘貼出來):

2017-05-17 14:56:21 532 [INFO] c.a.p.c.RabbitController - rabbitmq--收到待發送消息: type[test]-msg[hello world test rabbit!]
2017-05-17 14:56:21 819 [INFO] c.a.p.s.RabbitServiceImpl - rabbitmq--發送消息完成: routingKey[test]-msg[hello world test rabbit!]
consumer--:MessageProperties [headers={spring_return_correlation=776f57cd-1e22-44de-bc11-eb82bc043562}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=test, deliveryTag=1, messageCount=0]:hello world test rabbit!
消息即將再次返回隊列處理...
consumer--:MessageProperties [headers={spring_return_correlation=776f57cd-1e22-44de-bc11-eb82bc043562}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=test, deliveryTag=2, messageCount=0]:hello world test rabbit!
確認消息完成...
消息已重復處理失敗,拒絕再次接收...
consumer--:MessageProperties [headers={spring_return_correlation=776f57cd-1e22-44de-bc11-eb82bc043562}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=test, deliveryTag=3, messageCount=0]:hello world test rabbit!

上面代碼的處理方式是消費者接收到消息后處理消息,第一次處理失敗,讓消息重回隊列,如果重回隊列后仍然失敗,則拒絕接收消息。這個做法只是一個參考,具體如何做,要根據業務來做改變。

消息發送失敗情形展示:將發送的消息隊列routingKey,也就是配置的隊列名改成一個在RabbitMQ Server不存在的。然后打開發送消息頁面,點擊發送,結果如下:

2017-05-17 15:12:54 437 [INFO] c.a.p.c.RabbitController - rabbitmq--收到待發送消息: type[test]-msg[hello world test rabbit!]
2017-05-17 15:12:54 439 [INFO] c.a.p.s.RabbitServiceImpl - rabbitmq--發送消息完成: routingKey[test]-msg[hello world test rabbit!]
消息返回處理中...
確認消息完成...

至於為什么還是會打印"確認消息完成...",有興趣的可以看下源碼。

源代碼已上傳至百度雲網盤,歡迎下載閱讀,地址:http://pan.baidu.com/s/1eSL2w8M

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM