默認情況下如果一個 Message 被消費者所正確接收則會被從 Queue 中移除
如果一個 Queue 沒被任何消費者訂閱,那么這個 Queue 中的消息會被 Cache(緩存),當有消費者訂閱時則會立即發送,當 Message 被消費者正確接收時,就會被從 Queue 中移除
消息發送確認
發送的消息怎么樣才算失敗或成功?如何確認?
- 當消息無法路由到隊列時,確認消息路由失敗。消息成功路由時,當需要發送的隊列都發送成功后,進行確認消息,對於持久化隊列意味着寫入磁盤,對於鏡像隊列意味着所有鏡像接收成功
消息接收確認
消息消費者如何通知 Rabbit 消息消費成功?
- 消息通過 ACK 確認是否被正確接收,每個 Message 都要被確認(acknowledged),可以手動去 ACK 或自動 ACK
- 自動確認會在消息發送給消費者后立即確認,但存在丟失消息的可能,如果消費端消費邏輯拋出異常,也就是消費端沒有處理成功這條消息,那么就相當於丟失了消息
- 如果消息已經被處理,但后續代碼拋出異常,使用 Spring 進行管理的話消費端業務邏輯會進行回滾,這也同樣造成了實際意義的消息丟失
- 如果手動確認則當消費者調用 ack、nack、reject 幾種方法進行確認,手動確認可以在業務失敗后進行一些操作,如果消息未被 ACK 則會發送到下一個消費者
- 如果某個服務忘記 ACK 了,則 RabbitMQ 不會再發送數據給它,因為 RabbitMQ 認為該服務的處理能力有限
- ACK 機制還可以起到限流作用,比如在接收到某條消息時休眠幾秒鍾
- 消息確認模式有:
- AcknowledgeMode.NONE:自動確認
- AcknowledgeMode.AUTO:根據情況確認
- AcknowledgeMode.MANUAL:手動確認
確認消息(局部方法處理消息)
- 默認情況下消息消費者是自動 ack (確認)消息的,如果要手動 ack(確認)則需要修改確認模式為 manual
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
- 或在 RabbitListenerContainerFactory 中進行開啟手動 ack
@Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //開啟手動 ack return factory; }
- 確認消息
@RabbitHandler public void processMessage2(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) { System.out.println(message); try { channel.basicAck(tag,false); // 確認消息 } catch (IOException e) { e.printStackTrace(); } }
- 需要注意的 basicAck 方法需要傳遞兩個參數
- deliveryTag(唯一標識 ID):當一個消費者向 RabbitMQ 注冊后,會建立起一個 Channel ,RabbitMQ 會用 basic.deliver 方法向消費者推送消息,這個方法攜帶了一個 delivery tag, 它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標識 ID,是一個單調遞增的正整數,delivery tag 的范圍僅限於 Channel
- multiple:為了減少網絡流量,手動確認可以被批處理,當該參數為 true 時,則可以一次性確認 delivery_tag 小於等於傳入值的所有消息
手動否認、拒絕消息
- 發送一個 header 中包含 error 屬性的消息

hducA.png
- 消費者獲取消息時檢查到頭部包含 error 則 nack 消息
@RabbitHandler public void processMessage2(String message, Channel channel,@Headers Map<String,Object> map) { System.out.println(message); if (map.get("error")!= null){ System.out.println("錯誤的消息"); try { channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true); //否認消息 return; } catch (IOException e) { e.printStackTrace(); } } try { channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false); //確認消息 } catch (IOException e) { e.printStackTrace(); } }
- 此時控制台重復打印,說明該消息被 nack 后一直重新入隊列然后一直重新消費
hello
錯誤的消息
hello
錯誤的消息
hello
錯誤的消息
hello
錯誤的消息
- 也可以拒絕該消息,消息會被丟棄,不會重回隊列
channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false); //拒絕消息
確認消息 三種類型(全局處理消息)
- 自動確認 NONE(涉及到一個問題就是如果在處理消息的時候拋出異常,消息處理失敗,但是因為自動確認而導致 Rabbit 將該消息刪除了,造成消息丟失)
@Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("consumer_queue"); // 監聽的隊列 container.setAcknowledgeMode(AcknowledgeMode.NONE); // NONE 代表自動確認 container.setMessageListener((MessageListener) message -> { //消息監聽處理 System.out.println("====接收到消息====="); System.out.println(new String(message.getBody())); //相當於自己的一些消費邏輯拋錯誤 throw new NullPointerException("consumer fail"); }); return container; }
- 手動確認消息 MANUAL
@Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("consumer_queue"); // 監聽的隊列 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手動確認 container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { //消息處理 System.out.println("====接收到消息====="); System.out.println(new String(message.getBody())); if(message.getMessageProperties().getHeaders().get("error") == null){ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("消息已經確認"); }else { //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); System.out.println("消息拒絕"); } }); return container; }
- 根據情況確認消息 AUTO
- AcknowledgeMode 除了 NONE 和 MANUAL 之外還有 AUTO ,它會根據方法的執行情況來決定是否確認還是拒絕(是否重新入queue)
-
如果消息成功被消費(成功的意思是在消費的過程中沒有拋出異常),則自動確認
-
當拋出 AmqpRejectAndDontRequeueException 異常的時候,則消息會被拒絕,且 requeue = false(不重新入隊列)
-
當拋出 ImmediateAcknowledgeAmqpException 異常,則消費者會被確認
-
其他的異常,則消息會被拒絕,且 requeue = true(如果此時只有一個消費者監聽該隊列,則有發生死循環的風險,多消費端也會造成資源的極大浪費,這個在開發過程中一定要避免的)。可以通過 setDefaultRequeueRejected(默認是true)去設置
-
@Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("consumer_queue"); // 監聽的隊列 container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 根據情況確認消息 container.setMessageListener((MessageListener) (message) -> { System.out.println("====接收到消息====="); System.out.println(new String(message.getBody())); //拋出NullPointerException異常則重新入隊列 //throw new NullPointerException("消息消費失敗"); //當拋出的異常是AmqpRejectAndDontRequeueException異常的時候,則消息會被拒絕,且requeue=false //throw new AmqpRejectAndDontRequeueException("消息消費失敗"); //當拋出ImmediateAcknowledgeAmqpException異常,則消費者會被確認 throw new ImmediateAcknowledgeAmqpException("消息消費失敗"); }); return container; }
消息可靠總結
- 持久化
- exchange要持久化
- queue要持久化
- message要持久化
- 消息確認
- 啟動消費返回(@ReturnList注解,生產者就可以知道哪些消息沒有發出去)
- 生產者和Server(broker)之間的消息確認
- 消費者和Server(broker)之間的消息確認