開源地址:https://github.com/tangxuehua/enode
上一篇文章,簡單介紹了enode框架內部的整體實現思路,用到了staged event-driven architecture的思想。通過前一篇文章,我們知道了enode內部有兩種隊列:command queue、event queue;用戶發送的command會進入command queue排隊,domain model產生的domain event會進入event queue,然后等待被dispatch到所有的event handlers。本文介紹一下enode框架中這兩種消息隊列到底是如何設計的。
先貼一下enode框架的內部實現架構圖,這樣對大家理解后面的分析有幫助。
我們需要什么樣的消息隊列
enode的設計初衷是在單個進程內提供基於DDD+CQRS+EDA的應用開發。如果我們的業務需要和其他系統交互,那也可以,就是通過在event handler中與其他外部系統交互,比如廣播消息出去或者調用遠程接口,都可以。也許將來,enode也會內置支持遠程消息通信的功能。但是不支持遠程通信並不表示enode只能開發單機應用了。enode框架需要存儲的數據主要有三種:
- 消息,包括command消息和event消息,目前出於性能方面的考慮,是存儲在mongodb中;之所以要持久化消息是因為消息隊列里的消息不能丟失;
- 聚合根,聚合根會被序列化,然后存儲在內存緩存中,如redis或memcached中;
- 事件,就是由聚合根產生的事件,事件存儲在eventstore中,如mongodb中;
好,通過上面的分析,我們知道enode框架運行時的所有數據,就存儲在mongodb和redis這兩個地方。而這兩種存儲都是部署在獨立的服務器上,與web服務器無關。所以運行enode框架的每台web服務器上是無狀態的。所以,我們就能方便的對web服務器進行集群,我們可以隨時當用戶訪問量的增加時增加新的web服務器,以提高系統的響應能力;當然,當你發現隨着web服務器的增加,導致單台mongodb服務器或單台redis服務器處理不過來成為瓶頸時,也可以對mongodb和redis做集群,或者對數據做sharding(當然這兩種做法不是很好做,需要對mongodb,redis很熟悉才行),這樣就可以提高mongodb,redis的吞吐量了。
好了,上面的分析主要是為了說明enode框架的使用范圍,討論清楚這一點對我們分析需要什么樣的消息隊列有很大幫助。
現在我們知道,我們完全不需要分布式的消息隊列了,比如不需要MSMQ、RabbitMQ,等重量級成熟的支持遠程消息傳遞的消息隊列了。我們需要的消息隊列的特征是:
- 基於內存的消息隊列;
- 雖然基於內存,但消息不能丟失,也就是消息要支持持久化;
- 消息隊列要性能盡量高;
- 消息隊列里沒有消息的時候,隊列的消費者不能讓CPU空轉,CPU空轉會直接導致CPU占用100%,導致機器無法工作;
- 要支持多個消費者線程同時從隊列取消息,但是同一個消息只能被一個消費者處理,也就是一個消息不能同時被兩個消費者取走,也就是要支持並發的dequeue;
- 需要一種設計,實現消息至少會被處理一次;具體指:消息被消費者取走然后被處理的過程中,如果沒有處理成功(消費者自己知道有沒有處理成功)或者根本沒來得急處理(比如那時正好斷電了),那需要一種設計,可以我們有機會重新消費該消息;
- 因為我們做不到100%不會重復處理一個消息,所以我們的所有消息消費者要盡量做到支持等冪操作,就是重復的操作不會引起副作用;比如插入前先查詢是否存在就是一種支持等冪的措施;這一點,框架會盡量提供支持等冪的邏輯,當然,用戶自己在設計command handler或event handler時,也要盡量考慮等冪的問題。注意:一般command handler不用考慮,我們主要要考慮的是event handler。原因,下次文章中再細談吧。
內存隊列的設計
內存隊列,特點是快。但是我們不光是需要快,還要能支持並發的入隊和出對。那么看起來ConcurrentQueue<T>似乎能滿足我們的要求了,一方面性能還可以,另一方面內置支持了並發操作。但是有一點沒滿足,那就是我們希望當隊列里沒有消息的時候,隊列的消費者不能讓CPU空轉,CPU空轉會直接導致CPU占用100%,導致機器無法工作。幸運的是,.net中也有一個支持這種功能的集合,那就是:BlockingCollection<T>,這種集合能提供在隊列內無元素的時候block當前線程的功能。我們可以用以下的方式來實例化一個隊列:
private BlockingCollection<T> _queue = new BlockingCollection<T>(new ConcurrentQueue<T>());
並發入隊的時候,我們只要寫下面的代碼即可:
_queue.Add(message);
並發出隊的時候,只要:
_queue.Take();
我們不難看出,ConcurrentQueue<T>是提供了隊列加並發訪問的支持,而BlockingCollection<T>是在此基礎上再增加blocking線程的功能。
是不是非常簡單,經過我的測試,BlockingCollection<T>的性能已經非常好,每秒10萬次入隊出對肯定沒問題,所以不必擔心成為瓶頸。
關於Disruptor的調研:
了解過LMAX架構的朋友應該聽說過Disruptor,LMAX架構能支持每秒處理600W訂單,而且是單線程。這個速度是不是很驚人?大家有興趣的可以去了解下。LMAX架構是完全in memory的架構,所有的業務邏輯基於純內存實現,粗粒度的架構圖如下:
- Business Logic Processor完全在in memory中跑,簡稱BLP;
- Input Disruptor是一種特殊的基於內存運行的環形隊列(基於一種叫Ring Buffer的環形數據結構),負責接收消息,然后讓BLP處理消息;
- Output Disruptor也是同樣的隊列,負責將BLP產生的事件發布出去,給外部組件消費,外部組件消費后可能又會產生新的消息塞入到Input Disruptor;
LMAX架構之所以能這么快,除了完全基於in memory的架構外,還歸功於延遲率在納秒級別的disruptor隊列組件。下面是disruptor與java中的Array Blocking Queue的延遲率對比圖:
ns是納秒,我們可以從數據上看到,Disruptor的延遲時間比Array Blocking Queue快的不是一個數量級。所以,當初LMAX架構出來時,一時非常轟動。我曾經也對這個架構很好奇,但因為有些細節問題沒想清楚,就不敢貿然實踐。
通過上面的分析,我們知道,Disruptor也是一種隊列,並且也完全可以替代BlockingCollection,但是因為我們的BlockingCollection目前已經滿足我們的需要,且暫時不會成為瓶頸,所以,我暫時沒有采用Disruptor來實現我們的內存隊列。關於LMAX架構,大家還可以看一下這篇我以前寫的文章。
隊列消息的持久化
我們不光需要一個高性能且支持並發的內存隊列,還要支持隊列消息的持久化功能,這樣我們才能保證消息不會丟失,從而才能談消息至少被處理一次。
那消息什么時候持久化?
當我們發送一個消息給隊列,一旦發生成功,我們肯定認為消息已經不會丟了。所以,很明顯,消息隊列內部肯定是要在接收到入隊的消息時先持久化該消息,然后才能返回。
那么如何高效的持久化呢?
第一個想法:
基於txt文本文件的順序寫。原理是:當消息入隊時,將消息序列化為文本,然后append到一個txt1文件;當消息被處理完之后,再把該消息append到另一個txt2文件;然后,如果當前機器沒重啟,那內存隊列里當前存在的消息就是還未被處理的消息;如果機器重啟了,那如何知道哪些消息還沒被處理?很簡單,就是對比txt1,txt2這兩個文本文件,然后只要是txt1中存在,但是txt2中不存在的消息,就認為是沒被處理過,那需要在enode框架啟動時讀取txt1中這些沒被處理的消息文本,反序列化為消息對象,然后重新放入內存隊列,然后開始處理。這個思路其實挺好,關鍵的一點,這種做法性能非常高。因為我們知道順序寫文本文件是非常快的,經過我的測試,每秒200W行普通消息的文本不在話下。這意味着我們每秒可以持久化200W個消息,當然實際上我們肯定達不到這個高的速度,因為消息的序列化性能達不到這個速度,所以瓶頸是在序列化上面。但是,通過這種持久化消息的思路,也會有很多細節問題比較難解決,比如txt文件越來越大,怎么辦?txt文件不好管理和維護,萬一不小心被人刪除了呢?還有,如何比較這兩個txt文件?按行比較嗎?不行,因為消息入隊的順序和處理的順序不一定相同,比如command就是如此,當用戶發送一個command到隊列,但是處理的時候發現第一次由於並發沖突,導致command執行沒成功,所以會重試command,如果重試成功了,然后持久化該command,但是我們知道,此時持久化的時候,它的順序也許已經在后面的command的后面了。所以,我們不能按行比較;那么就要按消息的ID比較了?就算能做到,那這個比較過程也是很耗時的,假設txt1有100W個消息;txt2中有80W個消息,那如果按照ID來比較txt1中哪20W個消息還沒被處理,有什么算法能高效比較出來嗎?所以,我們發現,這個思路還是有很多細節問題需要考慮。
第二個想法:
采用NoSQL來存儲消息,通過一些思考和比較后,覺得還是MongoDB比較合適。一方面MongoDB實際上所有的存取操作優先使用內存,也就是說不會馬上持久化到磁盤。所以性能很快。另一方面,mongodb支持可靠的持久化功能,可以放心的用來持久化消息。性能方面,雖然沒有寫txt那么快,但也基本能接受了。因為我們畢竟不是整個網站的所有用戶請求的command都是放在一個隊列,如果我們的網站用戶量很大,那肯定會用web服務器集群,且每個集群機器上都會有不止一個command queue,所以,單個command queue里的消息我們可以控制為不會太多,而且,單個command queue里的消息都是放在不同的mongodb collection中存儲;當然持久化瓶頸永遠是IO,所以真的要快,那只能一個獨立的mongodb server上設計一個collection,該collection存放一個command queue里的消息;其他的command queue的消息就也采用這樣的做法放在另外的mongodb server上;這樣就能做到IO的並行,從而根本上提高持久化速度。但是這樣做代價很大的,可能需要好多機器呢,整個系統有多少個queue,那就需要多少台機器,呵呵。總而言之,持久化方面,我們還是有一些辦法可以去嘗試,還有優化的余地。
再回過頭來簡單說一下,采用mongodb來持久化消息的實現思路:入隊的時候持久化消息,出隊的時候刪除該消息;這樣當機器重啟時,要查看某個隊列有多少消息,只要通過一個簡單的查詢返回mongodb collection中當前存在的消息即可。這種做法設計簡單,穩定,性能方面目前應該還可以接受。所以,目前enode就是采用這種方法來持久化所有enode用到的內存隊列的消息。
代碼示意,有興趣的可以看看:

public abstract class QueueBase<T> : IQueue<T> where T : class, IMessage { #region Private Variables private IMessageStore _messageStore; private BlockingCollection<T> _queue = new BlockingCollection<T>(new ConcurrentQueue<T>()); private ReaderWriterLockSlim _enqueueLocker = new ReaderWriterLockSlim(); private ReaderWriterLockSlim _dequeueLocker = new ReaderWriterLockSlim(); #endregion public string Name { get; private set; } protected ILogger Logger { get; private set; } public QueueBase(string name) { if (string.IsNullOrEmpty(name)) { throw new ArgumentNullException("name"); } Name = name; _messageStore = ObjectContainer.Resolve<IMessageStore>(); Logger = ObjectContainer.Resolve<ILoggerFactory>().Create(GetType().Name); } public void Initialize() { _messageStore.Initialize(Name); var messages = _messageStore.GetMessages<T>(Name); foreach (var message in messages) { _queue.Add(message); } OnInitialized(messages); } protected virtual void OnInitialized(IEnumerable<T> initialQueueMessages) { } public void Enqueue(T message) { _enqueueLocker.AtomWrite(() => { _messageStore.AddMessage(Name, message); _queue.Add(message); }); } public T Dequeue() { return _queue.Take(); } public void Complete(T message) { _dequeueLocker.AtomWrite(() => { _messageStore.RemoveMessage(Name, message); }); } }
如何保證消息至少被處理一次
思路應該很容易想到,就是先把消息從內存隊列dequeue出來,然后交給消費者處理,然后由消費者告訴我們當前消息是否被處理了,如果沒被處理好,那需要嘗試重試處理,如果重試幾次后還是不行,那也不能把消息丟棄了,但也不能無休止的一直只處理這個消息,所以需要把該消息丟到另一個專門用於處理需要重試的本地純內存隊列。如果消息被處理成功了,那就把該消息從持久化設備中刪除即可。看一下代碼比較清晰吧:
private void ProcessMessage(TMessageExecutor messageExecutor) { var message = _bindingQueue.Dequeue(); if (message != null) { ProcessMessageRecursively(messageExecutor, message, 0, 3); } } private void ProcessMessageRecursively(TMessageExecutor messageExecutor, TMessage message, int retriedCount, int maxRetryCount) { var result = ExecuteMessage(messageExecutor, message); //這里表示在消費(即處理)消息 //如果處理成功了,就通知隊列從持久化設備刪除該消息,通過調用Complete方法實現 if (result == MessageExecuteResult.Executed) { _bindingQueue.Complete(message); } //如果處理失敗了,就重試幾次,目前是3次,如果還是失敗,那就丟到一個重試隊列,進行永久的定時重試 else if (result == MessageExecuteResult.Failed) { if (retriedCount < maxRetryCount) { _logger.InfoFormat("Retring to handle message:{0} for {1} times.", message.ToString(), retriedCount + 1); ProcessMessageRecursively(messageExecutor, message, retriedCount + 1, maxRetryCount); } else { //這里是丟到一個重試隊列,進行永久的定時重試,目前是每隔5秒重試一下,_retryQueue是一個簡單的內存隊列,也是一個BlockingCollection<T> _retryQueue.Add(message); } } }
代碼應該很清楚了,我就不多做解釋了。
總結:
本文主要介紹了enode框架中消息隊列的設計思路,因為enode中有command queue和event queue,兩種queue,所以邏輯是類似的;所以本來還想討論一下如何抽象和設計這些queue,已去掉重復代碼。但時間不早了,下次再詳細講吧。