在進行數據庫操作的時候,如果需要多個操作要么一起成功,要么一起失敗那么就需要使用事務操作了。使用 Spring 框架只需要在方法上添加 @Transactional
注解這個方法就具有事務特性了。而且 Spring 也事務操作給開發者提供了很方便的擴展。
1、TransactionSynchronizationManager
操作多個方法 Spring 是如何來進行事務處理的呢?Spring 對於事務的管理都是基於 TransactionSynchronizationManager
,下面我們就來簡單的分析一下這個類。
TransactionSynchronizationManager.java
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<Map<Object, Object>>("Transactional resources"); private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations"); private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<String>("Current transaction name"); private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<Boolean>("Current transaction read-only status"); private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<Integer>("Current transaction isolation level"); private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<Boolean>("Actual transaction active");
這個對象里面通過 ThreadLocal 保存了線程需要狀態以及資源對象。
resources
:保存連接資源,因為一個方法里面可能包含兩個事務(比如事務傳播特性為:TransactionDefinition#PROPAGATION_REQUIRES_NEW
),所以就用 Map 來保存資源.synchronizations
在進行數據庫操作的時候,如果需要多個操作要么一起成功,要么一起失敗那么就需要使用事務操作了。使用 Spring 框架只需要在方法上添加@Transactional
注解這個方法就具有事務特性了。而且 Spring 也事務操作給開發者提供了很方便的擴展。:線程同步器,這個就是對 Spring 事務的擴展,通過TransactionSynchronizationManager#registerSynchronization
來注冊,我們稍后來分析這個對象在進行數據庫操作的時候,如果需要多個操作要么一起成功,要么一起失敗那么就需要使用事務操作了。使用 Spring 框架只需要在方法上添加@Transactional
注解這個方法就具有事務特性了。而且 Spring 也事務操作給開發者提供了很方便的擴展。currentTransactionReadOnly
:用於保存當前事務是否只讀- 在進行數據庫操作的時候,如果需要多個操作要么一起成功,要么一起失敗那么就需要使用事務操作了。使用 Spring 框架只需要在方法上添加
@Transactional
注解這個方法就具有事務特性了。而且 Spring 也事務操作給開發者提供了很方便的擴展。 currentTransactionName
:用於保存當前事務名稱,默認為空currentTransactionIsolationLevel
:用來保存當前事務的隔離級別actualTransactionActive
:用於保存當前事務是否還是 Active 狀態
下面我們來分析一下事務操作對於連接資源的處理,也就是事務處理中 TransactionSynchronizationManager
對於資源(resources 屬性)的管理。關於 Spring 對於事務的處理可以看我之前的文章 – Spring Annotation Transaction:
2、Spring 事務處理
TransactionSynchronizationManager#bindResource
綁定連接資源到 TransactionSynchronizationManager
中的 resources 屬性當中。下面是綁定時序圖:
TransactionInterceptor
是 Spring 對於事務方法處理的代理入口,里面對 JDBC 事務的抽象:
// 獲取連接 Connection conn = DataSource.getConnection(); // 設置自動提交 false conn..setAutoCommit(false); try { // 業務操作 doSomething(); } catch (Exception e) { // 回滾事務 conn.rollback(); } // 提交事務 conn.commit();
TransactionAspectSupport#invokeWithinTransaction
TransactionAspectSupport#invokeWithinTransaction
是 Spring 對處理的處理。下面我們來大概分析一下它的處理過程:
上面的代碼邏輯如下:
TransactionAttributeSource#getTransactionAttribute
獲取事務相關的信息(TransactionAttribute
),以注解型事務為例,看方法獲取類上有沒有標注@Transactional
注解。- 獲取到 Spring 容器中配置的事務管理器 (
PlatformTransactionManager
),后面就是真正的事務處理 - 創建事務信息(
TransactionInfo
),里面包含事務管理器(PlatformTransactionManager
) 以及事務相關信息(TransactionAttribute
) - 后面就是 Spring 對於事務的抽象操作,包含
設置自動提交 false
、業務操作
、異常回滾事務
和正常就提交事務
我們回到正題, Spring 通過創建事務信息(TransactionInfo
),把數據庫連接通過 TransactionSynchronizationManager#bindResource
綁定到 ThreadLocal
變量當中。然后標注到一個事務當中的其它數據庫操作就可以通過TransactionSynchronizationManager#getResource
獲取到這個連接。
數據庫的事務是基於連接的,Spring 對於多個數據庫操作的事務實現是基於 ThreadLocal。所以在事務操作當中不能使用多線程
3、Spring 事務的擴展 – TransactionSynchronization
在上面的 TransactionSynchronizationManager
類中我們知道,事務操作的時候它的當前線程還保存了 TransactionSynchronization
對象。而這個對象伴隨着 Spring 對 事務處理的各個生命周期都會有相應的擴展。
TransactionSynchronization.java.
public interface TransactionSynchronization extends Flushable { /** 事務提交狀態 */ int STATUS_COMMITTED = 0; /** 事務回滾狀態 */ int STATUS_ROLLED_BACK = 1; /**系統異常狀態 */ int STATUS_UNKNOWN = 2; void suspend(); void resume(); void flush(); // 事務提交之前 void beforeCommit(boolean readOnly); // 事務成功或者事務回滾之前 void beforeCompletion(); // 事務成功提交之后 void afterCommit(); // 操作完成之后(包含事務成功或者事務回滾) void afterCompletion(int status); }
事務的事務擴展項目中的應用場景是當訂單成功之后,發送一條消息到 MQ 當中去。由於事務是和數據庫連接相綁定的,如果把發送消息和數據庫操作放在一個事務里面。當發送消息時間過長時會占用數據庫連接,所以就要把數據庫操作與發送消息到 MQ 解耦開來。可以利用 TransactionSynchronization#afterCommit
的這個方法,當數據成功保存到數據庫並且事務提交了就把消息發送到 MQ 里面。
@Transactional public void finishOrder(Order order){ // 修改訂單成功 updateOrderSuccess(order); // 發送消息到 MQ TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter(){ @Override public void afterCommit() { mqService.send(order); } }); }
當事務成功提交之后,就會把消息發送給 MQ,並且不會占用數據庫連接資源。
4、Spring 事務擴展 – @TransactionalEventListener
在 Spring framework 4.2 之后還可以使用@TransactionalEventListener
處理數據庫事務提交成功后再執行操作。這種方式比 TransactionSynchronization
更加優雅。它的使用方式如下:
@Transactional public void finishOrder(Order order){ // 修改訂單成功 updateOrderSuccess(order); // 發布 Spring Event 事件 applicationEventPublisher.publishEvent(new MyAfterTransactionEvent(order)); } @Slf4j @Component private static class MyTransactionListener { @Autowired private MqService mqService; @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) private void onHelloEvent(MyAfterTransactionEvent event) { Order order = event.getOrder(); mqService.send(order); } } // 定一個事件,繼承自ApplicationEvent private static class MyAfterTransactionEvent extends ApplicationEvent { private Order order; public MyAfterTransactionEvent(Object source, Order order) { super(source); this.order = order; } public Order getOrder() { return order; } }
它的實現原理是當 Spring Bean 的方法標注了通過 TransactionalEventListenerFactory#createApplicationListener
創建 ApplicationListenerMethodTransactionalAdapter
然后在事件回調當中創建 TransactionSynchronization
的實現類TransactionSynchronizationEventAdapter
。並且通過 TransactionSynchronizationManager.registerSynchronization
把 TransactionSynchronizationEventAdapter
注冊到當前線程當中。
TransactionSynchronizationEventAdapter
private static class TransactionSynchronizationEventAdapter extends TransactionSynchronizationAdapter { private final ApplicationListenerMethodAdapter listener; private final ApplicationEvent event; private final TransactionPhase phase; public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener, ApplicationEvent event, TransactionPhase phase) { this.listener = listener; this.event = event; this.phase = phase; } @Override public int getOrder() { return this.listener.getOrder(); } @Override public void beforeCommit(boolean readOnly) { if (this.phase == TransactionPhase.BEFORE_COMMIT) { processEvent(); } } @Override public void afterCompletion(int status) { if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) { processEvent(); } else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) { processEvent(); } else if (this.phase == TransactionPhase.AFTER_COMPLETION) { processEvent(); } } protected void processEvent() { this.listener.processEvent(this.event); } }
上面就是使用@TransactionalEventListener
處理數據庫事務提交成功后再執行操作的原理。