RabbitMQ死信隊列另類用法之復合死信


前言

在業務開發過程中,我們常常需要做一些定時任務,這些任務一般用來做監控或者清理任務,比如在訂單的業務場景中,用戶在創建訂單后一段時間內,沒有完成支付,系統將自動取消該訂單,並將庫存返回到商品中,又比如在微信中,用戶發出紅包24小時后,需要對紅包進行檢查,是否已領取完成,如未領取完成,將剩余金額退回到發送者錢包中,同時銷毀該紅包。

在項目初始階段,或者是一些小型的項目中,常常采用定時輪詢的方法進行檢查,但是我們都知道,定時輪詢將給數據庫帶來不小的壓力,而且定時間隔無法進行動態調整,特別是一個系統中,同時存在好幾個定時器的時候,就顯得非常的麻煩,同時給數據庫造成巨大的訪問壓力。

下面,本文將演示如何使用一個 RabbitMQ 的死信隊列同時監控多種業務(復合業務),達到模塊解耦,釋放壓力的目的。

注意:名詞“復合死信”是為了敘述方便臨時創造的,如有不妥,歡迎指正

1. 什么是 RabbitMQ 死信隊列

DLX(Dead Letter Exchanges)死信交換,死信隊列本身也是一個普通的消息隊列,在創建隊列的時候,通過設置一些關鍵參數,可以將一個普通的消息隊列設置為死信隊列,與其它消息隊列不同的是,其入棧的消息根據入棧時指定的過期時間/被拒絕/超出隊列長度被移除,依次被轉發到指定的消息隊列中進行二次處理。這樣說法比較拗口,其原理就是死信隊列內位於頂部的消息過期時,該消息將被馬上發送到另外一個訂閱者(消息隊列)中。

其原理入下圖

由上圖可以看到,目前有三種類型的業務需要使用 DLX 進行處理,因為每個業務的超時時間不一致的問題,如果將他們都放入一個 DLX 中進行處理,將會出現一個時序的問題,即消息隊列總數處理頂部的消息,如果頂部的消息未過期,而底部的消息過期,這就麻煩了,因為過期的消息無法得到消費,將會造成延遲;所以正常情況下,最好的辦法是每個業務都獨立一個隊列,這樣就可以保證,即將過期的消息總是處於隊列的頂部,從而被第一時間處理。

但是多個 DLX 又帶來了管理上面的問題,隨着業務的增加,越來越多的業務需要進入不同的 DLX ,這個時候我們發現,由於人手不足的原因,維護這么多 DLX 實在是太吃力了,如果能將這些消息都接入一個 DLX 中該多好呀,在一個 DLX 中進行消息訂閱,然后進行分發或者處理,這就非常有趣了。

下面就按照這個思路,我們進行集中處理,也就是復合死信交換 CDLX(Composite Dead Letter Exchanges)

2. 如何創建死信隊列

創建 DLX 隊列的方式非常簡單,我們使用 RabbitMQ Web 控制面板進行創建 Exhcange(交換機)/Consumer(死信消費隊列)/cdlx(復合死信隊列)

2.1 創建隊列

創建交換機 cdlx-Exchange

死信消費隊列 cdlx-Consumer

復合死信隊列 cdlx-Master

  • 注意,這里添加死信隊列必須同時設置死信轉發交換機和路由,后續通過路由綁定實現消費隊列

路由綁定

上面的路由綁定共有兩個,分別是 Master 和 Consumer 用於消息路由到隊列,為下面的業務消息做准備,建好后的隊列如下

3.復合業務進入死信隊列

當建立好隊列以后,我們就可以專心的處理業務了,下面就來模擬3種業務將消息發送到死信隊列的過程

3.1 發送死信消息到隊列

發送消息使用了 Asp.NetCore輕松學-實現一個輕量級高可復用的RabbitMQ客戶端 中的輕量客戶端,封裝后的發送消息代碼如下

public class CdlxMasterService
    {
        private IConfiguration cfg = null;
        private ILogger logger = null;
        private string vhost = "test_mq";
        private string exchange = "cdlx-Exchange";
        private string routekey = "master";
        private static MQConnection connection = null;

        private MQConnection Connection
        {
            get
            {
                if (connection == null || !connection.Connection.IsOpen)
                {
                    connection = new MQConnection(
                        cfg["rabbitmq:username"],
                        cfg["rabbitmq:password"],
                        cfg["rabbitmq:host"],
                        Convert.ToInt32(cfg["rabbitmq:port"]),
                        vhost,
                        logger);
                }
                return connection;
            }
        }

        private static IModel channel = null;
        private IModel Channel
        {
            get
            {
                if (channel == null || channel.IsClosed)
                    channel = Connection.Connection.CreateModel();

                return channel;
            }
        }

        public void SendMessage(object data)
        {
            string message = JsonConvert.SerializeObject(data);
            this.Connection.Publish(this.Channel, exchange, routekey, message);
        }
    }
3.2 將 CdlxMasterService 注入到服務
  public void ConfigureServices(IServiceCollection services)
        {
           services.AddSingleton<CdlxMasterService>();
           ...
        }
3.3 模擬3種業務生產死信消息
    public class HomeController : Controller
    {
        private CdlxMasterService masterService;
        public HomeController(CdlxMasterService masterService)
        {
            this.masterService = masterService;
        }

        [HttpGet("publish")]
        public int Publish()
        {
            Contract contract = new Contract(this.masterService);
            for (int i = 0; i < 10; i++)
            {
                contract.Publish(MessageType.RedPackage, "紅包信息,超時時間1024s");
                contract.Publish(MessageType.Order, "訂單信息,超時時間2048s");
                contract.Publish(MessageType.Vote, "投票信息,超時時間4096s");
            }
            return 0;
        }
    }

上面的接口 puhlish 模擬了業務消息,由於我們依次發布了 紅包/訂單/投票 消息,所以迭代發布 10 次后,正好形成了一個時序錯亂的信息隊列,按照自動過期時序計算,當第一個紅包超時到達時,第四條消息(紅包)也會接着超時,可是由於此時訂單和投票消息位於紅包消息上面,該紅包消息在達到超時時間后並不會被投遞到 Consumer 消費隊列,這是正確的,我們確實也是希望是這個結果

如果有一個辦法把超時的消息自動將其提升到隊列頂部就好了!

4. 處理復合死信

在 RabbitMQ 提供的 API 接口中,沒有什么直接可用的能將死信隊列中超時消息提升到頂部的好辦法;但是,我們可以利用部分 API 接口的特性來完成這件事情。

4.1 定時消費客戶端

下面,我們將使用一個定時消費客戶端來完成對死信隊列的輪詢,充分利用 RabbitMQ 的消費特性來完成超時消息的位置提升。

過程如下圖:

如上圖所示,我們增加一個 dlx-timer 定時器,定時的發起對死信隊列的消費,該消費者僅僅是消費,不確認消息,也就是不做 ack,然后將消息重新置入隊列中;這個過程,就是將消息不斷提升位置的過程。

4.2 定時消費客戶端實現代碼
    public class CdlxTimerService : MQServiceBase
    {
        public override string vHost { get { return "test_mq"; } }
        public override string Exchange { get { return "cdlx-Exchange"; } }
        public override List<BindInfo> Binds => new List<BindInfo>();
        private string queue = "cdlx-Master";

        public CdlxTimerService(IConfiguration cfg, ILogger logger) : base(cfg, logger)
        {
        }

        /// <summary>
        ///  檢查死信隊列
        /// </summary>
        /// <returns></returns>
        public List<CdlxMessage> CheckMessage()
        {
            long total = 0;
            List<CdlxMessage> list = new List<CdlxMessage>();
            var connection = base.CreateConnection();
            using (IModel channel = connection.Connection.CreateModel())
            {
                bool latest = true;
                while (latest)
                {
                    BasicGetResult result = channel.BasicGet(this.queue, false);
                    total++;
                    latest = result != null;
                    if (latest)
                    {
                        var json = Encoding.UTF8.GetString(result.Body);
                        list.Add(JsonConvert.DeserializeObject<CdlxMessage>(json));
                    }
                }
                channel.Close();
                connection.Close();
            }
            return list;
        }
    }

上面的代碼首先在定時調用到來的時候,創建了一個 Connection,然后利用此 Connection 創建了了一個 Channel,緊接着,使用該 Channel 調用 BasicGet 方法,獲得隊列頂部的信息,且設置 autoAck=false,表示僅檢查消息,不確認,然后進入一個 while 迭代過程,一直讀取到隊列底部,獲得所有隊列中的信息,最后,關閉了通道釋放連接。

這樣,就完成了一次消息檢查的過程,在調用 BasicGet 后,下一條信息將會出現在隊列的頂部,同步,隊列將自動對該消息進行超時檢查,由於我們在調用 BasicGet 的時候,傳入 autoAck=false,不確認該消息,在 RabbitMQ 控制台中,將顯示為 unacted,所以在釋放連接后,所有消息將會被重新置入隊列中,這是一個自動的過程,無需我們做額外的工作。

4.3 Consumer(死信消費隊列)最終處理業務

配置隊列管理隨程序啟動停止

        private MQServcieManager serviceManager;
        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory factory, IApplicationLifetime lifeTime)
        {
            serviceManager = new MQServcieManager(this.Configuration, factory.CreateLogger<MQServcieManager>());
            lifeTime.ApplicationStarted.Register(() => { serviceManager.Start(); });
            lifeTime.ApplicationStopping.Register(() => { serviceManager.Stop(); });
            ...
        }

實現消費隊列

    public class CdlxConsumerService : MQServiceBase
    {
        public override string vHost { get { return "test_mq"; } }
        public override string Exchange { get { return "cdlx-Exchange"; } }
        private string queue = "cdlx-Consumer";
        private string routeKey = "all";
        private List<BindInfo> bs = new List<BindInfo>();
        public override List<BindInfo> Binds { get { return bs; } }

        public CdlxConsumerService(IConfiguration cfg, ILogger logger) : base(cfg, logger)
        {
            this.bs.Add(new BindInfo
            {
                ExchangeType = ExchangeType.Direct,
                Queue = this.queue,
                RouterKey = this.routeKey,
                OnReceived = this.OnReceived
            });
        }

        private void OnReceived(MessageBody body)
        {
            var message = JsonConvert.DeserializeObject<CdlxMessage>(body.Content);
            Console.WriteLine("類型:{0}\t 內容:{1}\t進入時間:{2}\t過期時間:{3}", message.Type, message.Data, message.CreateTime, message.CreateTime.AddSeconds(message.Expire));

            body.Consumer.Model.BasicAck(body.BasicDeliver.DeliveryTag, true);
        }
    }

上面的代碼,模擬了最終業務處理的過程,這里僅僅是簡單演示,所以只是將消息打印到屏幕上;在實際的業務場景中,我們可以根據不同的 MessageType 進行消息的分發處理。

5. 消費過程演示

為了比較直觀的觀看死信消費過程,我們編寫一個簡單的列表頁面,自動刷新后去消費死信隊列,然后將消息輸出到頁面上,通過觀察此頁面,我們可以實時了解到死信隊列的消費過程,實際的業務場景中,大家可以利用第三方定時器定時調用接口實現,或者使用內置的輕量主機做后台任務實現定時輪詢,具體參考 Asp.Net Core 輕松學-基於微服務的后台任務調度管理器

5.1 發布消息

瀏覽器訪問本機地址:http://localhost:5000/home/publish

下面將發布 30 條信息到 DLX 中,每個業務各 10 條信息。

通常情況下,紅包的過期時間最短且超時時間一致,應該最快超時,意味着當第一條紅包消息超時的時候,其余 9 條紅包消息也會一並超時,但是由於紅包消息混合的發布在隊列中,且只有第一條紅包消息位移隊列頂部;所以,當第一條紅包消息超時被消費后,其余 9 條紅包由於不是位於隊列頂部,雖然此時他們已經超時,但是 DLX 將無法處理;當我們使用 cdlx-timer(定時器)模擬調用 CdlxTimerService 的時候(也就是刷新首頁), CdlxTimerService 服務將會對 DLX 進行檢查。

查看消費狀態

通過上圖的觀察得知,紅色部分首先位於消息頂部被消費,然后就無法進行超時判斷,接下來,由於使用了定時輪詢,使得綠色部分消息得以浮動到消息頂部,然后被 DLX 進行處理后消費。

5.2 定時器檢查死信隊列

瀏覽器訪問本機地址:http://localhost:5000/home

上圖的每一次刷新,都是對 DLX 的一次輪詢檢查,隨着輪詢的深入,所有處於隊列中不同位置的超時消息都有機會浮動到隊列頂部進行消費處理。

結束語

業務的發展促進了架構的演進,每一個需求升級的背后,是程序員深深的思考;本文從 CDLX 的需求出發,充分利用了 RabbitMQ DLX 對消息檢查的特性,實現了對復合業務的集中處理。

演示代碼下載

https://github.com/lianggx/Examples/tree/master/RabbitMQ.CDLX


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM