RocketMQ(4.8.0)——事務消息機制


RocketMQ——事務消息機制

一、事務消息概述

  2018 年 07 月 24 日,RocketMQ 社區發布 4.3.0 版本,開始正式支持事務消息。

  事務消息的實現方案目前分為2種:

    • 兩階段提交方案
    • 三階段提交方案

  RocketMQ 采取了兩階段提交的方案進行實現。

  我們在說到事務時,通常會想到關系型數據庫的事務,支持 ACID 四個特性。

    • A,Atomicity,原子性。操作是一個不可分割的整體,要么都執行成功,要么都執行失敗。
    • C,Consistency,一致性。事務操作前后,數據必須是一致性的。
    • I,Isolation,隔離性。多個事務同時執行時,不能互相干擾。
    • D,Durability,持久性。一旦事務被提交,數據改變就是永久的,即使數據庫宕機等都不會改變。

  分布式事務是指在多個系統或多個數據庫中的多個操作要么全部成功,要么全部失敗,並且需要滿足 ACID 四個特性。

二、事務消息機制

  我們將事務消息的發送和處理總結為 4 個過程:

    • 生產者發送事務消息
    • 執行本地事務
    • Broker回查事務消息
    • Broker提交或者回滾事務消息

  2.1 生產者發送事務消息和執行本地事務

  事務消息的發送過程分為兩個階段:

    • 第一階段,發送事務消息
    • 第二階段,發送 endTransaction 消息

  Broker 發送事務消息的過程,如下圖所示:

  事務發送過程的實現類 D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\producer\TransactionMQProducer.java,該類繼承於 DefaultMQProducer,不僅能發送事務消息,還能發送其他消息。TransactionMQProducer的核心屬性和方法如下:

  transactionListener:事務監聽器,主要功能是執行本地事務和執行事務回查。事務監聽器包含 executeLocalTransaction() 和 checkLocalTransaction() 兩個方法。executeLocalTransaction()方法執行本地事務,checkLocalTransaction()方法是當生產者由於各種問題導致未發送 Commit 或 Rollback 消息給 Broker 時,Broker 回調生產者查詢本地事務狀態的處理方法。

  executorService:Broker 回查請求處理的線程池。

  start():事務消息生產者的啟動方法,與普通啟動方法不同,增加了 this.defaultMQProducerImpl.initTransactionEnv() 的調用,即增加了初始化事務消息的環境信息。代碼路徑:D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\impl\producer\DefaultMQProducerImpl.java,具體代碼如下:

 1     public void initTransactionEnv() {
 2         TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
 3         if (producer.getExecutorService() != null) {
 4             this.checkExecutor = producer.getExecutorService();
 5         } else {
 6             this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
 7             this.checkExecutor = new ThreadPoolExecutor(
 8                 producer.getCheckThreadPoolMinSize(),
 9                 producer.getCheckThreadPoolMaxSize(),
10                 1000 * 60,
11                 TimeUnit.MILLISECONDS,
12                 this.checkRequestQueue);
13         }
14     }

  從上面看,事務消息的環境初始化主要用於初始化 Broker 回查請求處理的線程池,在初始化事務消息生產者時,我們可以指定初始化對象,如果不指定初始化對象,那么這里會初始化一個單線程的線程池。

  shutdown():關閉生產者,回收生產者資源。該方法是啟動方法的逆過程,功能是關閉生產者、銷毀事務環境。銷毀事務環境是指銷毀事務回查線程池,清除回查任務隊列。

生產者發送事務消息,主要分為以下兩個階段:

(1)發送 Half 消息的過程。

    • 第一步:數據校驗。
    • 第二步:消息預處理。
    • 第三步:發送事務消息。

(2)發送 Commit 或 Rollback 消息。

  當前 Half 消息發送完成后,會返回生產者消息發送到哪個 Broker、消息位點是多少,再根據本地事務的執行結果封裝 EndTransactionReuqestHeader 對象,最后調用 MQClientAPIImpl.endTransactionOneway()方法通知 Broker 進行 Commit 或 Rollback。

  2.2 Broker 存儲事務消息

  事務消息的初始化是通過 D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\BrokerController.java 中 initialTransaction() 方法執行的,事務消息處理的初始化代碼如下:

 1     private void initialTransaction() {
 2         this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
 3         if (null == this.transactionalMessageService) {
 4             this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
 5             log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
 6         }
 7         this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
 8         if (null == this.transactionalMessageCheckListener) {
 9             this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
10             log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
11         }
12         this.transactionalMessageCheckListener.setBrokerController(this);
13         this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
14     }

  (1) transactionalMessageService

事務消息主要用於處理服務,默認實現類是 TransactionalMessageServiceImpl。如果想自定義事務消息處理實現類,需要實現 TransactionalMessageService 接口,然后通過 ServiceProvider.loadClass() 方法進行加載。TransactionalMessageService 接入是如何定義事務的基本操作的呢?代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\transaction\TransactionalMessageService.java,具體代碼如下:

 1 /*
 2  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  * contributor license agreements.  See the NOTICE file distributed with
 4  * this work for additional information regarding copyright ownership.
 5  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  * (the "License"); you may not use this file except in compliance with
 7  * the License.  You may obtain a copy of the License at
 8  *
 9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package org.apache.rocketmq.broker.transaction;
18 
19 import org.apache.rocketmq.common.message.MessageExt;
20 import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
21 import org.apache.rocketmq.store.MessageExtBrokerInner;
22 import org.apache.rocketmq.store.PutMessageResult;
23 import java.util.concurrent.CompletableFuture;
24 
25 public interface TransactionalMessageService {
26 
27     /**
28      * Process prepare message, in common, we should put this message to storage service.
29      *
30      * @param messageInner Prepare(Half) message.
31      * @return Prepare message storage result.
32      */
33     PutMessageResult prepareMessage(MessageExtBrokerInner messageInner); #用於保存 Half 事務消息,用戶可以對其進行 Commit 或 Rollback。 34 
35     /**
36      * Process prepare message in async manner, we should put this message to storage service
37      *
38      * @param messageInner Prepare(Half) message.
39      * @return CompletableFuture of put result, will be completed at put success(flush and replica done)
40      */
41     CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner);
42 
43     /**
44      * Delete prepare message when this message has been committed or rolled back.
45      *
46      * @param messageExt
47      */
48     boolean deletePrepareMessage(MessageExt messageExt);   #用於刪除事務消息,一般用於 Broker 回查失敗的 Half 消息。 49 
50     /**
51      * Invoked to process commit prepare message.
52      *
53      * @param requestHeader Commit message request header.
54      * @return Operate result contains prepare message and relative error code.
55      */
56     OperationResult commitMessage(EndTransactionRequestHeader requestHeader); #用於提交事務消息,使消費者可以正常地消費事務消息。 57 
58     /**
59      * Invoked to roll back prepare message.
60      *
61      * @param requestHeader Prepare message request header.
62      * @return Operate result contains prepare message and relative error code.
63      */
64     OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader); #用於回滾事務消息,回滾后消費者將不能夠消費該消息。通常用於生產者主動進行 Rollback 時,以及 Broker 回查的生產者本地事務失敗時。 65 
66     /**
67      * Traverse uncommitted/unroll back half message and send check back request to producer to obtain transaction
68      * status.
69      *
70      * @param transactionTimeout The minimum time of the transactional message to be checked firstly, one message only
71      * exceed this time interval that can be checked.
72      * @param transactionCheckMax The maximum number of times the message was checked, if exceed this value, this
73      * message will be discarded.
74      * @param listener When the message is considered to be checked or discarded, the relative method of this class will
75      * be invoked.
76      */
77     void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener);
78 
79     /**
80      * Open transaction service.
81      *
82      * @return If open success, return true.
83      */
84     boolean open();  #用於打開事務服務 85 
86     /**
87      * Close transaction service.
88      */
89     void close();   #用於關閉事務服務 90 }

  (2) transactionalMessageService

  事務消息回查監聽器,默認實現類是 DefaultTransactionalMessageCheckListener。如果想自定義回查監聽處理,需要繼承 AbstractTransactionalMessageCheckListener 接口,然后通過 ServiceProvider.loadClass()方法被加載。

  (3) transactionalMessageCheckService

  事務消息回查服務是一個線程服務,定時調用 transactionalMessageService.check() 方法,檢查超時的 Half 消息是否需要回查。

上面三個事務處理完成初始化后,Broker 就可以處理事務消息了。

  Broker 存儲事務消息和普通消息都是通過 org.apache.rocketmq.broker.processor.SendMessageProcessor 類進行處理的,只是在存儲消息時有兩處事務消息需要單獨處理。

第一個單獨處理:判斷是否是事務消息,處理方法的代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\processor\SendMessageProcessor.java 中 sendMessage(),具體代碼如下:

 1     private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
 2                                         final RemotingCommand request,
 3                                         final SendMessageContext sendMessageContext,
 4                                         final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
 5 
 6         final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
 7         final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
 8 
 9         response.setOpaque(request.getOpaque());
10 
11         response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
12         response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
13 
14         log.debug("receive SendMessage request command, {}", request);
15 
16         final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
17         if (this.brokerController.getMessageStore().now() < startTimstamp) {
18             response.setCode(ResponseCode.SYSTEM_ERROR);
19             response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
20             return response;
21         }
22 
23         response.setCode(-1);
24         super.msgCheck(ctx, requestHeader, response);
25         if (response.getCode() != -1) {
26             return response;
27         }
28 
29         final byte[] body = request.getBody();
30 
31         int queueIdInt = requestHeader.getQueueId();
32         TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
33 
34         if (queueIdInt < 0) {
35             queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
36         }
37 
38         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
39         msgInner.setTopic(requestHeader.getTopic());
40         msgInner.setQueueId(queueIdInt);
41 
42         if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
43             return response;
44         }
45 
46         msgInner.setBody(body);
47         msgInner.setFlag(requestHeader.getFlag());
48         MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
49         msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
50         msgInner.setBornHost(ctx.channel().remoteAddress());
51         msgInner.setStoreHost(this.getStoreHost());
52         msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
53         String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
54         MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
55         msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
56         PutMessageResult putMessageResult = null;
57         Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
58         String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); #如果該值為 True 則當前消息是事務消息。 59         if (traFlag != null && Boolean.parseBoolean(traFlag)
60             && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
61             if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { #判斷當前 Broker 師傅支持事務消息。 62                 response.setCode(ResponseCode.NO_PERMISSION);
63                 response.setRemark(
64                     "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
65                         + "] sending transaction message is forbidden");
66                 return response;
67             }
68             putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); #調用TransactionalMessageService.prepareMessage()方法保存 Half 消息。 69         } else {
70             putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
71         }
72 
73         return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
74 
75     }

第二個單獨處理:存儲前事務消息預處理,代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\transaction\queue\TransactionalMessageBridge.java,執行 parseHalfMessageInner() 方法,具體代碼如下:

 1     private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
 2         MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
 3         MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
 4             String.valueOf(msgInner.getQueueId()));
 5         msgInner.setSysFlag(
 6             MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
 7         msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
 8         msgInner.setQueueId(0);
 9         msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
10         return msgInner;
11     }

  以上代碼的功能是將原消息的 Topic、queueId、sysFlg 存儲在消息的擴展字段中,並且修改 Topic 的值為 RMQ_SYS_TRANS_HALF_TOPIC,修改 queueId 的值為 0。然后,與其他消息一樣,調用 DefaultMessageStore.putMessage()方法保存到 CommitLog 中。

  CommitLog 存儲成功后,通過 org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback.doAppend() 方法單獨對事務消息進行處理,代碼路徑:D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\CommitLog.java,具體代碼如下:

  1         public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
  2             final MessageExtBrokerInner msgInner) {
  3             // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
  4 
  5             // PHY OFFSET
  6             long wroteOffset = fileFromOffset + byteBuffer.position();
  7 
  8             int sysflag = msgInner.getSysFlag();
  9 
 10             int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
 11             int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
 12             ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
 13             ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
 14 
 15             this.resetByteBuffer(storeHostHolder, storeHostLength);
 16             String msgId;
 17             if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
 18                 msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
 19             } else {
 20                 msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
 21             }
 22 
 23             // Record ConsumeQueue information
 24             keyBuilder.setLength(0);
 25             keyBuilder.append(msgInner.getTopic());
 26             keyBuilder.append('-');
 27             keyBuilder.append(msgInner.getQueueId());
 28             String key = keyBuilder.toString();
 29             Long queueOffset = CommitLog.this.topicQueueTable.get(key);
 30             if (null == queueOffset) {
 31                 queueOffset = 0L;
 32                 CommitLog.this.topicQueueTable.put(key, queueOffset);
 33             }
 34 
 35             // Transaction messages that require special handling
 36             final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
 37             switch (tranType) {
 38                 // Prepared and Rollback message is not consumed, will not enter the
 39                 // consumer queuec
 40                 case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
 41                 case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
 42                     queueOffset = 0L;
 43                     break;
 44                 case MessageSysFlag.TRANSACTION_NOT_TYPE:
 45                 case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
 46                 default:
 47                     break;
 48             }
 49 
 50             /**
 51              * Serialize message
 52              */
 53             final byte[] propertiesData =
 54                 msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
 55 
 56             final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
 57 
 58             if (propertiesLength > Short.MAX_VALUE) {
 59                 log.warn("putMessage message properties length too long. length={}", propertiesData.length);
 60                 return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
 61             }
 62 
 63             final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
 64             final int topicLength = topicData.length;
 65 
 66             final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
 67 
 68             final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
 69 
 70             // Exceeds the maximum message
 71             if (msgLen > this.maxMessageSize) {
 72                 CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
 73                     + ", maxMessageSize: " + this.maxMessageSize);
 74                 return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
 75             }
 76 
 77             // Determines whether there is sufficient free space
 78             if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
 79                 this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
 80                 // 1 TOTALSIZE
 81                 this.msgStoreItemMemory.putInt(maxBlank);
 82                 // 2 MAGICCODE
 83                 this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
 84                 // 3 The remaining space may be any value
 85                 // Here the length of the specially set maxBlank
 86                 final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
 87                 byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
 88                 return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
 89                     queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
 90             }
 91 
 92             // Initialization of storage space
 93             this.resetByteBuffer(msgStoreItemMemory, msgLen);
 94             // 1 TOTALSIZE
 95             this.msgStoreItemMemory.putInt(msgLen);
 96             // 2 MAGICCODE
 97             this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
 98             // 3 BODYCRC
 99             this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
100             // 4 QUEUEID
101             this.msgStoreItemMemory.putInt(msgInner.getQueueId());
102             // 5 FLAG
103             this.msgStoreItemMemory.putInt(msgInner.getFlag());
104             // 6 QUEUEOFFSET
105             this.msgStoreItemMemory.putLong(queueOffset);
106             // 7 PHYSICALOFFSET
107             this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
108             // 8 SYSFLAG
109             this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
110             // 9 BORNTIMESTAMP
111             this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
112             // 10 BORNHOST
113             this.resetByteBuffer(bornHostHolder, bornHostLength);
114             this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
115             // 11 STORETIMESTAMP
116             this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
117             // 12 STOREHOSTADDRESS
118             this.resetByteBuffer(storeHostHolder, storeHostLength);
119             this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
120             // 13 RECONSUMETIMES
121             this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
122             // 14 Prepared Transaction Offset
123             this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
124             // 15 BODY
125             this.msgStoreItemMemory.putInt(bodyLength);
126             if (bodyLength > 0)
127                 this.msgStoreItemMemory.put(msgInner.getBody());
128             // 16 TOPIC
129             this.msgStoreItemMemory.put((byte) topicLength);
130             this.msgStoreItemMemory.put(topicData);
131             // 17 PROPERTIES
132             this.msgStoreItemMemory.putShort((short) propertiesLength);
133             if (propertiesLength > 0)
134                 this.msgStoreItemMemory.put(propertiesData);
135 
136             final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
137             // Write messages to the queue buffer
138             byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
139 
140             AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
141                 msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
142 
143             switch (tranType) {
144                 case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
145                 case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
146                     break;
147                 case MessageSysFlag.TRANSACTION_NOT_TYPE:
148                 case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
149                     // The next update ConsumeQueue information
150                     CommitLog.this.topicQueueTable.put(key, ++queueOffset);
151                     break;
152                 default:
153                     break;
154             }
155             return result;
156         }

  Prepared 消息其實就是 Half 消息,其實現邏輯是,設置當前 Half 消息的 queueOffset 值為 0,而不是其真實的位點值。這樣,該位點就不會建立 Consume Queue 索引,自然也不能被消費者消費。

  2.3 Broker 回查事務消息

  如果用於由於某種原因,在第二階段中沒有將 endTransaction 消息發送給 Broker,那么 Broker 的 Half 消息要怎么處理呢?

  RocketMQ 在設計時已經考慮到這個問題,通過“回查機制”處理第二階段既未發送 Commit 也沒有發送 Rollback 的消息。回查是 Broker 發起的,Broker 認為在接收 Half 消息后的一段時間內,如果生產者都沒有發送 Commit 或 Rollback 消息給 Broker,那么 Broker 會主動 "詢問"生產者該事務消息對應的本地事務執行結果,以此來決定事務是否要 Commit。

  2.4 Broker 提交或回滾事務消息

  當生產者本地事務處理完成並且 Broker 回查事務消息后,不管執行 Commit 還是 Rollback,都會根據用戶本地事務的執行結果發送一個 end_transaction 的 RPC 請求給 Broker,Broker 端處理該請求的類是 D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\processor\EndTransactionProcessor.java,其核心處理步驟如下:

  第一步:end_transaction 請求校驗。主要檢查項如下:

    • Broker 角色檢查。Slave Broker 不處理事務消息。
    • 事務消息類型檢查。EndTransactionProcessor 只處理 Commit 或 Rollback 類型的事務消息,其余消息都不處理。

  第二步:進行 Commit 或 Rollback。根據生產者請求頭中的參數判斷,是 Commit 請求還是 Rollback 請求,然后分別進行處理。 1 if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { 2 result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); #提交 Half 消息。

 3 if (result.getResponseCode() == ResponseCode.SUCCESS) {  4 RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); #Half消息數據校驗。校驗內容包含發送消息的生產者組與當前執行 Commit/Rollack 的生產者是否一致,當前Half消息是否與請求Commit/Rollback的消息是同一條消息。  5 if (res.getCode() == ResponseCode.SUCCESS) {  6 MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); #消息對象類型轉化,將 MessageExt 對象轉化為 MessageExtBrokerInner 對象,並且還原消息之前的 Topic 和 Consume Queue 等信息。  7  msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));  8  msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());  9  msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset()); 10  msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp()); 11  MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED); 12 RemotingCommand sendResult = sendFinalMessage(msgInner); #將還原后的事務消息最終發送到 CommitLog 中。一旦發送成功,消費者就可以正常拉取消息並消費。 13 if (sendResult.getCode() == ResponseCode.SUCCESS) { 14 this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); #在 sendFinalMessage() 執行成功后,刪除 Half 消息。其實 RocketMQ 是不能真正刪除消息的,其實質是順序寫磁盤,
                相當於做了一個“假刪除”。“假刪除”通過 putOpMessage() 方法將消息保存到 TransactionalMessageUtil.buildOpTopic()的 Topic 中,並且做上標記 TransactionalMessageUtil.REMOVETAG,表示消息已刪除。 15 } 16 return sendResult; 17 } 18 return res; 19 } 20 } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { 21 result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader); 22 if (result.getResponseCode() == ResponseCode.SUCCESS) { 23 RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); 24 if (res.getCode() == ResponseCode.SUCCESS) { 25 this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); 26 } 27 return res; 28 } 29 }

   保存 OP 消息,代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\transaction\queue\TransactionalMessageBridge.java,具體代碼如下:

1     public boolean putOpMessage(MessageExt messageExt, String opType) {
2         MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
3             this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
4         if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
5             return addRemoveTagInTransactionOp(messageExt, messageQueue);
6         }
7         return true;
8     }

  如果消息被標記為已刪除,則調用 addRemoveTagInTransactionOp() 方法,利用標記為已刪除的 OP 消息構造 Message 消息對象,並且調用存儲方法保存消息。代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\transaction\queue\TransactionalMessageBridge.java,具體代碼如下:

1     private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
2         Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
3             String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
4         writeOp(message, messageQueue);
5         return true;
6     }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM