多線程事務控制


背景

      在項目中使用多線程抓取第三方數據執行數據入庫時,如果某個子線程執行異常,其他子線事務全部回滾,spring對多線程無法進行事務控制,是因為多線程底層連接數據庫的時候,是使用的線程變量(TheadLocal),線程之間事務隔離,每個線程有自己的連接,事務肯定不是同一個了。

解決辦法

     思想就是使用兩個CountDownLatch實現子線程的二段提交

    步驟:

     1、主線程將任務分發給子線程,然后使用childMonitor.await();阻塞主線程,等待所有子線程處理向數據庫中插入的業務,並使用BlockingDeque<Boolean>存儲線程的返回結果。

     2、使用childMonitor.countDown()釋放子線程鎖定,同時使用mainMonitor.await();阻塞子線程,將程序的控制權交還給主線程。

     3、主線程檢查子線程執行任務的結果,若有失敗結果出現,主線程標記狀態告知子線程回滾,然后使用mainMonitor.countDown();將程序控制權再次交給子線程,子線程檢測回滾標志,判斷是否回滾。

代碼實現

線程池工具類

public class ThreadPoolTool<T> {

    /**
     * 多線程任務
     * @param transactionManager 數據庫事務管理
     * @param data 需要執行的數據集合
     * @param threadCount 核心線程數
     * @param params 其他資源參數
     * @param clazz 具體執行任務的類
     */
    public void excuteTask(DataSourceTransactionManager transactionManager, List data, int threadCount, Map<String, Object> params, Class clazz) {
        if (data == null || data.size() == 0) {
            return;
        }
        int batch = 0;

        ExecutorService executor = Executors.newFixedThreadPool(threadCount);

        //監控子線程的任務執行
        CountDownLatch childMonitor = new CountDownLatch(threadCount);
        //監控主線程,是否需要回滾
        CountDownLatch mainMonitor = new CountDownLatch(1);
        //存儲任務的返回結果,返回true表示不需要回滾,反之,則回滾
        BlockingDeque<Boolean> results = new LinkedBlockingDeque<Boolean>(threadCount);
        RollBack rollback = new RollBack(false);

        try {
            LinkedBlockingQueue<List> queue = splitQueue(data, threadCount);
            while (true) {
                List list = queue.poll();
                if (list == null) {
                    break;
                }
                batch++;
                params.put("batch", batch);
                Constructor constructor = clazz.getConstructor(new Class[]{CountDownLatch.class, CountDownLatch.class, BlockingDeque.class, RollBack.class, DataSourceTransactionManager.class, Object.class, Map.class});

                ThreadTask task = (ThreadTask) constructor.newInstance(childMonitor, mainMonitor, results, rollback, transactionManager, list, params);
                executor.execute(task);
            }

            //   1、主線程將任務分發給子線程,然后使用childMonitor.await();阻塞主線程,等待所有子線程處理向數據庫中插入的業務。
            childMonitor.await();
            System.out.println("主線程開始執行任務");

            //根據返回結果來確定是否回滾
            for (int i = 0; i < threadCount; i++) {
                Boolean result = results.take();
                if (!result) {
                    //有線程執行異常,需要回滾子線程
                    rollback.setNeedRoolBack(true);
                }
            }
            //  3、主線程檢查子線程執行任務的結果,若有失敗結果出現,主線程標記狀態告知子線程回滾,然后使用mainMonitor.countDown();將程序控制權再次交給子線程,子線程檢測回滾標志,判斷是否回滾。
            mainMonitor.countDown();

        } catch (Exception e) {
            log.error(e.getMessage());
        } finally {
            //關閉線程池,釋放資源
            executor.shutdown();
        }

    }

    /**
     * 隊列拆分
     *
     * @param data 需要執行的數據集合
     * @param threadCount 核心線程數
     * @return
     */
    private LinkedBlockingQueue<List<Object>> splitQueue(List<Object> data, int threadCount) {
        LinkedBlockingQueue<List<Object>> queueBatch = new LinkedBlockingQueue();
        int total = data.size();
        int oneSize = total / threadCount;
        int start;
        int end;

        for (int i = 0; i < threadCount; i++) {
            start = i * oneSize;
            end = (i + 1) * oneSize;
            if (i < threadCount - 1) {
                queueBatch.add(data.subList(start, end));
            } else {
                queueBatch.add(data.subList(start, data.size()));
            }
        }
        return queueBatch;
    }
}

 

子線程任務執public abstract class ThreadTask implements Runnable 

/** * 監控子任務的執行 */
    private CountDownLatch childMonitor; /** * 監控主線程 */
    private CountDownLatch mainMonitor; /** * 存儲線程的返回結果 */
    private BlockingDeque<Boolean> resultList; /** * 回滾類 */
    private RollBack rollback; private Map<String,Object> params; protected Object obj; protected DataSourceTransactionManager transactionManager; protected TransactionStatus status; public ThreadTask(CountDownLatch childCountDown, CountDownLatch mainCountDown, BlockingDeque<Boolean> result, RollBack rollback, DataSourceTransactionManager transactionManager, Object obj,Map<String,Object> params) { this.childMonitor = childCountDown; this.mainMonitor = mainCountDown; this.resultList = result; this.rollback = rollback; this.transactionManager = transactionManager; this.obj = obj; this.params = params; initParam(); } /** * 事務回滾 */
    private void rollBack() { System.out.println(Thread.currentThread().getName()+"開始回滾"); transactionManager.rollback(status); } /** * 事務提交 */
    private void submit() { System.out.println(Thread.currentThread().getName()+"提交事務"); transactionManager.commit(status); } protected Object getParam(String key){ return params.get(key); } 
/**
* 初始化方法:作用是把線程池工具任務執行類所需的外部資源通過 ThreadTask.class的構造方法中 Map<String,Obejct> params參數進行初始化傳遞進來
*/
public abstract void initParam(); /** * 執行任務,返回false表示任務執行錯誤,需要回滾 * @return */ public abstract boolean processTask(); @Override public void run() { System.out.println(Thread.currentThread().getName()+"子線程開始執行任務"); DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); status = transactionManager.getTransaction(def); Boolean result = processTask(); //向隊列中添加處理結果 resultList.add(result); //2、使用childMonitor.countDown()釋放子線程鎖定,同時使用mainMonitor.await();阻塞子線程,將程序的控制權交還給主線程。 childMonitor.countDown(); try { //等待主線程的判斷邏輯執行完,執行下面的是否回滾邏輯 mainMonitor.await(); } catch (Exception e) { log.error(e.getMessage()); } System.out.println(Thread.currentThread().getName()+"子線程執行剩下的任務"); //3、主線程檢查子線程執行任務的結果,若有失敗結果出現,主線程標記狀態告知子線程回滾,然后使用mainMonitor.countDown();將程序控制權再次交給子線程,子線程檢測回滾標志,判斷是否回滾。 if (rollback.isNeedRoolBack()) { rollBack(); }else{ //事務提交 submit(); } }

 

回滾標記類

 

@Data
public class RollBack {
    public RollBack(boolean needRoolBack) {
        this.needRoolBack = needRoolBack;
    }

    private boolean needRoolBack;


}

 使用線程池工具:

  1,首先建立自己的任務執行類 並且 extends ThreadTask ,實現initParam()和processTask()方法

/**
 * 多線程處理任務類
 */
public class TestTask extends ThreadTask {

    /**
       分批處理的數據
     */
    private List<Object> objectList;

    /**
     * 可能需要注入的某些服務
     */
    private TestService testService;

    public TestTask(CountDownLatch childCountDown, CountDownLatch mainCountDown, BlockingDeque<Boolean> result, RollBack rollback, DataSourceTransactionManager transactionManager, Object obj, Map<String, Object> params) {
        super(childCountDown, mainCountDown, result, rollback, transactionManager, obj, params);
    }

    @Override
    public void initParam() {
        this.objectList = (List<Object>) getParam("objectList");
        this.testService = (TestService) getParam("testService");
    }


    /**
     * 執行任務,返回false表示任務執行錯誤,需要回滾
     * @return
     */
    @Override
    public boolean processTask() {
        try {
            for (Object o : objectList) {
                testService.list();
                System.out.println(o.toString()+"執行自己的多線程任務邏輯");
            }
            return true;
        } catch (Exception e) {
            return false;
        }
    }

}

2,編寫主任務執行方法

  /**
     * 執行多線程任務方法
     */
    public void testThreadTask() {
        try {
                int threadCount = 5;
                //需要分批處理的數據
                List<Object> objectList = new ArrayList<>();
                Map<String,Object> params = new HashMap<>();
                params.put("objectList",objectList);
                params.put("testService",testService);
                //調用多線程工具方法
                threadPoolTool.excuteTask(transactionManager,objectList,threadCount,params, TestTask.class);
        }catch (Exception e){
            throw new RuntimeException(e.getMessage());
        }
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM