package czc.superzig.modular.utils; import java.util.ArrayList; import java.util.List; import java.util.TimerTask; import java.util.concurrent.BlockingDeque; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.DefaultTransactionDefinition; import com.google.common.collect.Lists; import czc.superzig.common.operatingtable.base.entity.Result; import czc.superzig.common.operatingtable.base.entity.Results; import czc.superzig.modular.system.operatingtable.entity.DetectionIndicator; /** *多線程操作數據庫 其中一個線程發生異常則所有線程發生回滾 * */ public abstract class ThreadUtil<T> { private DataSourceTransactionManager txManager; public abstract void run(T entity); public ThreadUtil(List<T> list,DataSourceTransactionManager txManager){ this.txManager=txManager; createThread(list); } private Result createThread(List<T> list) { Result result = new Result(); //每條線程最小處理任務數 int perThreadHandleCount = 1; //線程池的最大線程數 int nThreads = 10; //任務數 int taskSize = list.size(); if (taskSize > nThreads * perThreadHandleCount) { perThreadHandleCount = taskSize % nThreads == 0 ? taskSize / nThreads : taskSize / nThreads + 1; nThreads = taskSize % perThreadHandleCount == 0 ? taskSize / perThreadHandleCount : taskSize / perThreadHandleCount + 1; } else { nThreads = taskSize; } //主線程的同步計時器 CountDownLatch mainLatch = new CountDownLatch(1); //監控子線程的同步計時器 CountDownLatch threadLatch = new CountDownLatch(nThreads); //根據子線程執行結果判斷是否需要回滾 BlockingDeque<Boolean> resultList = new LinkedBlockingDeque<>(nThreads); //必須要使用對象,如果使用變量會造成線程之間不可共享變量值 RollBack rollBack = new RollBack(false); ExecutorService fixedThreadPool = Executors.newFixedThreadPool(nThreads); List<Future<List<Object>>> futures = Lists.newArrayList(); //返回數據的列表 List<Object> returnDataList = Lists.newArrayList(); //給每個線程分配任務 for (int i = 0; i < nThreads; i++) { int lastIndex = (i + 1) * perThreadHandleCount; List<T> listVos = list.subList(i * perThreadHandleCount, lastIndex >= taskSize ? taskSize : lastIndex); FunctionThread functionThread = new FunctionThread(mainLatch, threadLatch, rollBack, resultList, listVos); Future<List<Object>> future = fixedThreadPool.submit(functionThread); futures.add(future); } /** 存放子線程返回結果. */ List<Boolean> backUpResult = Lists.newArrayList(); try { //等待所有子線程執行完畢 boolean await = threadLatch.await(20, TimeUnit.SECONDS); //如果超時,直接回滾 if (!await) { rollBack.setRollBack(true); } else { //查看執行情況,如果有存在需要回滾的線程,則全部回滾 for (int i = 0; i < nThreads; i++) { Boolean flag = resultList.take(); backUpResult.add(flag); if (flag) { /** 有線程執行異常,需要回滾子線程. */ rollBack.setRollBack(true); } } } } catch (InterruptedException e) { result.setSuccess(false); result.setMsg("等待所有子線程執行完畢時,出現異常,整體回滾"); } finally { //子線程再次開始執行 mainLatch.countDown(); fixedThreadPool.shutdown(); } /** 檢查子線程是否有異常,有異常整體回滾. */ for (int i = 0; i < nThreads; i++) { if (CollectionUtils.isNotEmpty(backUpResult)) { Boolean flag = backUpResult.get(i); if (flag) { result.setSuccess(false); result.setMsg("運行失敗"); } } else { result.setSuccess(false); result.setMsg("運行失敗"); } } //拼接結果 try { for (Future<List<Object>> future : futures) { returnDataList.addAll(future.get()); } } catch (Exception e) { e.printStackTrace(); result.setSuccess(false); result.setMsg("運行失敗子線程正常創建參保人成功,主線程出現異常,回滾失敗"); } if(result.getSuccess()){ result.setData(returnDataList); } return result; } public class FunctionThread implements Callable<List<Object>> { /** * 主線程監控 */ private CountDownLatch mainLatch; /** * 子線程監控 */ private CountDownLatch threadLatch; /** * 是否回滾 */ private RollBack rollBack; private BlockingDeque<Boolean> resultList; private List<T> taskList; public FunctionThread(CountDownLatch mainLatch, CountDownLatch threadLatch, RollBack rollBack, BlockingDeque<Boolean> resultList, List<T> taskList) { this.mainLatch = mainLatch; this.threadLatch = threadLatch; this.rollBack = rollBack; this.resultList = resultList; this.taskList = taskList; } @Override public List<Object> call() throws Exception { //為了保證事務不提交,此處只能調用一個有事務的方法,spring 中事務的顆粒度是方法,只有方法不退出,事務才不會提交 return FunctionTask(mainLatch, threadLatch, rollBack, resultList, taskList); } } public class RollBack { private Boolean isRollBack; public Boolean getRollBack() { return isRollBack; } public void setRollBack(Boolean rollBack) { isRollBack = rollBack; } public RollBack(Boolean isRollBack) { this.isRollBack = isRollBack; } } public List<Object> FunctionTask(CountDownLatch mainLatch, CountDownLatch threadLatch, RollBack rollBack, BlockingDeque<Boolean> resultList, List<T> taskList) { //開啟事務 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setName(java.util.UUID.randomUUID().toString()); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); // 事物隔離級別,開啟新事務 TransactionStatus status=txManager.getTransaction(def); List<Object> returnDataList = Lists.newArrayList(); Boolean result = false; try { for (T entity : taskList) { //執行業務邏輯 try { run(entity); } catch (Exception e) { result=true; } returnDataList.add(entity); } //Exception 和 Error 都需要抓 } catch (Throwable throwable) { throwable.printStackTrace(); result = true; } //隊列中插入回滾的結果 並對計數器減1 resultList.add(result); threadLatch.countDown(); try { //等待主程序的計數器 mainLatch.await(); } catch (InterruptedException e) { System.err.println("批量操作線程InterruptedException異常"); } if (rollBack.getRollBack()) { System.err.println("批量操作線程回滾"); txManager.rollback(status); }else{ txManager.commit(status); } return returnDataList; } }
1.
CountDownLatch 線程計數器 創建時指定計數的大小 和監控的線程數相同 是同步的
wait方法 等計數為0的時候才能繼續執行下面代碼 並可設置等待時間
countDown 計數減一
用於主線程和多個子線程之間的相互等待
2.阻塞隊列 生產不足的時候 阻塞消費 消費不足的時候 阻塞生產 放置數據丟失的問題