ENode 2.0 - 介紹一下關於ENode中對Command的調度設計


CQRS架構,C端的職責是處理從上層發送過來的command。對於單台機器來說,我們如何盡快的處理command呢?本文想通過不斷提問和回答的方式,把我的思考寫出來。

首先,我們最容易想到的是使用多線程。那當我們要處理一個command時,能直接丟到線程池中,直接交給線程池去調度嗎?不行。因為假如多個command修改同一個聚合根時,會導致db的並發沖突,從而會導致command的不斷重試,大大降低了command的處理速度。

那該怎么解決呢?既然直接多線程處理會有並發沖突的問題,那就對command進行路由。那根據什么進行路由呢?考慮到大部分command都會指定要創建或修改哪個聚合根,也就是說會指定聚合根ID。所以,我們就很容易想到,可以根據聚合根ID進行路由。我們可以根據聚合根ID的hashcode得到一個long;然后我們系統啟動時,創建N個ConcurrentQueue。然后路由就很簡單了,我們可以對聚合根ID的hashcode對N取模,然后得到余數,然后該余數就是對應的queue的索引,然后我們把該command放入對應的queue即可。然后每個queue的消費者,有一個單線程在不斷的按次序逐個消費當前隊列里的command。這個方法,解決了並發沖突的問題。但是卻帶來了另一個問題,就是熱點數據的情況。

假如針對某個聚合根的command非常多。那可想而知,某個queue里的command可能都是針對該聚合根的command了。那這樣的話這個queue中就基本沒機會處理其他聚合根的command,導致其他聚合根的command的處理會被大大延遲。雖然,這種方式遵守了先到先處理的原則,但在這種熱點數據的情況下,非熱點的數據基本沒機會被處理了。設想,現在總共有100個queue可以用來路由,然后現在正好有100個熱點修改的聚合根,然后它們的command分別會被路由到對應的queue。這樣,我們不難理解,這100個queue中的command就大部分都被這100個聚合根的command占滿了。這樣導致的結果就是不熱點的聚合根的command可能會很晚才會被處理。那怎么樣做可以更好呢?

學過akka框架的人應該知道,akka中每個actor都有一個mailbox,mailbox就是一個queue,mailbox中存放所有需要被當前actor處理的消息。如果我們把actor理解為DDD中的聚合根的話,那就是我們可以為每個聚合根設計一個queue,只要是對同一個聚合根的command,都會被先放入queue。然后立即通知任務調度服務,處理當前的聚合根的mailbox。如果當前線程池有可用的線程,那就會立即處理當前的mailbox。如果當前的mailbox正好有command正在被處理中,那本次處理直接結束。然后mailbox中的每個command,在被處理完之后,如果存在下一個command,則會立即請求任務調度服務處理下一個command。

通過這樣的設計,我們本質上是為每個聚合根分配一個mailbox,即一個queue;然后,只要是對某個聚合根的command,總是先進入對應的mailbox。然后立即調用任務調度服務去處理當前的mailbox。這個設計,其實也是先到先處理的方式,因為先到的command,會先通知任務調度服務處理。但是不同的是,假如任務調度服務在處理某個mailbox時,如果當前mailbox有command正在被處理,那是會直接結束本次處理的。這樣的好處是,任務調度服務可以快速的去處理下一個任務。而原來那種純粹只設計固定的N個queue的方式,處理command的順序只能是先進先處理了。

關於mailbox的設計,還有另外一點需要注意的。就是假如mailbox中當前某個command處理完了,然后后面也沒有需要處理的command了,我們可以直接將當前mailbox標記為空閑狀態嗎?不可以。因為我們整個設計是無鎖的,也就是說,當我們判斷當前mailbox,發現沒有后續command需要處理,然后在將mailbox標記為空閑狀態前,可能正好又有一個新的command進入了mailbox,且我們可能立即調度了一個任務去處理該mailbox,但是由於當前的mailbox還是在忙的狀態,所以就直接結束了。這樣導致的后果是,前一個command的處理線程認為當前沒有下一個command需要處理;而新進來的command的處理線程認為當前的mailbox還在忙,所以也不處理當前command。所以最后導致這個command一直無法被處理了。直到再下一個command進入mailbox后才正常。雖然這個概率非常低,但在高並發的情況下,完全是很有可能的。

為了解決這個問題,我們可以這樣做:在判斷是否還有下一個command需要處理后,如果沒有,則先將mailbox的狀態標記為空閑。然后再判斷是否有需要處理的command,如果還是沒有,才是真正的沒有。如果又有了,則應該立即通知任務調度服務處理當前的mailbox。

下面,我們再回到前面說的N個固定隊列的設計思路。其實這個設計還有另一個問題,就是由於每個隊列只有一個線程在處理。也就是說,如果我們有100個隊列,且隊列的消費者是一個個處理command的。那就意味着某一個時刻,最多只能有100個領域事件產生;而對於持久化領域事件,我在之前的文章中有提到,可以通過group commit技術來提高持久化的性能。因為如果每個領域事件的持久化都訪問一次DB,那對DB的IO會太頻繁。最后會導致我們的架構的瓶頸會被卡在領域事件的持久化上。所以,這種情況下,我們如果用了group commit技術,那一次只能最多持久化100個事件。而且一般是達不到100的,因為當第一批100個事件一次性group commit持久化完成后,這100個線程才能去各自的隊列里拿下一個command來處理。假設我們group commit的定時間隔是10ms,那我們是很難在這10ms內又立即產生下一個100個事件的。為什么會這樣?估計各位看官是不是看的有點暈了,沒關系,其實我也快暈了,呵呵。

因為我們要保證在同一個隊列里的command的處理一定是按次序一個個處理的。也就是說,當前一個command沒處理完時,是不能處理下一個command的。否則就會出現我文章最開始提到的並發沖突的問題。所以,當當前的command產生事件,並通知事件持久化服務去持久化這個事件時,必須等在那里,直到事件成功持久化為止。

那么有沒有辦法,既能保證1)對同一個聚合根的修改是串行的(解決並發沖突問題),2)但是聚合根之間是並行的,3)且我們可以完美的配合group commit技術呢?有,就是上面的mailbox的設計。首先,mailbox保證了對同一個聚合根的所有command都是排隊的;其次,mailbox能保證它里面的所有的command的處理也是按次序的,也就是前一個command處理完之后才會處理下一個command。第三,對mailbox的處理,完全是交給任務調度服務來完成。也就是mailbox不關系自己被哪個線程處理,它只保證自己內部的command的處理順序即可。

假如我們的任務調度服務背后是一個最大有200個線程的線程池。那在高並發的情況下,意味着任意時刻,這個線程池中的200個線程都在工作,且每個線程都在處理一個command,且這些線程不需要等待當前自己處理的command產生的事件的持久化。它們只需要負責把事件丟到一個全局唯一的事件隊列即可。然后就可以開始處理下一個command handle task了。然后,這個全局唯一的事件隊列只有一個消費者線程,它每隔一定時間(比如20ms)持久化一次隊列里的事件。一次持久化多少個我們可以自己配置,通過SqlBulkCopy,我們可以達到很好的批量插入事件的效果。注意,這個配置值以及定時持久化的間隔值,不是拍腦袋想出來的,而是需要我們通過實測來得到。

由於,command handle task thread不會等待當前事件的持久化完成,所以它就可以被線程池回收,然后去處理下一個command handle task了。然后,假如某批事件被持久化完成了,則先將這些事件對應的command都標記為完成。比如,調用command的complete方法,complete方法內,可以進一步通知它所屬的mailbox去處理下一個可能存在的command。這一批command都通知完成后,就可以立即處理下一批要持久化的事件了。此時,我們知道,在高寫入的場景下,一定已經有足夠數量的下一批事件了,因為之前的command handle task thread早就已經去處理其他的command了。

所以,通過上面的設計,我們可以在解決command並發沖突的前提下,保證command產生的事件的批量持久化,且能做到每次持久化都有足夠多的事件可以被批量持久化。從而整體提高command的處理吞吐量。當然,如果我們的系統寫入數據的並發並不高,那是沒必要使用group commit技術的。因為group commit畢竟是定時觸發的,比如20ms一次。當然,group commit並不是一定要等待20ms才做一次,我們可以在前一次做完后,立即判斷接下來是否有事件需要被持久化,如果有,則立即開始下一次批量持久化。

另外一點需要討論的是,有些command是沒有指定聚合根ID的,比如有些新增聚合根的command,可能聚合根ID沒有在command上指定,而是在實例化聚合根時才產生。或者,還有一些command不是操作聚合根,而是可能調用外部系統的API。這種command,我們可以直接調用任務調度服務去處理即可。因為這種command不會產生對我們的系統內的聚合根的修改的並發沖突。

說了這么多的思考文字,看起來比較枯燥,下面我們來看看關鍵部分的代碼吧!

public void Process(ProcessingCommand processingCommand)
{
    if (string.IsNullOrEmpty(processingCommand.AggregateRootId))
    {
        _commandScheduler.ScheduleCommand(processingCommand);
    }
    else
    {
        var commandMailbox = _mailboxDict.GetOrAdd(processingCommand.AggregateRootId,
            new CommandMailbox(_commandScheduler, _commandExecutor, _loggerFactory));
        commandMailbox.EnqueueCommand(processingCommand);
        _commandScheduler.ScheduleCommandMailbox(commandMailbox);
    }
}

當一個command被處理時,我們先判斷其是否有聚合根ID,如果沒有,則直接交給commandScheduler(command處理任務調度服務)進行處理;如果有,則根據聚合根ID找到其mailbox,然后把當前command放入mailbox,然后通知commandScheduler處理該mailbox。接下來我們看看command mailbox的設計:

public class CommandMailbox
{
    private readonly ConcurrentQueue<ProcessingCommand> _commandQueue;
    private readonly ICommandScheduler _commandScheduler;
    private readonly ICommandExecutor _commandExecutor;
    private readonly ILogger _logger;
    private int _isRunning;

    public CommandMailbox(ICommandScheduler commandScheduler, ICommandExecutor commandExecutor, ILoggerFactory loggerFactory)
    {
        _commandQueue = new ConcurrentQueue<ProcessingCommand>();
        _commandScheduler = commandScheduler;
        _commandExecutor = commandExecutor;
        _logger = loggerFactory.Create(GetType().FullName);
    }

    public void EnqueueCommand(ProcessingCommand command)
    {
        command.SetMailbox(this);
        _commandQueue.Enqueue(command);
    }
    public bool MarkAsRunning()
    {
        return Interlocked.CompareExchange(ref _isRunning, 1, 0) == 0;
    }
    public void MarkAsNotRunning()
    {
        Interlocked.Exchange(ref _isRunning, 0);
    }
    public void CompleteCommand(ProcessingCommand processingCommand)
    {
        _logger.DebugFormat("Command execution completed. cmdType:{0}, cmdId:{1}, aggId:{2}",
            processingCommand.Command.GetType().Name,
            processingCommand.Command.Id,
            processingCommand.AggregateRootId);
        MarkAsNotRunning();
        RegisterForExecution();
    }
    public void RegisterForExecution()
    {
        _commandScheduler.ScheduleCommandMailbox(this);
    }
    public void Run()
    {
        ProcessingCommand currentCommand = null;
        try
        {
            if (_commandQueue.TryDequeue(out currentCommand))
            {
                _logger.DebugFormat("Start to execute command. cmdType:{0}, cmdId:{1}, aggId:{2}",
                    currentCommand.Command.GetType().Name,
                    currentCommand.Command.Id,
                    currentCommand.AggregateRootId);
                ExecuteCommand(currentCommand);
            }
        }
        finally
        {
            if (currentCommand == null)
            {
                MarkAsNotRunning();
                if (!_commandQueue.IsEmpty)
                {
                    RegisterForExecution();
                }
            }
        }
    }
    private void ExecuteCommand(ProcessingCommand command)
    {
        try
        {
            _commandExecutor.ExecuteCommand(command);
        }
        catch (Exception ex)
        {
            _logger.Error(string.Format("Unknown exception caught when executing command. commandType:{0}, commandId:{1}",
                command.Command.GetType().Name, command.Command.Id), ex);
        }
    }
}

command mailbox有一個狀態標記,表示當前是否正在處理command。這個狀態我們會通過原子鎖來更改。為了更好的說明問題,我先把commandScheduler的代碼也先貼出來:

public class DefaultCommandScheduler : ICommandScheduler
{
    private readonly TaskFactory _taskFactory;
    private readonly ICommandExecutor _commandExecutor;

    public DefaultCommandScheduler(ICommandExecutor commandExecutor)
    {
        var setting = ENodeConfiguration.Instance.Setting;
        _taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(setting.CommandProcessorParallelThreadCount));
        _commandExecutor = commandExecutor;
    }

    public void ScheduleCommand(ProcessingCommand command)
    {
        _taskFactory.StartNew(() => _commandExecutor.ExecuteCommand(command));
    }
    public void ScheduleCommandMailbox(CommandMailbox mailbox)
    {
        _taskFactory.StartNew(() => TryRunMailbox(mailbox));
    }

    private void TryRunMailbox(CommandMailbox mailbox)
    {
        if (mailbox.MarkAsRunning())
        {
            _taskFactory.StartNew(mailbox.Run);
        }
    }
}

從上面的代碼可以看到,當command mailbox被commandScheduler處理時,實際上就是創建了一個task,該task是調用TryRunMailbox方法,該方法先嘗試將當前的mailbox標記為運行狀態,如果當前已經在運行狀態,則不做任何處理;如果標記為運行狀態成功,則啟動一個任務去調用commandMailBox的Run方法。在Run方法內部,mailbox會嘗試取出一個需要被處理的command,如果有,就執行該command;如果沒有,則先將mailbox當前狀態標記為空閑狀態,然后再判斷一次mailbox中是否有需要處理的command,如果有,則通知commandScheduler處理自己;否則,不做任何處理。

最后,當一個command處理完成后,它會通知自己所屬的mailbox。然后mailbox通知commandScheduler處理自己。如下代碼所示:

public class ProcessingCommand
{
    private CommandMailbox _mailbox;

    public void SetMailbox(CommandMailbox mailbox)
    {
        _mailbox = mailbox;
    }
    public void Complete(CommandResult commandResult)
    {
        if (_mailbox != null)
        {
            _mailbox.CompleteCommand(this);
        }
    }
}

為了代碼的好理解,我去掉了這個類中一些無關的代碼。

好了,通過上面的文字介紹和代碼,我基本把我想表達的設計寫了出來。有點亂,我發現要把自己所想的東西表達清楚,還真不是一件容易的事情。希望通過這篇文章,能讓對ENode有興趣的朋友更好的理解ENode。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM