問題背景
Conference案例,是一個關於在線創建會議(類似QCon這種全球開發者大會)、在線管理會議位置信息、在線預訂某個會議的位置的,這樣一個系統。具體可以看微軟的這個項目的主頁:http://cqrsjourney.github.io。
然后我們設計了一個Conference聚合根,對應領域中的會議這個領域概念。Conference聚合根下面,有一些位置信息SeatType。一個會議聚合根下面可以添加不同類型的位置,每種類型的位置可以指定數量以及價格。所以,Conference是聚合根,Conference本身有一些我們所關心的基本屬性,同時它內部聚合了一些SeatType子實體。每個SeatType包含了位置的價格、數量這兩個信息。
然后,在UI層面,我們會有如下界面邊界管理一個會議的所有位置信息。
上圖列出了某個會議的兩類位置,Quota表示位置的配額數量;當我們要修改某種位置時,可以點擊鏈接,然后出現如下圖所示:
出現四個編輯框,我們可以修改任何一個框。修改完后點擊保存,我們就能更新某個類型的位置信息了。然后,我們在domain里,設計了兩個domain event;分別表示位置基本信息改變和位置配額數量的改變。
為什么要獨立出數量改變的domain event呢?因為當用戶在前台下單訂購位置時,這個數量也會變化。也就是位置數量可能會單獨變化。所以,我們考慮單獨為位置數量的變化定義一個domain event。
然后,我們目前的代碼是,當點擊保存時,首先更新會更新位置的基本信息,然后判斷數量是否有變化,如果沒變化,則只產生位置基本信息變化的domain event;如果有變化,則同時產生位置數量改變的domain event。Conference聚合根相關方法的具體實現如下:
上面的代碼的大致意思是,先從聚合內找出需要修改的位置類型,如果不存在就拋異常;如果存在,則先產生位置基本信息的改變事件;然后判斷數量是否有變化,如果有變化,則繼續判斷當前輸入的數量是否太小,如果太小也是不允許的。
比如,假如用戶錄入的數量是10,但是當前這種類型的位置已經有11個被預定了,那就不能改為10,而是必須至少為11。最后,如果一切都合法,就產生一個SeatTypeQuantityChanged的事件,表示某個類型的位置的數量發生了變化,同時在事件中帶上可預定的剩余位置的數量。
然后讀庫我們就根據上面這兩個事件來更新。
現在的問題是,假如兩個事件都發生了,那讀庫要怎么原子更新(在一個事務里更新)?我們的一個event handler只能處理一個event;也就是說,我們會有兩個event handler,分別處理對應的事件。由於domain aggregate是一次性原子的方式同時產生兩個domain event。所以,我們要確保兩個event handler要么都更新成功,要么都不更新成功,這個問題之前沒考慮到過,下面我們來想想辦法。
解決思路
思路1
想辦法把這兩個event handler包裝在一個事務里,但這要求框架支持這樣的跨多個event handler的事務機制;對框架要求的的改造有點大,復雜度高,不太可行。因為框架要考慮的問題是要更通用的,比如,一旦引入事務,也許還會引入分布式事務等問題。而且這種做法,性能也不高,違反ENode一開始就是為高並發設計的初衷。
思路2
要求領域里不要設計兩個domain event了,就用一個domain event解決;這個event包含所有信息的修改,包括數量的修改。這個辦法可行,但要求模型做出妥協和讓步了。假如有一天我們遇到模型必須要產生多個事件的情況,那怎么辦呢?所以,這個思路還是在逃避問題。
思路3
不采用事務,而是采用樂觀鎖+順序控制+冪等支持的方式解決問題。思路是,框架按照順序調用這兩個event handler,調用的順序和這兩個事件的順序一致;兩個event handler允許不在一個事務里。
這樣的問題是,假如第一個事件處理成功了,然后此時機器斷電了,第二個事件沒被處理,怎么辦?那就是要做到,當下一次機器重啟后,第二個事件能被處理。然后,因為整個架構是分布式的,所以第一個事件也是有可能被重復處理的,框架在調用event handler時,為了性能方面的考慮,只會盡量保證同一個event不會被同一個event handler重復處理,不會絕對保證;但是框架有提供機制,讓開發人員在event handler內部通過依賴版本號的方式來解決重復處理的問題。所以,總結一下,我們需要處理的問題有以下3個:
- 需要保證任何event handler內部自己能做到絕對的冪等,框架提供支持;
- 需要保證任何一個event至少被處理一次,即便是在任何時候斷電的情況下;
- 需要保證同一個事件流里的事件,處理的順序也要按照事件流的順序處理;
為了做到上面這3點,我對ENode做了一個完善,就是為事件引入了一個子版本號的概念。
就是當聚合根每次做出修改后,不管產生多少個domain event,這些domain event都是在一個event stream里;每個event stream都有一個版本號,然后每個domain event的主版本號就是其所在的event stream的版本號。比如某個聚合根某次變化產生了2個domain event,它們被保證在一個event stream里,然后假如這個event stream的版本號為10,那每個domain event的主版本號也是10;這點ENode框架可以做保證。那event stream的版本號哪里來的呢?就是從聚合根上得來,因為每個聚合根都維護了當前自己的版本號是什么,用version表示,那它下一次產生的event stream的版本號就是version+1。
上面解釋了什么是事件的主版本號。下面我們在說一下什么是事件的子版本號。子版本號比較簡單,就是假如一個event stream里包含2個事件,那第一個事件的子版本號是1,第二個則是2;所以,其實子版本號就是事件在事件流里的順序號。
然后,有了事件的主版本號和子版本號的概念。我們就可以做到上面的3點要求了。其中的第2點,EQueue會做到確保任何一個消息至少被處理一次,這里不做展開了。第1、3點,我們通過下面的代碼結合分析討論。
為了代碼效果好一點,我直接通過截圖的方式了,博客園以后官方提供一套這樣的代碼模板吧,呵呵。@蟋蟀,上次你跟我說的那個模板,我后來忘記使用了:)
上面的代碼中,每個event handler內部有一個事務,為什么還需要事務?因為我們現在更新的是聚合根,子實體(位置信息)是聚合根的一部分;所以讀庫更新時,自然也要更新聚合根本身的。只不過這里只需要更新聚合根的版本號即可。
第一個event handler,我們先啟動一個事務,然后先更新聚合根的主版本號,以及次版本號;假如數據庫里conference記錄的當前的主版本號是10,次版本號是1,那這個evnt.Version就是11,evnt.Sequence是1,Sequence就是次版本號。然后通過第一條Update SQL我們就能更新聚合根的主版本號以及次版本號了。由於單條update sql是原子事務(無並發問題)的,所以我們只要判斷更新的影響行數是否為1。如果是1,則說明更新成功,那就可以更新位置那條記錄了。然后,由於這兩條更新語句在一個事務里,所以要么全部完成,要么什么都不做,不會有做了一半的情況。
第二個event handler,同樣,我們也是先啟動一個事務。然后區別是,因為我們知道SeatTypeQuantityChanged事件和SeatTypeUpdate事件總是在一個事件流里發生的,且它總是位於第二個順序。所以,當這個event handler被執行時,聚合根的主版本號一定已經是11了,且子版本號是1。那么,我們在第二個event handler中,對聚合根,只需要更新子版本號為2即可。就是第一個Update語句。然后同樣判斷影響行數是否為1。如果是,則更新位置的數量以及可用數量;如果不是1,則什么都不做。
有一個問題,什么時候會出現不是1呢?就是在這個event handler被重復執行的時候。這種情況,我們忽略即可。因為我們就是為了要做到update的冪等處理。
到這里基本差不多了。但是還需要說明一個大前提。就是上面這個大家可以看到,第一個event handler里,更新聚合根的主版本號時,where條件里會判斷聚合根記錄的當前版本號是evnt.version - 1;這個就是為了保證,讀庫更新時,總是按照domain event的發生順序依次更新的,不能跳過更新,也不能亂序。否則讀庫的最終數據就不一致了。所以,event handler內部要做這樣的判斷,確保絕對不會發生這樣的事情。但光event handler內部判斷還不夠。ENode框架也要保證event stream消息的處理順序也是這樣依次按照順序的,否則event handler里聚合根更新的影響行數也許永遠都不能為1了。
ENode已經意識到這個問題,所以已經幫我們做了這樣的保證!
總結
上面的最后一個方案,我覺得是比較通用的解決方案。框架不需要做支持跨event handler的事務,改動比較小。同時還能保證讀庫更新的性能,另外,在斷電的時候,也能保證事件被處理。
總之,一切的一切都是為了高性能、為了保證最終一致性。又花了一篇文章分享了一點小小的設計,呵呵。