1. 客戶端怎樣顯式地使用事務?
producer 開啟事務(代碼片段):
ActiveMQSession session = (ActiveMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("TEST.FOO"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 開啟事務 // 發送 TransactionInfo 消息 BEGIN session.getTransactionContext().begin(); for (int i = 0; i < 2; i++) { // Create a message String text = "zhang"; TextMessage message = session.createTextMessage(text); producer.send(message); } // session.getTransactionContext().rollback(); //提交事務 // 發送 TransactionInfo 消息 COMMIT_ONE_PHASE session.getTransactionContext().commit();
2. broker 處理事務的入口:
TransportConnection.processBeginTransaction
TransportConnection.processCommitTransactionOnePhase
TransportConnection.processCommitTransactionTwoPhase
broker 處理事務的邏輯在 TransactionBroker 類中。
那么,具體在 Queue 中是怎樣體現事務的呢?
ActiveMQ 客戶端默認不會開啟事務,而如果客戶端顯式地開啟了事務,則 Queue 中可能會存在多個事務,一個事務中必然會有一個消息列表,當客戶端提交事務時,Queue 接收事務對應的消息列表,而如果客戶端回滾事務,則 Queue 會刪除這些消息。
Queue 中的事務變量:
// 鍵是Transaction,值是對應的消息列表 final ConcurrentHashMap<Transaction, SendSync> sendSyncs = new ConcurrentHashMap<Transaction, SendSync>(); private final LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>();
Queue 內部類 SendSync 封裝了消息和同步操作:
class SendSync extends Synchronization { class MessageContext { public Message message; public ConnectionContext context; public MessageContext(ConnectionContext context, Message message) { this.context = context; this.message = message; } } final Transaction transaction; // 這就是我要找的消息列表 List<MessageContext> additions = new ArrayList<MessageContext>(); public SendSync(Transaction transaction) { this.transaction = transaction; } public void add(ConnectionContext context, Message message) { additions.add(new MessageContext(context, message)); } @Override public void beforeCommit() throws Exception { synchronized (orderIndexUpdates) { orderIndexUpdates.addLast(transaction); } } @Override public void afterCommit() throws Exception { ArrayList<SendSync> syncs = new ArrayList<SendSync>(200); sendLock.lockInterruptibly(); try { synchronized (orderIndexUpdates) { Transaction next = orderIndexUpdates.peek(); while( next!=null && next.isCommitted() ) { syncs.add(sendSyncs.remove(orderIndexUpdates.removeFirst())); next = orderIndexUpdates.peek(); } } for (SendSync sync : syncs) { sync.processSend(); } } finally { sendLock.unlock(); } for (SendSync sync : syncs) { sync.processSent(); } } // called with sendLock private void processSend() throws Exception { for (Iterator<MessageContext> iterator = additions.iterator(); iterator.hasNext(); ) { MessageContext messageContext = iterator.next(); // It could take while before we receive the commit // op, by that time the message could have expired.. if (broker.isExpired(messageContext.message)) { broker.messageExpired(messageContext.context, messageContext.message, null); destinationStatistics.getExpired().increment(); iterator.remove(); continue; } sendMessage(messageContext.message); messageContext.message.decrementReferenceCount(); } } private void processSent() throws Exception { for (MessageContext messageContext : additions) { messageSent(messageContext.context, messageContext.message); } } @Override public void afterRollback() throws Exception { try { for (MessageContext messageContext : additions) { messageContext.message.decrementReferenceCount(); } } finally { sendSyncs.remove(transaction); } } }
3. 那么 XA 事務又是什么呢?ActiveMQ 實現了分布式事務,當系統中存在多數據源的情況下,也許會需要使用 XA ,為了方便,只提供一個單數據源的例子:
Xid xid = new MyXid(1, new byte[]{0x01}, new byte[]{0x02}); session.getTransactionContext().start(xid, XAResource.TMSUCCESS); // 操作mq session.getTransactionContext().end(xid, XAResource.TMSUCCESS); int prepare = session.getTransactionContext().prepare(xid); System.out.println("prepare:" + prepare); // 根據prepare結果決定是否提交 session.getTransactionContext().commit(xid, false);
這些操作步驟,和 MySQL的 XA 是一樣的,也是 start,end,prepare,commit,實現的都是javax transaction 那一套接口。
public class MyXid implements Xid { private int formatId; private byte[] globalTid; private byte[] branchQ; public MyXid(int formatId, byte[] globalTid, byte[] branchQ) { this.formatId = formatId; this.globalTid = globalTid; this.branchQ = branchQ; } public byte[] getBranchQualifier() { return this.branchQ; } public int getFormatId() { return formatId; } public byte[] getGlobalTransactionId() { return this.globalTid; } }
