MQ消費失敗,自動重試思路


在遇到與第三方系統做對接時,MQ無疑是非常好的解決方案(解耦、異步)。但是如果引入MQ組件,隨之要考慮的問題就變多了,如何保證MQ消息能夠正常被業務消費。所以引入MQ消費失敗情況下,自動重試功能是非常重要的。這里不過細講MQ有哪些原因會導致失敗。

MQ重試,網上有方案一般采用的是,本地消息表+定時任務,不清楚的可以自行了解下。

我這里提供一種另外的思路,供大家參考。方案實現在RabbitMQ(安裝延遲隊列插件)+.NET CORE 3.1

設計思路為:

內置一個專門做重試的隊列,這個隊列是一個延遲隊列,當業務隊列消費失敗時,將原始消息投遞至重試隊列,並設置延遲時間,當延遲時間到達后。重試隊列消費會自動將消息重新投遞會業務隊列,如此便可以實現消息的重試,而且可以根據重試次數來自定義重試時間,比如像微信支付回調一樣(第一次延遲3S,第二次延遲10S,第三次延遲60S),上面方案當然要保證MQ消費采用ACK機制。

那么如何讓重試隊列知道原來的業務隊列是哪個,我們定義業務隊列時,可以通過MQ的消息頭內置一些信息:隊列類型(業務隊列也有可能是延遲隊列)、重試次數(默認為 0)、交換機名稱、路由鍵。業務隊列消費失敗時,將消息投遞至重試隊列時,則可以把業務隊列的消息頭傳遞至重試隊列,那么重試隊列消費,重新將消息發送給業務隊列時,則可以知道業務隊列所需要的所有參數(需要將重試次數+1)。

下面結合代碼講下具體實現:

我們先看看業務隊列發送消息時,如何定義

IBasicProperties properties = channel.CreateBasicProperties();
                properties.Persistent = true;
                //初始化,需要內置一些消費異常,自動重試參數 
                if (headers == null)
                {
                    headers = new Dictionary<string, object>();
                }
                //ttlSecond 有值表示消息將投遞到延遲隊列
                //因為可以自建延遲隊列,ttlSecond是業務標識 
                if (ttlSecond.HasValue)
                {
                    if (!headers.ContainsKey("x-delay"))
                    {
                        headers.Add("x-delay", ttlSecond * 1000);
                    }
                    else
                    {
                        headers["x-delay"] = ttlSecond * 1000;
                    }
                    //queueType = 1表示延遲隊列 
                    //框架內部重試機制需要此參數,因為重新投遞到原始隊列時,需要區分普通隊列還是延遲隊列
                    if (!headers.ContainsKey("queueType"))
                    {
                        headers.Add("queueType", 1);
                    }
                }
                else
                {
                    //queueType = 0表示普通隊列
                    if (!headers.ContainsKey("queueType"))
                    {
                        headers.Add("queueType", 0);
                    }
                }
                //重試次數
                if (!headers.ContainsKey("retryCount"))
                {
                    headers.Add("retryCount", 0);
                }
                //原始交換機名稱
                if (!headers.ContainsKey("retryExchangeName"))
                {
                    headers.Add("retryExchangeName", exchangeName);
                }
                //原始路由鍵
                if (!headers.ContainsKey("retryRoutingKey"))
                {
                    headers.Add("retryRoutingKey", routingKey);
                }
                properties.Headers = headers;
                channel.BasicPublish(exchangeName, routingKey, properties, Encoding.UTF8.GetBytes(message));

 這里會內置上面描述的重試隊列需要的參數

再來看看業務隊列消費如何處理,這里因為會自動重試,所以保證業務隊列每次都是消費成功的(MQ才會將消息從隊列中刪除)

       //每次消費一條
            channel.BasicQos(0, 1, false);

            //定義消費者
            EventingBasicConsumer eventingBasicConsumer = new EventingBasicConsumer(channel);
            eventingBasicConsumer.Received += async (sender, basicConsumer) =>
            {
                string body = Encoding.UTF8.GetString(basicConsumer.Body.ToArray());
                Deadletter deadletter = null;
                try
                {
                    string errorMsg = await action(body);
                    if (!errorMsg.IsNullOrWhiteSpace())
                    {
                        deadletter = new Deadletter() { Body = body, ErrorMsg = errorMsg };
                        _logger.LogError($"業務隊列消費異常(已知),消息頭:{JsonUtils.Serialize(basicConsumer.BasicProperties.Headers)}{Environment.NewLine}原始消息:{body}{Environment.NewLine}錯誤:{errorMsg}");
                    }
                }
                catch (Exception ex)
                {
                    deadletter = new Deadletter() { Body = body, ErrorMsg = ex.Message };
                    _logger.LogError(ex, $"業務隊列消費異常(未知),消息頭:{JsonUtils.Serialize(basicConsumer.BasicProperties.Headers)}{Environment.NewLine}原始消息:{body}");
                }
                //必定應答,不管消費成功還是失敗
                channel.BasicAck(basicConsumer.DeliveryTag, false);
                //消費失敗,投遞消息至重試隊列
                if (deadletter != null)
                {
                    PublishRetry(deadletter, basicConsumer.BasicProperties.Headers);
                }
            };

 我們再看看PublishRetry重試隊列的推送方法如何實現

IBasicProperties properties = channel.CreateBasicProperties();
                properties.Persistent = true;
                //x-delay為延遲隊列的延遲時間
                //如果第一次進行重試,請求頭中是不存在延遲時間的,需要新增
                //因為可以進行多次重試,所以第二次時,就會存在延遲時間
                //但因為可以自建用於業務的延遲隊列,所以自建的延遲隊列,第一次重試也會存在x-delay,但是如果自建的延遲隊列失敗進行重試時,不能還使用自身的延遲時間,所以需要重新設置為系統默認的失敗重試時間
                if (!headers.ContainsKey("x-delay"))
                {
                    headers.Add("x-delay", 0);
                } 
                //重試次數
                int retryCount = Convert.ToInt32(headers["retryCount"]);
                //可以根據重試次數,實現上面說描述的微信回調的重試時間變長效果
                headers["x-delay"] = retryCount * 1000;
                properties.Headers = headers;
                channel.BasicPublish(RETRY_EXCHANGE_NAME, string.Empty, properties, Encoding.UTF8.GetBytes(JsonUtils.Serialize(deadletter)));

重試隊列的消費者實現

channel.BasicQos(0, 1, false); 
            EventingBasicConsumer eventingBasicConsumer = new EventingBasicConsumer(channel);
            eventingBasicConsumer.Received += async (sender, basicConsumer) =>
            {
                string message = Encoding.UTF8.GetString(basicConsumer.Body.ToArray());
                Deadletter deadletter = JsonUtils.Deserialize<Deadletter>(message); 
                IDictionary<string, object> headers = basicConsumer.BasicProperties.Headers;
                //請求頭中肯定會有如下參數,因為在框架代碼中已經內置
                //重試次數
                int retryCount = Convert.ToInt32(headers["retryCount"]);
                //原隊列類型,如果原隊列本身為延遲隊列,重試投遞的時候,必須也要為延遲隊列,只是不需要延遲時間,投遞回原隊列后,會立馬重新消費
                int queueType = Convert.ToInt32(headers["queueType"]);
                //原隊列名稱
                string retryExchangeName = Encoding.UTF8.GetString((byte[])headers["retryExchangeName"]);
                //原路由鍵
                string retryRoutingKey = Encoding.UTF8.GetString((byte[])headers["retryRoutingKey"]);
                if (retryCount <= 10)
                {
                    headers["retryCount"] = retryCount + 1;
                    //原有隊列為普通隊列,重新投遞時,也需要投遞為普通隊列類型
                    if (queueType == 0)
                    {
                        PublishMessage(retryExchangeName, retryRoutingKey, deadletter.Body, basicConsumer.BasicProperties.Headers);
                    }
                    //原有隊列為延遲隊列,重新投遞時,也需要投遞為延遲隊列類型
                    else
                    {
                        PublishMessage(retryExchangeName, retryRoutingKey, deadletter.Body, basicConsumer.BasicProperties.Headers, 0);
                    }
                }
                //超過重試最大次數不再處理,交由外部委托來處理死信
                else
                {
                    await deadLetterTask(retryExchangeName, deadletter.Body, deadletter.ErrorMsg);
                }
                //應答
                channel.BasicAck(basicConsumer.DeliveryTag, false);
            };
            //開啟監聽
            channel.BasicConsume(RETRY_QUEUE_NAME, false, eventingBasicConsumer);

然后在系統中,內置重試隊列消費者

//注冊框架內自動重試
            _rabbitMQClient.SubscribeRetry(async (exchangeName, message, errorMsg) =>
            {
                string content = $"原始交換機名稱:{exchangeName}{Environment.NewLine}" +
                             $"原始消息內容:{message}{Environment.NewLine}" +
                             $"錯誤消息:{errorMsg}";

                await PushWeChatMessage(content);
            });

 上述為我們MQ實現自動重試的一種方案,當然中間包括每次如果消費失敗都可以發送通知,來通知業務人員關注消費失敗的情況。可以自定義最大重試次數、重試間隔時間、死信的處理,這里僅僅是MQ重試機制的一種思路而已,大家如果有更好的方案,歡迎多多溝通。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM