RocketMQ——事務消息機制
一、事務消息概述
2018 年 07 月 24 日,RocketMQ 社區發布 4.3.0 版本,開始正式支持事務消息。
事務消息的實現方案目前分為2種:
-
- 兩階段提交方案
- 三階段提交方案
RocketMQ 采取了兩階段提交的方案進行實現。
我們在說到事務時,通常會想到關系型數據庫的事務,支持 ACID 四個特性。
-
- A,Atomicity,原子性。操作是一個不可分割的整體,要么都執行成功,要么都執行失敗。
- C,Consistency,一致性。事務操作前后,數據必須是一致性的。
- I,Isolation,隔離性。多個事務同時執行時,不能互相干擾。
- D,Durability,持久性。一旦事務被提交,數據改變就是永久的,即使數據庫宕機等都不會改變。
分布式事務是指在多個系統或多個數據庫中的多個操作要么全部成功,要么全部失敗,並且需要滿足 ACID 四個特性。
二、事務消息機制
我們將事務消息的發送和處理總結為 4 個過程:
-
- 生產者發送事務消息
- 執行本地事務
- Broker回查事務消息
- Broker提交或者回滾事務消息
2.1 生產者發送事務消息和執行本地事務
事務消息的發送過程分為兩個階段:
-
- 第一階段,發送事務消息
- 第二階段,發送 endTransaction 消息
Broker 發送事務消息的過程,如下圖所示:
事務發送過程的實現類 D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\producer\TransactionMQProducer.java,該類繼承於 DefaultMQProducer,不僅能發送事務消息,還能發送其他消息。TransactionMQProducer的核心屬性和方法如下:
transactionListener:事務監聽器,主要功能是執行本地事務和執行事務回查。事務監聽器包含 executeLocalTransaction() 和 checkLocalTransaction() 兩個方法。executeLocalTransaction()方法執行本地事務,checkLocalTransaction()方法是當生產者由於各種問題導致未發送 Commit 或 Rollback 消息給 Broker 時,Broker 回調生產者查詢本地事務狀態的處理方法。
executorService:Broker 回查請求處理的線程池。
start():事務消息生產者的啟動方法,與普通啟動方法不同,增加了 this.defaultMQProducerImpl.initTransactionEnv() 的調用,即增加了初始化事務消息的環境信息。代碼路徑:D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\impl\producer\DefaultMQProducerImpl.java,具體代碼如下:
1 public void initTransactionEnv() { 2 TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer; 3 if (producer.getExecutorService() != null) { 4 this.checkExecutor = producer.getExecutorService(); 5 } else { 6 this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax()); 7 this.checkExecutor = new ThreadPoolExecutor( 8 producer.getCheckThreadPoolMinSize(), 9 producer.getCheckThreadPoolMaxSize(), 10 1000 * 60, 11 TimeUnit.MILLISECONDS, 12 this.checkRequestQueue); 13 } 14 }
從上面看,事務消息的環境初始化主要用於初始化 Broker 回查請求處理的線程池,在初始化事務消息生產者時,我們可以指定初始化對象,如果不指定初始化對象,那么這里會初始化一個單線程的線程池。
shutdown():關閉生產者,回收生產者資源。該方法是啟動方法的逆過程,功能是關閉生產者、銷毀事務環境。銷毀事務環境是指銷毀事務回查線程池,清除回查任務隊列。
生產者發送事務消息,主要分為以下兩個階段:
(1)發送 Half 消息的過程。
-
- 第一步:數據校驗。
- 第二步:消息預處理。
- 第三步:發送事務消息。
(2)發送 Commit 或 Rollback 消息。
當前 Half 消息發送完成后,會返回生產者消息發送到哪個 Broker、消息位點是多少,再根據本地事務的執行結果封裝 EndTransactionReuqestHeader 對象,最后調用 MQClientAPIImpl.endTransactionOneway()方法通知 Broker 進行 Commit 或 Rollback。
2.2 Broker 存儲事務消息
事務消息的初始化是通過 D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\BrokerController.java 中 initialTransaction() 方法執行的,事務消息處理的初始化代碼如下:
1 private void initialTransaction() { 2 this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class); 3 if (null == this.transactionalMessageService) { 4 this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore())); 5 log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName()); 6 } 7 this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class); 8 if (null == this.transactionalMessageCheckListener) { 9 this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener(); 10 log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName()); 11 } 12 this.transactionalMessageCheckListener.setBrokerController(this); 13 this.transactionalMessageCheckService = new TransactionalMessageCheckService(this); 14 }
(1) transactionalMessageService
事務消息主要用於處理服務,默認實現類是 TransactionalMessageServiceImpl。如果想自定義事務消息處理實現類,需要實現 TransactionalMessageService 接口,然后通過 ServiceProvider.loadClass() 方法進行加載。TransactionalMessageService 接入是如何定義事務的基本操作的呢?代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\transaction\TransactionalMessageService.java,具體代碼如下:
1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.rocketmq.broker.transaction; 18 19 import org.apache.rocketmq.common.message.MessageExt; 20 import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; 21 import org.apache.rocketmq.store.MessageExtBrokerInner; 22 import org.apache.rocketmq.store.PutMessageResult; 23 import java.util.concurrent.CompletableFuture; 24 25 public interface TransactionalMessageService { 26 27 /** 28 * Process prepare message, in common, we should put this message to storage service. 29 * 30 * @param messageInner Prepare(Half) message. 31 * @return Prepare message storage result. 32 */ 33 PutMessageResult prepareMessage(MessageExtBrokerInner messageInner); #用於保存 Half 事務消息,用戶可以對其進行 Commit 或 Rollback。 34 35 /** 36 * Process prepare message in async manner, we should put this message to storage service 37 * 38 * @param messageInner Prepare(Half) message. 39 * @return CompletableFuture of put result, will be completed at put success(flush and replica done) 40 */ 41 CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner); 42 43 /** 44 * Delete prepare message when this message has been committed or rolled back. 45 * 46 * @param messageExt 47 */ 48 boolean deletePrepareMessage(MessageExt messageExt); #用於刪除事務消息,一般用於 Broker 回查失敗的 Half 消息。 49 50 /** 51 * Invoked to process commit prepare message. 52 * 53 * @param requestHeader Commit message request header. 54 * @return Operate result contains prepare message and relative error code. 55 */ 56 OperationResult commitMessage(EndTransactionRequestHeader requestHeader); #用於提交事務消息,使消費者可以正常地消費事務消息。 57 58 /** 59 * Invoked to roll back prepare message. 60 * 61 * @param requestHeader Prepare message request header. 62 * @return Operate result contains prepare message and relative error code. 63 */ 64 OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader); #用於回滾事務消息,回滾后消費者將不能夠消費該消息。通常用於生產者主動進行 Rollback 時,以及 Broker 回查的生產者本地事務失敗時。 65 66 /** 67 * Traverse uncommitted/unroll back half message and send check back request to producer to obtain transaction 68 * status. 69 * 70 * @param transactionTimeout The minimum time of the transactional message to be checked firstly, one message only 71 * exceed this time interval that can be checked. 72 * @param transactionCheckMax The maximum number of times the message was checked, if exceed this value, this 73 * message will be discarded. 74 * @param listener When the message is considered to be checked or discarded, the relative method of this class will 75 * be invoked. 76 */ 77 void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener); 78 79 /** 80 * Open transaction service. 81 * 82 * @return If open success, return true. 83 */ 84 boolean open(); #用於打開事務服務 85 86 /** 87 * Close transaction service. 88 */ 89 void close(); #用於關閉事務服務 90 }
(2) transactionalMessageService
事務消息回查監聽器,默認實現類是 DefaultTransactionalMessageCheckListener。如果想自定義回查監聽處理,需要繼承 AbstractTransactionalMessageCheckListener 接口,然后通過 ServiceProvider.loadClass()方法被加載。
(3) transactionalMessageCheckService
事務消息回查服務是一個線程服務,定時調用 transactionalMessageService.check() 方法,檢查超時的 Half 消息是否需要回查。
上面三個事務處理完成初始化后,Broker 就可以處理事務消息了。
Broker 存儲事務消息和普通消息都是通過 org.apache.rocketmq.broker.processor.SendMessageProcessor 類進行處理的,只是在存儲消息時有兩處事務消息需要單獨處理。
第一個單獨處理:判斷是否是事務消息,處理方法的代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\processor\SendMessageProcessor.java 中 sendMessage(),具體代碼如下:
1 private RemotingCommand sendMessage(final ChannelHandlerContext ctx, 2 final RemotingCommand request, 3 final SendMessageContext sendMessageContext, 4 final SendMessageRequestHeader requestHeader) throws RemotingCommandException { 5 6 final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); 7 final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); 8 9 response.setOpaque(request.getOpaque()); 10 11 response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); 12 response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); 13 14 log.debug("receive SendMessage request command, {}", request); 15 16 final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); 17 if (this.brokerController.getMessageStore().now() < startTimstamp) { 18 response.setCode(ResponseCode.SYSTEM_ERROR); 19 response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); 20 return response; 21 } 22 23 response.setCode(-1); 24 super.msgCheck(ctx, requestHeader, response); 25 if (response.getCode() != -1) { 26 return response; 27 } 28 29 final byte[] body = request.getBody(); 30 31 int queueIdInt = requestHeader.getQueueId(); 32 TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); 33 34 if (queueIdInt < 0) { 35 queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); 36 } 37 38 MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); 39 msgInner.setTopic(requestHeader.getTopic()); 40 msgInner.setQueueId(queueIdInt); 41 42 if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) { 43 return response; 44 } 45 46 msgInner.setBody(body); 47 msgInner.setFlag(requestHeader.getFlag()); 48 MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); 49 msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); 50 msgInner.setBornHost(ctx.channel().remoteAddress()); 51 msgInner.setStoreHost(this.getStoreHost()); 52 msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); 53 String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName(); 54 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName); 55 msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); 56 PutMessageResult putMessageResult = null; 57 Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); 58 String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); #如果該值為 True 則當前消息是事務消息。 59 if (traFlag != null && Boolean.parseBoolean(traFlag) 60 && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1 61 if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { #判斷當前 Broker 師傅支持事務消息。 62 response.setCode(ResponseCode.NO_PERMISSION); 63 response.setRemark( 64 "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() 65 + "] sending transaction message is forbidden"); 66 return response; 67 } 68 putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); #調用TransactionalMessageService.prepareMessage()方法保存 Half 消息。 69 } else { 70 putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); 71 } 72 73 return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); 74 75 }
第二個單獨處理:存儲前事務消息預處理,代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\transaction\queue\TransactionalMessageBridge.java,執行 parseHalfMessageInner() 方法,具體代碼如下:
1 private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { 2 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); 3 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, 4 String.valueOf(msgInner.getQueueId())); 5 msgInner.setSysFlag( 6 MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); 7 msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); 8 msgInner.setQueueId(0); 9 msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); 10 return msgInner; 11 }
以上代碼的功能是將原消息的 Topic、queueId、sysFlg 存儲在消息的擴展字段中,並且修改 Topic 的值為 RMQ_SYS_TRANS_HALF_TOPIC,修改 queueId 的值為 0。然后,與其他消息一樣,調用 DefaultMessageStore.putMessage()方法保存到 CommitLog 中。
CommitLog 存儲成功后,通過 org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback.doAppend() 方法單獨對事務消息進行處理,代碼路徑:D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\CommitLog.java,具體代碼如下:
1 public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, 2 final MessageExtBrokerInner msgInner) { 3 // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br> 4 5 // PHY OFFSET 6 long wroteOffset = fileFromOffset + byteBuffer.position(); 7 8 int sysflag = msgInner.getSysFlag(); 9 10 int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; 11 int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; 12 ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength); 13 ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength); 14 15 this.resetByteBuffer(storeHostHolder, storeHostLength); 16 String msgId; 17 if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) { 18 msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset); 19 } else { 20 msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset); 21 } 22 23 // Record ConsumeQueue information 24 keyBuilder.setLength(0); 25 keyBuilder.append(msgInner.getTopic()); 26 keyBuilder.append('-'); 27 keyBuilder.append(msgInner.getQueueId()); 28 String key = keyBuilder.toString(); 29 Long queueOffset = CommitLog.this.topicQueueTable.get(key); 30 if (null == queueOffset) { 31 queueOffset = 0L; 32 CommitLog.this.topicQueueTable.put(key, queueOffset); 33 } 34 35 // Transaction messages that require special handling 36 final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); 37 switch (tranType) { 38 // Prepared and Rollback message is not consumed, will not enter the 39 // consumer queuec 40 case MessageSysFlag.TRANSACTION_PREPARED_TYPE: 41 case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: 42 queueOffset = 0L; 43 break; 44 case MessageSysFlag.TRANSACTION_NOT_TYPE: 45 case MessageSysFlag.TRANSACTION_COMMIT_TYPE: 46 default: 47 break; 48 } 49 50 /** 51 * Serialize message 52 */ 53 final byte[] propertiesData = 54 msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); 55 56 final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; 57 58 if (propertiesLength > Short.MAX_VALUE) { 59 log.warn("putMessage message properties length too long. length={}", propertiesData.length); 60 return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED); 61 } 62 63 final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); 64 final int topicLength = topicData.length; 65 66 final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; 67 68 final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength); 69 70 // Exceeds the maximum message 71 if (msgLen > this.maxMessageSize) { 72 CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength 73 + ", maxMessageSize: " + this.maxMessageSize); 74 return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); 75 } 76 77 // Determines whether there is sufficient free space 78 if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { 79 this.resetByteBuffer(this.msgStoreItemMemory, maxBlank); 80 // 1 TOTALSIZE 81 this.msgStoreItemMemory.putInt(maxBlank); 82 // 2 MAGICCODE 83 this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); 84 // 3 The remaining space may be any value 85 // Here the length of the specially set maxBlank 86 final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); 87 byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank); 88 return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(), 89 queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); 90 } 91 92 // Initialization of storage space 93 this.resetByteBuffer(msgStoreItemMemory, msgLen); 94 // 1 TOTALSIZE 95 this.msgStoreItemMemory.putInt(msgLen); 96 // 2 MAGICCODE 97 this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); 98 // 3 BODYCRC 99 this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); 100 // 4 QUEUEID 101 this.msgStoreItemMemory.putInt(msgInner.getQueueId()); 102 // 5 FLAG 103 this.msgStoreItemMemory.putInt(msgInner.getFlag()); 104 // 6 QUEUEOFFSET 105 this.msgStoreItemMemory.putLong(queueOffset); 106 // 7 PHYSICALOFFSET 107 this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position()); 108 // 8 SYSFLAG 109 this.msgStoreItemMemory.putInt(msgInner.getSysFlag()); 110 // 9 BORNTIMESTAMP 111 this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); 112 // 10 BORNHOST 113 this.resetByteBuffer(bornHostHolder, bornHostLength); 114 this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder)); 115 // 11 STORETIMESTAMP 116 this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); 117 // 12 STOREHOSTADDRESS 118 this.resetByteBuffer(storeHostHolder, storeHostLength); 119 this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder)); 120 // 13 RECONSUMETIMES 121 this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); 122 // 14 Prepared Transaction Offset 123 this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); 124 // 15 BODY 125 this.msgStoreItemMemory.putInt(bodyLength); 126 if (bodyLength > 0) 127 this.msgStoreItemMemory.put(msgInner.getBody()); 128 // 16 TOPIC 129 this.msgStoreItemMemory.put((byte) topicLength); 130 this.msgStoreItemMemory.put(topicData); 131 // 17 PROPERTIES 132 this.msgStoreItemMemory.putShort((short) propertiesLength); 133 if (propertiesLength > 0) 134 this.msgStoreItemMemory.put(propertiesData); 135 136 final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); 137 // Write messages to the queue buffer 138 byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); 139 140 AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, 141 msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); 142 143 switch (tranType) { 144 case MessageSysFlag.TRANSACTION_PREPARED_TYPE: 145 case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: 146 break; 147 case MessageSysFlag.TRANSACTION_NOT_TYPE: 148 case MessageSysFlag.TRANSACTION_COMMIT_TYPE: 149 // The next update ConsumeQueue information 150 CommitLog.this.topicQueueTable.put(key, ++queueOffset); 151 break; 152 default: 153 break; 154 } 155 return result; 156 }
Prepared 消息其實就是 Half 消息,其實現邏輯是,設置當前 Half 消息的 queueOffset 值為 0,而不是其真實的位點值。這樣,該位點就不會建立 Consume Queue 索引,自然也不能被消費者消費。
2.3 Broker 回查事務消息
如果用於由於某種原因,在第二階段中沒有將 endTransaction 消息發送給 Broker,那么 Broker 的 Half 消息要怎么處理呢?
RocketMQ 在設計時已經考慮到這個問題,通過“回查機制”處理第二階段既未發送 Commit 也沒有發送 Rollback 的消息。回查是 Broker 發起的,Broker 認為在接收 Half 消息后的一段時間內,如果生產者都沒有發送 Commit 或 Rollback 消息給 Broker,那么 Broker 會主動 "詢問"生產者該事務消息對應的本地事務執行結果,以此來決定事務是否要 Commit。
2.4 Broker 提交或回滾事務消息
當生產者本地事務處理完成並且 Broker 回查事務消息后,不管執行 Commit 還是 Rollback,都會根據用戶本地事務的執行結果發送一個 end_transaction 的 RPC 請求給 Broker,Broker 端處理該請求的類是 D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\processor\EndTransactionProcessor.java,其核心處理步驟如下:
第一步:end_transaction 請求校驗。主要檢查項如下:
-
- Broker 角色檢查。Slave Broker 不處理事務消息。
- 事務消息類型檢查。EndTransactionProcessor 只處理 Commit 或 Rollback 類型的事務消息,其余消息都不處理。
第二步:進行 Commit 或 Rollback。根據生產者請求頭中的參數判斷,是 Commit 請求還是 Rollback 請求,然后分別進行處理。 1 if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { 2 result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); #提交 Half 消息。
3 if (result.getResponseCode() == ResponseCode.SUCCESS) { 4 RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); #Half消息數據校驗。校驗內容包含發送消息的生產者組與當前執行 Commit/Rollack 的生產者是否一致,當前Half消息是否與請求Commit/Rollback的消息是同一條消息。 5 if (res.getCode() == ResponseCode.SUCCESS) { 6 MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); #消息對象類型轉化,將 MessageExt 對象轉化為 MessageExtBrokerInner 對象,並且還原消息之前的 Topic 和 Consume Queue 等信息。 7 msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback())); 8 msgInner.setQueueOffset(requestHeader.getTranStateTableOffset()); 9 msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset()); 10 msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp()); 11 MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED); 12 RemotingCommand sendResult = sendFinalMessage(msgInner); #將還原后的事務消息最終發送到 CommitLog 中。一旦發送成功,消費者就可以正常拉取消息並消費。 13 if (sendResult.getCode() == ResponseCode.SUCCESS) { 14 this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); #在 sendFinalMessage() 執行成功后,刪除 Half 消息。其實 RocketMQ 是不能真正刪除消息的,其實質是順序寫磁盤,
相當於做了一個“假刪除”。“假刪除”通過 putOpMessage() 方法將消息保存到 TransactionalMessageUtil.buildOpTopic()的 Topic 中,並且做上標記 TransactionalMessageUtil.REMOVETAG,表示消息已刪除。 15 } 16 return sendResult; 17 } 18 return res; 19 } 20 } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { 21 result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader); 22 if (result.getResponseCode() == ResponseCode.SUCCESS) { 23 RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); 24 if (res.getCode() == ResponseCode.SUCCESS) { 25 this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); 26 } 27 return res; 28 } 29 }
保存 OP 消息,代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\transaction\queue\TransactionalMessageBridge.java,具體代碼如下:
1 public boolean putOpMessage(MessageExt messageExt, String opType) { 2 MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(), 3 this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId()); 4 if (TransactionalMessageUtil.REMOVETAG.equals(opType)) { 5 return addRemoveTagInTransactionOp(messageExt, messageQueue); 6 } 7 return true; 8 }
如果消息被標記為已刪除,則調用 addRemoveTagInTransactionOp() 方法,利用標記為已刪除的 OP 消息構造 Message 消息對象,並且調用存儲方法保存消息。代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\transaction\queue\TransactionalMessageBridge.java,具體代碼如下:
1 private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) { 2 Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG, 3 String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset)); 4 writeOp(message, messageQueue); 5 return true; 6 }