TransactionProducer(事務消息):
在分布式系統中,我們時常會遇到分布式事務的問題,除了常規的解決方案之外,我們還可以利用RocketMQ的事務性消息來解決分布式事務的問題。RocketMQ和其他消息中間件最大的一個區別是支持了事務消息,這也是分布式事務里面的基於消息的最終一致性方案。
RocketMQ消息的事務架構設計:
- 生產者執行本地事務,修改訂單支付狀態,並且提交事務
- 生產者發送事務消息到broker上,消息發送到broker上在沒有確認之前,消息對於consumer是不可見狀態
- 生產者確認事務消息,使得發送到broker上的事務消息對於消費者可見
- 消費者獲取到消息進行消費,消費完之后執行ack進行確認
這里可能會存在一個問題,生產者本地事務成功后,發送事務確認消息到broker上失敗了怎么辦?這個時候意味着消費者無法正常消費到這個消息。所以RocketMQ提供了消息回查機制,如果事務消息一直處於中間狀態,broker會發起重試去查詢broker上這個事務的處理狀態。一旦發現事務處理成功,則把當前這條消息設置為可見。
RocketMQ事務消息的實踐:
生產者producer:
public class TransactionProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, InterruptedException { TransactionMQProducer transactionMQProducer=new TransactionMQProducer("tx_producer"); transactionMQProducer.setNamesrvAddr("192.168.1.101:9876;192.168.1.102:9876"); ExecutorService executorService= Executors.newFixedThreadPool(10); transactionMQProducer.setExecutorService(executorService); transactionMQProducer.setTransactionListener(new TransactionListenerLocal()); //本地事務的監聽 transactionMQProducer.start(); for(int i=0;i<10;i++){ String orderId= UUID.randomUUID().toString(); String body="{'operation':'doOrder','orderId':'"+orderId+"'}"; Message message=new Message("testTopic2", null,orderId,body.getBytes(RemotingHelper.DEFAULT_CHARSET)); transactionMQProducer.sendMessageInTransaction(message,orderId); Thread.sleep(1000); } } }
TransactionListenerLocal:
public class TransactionListenerLocal implements TransactionListener { private Map<String,Boolean> results=new ConcurrentHashMap<>(); //執行本地事務 @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { System.out.println("開始執行本地事務:"+o.toString()); //o String orderId=o.toString(); //模擬數據庫保存(成功/失敗) boolean result=Math.abs(Objects.hash(orderId))%2==0; if(!result) { results.put(orderId, result); // } return result? LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW; } //提供給事務執行狀態檢查的回調方法,給broker用的(異步回調) //如果回查失敗,消息就丟棄 @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { String orderId=messageExt.getKeys(); System.out.println("執行事務回調檢查: orderId:"+orderId); if(results.size()==0){ return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; } }
消費端 consumer:
public class TransactionConsumer { //rocketMQ 除了在同一個組和不同組之間的消費者的特性和kafka相同之外 //RocketMQ可以支持廣播消息,就意味着,同一個group的每個消費者都可以消費同一個消息 public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer defaultMQPushConsumer= new DefaultMQPushConsumer("tx_consumer"); defaultMQPushConsumer.setNamesrvAddr("192.168.1.101:9876;192.168.1.102:9876"); defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //subExpression 可以支持sql的表達式. or and a=? ,,, defaultMQPushConsumer.subscribe("testTopic2","*"); defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { list.stream().forEach(message->{ System.out.println("開始業務處理邏輯:消息體:"+new String(message.getBody())+"->key:"+message.getKeys()); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //簽收 } }); defaultMQPushConsumer.start(); } }
RocketMQ事務消息的三種狀態:
- ROLLBACK_MESSAGE:回滾事務
- COMMIT_MESSAGE: 提交事務
- UNKNOW: broker會定時的回查Producer消息狀態,直到徹底成功或失敗。
當executeLocalTransaction方法返回ROLLBACK_MESSAGE時,表示直接回滾事務,當返回COMMIT_MESSAGE提交事務當返回UNKNOW時,Broker會在一段時間之后回查checkLocalTransaction,根據checkLocalTransaction返回狀態執行事務的操作(回滾或提交),如示例中,當返回ROLLBACK_MESSAGE時消費者不會收到消息,且不會調用回查函數,當返回COMMIT_MESSAGE時事務提交,消費者收到消息,當返回UNKNOW時,在一段時間之后調用回查函數,並根據status判斷返回提交或回滾狀態,返回提交狀態的消息將會被消費者消費,所以此時消費者可以消費部分消息
消息的存儲和發送:
由於分布式消息隊列對於可靠性的要求比較高,所以需要保證生產者將消息發送到broker之后,保證消息是不出現丟失的,因此消息隊列就少不了對於可靠性存儲的要求
從主流的幾種MQ消息隊列采用的存儲方式來看,主要會有三種
- 分布式KV存儲,比如ActiveMQ中采用的levelDB、Redis, 這種存儲方式對於消息讀寫能力要求不高的情況下可以使用
- 文件系統存儲,常見的比如kafka、RocketMQ、RabbitMQ都是采用消息刷盤到所部署的機器上的文件系統來做持久化,這種方案適合對於有高吞吐量要求的消息中間件,因為消息刷盤是一種高效率,高可靠、高性能的持久化方式,除非磁盤出現故障,否則一般是不會出現無法持久化的問題
- 關系型數據庫,比如ActiveMQ可以采用mysql作為消息存儲,關系型數據庫在單表數據量達到千萬級的情況下IO性能會出現瓶頸,所以ActiveMQ並不適合於高吞吐量的消息隊列場景。總的來說,對於存儲效率,文件系統要優於分布式KV存儲,分布式KV存儲要優於關系型數據庫
消息的存儲結構:
RocketMQ就是采用文件系統的方式來存儲消息,消息的存儲是由ConsumeQueue和CommitLog配合完成的。CommitLog是消息真正的物理存儲文件。ConsumeQueue是消息的邏輯隊列,有點類似於數據庫的索引文件,里面存儲的是指向CommitLog文件中消息存儲的地址。每個Topic下的每個Message Queue都會對應一個ConsumeQueue文件,文件的地址是:${store_home}/consumequeue/${topicNmae}/${queueId}/${filename}, 默認路徑: /root/store在rocketMQ的文件存儲目錄下,可以看到這樣一個結構的的而文件。
CommitLog:
CommitLog是用來存放消息的物理文件,每個broker上的commitLog本當前機器上的所有consumerQueue共享,不做任何的區分。CommitLog中的文件默認大小為1G,可以動態配置; 當一個文件寫滿以后,會生成一個新的commitlog文件。所有的Topic數據是順序寫入在CommitLog文件中的。文件名的長度為20位,左邊補0,剩余未起始偏移量,比如00000000000000000000 表示第一個文件, 文件大小為102410241024,當第一個文件寫滿之后,生成第二個文件000000000001073741824 表示第二個文件,起始偏移量為1073741824。
ConsumeQueue:
consumeQueue表示消息消費的邏輯隊列,這里面包含MessageQueue在commitlog中的其實物理位置偏移量offset,消息實體內容的大小和Message Tag的hash值。對於實際物理存儲來說,consumeQueue對應每個topic和queueid下的文件,每個consumeQueue類型的文件也是有大小,每個文件默認大小約為600W個字節,如果文件滿了后會也會生成一個新的文件。
IndexFile:
索引文件,如果一個消息包含Key值的話,會使用IndexFile存儲消息索引。Index索引文件提供了對CommitLog進行數據檢索,提供了一種通過key或者時間區間來查找CommitLog中的消息的方法。在物理存儲中,文件名是以創建的時間戳明明,固定的單個IndexFile大小大概為400M,一個IndexFile可以保存2000W個索引。
abort:
broker在啟動的時候會創建一個空的名為abort的文件,並在shutdown時將其刪除,用於標識進程是否正常退出,如果不正常退出,會在啟動時做故障恢復。
Config:
可以看到這個里面保存了 消費端consumer的偏移量:
以及topic的一些配置信息:
消息存儲的整體結構:
RocketMQ的消息存儲采用的是混合型的存儲結構,也就是Broker單個實例下的所有隊列公用一個日志數據文件CommitLog。這個是和Kafka又一個不同之處。為什么不采用kafka的設計,針對不同的partition存儲一個獨立的物理文件呢?這是因為在kafka的設計中,一旦kafka中Topic的Partition數量過多,隊列文件會過多,那么會給磁盤的IO讀寫造成比較大的壓力,也就造成了性能瓶頸。所以RocketMQ進行了優化,消息主題統一存儲在CommitLog中。當然它也有它的優缺點
- 優點在於:由於消息主題都是通過CommitLog來進行讀寫,ConsumerQueue中只存儲很少的數據,所以隊列更加輕量化。對於磁盤的訪問是串行化從而避免了磁盤的競爭
- 缺點在於:消息寫入磁盤雖然是基於順序寫,但是讀的過程確是隨機的。讀取一條消息會先讀取ConsumeQueue,再讀CommitLog,會降低消息讀的效率。
消息發送到消息接收的整體流程:
1. Producer將消息發送到Broker后,Broker會采用同步或者異步的方式把消息寫入到CommitLog。RocketMQ所有的消息都會存放在CommitLog中,為了保證消息存儲不發生混亂,對CommitLog寫之前會加鎖,同時也可以使得消息能夠被順序寫入到CommitLog,只要消息被持久化到磁盤文件CommitLog,那么就可以保證Producer發送的消息不會丟失。
2. commitLog持久化后,會把里面的消息Dispatch到對應的Consume Queue上,Consume Queue相當於kafka中的partition,是一個邏輯隊列,存儲了這個Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。
3. 當消費者進行消息消費時,會先讀取consumerQueue , 邏輯消費隊列ConsumeQueue保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量Offset,消息大小、和消息Tag的HashCode值
4. 直接從consumequeue中讀取消息是沒有數據的,真正的消息主體在commitlog中,所以還需要從commitlog中讀取消息
什么時候清理物理消息文件?那消息文件到底刪不刪,什么時候刪?
消息存儲在CommitLog之后,的確是會被清理的,但是這個清理只會在以下任一條件成立才會批量刪除消息文件(CommitLog):
- 消息文件過期(默認48小時),且到達清理時點(默認是凌晨4點),刪除過期文件。
- 消息文件過期(默認48小時),且磁盤空間達到了水位線(默認75%),刪除過期文件。
- 磁盤已經達到必須釋放的上限(85%水位線)的時候,則開始批量清理文件(無論是否過期),直到空間充足。
注:若磁盤空間達到危險水位線(默認90%),出於保護自身的目的,broker會拒絕寫入服務。