可靠性投遞:
1、保障消息能夠成功發出
2、保障rabbitmq(broker)能夠成功接收。接收指的是:broker接收到生產者發送的消息,放到exchange中,分發給對應的queue,交付給對應的消費者。
3、發送端要收到broker的確認應答,確認broker已收到|處理消息
4、完善的消息補償機制。發送端沒收到broker的確認應答,不知道消息是否成功投遞成功,這時候就需要做一些補償處理,比如重新投遞。
rabbitmq的server又叫做broker,接收客戶端的連接,實現AMQP實體服務,包含exchange、queue等多種組件。
說白了,broker就是rabbitmq服務器。
實現消息可靠性投遞的、常見的2種解決方案
- 消息狀態入庫,對消息狀態進行打標
- 延時再次投遞消息,回調確認
第一種方案 消息狀態打標
1、生產者將消息入庫、消息狀態入庫。
比如用戶下單,產生一個Order對象,把這個Order對象數據持久化到數據庫中
單獨用一張表來存儲消息狀態:
用外鍵關聯消息(比如關聯訂單表的id),
設置一個status列記錄消息投遞狀態,默認值為0,表示消息未投遞到broker;
設置一個時間列,記錄消息投遞時間;
設置一個重試次數列,記錄重新投遞的次數,默認值為0。
2、生產者發送消息到broker
把Order對象發送到broker,因為消息都要轉換為byte[ ]發送,什么類型都可以。
3、生產者接收到broker的確認應答,將數據庫中該條消息的狀態修改為1,表示成功投遞
4、分布式系統的定時任務
如果消息投遞一段時間后,未收到broker的確認應答,怎么補償處理?
使用定時任務來做:
生產者每隔一段時間,比如5min,啟動一條線程來查詢數據庫中重試次數達到指定值(比如3)、且投遞時間已超過指定值(比如5min)的消息,將其狀態修改為2,表示重試指定次數后仍未能成功投遞;
再查詢狀態為0、且發送時間已超過指定時間的消息,重新投遞,並更新投遞時間為當前時間、重試次數+1;
第一種方案存在的問題
生產者執行定時任務也有額外的開銷,生產者要進行2方面的數據庫IO操作(消息本身+消息狀態),IO是很花時間的,在高並發的情況下,數據庫性能很容易成為系統性能的瓶頸。
並發量大的情況下,第一種方案嚴重拉低生產者的性能。
相比之下,第二種方案用得更多,但稍微復雜一點。
第二種方案 延時再次投遞消息
1、生產者將消息入庫,並將消息發送給broker,broker將消息放到對應的queue1中
2、消費者監聽queue1,處理消息,處理一條消息后產生一條新消息作為確認(綁定queue2),比如以Order的id作為新消息,總之要能唯一標識處理的消息。
3、消費者將產生的消息發送給broker,broker將消息放到queue2中(注意不是消費者監聽的queue1)
4、單獨寫一個callback service(回調服務),來監聽queue2,把queue2中的消息入庫,比如放在tb_msg_processed表中,一條記錄代表一條已被消費者處理的消息
5、生產者發送消息后,延時再次發送這條消息(綁定queue3),比如3min|5min后再次發送這條消息。
6、回調服務監聽queue3,把queue中消息與數據庫中的記錄對比,比如把queue3獲取到的Order的id取出來,查詢tb_msg_processed中有沒有這個Order id,有就說明投遞成功;沒有就說說明未投妥,回調服務rpc通知生產者(傳遞order id),生產者從數據庫查詢該條消息的數據(order對象),重新投遞(queue1)——重新走一遍流程。
第一種方案消費者使用數據包來確認應答(ack),第二種由消費者自己產生一條消息來確認應答。
整個流程中,生產者又叫做upstream service(上游服務),消費者又叫做downstream service(下游服務)。
第二種方案的優缺點
相較於第一種方案,第二種方案多寫一個服務,每對生產者——消費者都使用一個額外的queue來確認,回調服務開發成本高些、略微復雜些;
部署回調服務又要使用、維護額外的機器,成本變高了。
但生產者的數據庫IO操作減少了,提升了性能。只要性能上去,稍微增加點成本完全可以接受。
說明
(1)消息入庫完成,然后發送消息(Order對象)到broker,注意順序
(2)分布式事務對性能的影響很大,並發量中小的可以加事務,如果並發量很大,事務會嚴重拉低性能,不建議加事務(能不加就不加)
(3)不管是第一種、還是第二種,都很難做到100%的投遞成功。優先考慮能夠扛得住高並發(性能),在保證性能的前提下盡可能提高消息投遞的可靠性