最近打算用C#實現一個基於文件的EventStore。
什么是EventStore
關於什么是EventStore,如果還不清楚的朋友可以去了解下CQRS/Event Sourcing這種架構,我博客中也有大量介紹。EventStore是在Event Sourcing(下面簡稱ES)模式中,用於存儲事件用的。從DDD的角度來說,每個聚合根在自己的狀態發生變化時都會產生一個或多個領域事件,我們需要把這些事件持久化起來。然后當我們需要恢復聚合根的最新狀態到內存時,可以通過ES這種技術,從EventStore獲取該聚合根的所有事件,然后重演這些事件,就能將該聚合根恢復到最新狀態了。這種技術和MySQL的Redo日志以及Redis的AOF日志或者leveldb的WAL日志的原理是類似的。但是區別是,redo/AOF/WAL日志是Command Sourcing,而我們這里說的是Event Sourcing。關於這兩個概念的區別,我不多展開了,有興趣的朋友可以去了解下。
為什么要寫一個EventStore
目前ENode使用的EventStore,是基於關系型數據庫SqlServer的。雖然功能上完全滿足要求,但是性能上和數據容量上,離我的預期還有一些距離。比如:
- 關於性能,雖然可以通過SqlBulkCopy方法,實現較大的寫入吞吐,但是我對EventStore的要求是,需要支持兩個唯一索引:1)聚合根ID+事件版本號唯一;2)聚合根ID+命令ID唯一;當添加這兩個唯一索引后,會很大影響SqlBulkCopy寫入數據的性能;而且SqlBulkCopy只有SqlServer才有,其他數據庫如MySQL沒有,這樣也無形之中限制了ENode的使用場景;
- 關於使用場景,DB是基於SQL的,他不是簡單的幫我們保存數據,每次寫入數據都要解析SQL,執行SQL,寫入RedoLOG,等;另外,DB還要支持修改數據、通過SQL查詢數據等場景。所以,這就要求DB內部在設計存儲結構時,要兼顧各種場景。而我們現在要實現的EventStore,針對的場景比較簡單:1)追求高吞吐的寫入,沒有修改和刪除;2)查詢非常少,不需要支持復雜的關系型查詢,只需要能支持查詢某個聚合根的所有事件即可;所以,針對這種特定的使用場景,如果有針對性的實現一個EventStore,我相信性能上可以有更大的提升空間;
- 關於數據量,一個EventStore可能需要存儲大量的事件,百億或千億級別。如果采用DB,那我們只能進行分庫分表,因為單表能存儲的記錄數是有限的,比如1000W,超過這個數量,對寫入性能也會有一定的影響。假設我們現在要存儲100億事件記錄,單表存儲1000W,那就需要1000個表,如果單個物理庫中分100個表,那就需要10個物理庫;如果將來數據量再增加,則需要進一步擴容,那就需要牽涉到數據庫的數據遷移(全量同步、增量同步)這種麻煩的事情。而如果是基於文件版本的EventStore,由於沒有表的概念了,所以單機只要硬盤夠大,就能存儲非常多的數據。並且,最重要的,性能不會因為數據量的增加而下降。當然,EventStore也同樣需要支持擴容,但是由於EventStore中的數據只會Append寫入,不會修改,也不會刪除,所以擴容方案相對於DB來說,要容易做很多。
- 那為何不使用NoSQL?NoSQL一般都是為大數據、可伸縮、高性能而設計的。因為通常NoSQL不支持上面第一點中所說的二級索引,當然一些文檔型數據庫如MongoDB是支持的,但是對我來說是一個黑盒,我無法駕馭,也沒有使用經驗,所以沒有考慮。
- 從長遠來看,如果能夠自己根據自己的場景實現一個有針對性的EventStore,那未來如果出現性能瓶頸的問題,自己就有足夠的能力去解決。另外,對自己的技術能力的提高也是一個很大的鍛煉機會。而且這個做好了,說不定又是自己的一個很好的作品,呵呵。所以,為何不嘗試一下呢?
EventStore的設計目標
- 要求高性能順序寫入事件;
- 要求嚴格判斷聚合根的事件是否按版本號順序遞增寫入;
- 支持命令ID的唯一性判斷;
- 支持大量事件的存儲;
- 支持按照聚合根ID查詢該聚合根的所有事件;
- 支持動態擴容;
- 高可用(HA),需要支持集群和主備,二期再做;
EventStore核心問題描述、問題分析、設計思路
核心問題描述
一個EventStore需要解決的核心問題有兩點:1)持久化事件;2)持久化事件之前判斷事件版本號是否合法、事件對應的命令是否重復。一個事件包含的信息如下:
- 聚合根ID
- 事件版本號
- 命令ID
- 事件內容
- 事件發生時間
為什么是這些信息?
本文所提到的事件是CQRS架構中,由C端的某個命令操作某個聚合根后,導致該聚合根的狀態發生變化,然后每次變化都會產生一個對應的事件。所以,針對聚合根的每個事件,我們關注的信息就是:哪個命令操作哪個聚合根,產生了什么版本號的一個事件,事件的內容和產生的時間分別是什么。
事件的版本號是什么意思?
由於一個聚合根在生命周期內經常會被修改,也就是說經常會有命令去修改聚合根的狀態,而每次狀態的變化都會產生一個對應的事件,也就是說一個聚合根在生命周期內會產生多個事件。聚合根是領域驅動設計(DDD)中的一個概念,聚合根是一個具有全局唯一ID的實體,具有獨立的生命周期,是數據強一致性的最小邊界。為了保證聚合根內的數據的強一致性,針對單個聚合根的任何修改都必須是線性的,因為只有線性的操作,才能保證當前的操作所基於的聚合根的狀態是最新的,這樣才能保證聚合根內數據的完整性,總是滿足業務規則的不變性。關於線性操作這點,就像對DB的一張表中的某一條記錄的修改也必須是線性的一樣,數據庫中的同一條記錄不可能同時被兩個線程同時修改。所以,分析到這里,我們知道同一個聚合根的多個事件的產生必定是有先后順序的。那如何保證這個先后順序呢?答案是,在聚合根上設計一個版本號,通過版本號的順序遞增來保證對同一個聚合根的修改也總是線性依次的。這個思路其實就是一種樂觀並發控制的思路。聚合根的第一個事件的版本號為1,第二個事件的版本號為2,第N個事件的版本號為N。當第N個事件產生時,它所基於的聚合根的狀態必須是N-1。當某個版本號為N的事件嘗試持久化到EventStore時,如果EventStore中已經存在了一個版本號為N的事件,則認為出現並發沖突,需要告訴上層應用當前事件持久化遇到並發沖突了,然后上層應用需要獲取該聚合根的最新狀態,然后再重試當前命令,然后再產生新的版本號的事件,再持久化到EventStore。
希望能自動檢測命令是否重復處理
CQRS架構,任何聚合根的修改都是通過命令來完成的。命令就是一個DTO,當我們要修改一個聚合根的狀態時,就發送一個命令到分布式MQ即可,然后MQ的消費者處理該命令。但是大家都知道任何分布式MQ一般都只能做到至少投遞一次(At Least Once)的消息投遞語義。也就是說,一個命令可能會被消費者重復處理。在有些情況下,某個聚合根如果重復處理某個命令,會導致聚合根的最終狀態不正確,比如重復扣款會導致賬號余額不正確。所以,我們希望在框架層面能支持命令的重復處理的檢測。那最理想的檢測位置在哪里呢?如果是傳統的DB,我們會在數據庫層面通過建立唯一索引保證命令絕對不會重復執行。那對應到我們的EventStore,自然也應該在EventStore內部檢測。
核心問題分析
通過上面的問題描述,我們知道,其實一個EventStore需要解決的問題就兩點:1)以文件的形式持久化事件;2)持久化之前判斷事件的版本號是否沖突、事件的命令是否重復。
關於第一點,自然是通過順序寫文件來實現,機械硬盤在順序寫文件的情況下,性能也是非常高的。寫文件的思路非常簡單,我們可以固定單個文件的大小,比如512MB。然后先寫第一個文件,寫滿后新建一個文件,再寫第二個,后面以此類推。
關於第二點,本質上是兩個索引的需求:a. 聚合根ID+事件版本號唯一(當然,這里不僅要保證唯一,還要判斷是否是連續遞增);b. 聚合根ID + 命令ID唯一,即針對同一個聚合根的命令不能重復處理;那如何實現這兩個索引的需求呢?第一個索引的實現成本相對較低,我們只需要在內存維護每個聚合根的當前版本號,然后當一個事件過來時,判斷事件的版本號是否是當前版本號的下一個版本號即可,如果不是,則認為版本號非法;第二個索引的事件成本比較高,我們必須維護每個聚合根的所有產生的事件對應的命令的ID,然后在某個事件過來時,判斷該事件對應的命令ID是否和已經產生的任何一個事件的命令ID重復,如果有,則認為出現重復。所以,歸根結底,當需要持久化某個聚合根的事件時,我們需要加載該聚合根的所有已產生的事件的版本號以及事件對應的命令ID到內存,然后在內存進行判斷,從而檢查當前事件是否滿足這兩個索引需求。
好了,上面是基本的也是最直接的解決問題的思路了。但是我們不難發現,要實現上面這兩個問題並不容易。因為:首先我們的機器的內存大小是有限的,也就是說,無法把所有的聚合根的事件的索引信息都放在內存。那么當某個聚合根的事件要持久化時,發現內存中並無這個聚合根的事件索引時,必然要從磁盤中加載該聚合根的事件索引。但問題是,我們的事件由於為了追求高性能的寫入到文件,總是只是簡單的Append追加到最后一個文件的末尾。這樣必然導致某個聚合根的事件可能分散在多個文件中,這樣就給我們查找這個聚合根的所有相關事件帶來了極大的困難。那該如何權衡的去設計這兩個需求呢?
我覺得設計是一種權衡,我們總是應該根據我們的實際業務場景去有側重點的進行設計,優先解決主要問題,然后次要問題盡量去解決。就像leveldb在設計時,也是側重於寫入時非常簡單快速,而讀取時,可能會比較迂回曲折。EventStore,是非常典型的高頻寫入但很少讀取的系統。但寫入時需要保證上述的兩個索引需求,所以,應該說這個寫入的要求比leveldb的寫入要求還要高一些。那我們該如何去權衡呢?
EventStore核心設計思路
- 在內存中維護每個聚合根的版本索引eventVersion,eventVersion中維護了當前聚合根的所有的版本、每個版本對應的cmdId,以及每個版本的事件在event文件中的物理位置;當一個事件過來時,通過這個eventVersion來判斷version,cmdId是否合法(version必須是currentVersion+1,cmdId必須唯一);
- 當寫入一個事件時,只寫入一個文件,event.file文件;假設一個文件的大小為512MB,一個事件的大小為1KB,則一個文件大概存儲52W個事件;
- 一個event.file文件寫滿后:
- 完成當前event.file文件,然后新建一個新的event.file文件,接下來的事件寫入新的event.file文件;
- 啟動一個后台線程,在內存中對當前完成的event.file文件中的event按照聚合根ID和事件版本號進行排序;
- 排序完成后,我們就知道了該文件中的事件涉及到哪些聚合根,他們的順序,以及最大最小聚合根ID分別是什么;
- 新建一個和event.file文件一樣大小的臨時文件;
- 在臨時文件的header中記錄當前event.file已排序過;
- 在臨時文件的數據區域將排好序的事件順序寫入文件;
- 臨時文件寫入完成后,將臨時文件替換當前已完成的event.file文件;
- 為event.file文件新建一個對應的事件索引文件eventIndex.file;
- 將event.file文件中的最大和最小聚合根ID寫入到eventIndex.file索引文件的header;每個event.file的最大最小的聚合根ID的關系,會在EventStore啟動時自動加載並緩存到內存中,這樣可以方便我們快速知道某個聚合根在某個event.file中是否存在事件,因為直接在內存中判斷即可。這個緩存我暫時命名為aggregateIdRangeCache吧,以便下面更方便的進一步說明如何使用它。
- 將event.file文件中的每個聚合根的每個事件的索引信息寫入eventIndex.file文件,事件索引信息包括:聚合根ID+事件版本號+事件的命令ID+事件在event.file文件中的物理位置這4個信息;有了這些索引信息,我們就可以只需要訪問事件索引文件就能獲取某個聚合根的所有版本信息(就是上面說的eventVersion)了;
- 但僅僅在事件索引文件中記錄最大最小聚合根ID以及每個事件的索引信息還不是不夠的。原因是,當我們要查找某個聚合根的所有版本信息時,雖然可以先根據內存中緩存的每個event.file文件的最大最小聚合根ID快速定位該聚合根在哪些event.file中存在事件(也就是明確了在哪些對應的事件索引文件中存在版本信息),但是當我們要從這些事件索引文件中找出該聚合根的事件索引到底具體在文件的哪個位置時,只能從文件的起始位置順序掃描文件才能知道,這樣的順序掃描無疑是不高效的。假設一個event.file文件的大小固定為512MB,一個事件的大小為1KB,則一個event.file文件大概存儲52W個事件,每個事件索引的大小大概為:24 + 4 + 24 + 8 = 60個字節。所以,這52W個事件的索引信息大概占用30MB,也就是最終一個事件索引文件的大小大概為30MB多一點。當我們要獲取某個聚合根的所有版本信息時,如果每次訪問某個事件索引文件時,總是要順序掃描30MB的文件數據,那無疑效率不高。所以,我還需要進一步想辦法優化,因為事件索引文件里的事件索引信息都是按照聚合根ID和事件版本號排序的,假設現在有52W個事件索引,則我們可以將這52W個事件索引記錄均等切分為100個點,然后把每個點對應的事件索引的聚合根ID都記錄到事件索引文件的header中,一個聚合根ID的長度為24個字節,則100個也就2.4KB左右。這樣一來,當我們想要知道某個聚合根的事件索引大概在事件索引文件的哪個位置時,我們可以先通過訪問header里的信息,快速知道應該從哪個位置去掃描。這樣一來,本來對於一個事件索引文件我們要掃描30MB的數據,現在變為只需要掃描百分之一的數據,即300KB,這樣掃描的速度就快很多了。這一段寫的有點啰嗦,但一切都是為了盡量詳細的描述我的設計思路,不知道各位看官是否看懂了。
- 除了記錄記錄最大最小聚合根ID以及記錄100個等分的切割點外,還有一點可以優化來提高獲取聚合根的版本信息的性能,就是:如果內存足夠,當某個eventIndex.file被讀取一次后,EventStore可以自動將這個eventIndex.file文件緩存到非托管內存中;這樣下次就可以直接在非托管內存訪問這個eventIndex.file了,減少了磁盤IO的讀取;
- 因為內存大小有限,所以eventVersion不可能全部緩存在內存;所以,當某個聚合根的eventVersion不在內存中時,需要從磁盤加載。加載的思路是:掃描aggregateIdRangeCache,快速找出該聚合根的事件在哪些event.file文件中存在;然后通過上面提到的查找算法快速查找這些event.file文件對應的eventIndex.file文件,這樣就能快速獲取該聚合根的eventVersion信息了;
- 另外,EventStore啟動時,最好需要預加載一些熱門聚合根的eventVersion信息到緩存。那該預加載哪些聚合根呢?我們可以在內存中維護一個固定大小(N)的環形數組,環形數組中維護了最近修改的聚合根的ID;當某個聚合根有事件產生,則將該聚合根ID的hashcode取摸N得到環形數組的下標,然后將該聚合根ID放入該下標;定時將該環形數組中的聚合根ID dump到文件preloadAggregateId.file進行存儲;這樣當EventStore啟動時,就可以從preloadAggregateId.file加載指定聚合根的eventVersion;
思路總結:
上面的設計的主要思路是:
- 寫入一個事件前先內存中判斷是否允許寫入,如果允許,則順序寫入event.file文件;
- 對一個已經寫入完成的event.file文件,則用一個后台異步線程對文件中的事件按照聚合根ID和事件版本號進行排序,然后將排序后的臨時event.file文件替換原event.file文件,同時將排序后得到的事件索引信息寫入eventIndex.file文件;
- 寫入一個事件時,如果當前聚合根的版本信息不在內存,則需要從相關的eventIndex.file文件加載到內存;
- 由於加載版本信息可能需要訪問多個eventIndex.file文件,會有多次讀磁盤的IO,對性能影響較大,所以,我們總是應該盡量在內存緩存聚合根的版本信息;
- 整個EventStore的性能瓶頸在於內存中能緩存多少聚合根版本信息,如果能夠緩存百分百的聚合根版本信息,且能做到沒有GC的問題(盡量避免),那我們就可以做到寫入事件非常快速;所以,如何設計一個支持大容量緩存(比如緩存幾十個GB的數據),且沒有GC問題的高性能緩存服務,就變得很關鍵了;
- 由於有了事件索引信息以及這么多的緩存機制,所以,當要查詢某個聚合根的所有事件,也就非常簡單了;
如何解決多線程並發寫的時候的CPU占用高的問題?
到這里,我們分析了如何存儲數據,如何寫入數據,還有如何查詢聚合根的所有事件,應該說核心功能的實現思路已經想好了。如果現在是單線程訪問EventStore,我相信性能應該不會很低了。但是,實際的情況是N多客戶端會同時並發的訪問EventStore。這個時候就會導致EventStore服務器會有很多線程要求同時寫入事件到數據文件,但是大家知道寫文件必須是單線程的,如果是多線程,那也要用鎖的機制,保證同一個時刻只能有一個線程在寫文件。最簡單的辦法就是寫文件時用一個lock搞定。但是經過測試發現簡單的使用lock,在多線程的情況下,會導致CPU很高。因為每個線程在處理當前事件時,由於要寫文件或讀文件,都是IO操作,所以鎖的占用時間比較長,導致很多線程都在阻塞等待。
為了解決這個問題,我做了一些調研,最后決定使用雙緩沖隊列的技術來解決。大致思路是:
設計兩個隊列,將要寫入的事件先放入隊列1,然后當前要真正處理的事件放在隊列2。這樣就做到了把接收數據和處理數據這兩個過程在物理上分離,先快速接收數據並放在隊列1,然后處理時把隊列1里的數據放入隊列2,然后隊列2里的數據單線程線性處理。這里的一個關鍵問題是,如何把隊列1里的數據傳給隊列2呢?是一個個拷貝嗎?不是。這種做法太低效。更好的辦法是用交換兩個隊列的引用的方式。具體思路這里我不展開了,大家可以網上找一下雙緩沖隊列的概念。這個設計我覺得最大的好處是,可以有效的降低多線程寫入數據時對鎖的占用時間,本來一次鎖占用后要直接處理當前事件的,而現在只需要把事件放入隊列即可。雙緩沖隊列可以在很多場景下被使用,我認為,只要是多個消息生產者並發產生消息,然后單個消費者單線程消費消息的場景,都可以使用。而且這個設計還有一個好處,就是我們可以有機會單線程批量處理隊列2里的數據,進一步提高處理數據的吞吐能力。
如何緩存大量事件索引信息?
最簡單的辦法是使用支持並發訪問的字典,如ConcurrentDictionary<T,K>,Java中就是ConcurrentHashmap。但是經過測試發現ConcurrentDictionary在key增加到3000多萬的時候就會非常慢,所以我自己實現了一個簡單的緩存服務,初步測試下來,基本滿足要求。具體的設計思路本文先不介紹了,總之我們希望實現一個進程內的,支持緩存大量key/value的一個字典,支持並發操作,不要因為內存占用越多而導致緩存能力的下降,盡量不要有GC的問題,能滿足這些需求就OK。
如何擴容?
我們再來看一下最后一個我認為比較重要的問題,就是如何擴容。
雖然我們單台EventStore機器只要硬盤夠大,就可以存儲相當多的事件。但是硬盤再大也有上限,所以擴容的需求總是有的。所以如何擴容(將數據遷移到其他服務器上)呢?通過上面的設計我們了解到,EventStore中最核心的文件就是event.file,其余文件都可以通過event.file文件來生成。所以,我們擴容時只需要遷移event.file文件即可。
那如何擴容呢?假設現在有4台EventStore機器,要擴容到8台。
有兩個辦法:
- 土豪的做法:准備8台全新的機器,然后把原來4台機器的全部數據分散到新准備的8台機器上,然后再把老機器上的數據全部刪除;
- 屌絲的做法:准備4台全新的機器,然后把原來4台機器的一半數據分散到新准備的4台機器上,然后再把老機器上的那一半數據刪除;
對比之下,可以很容易發現土豪的做法比較簡單,因為只需要考慮如何遷移數據到新機器即可,不需要考慮遷移后把已經遷移過去的數據還要刪除。大體的思路是:
- 采用拉的方式,新的8台目標機器都在向老的4台源機器拖事件數據;目標機器記錄當前拖到哪里了,以便如果遇到意外中斷停止后,下次重啟能繼續從該位置繼續拖;
- 每台源機器都掃描所有的事件數據文件,一個個事件進行掃描,掃描的起始位置由當前要拖數據的目標機器給出;
- 每台目標機器該拖哪些事件數據?預先在源機器上配置好這次擴容的目標機器的所有唯一標識,如IP;然后當某一台目標機器過來拖數據時,告知自己的機器的IP。然后源機器根據IP就能知道該目標機器在所有目標機器中排第幾,然后源機器就能知道應該把哪些事件數據同步給該目標機器了。舉個例子:假設當前目標機器的IP在所有IP中排名第3,則針對每個事件,獲取事件的聚合根ID,然后將聚合根ID hashcode取摸8,如果余數為3,則認為該事件需要同步給該目標機器,否則就跳過該事件;通過這樣的思路,我們可以保證同一個聚合根的所有事件都最終同步到了同一台新的目標機器。只要我們的聚合根ID夠均勻,那最終一定是均勻的把所有聚合根的事件均勻的同步到目標機器上。
- 當目標機器上同步完整了一個event.file后,就自動異步生成其對應的eventIndex.file文件;
擴容過程的數據同步遷移的思路差不多了。但是擴容過程不僅僅只有數據遷移,還有客戶端路由切換等。那如客戶端何動態切換路由信息呢?或者說如何做到不停機動態擴容呢?呵呵。這個其實是一個外圍的技術。只要數據遷移的速度跟得上數據寫入的速度,然后再配合動態推送新的路由配置信息到所有的客戶端。最終就能實現動態庫容了。這個問題我這里先不深入了,搞過數據庫動態擴容的朋友應該都了解原理。無非就是一個全量數據遷移、增量數據遷移、數據校驗、短暫停止寫服務,切換路由配置信息這幾個關鍵的步驟。我上面介紹的是最核心的數據遷移的思路。
結束語
本文介紹了我之前一直想做的一個基於文件版本的EventStore的關鍵設計思路,希望通過這篇文章把自己的思路系統地整理出來。一方面通過寫文章可以進一步確信自己的思路是否OK,因為如果你文章寫不出來,其實思路一定是哪里有問題,寫文章的過程就是大腦整理思緒的過程。所以,寫文章也是檢查自己設計的一種好方法。另一方面,也可以通過自己的原創分享,希望和大家交流,希望大家能給我一些意見或建議。這樣也許可以在我動手寫代碼前能及時糾正一些設計上的錯誤。最后再補充一點,語言不重要,重要的是架構設計、數據結構,以及算法。誰說C#語言做不出好東西呢?呵呵。