EQueue - 一個純C#寫的分布式消息隊列介紹2


一年前,當我第一次開發完EQueue后,寫過一篇文章介紹了其整體架構,做這個框架的背景,以及架構中的所有基本概念。通過那篇文章,大家可以對EQueue有一個基本的了解。經過了1年多的完善,EQueue無論是功能上還是成熟性上都完善了不少。所以,希望再寫一篇文章,介紹一下EQueue的整體架構和關鍵特性。

EQueue架構

EQueue是一個分布式的、輕量級、高性能、具有一定可靠性,純C#編寫的消息隊列,支持消費者集群消費模式。

主要包括三個部分:producer, broker, consumer。producer就是消息發送者;broker就是消息隊列服務器,負責接收producer發送過來的消息,以及持久化消息;consumer就是消息消費者,consumer從broker采用拉模式到broker拉取消息進行消費,具體采用的是long polling(長輪訓)的方式。這種方式的最大好處是可以讓broker非常簡單,不需要主動去推消息給consumer,而是只要負責持久化消息即可,這樣就減輕了broker server的負擔。同時,consumer由於是自己主動去拉取消息,所以消費速度可以自己控制,不會出現broker給consumer消息推的太快導致consumer來不及消費而掛掉的情況。在消息實時性方面,由於是長輪訓的方式,所以消息消費的實時性也可以保證,實時性和推模型基本相當。

EQueue是面向topic的架構,和傳統的MSMQ這種面向queue的方式不同。使用EQueue,我們不需要關心queue。producer發送消息時,指定的是消息的topic,而不需要指定具體發送到哪個queue。同樣,consumer發送消息也是一樣,訂閱的是topic,不需要關心自己想從哪個queue接收消息。然后,producer客戶端框架內部,會根據當前的topic獲取所有可用的queue,然后通過某種queue select strategy選擇一個queue,然后把消息發送到該queue;同樣,consumer端,也會根據當前訂閱的topic,獲取其下面的所有的queue,以及當前所有訂閱這個topic的consumer,按照平均的方式計算出當前consumer應該分配到哪些queue。這個分配的過程就是消費者負載均衡。

Broker的主要職責是:

發送消息時:負責接收producer的消息,然后持久化消息,然后建立消息索引信息(把消息的全局offset和其在queue中的offset簡歷映射關系),然后返回結果給producer;

消費消息時:負責根據consumer的pull message request,查詢一批消息(默認是一次pull request拉取最多32個消息),然后返回給consumer;

各位看官如果對EQueue中的一些基本概念還不太清楚,可以看一下我去年寫的介紹1,寫的很詳細。下面,我想介紹一下EQueue的一些有特色的地方。

EQueue關鍵特性

高性能與可靠性設計

網絡通信模型,采用.NET自帶的SocketAsyncEventArgs,內部基於Windows IOCP網絡模型。發送消息支持async, sync, oneway三種模式,無論是哪種模式,內部都是異步模式。當同步發送消息時,就是框架幫我們在異步發送消息后,同步等待消息發送結果,等到結果返回后,才返回給消息發送者;如果一定時間還不返回,則報超時異常。在異步發送消息時,采用從EventStore開源項目中學習到的優秀的socket消息發送設計,目前測試下來,性能高效、穩定。通過幾個案例運行很長時間,沒有出現通信層方面的問題。

broker消息持久化的設計。采用WAL(Write-Ahead Log)技術,以及異步批量持久化到SQL Server的方式確保消息高效持久化且不會丟。消息到達broker后,先寫入本地日志文件,這種設計在db, nosql等數據庫中很常見,都是為了確保消息或請求不丟失。然后,再異步批量持久化消息到SQL Server,采用.NET自帶的SqlBulkCopy技術。這種方式,我們可以確保消息持久化的實時性和很高的吞吐量,因為一條消息只要寫入本地日志文件,然后放入內存的一個dict即可。

當broker意外宕機,可能會有一些消息還沒持久化到SQL Server;所以,我們在重啟broker時,我們除了先從SQL Server恢復所有未消費的消息到內存外,同時記錄當前SQL Server中的最后一條消息的offset,然后我們從本地日志文件掃描offset+1開始的所有消息,全部恢復到SQL Server以及內存。

需要簡單提一下的是,我們在把消息寫入到本地日志文件時,不可能全部寫入到一個文件,所以要拆文件。目前是根據log4net來寫消息日志,每100MB一個日志文件。為什么是100MB?是因為,我們的這個消息日志文件的用途主要是用來在Broker重啟時,恢復SQL Server中最后還沒來得及持久化的那些消息的。正常情況下,這些消息量應該不會很多。所以,我們希望,當掃描本地日志文件時,盡量能快速的掃描文件。通常100MB的消息日志文件,已經可以存儲不少的消息量,而SQL Server中未持久化的消息通常不會超過這個量,除非當機前,出現長時間消息無法持久化的情況,這種情況,應該會被我們監控到並及時發現,並采取措施。當然,每個消息日志文件的大小,可以支持配置。另外一點,就是從日志文件恢復的時候,還是需要有一個算法的,因為未被持久化的消息,有可能不只在最近的一個消息日志文件里,有可能在多個日志文件里,因為就像前面所說,會出現大量消息沒有持久化到SQL Server的情況。

但總之,在保證高性能的前提下,消息不丟(可靠性)是完全可以保證的。

消費消息方面,采用批量拉取消息進行消費的方式。默認consumer一個pull message request會最多拉取32個消息(只要存在這么多未消費消息的話);然后consumer會並行消費這些消費,除了並行消費外,也可以配置為單線程線性消費。broker在查詢消息時,一般情況未消費消息總是在內存的,只有有一種情況不在內存,這個下面詳細分析。所以,查詢消息應該說非常快。

不過上面提到的消息可靠性,只能盡量保證單機不丟消息。由於消息是放在DB,以及本地日志。所以,如果DB服務器硬盤壞了,或者broker的硬盤壞了,那就會有丟消息的可能性。要解決這個問題,就需要做replication了。EQueue下一步會支持broker的集群和故障轉移(failover)。目前,我開發了一個守護進程服務,會監控broker進程是否掛掉,如果掛掉,則自動重啟,一定程度上也會提高broker的可用性。

我覺得,做事情,越簡單越好,不要一開始就搞的太復雜。復雜的東西,往往難以維護和駕馭,雖然理論很美好,但總是會出現各種問題,呵呵。就像去中心化的架構雖然理論好像很美好,但實際使用中,發現還是中心化的架構更好,更具備實戰性。

支持消費者負載均衡

消費者負載均衡是指某個topic的所有消費者,可以平均消費這個topic下的所有queue。我們使用消息隊列,我認為這個特性非常重要。設想,某一天,我們的網站搞了一個活動,然后producer產生的消息猛增。此時,如果我們的consumer服務器如果還是只有原來的數量,那很可能會來不及處理這么多的消息,導致broker上的消息大量堆積。最終會影響用戶請求的響應時間,因為很多消息無法及時被處理。

所以,遇到這種情況,我們希望分布式消息隊列可以方便的允許我們動態添加消費者機器,提高消費能力。EQueue支持這樣的動態擴展能力。假如某個topic,默認有4個queue,然后每個queue對應一台consumer機器進行消費。然后,我們希望增加一倍的consumer時,只要在EQueue Web控制台上,為這個topic增加4個queue,然后我們再新增4台consumer機器即可。這樣EQueue客戶端會支持自動負載均衡,幾秒鍾后,8個consumer就可以各自消費對應的queue了。然后,當活動過后,消息量又會回退到正常水平,那么我們就可以再減少queue,並下線多余的consumer機器。

另外,EQueue還充分考慮到了下線queue時的平滑性,可以支持先凍結某個queue,這樣可以確保不會有新的消息發送到該queue。然后我們等到這個queue的消息都消費完后,就可以下線consumer機器和刪除該queue了。這點,應該說,阿里的rocketmq也沒有做到,呵呵。

broker支持大量消息堆積

這個特性,我之前專門寫過一篇文章,詳細介紹設計思路,這里也簡單介紹一下。MQ的一個很重要的作用就是削峰,就是在遇到一瞬間大量消息產生而消費者來不及一下子消費時,消息隊列可以起到一個緩沖的作用,從而可以確保消息消費者服務器不會垮掉,這個就是削峰。如果使用RPC的方式,那最后所有的請求,都會壓倒DB,DB就會承受不了這么多的請求而掛掉。

所以,我們希望MQ支持消息堆積的能力,不能因為為了快,而只能支持把消息放入服務器內存。因為服務器內存的大小是有限的,假設我們的消息服務器內存大小是128G,每個消息大小為1KB,那差不多最多只能堆積1.3億個消息。不過一般來說1.3億也夠了,呵呵。但這個畢竟要求大內存作為前提的。但有時我們可能沒有那么大的服務器內存,但也需要堆積這么多的消息的能力。那就需要我們的MQ在設計上也提供支持。EQueue可以允許我們在啟動時配置broker服務器上允許在內存里存放的消息數以及消息隊列里消息的全局offset和queueOffset的映射關系(我稱之為消息索引信息)的數量。我們可以根據我們的服務器內存的大小進行配置。然后,broker上會有定時的掃描線程,定時掃描是否有多出來的消息和消息索引,如果有,則移除多出來的部分。通過這個設計,可以確保服務器內存一定不會用完。但是否要移除也有一個前提,就是必須要求這個消息已經持久化到SQL Server了。否則就不能移除。這個應該通常可以保證,因為一般不會出現1億個消息都還沒持久化到DB,如果出現這個情況,說明DB一定出了什么嚴重的問題,或者broker無法與db建立連接了。這種情況下,我們應該早就已經發現了,EQueue Web監控控制台上隨時可以查看消息的最大全局offset,已經持久化的最大全局offset。

上面這個設計帶來的一個問題是,假如現在consumer要拉取的消息不在內存了怎么辦?一種辦法是從DB把這個消息拉取到內存,但一條條拉,肯定太慢了。所以,我們可以做一個優化,就是發現當前消息不在內存時,因為很可能下一條消息也不在內存,所以我們可以一次性從Sql Server DB拉取10000個消息(可配置),這樣后續的10000個消息就一定在內存了,我們需要再訪問DB。這個設計其實是在內存使用和拉取消息性能之間的一個權衡后的設計。Linux的pagecache的目的也是這個。

另外一點,就是我們broker重啟時,不能全部把所有消息都恢復到內存,而是要判斷是否已經到達內存可以承受的最大消息數了。如果已經到達,那就不能再放入內存了;同理,消息索引信息的恢復也是一樣。否則,在消息堆積過多的時候,就會導致broker重啟時,內存爆掉了。

消息消費進度更新的設計

EQueue的消息消費進度的設計,和kafka, rocketmq是一個思路。就是定時保存每個queue的消費進度(queue consumed offset),一個long值。這樣設計的好處是,我們不用每次消費完一個消息后,就立即發送一個ack回復消息到broker。如果是這樣,對broker的壓力是很大的。而如果只是定時發送一個消費進度,那對broker的壓力很小。那這個消費進度怎么來?就是采用滑動門技術。就是consumer端,在拉取到一批消息后,先放入本地內存的一個SortedDictionary里。然后繼續去拉下一批消息。然后會啟動task去並行消費這些剛剛拉取到的消息。所以,這個本地的SortedDictionary就是存放了所有已經拉取到本地但還沒有被消費掉的消息。然后當某個task thread消費掉一個消息后,會把它從SortedDictionary中移除。然后,我上面所說的滑動門技術,就是指,在每次移除一個消息后,獲取當前SortedDictionary里key最小的那個消息的queue offset。隨着消息的不斷消費,這個queue offset也會不斷增大,從宏觀的角度看來,就像是一扇門在不停的往前移動。

但這個設計有個問題,就是假如這個Dict里,有一個offset=100的消息一直沒被消費掉,那就算后面的消息都被消費了,最后這個滑動門還是不會前進。因為這個dict里的最后的那個queue offset總是100。這個應該好理解的吧。所以這種情況下,當consumer重啟后,下次消費的位置還是會從100開始,后面的也會重新消費一遍。所以,我們的消費者內部,需要都支持冪等處理消息。

支持消息回溯

因為broker上的消息,不是消息消費掉了就立即刪除,而是定時刪除,比如每2天刪除一次(可以配置)。所以,當我們哪天希望重新消費1天前的消息的時候,EQueue也是完全支持的。只要在consumer啟動前,修改消費進度到以前的某個特定的值即可。

Web管理控制台

EQueue有一個完善的Web管理控制台,我們可以通過該控制台管理topic,管理queue,查看消息,查看消息消費進度,查看消息堆積情況等信息。但是目前還不支持報警,以后會慢慢增加報警功能。

通過這個控制台,我們使用EQueue就會方便很多,可以實時了解消息隊列服務器的健康狀況。貼一個管理控制台的UI界面,讓大家有個印象:

 

EQueue未來的計划

  1. broker支持集群,master-slave模式,使其具備更高的可用性和擴展性;
  2. Web管理控制台支持報警;
  3. 出一份性能測試報告,目前我主要是沒有實際服務器,沒辦法實際測試;
  4. 考慮支持非DBC持久化的支持,比如本地key/value存儲支持,或者完全的本地文件持久化消息(難度很大);
  5. 其他小功能完善和代碼局部調整;

我相信:沒有做不好,只有沒耐心。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM