1 public String test() { 2 String result = ""; 3 CountDownLatch rollBackLatch = new CountDownLatch(1); 4 CountDownLatch mainThreadLatch = new CountDownLatch(2); 5 AtomicBoolean rollbackFlag = new AtomicBoolean(false); 6 List<Future<String>> list = new ArrayList<Future<String>>(); 7 // 線程有返回值 8 Future<String> future = executor1(rollBackLatch, mainThreadLatch, rollbackFlag); 9 list.add(future); 10 // 線程無返回值 11 executor2(rollBackLatch, mainThreadLatch, rollbackFlag); 12 // 主線程業務執行完畢 如果其他線程也執行完畢 且沒有報異常 正在阻塞狀態中 喚醒其他線程 提交所有的事務 13 // 如果其他線程或者主線程報錯 則不會進入if 會觸發回滾 14 if (!rollbackFlag.get()) { 15 try { 16 mainThreadLatch.await(); 17 rollBackLatch.countDown(); 18 for (Future<String> f : list) 19 if (!"success".equals(f.get())) 20 result = f.get() + "。"; 21 } catch (InterruptedException | ExecutionException e) { 22 e.printStackTrace(); 23 } 24 } 25 return result; 26 } 27 28 @Autowired 29 private PlatformTransactionManager transactionManager; 30 31 public Future<String> executor1(CountDownLatch rollBackLatch, CountDownLatch mainThreadLatch, 32 AtomicBoolean rollbackFlag) { 33 ExecutorService executor = Executors.newCachedThreadPool(); 34 Future<String> result = executor.submit(new Callable<String>() { 35 36 @Override 37 public String call() throws Exception { 38 if (rollbackFlag.get()) 39 return "error"; // 如果其他線程已經報錯 就停止線程 40 // 設置一個事務 41 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); 42 def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔離級別,開啟新事務,這樣會比較安全些。 43 TransactionStatus status = transactionManager.getTransaction(def); // 獲得事務狀態 44 try { 45 // 業務處理開始 46 // .............. 47 // 業務處理結束 48 mainThreadLatch.countDown(); 49 rollBackLatch.await();// 線程等待 50 if (rollbackFlag.get()) { 51 transactionManager.rollback(status); 52 } else { 53 transactionManager.commit(status); 54 } 55 return "success"; 56 } catch (Exception e) { 57 e.printStackTrace(); 58 // 如果出錯了 就放開鎖 讓別的線程進入提交/回滾 本線程進行回滾 59 rollbackFlag.set(true); 60 rollBackLatch.countDown(); 61 mainThreadLatch.countDown(); 62 transactionManager.rollback(status); 63 return "操作失敗:" + e.getMessage(); 64 } 65 } 66 67 }); 68 // result.get()阻塞線程 69 return result; 70 } 71 72 public void executor2(CountDownLatch rollBackLatch, CountDownLatch mainThreadLatch, AtomicBoolean rollbackFlag) { 73 ExecutorService executorService = Executors.newCachedThreadPool(); 74 executorService.execute(new Runnable() { 75 76 @Override 77 public void run() { 78 if (rollbackFlag.get()) 79 return; // 如果其他線程已經報錯 就停止線程 80 // 設置一個事務 81 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); 82 def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔離級別,開啟新事務,這樣會比較安全些。 83 TransactionStatus status = transactionManager.getTransaction(def); // 獲得事務狀態 84 try { 85 // 業務處理開始 86 // ..... 87 // 業務處理結束 88 mainThreadLatch.countDown(); 89 rollBackLatch.await();// 線程等待 90 if (rollbackFlag.get()) { 91 transactionManager.rollback(status); 92 } else { 93 transactionManager.commit(status); 94 } 95 } catch (Exception e) { 96 e.printStackTrace(); 97 // 如果出錯了 就放開鎖 讓別的線程進入提交/回滾 本線程進行回滾 98 rollbackFlag.set(true); 99 rollBackLatch.countDown(); 100 mainThreadLatch.countDown(); 101 transactionManager.rollback(status); 102 } 103 } 104 }); 105 }