ActiveMQ 事務和XA


1. 客戶端怎樣顯式地使用事務?

producer 開啟事務(代碼片段):

ActiveMQSession session = (ActiveMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TEST.FOO");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

// 開啟事務 
// 發送 TransactionInfo 消息 BEGIN
session.getTransactionContext().begin();

for (int i = 0; i < 2; i++) {
    // Create a message
    String text = "zhang";
    TextMessage message = session.createTextMessage(text);
    producer.send(message);
}
// session.getTransactionContext().rollback();
//提交事務
// 發送 TransactionInfo 消息 COMMIT_ONE_PHASE
session.getTransactionContext().commit();

 

2. broker 處理事務的入口:

TransportConnection.processBeginTransaction
TransportConnection.processCommitTransactionOnePhase
TransportConnection.processCommitTransactionTwoPhase

broker 處理事務的邏輯在 TransactionBroker 類中。

那么,具體在 Queue 中是怎樣體現事務的呢?

ActiveMQ 客戶端默認不會開啟事務,而如果客戶端顯式地開啟了事務,則 Queue 中可能會存在多個事務,一個事務中必然會有一個消息列表,當客戶端提交事務時,Queue 接收事務對應的消息列表,而如果客戶端回滾事務,則 Queue 會刪除這些消息。

Queue 中的事務變量:

// 鍵是Transaction,值是對應的消息列表
final ConcurrentHashMap<Transaction, SendSync> sendSyncs = new ConcurrentHashMap<Transaction, SendSync>();
private final LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>();

Queue 內部類 SendSync 封裝了消息和同步操作:

class SendSync extends Synchronization {

    class MessageContext {
        public Message message;
        public ConnectionContext context;

        public MessageContext(ConnectionContext context, Message message) {
            this.context = context;
            this.message = message;
        }
    }

    final Transaction transaction;
    // 這就是我要找的消息列表
    List<MessageContext> additions = new ArrayList<MessageContext>();

    public SendSync(Transaction transaction) {
        this.transaction = transaction;
    }

    public void add(ConnectionContext context, Message message) {
        additions.add(new MessageContext(context, message));
    }

    @Override
    public void beforeCommit() throws Exception {
        synchronized (orderIndexUpdates) {
            orderIndexUpdates.addLast(transaction);
        }
    }

    @Override
    public void afterCommit() throws Exception {
        ArrayList<SendSync> syncs = new ArrayList<SendSync>(200);
        sendLock.lockInterruptibly();
        try {
            synchronized (orderIndexUpdates) {
                Transaction next = orderIndexUpdates.peek();
                while( next!=null && next.isCommitted() ) {
                    syncs.add(sendSyncs.remove(orderIndexUpdates.removeFirst()));
                    next = orderIndexUpdates.peek();
                }
            }
            for (SendSync sync : syncs) {
                sync.processSend();
            }
        } finally {
            sendLock.unlock();
        }
        for (SendSync sync : syncs) {
            sync.processSent();
        }
    }

    // called with sendLock
    private void processSend() throws Exception {

        for (Iterator<MessageContext> iterator = additions.iterator(); iterator.hasNext(); ) {
            MessageContext messageContext = iterator.next();
            // It could take while before we receive the commit
            // op, by that time the message could have expired..
            if (broker.isExpired(messageContext.message)) {
                broker.messageExpired(messageContext.context, messageContext.message, null);
                destinationStatistics.getExpired().increment();
                iterator.remove();
                continue;
            }
            sendMessage(messageContext.message);
            messageContext.message.decrementReferenceCount();
        }
    }

    private void processSent() throws Exception {
        for (MessageContext messageContext : additions) {
            messageSent(messageContext.context, messageContext.message);
        }
    }

    @Override
    public void afterRollback() throws Exception {
        try {
            for (MessageContext messageContext : additions) {
                messageContext.message.decrementReferenceCount();
            }
        } finally {
            sendSyncs.remove(transaction);
        }
    }
}

 

3. 那么 XA 事務又是什么呢?ActiveMQ 實現了分布式事務,當系統中存在多數據源的情況下,也許會需要使用 XA ,為了方便,只提供一個單數據源的例子:

Xid xid = new MyXid(1, new byte[]{0x01}, new byte[]{0x02});
session.getTransactionContext().start(xid, XAResource.TMSUCCESS);
// 操作mq
session.getTransactionContext().end(xid, XAResource.TMSUCCESS);
int prepare = session.getTransactionContext().prepare(xid);
System.out.println("prepare:" + prepare);
// 根據prepare結果決定是否提交
session.getTransactionContext().commit(xid, false);

這些操作步驟,和 MySQL的 XA 是一樣的,也是 start,end,prepare,commit,實現的都是javax transaction 那一套接口。

public class MyXid implements Xid {
    private int formatId;
    private byte[] globalTid;
    private byte[] branchQ;
    
    public MyXid(int formatId, byte[] globalTid, byte[] branchQ) {
        this.formatId = formatId;
        this.globalTid = globalTid;
        this.branchQ = branchQ;
    }
    
    public byte[] getBranchQualifier() {
        return this.branchQ;
    }

    public int getFormatId() {
        return formatId;
    }

    public byte[] getGlobalTransactionId() {
        return this.globalTid;
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM