Spring 事務擴展機制 TransactionSynchronization


在進行數據庫操作的時候,如果需要多個操作要么一起成功,要么一起失敗那么就需要使用事務操作了。使用 Spring 框架只需要在方法上添加 @Transactional 注解這個方法就具有事務特性了。而且 Spring 也事務操作給開發者提供了很方便的擴展。

1、TransactionSynchronizationManager

操作多個方法 Spring 是如何來進行事務處理的呢?Spring 對於事務的管理都是基於 TransactionSynchronizationManager,下面我們就來簡單的分析一下這個類。

TransactionSynchronizationManager.java

	private static final ThreadLocal<Map<Object, Object>> resources =
			new NamedThreadLocal<Map<Object, Object>>("Transactional resources");

	private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
			new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");

	private static final ThreadLocal<String> currentTransactionName =
			new NamedThreadLocal<String>("Current transaction name");

	private static final ThreadLocal<Boolean> currentTransactionReadOnly =
			new NamedThreadLocal<Boolean>("Current transaction read-only status");

	private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
			new NamedThreadLocal<Integer>("Current transaction isolation level");

	private static final ThreadLocal<Boolean> actualTransactionActive =
			new NamedThreadLocal<Boolean>("Actual transaction active");

  

這個對象里面通過 ThreadLocal 保存了線程需要狀態以及資源對象。

  • resources:保存連接資源,因為一個方法里面可能包含兩個事務(比如事務傳播特性為:TransactionDefinition#PROPAGATION_REQUIRES_NEW),所以就用 Map 來保存資源.
  • synchronizations在進行數據庫操作的時候,如果需要多個操作要么一起成功,要么一起失敗那么就需要使用事務操作了。使用 Spring 框架只需要在方法上添加 @Transactional 注解這個方法就具有事務特性了。而且 Spring 也事務操作給開發者提供了很方便的擴展。:線程同步器,這個就是對 Spring 事務的擴展,通過 TransactionSynchronizationManager#registerSynchronization 來注冊,我們稍后來分析這個對象在進行數據庫操作的時候,如果需要多個操作要么一起成功,要么一起失敗那么就需要使用事務操作了。使用 Spring 框架只需要在方法上添加 @Transactional 注解這個方法就具有事務特性了。而且 Spring 也事務操作給開發者提供了很方便的擴展。
  • currentTransactionReadOnly:用於保存當前事務是否只讀
  • 在進行數據庫操作的時候,如果需要多個操作要么一起成功,要么一起失敗那么就需要使用事務操作了。使用 Spring 框架只需要在方法上添加 @Transactional 注解這個方法就具有事務特性了。而且 Spring 也事務操作給開發者提供了很方便的擴展。
  • currentTransactionName:用於保存當前事務名稱,默認為空
  • currentTransactionIsolationLevel:用來保存當前事務的隔離級別
  • actualTransactionActive:用於保存當前事務是否還是 Active 狀態

下面我們來分析一下事務操作對於連接資源的處理,也就是事務處理中 TransactionSynchronizationManager 對於資源(resources 屬性)的管理。關於 Spring 對於事務的處理可以看我之前的文章 – Spring Annotation Transaction

2、Spring 事務處理

TransactionSynchronizationManager#bindResource綁定連接資源到 TransactionSynchronizationManager 中的 resources 屬性當中。下面是綁定時序圖:

 

 

 TransactionInterceptor 是 Spring 對於事務方法處理的代理入口,里面對 JDBC 事務的抽象:

// 獲取連接
Connection conn = DataSource.getConnection();

// 設置自動提交 false
conn..setAutoCommit(false);

try {
    // 業務操作
    doSomething();
} catch (Exception e) {
    // 回滾事務
    conn.rollback();
}
// 提交事務
conn.commit();

TransactionAspectSupport#invokeWithinTransaction

TransactionAspectSupport#invokeWithinTransaction 是 Spring 對處理的處理。下面我們來大概分析一下它的處理過程:

 

 

 

上面的代碼邏輯如下:

  • TransactionAttributeSource#getTransactionAttribute 獲取事務相關的信息(TransactionAttribute),以注解型事務為例,看方法獲取類上有沒有標注@Transactional注解。
  • 獲取到 Spring 容器中配置的事務管理器 (PlatformTransactionManager),后面就是真正的事務處理
  • 創建事務信息(TransactionInfo),里面包含事務管理器(PlatformTransactionManager) 以及事務相關信息(TransactionAttribute)
  • 后面就是 Spring 對於事務的抽象操作,包含設置自動提交 false業務操作異常回滾事務正常就提交事務

我們回到正題, Spring 通過創建事務信息(TransactionInfo),把數據庫連接通過 TransactionSynchronizationManager#bindResource 綁定到 ThreadLocal 變量當中。然后標注到一個事務當中的其它數據庫操作就可以通過TransactionSynchronizationManager#getResource 獲取到這個連接。

數據庫的事務是基於連接的,Spring 對於多個數據庫操作的事務實現是基於 ThreadLocal。所以在事務操作當中不能使用多線程

3、Spring 事務的擴展 – TransactionSynchronization

在上面的 TransactionSynchronizationManager 類中我們知道,事務操作的時候它的當前線程還保存了 TransactionSynchronization 對象。而這個對象伴隨着 Spring 對 事務處理的各個生命周期都會有相應的擴展。

TransactionSynchronization.java.

public interface TransactionSynchronization extends Flushable {

    /** 事務提交狀態 */
    int STATUS_COMMITTED = 0;

    /** 事務回滾狀態 */
    int STATUS_ROLLED_BACK = 1;

    /**系統異常狀態 */
    int STATUS_UNKNOWN = 2;

    void suspend();

    void resume();

    void flush();

    // 事務提交之前
    void beforeCommit(boolean readOnly);

    // 事務成功或者事務回滾之前
    void beforeCompletion();

    // 事務成功提交之后
    void afterCommit();

    // 操作完成之后(包含事務成功或者事務回滾)
    void afterCompletion(int status);

}

事務的事務擴展項目中的應用場景是當訂單成功之后,發送一條消息到 MQ 當中去。由於事務是和數據庫連接相綁定的,如果把發送消息和數據庫操作放在一個事務里面。當發送消息時間過長時會占用數據庫連接,所以就要把數據庫操作與發送消息到 MQ 解耦開來。可以利用 TransactionSynchronization#afterCommit 的這個方法,當數據成功保存到數據庫並且事務提交了就把消息發送到 MQ 里面。

@Transactional
public void finishOrder(Order order){
    // 修改訂單成功
    updateOrderSuccess(order);

    // 發送消息到 MQ
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter(){
       @Override
       public void afterCommit() {
           mqService.send(order);
       }
    });
}

當事務成功提交之后,就會把消息發送給 MQ,並且不會占用數據庫連接資源。

4、Spring 事務擴展 – @TransactionalEventListener

在 Spring framework 4.2 之后還可以使用@TransactionalEventListener處理數據庫事務提交成功后再執行操作。這種方式比 TransactionSynchronization 更加優雅。它的使用方式如下:

    @Transactional
    public void finishOrder(Order order){
        // 修改訂單成功
        updateOrderSuccess(order);
    
        // 發布 Spring Event 事件
        applicationEventPublisher.publishEvent(new MyAfterTransactionEvent(order));
    }
    
    @Slf4j
    @Component
    private static class MyTransactionListener {
        @Autowired
        private MqService mqService;

        @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
        private void onHelloEvent(MyAfterTransactionEvent event) {
            Order order = event.getOrder();
            mqService.send(order);
        }
    }

    // 定一個事件,繼承自ApplicationEvent 
    private static class MyAfterTransactionEvent extends ApplicationEvent {

        private Order order;

        public MyAfterTransactionEvent(Object source, Order order) {
            super(source);
            this.order = order;
        }

        public Order getOrder() {
            return order;
        }
    }

它的實現原理是當 Spring Bean 的方法標注了通過 TransactionalEventListenerFactory#createApplicationListener創建 ApplicationListenerMethodTransactionalAdapter 然后在事件回調當中創建 TransactionSynchronization的實現類TransactionSynchronizationEventAdapter。並且通過 TransactionSynchronizationManager.registerSynchronization
把 TransactionSynchronizationEventAdapter 注冊到當前線程當中。

TransactionSynchronizationEventAdapter

    private static class TransactionSynchronizationEventAdapter extends TransactionSynchronizationAdapter {

        private final ApplicationListenerMethodAdapter listener;

        private final ApplicationEvent event;

        private final TransactionPhase phase;

        public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener,
                ApplicationEvent event, TransactionPhase phase) {

            this.listener = listener;
            this.event = event;
            this.phase = phase;
        }

        @Override
        public int getOrder() {
            return this.listener.getOrder();
        }

        @Override
        public void beforeCommit(boolean readOnly) {
            if (this.phase == TransactionPhase.BEFORE_COMMIT) {
                processEvent();
            }
        }

        @Override
        public void afterCompletion(int status) {
            if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
                processEvent();
            }
            else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
                processEvent();
            }
            else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
                processEvent();
            }
        }

        protected void processEvent() {
            this.listener.processEvent(this.event);
        }
    }

上面就是使用@TransactionalEventListener處理數據庫事務提交成功后再執行操作的原理。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM