ENode 2.0 - 整體架構介紹


前言

今天是個開心的日子,又是周末,可以輕輕松松的寫寫文章了。去年,我寫了ENode 1.0版本,那時我也寫了一個分析系列。經過了大半年的時間,我對第一個版本做了很多架構上的改進,最重要的就是讓ENode實現了分布式,通過新增一個分布式消息隊列EQueue來實現。之所以要設計一個分布式的消息隊列是因為在enode 1.0版本中,某個特定的消息隊列只能被某個特定的消費者消費。這樣就會導致一個問題,就是如果這個消費者掛了,那這個消費者對應的消息隊列就不能自動被其他消費者消費了。這個問題會直接導致系統不可用。而ENode 2.0中,就不會有這個問題了,因為消息隊列被設計為獨立的,被消費者所共享的;一個消息隊列可以被多個消費者集群消費或廣播消費,如果一個消費者掛了,那其他的消費者會自動頂上。這里具體的細節,我會在后面詳細介紹。

ENode框架簡介

  1. 框架名稱:ENode
  2. 框架特色:DDD+CQRS + EDA + Event Sourcing + In Memory
  3. 設計目標:讓程序員只關注業務代碼、高性能、分布式、可水平擴展
  4. 開源地址:https://github.com/tangxuehua/enode
  5. 基於enode實現的一個完成案例,一個論壇:https://github.com/tangxuehua/forum
  6. nuget包Id:ENode
  7. 一個獨立的分布式消息隊列EQueue,可以為ENode提供Command,Domain Event的發布和訂閱:https://github.com/tangxuehua/equeue

ENode架構圖

熟悉CQRS架構的人看到這圖應該就再熟悉不過了,enode實現的是一個CQRS架構。基本的概念就不多介紹了,如果大家對上圖中的一些概念還不太清楚,可以看一下我的博客里的其他相關文章,我應該都有寫到。下面主要介紹一下enode 2.0在實現CQRS架構時的一些不一樣的地方(由於篇幅的限制,先說三點吧):

Command Handler一次只處理一個Command

就是你不能在command handler中一次修改多個聚合根,我覺得這應該是enode對開發人員的最大約束,可能也是最讓開發人員覺得不爽的地方。但我覺得這個不是約束,而是對數據強一致性和最終一致性的一個正確認識。在我學過ddd+cqrs+event sourcing這三個東西之后,我認識到,聚合內必須確保強一致性,聚合間最終一致性。傳統三層開發,我們通過unit of work模式(簡稱uow,比如nhibernate的session, entity framework的dbcontext)可以輕易實現多個對象修改的強一致性事務;確實在傳統三層模式開發中,這種利用uow的方式來實現跨聚合的強一致性事務的方式很實用,開發起來很方便,開發人員可以不必擔心會出現數據不一致的問題了,因為所有修改總是在一個事務內保存。

但enode的設計目標不是為了支持傳統三層開發,而是面向ddd+cqrs+eda+event sourcing架構的框架。曾經我也想讓command handler支持修改多個聚合根,但這樣做必須要面臨一個很棘手的問題:command在發送到command queue時,無法根據聚合根ID來路由了。因為一個command會修改多個聚合根,也就是說一個command不會和一個聚合根一一對應了。這意味着同一個聚合根沒辦法總是被路由到同一個command queue里,這樣就導致相同ID的聚合根可能會在兩台服務器被同時修改,這就會導致整個系統可能會頻繁的產生並發更新沖突。很多command就會不斷的重試,整個系統的性能就會下降。而enode設計之初就是為了高性能,所以這點讓我覺得很難接收。

相反,如果一個command總是只會創建或修改一個聚合根,那我們的command就能根據聚合根ID來路由到特定的消息隊列,同一個聚合根ID總是會被路由到同一個queue,而一個queue的消費者服務器(command handler所在的服務器)同一時刻總是只有一個,那我們就能保證一個聚合根的修改不會有並發問題。當然光這樣還不夠,在這個command消費者服務器里,enode框架會用內存級別的queue對同一個聚合根的所有command再次進行排隊(如果需要排隊的話),之所以要這樣是因為有時對一個聚合根的並發修改command可能1s內發送了很多過來,所以command handler肯定來不及在1s內全部處理掉這些command,所以需要在內存里再次排隊(天貓雙十一的時候,應用服務器內部也會有類似的對同一個聚合根設計一個相應的內存queue來避免對同一個聚合根的修改的並發沖突的問題)。通過這樣的設計,我們可以做到絕大部分情況下,不會再有並發沖突的問題,也就是command不會再出現重試的情況。這樣最后的效果就是:不同ID的聚合根的處理可以並行,同一個ID的聚合根的處理是串行,通過兩級排隊實現。前面說到,這樣只能做到絕大部分情況下不會有並發沖突,那么什么時候還是會有並發沖突呢?就是在新增command消費者服務器的時候,比如我們發現最近系統繁忙,我們希望增加command消費者服務器來加快command的處理,那在新增服務器后,原來修改某個聚合根的command可能會被路由到新的服務器,但是這個聚合根的有些command可能還在原來的服務器上還沒執行完,此時就會出現同一個聚合根在兩台服務器上被同時修改的可能了;那這個怎么解決呢?我現在的想法是框架層面不必解決了,我們只需要在系統最空的時候(比如凌晨4點)的時候,增加服務器即可,因為那個時候消息隊列里的消息是最少的,也就是不太可能會產生因為增加command handler服務器而導致並發沖突的問題,這樣我們就可以最大限度的避免可能帶來的並發沖突。

讓Domain生活在In Memory中

相比一般的CQRS架構,enode每次在處理一個command,在獲取聚合根時,不是從eventstore獲取,而是從緩存獲取。從上面的架構圖可以看出,enode架構中有一個domain memory cache,目前用redis實現。這樣做的好處是,將所有的聚合根都緩存在redis緩存中,這樣就能提高聚合根的讀取時間;有一個問題需要考慮,redis緩存服務器宕機了怎么辦?宕機后緩存數據就沒了,那如何恢復這些緩存數據呢?這也是我選擇redis的一個主要理由,因為redis支持持久化,我們可以利用redis的aof或快照方式的持久化功能,來持久化緩存數據。從而可以在redis掛了后能最快的速度恢復緩存,重啟redis服務器即可。那重啟之前以及重啟的過程中,因為無法從redis獲取聚合根了,那只能從eventstore通過event sourcing的方式去獲取,那樣的話性能肯定會比較差,那怎么辦呢?答案是通過定時為聚合根創建快照,這也是采用event sourcing架構的一個好處。我們可以定時對某些聚合跟創建快照(注意,我覺得只需要考慮那些對性能要求很高的模塊所涉及到的聚合根創建快照即可),那怎么創建呢?可以開一個獨立的進程,監聽domain event,對需要創建快照的domain event做出判斷,根據某種快照創建策略進行判斷,如果認為需要創建快照,則從event store拿出該聚合根的相關事件,通過event sourcing還原得到某個版本的聚合根,這樣就得到了某個聚合根的某個版本的快照了。然后持久化起來即可。然后,enode支持在從event store獲取聚合根前,先檢查是否有快照,如果有快照,則會先加載快照,再把快照之后的domain event從event store獲取,再把這些快照之后的domain event一個個apply到當前聚合根,從而得到最新狀態的聚合根。這個過程比獲取該聚合根的所有領域事件在一個個通過event sourcing還原得到聚合根要快的多;尤其是在一個聚合根的domain event比較多的情況下就更有意義。因此,通過緩存的引入,我們可以提高command handler的處理速度。

Event Store的設計

關於重復的command的冪等處理和聚合根可能存在的並發沖突的判斷

另外一點很重要的是,因為我們的command是會發送到分布式消息隊列,然后隊列中的command消息會被取出來執行;大家知道,我們很難保證一個消息不會被重復執行,也就是說,一個command可能會重復執行。因此,我們的應用要支持對command的密等處理。而對於使用enode框架的應用,因為整個command side的數據持久化就是持久化domain event,程序員不必關心domain event的持久化過程。所以enode很有必要能內置支持對command的重復處理的判斷。那么如何做呢?我覺得最靠譜的做法是,在持久化domain event的時候就能絕對靠譜的檢測出來某個command是否被重復執行了。那很自然就想到將被持久化的domain event和產生他的對應command關聯起來。所以我設計了如下的結構,用來表示一個command在操作聚合根后所產生的領域事件的信息。

/// <summary>The commandId which generate this event stream.
/// </summary>
public string CommitId { get; private set; }
/// <summary>The aggregate root id.
/// </summary>
public string AggregateRootId { get; private set; }
/// <summary>The aggregate root type code.
/// </summary>
public int AggregateRootTypeCode { get; private set; }
/// <summary>The version of the event stream.
/// </summary>
public int Version { get; private set; }
/// <summary>The occurred time of the event stream.
/// </summary>
public DateTime Timestamp { get; private set; }
/// <summary>The domain events of the event stream.
/// </summary>
public IEnumerable<IDomainEvent> Events { get; private set; }
  • CommitId:就是當前的CommandId;
  • AggregateRootId:當前被操作的聚合根的全局唯一ID;
  • AggregateRootTypeCode:表示聚合根的類型的一個code,通過該code我們可以知道當前記錄是哪個類型的聚合根的;
  • Version:一個版本號,表示聚合根產生領域事件后的新版本號,是產生事件前的版本號+1;也就是說,聚合根的版本是每次被修改一次,那Version就加1;
  • Timestamp:一個時間戳,用於記錄產生domain event時的時間;
  • Events:表示當前command操作聚合根后所產生的領域事件,一次操作可以產生多個領域事件;

對於上面的結構體,我們可以實現兩個重要的功能:1)為AggregateRootId和Version這兩個字段建立唯一索引,這樣我們就能實現判斷某個聚合根是否被並發修改,因為如果有並發修改導致並發沖突,那保存到eventstore時,它們的Version肯定是相同的;2)為AggregateRootId和CommitId兩個字段建立唯一索引,這樣我們就能判斷某個command是否被重復執行,因為一個command被實例化出來后,它所要修改的聚合根ID就不可能再修改了,所以如果該command被重復執行,那最后產生的領域事件(上面這個結構體)最后被持久化到eventstore時就會違反這個唯一索引,從而框架就能知道是否有command被重復執行了;

另外,上面這個結構體被保存到eventstore時,是以一條記錄的方式被保存,Events集合會被序列化為一段二進制;所以,假如我們用關系型數據庫來保存,那就是只有一條insert語句即可,這樣就實現了一個聚合根的一次修改的事務持久化。然后因為上兩個索引的存在,我們就能在保存時判斷是否有並發沖突或command是否被重復執行。

關於Domain Event大數據量的考慮

在設計event store時,我考慮了很多。最后認為event store要解決的最大的兩個問題是持久化性能和可水平擴展性。首先,因為每次command handler在處理完一個聚合根后,都會把產生的領域事件持久化到event store,沒持久化完成則不能認為該command已處理完,所以持久化的性能對處理command的吞吐量至關重要。另外一點就是可水平擴展性,因為event store里保存的都是domain event,而enode又是為了實現高性能為目標的,所以event store里的數據肯定會非常多,比如1s中要持久化1K個domain event,那一天就會有8600W條記錄要記錄,一天就真么多,那1年就更多了,所以用單點存儲所有的domain event顯示不靠譜了。所以我們的event store必須要支持水平擴展。比如我們可以設計100個分區,那每個分區一天只需要保存86W條記錄,一年也只需要保存3億多條記錄即可。之前我很追求單個存儲節點的高性能,所以曾經想過要用leveldb,stsdb,甚至redis這種高性能的基於key,value的nosql存儲。但后來發現這種nosql存儲雖然性能很高,但因為只是key,value的存儲結構,所以沒辦法支持二級索引,這樣就沒辦法實現上面第一點中提到的command的冪等處理和聚合根並發沖突的檢測。另一個重要的原因是,event store中的數據我們有時候是要被查詢的。比如現在某個command遇到的並發沖突,那框架需要自動重試,但是重試之前需要先更新redis緩存,就是把eventstore里的最新的聚合根更新到redis緩存里,這樣command在重試時才能拿到最新版本的聚合根,這樣重試才能成功。那如何從eventstore里拿最新的聚合根呢?只能根據聚合根ID從eventstore里查詢。而聚合根ID又不是key,value nosql的key,自然就沒辦法實現這個需求了;所以,我覺得合理的辦法應該是用關系型數據庫來實現eventstore。有人說關系型數據庫的性能不行。我覺得只要關系型數據庫支持水平擴展,也就是將domain event sharding(分片)到不同的分庫分表中,那平均到每個庫里的domain event的數量就不大了;這樣整個eventstore的持久化性能就可以隨着分庫的數量的增加而線性增加;比如我現在單個db insert domain event的性能是1K tps(mysql配合ssd硬盤完全無壓力,呵呵),那10個庫的tps就能達到1W tps了。因為我們分庫會根據聚合根ID的hash code來平均散列,這樣能確保每個庫中的聚合根的domain event數量是基本一樣的;從而就能實現整個event store的持久化性能隨着分庫的增加而線性增加。所以,有了分庫的優勢,大數據量和性能都不是問題了。且因為關系型數據庫支持二級索引和唯一索引,那查詢domain event也不是問題了。

ENode物理部署結構圖

上圖是enode在實際項目中我目前認為的一個物理部署結構圖。

首先客戶端瀏覽器通過網絡最后訪問到我們的web服務器集群,當然web服務器前面肯定還有網關和負載均衡器,我這里為了突出重點就不畫出來了。然后每個web服務器接受到httprequest后會生成command,然后通過enode框架發送到分布式消息隊列服務器(message queue server),目前由我開發的equeue實現。然后消息隊列服務器上的消息會被推送到command process servers,command process server就是執行command handler、完成domain logic,持久化domain event,以及publish domain event的服務器。command process server處理完之后,domain event會由enode框架自動發送到message queue server,然后會被event process server處理,event process server就是訂閱domain event,然后根據domain event更新query db。對於查詢,web server可以直接通過sql查詢query db即可。

各種服務器的集群:

  • web server:無狀態,可以任意增加服務器;
  • command process server:就是處理業務邏輯的服務器,也是無狀態,可以任意增加服務器;但服務器的數目最好和command所對應的topic下的queue的數量保持一致,這點后續在寫分布式消息隊列equeue的文章時在詳細談吧;
  • redis server:就是緩存聚合根的服務器,屬於緩存服務器;可以按需要存儲的容量來規划需要開多少台redis server;目前我覺得最好的redis動態擴容方法就是pre sharding;
  • event db server:就是存儲domain event的服務器,按照上的分析,我們采用的是關系型數據庫,比如用mysql;mysql的分庫分表技術已經很成熟,后續文章我們再詳細討論如何分庫以及如何做數據遷移;
  • event process server:就是訂閱domain event,根據domain event更新query db的服務器;可以根據需要來部署多少台,和command process server類似;這里有一點必須要先提一下,就是在更新query db時,因為每次更新都是針對某個domain event來更新query db的,而domain event只表示一個聚合根的修改,所以每次我們更新query db時,也只更新該聚合根所在范圍的表;我們千萬不要去更新超過該聚合根范圍的表,否則就會產生並發沖突,導致event handler執行失敗;這樣就會是的cqrs的query db同步數據變的很慢。對於query side,如果我們覺得直接從query db查詢數據太慢,可以考慮設計查詢緩存,也就是不走query db來查詢數據,而是走緩存。這種緩存就和我們平時的緩存設計類似了;利用domain event,我們先天就有優勢可以讓緩存非常及時的更新,呵呵。因為一旦有domain event過來,我們就能快速更新我們的query side緩存,而query db就可以異步更新即可。這樣就可以解決query side同步更新數據慢的問題。
  • message queue server:就是消息隊列服務器,目前equeue還不支持集群,只支持單機;這個以后有時間會考慮實現master-slave模式,類似於淘寶的rocketmq一樣。

好了,就寫這些吧,后續的再后續文章中補上,呵呵。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM