前言
最近花了我幾個月的業余時間,對EQueue做了一個重大的改造,消息持久化采用本地寫文件的方式。到現在為止,總算完成了,所以第一時間寫文章分享給大家這段時間我所積累的一些成果。
- EQueue開源地址:https://github.com/tangxuehua/equeue
- EQueue相關文檔:http://www.cnblogs.com/netfocus/category/598000.html
- EQueue Nuget地址:http://www.nuget.org/packages/equeue
昨天,我寫過一篇關於EQueue 2.0性能測試結果的文章,有興趣的可以看看。
文章地址:http://www.cnblogs.com/netfocus/p/4926305.html
為什么要改為文件存儲?
SQL Server的問題
之前EQueue的消息持久化是采用SQL Server的。一開始我覺得沒什么問題,采用的是異步定時批量持久化,使用SqlBulkCopy的方法,這個方法測試下來,批量插入消息的性能還不錯,就決定使用了。一開始我並沒有在使用到EQueue后做集成的性能測試。在功能上確實沒什么問題了。而且使用DB持久化也有很多好處,比如消息查詢很簡單,DB天生支持各種方式的查詢。刪除消息也非常簡單,一條DELETE語句即可。所以功能實現比較順利。但后來當我對EQueue做性能測試時,發現一些問題。當數據庫服務器和Broker本身部署在不同的服務器上時,持久化消息也會走網卡,消耗帶寬,影響消息的發送和消費的TPS。而如果數據庫服務器部署在Broker同一台服務器上,則因為SQLServer本身也會消耗CPU以及內存,也會影響Broker的消息發送和消費的TPS。另外SqlBulkCopy的速度,再本身機器正在接收大量的發送消息和拉取消息的請求時,會不太穩定。經過一些測試,發現整個EQueue Broker的性能不太理想。然后又想想,Broker服務器有有一個硬件一直沒有好好利用起來,那就是硬盤。假設我們的消息是持久化到本地硬盤的,順序寫文件,就應該能解決SQL Server的問題了。所以,開始調研如何實現文件持久化消息的方案了。
消息緩存在托管內存的GC的問題
之前消息存儲在SQL Server,如果消費者每次讀取消息時,總是從數據庫去讀取,那對數據庫就是不斷的寫入和讀取,性能不太理想。所以當初的思路是,盡量把最近可能要被消費的消息緩存在本地內存中。當初的做法是設計了一個很大的ConcurrentDictionary<long, Message>,這個字典就是存放了所有可能會被消費的消息。如果要消費的消息當前不在這個字典里,就批量從DB拉取一批出來消費。這個設計可以盡可能的避免讀取DB的情況。但是帶來了另一個問題。就是我們對這個字典在高並發不斷的寫入和讀取。且這個字典里緩存的消息又很多,到到達幾百上千萬時,GC的壓力過大,導致很多線程都會被阻塞。嚴重影響Broker的TPS。
所以,基於上面的兩個主要原因,我想到了兩個思路來解決:1)采用寫文件的方式來持久化消息;2)使用非托管內存來緩存將要被消費的消息;下面我們來看看這兩個設計的一些關鍵問題的設計思路。
文件存儲的關鍵問題設計
心路背景
之前一直無法駕馭寫文件的設計。因為精細化的將數據寫入文件,並能要精確的讀取想要的數據,實在沒什么經驗。之前雖然也知道阿里的RocketMQ的消息持久化也是采用順序寫文件的方式的,但是看了代碼,發現設計很復雜,一下子也比較難懂。嘗試看了多次也無法完全理解。所以一直無法掌握這種方式。有一天不經意間想到之前看過的EventStore這個開源項目中,也有寫文件的設計。這個項目是CQRS架構之父greg young所主導的開源項目,是一個專門為ES(Event Sourcing)設計模式中提供保存事件流支持的事件流存儲系統。於是下定決心專研其源碼,看C#代碼肯定還是比Java容易,呵呵。經過一段時間的摸索之后,基本學到了它是如何寫文件以及如何讀文件的。了解了很多設計思路。然后,在看懂了EventStore的文件存儲設計之后,再去看RocketMQ的文件持久化的設計,發現驚人的相似。原來看不懂的代碼現在也能看懂了,因為思路差不多的。所以,這給我開始動手提供了很大的信心。經過自己的一些准備(文件讀寫的性能驗證)和設計思路整理后,終於開始動手了。
如何寫消息到文件?
其實說出來也很簡單。之前一直以為寫文件就是一個消息一行唄。這樣當我們要找哪個消息時,只需要知道行號即可。確實,理論上這樣也挺好。但上面這兩個開源項目都不是這樣做的,而是都是采用更精細化的直接寫二進制的方式。搞清楚寫入的格式之后,還要考慮一個文件寫不下的時候怎么辦?因為一個文件總是有大小的,比如1G,那超過1G后,必然要創建新的文件,然后把消息寫入新的文件。所以,我們就又有了Chunk的概念。一個Chunk就是一個文件,假設我們現在實現了一個FileMessageStore,表示對文件持久化的封裝,那這個FileMessageStore肯定維護了一堆的Chunk。然后我們也很容易想到一點,就是Chunk有3種狀態:1)New,表示剛創建的Chunk,這種Chunk我們可以寫入新消息進去;2)Completed,已寫入完成的Chunk,這種Chunk是只讀的;3)OnGoing的Chunk,就是當FileMessageStore初始化時,要從磁盤的某個chunk的目錄下加載所有的Chunk文件,那不難理解,最后一個文件之前的Chunk文件應該都是Completed的;最后一個Chunk文件可能寫入了一半,就是之前沒完全用完的。所以,本質上New和Ongoing的Chunk其實是一樣的,只是初始化的方式不同。
至此,我們知道了寫文件的兩個關鍵思路:1)按二進制寫;2)拆分為Chunk文件,且每個Chunk文件有狀態;按二進制寫主要的思路是,假如我們當前要寫入的消息的二進制數組大小為100個字節,也就是說消息的長度為100,那我們可以先把消息的長度寫入文件,再接着寫入消息本身。這樣我們讀取消息時,只要知道了寫入消息長度時的那個Position,就能先讀取到消息的長度,然后就能知道接下來要讀取多少字節為消息內容。從而能正確讀取消息出來。
另外再分享一點,EventStore中,寫入一個事件到文件中時,還會在寫入消息內容后再寫入這個消息的長度到文件里。也就是說,寫入一個數據到文件時,會在頭尾都寫入該數據的長度。這樣做的好處是什么呢?就是當我們想從后往前讀數據時,也能方便的做到,因為每個數據的前后都記錄了該數據的長度。這點應該不難理解吧?而EventStore是一個面向流的存儲系統,我們對事件流確實可能從前往后讀,也可能是從后往前讀。另外這個設計還有一個好處,就是起到了校驗數據合法性的目的。當我們根據長度讀取數據后,再數據之后再讀取一個長度,如果這兩個長度一致,那數據應該就沒問題的。在RocketMQ中,是通過CRC校驗的方式來保證讀取的數據沒有問題。我個人還是比較喜歡EventStore的做法。所以EQueue里現在寫入數據就是這樣做的。
上面我介紹了一種寫入不定長數據到文件的設計思路,這種設計是為了解決寫入消息到文件的情況,因為消息的長度是不定的。在EQueue中,我們還有一另一種寫文件的場景。就是隊列信息的持久化。EQueue的架構是一個Topic下有多個Queue,每個Queue里有很多消息,消費者負載均衡是通過給消費者分配均勻數量的Queue的方式來達到的。這樣我們只要確保寫入Queue的消息是均勻的,那每個Consumer消費到的消息數就是均勻的。那一個Queue里記錄的是什么呢?就是一個消息和其在隊列的位置的對應關系。假設消息寫入在文件的物理位置為10000,然后這個消息在Queue里的索引是100,那這個隊列就會把這兩個位置對應起來。這樣當我們要消費這個Queue中索引為100的消息時,就能找到這個消息在文件中的物理位置為10000,就能根據這個位置找到消息的內容了。如果是托管內存,我們只需要弄一個Dictionary,key是消息在隊列中的Offset,value是消息在文件中的物理Offset即可。這樣我們有了這個dict,就能輕松建立起對應關系了。但上面我說過,這種巨大的dict是要占用內存的,會有GC的問題。所以更好的辦法是,把這個對應關系也寫入文件,那怎么做呢?這時就又需要更精細化的設計了。想到了其實也很簡單,這個設計我是從RocketMQ中學到的。就是我們設計一種固定長度的結構體,這個結構體里就存放一個數據,就是消息在文件的物理位置(為了后面好表達,我命名為MessagePosition),一個Long值,一個Long的長度是8個字節。也就是說,這個文件中,每個寫入的數據的長度都是8個字節。假設我們一個文件要保存100W個MessagePosition。那這個文件的長度就是100W * 8這么多字節,大概為7.8MB。那么這樣做有什么好處呢?好處就是,假如我們現在要消費這個Queue里的第一個消息,那這個消息的MessagePosition在這個文件中的位置0,第二個消息在這個文件中的位置是8,第三個就是16,以此類推,第N 個消息就是(N-1) * 8。也就是說,我們無須顯式的把消息在隊列中的位置信息也寫入到文件,而是通過這樣的固定算法,就能精確的算出Queue中某個消息的MessagePosition是寫入在文件的哪個位置。然后拿到了MessagePosition之后,就能從Message的Chunk文件中讀取到這個消息了。
通過上面的分析,我們知道了,Producer發送一個消息到Broker時,Broker會寫兩次磁盤。一次是現將消息本身寫入磁盤(Message Chunk里),另一次是將消息的寫入位置寫入到磁盤(Queue Chunk里)。細心的朋友可能會問,假如我第一次寫入成功,但第二次寫入時失敗,比如正好機器斷電或者當前Broker服務器正好出啥問題 了,沒有寫入成功。那怎么辦呢?這個沒有什么大的影響。因為首先這種情況會被認為是消息發送失敗。所以Producer還會重新發送該消息,然后Broker收到消息后還會再做一次這兩個寫入操作。也就是說,第一次寫入的消息內容永遠也不會用到了,因為那個寫入位置永遠也不會在Queue Chunk里有記錄。
下面的代碼展示了寫消息到文件的核心代碼:
//消息寫文件需要加鎖,確保順序寫文件 MessageStoreResult result = null; lock (_syncObj) { var queueOffset = queue.NextOffset; var messageRecord = _messageStore.StoreMessage(queueId, queueOffset, message); queue.AddMessage(messageRecord.LogPosition, message.Tag); queue.IncrementNextOffset(); result = new MessageStoreResult(messageRecord.MessageId, message.Code, message.Topic, queueId, queueOffset, message.Tag); }
StoreMessage方法內部實現:
public MessageLogRecord StoreMessage(int queueId, long queueOffset, Message message) { var record = new MessageLogRecord( message.Topic, message.Code, message.Body, queueId, queueOffset, message.CreatedTime, DateTime.Now, message.Tag); _chunkWriter.Write(record); return record; }
queue.AddMessage方法的內部實現:
public void AddMessage(long messagePosition, string messageTag) { _chunkWriter.Write(new QueueLogRecord(messagePosition + 1, messageTag.GetHashcode2())); }
ChunkWriter的內部實現:
public long Write(ILogRecord record) { lock (_lockObj) { if (_isClosed) { throw new ChunkWriteException(_currentChunk.ToString(), "Chunk writer is closed."); } //如果當前文件已經寫完,則需要新建文件 if (_currentChunk.IsCompleted) { _currentChunk = _chunkManager.AddNewChunk(); } //先嘗試寫文件 var result = _currentChunk.TryAppend(record); //如果當前文件已滿 if (!result.Success) { //結束當前文件 _currentChunk.Complete(); //新建新的文件 _currentChunk = _chunkManager.AddNewChunk(); //再嘗試寫入新的文件 result = _currentChunk.TryAppend(record); //如果還是不成功,則報錯 if (!result.Success) { throw new ChunkWriteException(_currentChunk.ToString(), "Write record to chunk failed."); } } //如果需要同步刷盤,則立即同步刷盤 if (_chunkManager.Config.SyncFlush) { _currentChunk.Flush(); } //返回數據寫入位置 return result.Position; } }
當然,我上面為了簡化問題的復雜度。所以沒有引入關於如何根據某個全局的MessagePosition找到其在哪個Message Chunk的問題。這個其實也很好做,我們首先固定好每個Message Chunk文件的大小。比如大小為256MB,然后我們為每個Chunk文件設計一個ChunkHeader,每個Chunk文件總是先把這個ChunkHeader寫入文件,這個Header里記錄了這個文件的起始位置和結束位置,以及文件的大小。這樣我們根據某個MessagePosition計算其在哪個Chunk文件時,只需要把這個MessagePositon對Chunk的大小做取摸操作即可。根據數據的位置找其在哪個Chunk的代碼看起來如下面這樣這樣:
public Chunk GetChunkFor(long dataPosition) { var chunkNum = (int)(dataPosition / _config.GetChunkDataSize()); return GetChunk(chunkNum); } public Chunk GetChunk(int chunkNum) { if (_chunks.ContainsKey(chunkNum)) { return _chunks[chunkNum]; } return null; }
代碼很簡單,就不多講了。拿到了Chunk對象后,我們就可以把dataPosition傳給Chunk,然后Chunk內部把這個全局的dataPosition轉換為本地的一個位置,就能准確的定位到這個數據在當前Chunk文件的實際位置了。將全局位置轉換為本地的位置的算法也很簡單直接:
public int GetLocalDataPosition(long globalDataPosition) { if (globalDataPosition < ChunkDataStartPosition || globalDataPosition > ChunkDataEndPosition) { throw new Exception(string.Format("globalDataPosition {0} is out of chunk data positions [{1}, {2}].", globalDataPosition, ChunkDataStartPosition, ChunkDataEndPosition)); } return (int)(globalDataPosition - ChunkDataStartPosition); }
只需要把這個全局的位置減去當前Chunk的數據開始位置,就能知道這個全局位置相對於當前Chunk的本地位置了。
好了,上面介紹了消息如何寫入的主要思路以及如何讀取數據的思路。
另外一點還想提一下,就是關於刷盤的策略。一般我們寫數據到文件后,是需要調用文件流的Flush方法的,確保數據最終刷入到了磁盤上。否則數據就還是在緩沖區里。當然,我們需要注意到,即便調用了Flush方法,數據可能也還沒真正邏輯到磁盤,而只是在操作系統內部的緩沖區里。這個我們就無法控制了,我們能做到的是調用了Flush方法即可。那當我們每次寫入一個數據到文件都要調用Flush方法的話,無疑性能是低下的,所以就有了所謂的異步刷盤的設計。就是我們寫入消息后不立即調用Flush方法,而是采用一個獨立的線程,定時調用Flush方法來實現刷盤。目前EQueue支持同步刷盤和異步刷盤,開發者可以自己配置決定采用哪一種。異步刷盤的間隔默認是100ms。當我們在追求高吞吐量時,應該考慮異步刷盤,但要求數據可靠性更高但對吞吐量可以低一點時,則可以使用同步刷盤。如果又要高吞吐又要數據高可靠,那就只有一個辦法了,呵呵。就是多增加一些Broker機器,通過集群來彌補單台Broker寫入數據的瓶頸。
如何從文件讀取消息?
假設我們現在要從一個文件讀取數據,且是多線程並發的讀取,要怎么設計?一個辦法是,每次讀取時,創建文件流,然后創建StreamReader,然后讀取文件,讀取完成后釋放StreamReader並關閉文件流。但每次要讀取文件的一個數據都要這樣做的話性能不是太好,因為我們反復的創建這樣的對象。所以,這里我們可以使用對象池的概念。就是Chunk內部,預先創建好一些Reader,當需要讀文件時,獲取一個可用的Reader,讀取完成后,再把Reader歸還到對象池里。基於這個思路,我設計了一個簡單的對象池:
private readonly ConcurrentQueue<ReaderWorkItem> _readerWorkItemQueue = new ConcurrentQueue<ReaderWorkItem>(); private void InitializeReaderWorkItems() { for (var i = 0; i < _chunkConfig.ChunkReaderCount; i++) { _readerWorkItemQueue.Enqueue(CreateReaderWorkItem()); } _isReadersInitialized = true; } private ReaderWorkItem CreateReaderWorkItem() { var stream = default(Stream); if (_isMemoryChunk) { stream = new UnmanagedMemoryStream((byte*)_cachedData, _cachedLength); } else { stream = new FileStream(_filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, _chunkConfig.ChunkReadBuffer, FileOptions.None); } return new ReaderWorkItem(stream, new BinaryReader(stream)); } private ReaderWorkItem GetReaderWorkItem() { ReaderWorkItem readerWorkItem; while (!_readerWorkItemQueue.TryDequeue(out readerWorkItem)) { Thread.Sleep(1); } return readerWorkItem; } private void ReturnReaderWorkItem(ReaderWorkItem readerWorkItem) { _readerWorkItemQueue.Enqueue(readerWorkItem); }
當一個Chunk初始化時,我們預先初始化好固定數量(可配置)的Reader對象,並把這些對象放入一個ConcurrentQueue里(對象池的作用),然后要讀取數據時,從從ConcurrentQueue里拿一個可用的Reader即可,如果當前並發太高拿不到怎么辦,就等待直到拿到為止,目前我是等待1ms后繼續嘗試拿,直到最后拿到為止。然后ReturnReaderWorkItem就是數據讀取完之后歸還Reader到對象池。就是不是很簡單哦。這樣的設計,可以避免不斷的創建文件流和Reader對象,可以避免GC的副作用。
Broker重啟時如何做?
大家知道,當Broker重啟時,我們是需要掃描磁盤上Chunk目錄下的所有Chunk文件的。那怎么掃描呢?上面其實我也簡單提到過。首先,我們可以對每個Chunk文件的文件名的命名定義一個規則,第一個Chunk文件的文件名比如為:message-chunk-000000000,第二個為:message-chunk-000000001,以此類推。那我們掃描時,只要先把所有的文件名獲取到,然后對文件名升序排序。那最后一個文件之前的文件肯定都是寫入完全了的,即上面我說的Completed狀態的,而最后一個文件是還沒有寫入完的,還可以接着寫。所以我們初始化時,只需要先初始化最后一個之前的所有Chunk文件,最后再初始化最后一個文件即可。這里我所說的初始化不是要把整個Chunk文件的內容都加載到內存,而是只是讀取這個文件的ChunkHeader的信息維護在內存即可。有了Header信息,我們就可以為后續的數據讀取提供位置計算了。所以,整個加載過程是很快的,讀取100個Chunk文件的ChunkHeader也不過一兩秒的時間,完全不影響Broker的啟動時間。對於初始化Completed的Chunk比較簡單,只需要讀取ChunkHeader信息即可。但是初始化最后一個文件比較麻煩,因為我們還要知道這個文件當前寫入到哪里了?從而我們可以從這個位置的下一個位置接着往下寫。那怎么知道這個文件當前寫入到哪里了呢?其實比較復雜。有很多技術,我看到RocketMQ和EventStore這兩個開源項目中都采用了Checkpoint的技術。就是當我們每次寫入一個數據到文件后,都會更新一下Checkpoint,即表示當前寫入到這個文件的哪里了。然后這個Checkpoint值我們也是定時異步保存到某個獨立的小文件里,這個文件里只保存了這個Checkpoint。這樣的設計有一個問題,就是假如數據寫入了,但由於Checkpoint的保存不是實時的,所以理論上會出現Checkpint值會小於實際文件寫入的位置的情況。一般我們忽略這種情況即可,即可能會存在初始化時,下次寫入可能會覆蓋一定的之前已經寫入的數據,因為Checkpoint可能是稍微老一點的。
而我在設計時,希望能再嚴謹一點,取消Checkpoint的設計,而是采用在初始化Ongoing狀態的Chunk文件時,從文件的頭開始不斷往下讀,當最后無法往下讀時,我們就知道這個文件我們當前寫入到哪里了。那怎么知道無法往下讀了呢?也就是說怎么確定后續的文件內容不是我們寫入的?也很簡單。對於不固定數據長度的Chunk來說,由於我們每次寫入一個數據時都是同時在前后寫入這個數據的長度;所以我們再初始化讀取這個文件時,可以借助這一點來校驗,但出現不符合這個規則的數據時,就認為后續不是正常的數據了。對於固定長度的Chunk來說,我們只要保證每次寫入的數據的數據是非0了。而對於EQueue的場景,固定數據的Chunk里存儲的都是消息在Message Chunk中的全局位置,一個Long值;但這個Long值我們正常是從0開始的,怎么辦呢?很簡單,我們寫入MessagePosition時,總是加1即可。即假如當前的MessagePosition為0,那我們實際寫入1,如果為100,則實際寫入的值是101。這樣我們就能確保這個固定長度的Chunk文件里每個數據都是非0的。然后我們在初始化這樣的Chunk文件時,只要不斷讀取固定長度(8個字節)的數據,當出現讀取到的數據為0時,就認為已經到頭了,即后續的不是我們寫入的數據了。然后我們就能知道接下來要從哪里開始讀取了哦。
如何盡量避免讀文件?
上面我介紹了如何讀文件的思路。我們也知道了,我們是在消費者要消費消息時,從文件讀取消息的。但對從文件讀取消息總是沒有比從內存讀取消息來的快。我們前面的設計都沒有把內存好好利用起來。所以我們能否考慮把未來可能要消費的Chunk文件的內容直接緩存在內存呢?這樣我們就可以避免對文件的讀取了。肯定可以的。那怎么做呢?前面我提高多,曾經我們用托管內存中的ConcurrentDictionary<long, Message>這樣的字典來緩存消息。我也提到這會帶來垃圾回收而影響性能的問題。所以我們不能直接這樣簡單的設計。經過我的一些嘗試,以及從EventStore中的源碼中學到的,我們可以使用非托管內存來緩存Chunk文件。我們可以使用Marshal.AllocHGlobal來申請一塊完整的非托管內存,然后再需要釋放時,通過Marshal.FreeHGlobal來釋放。然后,我們可以通過UnmanagedMemoryStream來訪問這個非托管內存。這個是核心思路。那么怎樣把一個Chunk文件緩存到非托管內存呢?很簡單了,就是掃描這個文件的所有內容,把內容都寫入內存即可。代碼如下:
private void LoadFileChunkToMemory() { using (var fileStream = new FileStream(_filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 8192, FileOptions.None)) { var cachedLength = (int)fileStream.Length; var cachedData = Marshal.AllocHGlobal(cachedLength); try { using (var unmanagedStream = new UnmanagedMemoryStream((byte*)cachedData, cachedLength, cachedLength, FileAccess.ReadWrite)) { fileStream.Seek(0, SeekOrigin.Begin); var buffer = new byte[65536]; int toRead = cachedLength; while (toRead > 0) { int read = fileStream.Read(buffer, 0, Math.Min(toRead, buffer.Length)); if (read == 0) { break; } toRead -= read; unmanagedStream.Write(buffer, 0, read); } } } catch { Marshal.FreeHGlobal(cachedData); throw; } _cachedData = cachedData; _cachedLength = cachedLength; } }
代碼很簡單,不用多解釋了。需要注意的是,上面這個方法針對的是Completed狀態的Chunk,即已經寫入完成的Chunk的。已經寫入完全的Chunk是只讀的,不會再發生更改,所以我們可以隨便緩存在內存中。
那對於新創建出來的Chunk文件呢?正常情況下,消費者來得及消費時,我們總是在不斷的寫入最新的Chunk文件,也在不斷的從這個最新的Chunk文件讀取消息。那我們怎么確保消費最新的消息時,也不需要從文件讀取呢?也很簡單,就是在新建一個Chunk文件時,如果內存足夠,也同時創建一個一樣大小的基於非托管內存的Chunk。然后我們再寫入消息到文件Chunk成功后,再同時寫入這個消息到非托管內存的Chunk。這樣,我們在消費消息,讀取消息時總是首先判斷當前Chunk是否關聯了一個非托管內存的Chunk,如果有,就優先從內存讀取即可。如果沒有才從文件Chunk讀取。
但是從文件讀取時,可能會遇到一個問題。就是我們剛寫入到文件的數據可能無法立即讀取到。因為寫入的數據沒有立即刷盤,所以無法通過Reader讀取到。所以,我們不能僅通過判斷當前寫入的位置來判斷當前是否還有數據可以被讀取,而是考慮當前的最后一次刷盤的位置。理論上只能讀取刷盤之前的數據。但即便這樣設計了,在如果當前硬盤不是SSD的情況下,好像也會出現讀不到數據的問題。偶爾會報錯,有朋友在測試時已經遇到了這樣的問題。那怎么辦呢?我想了一個辦法。因為這種情況歸根接地還是因為我們邏輯上認為已經寫入到文件的數據由於未及時刷盤或者操作系統本身的內部緩存的問題,導致數據未能及時寫入磁盤。出現這種情況一定是最近的一些數據。所以我們如果能夠把比如最近寫入的10000(可配置)個數據都緩存在本地托管內存中,然后讀取時先看本地緩存的托管內存中有沒有這個位置的數據,如果有,就不需要讀文件了。這樣就能很好的解決這個問題了。那怎么確保我們只緩存了最新的10000個數據且不會超出10000個呢?答案是環形隊列。這個名字聽起來很高大上,其實就是一個數組,數組的長度為10000,然后我們在寫入數據時,我們肯定知道這個數據在文件中的位置的,我們可以把這個位置(一個long值)對10000取摸,就能知道該把這個數據緩存在這個數組的哪個位置了。通過這個設計確保緩存的數據不會超過10000個,且確保一定只緩存最新的數據,如果新的數據保存到數組的某個下標時,該下標已經存在以前已經保存過的數據了,就自動覆蓋掉即可。由於這個數組的長度不是很長,所以每什么GC的問題。
但是光這樣還不夠,我們這個數組中的每個元素至少要記錄這個元素對應的數據在文件中的位置。這個是為了我們在從數組中獲取到數據后,進一步校驗這個數據是否是我想要的那個位置的數據。這點大家應該可以理解的吧。下面這段代碼展示了如何從環形數組中讀取想要的數據:
if (_cacheItems != null) { var index = dataPosition % _chunkConfig.ChunkLocalCacheSize; var cacheItem = _cacheItems[index]; if (cacheItem != null && cacheItem.RecordPosition == dataPosition) { var record = readRecordFunc(cacheItem.RecordBuffer); if (record == null) { throw new ChunkReadException( string.Format("Cannot read a record from data position {0}. Something is seriously wrong in chunk {1}.", dataPosition, this)); } if (_chunkConfig.EnableChunkReadStatistic) { _chunkStatisticService.AddCachedReadCount(ChunkHeader.ChunkNumber); } return record; } }
_cacheItems是當前Chunk內的一個環形數組,然后假如當前我們要讀取的數據的位置是dataPosition,那我們只需要先對環形數據的長度取摸,得到一個下標,即上面代碼中的index。然后就能從數組中拿到對應的數據了,然后如果這個數據存在,就進一步判斷這個數據dataPosition是否和要求的dataPosition,如果一致,我們就能確定這個數據確實是我們想要的數據了。就可以返回了。
所以,通過上面的兩種緩存(非托管內存+托管內存環形數組)的設計,我們可以確保幾乎不用再從文件讀取消息了。那什么時候還是會從文件讀取呢?就是在1)內存不夠用了;2)當前要讀取的數據不是最近的10000個;這兩個前提下,才會從文件讀取。一般我們線上服務器,肯定會保證內存是可用的。EQueue現在有兩個內存使用的水位。一個是當物理內存使用到多少百分比(默認值為40%)時,開始清理已經不再活躍的Chunk文件的非托管內存Chunk;那什么是不活躍呢?就是在最近5s內沒有發生過讀寫的Chunk。這個設計我覺得是非常有效的,因為假如一個Chunk有5s沒有發生過讀寫,那一般肯定是沒有消費者在消費它了。另一個水位是指,最多EQueue Broker最多使用物理內存的多少百分比(默認值為75%),這個應該好理解。這個水位是為了保證EQueue不會把所有物理內存都吃光,是為了確保服務器不會因為內存耗盡而宕機或導致服務不可用。
那什么時候會出現大量使用服務器內存的情況呢?我們可以推導出來的。正常情況下,消息寫入第一個Chunk,我們就在讀取第一個Chunk;寫入第二個Chunk我們也會跟着讀取第二個Chunk;假設當前寫入到了第10個Chunk,那理論上前面的9個Chunk之前緩存的非托管內存都可以釋放了。因為肯定超過5s沒有發生讀寫了。但是假如現在消費者有很多,且每個消費者的消費進度都不同,有些很快,有些很慢,當所有的消費者的消費進度正好覆蓋到所有的Chunk文件時,就意味着每個Chunk文件都在發生讀取。也就是說,每個Chunk都是活躍的。那此時就無法釋放任何一個Chunk的非托管內存了。這樣就會導致占用大量非托管內存了。但由於75%的水位的設計,Broker內存的使用是不會超過物理內存75%的。在創建新的Chunk或者嘗試緩存一個Completed的Chunk時,總是會判斷當前使用的物理內存是否已經超過75%,如果已經超過,就不會分配對應的非托管內存了。
如何刪除消息?
刪除消息的設計比較簡單。主要的思路是,當我們的消息已經被所有的消費者都消費過了,且滿足我們的刪除策略了,就可以刪除了。RocketMQ刪除消息的策略比較粗暴,沒有考慮消息是否經被消費,而是直接到了一定的時間就刪除了,比如最多只保留2天。這個是RocketMQ的設計。EQueue中,會確保消息一定是被所有的消費者都消費了才會考慮刪除。然后目前我設計的刪除策略有兩種:
- 按Chunk文件數;即設計一個閥值,表示磁盤上最多保存多少個Chunk文件。目前默認值為100,每個Chunk文件的大小為256MB。也就是大概總磁盤占用25G。一般我們的硬盤肯定有25G的。當我們不關心消息保存多久而只從文件數的角度來決定消息是否要刪除時,可以使用這個策略;
- 按時間來刪除,默認是7天,即當某個Chunk是7天前創建的,那我們就可以創建了。這種策略是不關心Chunk總共有多少,完全根據時間的維度來判斷。
實際上,應該可能還有一些需求希望能把兩個策略合起來考慮的。這個目前我沒有做,我覺得這兩種應該夠了。如果大家想做,可以自己擴展的。
另外,上面我說過EQueue中目前有兩種Chunk文件,一種是存儲消息本身的,我叫做Message Chunk;一種是存儲隊列信息的,我叫做Queue Chunk;而Queue Chunk的數據是依賴於Message Chunk的。上面我說的兩種刪除策略是針對Message Chunk而言的。而Queue Chunk,由於這個依賴性,我覺得比較合理的方式是,只需要判斷當前Queue Chunk中的所有的消息對應的Message Chunk是否已經都刪除了,如果是,難說明這個Queue Chunk也已經沒意義了,就可以刪除了。但只要這個Queue Chunk中至少還有一個消息的Chunk文件沒刪除,那這個Queue Chunk就不會刪除。
上面這個只是思路哦,真實的代碼肯定比這個復雜,呵呵。有興趣的朋友還是要看代碼的。
如何查消息?
之前用SQL Server的方式,由於DB很容易查消息,所以查詢消息不是大問題。但是現在我們的消息是放在文件里的,那要怎么查詢呢?目前支持根據消息ID來查詢。當Producer發送一個消息到Broker,Broker返回結果里會包含消息的ID。Producer的正確做法應該是要用日志或其他方式記錄這個ID,並最好和自己的當前業務消息的某個業務ID一起記錄,比如CommandId或者EventId,這樣我們就能根據我們的業務ID找到消息ID,然后根據消息ID找到消息內容了。那消息ID現在是怎么設計的呢?也是受到RocketMQ的啟發,消息ID由兩部分組成:1)Broker的IP;2)消息在Broker的文件中的全局位置;這樣,當我們要根據某個消息ID查詢時,就可以先定位到這個消息在哪個Broker上,也同時知道了消息在文件的哪個位置了,這樣就能最終讀取這個消息的內容了。
為什么要這樣設計呢?如果我們的Broker沒有集群,那其實不需要包含Broker的IP;這個設計是為了未來EQueue Broker會支持集群的,那個時候,我們就必須要知道某個消息ID對應的Broker是哪個了。
如何保存隊列消費進度?
EQueue中,每個Queue,都會有一個對應的Consumer。消費進度就是這個Queue當前被消費到哪里了,一個Offset值。比如Offset為100,就表示當前這個Queue已經消費到第99(因為是從0開始的)個位置的消息了。因為一個Broker上有很多的Queue,比如有100個。而我們現在是使用文件的方式來存儲信息了。所以自然消費進度也是用文件了。但由於消費進度的信息很少,也不是遞增的形式。所以我們可以簡單設計,目前EQueue采用一個文件的方式來保存所有Queue的消費進度,文件內容為JSON,JSON里記錄了每個Queue的消費進度。文件內容看起來像下面這樣:
{"SampleGroup":{"topic1-3":89055,"topic1-2":89599,"topic1-1":89471,"topic1-0":89695}}
上面的JSON標識一個名為SampleGroup的ConsumerGroup,他消費了一個名為topic1的topic,然后這個topic下的每個Queue的消費進度記錄了下來。如果有另一個ConsumerGroup,也消費了這個topic,那消費進度是隔離的。如果還不清楚ConsumerGroup的同學,要去看一下我之前寫的EQueue的文章了。
還有沒有可以優化的地方?
到目前為止,還有沒有其他可優化的大的地方呢?有。之前我做EQueue時,總是把消息從數據庫讀取出來,然后構造出消息對象,再把消息對象序列化為二進制,再返回給Consumer。這里涉及到從DB拿出來,再序列化為二進制。學習了RocketMQ的代碼后,我們可以做的更聰明一點。因為其實基於文件存儲時,我們從文件里拿出來的已經是二進制了。所以可以直接把二進制返回給消費者即可。不需要先轉換為對象再做序列化了。通過這個設計的改進,我們現在的消費者消費消息,可以說無任何瓶頸了,非常快。
如何統計消息讀寫情況?
在測試寫文件的這個版本時,我們很希望知道每個Chunk的讀寫情況的統計,從而確定設計是正確的。所以,我給EQueue的Chunk增加了實時統計Chunk讀寫的統計服務。目前我們在運行EQueue自帶的例子時,Broker會每個一秒打印出所有Chunk的讀寫情況,這個特性極大的方便我們判斷消息的發送和消費是否正常,消費是否有延遲等。
其他新增功能
更完善和安全的隊列擴容和縮容設計
這次我給EQueue的Web后台管理控制台也完善了一下隊列的增加和減少的設計。增加隊列(即隊列的擴容)比較簡單,直接新增即可。但是當我們要刪除一個隊列時,怎樣安全的刪除呢?主要是要確保刪除這個隊列時,已經沒有Producer或Consumer在使用這個隊列了。要怎么做到呢?我的思路是,為每個Queue對象設計兩個屬性,表示對Producer是否可見,對Consumer是否可見。當我們要刪除某個Queue時,可以:1)先讓其對Producer不可見,這樣Producer后續就不會再發送新的消息到這個隊列了;然后等待,直到這個隊列里的消息都被所有的消費者消費掉了;然后再設置為對Consumer不可見。然后再過幾秒,確保每個消費者都不會再向這個隊列發出拉取消息的請求了。這樣我們就能安全的刪除這個隊列了。刪除隊列的邏輯大概如如下:
public void DeleteQueue(string topic, int queueId) { lock (this) { var key = QueueKeyUtil.CreateQueueKey(topic, queueId); Queue queue; if (!_queueDict.TryGetValue(key, out queue)) { return; } //檢查隊列對Producer或Consumer是否可見,如果可見是不允許刪除的 if (queue.Setting.ProducerVisible || queue.Setting.ConsumerVisible) { throw new Exception("Queue is visible to producer or consumer, cannot be delete."); } //檢查是否有未消費完的消息 var minConsumedOffset = _consumeOffsetStore.GetMinConsumedOffset(topic, queueId); var queueCurrentOffset = queue.NextOffset - 1; if (minConsumedOffset < queueCurrentOffset) { throw new Exception(string.Format("Queue is not allowed to delete, there are not consumed messages: {0}", queueCurrentOffset - minConsumedOffset)); } //刪除隊列的消費進度信息 _consumeOffsetStore.DeleteConsumeOffset(queue.Key); //刪除隊列本身,包括所有的文件 queue.Delete(); //最后將隊列從字典中移除 _queueDict.Remove(key); } }
代碼應該很簡單直接,不多解釋了。隊列的動態新增和刪除,可以方便我們線上應付在線活動時,隨時為消費者提供更高的並行消費能力,以及活動結束后去掉多余的隊列。是非常實用的功能。
支持Tag功能
這個功能,也是非常實用的。這個版本我加了上去。以前EQueue只有Topic的概念,沒有Tag的概念。Tag是對Topic的二級過濾。比如當某個Producer發送了3個消息,Topic都是topic,然后tag分別是01,02,03。然后Consumer訂閱了這個Topic,但是訂閱這個Topic時同時制定了Tag,比如指定為02,那這個Consumer就只會收到一個消息。Tag為01,03的消息是不會收到的。這個就是Tag的功能。我覺得Tag對我們是非常有用的,它可以極大的減少我們定義Topic。本來我們必須要定義一個新的Topic時,現在可能只需要定義一個Tag即可。關於Tag的實現,我就不展開了。
支持消息堆積報警
終於到最后一點了,終於堅持快寫完了,呵呵。EQueue Web后台管理控制台現在支持消息堆積的報警了。當EQueue Broker上當前所有未消費的消息數達到一定的閥值時,就會發送郵件進行報警。我們可以把我們的郵件和我們的手機短信進行綁定,比如移動的139郵箱我記得就有這個功能。這樣我們就能第一時間知道Broker上是否有大量消息堆積了,可以讓我們第一時間處理問題。
結束語
這篇文章感覺是我有史以來寫過的最有干貨的一篇了,呵呵。一氣呵成,也是對我前面幾個月的所有積累知識經驗的一次性釋放吧。希望能給大家一些幫助。我寫文章比較喜歡寫思路,不太喜歡介紹如何用。我覺得一個程序員,最重要的是要學會如何思考去解決自己想解決的問題。而不是別人直接告訴你如何去解決。通過做EQueue這個分布式消息隊列,也算是我自己的一個實踐過程。我非常鼓勵大家寫開源項目哦,當你專注於實現某個你感興趣的開源項目時,你就會有目標性的去學習相關的知識,你的學習就不會迷茫,不會為了學技術而學技術了。我在做EQuque時,要考慮各種東西,比如通信層的設計、消息持久化、整個架構設計,等等。我覺得是非常鍛煉人的。
一個人時間短暫,如果能用有限的時間做出好的東西可以造福后人,那我們來到這個世上也算沒白來了,你說對嗎?所以,我們千萬不要放棄我們的理想,雖然堅持理想很難,但也要堅持。