分布式事務是一個復雜的問題,rmq實現了事務的最終一致性,rmq保證本地事務成功消息一定會發送成功並被成功消費,如果本地事務失敗了,消息不會被發送。
rmq事務消息的實現過程為:
- producer發送half消息
- broker確認half消息,並通知producer,表示消息已經成功發送到broker(這個過程其實就是步驟1broker的返回)
- producer收到half確認消息之后,執行自己本地事務,並將事務結果(UNKNOW、commit、rollback)告訴broker(這是一個oneway消息,而且失敗不重試)
- broker收到producer本地事務的結果后決定是否投遞消息給consumer
- 鑒於producer發送本地事務結果可能失敗,broker會定時掃描集群中的事務消息,然后回查(apache4.2.0尚未實現,因為沒有調用org.apache.rocketmq.broker.client.net.Broker2Client#checkProducerTransactionState)

producer發送half消息
事務消息的發送過程和普通消息發送過程是不一樣的,發送消息的方法是org.apache.rocketmq.client.producer.TransactionMQProducer#sendMessageInTransaction,入參有一個LocalTransactionExecuter,需要用戶實現一個本地事務的executor,用戶可以在executor中執行事務操作
// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg)
throws MQClientException {
if (null == tranExecuter) {
throw new MQClientException("tranExecutor is null", null);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
// 標記消息是half消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
// 發送half消息,該方法是同步發送,事務消息也必須是同步發送
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
// 只有在half消息發送成功的時候才會執行事務
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
// 執行本地事務
localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
// 根據事務commit的情況來判斷下一步操作
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
為了保證本地事務和消息發送成功的原子性,producer會先發送一個half消息到broker
- 只有half消息發送成功了,事務才會被執行
- 如果half消息發送失敗了,事務不會被執行
half消息和普通的消息也不一樣,half消息發送到broker后並不會被consumer消費掉。之所以不會被消費掉的原因如下:
- broker在將消息寫入CommitLog的時候會判斷消息類型,如果是是prepare或者rollback消息,ConsumeQueue的offset(每個消息對應ConsumeQueue中的一個數據結構(包含topic、tag的hashCode、消息對應CommitLog的物理offset),offset表示數據結構是第幾個)不會增加
- broker在構造ConsumeQueue的時候會判斷是否是prepare或者rollback消息,如果是這兩種中的一種則不會將該消息放入ConsumeQueue,cnosumer在拉取消息的時候也就不會拉取到prepare和rollback的消息。
相關代碼如下:
// org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
break;
default:
break;
}
// org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue#dispatch
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
以上兩點保證了prepare消息也就是half消息不會被消費。
producer結束事務
producer根據half消息發送結果和事務執行結果來處理事務——commit或者rollback。從上面發送消息的代碼可以看到最后調用了endTransaction來處理事務執行結果,這個方法里面就是將事務執行的結果通過消息發送給broker,由broker決定消息是否投遞。
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
// 從broker返回的信息中獲取half消息的offset
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
// 需要把transactionId和offset發送給broker,便於broker查找half消息
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
// 表明本地址事務成功commit,告訴broker可以提交事務
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
// 說明事物需要回滾,有可能是half消息發送失敗,也有可能是本地事務執行失敗
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
// 如果狀態是UNKNOW,broker還會反查producer,也就是接口:org.apache.rocketmq.example.transaction.TransactionCheckListenerImpl#checkLocalTransactionState的作用,但是目前rmq4.2.0並沒有向producer查詢,也就是源碼中都沒有調用這個接口
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
// 這個發送消息是onway的,也就是不會等待返回
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
從最一開始的圖看出,producer將事務結果的消息發送給broker的時候可能會失敗,如果失敗了,broker就不知道本次事務是否應該commit,為了防止這種情況,rmq會向producer發送一個command查詢處於prepare狀態的事務的結果,上面也說了rmq4.2.0並沒有發送這個command,也就是說當前rmq並不能保證producer將事務結果通知到broker。
broker決定消息是否可以投遞
broker處理事務結果的消息的類是org.apache.rocketmq.broker.processor.EndTransactionProcessor
- 收到消息之后先檢查是否是事務類型的消息,不是事務消息直接返回。
- 根據header中的offset查詢half消息,查不到直接返回,不作處理
- 根據half消息構造新的消息,新構造的這個消息會被重新寫入CommitLog,如果是rollback消息則body為空
- 如果是rollback消息的話,該消息不會被投遞(原因和half不會被投遞的原因一樣),commit消息broker才會投遞給consumer
也就是說rmq對於commit和rollback都會新寫一個消息到CommitLog,只是rollback的消息的body是空的,而且該消息和half消息一樣不會被投遞,直到CommitLog刪除過期消息,會從磁盤中刪除;但是commit的時候,rmq會重新封裝half消息並“投遞”給consumer消費。
consumer保證消費成功
關於事務消息consumer端的消費方式和普通消息是一樣的,RocketMQ能保證消息能被consumer收到(消息重試等機制,其實有可能存在consumer消費失敗的情況,這種情況RocketMQ並不能解決,官方建議人工解決,這種情況出現的概率極低)。
總結
基於rmq的阿里雲ons實現了事務最終一致性的所有功能,但是apache rmq沒有實現消息回查的功能。所以rmq存在一定幾率會讓事務處於事務結果不明確的狀態。
