(轉)RocketMQ源碼學習--消息存儲篇


http://www.tuicool.com/articles/umQfMzA

1.序言

今天來和大家探討一下RocketMQ在消息存儲方面所作出的努力,在介紹RocketMQ的存儲模型之前,可以先探討一下MQ的存儲模型選擇。

2.MQ的存儲模型選擇

個人看來,從MQ的類型來看,存儲模型分兩種:

  • 需要持久化(ActiveMQ,RabbitMQ,Kafka,RocketMQ)
  • 不需要持久化(ZeroMQ)

本篇文章主要討論持久化MQ的存儲模型,因為現在大多數的MQ都是支持持久化存儲,而且業務上也大多需要MQ有持久存儲的能力,能大大增加系統的高可用性,下面幾種存儲方式:

  • 分布式KV存儲(levelDB,RocksDB,redis)
  • 傳統的文件系統
  • 傳統的關系型數據庫

這幾種存儲方式從效率來看, 文件系統 > kv存儲 > 關系型數據庫 ,因為直接操作文件系統肯定是最快的,而關系型數據庫一般的TPS都不會很高,我印象中Mysql的寫不會超過5Wtps(現在不確定最新情況),所以如果追求效率就直接操作文件系統。

但是如果從可靠性和易實現的角度來說,則是 關系型數據庫 > kv存儲 > 文件系統 ,消息存在db里面非常可靠,但是性能會下降很多,所以具體的技術選型都是需要根據自己的業務需求去考慮。

3.RocketMQ的存儲架構

3.1存儲特點:

如上圖所示:
(1)消息主體以及元數據都存儲在**CommitLog**當中
(2)Consume Queue相當於kafka中的partition,是一個邏輯隊列,存儲了這個Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。 (3)每次讀取消息隊列先讀取consumerQueue,然后再通過consumerQueue去commitLog中拿到消息主體。
3.2為什么要這樣設計?

rocketMQ的設計理念很大程度借鑒了kafka,所以有必要介紹下kafka的存儲結構設計:

  • 存儲特點:

    和RocketMQ類似,每個Topic有多個partition(queue),kafka的每個partition都是一個獨立的物理文件,消息直接從里面讀寫。

根據之前阿里中間件團隊的測試,一旦kafka中Topic的partitoin數量過多,隊列文件會過多,會給磁盤的IO讀寫造成很大的壓力,造成tps迅速下降。

所以RocketMQ進行了上述這樣設計,consumerQueue中只存儲很少的數據,消息主體都是通過CommitLog來進行讀寫。

沒有一種方案是銀彈,那么RocketMQ這樣處理有什么 優缺點

  • 3.2.1優點:

    1、隊列輕量化,單個隊列數據量非常少。對磁盤的訪問串行化,避免磁盤竟爭,不會因為隊列增加導致IOWAIT增高。

  • 3.2.2缺點:

    寫雖然完全是順序寫,但是讀卻變成了完全的隨機讀。

    讀一條消息,會先讀ConsumeQueue,再讀CommitLog,增加了開銷。

    要保證CommitLog與ConsumeQueue完全的一致,增加了編程的復雜度。

  • 3.2.3以上缺點如何克服
    隨機讀,盡可能讓讀命中page cache,減少IO讀操作,所以內存越大越好。如果系統中堆積的消息過多,讀數據要訪問磁盤會不會由於隨機讀導致系統性能急劇下降,答案是否定的。
    訪問page cache 時,即使只訪問1k的消息,系統也會提前預讀出更多數據,在下次讀時,就可能命中內存。
    隨機訪問Commit Log磁盤數據,系統IO調度算法設置為NOOP方式,會在一定程度上將完全的隨機讀變成順序跳躍方式,而順序跳躍方式讀較完全的隨機讀性能會高5倍以上。
    另外4k的消息在完全隨機訪問情況下,仍然可以達到8K次每秒以上的讀性能。
    由於Consume Queue存儲數據量極少,而且是順序讀,在PAGECACHE預讀作用下,Consume Queue的讀性能幾乎與內存一致,即使堆積情況下。所以可認為Consume Queue完全不會阻礙讀性能。
    Commit Log中存儲了所有的元信息,包含消息體,類似於Mysql、Oracle的redolog,所以只要有Commit Log在,Consume Queue即使數據丟失,仍然可以恢復出來。

4 底層實現

先討論下RocketMQ中存儲的底層實現:

4.1 MappedByteBuffer

RocketMQ中的文件讀寫主要就是通過MappedByteBuffer進行操作,來進行文件映射。利用了nio中的FileChannel模型,可以直接將物理文件映射到緩沖區,提高讀寫速度。

具體的測試我沒有做benchmark,網上有相應的測試。

4.2 page cache

剛剛提到的緩沖區,也就是之前說到的page cache。

通俗的說:pageCache是系統讀寫磁盤時為了提高性能將部分文件緩存到內存中,下面是詳細解釋:

page cache:這里所提及到的page cache,在我看來是linux中vfs虛擬文件系統層的cache層,一般pageCache默認是4K大小,它被操作系統的內存管理模塊所管理,文件被映射到內存,一般都是被mmap()函數映射上去的。

mmap()函數會返回一個指針,指向邏輯地址空間中的邏輯地址,邏輯地址通過MMU映射到page cache上。

關於內存映射我推薦一篇博客:

內存映射
4.3 總結

總結一下這里使用的存儲底層(我認為的): 通過將文件映射到內存上,直接操作文件,相比於傳統的io(首先要調用系統IO,然后要將數據從內核空間傳輸到用戶空間),避免了很多不必要的數據拷貝,所以這種技術也被稱為 零拷貝 ,具體可見IBM團隊關於零拷貝的博客:

零拷貝

5 具體實現

5.1 對象架構簡介

先說消息實體存儲的流程,老規矩,看圖說話,先畫個UML圖:

下面簡要介紹一下各個關鍵對象的作用:

DefaultMessageStore:這是存儲模塊里面最重要的一個類,包含了很多對存儲文件的操作API,其他模塊對消息實體的操作都是通過DefaultMessageStore進行操作。

commitLog:commitLog是所有物理消息實體的存放文件,這篇文章的架構圖里可以看得到。其中commitLog持有了MapedFileQueue

**consumeQueue:**consumeQueue就對應了相對的每個topic下的一個邏輯隊列(rocketMQ中叫queque,kafka的概念里叫partition), 它是一個邏輯隊列!存儲了消息在commitLog中的offSet。

indexFile:存儲具體消息索引的文件,以一個類似hash桶的數據結構進行索引維護。

MapedFileQueue:這個對象包含一個MapedFileList,維護了多個mapedFile,升序存儲。一個MapedFileQueue針對的就是一個目錄下的所有二進制存儲文件。理論上無線增長,定期刪除過期文件。

(圖中左側的目錄樹中,一個0目錄就是一個MapedFileQueue,一個commitLog目錄也是一個MapedFileQueue,右側的000000000就是一個MapedFile。)

MapedFile:每個MapedFile對應的就是一個物理二進制文件了在代碼中負責文件讀寫的就是MapedByteBuffer和fileChannel。相當於對pageCache文件的封裝。

5.2 消息存儲主流程

我根據源碼畫了消息存儲的時序圖,大致都是線性的調用,其中包含一些對pageCache是否繁忙、處理時間是否超時以及參數的校驗。

5.2.1 consumeQueue的消息處理

上述的消息存儲只是把消息主體存儲到了物理文件中,但是並沒有把消息處理到consumeQueue文件中,那么到底是哪里存入的?

任務處理一般都分為兩種:

  • 一種是同步,把消息主體存入到commitLog的同時把消息存入consumeQueue,rocketMQ的早期版本就是這樣處理的。

  • 另一種是異步處理,起一個線程,不停的輪詢,將當前的consumeQueue中的offSet和commitLog中的offSet進行對比,將多出來的offSet進行解析,然后put到consumeQueue中的MapedFile中。

問題:為什么要改同步為異步處理?應該是為了增加發送消息的吞吐量。

5.2.2 刷盤策略實現消息在調用MapedFile的appendMessage后,也只是將消息裝載到了ByteBuffer中,也就是內存中,還沒有落盤落盤需要將內存flush到磁盤上,針對commitLog,rocketMQ提供了兩種落盤方式。

 

異步落盤

 

 

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    //不停輪詢
    while (!this.isStoped()) {
        boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        //拿到要刷盤的頁數
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

        int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

        boolean printFlushProgress = false;

        // Print flush progress
        long currentTimeMillis = System.currentTimeMillis();
        //控制刷盤間隔,如果當前的時間還沒到刷盤的間隔時間則不刷
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            flushPhysicQueueLeastPages = 0;
            printFlushProgress = ((printTimes++ % 10) == 0);
        }

        try {
            //是否需要刷盤休眠
            if (flushCommitLogTimed) {
                Thread.sleep(interval);
            } else {
                this.waitForRunning(interval);
            }

            if (printFlushProgress) {
                this.printFlushProgress();
            }
            //commit開始刷盤
            CommitLog.this.mapedFileQueue.commit(flushPhysicQueueLeastPages); long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            this.printFlushProgress();
        }
    }

    // Normal shutdown, to ensure that all the flush before exit
    boolean result = false;
    for (int i = 0; i < RetryTimesOver && !result; i++) {
        result = CommitLog.this.mapedFileQueue.commit(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }

    this.printFlushProgress();

    CommitLog.log.info(this.getServiceName() + " service end");
}

再看一下刷盤時檢查是否能刷的細節代碼:

MappedFile.java

public int commit(final int flushLeastPages) {
    //判斷當前是否能刷盤
    if (this.isAbleToFlush(flushLeastPages)) {
        //類似於一個智能指針,控制刷盤線程數
        if (this.hold()) {
            int value = this.wrotePostion.get();
            System.out.println("value is "+value+",thread is "+Thread.currentThread().getName());
            //刷盤,內存到硬盤
            this.mappedByteBuffer.force();
            this.committedPosition.set(value);
            //釋放智能指針
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            this.committedPosition.set(this.wrotePostion.get());
        }
    }

    return this.getCommittedPosition();
}

//判斷是否能刷盤
private boolean isAbleToFlush(final int flushLeastPages) {
    //已經刷到的位置
    int flush = this.committedPosition.get();
    //寫到內存的位置
    int write = this.wrotePostion.get();

    System.out.println("flush is "+flush+",write is "+write);

    if (this.isFull()) {
        return true;
    }

    //滿足寫到內存的offset比已經刷盤的offset大4K*4(默認的最小刷盤頁數,一頁默認4k)
    if (flushLeastPages > 0) {
        return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
    }

    return write > flush;
}

總的來說RocketMQ使用了java nio的文件api進行文件內存倒硬盤的持久化。主要是MappedByteBuffer之類的一些api。

  • 同步落盤

批量落盤不同於之前的異步落盤,使用兩個讀寫list交替來避免上鎖,提高效率。

同時使用了countDownLatch來等待刷盤的間隔,消息的刷盤必須等待GroupCommitRequest的喚醒。

//封裝的一次刷盤請求
public static class GroupCommitRequest {
    //這次請求要刷到的offSet位置,比如已經刷到2,
    private final long nextOffset;
    //控制flush的拴
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private volatile boolean flushOK = false;


    public GroupCommitRequest(long nextOffset) {
        this.nextOffset = nextOffset;
    }


    public long getNextOffset() {
        return nextOffset;
    }

    //刷完了喚醒
    public void wakeupCustomer(final boolean flushOK) {
        this.flushOK = flushOK;
        this.countDownLatch.countDown();
    }


    public boolean waitForFlush(long timeout) {
        try {
            this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
            return this.flushOK;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return false;
        }
    }
}

/**
 * GroupCommit Service
 * 批量刷盤服務
 */
class GroupCommitService extends FlushCommitLogService {
    //用來接收消息的隊列,提供寫消息
    private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
    //用來讀消息的隊列,將消息從內存讀到硬盤
    private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

    //添加一個刷盤的request
    public void putRequest(final GroupCommitRequest request) {
        synchronized (this) {
            //添加到寫消息的list中
            this.requestsWrite.add(request);
            //喚醒其他線程
            if (!this.hasNotified) {
                this.hasNotified = true;
                this.notify();
            }
        }
    }

    //交換讀寫隊列,避免上鎖
    private void swapRequests() {
        List<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    }


    private void doCommit() {
        //讀隊列不為空
        if (!this.requestsRead.isEmpty()) {
            //遍歷
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                for (int i = 0; (i < 2) && !flushOK; i++) {
                    //
                    flushOK = (CommitLog.this.mapedFileQueue.getCommittedWhere() >= req.getNextOffset());
                    //如果沒刷完 即flushOK為false則繼續刷
                    if (!flushOK) {
                        CommitLog.this.mapedFileQueue.commit(0);
                    }
                }
                //刷完了喚醒
                req.wakeupCustomer(flushOK);
            }

            long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            //清空讀list
            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mapedFileQueue.commit(0);
        }
    }


    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");

        while (!this.isStoped()) {
            try {
                this.waitForRunning(0);
                this.doCommit();
            } catch (Exception e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        // Under normal circumstances shutdown, wait for the arrival of the
        // request, and then flush
        //正常關閉時要把沒刷完的刷完
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            CommitLog.log.warn("GroupCommitService Exception, ", e);
        }

        synchronized (this) {
            this.swapRequests();
        }

        this.doCommit();

        CommitLog.log.info(this.getServiceName() + " service end");
    }
    }

5.3 消息索引

5.3.1 消息索引的作用

這里的消息索引主要是提供根據起始時間、topic和key來查詢消息的接口。

首先根據給的topic、key以及起始時間查詢到一個list,然后將offset拉到commitLog中查詢,再反序列化成消息實體。

5.3.2 索引的具體實現

看一張圖,摘自官方文檔:

索引的邏輯結構類似一個hashMap。

先看什么時候開始構建索引:

構建consumeQueue的同時會buildIndex構建索引

如何構建索引?

public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    //索引頭的索引數小於indexNum
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        //根據key第一次計算hash
        int keyHash = indexKeyHashMethod(key);
        //第二次計算出hash槽位
        int slotPos = keyHash % this.hashSlotNum;
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * HASH_SLOT_SIZE;

        FileLock fileLock = null;

        try {

            // fileLock = this.fileChannel.lock(absSlotPos, HASH_SLOT_SIZE,
            // false);
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            if (slotValue <= INVALID_INDEX || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = INVALID_INDEX;
            }

            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();


            timeDiff = timeDiff / 1000;


            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }

            int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * HASH_SLOT_SIZE
                            + this.indexHeader.getIndexCount() * INDEX_SIZE;

            //放入索引的內容
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);


            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());


            if (this.indexHeader.getIndexCount() <= 1) {
                this.indexHeader.setBeginPhyOffset(phyOffset);
                this.indexHeader.setBeginTimestamp(storeTimestamp);
            }

            this.indexHeader.incHashSlotCount();
            this.indexHeader.incIndexCount();
            this.indexHeader.setEndPhyOffset(phyOffset);
            this.indexHeader.setEndTimestamp(storeTimestamp);

            return true;
        } catch (Exception e) {
            log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
        } finally {
            if (fileLock != null) {
                try {
                    fileLock.release();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    } else {
        log.warn("putKey index count " + this.indexHeader.getIndexCount() + " index max num "
                + this.indexNum);
    }

    return false;
}

下面摘自官方文檔:

  1. 根據查詢的 key 的 hashcode%slotNum 得到具體的槽的位置(slotNum 是一個索引文件里面包含的最大槽的數目,
    例如圖中所示 slotNum=5000000) 。
  2. 根據 slotValue(slot 位置對應的值)查找到索引項列表的最后一項(倒序排列,slotValue 總是挃吐最新的一個項目開源主頁: https://github.com/alibaba/RocketMQ
    21
    索引項) 。
  3. 遍歷索引項列表迒回查詢時間范圍內的結果集(默訃一次最大迒回的 32 條記彔)
  4. Hash 沖突;尋找 key 的 slot 位置時相當亍執行了兩次散列函數,一次 key 的 hash,一次 key 的 hash 值叏模,
    因此返里存在兩次沖突的情冴;第一種,key 的 hash 值丌同但模數相同,此時查詢的時候會在比較一次 key 的
    hash 值(每個索引項保存了 key 的 hash 值),過濾掉 hash 值丌相等的項。第二種,hash 值相等但 key 丌等,
    出亍性能的考慮沖突的檢測放到客戶端處理(key 的原始值是存儲在消息文件中的,避免對數據文件的解析),
    客戶端比較一次消息體的 key 是否相同。
  5. 存儲;為了節省空間索引項中存儲的時間是時間差值(存儲時間-開始時間,開始時間存儲在索引文件頭中),
    整個索引文件是定長的,結構也是固定的。

6 總結

RocketMQ利用改了kafka的思想,針對使用文件做消息存儲做了大量的實踐和優化。commitLog一直順序寫,增大了寫消息的吞吐量,對pageCache的利用也很好地提升了相應的效率,使文件也擁有了內存般的效率。其中很多細節都值得參考和學習。

由於本人水平有限,可能會有理解錯誤和內容描述錯誤,歡迎討論和指正。

我的郵箱:ma.rong@nexuslink.cn

參考:

阿里雲棲社區-RocketMQ關鍵特性

RocketMQ原理解析

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM