想在网上找个多线程任务并发的代码,没找到,我自己写了个。
功能:提交一个匿名函数和数据列表,开启多线程执行此函数处理数据。
核心调用函数
runTask(Consumer task, Consumer finaltask, List datas, int maxThreads)
task 任务函数
finaltask 全部线程结束后执行的函数
datas task.accept 要用到的处理数据列表,成员是 HashMap,必须要有id属性。
maxThreads 指定并发线程数量
1 package fsrm.tools; 2 3 import java.util.*; 4 import java.util.function.Consumer; 5 6 public class ParallelTask { 7 8 private static int listcount = 0; 9 private static Vector datas = new Vector(); 10 11 public class RunThread extends Thread { 12 Consumer task, fianltask; 13 14 public RunThread(String name, Consumer task, Consumer ftask) { 15 super(name); 16 this.task = task; 17 this.fianltask = ftask; 18 } 19 20 @Override 21 public void interrupt() { 22 super.interrupt(); 23 synchronized (this) { 24 listcount--; 25 if (listcount == 0) { 26 // 我是最后一个了,那么,好吧 27 fianltask.accept(null); 28 } 29 } 30 } 31 32 @Override 33 public void run() { 34 while (!datas.isEmpty()) { 35 HashMap<String, Object> data = (HashMap) datas.remove(0); 36 System.out.println("线程" + getName() + " 开始处理新数据 " + data.get("id")); 37 if (task != null) task.accept(data); 38 System.out.println("线程" + getName() + " 处理数据完成 " + data.get("id")); 39 } 40 synchronized (this) { 41 listcount--; 42 if (listcount == 0) { 43 // 我是最后一个了,那么,好吧 44 fianltask.accept(null); 45 } 46 } 47 } 48 } 49 50 void runTask(Consumer task, Consumer finaltask, List datas, int maxThreads) { 51 ParallelTask.listcount = maxThreads; 52 this.datas.addAll(datas); 53 for (int i = 0; i < maxThreads; i++) { 54 new RunThread(String.valueOf(i), task, finaltask).start(); 55 } 56 } 57 58 public static void main(String[] args) throws Exception { 59 ParallelTask pt = new ParallelTask(); 60 61 ArrayList<Object> datas = new ArrayList(); 62 for (int i = 1; i < 33; i++) { 63 HashMap<String, Object> d = new HashMap<>(); 64 d.put("id", "task_" + i); 65 d.put("data", i * 3); 66 datas.add(d); 67 } 68 69 pt.runTask(o -> { 70 try { 71 System.out.println("do...... " + ((HashMap) o).get("data")); 72 Thread.sleep(Math.round(Math.random() * 60 + 5) * 100); 73 } catch (InterruptedException e) { 74 e.printStackTrace(); 75 } 76 }, o -> { 77 System.out.println("全部工作完成了,欢迎下次再来!!!"); 78 }, datas, 10); 79 } 80 }