ENode 2.0 - 深入分析ENode的內部實現流程和關鍵地方的冪等設計


前言

ENode是一個基於消息的架構,使用ENode開發的系統,每個環節都是處理消息,處理完后產生新的消息。本篇文章我想詳細分析一下ENode框架內部是如何實現整個消息處理流程的。為了更好的理解我后面的流程的描述,我覺得還是應該先把ENode的架構圖貼出來,好讓大家在看后面的分析時,可以對照這個架構圖進行思考和理解。

ENode架構圖

ENode框架內部實現流程分析

  1. Controller發送ICommand到消息隊列(EQueue);
  2. 【從這一步開始處理Command】ENode.EQueue中的CommandConsumer接收到該ICommand,先創建一個ICommandContext實例,然后調用ENode中的ICommandExecutor執行當前ICommand並將ICommandContext傳遞給ICommandExecutor;
  3. ICommandExecutor根據當前ICommand的類型,獲取到一個唯一的ICommandHandler,然后調用ICommandHandler的Handle方法處理當前ICommand,調用時傳遞當前的ICommandContext給ICommandHandler;
  4. ICommandHandler處理完Command后,ICommandExecutor獲取當前ICommandContext中新增或修改的聚合根;
  5. 檢查當前ICommandContext中是否只有一個新增或修改的聚合根;如果超過1個,則報錯,通過這樣的檢查來從框架級別保證一個Command一次只能修改一個聚合根;
  6. 如果發現當前新增或修改的聚合根為0個,則直接認為當前的ICommand已處理完成,就調用ICommandContext的OnCommandExecuted方法,該方法內部會通知EQueue發送CommandResult消息給Controller;然后Controller那邊的進程,會有一個CommandResultProcessor接收到這個CommandResult的消息,然后就知道該ICommand的處理結果了;
  7. ICommandExecutor從ICommandContext拿到當前唯一修改的聚合根后,取出該聚合根里產生的IDomainEvent。由於一個聚合根一次可能會長生多個IDomainEvent,所以我們會構建一個EventStream對象。這個對象包含了所有當前聚合根所產生的IDomainEvent。一個EventStream會包含很多重要的信息,包括當前ICommand的ID、聚合根的ID、聚合根的Version(版本號),以及所有的IDomainEvent,等等;
  8. ICommandExecutor將該Command添加到ICommandStore。因為ICommandStore是以CommandId為主鍵(即Key),所以如果CommandId重復,框架就會知道,然后就會做重復時的邏輯處理,這點后面再詳細分析;
  9. 如果Command成功添加到ICommandStore,則接下來調用IEventService的Commit方法將當前EventStream持久化到IEventStore;
  10. IEventService內部主要做3件事情:1)將EventStream持久化到IEventStore;2)持久化成功后調用IMemoryCache更新緩存(緩存可以配置為本地緩存也可以配置為分布式緩存Redis,如果Command的處理是集群處理的,那我們應該用共享緩存,也就是用Redis這種分布式緩存);3)緩存更新好之后,調用IEventPublisher接口的Publish方法將EventStream發布出去,IEventPublisher的具體實現者會把當前的EventStream發送到EQueue。這3步是正常情況的流程。如果遇到持久化到IEventStore時遇到版本號重復(同一個聚合根ID+聚合根的Version相同,則認為有並發沖突),此時框架需要做不同的邏輯處理;這點也在后面詳細分析。
  11. 【從這一步開始處理Domain Event】EventStream被ENode.EQueue中的EventConsumer接收到,然后EventConsumer調用IEventProcessor處理當前的EventStream;
  12. IEventProcessor首先判斷當前的EventStream是否可以被處理,這里我們需要保證的很關鍵的一點是,必須確保事件的持久化順序和被事件的訂閱者處理的順序要嚴格一樣,否則就會出現Command端的數據和Query端的Read DB中的數據不一致的情況。關於如何保證這個順序的一致,后面我們在詳細分析。這里先舉個簡單的例子來說明為什么要順序一致。比如假如現在有一個聚合根的一個屬性,該屬性的默認值是0,然后該屬性先后發生了三個Domain Event(代表的意思分別是對這個屬性做+1,*2,-1)。這三個事件如果按照這樣的順序發生后,那這個屬性最后的值是1;但是如果這3個事件被消費者消費的順序是+1,-1,*2那最后的結果就不是1了,而是0了;所以通過這個例子,我想大家應該都知道了為什么要嚴格保證聚合根持久化事件的順序必須和被消費的順序要完全一致了;
  13. 假如當前的EventStream允許被處理,則IEventProcessor對當前的EventStream中的每個IDomainEvent做如下處理:1)根據IDomainEvent的類型得到所有當前IEventProcessor節點上所有注冊的IEventHandler,然后調用它們的Handle方法,完成比如Query端的Read DB的更新。但是事情還沒那么簡單,因為我們還需要保證當前的IDomainEvent只會被當前的IEventHandler處理一次否則IEventHandler就會因為重復處理了IDomainEvent而導致最后的數據不對;這里的冪等也在后面詳細討論。
  14. 有些IEventHandler處理完IDomainEvent后會產生新的ICommand(就是Saga Process Manager)的情況。這種情況下,我們還需要把這些產生的ICommand由框架自動發送到消息隊列(EQueue);但是事情也沒那么簡單,要是發送這些ICommand失敗了呢?那就需要重發,那重發如何設計才能保證不管重發多少次,也不會導致ICommand的重復執行呢?這里其實最關鍵的一點是要保證你每次重發的ICommand的Id總是和第一次發送時要相同的,否則框架就無法知道是否是同一個Command了。這里的具體設計后面再分析。

Command的冪等處理

上面流程中的第8步,Command會被添加到ICommandStore。這里,實際上我添加到ICommandStore的是一個HandleCommand對象,該對象包含當前的Command之外,還有當前被修改的聚合根ID。這樣做的理由請看我后面的解釋。我們知道ICommandStore會對CommandId作為主鍵,這樣我們就能絕對保證一個Command不會被重復添加。如果Command添加到ICommandStore成功,那自然最好了,直接進入后續的步驟即可;但是如果出現CommandId重復的時候,我們需要做怎么樣的處理呢?

如果出現重復,則需要根據CommandId(主鍵),把之前已經持久化過的HandledCommand取出來;然后然后我們從HandledCommand拿到被修改的聚合根ID,然后最關鍵的一步:我們將該聚合根ID以及CommandId作為條件從IEventStore中查詢出一個可能存在的EventStream。如果存在,就說明這個Command所產生的Domain Event已經被持久化了,所以我們只要再做一遍發布事件的操作即可。即調用IEventPublisher.Publish方法來發布事件到Query Side。那么為什么要發布呢?因為雖然事件被持久化了,但並不代表已經成功被發布出去了。因為理論上有可能Domain Event被持久化成功了,但是在要發布事件的時候,斷電了!所以這種情況下,重啟服務器就會出現這里討論的情況了。所以我們需要再次Publish事件。

然后,如果沒有根據CommandId和聚合根ID查找到EventStream呢?那也好辦,因為這種情況就說明這個Command雖然被持久化了,但是它所產生的EventStream卻沒有被持久化到EventStore,所以我們需要將當前的EventStream調用IEventService.Commit方法進行持久化事件。

另外,這里其實有一個疑問,為什么查找EventStream不能僅僅根據CommandId呢?原因是:從技術上來說,我們可以只根據CommandId來查找到一個唯一的EventStream,但這樣設計的話,就要求EventStore必須要支持通過一個CommandId來全局唯一定位到一個EventStream了。但是因為考慮到EventStore的數據量是非常大的,我們以后可能會根據聚合根ID做水平拆分(sharding)。這樣的話,我們僅僅靠CommandId就無法知道到哪個分片下去查找對應的EventStream了。所以,如果查詢時,能同時指定聚合根ID,那我們就能輕松知道首先到哪個分片下去找EventStream,然后再根據CommandId就能輕松定位到一個唯一的EventStream了。

既然說到這里,我再說一下CommandStore的水平分割的設計吧,CommandStore的數據量也是非常大的,因為它會存儲所有的Command。不過幸好,我們對於CommandStore只需要根據CommandId去查找即可,所以我們可以根據CommandId來做Hash取模的方式來水平拆分。這樣即便是分片了,我們只要知道了一個給定的CommandId,也能知道它當前是在哪個分片下的,就很容易找到該Command了。

所以,通過上面的分析,我們知道了CommandStore和EventStore在設計上不僅僅考慮了如何存儲數據,還考慮了未來大數據量時如何分片,以及如何在分片的情況下仍然能方便的查找到我們的數據。

最后,上面還有一種情況沒有說明,就是當出現Command添加到CommandStore時發現重復,但是嘗試從CommandStore中根據CommandId查詢該Command時,發現查不到,天哪!這種情況實際上不應該出現,如果出現,那說明CommandStore內部有問題了。因為為什么添加時說有重復,而查詢卻差不多來呢?呵呵。這種情況就無法處理了,我們只能記錄錯誤日志,然后進行后續的排查。

Domain Event持久化時的並發沖突檢測和處理

上面流程中的第10步,我們提到:如果遇到EventStream持久化到IEventStore時遇到版本號重復(同一個聚合根ID+聚合根的Version相同,則認為有並發沖突),此時框架需要做不同的邏輯處理。具體是:

首先,我們可以先想想為什么會出現同一個聚合根會在幾乎同一時刻產生兩個版本號一樣的領域事件,並持久化到EventStore。首先,我先說一下這種情況幾乎不會出現的理由:ENode中,在ICommandExecutor在處理一個Command時,會檢查當前該Command所要修改的聚合根是否已經有至少一個聚合根正在被處理,如果有,則會將當前Command排入到這個聚合根所對應的等候隊列。也就是說,它暫時不會被執行。然后當當前聚合根的前面的Command被執行完了后才會從這個等候隊列取出下一個等待的Command進行處理。通過這樣的設計,我們保證了,對一個聚合根的所有Command,不會並行被執行,只會按照順序被執行。因為每個ICommandExecutor會在需要的時候,為某個聚合根自動創建這種等候隊列,只要對該聚合根的Command同一時刻進來2個或以上。

那么,要是集群的時候呢?你一台機器的話,通過上面的方式可以確保一個聚合根實例的所有的Command會被順序處理。但是集群的時候,可能一個聚合根會在多台機器被同時處理了。要解決這個問題的思路就是對Command按照聚合根ID進行路由了,因為一般只要是修改聚合根的Command總是會帶有一個聚合根ID,所以我們可以按照這個特性,對被發送的Command按照聚合根ID進行路由。只要CommandId相同,則總是會被路由到同一個隊列,然后因為一個隊列總是只會被一台機器消費,從而我們能保證對同一個聚合根的Command總是會落到一台機器上被處理。那么你可能會說,要是熱點數據呢?比如有些聚合根突然對他修改的Command可能非常多(增加了一倍),而有些則很少,那怎么辦呢?沒關系,我們還有消息隊列的監控平台。當出現某個聚合根的Command突然非常多的時候,我們可以借助於EQueue的Topic的Queue可以隨時進行增加的特性來應付這個問題。比如原來這個Topic下只有4個Queue,那現在增加到8個,然后消費者機器也從4台增加到8台。這樣相當於Command的處理能力也增加了一倍。從而可以方便的解決熱點數據問題。因此,這也是我想要自己實現分布式消息隊列EQueue的原因啊!有些場景,要是自己沒有辦法完全掌控,會很被動,直接導致整個架構的嚴重缺陷,最后導致系統癱瘓,而自己卻無能為了。當然你可以說我們可以使用Kafka, Rocketmq這樣的高性能分布式隊列,確實。但是畢竟這種高大上的隊列非常復雜,且都是非.NET平台。除了問題,維護起來肯定比自己開發的要難維護。當然除非你對它們非常精通且有自信的運維能力。

通過上面的思路實現的,確保聚合根的Command總是被順序線性處理的設計,對EventStore有非常大的意義。因為這樣可以讓EventStore不會出現並發沖突,從而不會造成無謂的對EventStore的訪問,也可以極大的降低EventStore的壓力。

但是什么時候還是可能會出現並發沖突呢?因為:

1)當處理Command的某台機器掛了,然后這台機器所消費的Queue里的消息就會被其他機器接着消費。其他機器可能會從這個Queue里批量拉取一些Command消息來消費。然后此時假如我們重啟了這台有問題的服務器,重啟完之后,因為又會開始消費這個Queue。然后一個關鍵的點是,每次一台機器啟動時,會從EQueue的Broker拉取這個Queue最后一個被消費的消息的位置,也就是Offset,而由於這個Offset的更新是異步的,比如5s才會更新到EQueue的Broker,所以導致這台重啟后的服務器從Broker上拉取到的消費位置其實是有延遲的,從而就可能會去消費在那台之前接替你的服務器已經消費過的或者正在消費的Command消息了。當然這種情況因為條件太苛刻,所以基本不會發生,即便會發生,一般也不會導致Command的並發執行。但是這畢竟也是一種可能性。實際上這里不僅僅是某個服務器掛掉后再重啟的情況會導致並發沖突,只要是處理Comand的機器的集群中有任何的機器的增加或減少,由於都會導致Command消息的消費者集群重新負載均衡。在這個負載均衡的過程中,就會導致同一個Topic下的同一個Queue里的部分消息可能會在兩台服務器上被消費。原因是Queue的消費位置(offset)的更新不是實時的,而是定時的。所以,我們一般建議,盡量不要在消息很多的時候做消費者集群內機器的變動,而是盡量在沒什么消息的時候,比如凌晨4點時,做集群的擴容操作。這樣可以盡量避免所有可能帶來的消息重復消費或者並發沖突的可能性。呵呵,這段話也許很多人看的雲里霧里,我只能說到這個程度了,也許要完全理解,大家還需要對EQueue的設計很清楚才行!

2)就算同一個機器內,其實也是有可能出現對同一個聚合根的並發修改,也就是針對同一個聚合根的兩個Command被同時執行。原因是:當一個Command所對應的EventStream在被持久化時出現重復,然后我就會放在一個本地的內存隊列進行重試,然后重試由於是在另一個專門的重試線程里,該線程不是正常處理Command的線程。所以假如對該聚合根后續還有Command要被處理,那就有可能會出現同一時刻,一個聚合根被兩個Command修改的情況了。

現在,我們在回來討論,假如遇到沖突時,要怎么做?這個上面我簡單提到過,就是需要重試Command。但也不是這么簡單的邏輯。我們需要:

a. 先檢查當前的EventStream的Version是否為1,假如為1,說明有一個創建聚合根的Command被並發執行了。此時我們無須在重試了,因為即便再重試,那最后產生的EventStream的版本號也總是1,因為只要是第一次創建聚合根,那這個聚合根所產生的DomainEvent的版本總是1。所以這種情況下,我們只需要直接從EventStore拿出這個已經存在的EventStream,然后通過IEventPublisher.Publish方法發布該EventStream即可。為什么要再次發布,上面解釋Command的冪等時,也解釋了原因,這里是一樣的原因。這里也有一個小的點需要注意,就是假如嘗試從EventStore拿出這個EventStream時,假如沒獲取到呢?這個問題實際上不應該出現,原因就像上面分析Command冪等時一樣,為什么會出現添加時提示存在,但查詢時卻查不到的情況呢?這種情況就是EventStore的設計有問題了,讀寫存在非強一致性的情況了。

b. 如果當前的EventStream的Version大於1,則我們需要先更新內存緩存(Redis),然后做Command的重試處理。為什么要先更新緩存呢?因為如果不更新,有可能重試時,拿到的聚合根的狀態還是舊的,所以重試后還是導致版本號沖突。那為什么從緩存中拿到的聚合根的狀態可能還是舊的呢?因為EventStream已經存在於EventStore並不代表這個EventStream的修改已經更新到緩存了。因為我們是先持久化到EventStore,在更新緩存的。完全有可能你還沒來得及更新緩存的時候,另一個Command正好需要重試呢!所以,最保險的做法,就是再重試的時候將緩存中的聚合根狀態更新到最新值。那怎么更新呢?呵呵,很簡單,就是通過事件溯源(即Event Sourcing技術)了。我們只要從Event Store獲取當前聚合根的所有的Event Stream,然后溯源這些事件,最后就能得到聚合根的最新版本的狀態了,然后更新到緩存即可。

最后,如果需要重試的話,要怎么重試呢?很簡單,只要扔到一個本地的基於內存的重試隊列即可。我現在是用BlockingCollection的。

如何保證事件產生的順序和被消費的順序相同

為什么要保證這個相同的順序,在上面的流程步驟介紹里已經說明了。這里我們分析一下如何實現這個順序的一致。基本的思路是用一個表,存放所有聚合根當前已經處理過的最大版本號,假如當前已經處理過的最大版本號是10,那接下來只能處理這個聚合根版本號為11的EventStream。即便Version=12或者更后面的先過來,也只能等着。那怎么等呢?也是類似Command的重試隊列一樣,在一個本地的內存隊列等就行了。比如現在最大已處理的版本號是10,然后現在12,13這兩個版本號的EventStream先過來,那就先到隊列等着,然后版本號是11的這個事件過來了,就可以處理。處理好之后,當前最大已處理的版本號就編程11了,所以等候隊列中的版本號為12的EventStream就可以允許被處理了。整個控制邏輯就是這樣。那么這是單機的算法,要是集群呢?實際上這不必考慮集群的情況,因為我們每台機器上都是這個順序控制邏輯,所以如果是集群,那最多可能出現的情況(實際上這種情況存在的可能性也是非常的低)是,版本號為11的EventStream被並發的處理。這種情況就是我下面要分析的。

這里實際上還有一個細節我還沒說到,這個細節和EQueue的Consumer的ConsumerGroup相關,就是假如一種消息,有很多Consumer消費,然后這些Consumer假如分為兩個ConsumerGroup,那這兩個ConsumerGroup的消費是相互隔離的。也就是說,所有這些消息,兩個ConsumerGroup內的Consumer都會消費到。這里如果不做一些其他的設計,可能會在用戶使用時遇到潛在的問題。這里我沒辦法說的很清楚,說的太清楚估計會讓大家思維更混亂,且因為這個點不是重點。所以就不展開了。有興趣的朋友可以看一下ENode中的EventPublishInfo表中的EventProcessorName字段的用意。

如何保證一個IDomainEvent只會被一個IEventHandler處理一次

這一條的原因,我想大家都能理解。比如一個Event Handler是更新讀庫的,可能我們會執行一條有副作用的SQL,比如update product set price = price + 1 where id = 1000。這條SQL如果被重復執行一次,那price字段的值就多了1了,這不是我們所期望的結果。所以框架需要有基本的責任可以基本避免這種情況的發生。那怎么實現呢?思路也是用一張表,記錄被執行的DomainEvent的ID以及當前處理這個DomainEvent的Event Handler的類型的一個Code,對這兩個聯合字段做聯合唯一索引。每次當一個Event Handler要處理一個Domain Event時,先判斷是否已經處理過,如果沒處理過,則處理,處理過之后把被處理的Domain Event ID和EventHandler Type Code添加到這個表里即可。那假如添加的時候失敗了呢?因為有可能也會有並發的情況,導致Event Handler重復處理同一個Domain Event。這種情況框架就不做嚴謹的處理了,因為框架本身也無法做到。因為框架式無法知道Event Handler里面到底在做什么的。有可能是發送郵件,也有可能是記錄日志,也可能是更新讀取(Read DB)。所以,最根本的,還是要求Event Handler內部,也就是開發這自己需要考慮冪等的實現。當然框架會提供給開發者必要的信息來幫助他們完成嚴謹冪等控制。比如框架會提供當前Domain Event 的版本號給Event Handler,這樣Event Handler里就能在Update SQL的Where部分把這個Version帶上,從而實現樂觀並發控制。比如下面的代碼示例:

public void Handle(IEventContext context, SectionNameChangedEvent evnt)
{
    TryUpdateRecord(connection =>
    {
        return connection.Update(
            new
            {
                Name = evnt.Name,
                UpdatedOn = evnt.Timestamp,
                Version = evnt.Version
            },
            new
            {
                Id = evnt.AggregateRootId,
                Version = evnt.Version - 1
            }, Constants.SectionTable);
    });
}

上面的代碼中,當我們更新一個論壇的版塊時,我們可以在sql的where條件中,用version = evnt.Verion - 1這樣的條件。從而確保當前你要處理的事件一定是上一次已處理的事件的版本號的下一個版本號,也就是保證了Query Side的更新的順序嚴格和事件產生的順序一致。這樣即便框架在有漏網之魚的時候,Event Handler內部也能做嚴謹的順序控制。當然如果你的Event Handler是發送郵件,那我還真不知道該如何進一步保證這個嚴謹的順序或者並發沖突了,呵呵。有興趣的朋友可以和我交流。

在Saga Process Manager中產生的ICommand如何能夠支持重試發送而不導致操作的重復

終於到最后一點了,好累。堅持就是勝利!假如現在的Saga Event Handler里是會產生Command,那框架在發送這些Command時,要確保不能重復執行。怎么辦呢?假如在Saga Event Handler里產生的Command的Id每次都是新new出來的一個唯一的值,那框架就無法知道這個Command是否和之前的重復了,框架會認為這是兩個不同的Command。這里其實有兩種做法:

1. 框架先把Saga Event Handler中產生的Command保存起來,然后慢慢發送到EQueue。發送成功一個,就刪除一個。直到全部發送完為止。這種做法是可行的,因為這樣一來,我們要發送的Command就總是從存儲這些Command的地方去拿了,所以不會出現每次要發送的同一個Command的ID都是不同的情況。但是這種設計性能不是太好,因為要發送的Command必須要先被保存起來,然后再發送,發送完之后還要刪除。性能上肯定不會太高。

2.第二種做法是,Command不存儲起來,而是直接把Saga Event Handler中產生的Command拿去發送。但這種設計要求:框架對這種要發送的Command的ID總是按照某個特定的規則來產生的。這種規則要保證產生的CommandId首先是要唯一的,其次是確定的。下面我們看一下下面的代碼:

private string BuildCommandId(ICommand command, IDomainEvent evnt, int eventHandlerTypeCode)
{
    var key = command.GetKey();
    var commandKey = key == null ? string.Empty : key.ToString();
    var commandTypeCode = _commandTypeCodeProvider.GetTypeCode(command.GetType());
    return string.Format("{0}{1}{2}{3}", evnt.Id, commandKey, eventHandlerTypeCode, commandTypeCode);
}

上面這個代碼是一個函數,用來構建要被發送的Command的Id的,我們可以看到ID是由Command的一個key+要被發送的Command的類型的code+當前被處理的Domain Event的ID,以及當前的Saga Event Handler的類型的code這四個信息組成。對於同一個Domain Event被同一個Event Handler處理,然后如果產生的Command的類型也是一樣的,那我們基本可以通過這三個信息構建一個唯一的CommandId了,但是有時這樣還不夠,因為我們可能在一個Event Handler里構建兩個類型完全一樣的Command,但是他們修改的聚合根的ID不同。所以,我上面才有一個commandKey的組成部分。這個key默認就是這個Command要修改的聚合根的ID。這樣,通過這樣4個信息的組合,我們可以確保不管某個Domain Event被某個Saga Event Handler處理多少次,最后產生的Command的ID總是確定的,不變的。當然上面的commandKey有時僅僅考慮聚合根ID可能還不夠,雖然我還沒遇到過這種情況,呵呵。所以我框架設計上,就允許開發者能重新GetKey方法,開發者需要理解何時需要重寫這個方法。看了這里的說明應該就知道了!

好了,差不多了,該睡覺了!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM