關於實現一個基於文件持久化的EventStore的核心構思


大家知道enode框架的架構是基於ddd+event sourcing的思想。我們持久化的不是聚合根的最新狀態,而是聚合根產生的領域事件。最近我在思考如何實現一個基於文件的eventstore。目標有兩個:

1.必須要高性能;
2.支持聚合根事件的並發持久化,要確保單個聚合根實例不會保存版本號相同的事件;

事件持久化高性能

經過了一番調研,發現用文件存儲事件非常合適。要確保高性能,我們可以順序寫文件(append),然后隨機讀文件。之所以要隨機讀文件是因為在當某些command由於操作同一個聚合根而遇到並發沖突的時候,框架需要獲取該聚合根的所有最新的事件,然后通過event sourcing重建出最新的聚合根,然后再重試這些遇到並發沖突的command。經過測試,順序寫文件和隨機讀文件都非常高效,每秒100W次順序寫和每秒10W次隨機讀在我的筆記本上不是問題;因為在enode中,domain是基於in-memory架構的,所以我們很少會從eventstore讀取事件。所以重點是要優化持久化事件的性能。而讀事件只有在command遇到並發沖突的時候或系統重啟的時候,才有可能需要從eventstore讀取事件。所以每秒10W次隨機讀取應該不是問題。當然,關於文件如何寫,見下面的遺留問題的分析。

另外一個就是刷磁盤的問題。我們知道,通過文件流寫入數據到文件后,如果不Flush文件流,那數據有可能還沒刷到磁盤。所以必須定時Flush文件流,出於性能和可靠性的權衡,選擇定時1s刷一次磁盤,通過異步線程刷盤。實際上,大部分NoSQL產品都是如此,比如Redis的fsync可以指定為每隔1s刷一次AOF日志到磁盤。這樣做唯一的問題是斷電后可能丟失1s的數據,但這個可以通過在服務器上配置UPS備用電源確保斷電后服務器還能工作,來確保斷電后還能支持足夠的時間確保我們把文件流的數據刷到磁盤。這樣既解決性能問題,也能保證不丟失數據。

事件並發控制

首先,每個聚合根實例有多個事件,每個時刻,每個聚合根可能都會產生多個事件然后要保存到eventstore中。為什么呢?因為我們的domain model所在的應用服務器一般是集群部署的,所以完全有可能同一個聚合根在不同的機器上在被同時在做不同的修改,然后產生的事件的版本號是相同的,從而就會導致並發修改同一個聚合根的情況了。

因此,我們主要要確保的是,對同一個聚合根實例,產生的事件如果版本號相同,則只能有一個事件能保存成功,其他的認為並發沖突,需要告訴外部有並發沖突了,然后由外部決定接下來該如何做。那么如何保證這一點呢?

前面說到,所有聚合根的事件都是順序的方式append到同一個文件,append事件到文件這個步驟本身沒辦法檢查是否有並發沖突,文件只能幫我們持久化數據,不負責檢查是否有並發沖突。那如何檢查並發沖突呢?思路就是在內存設計一個Dictionary,Dictionary的key為聚合根ID,value保存當前聚合根產生的事件的最大版本號,也就是最后一個事件的版本號。

然后有兩個辦法可以實現並發沖突的檢測:

  1. 所有的事件進入eventstore服務器后,先通過一個ConcurrentQueue進行排隊。所有事件並發進入ConcurrentQueue,然后ConcurrentQueue的消費者為單線程。然后我們在單線程內一個個取出ConcurrentQueue中的事件,然后根據Dictionary里的內容一個個判斷當前事件是否有版本沖突,如果沒沖突,則先將事件寫入文件,再更新Dictionary里當前聚合根的最大版本號;這個方式沒問題,只是效率不是非常高,因為這樣相當於對所有的聚合根實例的處理都線性化了。實際上,我們希望的是,只有對同一個聚合根實例的操作是線性化的,而對不同聚合根實例之間,完全可以並行處理;那怎么做呢?見第二種思路。
  2. 首先,所有的事件不必排隊了,可以並行處理。但是對於每一個聚合根實例的事件的處理,需要通過原子鎖的方式(CAS原理)做並發控制。關鍵思路是,通過一個字段存儲每個聚合根的當前版本號信息,版本號信息中設計一個狀態位用來控制同一時刻只能有一個線程在更改當前聚合根的版本信息。以此來實現對同一個聚合根的處理的線性化。然后,當前修改版本狀態成功的線程,能夠進一步做持久化事件的邏輯,但持久化事件之前還需要判斷當前事件的版本是否已經是老的版本了(當前事件的版本一定等於當前聚合根的最大版本號+1),以此來確保同一個聚合根的事件序列一定是連續遞增的。具體的實現思路見如下的demo代碼。

DEMO代碼示例、注解

/// <summary>一個結構體,記錄當前聚合根的當前版本號,以及用於並發控制的一些狀態信息
/// </summary>
class AggregateVersionInfo
{
    public const int Editing = 1;    //一個常量,表示當前聚合根的當前版本號正在被修改
    public const int UnEditing = 0;  //一個常量,表示當前聚合根的當前版本號未在被修改

    public int CurrentVersion = 0;   //記錄當前聚合根的當前版本號,初始值為0,其實就是事件的個數
    public int Status = UnEditing;   //默認狀態,未被修改
}
class Program
{
    static void Main(string[] args)
    {
        var aggregateCount = 4;                    //用於測試的聚合根的個數
        var eventCountPerAggregate = 10;           //單個聚合根產生的事件數
        var aggregateIdList = new List<string>();  //一個List,存放所有聚合根的ID
        var aggregateCurrentVersionDict = new ConcurrentDictionary<string, AggregateVersionInfo>();  //一個Dict,用於保存所有聚合根的當前版本信息
        var aggregateEventsDict = new Dictionary<string, IList<int>>();                      //一個Dict,用於模擬存儲每個聚合根的所有事件

        //先生成所有聚合根ID
        for (var index = 1; index <= aggregateCount; index++)
        {
            aggregateIdList.Add("key-" + index);
        }
        //初始化每個聚合根的當前狀態
        foreach (var aggregateId in aggregateIdList)
        {
            aggregateCurrentVersionDict[aggregateId] = new AggregateVersionInfo();
            aggregateEventsDict[aggregateId] = new List<int>();
        }

        //該方法用於實現事件的並發沖突檢測和持久化邏輯。
        Action<string, int> persistEventAction = (aggregateId, currentEventVersion) =>
        {
            var aggregateVersionInfo = aggregateCurrentVersionDict[aggregateId];
            var originalStatus = Interlocked.CompareExchange(
                ref aggregateVersionInfo.Status,
                AggregateVersionInfo.Editing,
                AggregateVersionInfo.UnEditing);

            //這里兩者不相等,說明aggregateVersionInfo.Status成功更新為Editing了
            if (originalStatus != aggregateVersionInfo.Status)
            {
                if (currentEventVersion == aggregateVersionInfo.CurrentVersion + 1)
                {
                    //這里,將事件加入到一個List,真實的eventstore會在這里持久化事件到文件;
                    aggregateEventsDict[aggregateId].Add(currentEventVersion);
                    //更新聚合根的最新版本
                    aggregateVersionInfo.CurrentVersion++;
                }
                else
                {
                    //進入這里,說明有別的線程已經添加了該版本,也就是遇到並發沖突了。
                }

                //處理完后,將聚合根的版本狀態修改回UnEditing
                Interlocked.Exchange(ref aggregateVersionInfo.Status, AggregateVersionInfo.UnEditing);
            }
            else
            {
                //進入這里,說明有別的線程正在更改當前聚合根的版本信息,也可以認為是遇到並發沖突了。
            }
        };

        //該方法用於模擬並行產生事件並調用事件的持久化邏輯
        Action generateEventAction = () =>
        {
            foreach (var aggregateId in aggregateIdList) //循環處理每個聚合根
            {
                //對每個聚合根產生指定個數的事件,為了簡化,僅使用事件版本號表示事件了
                for (var eventVersion = 1; eventVersion <= eventCountPerAggregate; eventVersion++)
                {
                    for (var i = 0; i < 100000; i++) //這里純粹為了性能測試,對每個事件再循環10W次調用持久化邏輯
                    {
                        persistEventAction(aggregateId, eventVersion); //調用持久化方法持久化聚合根的當前事件
                    }
                }
            }    
        };

        var watch = Stopwatch.StartNew();
        //模擬同時4個線程同時產生事件並持久化,這里其實只要開2個夠了,因為我的筆記本只有2個核
        Parallel.Invoke(generateEventAction, generateEventAction, generateEventAction, generateEventAction);
        watch.Stop();
        var time = watch.ElapsedMilliseconds;

        //最后輸出結果,輸出總運行時間,以及驗證每個聚合根的當前版本以及聚合根的每個事件的版本是否是順序逐個遞增的。
        Console.WriteLine("total time:{0}ms", time);
        foreach (var aggregateId in aggregateIdList)
        {
            Console.WriteLine("aggregateId:{0}, currentVersion:{1}, events:{2}",
                aggregateId,
                aggregateCurrentVersionDict[aggregateId].CurrentVersion,
                string.Join(",", aggregateEventsDict[aggregateId].ToArray()));
        }

        Console.ReadLine();
    }
}

DEMO運行結果及分析

從上圖可以看出,開啟4個線程,並行操作4個聚合根,每個聚合根產生10個不同版本的事件(事件版本號連續遞增),每個事件重復產生10W次,只花了大概1s時間。另外,最后每個聚合根的當前版本號以及所對應的事件也都是正確的。所以,可以看出,性能還不錯。4個線程並行處理,每秒可以處理400W個事件(當然實際肯定沒這么高,這里是因為大部分處理都被CompareExchange方法判斷掉了。所以,只有沒並發的情況,才是理想情況下的最快的性能點,因為每個事件都會做持久化和更新當前版本的邏輯,上面的代碼主要是為了驗證並發情況下是否會產生重復版本的事件這個功能。),且能保證不會持久化重復版本的事件。明天有空把持久化事件替換為真實的寫文件流的方式,看看性能會有多少,理論上只要寫文件流夠快,那性能應該依舊很高。

遺留問題

上面還有一個問題我還沒提及,那就是光用一個文件來存儲所有的事件還不夠的,我們還需要一個文件存儲每個事件在文件中的位置和長度,否則我們沒辦法知道每個事件存儲在文件的哪里。也就是在當事件寫入到文件后,我們需要知道當前寫入的起始位置,然后我們可以將這個起始位置信息再寫入到另一個相當於索引作用的文件。這個問題下次有機會在詳細分析吧,總體思路和淘寶開源的高性能分布式消息隊列metaq的消息存儲架構非常相似。淘寶的metaq之所以能高性能,很大一方面原因也是設計為順序寫文件,隨機讀文件的思路。如下圖所示:

 

上圖中的commitlog文件相當於我上面提到的用來存儲事件的文本文件,commitlog在metaq消息隊列中是用來存儲消息的。index文件相當於用來存儲事件在commitlog中的位置和長度。在metaq中,則是用來存儲消息在commitlog中的位置和長度。所以,從存儲結構的角度來看,metaq的消息存儲和eventstore的事件存儲的結構一致;但不一樣的是,metaq在存儲消息時,不需要做並發控制,所有消息只要append消息到commitlog即可,所有的index文件也只要append寫入即可,關於metaq具體更詳細的設計我還沒深入研究,有興趣的朋友也可以和我交流。而eventstore則必須對事件的版本號做並發控制,這是最大的區別。另外,實際上,事件的索引信息可以只需要維護在內存中即可,因為這些索引信息在eventstore啟動時總是可以通過commitlog還原出來。當然我們維護一份Index文件也可以,只是會增加事件持久化時的復雜度,這里到底是否需要這個Index文件,我需要再研究下metaq后才能更進一步明確。

關於使用LevelDB的思考

在調研的過程中,無意中發現LevelDB的插入性能非常高。它是由Google的MapReduce和BigTable的作者設計的一個基於key/value結構的輕量級的非常高效的開源的NoSQL數據庫。它能夠支持10億級別的數據量存儲。LevelDB 是單進程的服務,性能非常之高,在一台4個Q6600的CPU機器上,每秒鍾寫數據超過40w,而隨機讀的性能每秒鍾超過10w,足見性能之高。正因為他的高效,所以現在很多其他NoSQL都使用它來作為底層的數據持久化,比如淘寶的Tair支持用LevelDB來持久化緩存數據。所以有時間研究下LevelDB的設計與實現非常有必要。但是LevelDB只提供最簡單的key/value的操作。對於順序插入事件的需求,可以調用LevelDB的put操作。但是這里的put操作不支持並發沖突的檢測,也就是如果連續put了兩個key相同的value,則前一個value就會被后一個value所覆蓋,這不是我們想要的。所以我們如果使用LevelDB,對於同一個聚合根不能有兩個版本號相同的事件這個需求仍然需要我們自己來保證,可以通過上面DEMO中的思路來實現。也就是說,我們僅僅用LevelDB來代替日志。其實這樣已經省去我們很多的工作量,因為我們自己寫日志以及記錄每個事件的存儲位置和長度不是一件容易的事情,要求對算法和邏輯非常嚴密,否則只要一個bit錯位了,可能讀取出來的所有數據都錯了。而LevelDB幫我們完成了最復雜和頭疼的事情了。但不幸的是,LevelDB沒有官方的windows版本。我能找到.net平台下的實現,但要在生產環境使用,還是要多做很多驗證才行。另外,如果要用LevelDB來持久化事件,那我們的key可以是聚合根ID+事件版本號的字符串拼接。這點應該不難理解吧!

結束語

這篇文章洋洋灑灑,都是思路性的東西,希望大家看了不會枯燥,呵呵。歡迎大家提出自己的意見和建議!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM