一、數據丟失的三個場景
一條消息從生產者發送到消費者消費的過程:
可以看出,一條消息整個過程要經歷兩次的網絡傳輸:
- 從生產者發送到RabbitMQ服務器,從RabbitMQ服務器發送到消費者
- 在消費者未消費前存儲在隊列(Queue)中
所以可以知道,有三個場景下是會發生消息丟失的:
- 生產者發送消息到RabbitMQ服務器過程中,RabbitMQ服務器如果宕機停止服務,消息會丟失。
- 存儲在隊列中,如果隊列沒有對消息持久化,RabbitMQ服務器宕機重啟會丟失數據。
- 消費者從RabbitMQ服務器獲取隊列中存儲的數據消費,但是消費者程序出錯或者宕機而沒有正確消費,導致數據丟失。
針對以上三種場景,RabbitMQ提供了三種解決的方式,分別是confirm機制,消息持久化,ACK事務機制。
二、消息發送端:confirm機制(生產者-->MQ server)
在生產者發送到RabbitMQ Server時,有可能因為網絡問題導致投遞失敗,從而丟失數據。我們可以使用confirm模式防止數據丟失。工作流程是怎么樣的呢,看以下圖解:
- 第一步,一條消息從生產者發送到RabbitMQ,首先會發送到Exchange,對應回調函數confirm()
- 第二步,從Exchange路由分配到Queue中,對應回調函數則是returnedMessage()
注意:
- exchange無論是否收到消息,都會回調confirm()函數
- 只有當exchange無法路由到queue,才會調用returnedMessage()
代碼實現
代碼怎么實現呢,請看演示:
首先在application.yml配置文件中加上如下配置:
spring: rabbitmq: publisher-confirms: true publisher-returns: true template: mandatory: true //publisher-confirms:設置為true時。當消息投遞到Exchange后,會回調confirm()方法進行通知生產者 //publisher-returns:設置為true時。當消息匹配到Queue並且失敗時,會通過回調returnedMessage()方法返回消息 //spring.rabbitmq.template.mandatory: 設置為true時。指定消息在沒有被隊列接收時會通過回調returnedMessage()方法退回。
有個小細節,publisher-returns和mandatory如果都設置的話,優先級是以mandatory優先。可以看源碼:
接着我們需要定義回調方法:
@Component public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class); /** * 監聽消息是否到達Exchange * * @param correlationData 包含消息的唯一標識的對象 * @param ack true 標識 ack,false 標識 nack * @param cause nack 投遞失敗的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { logger.info("消息投遞成功~消息Id:{}", correlationData.getId()); } else { logger.error("消息投遞失敗,Id:{},錯誤提示:{}", correlationData.getId(), cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("消息沒有路由到隊列,獲得返回的消息"); Map map = byteToObject(message.getBody(), Map.class); logger.info("message body: {}", map == null ? "" : map.toString()); logger.info("replyCode: {}", replyCode); logger.info("replyText: {}", replyText); logger.info("exchange: {}", exchange); logger.info("routingKey: {}", exchange); logger.info("------------> end <------------"); } @SuppressWarnings("unchecked") private <T> T byteToObject(byte[] bytes, Class<T> clazz) { T t; try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); ObjectInputStream ois = new ObjectInputStream(bis)) { t = (T) ois.readObject(); } catch (Exception e) { e.printStackTrace(); return null; } return t; } }
我這里就簡單地打印回調方法返回的消息,在實際項目中,可以把返回的消息存儲到日志表中,使用定時任務進行進一步的處理。
我這里是使用RabbitTemplate進行發送,所以在Service層的RabbitTemplate需要設置一下:
@Service public class RabbitMQServiceImpl implements RabbitMQService { @Resource private RabbitmqConfirmCallback rabbitmqConfirmCallback; @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { //指定 ConfirmCallback rabbitTemplate.setConfirmCallback(rabbitmqConfirmCallback); //指定 ReturnCallback rabbitTemplate.setReturnCallback(rabbitmqConfirmCallback); } @Override public String sendMsg(String msg) throws Exception { Map<String, Object> message = getMessage(msg); try { CorrelationData correlationData = (CorrelationData) message.remove("correlationData"); rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message, correlationData); return "ok"; } catch (Exception e) { e.printStackTrace(); return "error"; } } private Map<String, Object> getMessage(String msg) { String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32); CorrelationData correlationData = new CorrelationData(msgId); String sendTime = sdf.format(new Date()); Map<String, Object> map = new HashMap<>(); map.put("msgId", msgId); map.put("sendTime", sendTime); map.put("msg", msg); map.put("correlationData", correlationData); return map; } }
大功告成!接下來我們進行測試,發送一條消息,我們可以控制台:
假設發送一條信息沒有路由匹配到隊列,可以看到如下信息:
這就是confirm模式。它的作用是為了保障生產者投遞消息到RabbitMQ不會出現消息丟失。
三、消息中間件端:消息持久化(exchange 和 queue 的持久化)
RabbitMQ是支持消息持久化的,消息持久化需要設置:Exchange為持久化和Queue持久化,這樣當消息發送到RabbitMQ服務器時,消息就會持久化。
3.1 exchange的持久化
首先看Exchange交換機的類圖:
看這個類圖其實是要說明上一篇文章介紹的四種交換機都是AbstractExchange抽象類的子類,所以根據java的特性,創建子類的實例會先調用父類的構造器,父類也就是AbstractExchange的構造器是怎么樣的呢?
從上面的注釋可以看到durable參數表示是否持久化。默認是持久化(true)。創建持久化的Exchange可以這樣寫:
@Bean public DirectExchange rabbitmqDemoDirectExchange() { //Direct交換機 return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false); }
3.2 queue的持久化
接着是Queue隊列,我們先看看Queue的構造器是怎么樣的:
也是通過durable參數設置是否持久化,默認是true。所以創建時可以不指定:
@Bean public Queue fanoutExchangeQueueA() { //只需要指定名稱,默認是持久化的 return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A); }
3.3 怎么證明持久化成功
怎么證明是已經持久化了呢,實際上可以找到對應的文件:
找到對應磁盤中的目錄:
消息持久化可以防止消息在RabbitMQ Server中不會因為宕機重啟而丟失。
四、消息消費端:ACK事務機制(隊列-->消費者)
原本:消費者從隊列中獲取到消息后,會直接確認簽收,假設消費者宕機或者程序出現異常,數據沒有正常消費,這種情況就會出現數據丟失。
修改:所以關鍵在於把自動簽收改成手動簽收,正常消費則返回確認簽收,如果出現異常,則返回拒絕簽收重回隊列。
代碼實現
首先在消費者的application.yml文件中設置事務提交為manual手動模式:
spring: rabbitmq: listener: simple: acknowledge-mode: manual # 手動ack模式 concurrency: 1 # 最少消費者數量 max-concurrency: 10 # 最大消費者數量
然后編寫消費者的監聽器:
@Component public class RabbitDemoConsumer { enum Action { //處理成功 SUCCESS, //可以重試的錯誤,消息重回隊列 RETRY, //無需重試的錯誤,拒絕消息,並從隊列中刪除 REJECT } @RabbitHandler @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC)) public void process(String msg, Message message, Channel channel) { long tag = message.getMessageProperties().getDeliveryTag(); Action action = Action.SUCCESS; try { System.out.println("消費者RabbitDemoConsumer從RabbitMQ服務端消費消息:" + msg); if ("bad".equals(msg)) { throw new IllegalArgumentException("測試:拋出可重回隊列的異常"); } if ("error".equals(msg)) { throw new Exception("測試:拋出無需重回隊列的異常"); } } catch (IllegalArgumentException e1) { e1.printStackTrace(); //根據異常的類型判斷,設置action是可重試的,還是無需重試的 action = Action.RETRY; } catch (Exception e2) { //打印異常 e2.printStackTrace(); //根據異常的類型判斷,設置action是可重試的,還是無需重試的 action = Action.REJECT; } finally { try { if (action == Action.SUCCESS) { //multiple 表示是否批量處理。true表示批量ack處理小於tag的所有消息。false則處理當前消息 channel.basicAck(tag, false); } else if (action == Action.RETRY) { //Nack,拒絕策略,消息重回隊列 channel.basicNack(tag, false, true); } else { //Nack,拒絕策略,並且從隊列中刪除 channel.basicNack(tag, false, false); } channel.close(); } catch (Exception e) { e.printStackTrace(); } } } }
解釋一下上面的代碼:
- 如果沒有異常,則手動確認回復RabbitMQ服務端basicAck(消費成功)。
- 如果拋出某些可以重回隊列的異常,我們就回復basicNack並且設置重回隊列。
- 如果是拋出不可重回隊列的異常,就回復basicNack並且設置從RabbitMQ的隊列中刪除。
接下來進行測試,發送一條普通的消息"hello":
解釋一下ack返回的三個方法的意思。
①成功確認
void basicAck(long deliveryTag, boolean multiple) throws IOException;
消費者成功處理后調用此方法對消息進行確認。
- deliveryTag:該消息的index
- multiple:是否批量.。true:將一次性ack所有小於deliveryTag的消息。
②失敗確認
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
- deliveryTag:該消息的index。
- multiple:是否批量。true:將一次性拒絕所有小於deliveryTag的消息。
- requeue:被拒絕的是否重新入隊列。
③失敗確認
void basicReject(long deliveryTag, boolean requeue) throws IOException;
- deliveryTag:該消息的index。
- requeue:被拒絕的是否重新入隊列。
basicNack()和basicReject()的區別在於:basicNack()可以批量拒絕,basicReject()一次只能拒接一條消息。
五、補充方案1:設置集群鏡像模式
我們先來介紹下RabbitMQ三種部署模式:
1)單節點模式:最簡單的情況,非集群模式,節點掛了,消息就不能用了。業務可能癱瘓,只能等待。
2)普通模式:默認的集群模式,某個節點掛了,該節點上的消息不能用,有影響的業務癱瘓,只能等待節點恢復重啟可用(必須持久化消息情況下)。
3)鏡像模式:把需要的隊列做成鏡像隊列(互為鏡像的是隊列,並非節點),一個消息會存在於多個節點的隊列中,屬於RabbitMQ的HA方案(高可用方案)
下面介紹下鏡像模式的三種高可用策略模式:
1)同步至所有的broker節點
2)同步最多N個機器
3)只同步至符合指定名稱的nodes
命令處理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
1)為每個以“rock.wechat”開頭的隊列設置所有節點的鏡像,並且設置為自動同步模式
rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
2)為每個以“rock.wechat.”開頭的隊列設置兩個節點的鏡像,並且設置為自動同步模式
rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" \
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
3)為每個以“node.”開頭的隊列分配指定的節點做鏡像
rabbitmqctl set_policy ha-nodes "^nodes\." \
'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
但是:HA 鏡像隊列有一個很大的缺點就是:系統的吞吐量會有所下降。
六、補充方案2:消息補償機制
我們通過之前的方案 , 基本上已經能夠保證消息投遞成功了 ! 為什么還要消息補償機制呢? 難道消息還會丟失,沒錯,系統是在一個復雜的環境,不要想的太簡單了,雖然以上的三種方案,基本可以保證消息的高可用不丟失的問題。但是事無完全:
比如:持久化的消息,保存到硬盤過程中,當前隊列節點掛了,存儲節點硬盤又壞了,消息丟了,怎么辦?
產線網絡環境太復雜,所以不知數太多,所以要做消息補償機制 !
消息補償機制需要建立在業務數據庫和MQ數據庫的基礎之上 , 當我們發送消息時 , 需要同時將消息數據保存在數據庫中, 兩者的狀態必須記錄。 然后通過業務數據庫和MQ數據庫的對比檢查消費是否成功,不成功,進行消息補償措施,重新發送消息處理。

參考文獻
https://zhuanlan.zhihu.com/p/166426241