RocketMQ事務性消息及持久化


TransactionProducer(事務消息):

  在分布式系統中,我們時常會遇到分布式事務的問題,除了常規的解決方案之外,我們還可以利用RocketMQ的事務性消息來解決分布式事務的問題。RocketMQ和其他消息中間件最大的一個區別是支持了事務消息,這也是分布式事務里面的基於消息的最終一致性方案。

RocketMQ消息的事務架構設計:

  1. 生產者執行本地事務,修改訂單支付狀態,並且提交事務
  2. 生產者發送事務消息到broker上,消息發送到broker上在沒有確認之前,消息對於consumer是不可見狀態
  3. 生產者確認事務消息,使得發送到broker上的事務消息對於消費者可見
  4. 消費者獲取到消息進行消費,消費完之后執行ack進行確認

  這里可能會存在一個問題,生產者本地事務成功后,發送事務確認消息到broker上失敗了怎么辦?這個時候意味着消費者無法正常消費到這個消息。所以RocketMQ提供了消息回查機制,如果事務消息一直處於中間狀態,broker會發起重試去查詢broker上這個事務的處理狀態。一旦發現事務處理成功,則把當前這條消息設置為可見。

RocketMQ事務消息的實踐:

  生產者producer:

public class TransactionProducer {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, InterruptedException {
        TransactionMQProducer transactionMQProducer=new
                TransactionMQProducer("tx_producer");
        transactionMQProducer.setNamesrvAddr("192.168.1.101:9876;192.168.1.102:9876");
        ExecutorService executorService= Executors.newFixedThreadPool(10);
        transactionMQProducer.setExecutorService(executorService);
        transactionMQProducer.setTransactionListener(new TransactionListenerLocal()); //本地事務的監聽

        transactionMQProducer.start();

        for(int i=0;i<10;i++){
            String orderId= UUID.randomUUID().toString();
            String body="{'operation':'doOrder','orderId':'"+orderId+"'}";
            Message message=new Message("testTopic2",
                    null,orderId,body.getBytes(RemotingHelper.DEFAULT_CHARSET));
            transactionMQProducer.sendMessageInTransaction(message,orderId);
            Thread.sleep(1000);
        }
    }
}

  TransactionListenerLocal:

public class TransactionListenerLocal implements TransactionListener {

    private Map<String,Boolean> results=new ConcurrentHashMap<>();

    //執行本地事務
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("開始執行本地事務:"+o.toString()); //o
        String orderId=o.toString();
        //模擬數據庫保存(成功/失敗)
        boolean result=Math.abs(Objects.hash(orderId))%2==0;
        if(!result) {
            results.put(orderId, result); //
        }
        return result? LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW;
    }
    //提供給事務執行狀態檢查的回調方法,給broker用的(異步回調)
    //如果回查失敗,消息就丟棄
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        String orderId=messageExt.getKeys();
        System.out.println("執行事務回調檢查: orderId:"+orderId);
        if(results.size()==0){
           return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

  消費端 consumer:

public class TransactionConsumer {

    //rocketMQ 除了在同一個組和不同組之間的消費者的特性和kafka相同之外
    //RocketMQ可以支持廣播消息,就意味着,同一個group的每個消費者都可以消費同一個消息
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer defaultMQPushConsumer=
                new DefaultMQPushConsumer("tx_consumer");
        defaultMQPushConsumer.setNamesrvAddr("192.168.1.101:9876;192.168.1.102:9876");
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //subExpression 可以支持sql的表達式. or  and  a=? ,,,
        defaultMQPushConsumer.subscribe("testTopic2","*");
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.stream().forEach(message->{
                    System.out.println("開始業務處理邏輯:消息體:"+new String(message.getBody())+"->key:"+message.getKeys());
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //簽收
            }
        });
        defaultMQPushConsumer.start();
    }

}

RocketMQ事務消息的三種狀態:

  1. ROLLBACK_MESSAGE:回滾事務
  2. COMMIT_MESSAGE: 提交事務
  3. UNKNOW: broker會定時的回查Producer消息狀態,直到徹底成功或失敗。

  當executeLocalTransaction方法返回ROLLBACK_MESSAGE時,表示直接回滾事務,當返回COMMIT_MESSAGE提交事務當返回UNKNOW時,Broker會在一段時間之后回查checkLocalTransaction,根據checkLocalTransaction返回狀態執行事務的操作(回滾或提交),如示例中,當返回ROLLBACK_MESSAGE時消費者不會收到消息,且不會調用回查函數,當返回COMMIT_MESSAGE時事務提交,消費者收到消息,當返回UNKNOW時,在一段時間之后調用回查函數,並根據status判斷返回提交或回滾狀態,返回提交狀態的消息將會被消費者消費,所以此時消費者可以消費部分消息

消息的存儲和發送:

  由於分布式消息隊列對於可靠性的要求比較高,所以需要保證生產者將消息發送到broker之后,保證消息是不出現丟失的,因此消息隊列就少不了對於可靠性存儲的要求

  從主流的幾種MQ消息隊列采用的存儲方式來看,主要會有三種

  1. 分布式KV存儲,比如ActiveMQ中采用的levelDB、Redis, 這種存儲方式對於消息讀寫能力要求不高的情況下可以使用
  2. 文件系統存儲,常見的比如kafka、RocketMQ、RabbitMQ都是采用消息刷盤到所部署的機器上的文件系統來做持久化,這種方案適合對於有高吞吐量要求的消息中間件,因為消息刷盤是一種高效率,高可靠、高性能的持久化方式,除非磁盤出現故障,否則一般是不會出現無法持久化的問題
  3. 關系型數據庫,比如ActiveMQ可以采用mysql作為消息存儲,關系型數據庫在單表數據量達到千萬級的情況下IO性能會出現瓶頸,所以ActiveMQ並不適合於高吞吐量的消息隊列場景。總的來說,對於存儲效率,文件系統要優於分布式KV存儲,分布式KV存儲要優於關系型數據庫

消息的存儲結構:

  RocketMQ就是采用文件系統的方式來存儲消息,消息的存儲是由ConsumeQueue和CommitLog配合完成的。CommitLog是消息真正的物理存儲文件。ConsumeQueue是消息的邏輯隊列,有點類似於數據庫的索引文件,里面存儲的是指向CommitLog文件中消息存儲的地址。每個Topic下的每個Message Queue都會對應一個ConsumeQueue文件,文件的地址是:${store_home}/consumequeue/${topicNmae}/${queueId}/${filename}, 默認路徑: /root/store在rocketMQ的文件存儲目錄下,可以看到這樣一個結構的的而文件。

CommitLog:

  CommitLog是用來存放消息的物理文件,每個broker上的commitLog本當前機器上的所有consumerQueue共享,不做任何的區分。CommitLog中的文件默認大小為1G,可以動態配置; 當一個文件寫滿以后,會生成一個新的commitlog文件。所有的Topic數據是順序寫入在CommitLog文件中的。文件名的長度為20位,左邊補0,剩余未起始偏移量,比如00000000000000000000 表示第一個文件, 文件大小為102410241024,當第一個文件寫滿之后,生成第二個文件000000000001073741824 表示第二個文件,起始偏移量為1073741824。

ConsumeQueue:

  consumeQueue表示消息消費的邏輯隊列,這里面包含MessageQueue在commitlog中的其實物理位置偏移量offset,消息實體內容的大小和Message Tag的hash值。對於實際物理存儲來說,consumeQueue對應每個topic和queueid下的文件,每個consumeQueue類型的文件也是有大小,每個文件默認大小約為600W個字節,如果文件滿了后會也會生成一個新的文件。

IndexFile:

  索引文件,如果一個消息包含Key值的話,會使用IndexFile存儲消息索引。Index索引文件提供了對CommitLog進行數據檢索,提供了一種通過key或者時間區間來查找CommitLog中的消息的方法。在物理存儲中,文件名是以創建的時間戳明明,固定的單個IndexFile大小大概為400M,一個IndexFile可以保存2000W個索引。

abort:

  broker在啟動的時候會創建一個空的名為abort的文件,並在shutdown時將其刪除,用於標識進程是否正常退出,如果不正常退出,會在啟動時做故障恢復。

Config:

  可以看到這個里面保存了 消費端consumer的偏移量:

  以及topic的一些配置信息:

消息存儲的整體結構:

  RocketMQ的消息存儲采用的是混合型的存儲結構,也就是Broker單個實例下的所有隊列公用一個日志數據文件CommitLog。這個是和Kafka又一個不同之處。為什么不采用kafka的設計,針對不同的partition存儲一個獨立的物理文件呢?這是因為在kafka的設計中,一旦kafka中Topic的Partition數量過多,隊列文件會過多,那么會給磁盤的IO讀寫造成比較大的壓力,也就造成了性能瓶頸。所以RocketMQ進行了優化,消息主題統一存儲在CommitLog中。當然它也有它的優缺點

  • 優點在於:由於消息主題都是通過CommitLog來進行讀寫,ConsumerQueue中只存儲很少的數據,所以隊列更加輕量化。對於磁盤的訪問是串行化從而避免了磁盤的競爭
  • 缺點在於:消息寫入磁盤雖然是基於順序寫,但是讀的過程確是隨機的。讀取一條消息會先讀取ConsumeQueue,再讀CommitLog,會降低消息讀的效率。

消息發送到消息接收的整體流程:

1. Producer將消息發送到Broker后,Broker會采用同步或者異步的方式把消息寫入到CommitLog。RocketMQ所有的消息都會存放在CommitLog中,為了保證消息存儲不發生混亂,對CommitLog寫之前會加鎖,同時也可以使得消息能夠被順序寫入到CommitLog,只要消息被持久化到磁盤文件CommitLog,那么就可以保證Producer發送的消息不會丟失。

2. commitLog持久化后,會把里面的消息Dispatch到對應的Consume Queue上,Consume Queue相當於kafka中的partition,是一個邏輯隊列,存儲了這個Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。

3. 當消費者進行消息消費時,會先讀取consumerQueue , 邏輯消費隊列ConsumeQueue保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量Offset,消息大小、和消息Tag的HashCode值

4. 直接從consumequeue中讀取消息是沒有數據的,真正的消息主體在commitlog中,所以還需要從commitlog中讀取消息

什么時候清理物理消息文件?那消息文件到底刪不刪,什么時候刪?

  消息存儲在CommitLog之后,的確是會被清理的,但是這個清理只會在以下任一條件成立才會批量刪除消息文件(CommitLog):

  • 消息文件過期(默認48小時),且到達清理時點(默認是凌晨4點),刪除過期文件。
  • 消息文件過期(默認48小時),且磁盤空間達到了水位線(默認75%),刪除過期文件。
  • 磁盤已經達到必須釋放的上限(85%水位線)的時候,則開始批量清理文件(無論是否過期),直到空間充足。

  注:若磁盤空間達到危險水位線(默認90%),出於保護自身的目的,broker會拒絕寫入服務。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM