切分大任務成多個子任務(事務),匯總后統一提交或回滾


示例代碼可以從github上獲取  https://github.com/git-simm/simm-framework.git
一、業務場景:
  系統中存在一個盤庫的功能,用戶一次盤庫形成一兩萬條的盤庫明細單,一次性提交給服務器進行處理。服務器性能比較優越,平均也得運行30秒左右。性能上需要進行優化。
 
二、處理方案:
  做過代碼分析后,發現單線程邏輯沒有什么優化空間。開始考慮引入多線程處理模型,用10個子線程進行任務切分處理。切分子線程問題需要考慮事務的一致性。10個子線程對應10個事務,需要保證所有事務一起提交或一起回滾。這里使用synchronized(wait,notifyall)機制做線程協作。
 
三、代碼實現:
   3.1、添加一個多線程協作標志類,用於做子線程運行狀態統計,通知子線程做事務提交還是回滾的操作;
package simm.framework.threadutils.multi;

import java.util.UUID;

/**
 * 多線程結束標志
 * 2018.09.22 by simm
 */
public class MultiEndFlag {
    private volatile boolean fired = false;
    //是否執行成功
    private volatile boolean isAllSuccess = false;
    private volatile int threadCount = 0;
    private volatile int failCount = 0;

    /**
     * 初始化子線程的總數
     * @param count
     */
    public MultiEndFlag(int count){
        threadCount = count;
    }

    public boolean isAllSuccess() {
        return isAllSuccess;
    }

    /**
     * 等待全部結束
     * @param threadId
     * @param result
     */
    public synchronized void waitForEnd(UUID threadId,int result){
        //統計失敗的線程個數
        if(result==0){
            failCount++;
        }
        threadCount--;
        while (!fired){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 執行結束通知
     */
    public synchronized void go(){
        fired = true;
        //結果都顯示成功
        isAllSuccess = (failCount == 0);
        notifyAll();
    }
    /**
     * 等待結束
     */
    public void end(){
        while (threadCount > 0){
            waitFunc(50);
        }
        System.out.println("線程全部執行完畢通知");
        go();
    }

    /**
     * 等待
     */
    private void waitFunc(long millis){
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

   3.2、提供一個數據保存服務的接口定義,一個默認的子線程任務執行類(需要接收數據保存服務實現,業務數據,協作標志變量);

package simm.framework.threadutils.multi;

import java.util.List;
import java.util.UUID;

/**
 * 保存服務接口
 * 2018.09.22 by simm
 * @param <T>
 */
public interface ISaveService<T> {
    /**
     * 子線程批量保存方法
     * @param list
     * @param endFlag
     * @param threadId
     * @return
     * @throws Exception
     */
    Integer batchSave(List<T> list, MultiEndFlag endFlag, UUID threadId) throws Exception;
}
package simm.framework.threadutils.multi;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;

/**
 * 默認的執行任務
 * 2018.09.22 by simm
 */
public class DefaultExecTask<T> implements Callable<Integer> {
    private List<T> list;
    private ISaveService saveService;
    private MultiEndFlag endFlag;
    private UUID threadId;
    /**
     * 盤庫子任務
     * @param saveService
     * @param notes
     * @param flag
     */
    public DefaultExecTask(ISaveService saveService, List<T> notes, MultiEndFlag flag){
        this.saveService = saveService;
        this.list = notes;
        this.endFlag = flag;
        this.threadId = UUID.randomUUID();
    }
    @Override
    public Integer call() throws Exception {
        return saveService.batchSave(this.list,this.endFlag,this.threadId);
    }
}

 

    3.3、實現最核心的線程池分發子線程,並匯總結果通知子線程事務做最終的提交或回滾。線程池使用定長池 newFixedThreadPool,子線程使用futureTask,可接收返回值和異常信息。
package simm.framework.threadutils.multi;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 多線程切分執行器
 * 2018.09.22 by simm
 */
public class MultiExecutor {
    private static int maxThreadCount = 10;
    /**
     * 執行方法(分批創建子線程)
     * @param saveService
     * @param notes
     * @param groupLen
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static <T> Boolean exec(ISaveService saveService,List<T> notes,int groupLen) throws ExecutionException, InterruptedException {
        if(notes==null || notes.size()==0) return true;
        //創建一個線程池,最大10個線程
        ExecutorService executorService = Executors.newFixedThreadPool(maxThreadCount);
        List<Future<Integer>> futures = new ArrayList<>();
        int noteSize = notes.size();
        int batches = (int) Math.ceil(noteSize * 1.0 /groupLen);
        //分組超長最大線程限制,則設置分組數為10,計算分組集合尺寸
        if(batches>maxThreadCount){
            batches = maxThreadCount;
            groupLen = (int) Math.ceil(noteSize * 1.0 /batches);
        }
        System.out.println("總長度:"+noteSize+"  批次信息:"+batches+"  分組長度:"+groupLen);
        MultiEndFlag flag = new MultiEndFlag(batches);
        int startIndex, toIndex, maxIndex = notes.size();
        for(int i=0;i<batches;i++){
            startIndex = i * groupLen;
            toIndex = startIndex + groupLen;
            if(toIndex> maxIndex) {
                toIndex = maxIndex;
            }
            List<T> temp = notes.subList(startIndex,toIndex);
            if(temp == null || temp.size()==0) continue;
            futures.add(executorService.submit(new DefaultExecTask(saveService,temp,flag)));
        }
        flag.end();
        //子線程全部等待返回(存在異常,則直接拋向主線程)
        for(Future<Integer> future:futures){
            future.get();
        }
        //所有線程返回后,關閉線程池
        executorService.shutdown();
        return true;
    }
}
 
四、給出一個調用偽代碼。需要注意的一點,子線程開啟事務,這里使用@Transactional聲明式事務,這要求服務的實體類需要通過spring的bean工廠創建,得到一個動態代理類,以達到支持事務攔截器的目的,保證注解的有效性。
package multi;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import simm.framework.threadutils.multi.DefaultExecTask;
import simm.framework.threadutils.multi.ISaveService;
import simm.framework.threadutils.multi.MultiEndFlag;
import simm.framework.threadutils.multi.MultiExecutor;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

/**
 * 損益單保存服務
 */
@Service
public class DemoService implements ISaveService<NoteCheckBalance> {
    private static final Logger logger = LoggerFactory.getLogger(DefaultExecTask.class);
    @Autowired
    private NoteCheckBalanceMapper noteCheckBalanceMapper;

    /**
     * 業務保存
     * @param list
     */
    public void save(List<NoteCheckBalance> list){
        for(NoteCheckBalance item :list){
            noteCheckBalanceMapper.insert(item);
        }
    }
    /**
     * 批量保存事件
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public Integer batchSave(List<NoteCheckBalance> list, MultiEndFlag endFlag, UUID threadId) throws Exception {
        int result = 0;
        try{
            //業務操作
            save(list);
            result = 1;
            //進行waitForEnd 操作,是為了確保所有的線程都最終通知同步協作標志
            endFlag.waitForEnd(threadId ,result);
            //其他線程異常手工回滾
            if(result==1 && !endFlag.isAllSuccess()){
                String message = "子線程未全部執行成功,對線程["+threadId+"]進行回滾";
                throw new Exception(message);
            }
            return result;
        }catch (Exception ex){
            logger.error(ex.toString());
            if(result ==0){
                //本身線程異常拋出異常,通知已經做完(判斷是為了防止 與 try塊中的通知重復)
                endFlag.waitForEnd(threadId ,result);
            }
            throw ex;
        }
    }

    /**
     * 調用示例
     * @param args
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //調用示例
        MultiExecutor.exec(new DemoService(), new ArrayList<NoteCheckBalance>(),500);
    }
}
 
參考文章


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM