- 使用消息隊列處理消息的時候,我們可能會遇到以下問題:
- 消息處理失敗
- 消息體本身有誤
- 消息重復處理
- 消息丟失
- 對於消息處理失敗,有可能有由於網絡波動導致的數據處理異常,待網絡穩定時消息就會正常處理,對於這種處理失敗,我們應該繼續嘗試去處理消息。
- 消息體本身有誤,這會導致消息連續處理失敗,占用較多的資源,寫大量的無用日志,這種錯誤應該丟棄這部分無用消息,但要記錄下日志,記清消息體本身數據,以及丟棄消息的原因。
- 消息重復處理,例如我們通過消息隊列向數據庫中添加數據,由於數據庫網絡波動,導致數據庫連接超時,而我們的系統認為消息處理失敗,就會把消息回滾到消息隊列,繼續嘗試處理,這時就會造成消息重復處理的現象,對於重要的消息,我們可以每處理一條消息,就記錄一下,處理新的消息時,進行判斷消息是否已經處理,如果已經處理,就丟棄消息。
- 由於Spring 與RabbitMq集成 對消息的處理方式是默認自動應答,也就是處理消息時無論是否出現異常,都會給消息隊列應答處理成功,消息隊列刪除消息,這時就會出現消息丟失的情況,為了解決這個問題,我們需要使用手動應答的方式處理消息。
1.rabbitMQ消費者監聽器的配置
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<rabbit:listener queues="queueName" ref="Listiner"/>
</rabbit:listener-container>
acknowledge="manual" 就表示該監聽器手動應答消息
2.消費者監聽器的編寫
1.為了能夠手動應答消息,我們編寫的監聽器需要實現ChannelAwareMessageListener,重寫onMessage()方法,里面有兩個參數Message message, Channel channel,Message 是消息體本身,Channel是RabbitMQ的連接通道。
3.異常的處理
1.消息處理正常,沒有拋出異常,這時我們需要手動應答消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
2.當出現異常時,我們需要把這個消息回滾到消息隊列,有兩種方式
//ack返回false,並重新回到隊列,api里面解釋得很清楚 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); //拒絕消息 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
3.經過開發中的實際測試,當消息回滾到消息隊列時,這條消息不會回到隊列尾部,而是仍是在隊列頭部,這時消費者會立馬又接收到這條消息,進行處理,接着拋出異常,進行 回滾,如此反復進行。這種情況會導致消息隊列處理出現阻塞,消息堆積,導致正常消息也無法運行。對於消息回滾到消息隊列,我們希望比較理想的方式時出現異常的消息到 達消息隊列尾部,這樣既保證消息不會丟失,又保證了正常業務的進行,因此我們采取的解決方案是,將消息進行應答,這時消息隊列會刪除該消息,同時我們再次發送該消息 到消息隊列,這時就實現了錯誤消息進行消息隊列尾部的方案。
//手動進行應答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//重新發送消息到隊尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
JSON.toJSONBytes(new Object()));
4.對於第三條中的解決方案會存在一個問題,如果一個消息體本身有誤,會導致該消息體,一直無法進行處理,而服務器中刷出大量無用日志。解決這個問題可以采取兩種方案
1.一種是對於日常細致處理,分清哪些是可以恢復的異常,哪些是不可以恢復的異常。對於可以恢復的異常我們采取第三條中的解決方案,對於不可以處理的異常,我們采用記錄日志,直接丟棄該消息方案。
2.另一種是我們對每條消息進行標記,記錄每條消息的處理次數,當一條消息,多次處理仍不能成功時,處理次數到達我們設置的值時,我們就 丟棄該消息,但需要記錄詳細的日志。
4.使用手動應答消息,有一點需要特別注意,那就是不能忘記應答消息,因為對於RabbitMQ來說處理消息沒有超時,只要不應答消息,他就會認為仍在正常處理消息,導致消息隊列出現阻塞,影響業務執行。

