【原】通過多線程分批處理派發任務


前言:

   前幾天有運營部門人員反應派發紅包很慢,經常出現504或者無響應,於是由我這邊進行一個優化后,發放速度由原來的超時或者1分鍾變為幾秒。

發放流程:

         活動后台導入一個xls表格,大概2W左右條,經過后台的篩選處理等邏輯后會循環調用插入數據庫的代碼。

優化過程:

         分析慢的原因:

                                  發放的時候循環發放,導致發放的紅包一多的時候要循環幾萬次,而且每次插入都是new一個對象,然后往里面set數據,最后調用 jdbcTemplate插入。

          優化思路 :

      1. jdbcTemplate 有個batchUpdate的api,可以通過這個api完成批處理                 

      2. 通過多線程拆分大的單元,類似於 jdk的 forkJoin,然后每個線程處理一批,最后的結果通過回調統一歸並。

   拆分List代碼片段:

 

 抽取出一個公共的接口,用於調用具體的處理方法。

 

public interface I2pTask<T, E> {
    T execute(List e, Map<String, Object> params);
}

 

    由於發放紅包需要實時展示給運營人員看,所以需要有回調處理函數,可以將不同結果的線程收集起來統一給主線程返回,但jdk的Callable又滿足不了,所以得自己新建一個類重寫Callable,這個類只負責調度處理任務以及返回執行任務結果。

 

public class I2pHanderCallable<E> implements Callable<ResultBean> {
    private static Logger logger = LoggerFactory.getLogger(HandleCallable.class);
    // 線程名稱 
    private String threadName = "";
    // 需要處理的數據
    private List<E> data;
    // 輔助參數
    private Map<String, Object> params;
    // 具體執行任務
    private I2pTask<ResultBean<String>, E> task;

    public HandleCallable(String threadName, List<E> data, Map<String, Object> params,
            ITask<ResultBean<String>, E> task) {
        this.threadName = threadName;
        this.data = data;
        this.params = params;
        this.task = task;
    }

    @Override
    public ResultBean<List<ResultBean<String>>> call() throws Exception {
        // 該線程中所有數據處理返回結果
        ResultBean<List<ResultBean<String>>> resultBean = ResultBean.newInstance();
        if (data != null && data.size() > 0) {
            logger.info("線程:{},共處理:{}個數據,開始處理......", threadName, data.size());
            // 返回結果集
            List<ResultBean<String>> resultList = new ArrayList<>();
            resultList.add(task.execute(data, params));
            /*resultList.add(task.execute(data, params));*/
            // 循環處理每個數據
          /*  for (int i = 0; i < data.size(); i++) {
                // 需要執行的數據
                E e = data.get(i);
                // 將數據執行結果加入到結果集中
                resultList.add(task.execute(e, params));
                logger.info("線程:{},第{}個數據,處理完成", threadName, (i + 1));
            }*/
            logger.info("線程:{},共處理:{}個數據,處理完成......", threadName, data.size());
            resultBean.setData(resultList);
            resultBean.setCode(data.size());
        }
        return resultBean;
    }

}
 

      具體的處理線程的類有了,那么還需要考慮來切分任務,所以新建一個 線程工具類,主要業務就是切分任務,創建具體的線程個數,統一收集結果。

public class I2pThreadUtils<T> {
    private static Logger logger = LoggerFactory.getLogger(MultiThreadUtils.class);

    // 線程個數,如不賦值,默認為5
    private int threadCount = 20;
    // 具體業務任務
//    private I2pTask<ResultBean<String>, T> task;
    // 線程池管理器
    private CompletionService<ResultBean> pool = null;

  
    public static I2pThreadUtils newInstance(int threadCount) {
        I2pThreadUtils instance = new I2pThreadUtils();
        threadCount = threadCount;
        instance.setThreadCount(threadCount);
        return instance;
    }


    @SuppressWarnings("rawtypes")
    public ResultBean execute(List<T> data, Map<String, Object> params, ITask<ResultBean<String>, T> task) {
        // 創建線程池
        int num = 0;
        ExecutorService threadpool = Executors.newFixedThreadPool(threadCount);
        // 根據線程池初始化線程池管理器
        pool = new ExecutorCompletionService<ResultBean>(threadpool);
        // 開始時間(ms)
        long l = System.currentTimeMillis();
        // 數據量大小
        int length = data.size();
        // 每個線程處理的數據個數
        int taskCount = length / threadCount;
        // 划分每個線程調用的數據
        for (int i = 0; i < threadCount; i++) {
            // 每個線程任務數據list
            List<T> subData = null;
            if (i == (threadCount - 1)) {
                subData = data.subList(i * taskCount, length);
            } else {
                subData = data.subList(i * taskCount, (i + 1) * taskCount);
            }
            // 將數據分配給各個線程
            HandleCallable execute = new HandleCallable<T>(String.valueOf(i), subData, params, task);
            // 將線程加入到線程池
            pool.submit(execute);
        }

        // 總的返回結果集
        List<ResultBean<String>> result = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            // 每個線程處理結果集
            ResultBean<List<ResultBean<String>>> threadResult;
            try {
                threadResult = pool.take().get();
                if(threadResult!=null && threadResult.getData()!=null){
                    System.out.println("======線程" + i + "執行完畢,返回結果數據:" + threadResult.getCode());
                    List<ResultBean<String>>  list =  threadResult.getData();
                    num+=threadResult.getCode();
                }
                System.out.println("每個線程處理結果集"+threadResult.getData());
                result.addAll(threadResult.getData());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }

        }
        // 關閉線程池
        threadpool.shutdownNow();
        // 執行結束時間
        long end_l = System.currentTimeMillis();
        logger.info("總耗時:{}ms", (end_l - l));
        logger.info("總數量:{}num:", num);
        return ResultBean.newInstance().setData(num);
    }

    public int getThreadCount() {
        return threadCount;
    }

    public void setThreadCount(int threadCount) {
        this.threadCount = threadCount;
    }

}

 以上


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM