RocketMQ源碼 — 十一、 RocketMQ事務消息


分布式事務是一個復雜的問題,rmq實現了事務的最終一致性,rmq保證本地事務成功消息一定會發送成功並被成功消費,如果本地事務失敗了,消息不會被發送。

rmq事務消息的實現過程為:

  1. producer發送half消息
  2. broker確認half消息,並通知producer,表示消息已經成功發送到broker(這個過程其實就是步驟1broker的返回)
  3. producer收到half確認消息之后,執行自己本地事務,並將事務結果(UNKNOW、commit、rollback)告訴broker(這是一個oneway消息,而且失敗不重試)
  4. broker收到producer本地事務的結果后決定是否投遞消息給consumer
  5. 鑒於producer發送本地事務結果可能失敗,broker會定時掃描集群中的事務消息,然后回查(apache4.2.0尚未實現,因為沒有調用org.apache.rocketmq.broker.client.net.Broker2Client#checkProducerTransactionState)

producer發送half消息

事務消息的發送過程和普通消息發送過程是不一樣的,發送消息的方法是org.apache.rocketmq.client.producer.TransactionMQProducer#sendMessageInTransaction,入參有一個LocalTransactionExecuter,需要用戶實現一個本地事務的executor,用戶可以在executor中執行事務操作

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter tranExecuter, final Object arg)
    throws MQClientException {
    if (null == tranExecuter) {
        throw new MQClientException("tranExecutor is null", null);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // 標記消息是half消息
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        // 發送half消息,該方法是同步發送,事務消息也必須是同步發送
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            // 只有在half消息發送成功的時候才會執行事務
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                // 執行本地事務
                localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        // 根據事務commit的情況來判斷下一步操作
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}

為了保證本地事務和消息發送成功的原子性,producer會先發送一個half消息到broker

  • 只有half消息發送成功了,事務才會被執行
  • 如果half消息發送失敗了,事務不會被執行

half消息和普通的消息也不一樣,half消息發送到broker后並不會被consumer消費掉。之所以不會被消費掉的原因如下:

  • broker在將消息寫入CommitLog的時候會判斷消息類型,如果是是prepare或者rollback消息,ConsumeQueue的offset(每個消息對應ConsumeQueue中的一個數據結構(包含topic、tag的hashCode、消息對應CommitLog的物理offset),offset表示數據結構是第幾個)不會增加
  • broker在構造ConsumeQueue的時候會判斷是否是prepare或者rollback消息,如果是這兩種中的一種則不會將該消息放入ConsumeQueue,cnosumer在拉取消息的時候也就不會拉取到prepare和rollback的消息。

相關代碼如下:

// org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)
switch (tranType) {
    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
        break;
    case MessageSysFlag.TRANSACTION_NOT_TYPE:
    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        // The next update ConsumeQueue information
        CommitLog.this.topicQueueTable.put(key, ++queueOffset);
        break;
    default:
        break;
}

// org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue#dispatch

public void dispatch(DispatchRequest request) {
    final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
    switch (tranType) {
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            DefaultMessageStore.this.putMessagePositionInfo(request);
            break;
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            break;
    }
}

以上兩點保證了prepare消息也就是half消息不會被消費。

producer結束事務

producer根據half消息發送結果和事務執行結果來處理事務——commit或者rollback。從上面發送消息的代碼可以看到最后調用了endTransaction來處理事務執行結果,這個方法里面就是將事務執行的結果通過消息發送給broker,由broker決定消息是否投遞。

public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    // 從broker返回的信息中獲取half消息的offset
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    // 需要把transactionId和offset發送給broker,便於broker查找half消息
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            // 表明本地址事務成功commit,告訴broker可以提交事務
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            // 說明事物需要回滾,有可能是half消息發送失敗,也有可能是本地事務執行失敗
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            // 如果狀態是UNKNOW,broker還會反查producer,也就是接口:org.apache.rocketmq.example.transaction.TransactionCheckListenerImpl#checkLocalTransactionState的作用,但是目前rmq4.2.0並沒有向producer查詢,也就是源碼中都沒有調用這個接口
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // 這個發送消息是onway的,也就是不會等待返回
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}

從最一開始的圖看出,producer將事務結果的消息發送給broker的時候可能會失敗,如果失敗了,broker就不知道本次事務是否應該commit,為了防止這種情況,rmq會向producer發送一個command查詢處於prepare狀態的事務的結果,上面也說了rmq4.2.0並沒有發送這個command,也就是說當前rmq並不能保證producer將事務結果通知到broker。

broker決定消息是否可以投遞

broker處理事務結果的消息的類是org.apache.rocketmq.broker.processor.EndTransactionProcessor

  1. 收到消息之后先檢查是否是事務類型的消息,不是事務消息直接返回。
  2. 根據header中的offset查詢half消息,查不到直接返回,不作處理
  3. 根據half消息構造新的消息,新構造的這個消息會被重新寫入CommitLog,如果是rollback消息則body為空
  4. 如果是rollback消息的話,該消息不會被投遞(原因和half不會被投遞的原因一樣),commit消息broker才會投遞給consumer

也就是說rmq對於commit和rollback都會新寫一個消息到CommitLog,只是rollback的消息的body是空的,而且該消息和half消息一樣不會被投遞,直到CommitLog刪除過期消息,會從磁盤中刪除;但是commit的時候,rmq會重新封裝half消息並“投遞”給consumer消費。

consumer保證消費成功

關於事務消息consumer端的消費方式和普通消息是一樣的,RocketMQ能保證消息能被consumer收到(消息重試等機制,其實有可能存在consumer消費失敗的情況,這種情況RocketMQ並不能解決,官方建議人工解決,這種情況出現的概率極低)。

總結

基於rmq的阿里雲ons實現了事務最終一致性的所有功能,但是apache rmq沒有實現消息回查的功能。所以rmq存在一定幾率會讓事務處於事務結果不明確的狀態。

參考

收發事務消息


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM