當業務需要在事務提交之後進行某一項或者某一系列的業務操作時,我們就可以使用TransactionSynchronizationManager。
通過Spring的AOP機制,將需要進行後置業務處理的操作提交給Spring的處理機制,並且切入到事務處理的後面。
TransactionSynchronizationManager這個類中有一系列的ThreadLocal,我們需要關注的是synchronizations,在後面使用到的TransactionSynchronizationManager.isSynchronizationActive()、TransactionSynchronizationManager.registerSynchronization()和new TransactionSynchronizationAdapter(),都與它密切相關。
Spring在開啟數據庫事務(無論是使用@Transactional注解,還是用xml配置)時,都會向synchronizations中寫入一個實例,用於自動處理Connection的獲取、提交或回滾等操作。
再看isSynchronizationActive()方法,它判斷synchronizations中是否有數據(Set<TransactionSynchronization>非null即可,並不要求其中有TransactionSynchronization實例)。
再看registerSynchronization()方法,首先調用isSynchronizationActive()做一個校驗(事務同步未啟用時會拋出異常);然後將入參synchronization添加到synchronizations中。入參synchronization中的方法不會在這裡執行,而是要等到事務執行到特定階段時才會被調用。
TransactionSynchronizationAdapter是一個適配器:它實現了TransactionSynchronization接口,並為每一個接口方法提供了一個空的實現。這類適配器的基本思想是:接口中定義了很多方法,然而業務代碼往往只需要實現其中一小部分。利用這種“空實現”適配器,我們可以專注於業務上需要處理的回調方法,而不用在業務類中放大量而且重複的空方法。
結合TransactionSynchronizationManager和TransactionSynchronizationAdapter,利用ThreadPoolExecutor實現一個事務提交後的多線程處理功能:
package com.*.module.spring.support;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * Thread pool that defers work until the surrounding Spring transaction has committed.
 *
 * <p>When no transaction synchronization is active on the calling thread, tasks are handed to
 * the pool immediately. Otherwise they are held back and only dispatched once the transaction
 * completes with {@code STATUS_COMMITTED}; on any other completion status they are dropped
 * ({@link #execute(Runnable)}) or cancelled ({@link #submit(Runnable)}).
 *
 * <p>Pending work is stored on the synchronization object registered with the current
 * transaction rather than in a thread-local, so a suspended outer transaction and an inner
 * transaction running on the same thread never share a batch.
 *
 * @author ly
 */
public class TransactionAfterCommitExecutor extends ThreadPoolExecutor {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionAfterCommitExecutor.class);

    public TransactionAfterCommitExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public TransactionAfterCommitExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                          TimeUnit unit, BlockingQueue<Runnable> workQueue,
                                          ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public TransactionAfterCommitExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                          TimeUnit unit, BlockingQueue<Runnable> workQueue,
                                          RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public TransactionAfterCommitExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                          TimeUnit unit, BlockingQueue<Runnable> workQueue,
                                          ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * Creates an executor with the default sizing: 50 core threads, 500 max threads, 500 ms
     * keep-alive and a bounded queue of 1024 entries. When the pool is saturated the oldest
     * queued task is discarded ({@link ThreadPoolExecutor.DiscardOldestPolicy}).
     */
    public TransactionAfterCommitExecutor() {
        this(
                50, 500, 500L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1024),
                new ThreadFactoryBuilder().setNameFormat("transaction-after-commit-call-pool-%d").build(),
                new ThreadPoolExecutor.DiscardOldestPolicy());
    }

    /**
     * Runs {@code runnable} asynchronously, after the current transaction commits if there is one.
     *
     * <p>All tasks passed to this method within the same transaction are merged into one batch
     * that is executed sequentially on a single pool thread. If the transaction does not commit,
     * the batch is silently dropped.
     *
     * @param runnable the task to run
     */
    @Override
    public void execute(final Runnable runnable) {
        // No active synchronization: treat the transaction as already committed and run now.
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            super.execute(runnable);
            return;
        }
        currentBatch().add(runnable);
    }

    /**
     * Submits {@code runnable}, deferring it until the current transaction commits if there is one.
     *
     * <p>Inside a transaction the returned future is completed by the pool after a successful
     * commit and is cancelled when the transaction finishes with any other status, so callers
     * blocked in {@code get()} are released instead of waiting forever.
     *
     * @param runnable the task to run
     * @return a future tracking the task
     */
    @Override
    public Future<?> submit(final Runnable runnable) {
        // No active synchronization: treat the transaction as already committed and run now.
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            return super.submit(runnable);
        }
        final RunnableFuture<Void> ftask = newTaskFor(runnable, null);
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCompletion(int status) {
                if (status == STATUS_COMMITTED) {
                    // Hand the already-wrapped task straight to the pool; going through
                    // submit() again would wrap it a second time.
                    dispatch(ftask);
                } else {
                    // Rolled back or unknown outcome: the task will never run.
                    ftask.cancel(false);
                }
            }
        });
        return ftask;
    }

    /** Hands a task directly to the underlying pool, bypassing the transaction check. */
    private void dispatch(Runnable task) {
        super.execute(task);
    }

    /**
     * Returns the batch registered by this executor with the current transaction, registering a
     * new one if none exists yet. Must only be called while synchronization is active.
     */
    private AfterCommitTransactionSynchronizationAdapter currentBatch() {
        for (TransactionSynchronization sync : TransactionSynchronizationManager.getSynchronizations()) {
            if (sync instanceof AfterCommitTransactionSynchronizationAdapter) {
                AfterCommitTransactionSynchronizationAdapter candidate =
                        (AfterCommitTransactionSynchronizationAdapter) sync;
                if (candidate.owner() == this) {
                    return candidate;
                }
            }
        }
        AfterCommitTransactionSynchronizationAdapter created = new AfterCommitTransactionSynchronizationAdapter();
        TransactionSynchronizationManager.registerSynchronization(created);
        return created;
    }

    /** Collects the tasks of one transaction and dispatches them as a single job after commit. */
    private class AfterCommitTransactionSynchronizationAdapter extends TransactionSynchronizationAdapter {

        private final List<Runnable> tasks = new ArrayList<>(5);

        void add(Runnable task) {
            tasks.add(task);
        }

        TransactionAfterCommitExecutor owner() {
            return TransactionAfterCommitExecutor.this;
        }

        @Override
        public void afterCompletion(int status) {
            if (status != STATUS_COMMITTED) {
                return;
            }
            // Snapshot so the pool thread never observes later modifications.
            final List<Runnable> txRunnables = new ArrayList<>(tasks);
            dispatch(new Runnable() {
                @Override
                public void run() {
                    for (Runnable runnable : txRunnables) {
                        try {
                            runnable.run();
                        } catch (Exception e) {
                            // One failing task must not prevent the rest of the batch from running.
                            LOGGER.error("ex:", e);
                        }
                    }
                }
            });
        }
    }
}
import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.transaction.support.TransactionSynchronizationAdapter; @Transactional(readOnly = false,propagation=Propagation.REQUIRED)//開事物 public void save(String name,Integer age ,BigDecimal amount){ Zexample1Model zexample1Model = new Zexample1Model(); zexample1Model.setName(name+"_"); zexample1Model.setAge(age); zexample1Model.setAmount(amount); zexample1Model.setAddTime(new Date()); zexample1Model.setStatus(1); zexample1Dao.save(zexample1Model); System.out.println("id="+zexample1Model.getId()); TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { @Override public void afterCommit() { System.out.println("send email after transaction commit..."); } }); System.out.println("this method complete...."); }
或者用於切面中的事務處理:
package com.my.data.aop;

import java.lang.reflect.Field;
import java.util.Objects;

import com.my.data.multisource.redismanager.RedisBean;
import com.my.data.utils.ThreadLocalUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.RedisConnectionUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import javax.annotation.Resource;

/**
 * Aspect around the public service methods that releases the thread-bound Redis connection
 * once a service method has returned normally.
 *
 * <p>NOTE(review): this aspect is a singleton {@code @Component}, yet {@link #doBefore} stores
 * the template of the current target in the shared {@code redis} field. Concurrent service
 * calls can therefore overwrite each other's template, and the field keeps the last target's
 * template for later calls whose target has no {@code redis} field. Resolving the template
 * from the join point inside the after-advice would remove that shared state.
 */
@Aspect
@Component
public class RedisAspect {

    private final Logger logger = LogManager.getLogger(RedisAspect.class);

    /**
     * Pointcut matching every public method in {@code com.my.data.service} and its sub-packages.
     */
    @Pointcut("execution(public * com.my.data.service..*.*(..))")
    public void redisPointcut() {
    }

    @Resource(name = RedisBean.defaultStringRedis)
    private StringRedisTemplate redis;

    /**
     * Before advice: if the target object declares a non-null field named {@code redis}, use
     * that template instead of the injected default for the subsequent cleanup.
     *
     * @param joinPoint the intercepted join point
     * @throws Throwable declared for the advice signature; failures are logged, not rethrown
     */
    @Before("redisPointcut()")
    public void doBefore(JoinPoint joinPoint) throws Throwable {
        try {
            Object target = joinPoint.getTarget();
            Field field = target.getClass().getDeclaredField("redis");
            field.setAccessible(true);
            Object targetRedis = field.get(target);
            if (targetRedis != null) {
                this.redis = (StringRedisTemplate) targetRedis;
                logger.info("redis : {}", redis.hashCode());
            }
        } catch (NoSuchFieldException e) {
            // Targets without a "redis" field are expected; keep the current template.
            logger.info("not found redis");
        } catch (Exception e) {
            logger.error("doBefore error.", e);
        }
    }

    /**
     * After-returning advice: unbinds the Redis connection bound to the current thread, if any.
     *
     * <p>The unbind is attempted only when a resource is actually bound for the connection
     * factory; with nothing bound there is nothing to release.
     *
     * @param ret the value returned by the intercepted method (unused)
     * @throws Throwable declared for the advice signature; failures are logged, not rethrown
     */
    @AfterReturning(returning = "ret", pointcut = "redisPointcut()")
    public void doAfterReturning(Object ret) throws Throwable {
        try {
            // Read the field once so a concurrent reassignment cannot change it mid-method.
            StringRedisTemplate template = this.redis;
            if (template == null) {
                return;
            }
            logger.info("redis : {}", template.hashCode());
            Object bindResource = TransactionSynchronizationManager.getResource(template.getConnectionFactory());
            if (bindResource != null) {
                RedisConnectionUtils.unbindConnection(template.getConnectionFactory());
            }
        } catch (Exception e) {
            logger.error("doAfterReturning error.", e);
        }
    }
}