任務類:
import java.util.List; import java.util.Map; public class MyTask implements Runnable { //當前待處理數據集合 private List dataList; //其他參數Map private Map paramMap; public MyTask() { super(); } public MyTask(List dataList, Map paramMap) { super(); this.dataList = dataList; this.paramMap = paramMap; } public List getDataList() { return dataList; } public void setDataList(List dataList) { this.dataList = dataList; } public Map getParamMap() { return paramMap; } public void setParamMap(Map paramMap) { this.paramMap = paramMap; } @Override public void run() { try { long threadStartTime = System.currentTimeMillis(); // System.out.println("--T--線程: {"+Thread.currentThread().getName()+"} -- 開始執行,當前批次數據: {"+dataList.size()+"} 條,線程數:{"+paramMap.get("threadNum")+"},批次數:{"+paramMap.get("batchNum")+"},當前模值: {"+paramMap.get("mod")+"},文檔待處理總文件數:{"+paramMap.get("dataNum")+"},文檔ID:{}"); System.out.println("--T--線程: {"+Thread.currentThread().getName()+"} -- 開始執行,當前批次數據: {"+dataList.size()+"} 條,當前模值: "+paramMap.get("mod")); for (int y = 0; y < dataList.size(); y++) { Object object = dataList.get(y); try { long st = System.currentTimeMillis(); // System.out.println("--T--線程: {"+Thread.currentThread().getName()+"正在處理的數據是:"+object); Thread.sleep(10); } catch (Exception e) { e.printStackTrace(); } } System.out.println("--T--線程: {"+Thread.currentThread().getName()+"} -- 結束執行,當前批次數據: {"+dataList.size()+"} 條,當前模值: {"+paramMap.get("mod")+"},當前線程總耗時:"+(System.currentTimeMillis() - threadStartTime)); } catch (Exception e) { e.printStackTrace(); } } }
測試類:
import java.util.ArrayList; import java.util.HashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class T2 { @SuppressWarnings("unchecked") public static void main(String[] args) { ArrayList<Object> dataList = new ArrayList<>(); for (int i = 0; i < 193570; i++) { dataList.add(i); } long t1 = System.currentTimeMillis(); // 創建線程服務 ExecutorService exec = null; //數據總數 int dataNum = dataList.size(); // 線程數默認為1,數據小於等於100時使用 int threadNum = 1; // 分隔數據的批次數 int batchNum = 1; // 系統能承受最大線程數 batch.maxThreadNum 取自配置文件 // int maxThreadNum = Integer.parseInt(PropertieUtil.getConfig("maxThreadNum")); int maxThreadNum = 100; // 默認一次處理100條左右 int onceNum = 100; if (dataNum <= 100) { batchNum = 1; exec = Executors.newCachedThreadPool(); } else if (100 < dataNum && dataNum <= 10000) { // 批次數不會大於100 batchNum = dataNum / onceNum; if (batchNum > maxThreadNum) { // 設置固定線程數100 threadNum = maxThreadNum; } else { // 線程數等於批次數 threadNum = batchNum; } // 開啟緩存線程池 exec = Executors.newCachedThreadPool(); } else if (dataNum > 10000) { // 計划每批次500條左右 onceNum = 500; // 批次數計算 batchNum = dataNum / onceNum; // bathNum 范圍在20到400之間 if (batchNum > maxThreadNum) { // 設置固定線程數100 threadNum = maxThreadNum; } else { // 線程數等於批次數 threadNum = batchNum; } // 開啟固定線程池 exec = Executors.newFixedThreadPool(threadNum); } System.out.println("--B--預計線程數為:{"+threadNum+"},預計批次數:{"+batchNum+"},總待處理數量為:{"+dataNum+"}"); // 定義多線程相關 // final Semaphore semaphore = new Semaphore(10); // ExecutorService exec = Executors.newCachedThreadPool(); // 處理的文件總數(查詢出的) int sumHandler = 0; // 根據批次數啟動線程 for (int i = 0; i < batchNum; i++) { // 根據線程數和當前模值查出一批數據 ArrayList onceList = new ArrayList(); //根據對分批數量的模值切割數據 for (int j = 0; j < dataNum; j++) { //用數據的id(這里是用數據集合的角標模擬)對 批次數量 取模,進行切分 //【實際項目中這步是用sql語句從數據庫中按照相同的條件查詢數據】 if (j%batchNum==i) { onceList.add(dataList.get(j)); } } // System.out.println("-----主線程中的i:"+i); //每個線程用一個參數Map【注意:這里必須在循環內部new參數map,如果在循環外,會出現問題】 HashMap paramMap = new HashMap(); paramMap.put("dataNum", dataNum); paramMap.put("batchNum", batchNum); paramMap.put("threadNum", threadNum); //當前模值 paramMap.put("mod", i); // 開啟線程 Runnable task = new MyTask(onceList, paramMap); exec.submit(task); //計數 sumHandler += onceList.size(); } exec.shutdown(); // exec.awaitTermination(1, TimeUnit.HOURS); while (true) { if (exec.isTerminated()) { System.out.println("--B--所有子線程都結束了,共計校驗記錄:{"+sumHandler+"}"); break; } } System.out.println("--B--總耗時:"+(System.currentTimeMillis()-t1)); } }
