多線程操作數據庫 異常拋出全部回滾的問題


package czc.superzig.modular.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.TimerTask;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import com.google.common.collect.Lists;

import czc.superzig.common.operatingtable.base.entity.Result;
import czc.superzig.common.operatingtable.base.entity.Results;
import czc.superzig.modular.system.operatingtable.entity.DetectionIndicator;

/**
 *多線程操作數據庫  其中一個線程發生異常則所有線程發生回滾 
 * 
 */

public abstract class ThreadUtil<T> {
    
    private DataSourceTransactionManager txManager;
    
    public abstract void run(T entity);
    
    public ThreadUtil(List<T> list,DataSourceTransactionManager txManager){
        this.txManager=txManager;
        createThread(list);
    }
    
    
    
    private Result createThread(List<T> list) {
        Result result = new Result();
       
        //每條線程最小處理任務數
        int perThreadHandleCount = 1;
        //線程池的最大線程數
        int nThreads = 10;
        //任務數
        int taskSize = list.size();

        if (taskSize > nThreads * perThreadHandleCount) {
            perThreadHandleCount = taskSize % nThreads == 0 ? taskSize / nThreads : taskSize / nThreads + 1;
            nThreads = taskSize % perThreadHandleCount == 0 ? taskSize / perThreadHandleCount : taskSize / perThreadHandleCount + 1;
        } else {
            nThreads = taskSize;
        }
        //主線程的同步計時器
        CountDownLatch mainLatch = new CountDownLatch(1);
        //監控子線程的同步計時器
        CountDownLatch threadLatch = new CountDownLatch(nThreads);
        //根據子線程執行結果判斷是否需要回滾
        BlockingDeque<Boolean> resultList = new LinkedBlockingDeque<>(nThreads);
        //必須要使用對象,如果使用變量會造成線程之間不可共享變量值
        RollBack rollBack = new RollBack(false);
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(nThreads);

        List<Future<List<Object>>> futures = Lists.newArrayList();
        //返回數據的列表
        List<Object> returnDataList = Lists.newArrayList();
        //給每個線程分配任務
        for (int i = 0; i < nThreads; i++) {
            int lastIndex = (i + 1) * perThreadHandleCount;
            List<T> listVos = list.subList(i * perThreadHandleCount, lastIndex >= taskSize ? taskSize : lastIndex);
            FunctionThread functionThread = new FunctionThread(mainLatch, threadLatch, rollBack, resultList, listVos);
            Future<List<Object>> future = fixedThreadPool.submit(functionThread);
            futures.add(future);
        }

        /** 存放子線程返回結果. */
        List<Boolean> backUpResult = Lists.newArrayList();
        try {
            //等待所有子線程執行完畢
            boolean await = threadLatch.await(20, TimeUnit.SECONDS);
            //如果超時,直接回滾
            if (!await) {
                rollBack.setRollBack(true);
            } else {
                //查看執行情況,如果有存在需要回滾的線程,則全部回滾
                for (int i = 0; i < nThreads; i++) {
                    Boolean flag = resultList.take();
                    backUpResult.add(flag);
                    if (flag) {
                        /** 有線程執行異常,需要回滾子線程. */
                        rollBack.setRollBack(true);
                    }
                }
            }
        } catch (InterruptedException e) {
            result.setSuccess(false);
            result.setMsg("等待所有子線程執行完畢時,出現異常,整體回滾");
        } finally {
            //子線程再次開始執行
            mainLatch.countDown();
            fixedThreadPool.shutdown();
        }

        /** 檢查子線程是否有異常,有異常整體回滾. */
        for (int i = 0; i < nThreads; i++) {
            if (CollectionUtils.isNotEmpty(backUpResult)) {
                Boolean flag = backUpResult.get(i);
                if (flag) {
                    result.setSuccess(false);
                    result.setMsg("運行失敗");
                }
            } else {
                result.setSuccess(false);
                result.setMsg("運行失敗");
            }
        }

        //拼接結果
        try {
            for (Future<List<Object>> future : futures) {
                returnDataList.addAll(future.get());
            }
        } catch (Exception e) {
            e.printStackTrace();
            result.setSuccess(false);
            result.setMsg("運行失敗子線程正常創建參保人成功,主線程出現異常,回滾失敗");
        }
        if(result.getSuccess()){
            result.setData(returnDataList);
        }
        return result;
    }

    public class FunctionThread implements Callable<List<Object>> {
        /**
         * 主線程監控
         */
        private CountDownLatch mainLatch;
        /**
         * 子線程監控
         */
        private CountDownLatch threadLatch;
        /**
         * 是否回滾
         */
        private RollBack rollBack;
        private BlockingDeque<Boolean> resultList;
        private List<T> taskList;

        public FunctionThread(CountDownLatch mainLatch, CountDownLatch threadLatch, RollBack rollBack, BlockingDeque<Boolean> resultList, List<T> taskList) {
            this.mainLatch = mainLatch;
            this.threadLatch = threadLatch;
            this.rollBack = rollBack;
            this.resultList = resultList;
            this.taskList = taskList;
        }

        @Override
        public List<Object> call() throws Exception {
            //為了保證事務不提交,此處只能調用一個有事務的方法,spring 中事務的顆粒度是方法,只有方法不退出,事務才不會提交
            return FunctionTask(mainLatch, threadLatch, rollBack, resultList, taskList);
        }

    }

    public class RollBack {
        private Boolean isRollBack;

        public Boolean getRollBack() {
            return isRollBack;
        }

        public void setRollBack(Boolean rollBack) {
            isRollBack = rollBack;
        }

        public RollBack(Boolean isRollBack) {
            this.isRollBack = isRollBack;
        }
    }

    public List<Object> FunctionTask(CountDownLatch mainLatch, CountDownLatch threadLatch, RollBack rollBack, BlockingDeque<Boolean> resultList, List<T> taskList) {
        //開啟事務
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setName(java.util.UUID.randomUUID().toString());
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); // 事物隔離級別,開啟新事務
        TransactionStatus status=txManager.getTransaction(def);
        
        List<Object> returnDataList = Lists.newArrayList();
        Boolean result = false;
        try {
            for (T entity : taskList) {
                //執行業務邏輯
                try {
                    run(entity);
                } catch (Exception e) {
                    result=true;
                }
                returnDataList.add(entity);
            }
            //Exception 和 Error 都需要抓
        } catch (Throwable throwable) {
            throwable.printStackTrace();
            result = true;
        }
        //隊列中插入回滾的結果 並對計數器減1
        resultList.add(result);
        threadLatch.countDown();

        try {
            //等待主程序的計數器
            mainLatch.await();
        } catch (InterruptedException e) {
            System.err.println("批量操作線程InterruptedException異常");
        }

        if (rollBack.getRollBack()) {
             System.err.println("批量操作線程回滾");
             txManager.rollback(status);
        }else{
            txManager.commit(status);
        }
        return returnDataList;
    }

}

 1.

CountDownLatch 線程計數器 創建時指定計數的大小 和監控的線程數相同  是同步的
wait方法 等計數為0的時候才能繼續執行下面代碼 並可設置等待時間
countDown 計數減一
用於主線程和多個子線程之間的相互等待

2.阻塞隊列 生產不足的時候 阻塞消費 消費不足的時候 阻塞生產 放置數據丟失的問題


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM