想在網上找個多線程任務並發的代碼,沒找到,我自己寫了個。
功能:提交一個匿名函數和數據列表,開啟多線程執行此函數處理數據。
核心調用函數
runTask(Consumer task, Consumer finaltask, List datas, int maxThreads)
task 任務函數
finaltask 全部線程結束后執行的函數
datas task.accept 要用到的處理數據列表,成員是 HashMap,必須要有id屬性。
maxThreads 指定並發線程數量
1 package fsrm.tools; 2 3 import java.util.*; 4 import java.util.function.Consumer; 5 6 public class ParallelTask { 7 8 private static int listcount = 0; 9 private static Vector datas = new Vector(); 10 11 public class RunThread extends Thread { 12 Consumer task, fianltask; 13 14 public RunThread(String name, Consumer task, Consumer ftask) { 15 super(name); 16 this.task = task; 17 this.fianltask = ftask; 18 } 19 20 @Override 21 public void interrupt() { 22 super.interrupt(); 23 synchronized (this) { 24 listcount--; 25 if (listcount == 0) { 26 // 我是最后一個了,那么,好吧 27 fianltask.accept(null); 28 } 29 } 30 } 31 32 @Override 33 public void run() { 34 while (!datas.isEmpty()) { 35 HashMap<String, Object> data = (HashMap) datas.remove(0); 36 System.out.println("線程" + getName() + " 開始處理新數據 " + data.get("id")); 37 if (task != null) task.accept(data); 38 System.out.println("線程" + getName() + " 處理數據完成 " + data.get("id")); 39 } 40 synchronized (this) { 41 listcount--; 42 if (listcount == 0) { 43 // 我是最后一個了,那么,好吧 44 fianltask.accept(null); 45 } 46 } 47 } 48 } 49 50 void runTask(Consumer task, Consumer finaltask, List datas, int maxThreads) { 51 ParallelTask.listcount = maxThreads; 52 this.datas.addAll(datas); 53 for (int i = 0; i < maxThreads; i++) { 54 new RunThread(String.valueOf(i), task, finaltask).start(); 55 } 56 } 57 58 public static void main(String[] args) throws Exception { 59 ParallelTask pt = new ParallelTask(); 60 61 ArrayList<Object> datas = new ArrayList(); 62 for (int i = 1; i < 33; i++) { 63 HashMap<String, Object> d = new HashMap<>(); 64 d.put("id", "task_" + i); 65 d.put("data", i * 3); 66 datas.add(d); 67 } 68 69 pt.runTask(o -> { 70 try { 71 System.out.println("do...... " + ((HashMap) o).get("data")); 72 Thread.sleep(Math.round(Math.random() * 60 + 5) * 100); 73 } catch (InterruptedException e) { 74 e.printStackTrace(); 75 } 76 }, o -> { 77 System.out.println("全部工作完成了,歡迎下次再來!!!"); 78 }, datas, 10); 79 } 80 }