前言
之前寫了一篇文章,總體介紹了EQueue。在看這篇文章之前如果還沒看過那篇文章,可能會看不懂這篇文章。所以建議沒看過的朋友務必先看一下那篇文章中所提到的各種概念,這樣才能更好的理解本文所說的內容。說實話我當初寫EQueue也是抱着一種玩的態度的,就是想嘗試寫一個分布式消息隊列,用來為ENode提供分布式消息通信的能力。后來寫着寫着,發現越來越好玩,因為覺得這個隊列以后應該會很實用,所以就花了更多的時間去設計它,完善它。希望它最終能被更多的人使用。到目前為止,我覺得目前基本實現了以下特性:輕量級、分布式、高性能、消息可持久化、支持大量堆積消息(不受限於內存大小)、支持消費者集群、消費者流控、消息的監控支持。本文我想再重點分析一下消息的持久化以及消息堆積方面的設計。首先說明一下,這里的設計都是我個人的設計,沒有參考成熟的消息隊列的做法,比如rabbitmq, rocketmq, kafka,等。我覺得能夠按照自己的思路去實現一個作品,這種感覺真好!接下來我們開始分析吧。
EQueue核心架構簡介
關於消息的持久化,我在第一篇文章中其實已經基本提到過,請看這個鏈接。為了后面方便說明問題,我在這里再貼一下EQueue的核心概念圖:
如上圖所示,EQueue主要由三部分組成:1)Producer,即消息生產者;2)Broker,即存儲消息的地方,上圖中紅框的部分;3)Consumer,即消息消費者。
消息的數據結構設計
上圖中的Broker我們可以理解為一台服務器,該服務器就是用來存儲消息的。Producer發送消息是發到這個服務器,Consumer消費消息也是從這個服務器拿消息。Broker上的每個消息都有Topic和Queue的概念。Topic與Queue的關系是,一個Topic下可以有多個Queue,QueueId用一個序號來區分。比如一個Topic下有4個Queue,那這幾個Queue的QueueId分別為0,1,2,3。然后一個Producer發送消息時,它會按照某個路由方式(ENode中會按照聚合根ID或CommandId的hashcode取模來路由到某個特定的Queue)來把消息發送到Broker上當前Topic下的哪個Queue。所以,在發送一個消息時,Producer除了會傳遞消息的內容外,還會傳遞消息的Topic以及QueueId來告訴Broker,這個消息是要放在哪里的。然后Broker根據Topic以及QueueId得到對應的Queue,就可以開始做持久化消息的邏輯了。下面我們看一下一個持久化在Broker上的消息的數據結構:
[Serializable] public class QueueMessage { /// <summary>消息所屬Topic /// </summary> public string Topic { get; set; } /// <summary>消息內容,一個二進制數組 /// </summary> public byte[] Body { get; set; } /// <summary>消息的全局序號 /// </summary> public long MessageOffset { get; set; } /// <summary>消息所屬的隊列ID /// </summary> public int QueueId { get; set; } /// <summary>消息在所屬隊列的序號 /// </summary> public long QueueOffset { get; set; } /// <summary>消息的存儲時間 /// </summary> public DateTime StoredTime { get; set; } }
MessageOffset是消息的全局序號,就是Broker上的消息的唯一ID,QueueOffset是該消息在Queue中的序號,是在該Queue中唯一。其他的屬性應該就不用多解釋了。
從上面的數據結構可以看出,只要我們持久化了消息,那只要消息不丟,那我們一定可以根據這些消息重建出所有的Topic以及Queue,以及可以知道消息在Queue中的序號與其全局序號的映射關系,因為消息本身都已經包含了這個映射關系了。
消息的持久化和堆積的思考
為什么要持久化?因為我們的消息不能丟,我覺得這一個理由就夠了!那么怎么持久化?我能想到的方案有三個:1)寫文件;2)存key/value的nosql,比如leveldb;3)存儲在關系型db,比如Sql Server;
關於前面兩個方案的討論,在前一篇文章中有比較詳細的討論,本文不想多討論了。因為EQueue最終采用的是第三種方案,也就是用關系型db來持久化,目前實現的是Sql Server。
選擇Sql Server作為消息存儲的權衡考慮
如果是每個過來的消息,都直接持久化到Sql Server,那持久化會成為瓶頸。所以,在性能和可能丟消息之間,我選擇了可能丟消息的方案。所以我設計為定時持久化消息(就像我們寫入文件的內存,nosql或者操作系統都是有緩存的,並沒有立即刷入磁盤一樣!),現在是每隔500毫秒,使用SqlBulkCopy的方法批量持久化消息。大家知道Sql Server的SqlBulkCopy的性能是非常高的,據我個人的測試,每秒20W肯定不是問題。所以,我可以認為,SqlBulkCopy不會成為消息持久化的瓶頸。但是因為是定時持久化的。所以假如服務器斷電了,那我們有可能會丟失最近500毫秒的消息。實際情況有可能丟失的還會更多。因為假如Sql Server有問題,導致消息都沒辦法持久化,那斷電的時候,我們丟失的就不只是500ms的消息了。那有辦法嗎?就是監控,我覺得我們可以通過對消息隊列的監控,如果發現當前已經持久化的消息的MessageOffset和當前最新的最大MessageOffset之間的差值大於一定的警界值,就報警。這樣我們就能及時發現消息的持久化是否有大量延遲,提早做出措施解決。好,通過SqlBulkCopy的定時持久化,消息持久化的性能瓶頸解決了。同時我們也知道了潛在的風險以及如何預防這個風險了。另外一個保護措施是,我們可以給我們的Broker服務器配置UPS電源,保證服務器斷電時,還能繼續工作。
消費者拉取消息時,從內存直接拉取還是從db拉取的權衡
假如,我們的消息都在內存,那消費者拉取性能肯定很高,因為當消費者要獲取並消費消息時,它總是在內存,所以不會有IO,所以性能好。但是問題是,內存有限,放不下很多的消息。假設一個消息200字節,那16G的內存,最多只能放下8500W多個消息。這個數量顯然還是不夠高,因為我們的目標是大量消息的堆積,所以我們必須要做出一些權衡。我覺得是否可以這樣,我們可以配置一個閥值,比如1000W,只要當前Broker上目前存在的消息數不到1000W,那就放在內存;如果超過1000W,則不再存放在內存,而是僅存儲在db。但是,好像如果通過閥值來判斷的話,好像還需要開發者自己評估Broker服務器的內存大小以及平均每個消息的大小,從而才能計算出一個合理的閥值。能否讓框架自動根據當前內存的使用量來評估是否后續的消息可以放在內存呢?之前我也是這個想法,但是后來還是采用了閥值的方案。因為后來實現的時候,發現效果不好,而且由於消息消費后還會被定時刪除,且消息也在同時不斷增加,且我們拿到的當前服務器的可用內存有可能不是那么准確。總之,算法比較復雜,且不太穩定。所以,最后還是采用了簡單可靠的閥值方案。
那么這樣的話,就是一部分消息在內存,一部分不在。那當我們拉取到不在內存的消息時,那不是要去訪問db了?那消費者消費消息性能不是會急劇降低了嗎?我也有這個顧慮,所以,想了個辦法。就是當發現當前要消費的消息不在內存時,則一次性批量從db拉取N多消息,比如5000個(可配置)。拉取過來后,立馬存儲到內存。這樣當消費者要消費這5000個消息時,我們能確保這些消息一定在內存了。也就是說,最差的情況,假如現在所有的消息都不在內存,那我們訪問db的頻率是,每5000個消息,去訪問一次db。這樣我們可以極大的降低拉取消息的頻率,從而基本不會影響消費者消費消息的性能。另外你可能會擔心一次性從db拉取5000個消息可能會比較耗時,這點我測試了下,性能還是很高的,肯定不會成為瓶頸。因為我們的消息的主鍵就是消息的全局序號,是一個long類型的值,在db的消息表中就是主鍵,根據主鍵用>=, <=的條件去查詢,性能是非常快的。
另外,實際上由於這個閥值是基於當前內存中還沒被消費的消息數來判斷的,只要我們的消費者性能跟得上生產者發送消息的性能,那Broker上理論上不會有大量堆積的消息。因為我們的消息假如被及時消費了,那Broker上會有定時的線程,定時從Broker的內存移除已被所有消費者已消費的消息。這樣Broker上目前還沒被消費的消息數應該不會到達閥值。這樣所有的消息都一定是在內存存在的。只有當消費者的性能長時間跟不上生產者的性能時,才會出現消息堆積,我們才會啟動閥值保護措施,將超過閥值的消息不再放入內存,以防止內存爆滿,使機器掛掉。所以,我們可以預見,監控系統是多么重要,我們應該及時關注監控數據,設置好報警規則,確保盡量讓消息都在內存,這是一個好的實踐。
通過上面的分析,我們知道了如何確保內存不會被沒有消費的消息占滿的一個措施,接下來我們來看看消息索引的問題。
假如消息的索引信息太多,內存放不下了呢?
上面有提到過,一個Topic下有多個Queue,每個Queue實際上就是一個邏輯隊列,該隊列里存放了消息當前在Queue中的位置(序號)以及該消息的全局位置(序號)之間的映射關系。簡單的說,就是每個Queue中有一個ConcurrentDictionary<long, long>,key就是queueOffset,value就是messageOffset。我把這個映射關系字典叫做消息的索引信息。
通過前面我們對消息的數據結構的分析,我們知道這些Queue我們是無需持久化的,也就是說我們可以放在內存。那內存真的放的下嗎?經過我的測試,我發現ConcurrentDictionary<long, long>還是很占用內存的。每100M的內存測試下來只能放100W個中等大小的消息索引(long類型的key/value鍵值對)。這樣的話,1G內存只能放1000W個左右的索引了,10G內存也只能放1億個左右的索引。太少了!那怎么辦呢?思路是,既然放不下,那就內存也不放了。我思考后發現,其實我們無須把每個消息的索引都放在Queue里,我們其實只要知道當前Queue的當前最大的QueueOffset即可。
然后,任何一個消費者消費消息時,都是面向單個Queue的,EQueue采用的是拉模型(Pull Message),消費者每次過來拉消息都是拉一批去消費,現在默認值是32個。所以,假如當前Queue的最大QueueOffset是100,然后消費者發過來的拉取消息的請求中要拉取的起始消息的QueueOffset是50,然后假如現在Queue里的第50個開始到100個之間的所有消息所以都不在內存。那怎么辦呢?很簡單,我們可以在此時,用一條SQL到Sql Server中的消息表中批量拉取一定數量的消息索引(比如5000(可配置)個,實際可能只能拉取到50個,因為當前這個Queue最大的Offset只有100),然后將這些拉取到的索引設置到Queue上,這樣這個Queue上就有50到100的這些消息的索引信息了。這樣我們就能知道從50到81(共32個索引)之間的這些消息的全局序號了,從而就能拿到最終的消息了,然后就能返回給消費者了。由於這里我們也是批量拉取消息索引,所以和前面一樣,也不會有什么性能問題。
這里,因為這里要根據Topic, QueueId, QueueOffset這幾個條件從消息表中查詢消息,所以如果不使用Sql Server這種關系型數據庫,還真不好實現。因為假如用文件持久化消息或者用key/value的nosql,雖然持久化性能很高,都可以立即持久化(實際上也沒那么好,呵呵,因為NoSQL也不是立即刷盤的哦!斷電后也有丟數據的風險),但都不支持像這種需要用到二級索引的查詢需求。所以,這也是我傾向於考慮使用關系型db的原因。另外,為了查詢時性能達到最優,我對這三個字段建立了索引,確保不會全表掃描,提高查詢性能。
實際上,我這里也給Queue可以在內存緩存多少個消息索引設計了一個閥值,默認是500W(大家可以根據實際情況和Queue的多少來配置,以后EQueue會考慮給不同Topic下的Queue支持配置不同的閥值)。也就是說,只要Queue中的消息索引不超過500W,那所有索引都在內存,只有超過了500W,才不會再把消息索引放在內存。同樣,因為只要消息被消費及時的話,Queue中的消息索引都會被及時移除,這樣Queue中的消息索引一般就不可能超過500W了。只有消費者消費消息的速度長時間低於產生消息的速度,才會出現Queue中的消息索引出現堆積最終超過500W的情況。同理,我們這里只要做好監控,那也能保證不會到達閥值。
好了,Broker上最占用內存的2個點我們都分析了如何控制內存的占用大小。總體思路是通過控制內存中緩存的消息數以及消息的索引數來達到內存不會因為消息和消息索引的不斷增加而導致用完。然后在內存中消息或消息索引不存在時(監控做的話的話,這種情況一般不會發生),通過批量從db快速拉取(兩種拉取都是可以走索引)再放入內存的方式,來實現消費者拉取消息時,依然能保持高性能。
消息的監控設計
通過上面的分析,我們知道,我們非常需要知道,當前Broker上的消息堆積情況。當出現有堆積時,我們要盡快做出處理。所以,我實現了最基本的監控數據的展示。主要實現了兩個頁面:
1)展示當前Broker上有哪些Queue,每個Queue的當前Offset,當前已被所有消費者消費的最大Offset,Queue中的當前消息數。如下如所示:
2)展示當前Broker上每個Queue被消費的情況,每個Queue正在被哪個消費者消費,已經被這個消費者消費到哪里了(也就是消費位置)。如下圖所示:
具體的數據,大家可以到http://www.enode.me/equeueadmin這個監控頁面查看。上面的監控數據還只是最基礎的,但也是最重要的數據。以后我會不斷豐富監控數據以及添加報警功能。
ps: http://www.enode.me是用ENode實現的一個簡單論壇,以上這個監控頁面的地址則是對這個論壇的消息監控頁面。
又快2點了,就到這吧!又收獲一篇,開心,呵呵。