背景
在项目中使用多线程抓取第三方数据执行数据入库时,如果某个子线程执行异常,其他子线事务全部回滚,spring对多线程无法进行事务控制,是因为多线程底层连接数据库的时候,是使用的线程变量(TheadLocal),线程之间事务隔离,每个线程有自己的连接,事务肯定不是同一个了。
解决办法
思想就是使用两个CountDownLatch实现子线程的二段提交
步骤:
1、主线程将任务分发给子线程,然后使用childMonitor.await();阻塞主线程,等待所有子线程处理向数据库中插入的业务,并使用BlockingDeque<Boolean>存储线程的返回结果。
2、使用childMonitor.countDown()释放子线程锁定,同时使用mainMonitor.await();阻塞子线程,将程序的控制权交还给主线程。
3、主线程检查子线程执行任务的结果,若有失败结果出现,主线程标记状态告知子线程回滚,然后使用mainMonitor.countDown();将程序控制权再次交给子线程,子线程检测回滚标志,判断是否回滚。
代码实现
线程池工具类
public class ThreadPoolTool<T> { /** * 多线程任务 * @param transactionManager 数据库事务管理 * @param data 需要执行的数据集合 * @param threadCount 核心线程数 * @param params 其他资源参数 * @param clazz 具体执行任务的类 */ public void excuteTask(DataSourceTransactionManager transactionManager, List data, int threadCount, Map<String, Object> params, Class clazz) { if (data == null || data.size() == 0) { return; } int batch = 0; ExecutorService executor = Executors.newFixedThreadPool(threadCount); //监控子线程的任务执行 CountDownLatch childMonitor = new CountDownLatch(threadCount); //监控主线程,是否需要回滚 CountDownLatch mainMonitor = new CountDownLatch(1); //存储任务的返回结果,返回true表示不需要回滚,反之,则回滚 BlockingDeque<Boolean> results = new LinkedBlockingDeque<Boolean>(threadCount); RollBack rollback = new RollBack(false); try { LinkedBlockingQueue<List> queue = splitQueue(data, threadCount); while (true) { List list = queue.poll(); if (list == null) { break; } batch++; params.put("batch", batch); Constructor constructor = clazz.getConstructor(new Class[]{CountDownLatch.class, CountDownLatch.class, BlockingDeque.class, RollBack.class, DataSourceTransactionManager.class, Object.class, Map.class}); ThreadTask task = (ThreadTask) constructor.newInstance(childMonitor, mainMonitor, results, rollback, transactionManager, list, params); executor.execute(task); } // 1、主线程将任务分发给子线程,然后使用childMonitor.await();阻塞主线程,等待所有子线程处理向数据库中插入的业务。 childMonitor.await(); System.out.println("主线程开始执行任务"); //根据返回结果来确定是否回滚 for (int i = 0; i < threadCount; i++) { Boolean result = results.take(); if (!result) { //有线程执行异常,需要回滚子线程 rollback.setNeedRoolBack(true); } } // 3、主线程检查子线程执行任务的结果,若有失败结果出现,主线程标记状态告知子线程回滚,然后使用mainMonitor.countDown();将程序控制权再次交给子线程,子线程检测回滚标志,判断是否回滚。 mainMonitor.countDown(); } catch (Exception e) { log.error(e.getMessage()); } finally { //关闭线程池,释放资源 executor.shutdown(); } } /** * 队列拆分 * * @param data 需要执行的数据集合 * @param threadCount 核心线程数 * @return */ private LinkedBlockingQueue<List<Object>> splitQueue(List<Object> data, int threadCount) { LinkedBlockingQueue<List<Object>> queueBatch = new LinkedBlockingQueue(); int total = data.size(); int oneSize = total / threadCount; int start; int end; for (int i = 0; i < threadCount; i++) { start = i * oneSize; end = (i + 1) * oneSize; if (i < threadCount - 1) { queueBatch.add(data.subList(start, end)); } else { queueBatch.add(data.subList(start, data.size())); } } return queueBatch; } }
子线程任务执public abstract class ThreadTask implements Runnable
/** * 监控子任务的执行 */ private CountDownLatch childMonitor; /** * 监控主线程 */ private CountDownLatch mainMonitor; /** * 存储线程的返回结果 */ private BlockingDeque<Boolean> resultList; /** * 回滚类 */ private RollBack rollback; private Map<String,Object> params; protected Object obj; protected DataSourceTransactionManager transactionManager; protected TransactionStatus status; public ThreadTask(CountDownLatch childCountDown, CountDownLatch mainCountDown, BlockingDeque<Boolean> result, RollBack rollback, DataSourceTransactionManager transactionManager, Object obj,Map<String,Object> params) { this.childMonitor = childCountDown; this.mainMonitor = mainCountDown; this.resultList = result; this.rollback = rollback; this.transactionManager = transactionManager; this.obj = obj; this.params = params; initParam(); } /** * 事务回滚 */ private void rollBack() { System.out.println(Thread.currentThread().getName()+"开始回滚"); transactionManager.rollback(status); } /** * 事务提交 */ private void submit() { System.out.println(Thread.currentThread().getName()+"提交事务"); transactionManager.commit(status); } protected Object getParam(String key){ return params.get(key); }
/**
* 初始化方法:作用是把线程池工具任务执行类所需的外部资源通过 ThreadTask.class的构造方法中 Map<String,Obejct> params参数进行初始化传递进来
*/ public abstract void initParam(); /** * 执行任务,返回false表示任务执行错误,需要回滚 * @return */ public abstract boolean processTask(); @Override public void run() { System.out.println(Thread.currentThread().getName()+"子线程开始执行任务"); DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); status = transactionManager.getTransaction(def); Boolean result = processTask(); //向队列中添加处理结果 resultList.add(result); //2、使用childMonitor.countDown()释放子线程锁定,同时使用mainMonitor.await();阻塞子线程,将程序的控制权交还给主线程。 childMonitor.countDown(); try { //等待主线程的判断逻辑执行完,执行下面的是否回滚逻辑 mainMonitor.await(); } catch (Exception e) { log.error(e.getMessage()); } System.out.println(Thread.currentThread().getName()+"子线程执行剩下的任务"); //3、主线程检查子线程执行任务的结果,若有失败结果出现,主线程标记状态告知子线程回滚,然后使用mainMonitor.countDown();将程序控制权再次交给子线程,子线程检测回滚标志,判断是否回滚。 if (rollback.isNeedRoolBack()) { rollBack(); }else{ //事务提交 submit(); } }
回滚标记类
@Data public class RollBack { public RollBack(boolean needRoolBack) { this.needRoolBack = needRoolBack; } private boolean needRoolBack; }
使用线程池工具:
1,首先建立自己的任务执行类 并且 extends ThreadTask ,实现initParam()和processTask()方法
/** * 多线程处理任务类 */ public class TestTask extends ThreadTask { /** 分批处理的数据 */ private List<Object> objectList; /** * 可能需要注入的某些服务 */ private TestService testService; public TestTask(CountDownLatch childCountDown, CountDownLatch mainCountDown, BlockingDeque<Boolean> result, RollBack rollback, DataSourceTransactionManager transactionManager, Object obj, Map<String, Object> params) { super(childCountDown, mainCountDown, result, rollback, transactionManager, obj, params); } @Override public void initParam() { this.objectList = (List<Object>) getParam("objectList"); this.testService = (TestService) getParam("testService"); } /** * 执行任务,返回false表示任务执行错误,需要回滚 * @return */ @Override public boolean processTask() { try { for (Object o : objectList) { testService.list(); System.out.println(o.toString()+"执行自己的多线程任务逻辑"); } return true; } catch (Exception e) { return false; } } }
2,编写主任务执行方法
/** * 执行多线程任务方法 */ public void testThreadTask() { try { int threadCount = 5; //需要分批处理的数据 List<Object> objectList = new ArrayList<>(); Map<String,Object> params = new HashMap<>(); params.put("objectList",objectList); params.put("testService",testService); //调用多线程工具方法 threadPoolTool.excuteTask(transactionManager,objectList,threadCount,params, TestTask.class); }catch (Exception e){ throw new RuntimeException(e.getMessage()); } }