ENode 1.0 - 消息的重試機制的設計思路


項目開源地址:https://github.com/tangxuehua/enode

上一篇文章,簡單介紹了enode框架中消息隊列的設計思路,本文介紹一下enode框架中關系消息的重試機制的設計思路。

對於一個EDA架構為基礎的框架,核心就是消息驅動,然后基於最終一致性的原則。所以,非常重要的一點是,如果消息一次執行不成功,那該怎么辦?我能想到的對策就是消息的重試。我發現,這篇文章比較難寫,因為感覺要把復雜的事情清晰的表達出來,感覺確實不容易。說到重試,那什么是消息的重試呢?怎么重試呢?我這里提到的重試是指,一個消息,從消息隊列取出來后,要處理,但是處理失敗了,然后要重新嘗試再處理該消息;怎么重試?這個問題比較復雜,不能用簡單的一兩句話來說明。

上面說到,如果消息處理失敗要再重試,其實是一個比較粗的回答。因為比如一個消息在處理的時候總共有5個步驟,如果前2步都成功,但是第3步失敗了,那重試的時候,前2步還需要再執行嗎?我的想法是,在能辦到的情況下,就不要再做前2步操作了,而是直接從第3步開始重試。所以說,這種做法相當於是“哪里跌倒,哪里繼續”;

那么怎么重試呢?

經過分析,我們發現整個enode框架中需要重試的點是非常多的,比如command產生的event要發送到隊列時,如果失敗那需要重試;比如event持久化時失敗了,也需要重試,等等。所以,顯而易見,我們應該設計一個可以被重用的重試服務,提供對某些特定的重試場景的支持。

我們先來想一下,我們希望有什么樣的重試功能。以“event持久化時失敗”為例,如果這一步失敗,我們希望立馬對這個步驟重試幾次,比如3次,如果3次內成功了,那就成功了,繼續往下做下面的邏輯;如果還是失敗了呢?我們難道就放棄了嗎?實際上,我們不能放棄,因為一般如果事件持久化失敗很有可能是由於網絡問題或eventstore有什么問題,而且如果我們就這樣放棄了,那很可能整個業務邏輯的流程就被中斷了,這樣就無法做到數據的最終一致性了。所以,因為這種暫時的IO問題導致的失敗,我們不能隨便就放棄重試,應該在嘗試幾次重試仍失敗時采取必要的手段,可以在IO恢復時,能自動再處理該消息;但是我們又不能使用當前線程無限制的重試下去,因為這樣就導致沒辦法處理其他的消息了;所以我們自然就能想到:我們應該在消息重試幾次仍失敗時,將該消息放入一個專門的重試隊列,然后有另外一個獨立的線程會定時從該隊列取出要重試的消息,然后重試這些消息;這樣,當IO恢復時,這些消息就能很快被成功處理了;

另外一個問題,那這種專門的重試隊列需要支持消息持久化嗎?不用,我們只需要內存隊列就行了,因為當一個消息還沒被完全成功處理前,是不會從message store刪除的;所以,就算機器重啟了,該消息還是能在該機器重啟后被處理的;而當該機器沒重啟時,該專門重試的內存隊列會不斷地以獨立的線程定時重試該消息;

那這種專門的重試隊列需要多少個呢?理論上我們可以為每個需要重試的點都設計一個重試隊列來支持,但是這樣一方面過於復雜,而且線程多了還會影響系統的性能;所以我們需要權衡一下,只對同一個階段中要做的所有的事情設計一個重試隊列,該階段中這些要做的事情中有任何一步失敗,就都放到該階段對應的重試隊列里;

還有一個問題,如果一個消息在某一次重試時成功了,但是我們希望在成功后繼續對該消息做后續的步驟,該如何實現呢?這個問題初想想感覺比較麻煩,因為我們可能已經沒有了該消息的一些上下文環境。最重要的是,我們如何知道該消息重試成功后接下來該做什么呢?而且就算知道接下來要做什么了,但是要是我們在做這個下一步的步驟時,要是又失敗了呢?是不是也要重試呢?所以,我們發現這里很關鍵。

經過我的一些思考,我發現,如果一個消息在某個階段要被處理多個步驟,且有些步驟之間有條件依賴,比如只有在第2步處理的結果是成功時,我們才有必要做后面的3步;正常情況,如果一切順利,那就是一步步從上往下的去做;但是因為考慮到任何一步可能都會出問題,而且我們希望在任何一步失敗然后重試成功后,能繼續后續的步驟。所以,基於這些特征,我覺得我們可以設計一種類似回調函數的機制,當某個邏輯執行成功后,執行回調函數,我們可以在回調函數中存放接下來要做的邏輯;顯然,我覺得我們需要某種遞歸的數據結構;為了支持上面這種類似回調函數的需求,我設計了如下的一個數據結構:

    /// <summary>一個數據結構,封裝了一段要執行的邏輯以及一些相關的上下文信息
    /// </summary>
    public class ActionInfo
    {
        /// <summary>表示某個Action的名字
        /// </summary>
        public string Name { get; private set; }
        /// <summary>表示某個Action,封裝了一段邏輯
        /// </summary>
        public Func<object, bool> Action { get; private set; }
        /// <summary>表示Action執行時所需要的數據信息
        /// </summary>
        public object Data { get; private set; }
        /// <summary>表示Action執行成功后,要執行的下一個Action的信息,這里體現出遞歸
        /// </summary>
        public ActionInfo Next { get; private set; }

        public ActionInfo(string name, Func<object, bool> action, object data, ActionInfo next)
        {
            if (action == null)
            {
                throw new ArgumentNullException("action");
            }
            Name = name;
            Action = action;
            Data = data;
            Next = next;
        }
    }

從上面的代碼,我們可以清晰的看到,我們設計了一個簡單的數據結構,用來包含要執行的邏輯,該邏輯執行時所需要的參數信息,以及該邏輯執行成功后要做的下一個邏輯;通過上面這個數據結構,我們已經為實現上面的重試需求做好了數據結構方面的准備;

接下來,我們需要想想,如何設計一個重試服務。經過上面的分析,我們只要,我們的重試服務需要兩個主要功能:1)對某段邏輯連續重試指定次數;2)將某段邏輯放入重試隊列定時重試;對於第一個功能需求,比較簡單,直接設計一個遞歸函數即可,代碼如下:

        public bool TryAction(string actionName, Func<bool> action, int maxRetryCount)
        {
            return TryRecursively(actionName, (x, y, z) => action(), 0, maxRetryCount);
        }
        private bool TryRecursively(string actionName, Func<string, int, int, bool> action, int retriedCount, int maxRetryCount)
        {
            var success = false;
            try
            {
                success = action(actionName, retriedCount, maxRetryCount);
                if (retriedCount > 0)
                {
                    _logger.InfoFormat("Retried action {0} for {1} times.", actionName, retriedCount);
                }
            }
            catch (Exception ex)
            {
                _logger.Error(string.Format("Exception raised when tring action {0}, retrid count {1}.", actionName, retriedCount), ex);
            }

            if (success)
            {
                return true;
            }
            else if (retriedCount < maxRetryCount)
            {
                return TryRecursively(actionName, action, retriedCount + 1, maxRetryCount);
            }
            else
            {
                return false;
            }
        }

調用的代碼示例如下:

    if (_retryService.TryAction("TrySendEvent", () => TrySendEvent(eventStream), 3))
    {
        FinishExecution(command, queue);
    }

簡單說明一下:

當我們要重試時,我們首先調用retryService的TrtAction方法,該方法就是用來支持“對某段邏輯的指定次數的連續重試”。該方法的第一個參數是一個字符串,表示要執行的邏輯的名稱,這個名稱沒什么實際用途,只是幫助我們區分當前在執行的邏輯是哪段邏輯,該名稱會在記錄日志時使用,方便我們后續通過日志分析到底是哪里出錯了,或者重試過了;然后第二個參數表示要重試的某個委托;當然,因為我們要知道該委托內部的邏輯是否處理成功,所以需要一個布爾類型的返回值;最后一個參數則是指定需要連續重試多少次,上面的示例代碼表示:先執行指定邏輯,如果失敗,則連續重試3次;所以,如果每次都失敗,相當於總共會執行4次;上面的代碼應該不難理解,就不多分析了;

接下來分析一下第一個需求“將某段邏輯放入重試隊列定時重試”:

當連續重試還是失敗后,我們就會放入內存隊列,然后定時重試了。那么如何定時呢?一般用定時器即可;那定時多少呢?這個目前我也是拍腦袋的,目前設定為5秒。為什么是5秒呢?主要是兩個考慮:1)為了不希望太頻繁的重試,因為太頻繁的重試會占用更多的系統資源,導致會影響框架中正常的消息處理性能;2)因為這種定時的重試對實時性一般不會很高,就是說,比如當IO恢復后,我們一般不會要求馬上就能重試,過個幾秒甚至幾分鍾后再重試,也能接受。實際上,如果沒有這種自動定時的重試機制,我們可能只能等到機器重啟后才能再次被重試了,相比之下,已經非常自動和及時了。

所依,總結一下,我們需要:1)定時器,用於定時執行;2)ActionInfo包裝要重試的邏輯的相關信息;3)內存隊列,用於存放ActionInfo;所以,代碼如下:

public class DefaultRetryService : IRetryService
    {
        private const long DefaultPeriod = 5000;
        private BlockingCollection<ActionInfo> _retryQueue = new BlockingCollection<ActionInfo>(new ConcurrentQueue<ActionInfo>());
        private Timer _timer;
        private ILogger _logger;
        private bool _looping;

        public DefaultRetryService(ILoggerFactory loggerFactory)
        {
            _logger = loggerFactory.Create(GetType().Name);
            _timer = new Timer(Loop, null, 0, DefaultPeriod);
        }

        public void Initialize(long period)
        {
            _timer.Change(0, period);
        }
        public void RetryInQueue(ActionInfo actionInfo)
        {
            _retryQueue.Add(actionInfo);
        }

        private void Loop(object data)
        {
            try
            {
                if (!_looping)
                {
                    _looping = true;
                    RetryAction();
                    _looping = false;
                }
            }
            catch (Exception ex)
            {
                _logger.Error("Exception raised when retring action.", ex);
                _looping = false;
            }
        }
        private void RetryAction()
        {
            var actionInfo = _retryQueue.Take();
            if (actionInfo != null)
            {
                var success = false;
                try
                {
                    success = actionInfo.Action(actionInfo.Data);
                    _logger.InfoFormat("Executed action {0} from queue.", actionInfo.Name);
                }
                catch (Exception ex)
                {
                    _logger.Error(string.Format("Exception raised when executing action {0}.", actionInfo.Name), ex);
                }
                finally
                {
                    if (success)
                    {
                        if (actionInfo.Next != null)
                        {
                            _retryQueue.Add(actionInfo.Next);
                        }
                    }
                    else
                    {
                        _retryQueue.Add(actionInfo);
                    }
                }
            }
        }
    }

經過上面的分析后,相信大家看代碼都應該能理解了。需要注意的點:

  1. 我用了BlockingCollection,這是一個支持並發且支持阻塞的基於publish-consumer模式的集合,而且這里,該集合內部封裝了ConcurrentQueue,所以,他也是一個隊列;這樣設計的好處是,在隊列中沒有元素的時候,線程會被卡住,從而不會浪費資源;只有當隊列中有元素時,才會在當天timer周期到來時,能夠從隊列取出要重試的ActionInfo,然后進行重試操作。
  2. Timer的周期默認設置為5秒,那么,我們為了避免同一時刻,有兩個ActionInfo在被同時處理,我加了一個標記位_looping,當當前有ActionIno正在被處理時,則該標記位為true,否則為false。通過該標記位,我們能確保隊列中的元素會一個個按順序被處理,這樣就不會混亂,導致莫名其妙的bug出現;
  3. 從上面的RetryAction方法中,我們可以看出,當當前的ActionInfo處理成功后,如果下一個ActionInfo存在(Next屬性不等於空),則把下一個ActionInfo放入重試隊列,等待被處理;通過這樣的設計,我們能夠以非常統一的方式重試用戶希望重試的ActionInfo以及這些ActionInfo重試成功后的回調ActionInfo。另外,如果當前ActionInfo執行失敗,則仍然將當前ActionInfo再放回隊列,繼續重試;

下面我們看一個簡單的調用示例吧:

        private void CommitAggregate(AggregateRoot dirtyAggregate, ICommand command, IMessageQueue<ICommand> queue)
        {
            var eventStream = BuildEvents(dirtyAggregate, command);

            if (_retryService.TryAction("TrySendEvent", () => TrySendEvent(eventStream), 3))
            {
                FinishExecution(command, queue);
            }
            else
            {
                _retryService.RetryInQueue(
                    new ActionInfo(
                        "TrySendEvent",
                        (obj) => TrySendEvent(obj as EventStream),
                        eventStream,
                        new ActionInfo(
                            "SendEventSuccessAction",
                            (obj) =>
                            {
                                var data = obj as dynamic;
                                var currentCommand = data.Command as ICommand;
                                var currentQueue = data.Queue as IMessageQueue<ICommand>;
                                FinishExecution(currentCommand, currentQueue);
                                return true;
                            },
                            new { Command = command, Queue = queue },
                            null)));
            }
        }

說明:

上面的代碼是在一個command執行完成后對於產生的事件,框架要提交該聚合根產生的事件;通過BuildEvents方法獲取聚合根上產生的事件,然后我們接下來是嘗試將該事件發送到一個事件隊列,但是因為該事件隊列在消息入隊時會持久化消息,也就是會有IO操作,所以就可能失敗,所以我們先嘗試執行一次,如果失敗則立馬連續嘗試重試3次,如果這4次中任意一次成功了,則做成功的邏輯,上例是調用FinishExecution方法;如果這4次都失敗,則進入else的邏輯,即放入隊列定時重試,但是我們希望在放入隊列重試時如果某一次重試成功了也需要保證能調用FinishExecution方法,所以也定義了一個回調的ActionInfo。最后,為了盡量讓每個ActionInfo所需要的參數信息語義明確,避免語言層面的閉包等復雜難理解的問題,我們盡量將ActionInfo中的Action所需要的參數信息明確的設置到ActionInfo上,而不是從外層的函數中拿,從外層的函數中拿,要是再多線程時,容易出現問題,而且也容易引起代碼修改導致的難以檢查出來的閉包問題;當然,這里,大家可以看到我使用了匿名對象,我是偷懶了,如果希望性能更高,則可以顯示定義一個類來封裝需要的參數信息;

總結:

本文通過代碼加思路的方式大概介紹了enode框架中關於消息重試的設計思路。但是我沒有介紹enode中到底哪些點會用到重試機制,有很多,至少五六個地方吧。但我覺得這不是重點了,重點是上面我分析的一些思路,具體需要重試的場景是偏業務性質了,涉及到enode框架中從command開始處理到最后event被發布到query端的整個過程中的每個關鍵的環節。我覺得通過本文的分析,可以幫助想看代碼的朋友更容易理解enode中關於重試方面的代碼,這樣就夠了;關於重試方面,還有一個點沒有說,就是command的重試,關於這一點,和本文提到的重試有點不同,我准備專門寫一篇文章介紹一下吧。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM