摘自馬士兵java並發編程
一、認識Executor、ExecutorService、Callable、Executors

/** * 認識Executor */ package yxxy.c_026; import java.util.concurrent.Executor; public class T01_MyExecutor implements Executor { public static void main(String[] args) { new T01_MyExecutor().execute(new Runnable(){ @Override public void run() { System.out.println("hello executor"); } }); } @Override public void execute(Runnable command) { //new Thread(command).run(); command.run(); } }
Executor執行器是一個接口,只有一個方法execute執行任務,在java的線程池的框架里邊,這個是最頂層的接口;
ExecutorService:從Executor接口繼承。
Callable:里面call方法,和Runnable接口很像,設計出來都是被其他線程調用的;但是Runnable接口里面run方法是沒有返回值的也不能拋出異常;而call方法有返回值可以拋異常;
Executors: 操作Executor的一個工具類;以及操作ExecutorService,ThreadFactory,Callable等;
二、ThreadPool:

/** * 線程池的概念 */ package yxxy.c_026; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class T05_ThreadPool { public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newFixedThreadPool(5); //execute submit for (int i = 0; i < 6; i++) { service.execute(() -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); } System.out.println(service); service.shutdown(); System.out.println(service.isTerminated()); System.out.println(service.isShutdown()); System.out.println(service); TimeUnit.SECONDS.sleep(5); System.out.println(service.isTerminated()); System.out.println(service.isShutdown()); System.out.println(service); } }
console:

java.util.concurrent.ThreadPoolExecutor@53d8d10a[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] false true java.util.concurrent.ThreadPoolExecutor@53d8d10a[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] pool-1-thread-1 pool-1-thread-3 pool-1-thread-2 pool-1-thread-5 pool-1-thread-4 pool-1-thread-1 true true java.util.concurrent.ThreadPoolExecutor@53d8d10a[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
創建了一個線程池,扔了5個線程,接下來要執行6個任務,扔進去線程池里面就啟一個線程幫你執行一個,因為這里最多就起5個線程,接下來扔第6個任務的時候,不好意思,它排隊了,排在線程池所維護的一個任務隊列里面,任務隊列大多數使用的都是BlockingQueue,這是線程池的概念;
有什么好處?好處在於如果這個任務執行完了,這個線程不會消失,它執行完任務空閑下來了,如果有新的任務來的時候,直接交給這個線程來運行就行了,不需要新啟動線程;從這個概念上講,如果你的任務和線程池線程數量控制的比較好的情況下,你不需要啟動新的線程就能執行很多很多的任務,效率會比較高,並發性好;
service.shutdown():關閉線程池,shutdown是正常的關閉,它會等所有的任務都執行完才會關閉掉;還有一個是shutdownNow,二話不說直接就給關了,不管線程有沒有執行完;
service.isTerminated(): 代表的是這里所有執行的任務是不是都執行完了。isShutdown()為true,注意它關了但並不代表它執行完了,只是代表正在關閉的過程之中(注意打印Shutting down)
打印5個線程名字,而且第一個線程執行完了之后,第6個任務來了,第1個線程繼續執行,不會有線程6;
當所有線程全部執行完畢之后,線程池的狀態為Terminated,表示正常結束,complete tasks=6
線程池里面維護了很多線程,等着你往里扔任務,而扔任務的時候它可以維護着一個任務列表,還沒有被執行的任務列表,同樣的它還維護着另外一個隊列,complete tasks,結束的任務隊列,任務執行結束扔到這個隊列里,所以,一個線程池維護着兩個隊列;
三、Future

/** * 認識future */ package yxxy.c_026; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; public class T06_Future { public static void main(String[] args) throws InterruptedException, ExecutionException { /*FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>(){ @Override public Integer call() throws Exception { TimeUnit.MILLISECONDS.sleep(3000); return 1000; } });*/ FutureTask<Integer> task = new FutureTask<>(()->{ TimeUnit.MILLISECONDS.sleep(3000); return 1000; }); new Thread(task).start(); System.out.println(task.get()); //阻塞 //******************************* ExecutorService service = Executors.newFixedThreadPool(5); Future<Integer> f = service.submit(()->{ TimeUnit.MILLISECONDS.sleep(5000); return 1; }); System.out.println(f.isDone()); System.out.println(f.get()); System.out.println(f.isDone()); } }

1000 false 1 true
Future: ExecutorService里面有submit方法,它的返回值是Future類型,因為你扔一個任務進去需要執行一段時間,未來的某一個時間點上,任務執行完了產生給你一個結果,這個Future代表的就是那個Callable的返回值;
四、並行計算的例子:

/** * 線程池的概念 * nasa */ package yxxy.c_026; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class T07_ParallelComputing { public static void main(String[] args) throws InterruptedException, ExecutionException { long start = System.currentTimeMillis(); List<Integer> results = getPrime(1, 200000); long end = System.currentTimeMillis(); System.out.println(end - start); final int cpuCoreNum = 4; ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum); MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20 MyTask t2 = new MyTask(80001, 130000); MyTask t3 = new MyTask(130001, 170000); MyTask t4 = new MyTask(170001, 200000); Future<List<Integer>> f1 = service.submit(t1); Future<List<Integer>> f2 = service.submit(t2); Future<List<Integer>> f3 = service.submit(t3); Future<List<Integer>> f4 = service.submit(t4); start = System.currentTimeMillis(); f1.get(); f2.get(); f3.get(); f4.get(); end = System.currentTimeMillis(); System.out.println(end - start); } static class MyTask implements Callable<List<Integer>> { int startPos, endPos; MyTask(int s, int e) { this.startPos = s; this.endPos = e; } @Override public List<Integer> call() throws Exception { List<Integer> r = getPrime(startPos, endPos); return r; } } //判斷是否是質數 static boolean isPrime(int num) { for(int i=2; i<=num/2; i++) { if(num % i == 0) return false; } return true; } static List<Integer> getPrime(int start, int end) { List<Integer> results = new ArrayList<>(); for(int i=start; i<=end; i++) { if(isPrime(i)) results.add(i); } return results; } }
console:

2280 818
第二種方式使用了一個線程池,一般線程池有多少個線程,數量多少合適是需要調整的,大多數情況下cpu有幾個核至少就應該起多少個線程,可以多起一個但不能少於cpu核數,將20萬分成了4段;
這里為什么不將20萬平均分呢?
五、CachedThreadPool

package yxxy.c_026; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class T08_CachedPool { public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); System.out.println(service); for (int i = 0; i < 2; i++) { service.execute(() -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); } System.out.println(service); TimeUnit.SECONDS.sleep(80); //cachedthreadPool里面的線程空閑狀態默認60s后銷毀,這里保險起見 System.out.println(service); } }
console:

java.util.concurrent.ThreadPoolExecutor@7852e922[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] java.util.concurrent.ThreadPoolExecutor@7852e922[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0] pool-1-thread-2 pool-1-thread-1 java.util.concurrent.ThreadPoolExecutor@7852e922[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
FixedThreadPool為固定個數的線程池;
CachedThreadPool:剛開始一個線程都沒有,來一個任務就起一個線程,假設起了兩個線程A,B,如果來了第三個任務,這時候恰好線程B任務執行完了,線程池里面有空閑的,這時候直接讓線程池里空閑的線程B來執行;最多起多少個線程?你的系統能支撐多少個為止;默認的情況下,只要一個線程空閑的狀態超過60s,這個線程就自動的銷毀了,alivetime=60s;這個值也可以自己指定。
六、SingleThreadExecutor

package yxxy.c_026; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class T09_SingleThreadPool { public static void main(String[] args) { ExecutorService service = Executors.newSingleThreadExecutor(); for(int i=0; i<5; i++) { final int j = i; service.execute(()->{ System.out.println(j + " " + Thread.currentThread().getName()); }); } } }
console:

0 pool-1-thread-1 1 pool-1-thread-1 2 pool-1-thread-1 3 pool-1-thread-1 4 pool-1-thread-1
SingleThreadExecutor:線程池里就1個線程;扔5個任務,也永遠只有1個線程執行;
它能保證任務前后一定是順序執行,先扔的任務一定先執行完;只有等第一個任務執行完才執行第二個任務。
七、ScheduledThreadPool

package yxxy.c_026; import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class T10_ScheduledPool { public static void main(String[] args) { ScheduledExecutorService service = Executors.newScheduledThreadPool(4); service.scheduleAtFixedRate(()->{ try { TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }, 0, 500, TimeUnit.MILLISECONDS); } }
ScheduledThreadPool: 執行定時的任務,定時器線程池,一般可以用來替代timer,而且它里面的線程是可以復用的,第一個線程執行完了之后,任務來了如果第一個線程是空閑的,還可以拿第一個線程來執行。而Timer每次都是new一個新的線程。
scheduleAtFixedRate(
Runnable command, long initialDelay, long period,
TimeUnit unit),第1個參數是任務,第1個任務馬上執行,每隔500毫秒這個任務重復執行。
八、WorkStealingPool

/** * */ package yxxy.c_026; import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class T11_WorkStealingPool { public static void main(String[] args) throws IOException { ExecutorService service = Executors.newWorkStealingPool(); int count = Runtime.getRuntime().availableProcessors(); //看cpu多少核的;如果是4核,默認就幫你起4個線程 System.out.println(count); service.execute(new R(1000)); for(int i=0; i<count; i++){ service.execute(new R(2000)); } //由於產生的是精靈線程(守護線程、后台線程),主線程不阻塞的話,看不到輸出 System.in.read(); } static class R implements Runnable { int time; R(int t) { this.time = t; } @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(time + " " + Thread.currentThread().getName()); } } }
console:

8 1000 ForkJoinPool-1-worker-1 2000 ForkJoinPool-1-worker-2 2000 ForkJoinPool-1-worker-0 2000 ForkJoinPool-1-worker-5 2000 ForkJoinPool-1-worker-3 2000 ForkJoinPool-1-worker-6 2000 ForkJoinPool-1-worker-7 2000 ForkJoinPool-1-worker-4 2000 ForkJoinPool-1-worker-1
WorkStealingPool:工作竊取,假設有3個線程A、B、C在運行,workStealing可以簡單這么認為,每個線程都維護自己的一個隊列,線程A的隊列里頭積累了5個任務,線程B的隊列里1個任務,C的隊列里2個任務;那么當線程B執行完任務之后,他會去別的線程池所維護的隊列里面把任務偷過來繼續執行,主動的找活干。
本質上是使用ForkJoinPool來實現的:

public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
例子解釋:cpu多少核默認的起多少個線程,(這里是8),前面幾個任務都扔給1-8個線程了,第9個任務來的時候在那里等着了,誰會去執行它呢?先執行完任務的這個線程會去執行。第1個線程只睡1s鍾,首先執行完,所以第9個任務一定是第一個線程1去運行它,他會主動的把任務拿過去運行。
workStealing的線程是精靈線程,daemon線程,特點就是主線程main方法一旦結束了,它后台可能還在運行,但是你是看不到它任務輸出的;這里Syetem.in.read()讓主函數阻塞才能看到輸出。debug的時候能看到Daemon Thread[ForkJoinPool-1-worker-1]。為什么用精靈線程?它是在后台不斷的運行的,只要虛擬機不退出,這個線程就不會退出,你有任務來了之后,它永遠會主動去拿。
workStealing用於什么場景:就說任務分配的不是很均勻,有的線程維護的任務隊列比較長,有些線程執行完任務就結束了不太合適,所以他執行完了之后可以去別的線程維護的隊列里去偷任務,這樣效率更高。
九、ForkJoinPool
ForkJoinPool: forkjoin的意思就是如果有一個難以完成的大任務,需要計算量特別大,時間特別長,可以把大任務切分成一個個小任務,如果小任務還是太大,它還可以繼續分,至於分成多少你可以自己指定,... 分完之后,把結果進行合並,最后合並到一起join一起,產生一個總的結果。而里面任務的切分你可以自己指定,線程的啟動根據你任務切分的規則,由ForkJoinPool這個線程池自己來維護。

例子1:

package yxxy.c_026; import java.io.IOException; import java.util.Arrays; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.RecursiveTask; public class T12_ForkJoinPool { static int[] nums = new int[1000000]; static final int MAX_NUM = 50000; static Random r = new Random(); static { for(int i=0; i<nums.length; i++) { nums[i] = r.nextInt(100); } System.out.println(Arrays.stream(nums).sum()); //stream api } static class AddTask extends RecursiveAction { int start, end; AddTask(int s, int e) { start = s; end = e; } @Override protected void compute() { if(end-start <= MAX_NUM) { long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; System.out.println("from:" + start + " to:" + end + " = " + sum); } else { int middle = start + (end-start)/2; AddTask subTask1 = new AddTask(start, middle); AddTask subTask2 = new AddTask(middle, end); subTask1.fork(); subTask2.fork(); } } } public static void main(String[] args) throws IOException { ForkJoinPool fjp = new ForkJoinPool(); AddTask task = new AddTask(0, nums.length); fjp.execute(task); System.in.read(); } }
console:

49494882 from:906250 to:937500 = 1545274 from:968750 to:1000000 = 1537201 from:593750 to:625000 = 1548289 from:718750 to:750000 = 1546396 from:468750 to:500000 = 1550373 from:843750 to:875000 = 1543421 from:218750 to:250000 = 1549856 from:93750 to:125000 = 1548384 from:562500 to:593750 = 1541814 from:812500 to:843750 = 1547885 from:187500 to:218750 = 1546831 from:687500 to:718750 = 1554064 from:437500 to:468750 = 1547434 from:937500 to:968750 = 1547676 from:875000 to:906250 = 1551839 from:62500 to:93750 = 1548576 from:531250 to:562500 = 1550943 from:656250 to:687500 = 1544991 from:156250 to:187500 = 1548367 from:406250 to:437500 = 1539881 from:125000 to:156250 = 1548128 from:500000 to:531250 = 1545229 from:781250 to:812500 = 1544296 from:625000 to:656250 = 1545283 from:375000 to:406250 = 1553931 from:31250 to:62500 = 1544024 from:750000 to:781250 = 1543573 from:343750 to:375000 = 1546407 from:0 to:31250 = 1539743 from:281250 to:312500 = 1549470 from:312500 to:343750 = 1552190 from:250000 to:281250 = 1543113
例子解釋:
對數組中100萬個數求和計算,第一種方式是普通的將所有數加在一起(for循環);
第二種方式使用ForkJoinPool計算,分而治之,它里面執行的任務必須是ForkJoinTask,這個任務可以自動進行切分,一般用的時候從RecursiveAction或RecursiveTask繼承,RecursiveTask遞歸任務,因為它切分任務還可以在切分。RecursiveAction沒有返回值,RecursiveTask有返回值。
例子2:

package yxxy.c_026; import java.io.IOException; import java.util.Arrays; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.RecursiveTask; public class T12_ForkJoinPool { static int[] nums = new int[1000000]; static final int MAX_NUM = 50000; static Random r = new Random(); static { for(int i=0; i<nums.length; i++) { nums[i] = r.nextInt(100); } System.out.println(Arrays.stream(nums).sum()); //stream api } static class AddTask extends RecursiveTask<Long> { int start, end; AddTask(int s, int e) { start = s; end = e; } @Override protected Long compute() { if(end-start <= MAX_NUM) { long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; return sum; } int middle = start + (end-start)/2; AddTask subTask1 = new AddTask(start, middle); AddTask subTask2 = new AddTask(middle, end); subTask1.fork(); subTask2.fork(); return subTask1.join() + subTask2.join(); } } public static void main(String[] args) throws IOException { ForkJoinPool fjp = new ForkJoinPool(); AddTask task = new AddTask(0, nums.length); fjp.execute(task); long result = task.join(); System.out.println(result); } }
console:

49498457 49498457
和例子1差不多,唯一的區別是有返回值了,RecursiveTask<V>中的V泛型就是返回值類型。
long result = task.join(),因為join本身就是阻塞的,只有等所有的都執行完了,最后才得出總的執行結果。所以不需要System.in.read了;
十、自定義線程池 ThreadPoolExecutor
ThreadPoolExecutor:大多數的線程池的實現背后調用的都是ThreadPoolExecutor(前面6種就ForkJoinPool不是),它是線程池通用的一個類,可以自定義線程池;
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize:核心的線程池里的線程數,自己指定。
maximumPoolSize:最多這個線程池里裝多少個線程;
keepAliveTime:線程呆多久沒有任務傳給它就會消失;
unit:和上面統一指定的;
blockingQueue:真正的裝任務的容器,往往都是用blockingQueue;阻塞式的;任務來了就扔進去,什么時候用到了都可以取。
例如,fixedThreadPool的實現是:

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
十一、parallel stream

package yxxy.c_026; import java.util.ArrayList; import java.util.List; import java.util.Random; public class T14_ParallelStreamAPI { public static void main(String[] args) { List<Integer> nums = new ArrayList<>(); Random r = new Random(); for(int i=0; i<10000; i++) nums.add(1000000 + r.nextInt(1000000)); //System.out.println(nums); long start = System.currentTimeMillis(); nums.forEach(v->isPrime(v)); long end = System.currentTimeMillis(); System.out.println(end - start); //使用parallel stream api start = System.currentTimeMillis(); nums.parallelStream().forEach(T14_ParallelStreamAPI::isPrime); end = System.currentTimeMillis(); System.out.println(end - start); } static boolean isPrime(int num) { for(int i=2; i<=num/2; i++) { if(num % i == 0) return false; } return true; } }
console:

1526 337
paralleStream(): 運用多線程,如果把這1萬個數看成是數據流,我們用多線程去訪問里面的數,共同來做計算,默認使用多線程。
---------------