1 發送事務消息的入口為:TransactionMQProducer#sendMessageInTransaction:
public TransactionSendResult sendMessageInTransaction(Message msg, Object arg) throws MQClientException {
if (null == this.transactionListener) { // @1
throw new MQClientException("TransactionListener is null", (Throwable)null);
} else {
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, (LocalTransactionExecuter)null, arg); // @2
}
}
代碼@1:如果transactionListener為空,則直接拋出異常。
代碼@2:調用defaultMQProducerImpl的sendMessageInTransaction方法。
2. DefaultMQProducerImpl#sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter localTransactionExecuter, Object arg) throws MQClientException {
TransactionListener transactionListener = this.getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", (Throwable)null);
} else {
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
MessageAccessor.putProperty(msg, "TRAN_MSG", "true"); // @1
MessageAccessor.putProperty(msg, "PGROUP", this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg);
} catch (Exception var11) {
throw new MQClientException("send message Exception", var11);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch(sendResult.getSendStatus()) {
case SEND_OK:
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty("UNIQ_KEY");
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
this.log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
this.log.info("executeLocalTransactionBranch return {}", localTransactionState);
this.log.info(msg.toString());
}
} catch (Throwable var10) {
this.log.info("executeLocalTransactionBranch exception", var10);
this.log.info(msg.toString());
localException = var10;
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
}
try {
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception var9) {
this.log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", var9);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
}
Step1:首先先闡述一下參數含義。final Message msg:消息;TransactionListener tranExecuter:事務監聽器; Object arg:其他附加參數,該參數會再TransactionListener 回調函數中原值傳入。
Step2:代碼@1 在消息屬性中,添加兩個屬性:TRAN_MSG,其值為true,表示為事務消息;PGROUP:消息所屬發送者組,然后以同步方式發送消息。
DefaultMQProducerImpl#sendKernelImpl
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
在消息發送之前,會先檢查消息的屬性TRAN_MSG,如果存在並且值為true,則通過設置消息系統標記的方式,設置消息為MessageSysFlag.TRANSACTION_PREPARED_TYPE。
SendMessageProcessor#sendMessage
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
Step3:Broker端首先客戶發送消息請求后,判斷消息類型,如果是事務消息,則調用TransactionalMessageService#prepareMessage方法,否則走原先的邏輯,調用MessageStore#putMessage方法。
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#prepareMessage
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.putHalfMessage(messageInner);
}
step4:事務消息,將調用TransactionalMessageServiceImpl#prepareMessage方法,繼而調用TransactionalMessageBridge#prepareMessage方法。
TransactionalMessageBridge#parseHalfMessageInner
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
Step5:備份消息的原主題名稱與原隊列ID,然后取消是事務消息的消息標簽,重新設置消息的主題為:RMQ_SYS_TRANS_HALF_TOPIC,隊列ID固定為0。然后調用MessageStore#putMessage方法將消息持久化,這里TransactionalMessageBridge橋接類,就是封裝事務消息的相關流程,最終調用MessageStore完成消息的持久化。消息入庫后,會繼續回到DefaultMQProducerImpl#sendMessageInTransaction,上文的Step2后面,也就是通過同步將消息發送到消息服務端。
DefaultMQProducerImpl#sendMessageInTransaction
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
localTransactionState = tranExecuter.executeLocalTransaction(msg, arg);
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
Step6:如果消息發送成功,會回調TransactionListener#executeLocalTransaction方法,執行本地事務,並且返回本地事務狀態為:public enum LocalTransactionState {COMMIT_MESSAGE,ROLLBACK_MESSAGE,
UNKNOW,} 之一,注意:TransactionListener#executeLocalTransaction是在發送者成功發送PREPARED消息后,會執行本地事務方法,然后返回本地事務狀態;如果PREPARED消息發送失敗,則不會調用
TransactionListener#executeLocalTransaction,並且本地事務消息,設置為
LocalTransactionState.ROLLBACK_MESSAGE,表示消息需要被回滾。
DefaultMQProducerImpl#sendMessageInTransaction
try {
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
step7:調用endTransaction方法結束事務(提交或回滾)。
DefaultMQProducerImpl#endTransaction
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
step8:組裝結束事務請求,主要參數為:事務ID、事務操作(commitOrRollback)、消費組、消息隊列偏移量、消息ID,fromTransactionCheck,從這里發出的請求,默認為false。Broker端的請求處理器為:EndTransactionProcessor。
step9:EndTransactionProcessor根據事務提交類型:TRANSACTION_COMMIT_TYPE(提交事務)、TRANSACTION_ROLLBACK_TYPE(回滾事務)、TRANSACTION_NOT_TYPE、忽略該請求,會記錄info級別的日志相關的代碼將在下文詳細分析,在這里,我們先大概梳理一條消息發送的路徑TransactionMQProducer#sendMessageInTransaction的調用鏈來總結一下事務消息的發送流程。
本文到這里,初步展示了事務消息的發送流程,總的說來,RocketMQ的事務消息發送使用二階段提交思路,首先,在消息發送時,先發送消息類型為Prepread類型的消息,然后在將該消息成功存入到消息服務器后,會回調 TransactionListener#executeLocalTransaction,執行本地事務狀態回調函數,然后根據該方法的返回值,結束事務:
1、COMMIT_MESSAGE :提交事務。
2、ROLLBACK_MESSAGE:回滾事務。
3、UNKNOW:未知事務狀態,此時消息服務器(Broker)收到EndTransaction命令時,將不對這種消息做處理,消息還處於Prepared類型,存儲在主題為:RMQ_SYS_TRANS_HALF_TOPIC的隊列中,然后消息發送流程將結束,那這些消息如何提交或回滾呢?為了實現避免客戶端需要再次發送提交、回滾命令,RocketMQ會采取定時任務將RMQ_SYS_TRANS_HALF_TOPIC中的消息取出,然后回到客戶端,判斷該消息是否需要提交或回滾,來完成事務消息的聲明周期,該部分內容將在下節重點探討。
作者:唯有堅持不懈
來源:CSDN
原文:https://blog.csdn.net/prestigeding/article/details/81263833
