RabbitMQ如何解決各種情況下丟數據的問題


1.生產者丟數據

生產者的消息沒有投遞到MQ中怎么辦?從生產者弄丟數據這個角度來看,RabbitMQ提供transaction和confirm模式來確保生產者不丟消息。
transaction機制就是說,發送消息前,開啟事物(channel.txSelect()),然后發送消息,如果發送過程中出現什么異常,事物就會回滾(channel.txRollback()),如果發送成功則提交事
物(channel.txCommit())。

然而缺點就是吞吐量下降了。因此,按照博主的經驗,生產上用confirm模式的居多。一旦channel進入confirm模式,所有在該信道上面發布的消息都將會被指派一個唯一的ID(從1開始),一旦
消息被投遞到所有匹配的隊列之后,rabbitMQ就會發送一個Ack給生產者(包含消息的唯一ID),這就使得生產者知道消息已經正確到達目的隊列了.如果rabiitMQ沒能處理該消息,則會發送一個N
ack消息給你,你可以進行重試操作。

下面演示一下confirm模式:

//測試確認后回調
@Service
public class HelloSender1 implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void send() {
        String context = "你好現在是 " + new Date() +"";
        System.out.println("HelloSender發送內容 : " + context);
        this.rabbitTemplate.setConfirmCallback(this);
        //exchange,queue 都正確,confirm被回調, ack=true
        //this.rabbitTemplate.convertAndSend("exchange","topic.message", context);

        //exchange 錯誤,queue 正確,confirm被回調, ack=false
        //this.rabbitTemplate.convertAndSend("fasss","topic.message", context);

        //exchange 正確,queue 錯誤 ,confirm被回調, ack=true; return被回調 replyText:NO_ROUTE
        //this.rabbitTemplate.convertAndSend("exchange","", context);

        //exchange 錯誤,queue 錯誤,confirm被回調, ack=false
        this.rabbitTemplate.convertAndSend("fasss","fass", context);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
    }

}

2.消息隊列丟數據

處理消息隊列丟數據的情況,一般是開啟持久化磁盤的配置。這個持久化配置可以和confirm機制配合使用,你可以在消息持久化磁盤后,再給生產者發送一個Ack信號。這樣,如果消息持久化磁盤
之前,rabbitMQ陣亡了,那么生產者收不到Ack信號,生產者會自動重發。
那么如何持久化呢,這里順便說一下吧,其實也很容易,就下面兩步
①、將queue的持久化標識durable設置為true,則代表是一個持久的隊列
②、發送消息的時候將deliveryMode=2
這樣設置以后,rabbitMQ就算掛了,重啟后也能恢復數據。在消息還沒有持久化到硬盤時,可能服務已經死掉,這種情況可以通過引入mirrored-queue即鏡像隊列,但也不能保證消息百分百不丟
失(整個集群都掛掉)
    /**
     * 第二個參數:queue的持久化是通過durable=true來實現的。
     * 第三個參數:exclusive:排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次申明它的連接可見,並在連接斷開時自動刪除。這里需要注意三點:
   1. 排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一連接創建的排他隊列;
   2.“首次”,如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同;
   3.即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的,這種隊列適用於一個客戶端發送讀取消息的應用場景。 * 第四個參數:自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。 *
@param * @return * @Author zxj */ @Bean public Queue queue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 25000);//25秒自動刪除 Queue queue = new Queue("topic.messages", true, false, true, arguments); return queue; }

 

        MessageProperties properties=new MessageProperties();
        properties.setContentType(MessageProperties.DEFAULT_CONTENT_TYPE);
        properties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);//持久化設置
        properties.setExpiration("2018-12-15 23:23:23");//設置到期時間
        Message message=new Message("hello".getBytes(),properties);
        this.rabbitTemplate.sendAndReceive("exchange","topic.message",message);

 

 

3.消費者丟數據

啟用手動確認模式可以解決這個問題
①自動確認模式,消費者掛掉,待ack的消息回歸到隊列中。消費者拋出異常,消息會不斷的被重發,直到處理成功。不會丟失消息,即便服務掛掉,沒有處理完成的消息會重回隊列,但是異常會讓
消息不斷重試。
②手動確認模式
③不確認模式,acknowledge="none" 不使用確認機制,只要消息發送完成會立即在隊列移除,無論客戶端異常還是斷開,只要發送完就移除,不會重發。

 

指定Acknowledge的模式:
spring.rabbitmq.listener.direct.acknowledge-mode=manual,表示該監聽器手動應答消息
針對手動確認模式,有以下特點:
1.使用手動應答消息,有一點需要特別注意,那就是不能忘記應答消息,因為對於RabbitMQ來說處理消息沒有超時,只要不應答消息,他就會認為仍在正常處理消息,導致消息隊列出現阻塞,影響
業務執行。
2.如果消費者來不及處理就死掉時,沒有響應ack時,會項目啟動后會重復發送一條信息給其他消費者; 3.可以選擇丟棄消息,這其實也是一種應答,如下,這樣就不會再次收到這條消息。 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
4.如果消費者設置了手動應答模式,並且設置了重試,出現異常時無論是否捕獲了異常,都是不會重試的
5.如果消費者沒有設置手動應答模式,並且設置了重試,那么在出現異常時沒有捕獲異常會進行重試,如果捕獲了異常不會重試。
 
          

 

重試機制:

spring.rabbitmq.listener.simple.retry.max-attempts=5  最大重試次數
spring.rabbitmq.listener.simple.retry.enabled=true 是否開啟消費者重試(為false時關閉消費者重試,這時消費端代碼異常會一直重復收到消息)
spring.rabbitmq.listener.simple.retry.initial-interval=5000 重試間隔時間(單位毫秒)
spring.rabbitmq.listener.simple.default-requeue-rejected=false 重試次數超過上面的設置之后是否丟棄(false不丟棄時需要寫相應代碼將該消息加入死信隊列)

如果設置了重試模式,那么在出現異常時沒有捕獲異常會進行重試,如果捕獲了異常不會重試。

 

 

當出現異常時,我們需要把這個消息回滾到消息隊列,有兩種方式:

//ack返回false,並重新回到隊列,api里面解釋得很清楚

//ack返回false,並重新回到隊列,api里面解釋得很清楚
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒絕消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

經過開發中的實際測試,當消息回滾到消息隊列時,這條消息不會回到隊列尾部,而是仍是在隊列頭部,這時消費者會立馬又接收到這條消息進行處理,接着拋出異常,進行         回滾,如此反復進行。這種情況會導致消息隊列處理出現阻塞,消息堆積,導致正常消息也無法運行。對於消息回滾到消息隊列,我們希望比較理想的方式時出現異常的消息到         達消息隊列尾部,這樣既保證消息不會丟失,又保證了正常業務的進行,因此我們采取的解決方案是,將消息進行應答,這時消息隊列會刪除該消息,同時我們再次發送該消息         到消息隊列,這時就實現了錯誤消息進行消息隊列尾部的方案。

   //手動進行應答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            //重新發送消息到隊尾
            channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
                    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
                    JSON.toJSONBytes(new Object()));

如果一個消息體本身有誤,會導致該消息體,一直無法進行處理,而服務器中刷出大量無用日志。解決這個問題可以采取兩種方案:

1.一種是對於日常細致處理,分清哪些是可以恢復的異常,哪些是不可以恢復的異常。對於可以恢復的異常我們采取第三條中的解決方案,對於不可以處理的異常,我們采用記錄日志,直接丟棄該消息方案。

2.另一種是我們對每條消息進行標記,記錄每條消息的處理次數,當一條消息,多次處理仍不能成功時,處理次數到達我們設置的值時,我們就丟棄該消息,但需要記錄詳細的日志。

 

消息監聽內的異常處理有兩種方式:

 1.內部catch后直接處理,然后使用channel對消息進行確認

 2.配置RepublishMessageRecoverer將處理異常的消息發送到指定隊列專門處理或記錄。監聽的方法內拋出異常貌似沒有太大用處。因為拋出異常就算是重試也非常有可能會繼續出現異常,當重試次數完了之后消息就只有重啟應用才能接收到了,很有可能導致消息消費不及時。當然可以配置RepublishMessageRecoverer來解決,但是萬一RepublishMessageRecoverer發送失敗了呢。。那就可能造成消息消費不及時了。所以即使需要將處理出現異常的消息統一放到另外隊列去處理,個人建議兩種方式:

①catch異常后,手動發送到指定隊列,然后使用channel給rabbitmq確認消息已消費
②給Queue綁定死信隊列,使用nack(requque為false)確認消息消費失敗

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM