RocketMQ消息存儲和ack


消息存儲架構

主要有下面三個跟消息存儲相關的文件構成。

(1) CommitLog:消息主體以及元數據的存儲主體,存儲Producer端寫入的消息主體內容,消息內容不是定長的。單個文件大小默認1G ,文件名長度為20位,左邊補零,剩余為起始偏移量,比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當第一個文件寫滿了,第二個文件為00000000001073741824,起始偏移量為1073741824,以此類推。消息主要是順序寫入日志文件,當文件滿了,寫入下一個文件;

(2) ConsumeQueue:消息消費隊列,引入的目的主要是提高消息消費的性能,由於RocketMQ是基於主題topic的訂閱模式,消息消費是針對主題進行的,如果要遍歷commitlog文件中根據topic檢索消息是非常低效的。Consumer即可根據ConsumeQueue來查找待消費的消息。其中,ConsumeQueue(邏輯消費隊列)作為消費消息的索引,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基於topic的commitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結構,具體存儲路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣consumequeue文件采取定長設計,每一個條目共20個字節,分別為8字節的commitlog物理偏移量、4字節的消息長度、8字節tag hashcode,單個文件由30W個條目組成,可以像數組一樣隨機訪問每一個條目,每個ConsumeQueue文件大小約5.72M;

(3) IndexFile:IndexFile(索引文件)提供了一種可以通過key或時間區間來查詢消息的方法。Index文件的存儲位置是:$HOME \store\index${fileName},文件名fileName是以創建時的時間戳命名的,固定的單個IndexFile文件大小約為400M,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設計為在文件系統中實現HashMap結構,故rocketmq的索引文件其底層實現為hash索引。

在上面的RocketMQ的消息存儲整體架構圖中可以看出,RocketMQ采用的是混合型的存儲結構,即為Broker單個實例下所有的隊列共用一個日志數據文件(即為CommitLog)來存儲。RocketMQ的混合型存儲結構(多個Topic的消息實體內容都存儲於一個CommitLog中)針對Producer和Consumer分別采用了數據和索引部分相分離的存儲結構,Producer發送消息至Broker端,然后Broker端使用同步或者異步的方式對消息刷盤持久化,保存至CommitLog中。只要消息被刷盤持久化至磁盤文件CommitLog中,那么Producer發送的消息就不會丟失。正因為如此,Consumer也就肯定有機會去消費這條消息。當無法拉取到消息后,可以等下一次消息拉取,同時服務端也支持長輪詢模式,如果一個消息拉取請求未拉取到消息,Broker允許等待30s的時間,只要這段時間內有新消息到達,將直接返回給消費端。這里,RocketMQ的具體做法是,使用Broker端的后台服務線程—ReputMessageService不停地分發請求並異步構建ConsumeQueue(邏輯消費隊列)和IndexFile(索引文件)數據。

PageCache與Mmap內存映射

這里有必要先稍微簡單地介紹下page cache的概念。系統的所有文件I/O請求,操作系統都是通過page cache機制實現的。對於操作系統來說,磁盤文件都是由一系列的數據塊順序組成,數據塊的大小由操作系統本身而決定,x86的linux中一個標准頁面大小是4KB。
操作系統內核在處理文件I/O請求時,首先到page cache中查找(page cache中的每一個數據塊都設置了文件以及偏移量地址信息),如果未命中,則啟動磁盤I/O,將磁盤文件中的數據塊加載到page cache中的一個空閑塊,然后再copy到用戶緩沖區中。
page cache本身也會對數據文件進行預讀取,對於每個文件的第一個讀請求操作,系統在讀入所請求頁面的同時會讀入緊隨其后的少數幾個頁面。因此,想要提高page cache的命中率(盡量讓訪問的頁在物理內存中),從硬件的角度來說肯定是物理內存越大越好。從操作系統層面來說,訪問page cache時,即使只訪問1k的消息,系統也會提前預讀取更多的數據,在下次讀取消息時, 就很可能可以命中內存。
在RocketMQ中,ConsumeQueue邏輯消費隊列存儲的數據較少,並且是順序讀取,在page cache機制的預讀取作用下,Consume Queue的讀性能會比較高近乎內存,即使在有消息堆積情況下也不會影響性能。而對於CommitLog消息存儲的日志數據文件來說,讀取消息內容時候會產生較多的隨機訪問讀取,嚴重影響性能。如果選擇合適的系統IO調度算法,比如設置調度算法為“Noop”(此時塊存儲采用SSD的話),隨機讀的性能也會有所提升。
另外,RocketMQ主要通過MappedByteBuffer對文件進行讀寫操作。其中,利用了NIO中的FileChannel模型直接將磁盤上的物理文件直接映射到用戶態的內存地址中(這種Mmap的方式減少了傳統IO將磁盤文件數據在操作系統內核地址空間的緩沖區和用戶應用程序地址空間的緩沖區之間來回進行拷貝的性能開銷),將對文件的操作轉化為直接對內存地址進行操作,從而極大地提高了文件的讀寫效率(這里需要注意的是,采用MappedByteBuffer這種內存映射的方式有幾個限制,其中之一是一次只能映射1.5~2G 的文件至用戶態的虛擬內存,這也是為何RocketMQ默認設置單個CommitLog日志數據文件為1G的原因了)。

原理:

https://blog.csdn.net/mg0832058/article/details/5890688

https://www.cnblogs.com/huxiao-tee/p/4660352.html

消息刷盤

(1) 同步刷盤:如上圖所示,只有在消息真正持久化至磁盤后RocketMQ的Broker端才會真正返回給Producer端一個成功的ACK響應。同步刷盤對MQ消息可靠性來說是一種不錯的保障,但是性能上會有較大影響,一般適用於金融業務應用該模式較多。

(2) 異步刷盤:能夠充分利用OS的PageCache的優勢,只要消息寫入PageCache即可將成功的ACK返回給Producer端。消息刷盤采用后台異步線程提交的方式進行,降低了讀寫延遲,提高了MQ的性能和吞吐量。

生產階段的持久化保障(消息不丟失)

消息發送同步刷盤

    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("192.168.10.11:9876");
    producer.start();
    Message sendMessage = new Message(
                JmsConfig.TOPIC,
                "訂單001".getBytes());
    try {
     SendResult sendResult1 = producer.send(sendMessage);
    } catch (RemotingException e) {
        e.printStackTrace();
    } catch (MQBrokerException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();

異步刷盤

producer.getProducer().send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    //log.info("Product-異步發送-輸出信息={}", sendResult);
                    System.out.println("Product-異步發送-輸出信息={}" + sendResult);
                }
                @Override
                public void onException(Throwable e) {
                    //e.printStackTrace();
                    System.out.println("Product-異步發送-異常" + e.getMessage());
                    //寫入補償log,進行重發
                }
            });

他們都有一個共同點,即都需要返回SendResult 

消息發送成功僅代表消息已經到了 Broker 端,Broker 在不同配置下,可能會返回不同響應狀態:

  • SendStatus.SEND_OK
  • SendStatus.FLUSH_DISK_TIMEOUT
  • SendStatus.FLUSH_SLAVE_TIMEOUT
  • SendStatus.SLAVE_NOT_AVAILABLE

實例

[sendStatus=SEND_OK, msgId=A9FEC2CC067C18B4AAC2233C37560000, offsetMsgId=C0A80A0B00002A9F00000000007EA45A, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=1], queueOffset=1]

不管是同步還是異步的方式,都會碰到網絡問題導致發送失敗的情況。針對這種情況,我們可以設置合理的重試次數,當出現網絡問題,可以自動重試。設置方式如下:

// 同步發送消息重試次數,默認為 2
mqProducer.setRetryTimesWhenSendFailed(3);
// 異步發送消息重試次數,默認為 2
mqProducer.setRetryTimesWhenSendAsyncFailed(3);

生產者在發送消息時,同步消息失敗會重投,異步消息有重試,oneway沒有任何保證。消息重投保證消息盡可能發送成功、不丟失,但可能會造成消息重復,消息重復在RocketMQ中是無法避免的問題。消息重復在一般情況下不會發生,當出現消息量大、網絡抖動,消息重復就會是大概率事件。另外,生產者主動重發、consumer負載變化也會導致重復消息。如下方法可以設置消息重試策略:

  • retryTimesWhenSendFailed:同步發送失敗重投次數,默認為2,因此生產者會最多嘗試發送retryTimesWhenSendFailed + 1次。不會選擇上次失敗的broker,嘗試向其他broker發送,最大程度保證消息不丟。超過重投次數,拋出異常,由客戶端保證消息不丟。當出現RemotingException、MQClientException和部分MQBrokerException時會重投。
  • retryTimesWhenSendAsyncFailed:異步發送失敗重試次數,異步重試不會選擇其他broker,僅在同一個broker上做重試,不保證消息不丟。
  • retryAnotherBrokerWhenNotStoreOK:消息刷盤(主或備)超時或slave不可用(返回狀態非SEND_OK),是否嘗試發送到其他broker,默認false。十分重要消息可以開啟。

Broker的持久化保障

默認情況下,消息只要到了 Broker 端,將會優先保存到內存中,然后立刻返回確認響應給生產者。隨后 Broker 定期批量的將一組消息從內存異步刷入磁盤。

這種方式減少 I/O 次數,可以取得更好的性能,但是如果發生機器掉電,異常宕機等情況,消息還未及時刷入磁盤,就會出現丟失消息的情況。

若想保證 Broker 端不丟消息,保證消息的可靠性,我們需要將消息保存機制修改為同步刷盤方式,即消息存儲磁盤成功,才會返回響應。

修改 Broker 端配置如下:

## 默認情況為 ASYNC_FLUSH 
flushDiskType = SYNC_FLUSH

為了保證可用性,Broker 通常采用一主(master)多從(slave)部署方式。為了保證消息不丟失,消息還需要復制到 slave 節點。

默認方式下,消息寫入 master 成功,就可以返回確認響應給生產者,接着消息將會異步復制到 slave 節點。此時若 master 突然宕機且不可恢復,那么還未復制到 slave 的消息將會丟失。為了進一步提高消息的可靠性,我們可以采用同步的復制方式,master 節點將會同步等待 slave 節點復制完成,才會返回確認響應。

Broker master 節點 同步復制配置如下: 

## 默認為 ASYNC_MASTER 
brokerRole=SYNC_MASTER

如果 slave 節點未在指定時間內同步返回響應,生產者將會收到 SendStatus.FLUSH_SLAVE_TIMEOUT 返回狀態。

RocketMQ默認broker的刷盤策略為異步刷盤,如果有主從,同步策略也默認的是異步同步,這樣子可以提高broker處理消息的效率,但是會有丟失的可能性。因此可以通過同步刷盤策略+同步slave策略+主從的方式解決丟失消息的可能。

結合生產階段與存儲階段,若需要嚴格保證消息不丟失,broker 需要采用如下配置:

## master 節點配置
flushDiskType = SYNC_FLUSH
brokerRole=SYNC_MASTER
 
## slave 節點配置
brokerRole=slave
flushDiskType = SYNC_FLUSH

同時這個過程我們還需要生產者配合,判斷返回狀態是否是 SendStatus.SEND_OK。若是其他狀態,就需要考慮補償重試。

其實這些機制和kafak非常相似。

雖然上述配置提高消息的高可靠性,但是會降低性能,生產實踐中需要綜合選擇,到底是犧牲性能保證消息不丟失,還是高性能(默認的異步落盤)允許少量丟失(實際上也就是斷電的那一剎那會丟失極少量的數據),我建議如果是金融賬單業務,可以犧牲性能保證高可用,其他都可以使用log記錄,后期補償的方式進行。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM