消息的可靠投遞除了需要硬件,網絡,消息中間件等的可靠保證外,還需要生產者,消費者來共同保證來完成。一條消息從生產者產生,到發送到交換機,並被投遞到隊列,並最終被消費者消費,這整個路徑上,途徑的每一個地方都要保證消息的可靠性。
其實,官方文檔Reliability Guide已經總結了消息系統安全的方方面面。
- 網絡方面可以使用心跳檢測TCP連接:Detecting Dead TCP Connections with Heartbeats
- 分布式的RabbitMQ可以用Federation and Shovel來實現
- 可以通過建立監控來監視服務器狀況:Monitoring and Health Checks
除了上面這幾點外,還需要下面一系列的方法來共同保證消息的可靠傳輸。
一、持久化-(Data Safety on the Broker Side)
官方文檔:Data Safety on the Broker Side
將交換機、隊列和消息設為durable
①交換機和隊列持久化
spring代碼中,交換機和隊列默認都是持久化的
②消息持久化
需要將消息的投遞模式(delivery_mode)設置為2(也就是持久化)。
當我們使用RabbitTemplate調用了convertAndSend(String exchange, String routingKey, final Object object) 方法。默認就是持久化模式。
注意:
- 持久化的消息在到達隊列時就被寫入到磁盤,並且如果可以,持久化的消息也會在內存中保存一份備份,這樣可以提高一定的性能,只有在內存吃緊的時候才會從內存中清除。
- 非持久化的消息一般只保存在內存中,在內存吃緊的時候會被換入到磁盤中,以節省內存空間。
但要注意的是,將所有的消息都設置為持久化,會嚴重影響RabbitMQ的性能,寫入硬盤的速度比寫入內存的速度慢的不只一點點。對於可靠性不是那么高的消息可以不采用持久化處理以提高整體的吞吐率,在選擇是否要將消息持久化時,需要在可靠性和吞吐量之間做一個權衡。
在某種應用場景,如大流量的訂單交易系統,為了不影響性能,我們可以不設置持久化,但是我們會定時掃描數據庫中的未發送成功的消息,進行重試發送,實際應用場景,我們其實有很多解決方案。
二、生產者消息確認機制-(Acknowledgements and Confirms)
當消息發送出去之后,我們如何知道消息有沒有正確到達exchange呢?如果在這個過程中,消息丟失了,我們根本不知道發生了什么,也不知道是什么原因導致消息發送失敗了,為解決這個問題,主要有如下兩種方案:
-
通過事務機制實現
-
通過生產者消息確認機制(publisher confirm)實現
但是使用事務機制實現會嚴重降低RabbitMQ的消息吞吐量,我們采用一種輕量級的方案——生產者消息確認機制。
①生產者消息確認機制
消息確認機制就是生產者發送的消息一旦被投遞到匹配的隊列之后,交換機就會發送一個確認消息給生產者,生產者就知曉消息已經正確到達了目的地。 如果消息和隊列是持久化存儲的,那么確認消息會在消息寫入磁盤之后發出。
具體實現:通過實現ConfirmCallBack接口,消息發送到Exchange后觸發回調。
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //未收到確認 if (!ack) { log.debug("消息投遞到exchange失敗,原因: {}", cause); } } });
這里僅僅是記錄了日志文件,后續還要對消息進行重試發送,可以根據業務的需要使用立即重試或者在未來的某個時間點重試。
重試機制
當然,使用生產者消息確認機制需要考慮的另外一個問題是由於網絡斷開等原因導致生產者收不到ack確認,那么對於生產者來說可能會有兩種結果:
- 消息沒有發送成功,自然消費者也不會消費,此時生產者需要重發消息
- 消息已發送成功但沒收到確認,消費者很大程度已經消費了消息,此時無需重發消息
對於后者,一種可能的處理方式是在生產者確認回調方法中去驗證(查詢)一下消費者對應的業務是否對該消息進行了處理(根據消息對應的業務的id),但很顯然這樣是非常不合理的。因為消費者可能有多個,需要一一去驗證,同時這也與引入消息隊列實現生產者和消費者的解耦相悖。所以最合理的處理方式就是生產者進行重試,但消費者要進行冪等處理了。可以參考RocketMQ消息冪等處理。
對於立即重試,因為可能是網絡故障,所以依然於事無補,所以可以指定一定的重試次數,或者可以同時調整重試時間間隔(1,2,4,8……)。對於后者,可以將未成功發送的消息記錄到數據庫日志表,然后使用一個定時器去定時掃描日志表,然后調用生產者重新發送消息。
具體實現如下:
RabbitTemplate有一個確認回調接口,該接口中有個confirm方法。只需要實現該回調接口,並在confirm方法中判斷是否收到確認,如果未收到確認,則記錄日志(消息id,失敗原因),然后將消息id和消息內容保存到redis,然后啟動一個定時任務來重發消息。
public class PublishConfirmCallback implements RabbitTemplate.ConfirmCallback { private static final Logger LOGGER = LoggerFactory.getLogger(PublishConfirmCallback.class); @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //未收到確認消息,重試 if (!ack) { LOGGER.warn("mq send message error: id [{}] cause [{}]", correlationData.getId(), cause); //todo : add id and info to redis; schedule a job to resend message; //todo : 將id和信息加入redis,啟動定時任務來重發消息 } } }
對於生產者重試的實現可以參考博客:rabbitmq可靠發送的自動重試機制
在消息發送、接受時記錄DB日志,定時輪訓DB日志,查明哪些發送消息沒有成功消費,啟動重新發送消息機制。
②mandatory參數
補充一個Mandatory參數。當Mandatory參數設為true時,如果目的不可達,會將消息返還給生產者,生產者通過一個回調函數可以獲取該信息。
三、消費者消息手動確認機制-(Acknowledgements and Confirms)
為了保證消息從隊列可靠地到達消費者,RabbitMQ提供了消費者消息確認機制(message acknowledgement)。采用消息確認機制之后,消費者就有足夠的時間來處理消息,不用擔心處理消息過程中消費者進程掛掉后消息丟失的問題,因為RabbitMQ會一直等待並持有消息,直到消費者確認了該消息。
但默認情況下消費者是自動 ack (確認)消息的,也就是消費者接收到消息后會立即回復一個ack確認消息,而無論消費者是否對消息成功進行了處理。如果消費者在處理消息時發生了異常,此時就無法保證業務的一致性了,這樣顯然是不合理的。所以,可以設置為消費者手動確認,一旦消息者成功消費就會手動發送確認消息,mq收到確認后就會將消息從隊列中刪除。但需要注意的是,消費者一定要做冪等處理,從而避免進行重復處理。
參考博客:spring-boot + rabbitmq消息手動確認模式的幾點說明(重試機制)
四、死信隊列-(Unprocessable Deliveries)
官方文檔:dead letter
什么是死信交換機
DLX,Dead Letter Exchange 的縮寫,又死信郵箱、死信交換機。DLX就是一個普通的交換機,和一般的交換機沒有任何區別。 當消息在一個隊列中變成死信(dead message)時,通過這個交換機將死信發送到死信隊列中(指定好相關參數,rabbitmq會自動發送)。什么是死信呢?什么樣的消息會變成死信呢?
-
消息被拒絕(basic.reject或basic.nack)並且requeue=false
-
消息TTL過期
- 隊列達到最大長度(隊列滿了,無法再添加數據到mq中)
如何使用死信交換機
在定義業務隊列的時可以考慮指定一個死信交換機,並綁定一個死信隊列,當消息變成死信時該消息就會被發送到該死信隊列上,這樣就方便我們查看消息失敗的原因了。
定義業務(普通)隊列的時候指定參數:
-
x-dead-letter-exchange: 用來設置死信后發送的交換機
-
x-dead-letter-routing-key:用來設置死信的routingKey
@Bean public Queue helloQueue() { //將普通隊列綁定到私信交換機上 Map<String, Object> args = new HashMap<>(2); args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName); args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey); Queue queue = new Queue(queueName, true, false, false, args); return queue; }
五、集群模式-(Clustering and Message Replication)
參考博客:RabbitMQ的集群模式
除了上面講的基本可靠性保證外,其實還有很多性能優化方案、可靠性保證方案:集群監控、流控、鏡像隊列、HAProxy+Keeplived高可靠負載均衡