http://www.tuicool.com/articles/umQfMzA
1.序言
今天來和大家探討一下RocketMQ在消息存儲方面所作出的努力,在介紹RocketMQ的存儲模型之前,可以先探討一下MQ的存儲模型選擇。
2.MQ的存儲模型選擇
個人看來,從MQ的類型來看,存儲模型分兩種:
- 需要持久化(ActiveMQ,RabbitMQ,Kafka,RocketMQ)
- 不需要持久化(ZeroMQ)
本篇文章主要討論持久化MQ的存儲模型,因為現在大多數的MQ都是支持持久化存儲,而且業務上也大多需要MQ有持久存儲的能力,能大大增加系統的高可用性,下面幾種存儲方式:
- 分布式KV存儲(levelDB,RocksDB,redis)
- 傳統的文件系統
- 傳統的關系型數據庫
這幾種存儲方式從效率來看, 文件系統 > kv存儲 > 關系型數據庫 ,因為直接操作文件系統肯定是最快的,而關系型數據庫一般的TPS都不會很高,我印象中Mysql的寫不會超過5Wtps(現在不確定最新情況),所以如果追求效率就直接操作文件系統。
但是如果從可靠性和易實現的角度來說,則是 關系型數據庫 > kv存儲 > 文件系統 ,消息存在db里面非常可靠,但是性能會下降很多,所以具體的技術選型都是需要根據自己的業務需求去考慮。
3.RocketMQ的存儲架構
3.1存儲特點:
如上圖所示:
(1)消息主體以及元數據都存儲在**CommitLog**當中
(2)Consume Queue相當於kafka中的partition,是一個邏輯隊列,存儲了這個Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。 (3)每次讀取消息隊列先讀取consumerQueue,然后再通過consumerQueue去commitLog中拿到消息主體。
rocketMQ的設計理念很大程度借鑒了kafka,所以有必要介紹下kafka的存儲結構設計:

-
存儲特點:
和RocketMQ類似,每個Topic有多個partition(queue),kafka的每個partition都是一個獨立的物理文件,消息直接從里面讀寫。
根據之前阿里中間件團隊的測試,一旦kafka中Topic的partitoin數量過多,隊列文件會過多,會給磁盤的IO讀寫造成很大的壓力,造成tps迅速下降。
所以RocketMQ進行了上述這樣設計,consumerQueue中只存儲很少的數據,消息主體都是通過CommitLog來進行讀寫。
沒有一種方案是銀彈,那么RocketMQ這樣處理有什么 優缺點 ?
-
3.2.1優點:
1、隊列輕量化,單個隊列數據量非常少。對磁盤的訪問串行化,避免磁盤竟爭,不會因為隊列增加導致IOWAIT增高。
-
3.2.2缺點:
寫雖然完全是順序寫,但是讀卻變成了完全的隨機讀。
讀一條消息,會先讀ConsumeQueue,再讀CommitLog,增加了開銷。
要保證CommitLog與ConsumeQueue完全的一致,增加了編程的復雜度。
- 3.2.3以上缺點如何克服 :
隨機讀,盡可能讓讀命中page cache,減少IO讀操作,所以內存越大越好。如果系統中堆積的消息過多,讀數據要訪問磁盤會不會由於隨機讀導致系統性能急劇下降,答案是否定的。
訪問page cache 時,即使只訪問1k的消息,系統也會提前預讀出更多數據,在下次讀時,就可能命中內存。
隨機訪問Commit Log磁盤數據,系統IO調度算法設置為NOOP方式,會在一定程度上將完全的隨機讀變成順序跳躍方式,而順序跳躍方式讀較完全的隨機讀性能會高5倍以上。
另外4k的消息在完全隨機訪問情況下,仍然可以達到8K次每秒以上的讀性能。
由於Consume Queue存儲數據量極少,而且是順序讀,在PAGECACHE預讀作用下,Consume Queue的讀性能幾乎與內存一致,即使堆積情況下。所以可認為Consume Queue完全不會阻礙讀性能。
Commit Log中存儲了所有的元信息,包含消息體,類似於Mysql、Oracle的redolog,所以只要有Commit Log在,Consume Queue即使數據丟失,仍然可以恢復出來。
4 底層實現
先討論下RocketMQ中存儲的底層實現:
4.1 MappedByteBuffer
RocketMQ中的文件讀寫主要就是通過MappedByteBuffer進行操作,來進行文件映射。利用了nio中的FileChannel模型,可以直接將物理文件映射到緩沖區,提高讀寫速度。
具體的測試我沒有做benchmark,網上有相應的測試。
4.2 page cache
剛剛提到的緩沖區,也就是之前說到的page cache。
通俗的說:pageCache是系統讀寫磁盤時為了提高性能將部分文件緩存到內存中,下面是詳細解釋:
page cache:這里所提及到的page cache,在我看來是linux中vfs虛擬文件系統層的cache層,一般pageCache默認是4K大小,它被操作系統的內存管理模塊所管理,文件被映射到內存,一般都是被mmap()函數映射上去的。
總結一下這里使用的存儲底層(我認為的): 通過將文件映射到內存上,直接操作文件,相比於傳統的io(首先要調用系統IO,然后要將數據從內核空間傳輸到用戶空間),避免了很多不必要的數據拷貝,所以這種技術也被稱為 零拷貝 ,具體可見IBM團隊關於零拷貝的博客:
零拷貝5 具體實現
5.1 對象架構簡介
先說消息實體存儲的流程,老規矩,看圖說話,先畫個UML圖:

下面簡要介紹一下各個關鍵對象的作用:
DefaultMessageStore:這是存儲模塊里面最重要的一個類,包含了很多對存儲文件的操作API,其他模塊對消息實體的操作都是通過DefaultMessageStore進行操作。
commitLog:commitLog是所有物理消息實體的存放文件,這篇文章的架構圖里可以看得到。其中commitLog持有了MapedFileQueue。
**consumeQueue:**consumeQueue就對應了相對的每個topic下的一個邏輯隊列(rocketMQ中叫queque,kafka的概念里叫partition), 它是一個邏輯隊列!存儲了消息在commitLog中的offSet。
indexFile:存儲具體消息索引的文件,以一個類似hash桶的數據結構進行索引維護。
MapedFileQueue:這個對象包含一個MapedFileList,維護了多個mapedFile,升序存儲。一個MapedFileQueue針對的就是一個目錄下的所有二進制存儲文件。理論上無線增長,定期刪除過期文件。

(圖中左側的目錄樹中,一個0目錄就是一個MapedFileQueue,一個commitLog目錄也是一個MapedFileQueue,右側的000000000就是一個MapedFile。)
MapedFile:每個MapedFile對應的就是一個物理二進制文件了,在代碼中負責文件讀寫的就是MapedByteBuffer和fileChannel。相當於對pageCache文件的封裝。

5.2 消息存儲主流程
我根據源碼畫了消息存儲的時序圖,大致都是線性的調用,其中包含一些對pageCache是否繁忙、處理時間是否超時以及參數的校驗。

5.2.1 consumeQueue的消息處理
上述的消息存儲只是把消息主體存儲到了物理文件中,但是並沒有把消息處理到consumeQueue文件中,那么到底是哪里存入的?
任務處理一般都分為兩種:
-
一種是同步,把消息主體存入到commitLog的同時把消息存入consumeQueue,rocketMQ的早期版本就是這樣處理的。
-
另一種是異步處理,起一個線程,不停的輪詢,將當前的consumeQueue中的offSet和commitLog中的offSet進行對比,將多出來的offSet進行解析,然后put到consumeQueue中的MapedFile中。
問題:為什么要改同步為異步處理?應該是為了增加發送消息的吞吐量。
5.2.2 刷盤策略實現消息在調用MapedFile的appendMessage后,也只是將消息裝載到了ByteBuffer中,也就是內存中,還沒有落盤。落盤需要將內存flush到磁盤上,針對commitLog,rocketMQ提供了兩種落盤方式。
異步落盤
public void run() { CommitLog.log.info(this.getServiceName() + " service started"); //不停輪詢 while (!this.isStoped()) { boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed(); int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); //拿到要刷盤的頁數 int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); boolean printFlushProgress = false; // Print flush progress long currentTimeMillis = System.currentTimeMillis(); //控制刷盤間隔,如果當前的時間還沒到刷盤的間隔時間則不刷 if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) { this.lastFlushTimestamp = currentTimeMillis; flushPhysicQueueLeastPages = 0; printFlushProgress = ((printTimes++ % 10) == 0); } try { //是否需要刷盤休眠 if (flushCommitLogTimed) { Thread.sleep(interval); } else { this.waitForRunning(interval); } if (printFlushProgress) { this.printFlushProgress(); } //commit開始刷盤 CommitLog.this.mapedFileQueue.commit(flushPhysicQueueLeastPages); long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); this.printFlushProgress(); } } // Normal shutdown, to ensure that all the flush before exit boolean result = false; for (int i = 0; i < RetryTimesOver && !result; i++) { result = CommitLog.this.mapedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } this.printFlushProgress(); CommitLog.log.info(this.getServiceName() + " service end"); }
再看一下刷盤時檢查是否能刷的細節代碼:
MappedFile.java
public int commit(final int flushLeastPages) { //判斷當前是否能刷盤 if (this.isAbleToFlush(flushLeastPages)) { //類似於一個智能指針,控制刷盤線程數 if (this.hold()) { int value = this.wrotePostion.get(); System.out.println("value is "+value+",thread is "+Thread.currentThread().getName()); //刷盤,內存到硬盤 this.mappedByteBuffer.force(); this.committedPosition.set(value); //釋放智能指針 this.release(); } else { log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); this.committedPosition.set(this.wrotePostion.get()); } } return this.getCommittedPosition(); } //判斷是否能刷盤 private boolean isAbleToFlush(final int flushLeastPages) { //已經刷到的位置 int flush = this.committedPosition.get(); //寫到內存的位置 int write = this.wrotePostion.get(); System.out.println("flush is "+flush+",write is "+write); if (this.isFull()) { return true; } //滿足寫到內存的offset比已經刷盤的offset大4K*4(默認的最小刷盤頁數,一頁默認4k) if (flushLeastPages > 0) { return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages; } return write > flush; }
總的來說RocketMQ使用了java nio的文件api進行文件內存倒硬盤的持久化。主要是MappedByteBuffer之類的一些api。
- 同步落盤
批量落盤不同於之前的異步落盤,使用兩個讀寫list交替來避免上鎖,提高效率。
同時使用了countDownLatch來等待刷盤的間隔,消息的刷盤必須等待GroupCommitRequest的喚醒。
//封裝的一次刷盤請求 public static class GroupCommitRequest { //這次請求要刷到的offSet位置,比如已經刷到2, private final long nextOffset; //控制flush的拴 private final CountDownLatch countDownLatch = new CountDownLatch(1); private volatile boolean flushOK = false; public GroupCommitRequest(long nextOffset) { this.nextOffset = nextOffset; } public long getNextOffset() { return nextOffset; } //刷完了喚醒 public void wakeupCustomer(final boolean flushOK) { this.flushOK = flushOK; this.countDownLatch.countDown(); } public boolean waitForFlush(long timeout) { try { this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); return this.flushOK; } catch (InterruptedException e) { e.printStackTrace(); return false; } } } /** * GroupCommit Service * 批量刷盤服務 */ class GroupCommitService extends FlushCommitLogService { //用來接收消息的隊列,提供寫消息 private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); //用來讀消息的隊列,將消息從內存讀到硬盤 private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>(); //添加一個刷盤的request public void putRequest(final GroupCommitRequest request) { synchronized (this) { //添加到寫消息的list中 this.requestsWrite.add(request); //喚醒其他線程 if (!this.hasNotified) { this.hasNotified = true; this.notify(); } } } //交換讀寫隊列,避免上鎖 private void swapRequests() { List<GroupCommitRequest> tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; } private void doCommit() { //讀隊列不為空 if (!this.requestsRead.isEmpty()) { //遍歷 for (GroupCommitRequest req : this.requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush boolean flushOK = false; for (int i = 0; (i < 2) && !flushOK; i++) { // flushOK = (CommitLog.this.mapedFileQueue.getCommittedWhere() >= req.getNextOffset()); //如果沒刷完 即flushOK為false則繼續刷 if (!flushOK) { CommitLog.this.mapedFileQueue.commit(0); } } //刷完了喚醒 req.wakeupCustomer(flushOK); } long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } //清空讀list this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mapedFileQueue.commit(0); } } public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStoped()) { try { this.waitForRunning(0); this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // Under normal circumstances shutdown, wait for the arrival of the // request, and then flush //正常關閉時要把沒刷完的刷完 try { Thread.sleep(10); } catch (InterruptedException e) { CommitLog.log.warn("GroupCommitService Exception, ", e); } synchronized (this) { this.swapRequests(); } this.doCommit(); CommitLog.log.info(this.getServiceName() + " service end"); } }
5.3 消息索引
這里的消息索引主要是提供根據起始時間、topic和key來查詢消息的接口。
首先根據給的topic、key以及起始時間查詢到一個list,然后將offset拉到commitLog中查詢,再反序列化成消息實體。
5.3.2 索引的具體實現
看一張圖,摘自官方文檔:

索引的邏輯結構類似一個hashMap。
先看什么時候開始構建索引:
構建consumeQueue的同時會buildIndex構建索引
如何構建索引?
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) { //索引頭的索引數小於indexNum if (this.indexHeader.getIndexCount() < this.indexNum) { //根據key第一次計算hash int keyHash = indexKeyHashMethod(key); //第二次計算出hash槽位 int slotPos = keyHash % this.hashSlotNum; int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * HASH_SLOT_SIZE; FileLock fileLock = null; try { // fileLock = this.fileChannel.lock(absSlotPos, HASH_SLOT_SIZE, // false); int slotValue = this.mappedByteBuffer.getInt(absSlotPos); if (slotValue <= INVALID_INDEX || slotValue > this.indexHeader.getIndexCount()) { slotValue = INVALID_INDEX; } long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp(); timeDiff = timeDiff / 1000; if (this.indexHeader.getBeginTimestamp() <= 0) { timeDiff = 0; } else if (timeDiff > Integer.MAX_VALUE) { timeDiff = Integer.MAX_VALUE; } else if (timeDiff < 0) { timeDiff = 0; } int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * HASH_SLOT_SIZE + this.indexHeader.getIndexCount() * INDEX_SIZE; //放入索引的內容 this.mappedByteBuffer.putInt(absIndexPos, keyHash); this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); if (this.indexHeader.getIndexCount() <= 1) { this.indexHeader.setBeginPhyOffset(phyOffset); this.indexHeader.setBeginTimestamp(storeTimestamp); } this.indexHeader.incHashSlotCount(); this.indexHeader.incIndexCount(); this.indexHeader.setEndPhyOffset(phyOffset); this.indexHeader.setEndTimestamp(storeTimestamp); return true; } catch (Exception e) { log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { e.printStackTrace(); } } } } else { log.warn("putKey index count " + this.indexHeader.getIndexCount() + " index max num " + this.indexNum); } return false; }
下面摘自官方文檔:
- 根據查詢的 key 的 hashcode%slotNum 得到具體的槽的位置(slotNum 是一個索引文件里面包含的最大槽的數目,
例如圖中所示 slotNum=5000000) 。 - 根據 slotValue(slot 位置對應的值)查找到索引項列表的最后一項(倒序排列,slotValue 總是挃吐最新的一個項目開源主頁: https://github.com/alibaba/RocketMQ
21
索引項) 。 - 遍歷索引項列表迒回查詢時間范圍內的結果集(默訃一次最大迒回的 32 條記彔)
- Hash 沖突;尋找 key 的 slot 位置時相當亍執行了兩次散列函數,一次 key 的 hash,一次 key 的 hash 值叏模,
因此返里存在兩次沖突的情冴;第一種,key 的 hash 值丌同但模數相同,此時查詢的時候會在比較一次 key 的
hash 值(每個索引項保存了 key 的 hash 值),過濾掉 hash 值丌相等的項。第二種,hash 值相等但 key 丌等,
出亍性能的考慮沖突的檢測放到客戶端處理(key 的原始值是存儲在消息文件中的,避免對數據文件的解析),
客戶端比較一次消息體的 key 是否相同。 - 存儲;為了節省空間索引項中存儲的時間是時間差值(存儲時間-開始時間,開始時間存儲在索引文件頭中),
整個索引文件是定長的,結構也是固定的。
6 總結
RocketMQ利用改了kafka的思想,針對使用文件做消息存儲做了大量的實踐和優化。commitLog一直順序寫,增大了寫消息的吞吐量,對pageCache的利用也很好地提升了相應的效率,使文件也擁有了內存般的效率。其中很多細節都值得參考和學習。
由於本人水平有限,可能會有理解錯誤和內容描述錯誤,歡迎討論和指正。
我的郵箱:ma.rong@nexuslink.cn
參考: