/**
* RabbitMQ消息確認機制
* 關於rabbit的生產和消費方的一些實用的操作;
* producer的confirm和consumer的ack,這兩者使用的模式都是用來保證數據完整性,防止數據丟失
*/
/** * producer的confirm模式 * 業務場景描述: * 促銷系統在做活動前,需要給用戶的手機發送一條活動內容短信希望用戶來參加, * 因為用戶量有點大,所以通過往短信mq中插入數據方式,讓短信服務來消費mq發短信; * 此時插入mq消息的服務為了保證給所有用戶發消息,並且要在短時間內插入完成(因此用到了異步插入方式(快速)), * 我們就需要知道每次插入mq是否成功,如果不成功那我們可以收集失敗的信息后補發(因此confirm模式排上了用場); * 開啟confirm模式后,返回send結果(成功或失敗) */ public RabbitTemplate getRabbitTemplate(RabbitTemplate.ConfirmCallback confirmCallback){ CachingConnectionFactory connectionFactory = null; return getRabbitTemplate(connectionFactory, confirmCallback); } //producer生產 - confirm模式 public RabbitTemplate getRabbitTemplate(CachingConnectionFactory connectionFactory,RabbitTemplate.ConfirmCallback confirmCallback){ RabbitTemplate template = new RabbitTemplate(connectionFactory); //product開啟confirm模式 connectionFactory.setPublisherConfirms(true); //設置confirm回調處理 template.setConfirmCallback(confirmCallback); return template; }
/** * consumer的ack模式 * 場景描述:短信服務去消費mq隊列信息時,倘若服務調用的運營商發送短信接口異常了(短信運營商接口欠費), * 我們此時的短信是發送失敗的,用戶也收不到短信,但是在默認(默認開啟ack)前提下mq消息已經被消費了rabbit中沒有記錄了(kafka例外); * 想要mq消息在業務邏輯異常時還存在,那么可以使用ack方式; * 業務無異常,發送ack標識,mq消息釋放 * * 在springboot中可以使用基於amqp封裝的工廠類關閉自動ack模式,改為手動ack方式; * 只有當業務代碼流程走完后,最后通過代碼設置ack標識,來通知rabbit消息可以丟棄了; * 如果設置了手動模式后,又沒有提交ack標識,那么mq中的消息一直存在無法釋放(每次consumer消費后,rabbit會把noack的消息重復放入隊列中): */ public SimpleRabbitListenerContainerFactory listenerContainerFactory(){ ConnectionFactory connectionFactory = null; return listenerContainerFactory(connectionFactory); } public SimpleRabbitListenerContainerFactory listenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); //代碼手動ack factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //開啟消費者數量 factory.setConcurrentConsumers(2); //手動確認模式可以使用 prefetch,限制通道上未完成的(“正在進行中的”)發送的數量 //每次接受數據量,默認250 factory.setPrefetchCount(300); return factory; } /** * 消息確認–ACK * 通過連接工廠設置手動ack方式,然后獲取mq消息后,走完正常業務邏輯,最后再手動通知ack釋放消息,如下: */ private void firstNodeListener(String msg,Channel channel,Message message){ try { long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.println("firstNodeListener - 消費消息 [" + deliveryTag + "] - " + msg); //這里ack主要根據mq消息的唯一編號(deliverTag)來通知; //如果我們不設置ack確認,RabbitMQ會認為這個消息沒有正常消費,會將此消息重新放入隊列中 //忘記通過basicAck返回確認信息,將導致消費者客戶端退出或者關閉后,消息會被退回RabbitMQ服務器,這會使RabbitMQ服務器內存爆滿,而且RabbitMQ也不會主動刪除這些被退回的消息 channel.basicAck(deliveryTag, true); } catch (IOException e) { e.printStackTrace(); } }
