多線程分批處理集合(可擴展為分批從數據庫中讀取數據)的測試一例子【我】


 

 

任務類:

import java.util.List;
import java.util.Map;


public class MyTask implements Runnable {
    
    //當前待處理數據集合
    private List dataList;
    //其他參數Map
    private Map paramMap;
    
    public MyTask() {
        super();
    }

    public MyTask(List dataList, Map paramMap) {
        super();
        this.dataList = dataList;
        this.paramMap = paramMap;
    }

    public List getDataList() {
        return dataList;
    }

    public void setDataList(List dataList) {
        this.dataList = dataList;
    }

    public Map getParamMap() {
        return paramMap;
    }

    public void setParamMap(Map paramMap) {
        this.paramMap = paramMap;
    }


    @Override
    public void run() {
        try {
            
            long threadStartTime = System.currentTimeMillis();
//            System.out.println("--T--線程: {"+Thread.currentThread().getName()+"} -- 開始執行,當前批次數據: {"+dataList.size()+"} 條,線程數:{"+paramMap.get("threadNum")+"},批次數:{"+paramMap.get("batchNum")+"},當前模值: {"+paramMap.get("mod")+"},文檔待處理總文件數:{"+paramMap.get("dataNum")+"},文檔ID:{}");
            System.out.println("--T--線程: {"+Thread.currentThread().getName()+"} -- 開始執行,當前批次數據: {"+dataList.size()+"} 條,當前模值: "+paramMap.get("mod"));
            for (int y = 0; y < dataList.size(); y++) {
            
                Object object = dataList.get(y);
                
                try {
                    long st = System.currentTimeMillis();
//                    System.out.println("--T--線程: {"+Thread.currentThread().getName()+"正在處理的數據是:"+object);
                    Thread.sleep(10);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            System.out.println("--T--線程: {"+Thread.currentThread().getName()+"} -- 結束執行,當前批次數據: {"+dataList.size()+"} 條,當前模值: {"+paramMap.get("mod")+"},當前線程總耗時:"+(System.currentTimeMillis() - threadStartTime));
        } catch (Exception e) {
            e.printStackTrace();
        }
    
    }

}

 

測試類:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class T2 {

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        
        ArrayList<Object> dataList = new ArrayList<>();
        for (int i = 0; i < 193570; i++) {
            dataList.add(i);
        }
        
        long t1 = System.currentTimeMillis();
        // 創建線程服務
        ExecutorService exec = null;
        
        //數據總數
        int dataNum = dataList.size();
        
        // 線程數默認為1,數據小於等於100時使用
        int threadNum = 1;
        // 分隔數據的批次數
        int batchNum = 1;
        // 系統能承受最大線程數 batch.maxThreadNum 取自配置文件
//        int maxThreadNum = Integer.parseInt(PropertieUtil.getConfig("maxThreadNum"));
        int maxThreadNum = 100;
        // 默認一次處理100條左右
        int onceNum = 100;

        if (dataNum <= 100) {
            batchNum = 1;
            exec = Executors.newCachedThreadPool();
        } else if (100 < dataNum && dataNum <= 10000) {

            // 批次數不會大於100
            batchNum = dataNum / onceNum;

            if (batchNum > maxThreadNum) {
                // 設置固定線程數100
                threadNum = maxThreadNum;
            } else {
                // 線程數等於批次數
                threadNum = batchNum;
            }
            // 開啟緩存線程池
            exec = Executors.newCachedThreadPool();
        } else if (dataNum > 10000) {
            // 計划每批次500條左右
            onceNum = 500;
            // 批次數計算
            batchNum = dataNum / onceNum; // bathNum 范圍在20到400之間
            if (batchNum > maxThreadNum) {
                // 設置固定線程數100
                threadNum = maxThreadNum;
            } else {
                // 線程數等於批次數
                threadNum = batchNum;
            }
            // 開啟固定線程池
            exec = Executors.newFixedThreadPool(threadNum);
        }

        System.out.println("--B--預計線程數為:{"+threadNum+"},預計批次數:{"+batchNum+"},總待處理數量為:{"+dataNum+"}");
        // 定義多線程相關
        // final Semaphore semaphore = new Semaphore(10);
        // ExecutorService exec = Executors.newCachedThreadPool();

        // 處理的文件總數(查詢出的)
        int sumHandler = 0;
        
        // 根據批次數啟動線程
        for (int i = 0; i < batchNum; i++) {
            // 根據線程數和當前模值查出一批數據
            ArrayList onceList = new ArrayList();
            //根據對分批數量的模值切割數據
            for (int j = 0; j < dataNum; j++) {
                //用數據的id(這里是用數據集合的角標模擬)對 批次數量 取模,進行切分
                //【實際項目中這步是用sql語句從數據庫中按照相同的條件查詢數據】
                if (j%batchNum==i) {
                    onceList.add(dataList.get(j));
                }
            }
//            System.out.println("-----主線程中的i:"+i);
            //每個線程用一個參數Map【注意:這里必須在循環內部new參數map,如果在循環外,會出現問題】
            HashMap paramMap = new HashMap();
            paramMap.put("dataNum", dataNum);
            paramMap.put("batchNum", batchNum);
            paramMap.put("threadNum", threadNum);
            //當前模值
            paramMap.put("mod", i);
            // 開啟線程
            Runnable task = new MyTask(onceList, paramMap);
            exec.submit(task);
            //計數
            sumHandler += onceList.size();
        }
        exec.shutdown();
        // exec.awaitTermination(1, TimeUnit.HOURS);
        while (true) {
            if (exec.isTerminated()) {
                System.out.println("--B--所有子線程都結束了,共計校驗記錄:{"+sumHandler+"}");
                break;
            }
        }
        System.out.println("--B--總耗時:"+(System.currentTimeMillis()-t1));
    }
    
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM