13. 線程池
第四種獲取線程的方法:線程池,一個 ExecutorService,它使用可能的幾個池線程之一執行每個提交的任務,通常使用 Executors 工廠方法配置。
線程池可以解決兩個不同問題:由於減少了每個任務調用的開銷,它們通常可以在執行大量異步任務時提供增強的性能,並且還可以提供綁定和管理資源(包括執行任務集時使用的線程)的方法。每個 ThreadPoolExecutor 還維護着一些基本的統計數據,如完成的任務數。
為了便於跨大量上下文使用,此類提供了很多可調整的參數和擴展鈎子 (hook)。但是,強烈建議程序員使用較為方便的 Executors 工廠方法 :
- Executors.newCachedThreadPool()(無界線程池,可以進行自動線程回收)
- Executors.newFixedThreadPool(int)(固定大小線程池)
- Executors.newSingleThreadExecutor()(單個后台線程)
它們均為大多數使用場景預定義了設置。
創建包含5個線程的線程池,對變量進行增加操作
/* * 一、線程池:提供了一個線程隊列,隊列中保存着所有等待狀態的線程。避免了創建與銷毀額外開銷,提高了響應的速度。 * * 二、線程池的體系結構: * java.util.concurrent.Executor : 負責線程的使用與調度的根接口 * |--**ExecutorService 子接口: 線程池的主要接口 * |--ThreadPoolExecutor 線程池的實現類 * |--ScheduledExecutorService 子接口:負責線程的調度 * |--ScheduledThreadPoolExecutor :繼承 ThreadPoolExecutor, 實現 ScheduledExecutorService * * 三、工具類 : Executors * ExecutorService newFixedThreadPool() : 創建固定大小的線程池 * ExecutorService newCachedThreadPool() : 緩存線程池,線程池的數量不固定,可以根據需求自動的更改數量。 * ExecutorService newSingleThreadExecutor() : 創建單個線程池。線程池中只有一個線程 * * ScheduledExecutorService newScheduledThreadPool() : 創建固定大小的線程,可以延遲或定時的執行任務。 */ public class TestThreadPool { public static void main(String[] args) throws Exception { //1. 創建線程池 ExecutorService pool = Executors.newFixedThreadPool(5); ThreadPoolDemo tpd = new ThreadPoolDemo(); //2. 為線程池中的線程分配任務,>5,可將線程池里的五個線程都給調用 for (int i = 0; i < 10; i++) { pool.submit(tpd); } //3. 關閉線程池 pool.shutdown(); } // new Thread(tpd).start(); // new Thread(tpd).start(); } class ThreadPoolDemo implements Runnable{ private int i = 0; @Override public void run() { while(i <= 100){ System.out.println(Thread.currentThread().getName() + " : " + i++); } } }
線程池結合Callable和Future創建線程
public static void main(String[] args) throws Exception { //1. 創建線程池 ExecutorService pool = Executors.newFixedThreadPool(5); List<Future<Integer>> list = new ArrayList<>(); for (int i = 0; i < 10; i++) { //Future對象用於接收Callable線程的返回值 Future<Integer> future = pool.submit(new Callable<Integer>(){ //線程調用方法,查詢1-100之和 @Override public Integer call() throws Exception { int sum = 0; for (int i = 0; i <= 100; i++) { sum += i; } return sum; } }); list.add(future); } //關閉線程池 pool.shutdown(); //遍歷結果集,會輸出10次5050 for (Future<Integer> future : list) { System.out.println(future.get()); } }
14. 線程調度
接口ScheduledExecutorService 繼承自 ExecutorService接口,由ScheduledThreadPoolExecutor類(ThreadPoolExecutor類的子類)實現,可安排在給定的延遲后運行或定期執行的命令。
ScheduledExecutorService newScheduledThreadPool() : 創建固定大小的線程,可以延遲或定時的執行任務。
參考java.util.concurrent.ScheduledThreadPoolExecutor.class中schedule方法源碼
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; }
示例:
public class TestScheduledThreadPool { public static void main(String[] args) throws Exception { //創建ScheduledExecutorService類型的線程池對象 ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 5; i++) { Future<Integer> result = pool.schedule(new Callable<Integer>(){ @Override public Integer call() throws Exception { int num = new Random().nextInt(100);//生成隨機數 System.out.println(Thread.currentThread().getName() + " : " + num); return num; } }, 1, TimeUnit.SECONDS);
System.out.println(result.get()); } //線程池關閉 pool.shutdown(); } }
15. ForkJoinPool 分支合並框架-工作竊取
Fork/Join 框架:就是在必要的情況下,將一個大任務,進行拆分(fork)成若干個小任務(拆到不可再拆時),再將一個個的小任務運算的結果進行 join 匯總。

/* @since 1.7 * @author Doug Lea */ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { ... }
- 采用 “工作竊取”模式(work-stealing):當執行新的任務時它可以將其拆分分成更小的任務執行,並將小任務加到線程隊列中,然后再從一個隨機線程的隊列中偷一個並把它放在自己的隊列中。

- 相對於一般的線程池實現,fork/join框架的優勢體現在對其中包含的任務的處理方式上.在一般的線程池中,如果一個線程正在執行的任務由於某些原因無法繼續運行,那么該線程會處於等待狀態。而在fork/join框架實現中,如果某個子問題由於等待另外一個子問題的完成而無法繼續運行。那么處理該子問題的線程會主動尋找其他尚未運行的子問題來執行.這種方式減少了線程的等待時間,提高了性能。
jdk1.7之后提供了兩個Fork/Join 框架,兩個框架最大區別為是否有返回值
//有返回值 public abstract class RecursiveTask<V> extends ForkJoinTask<V> {} //無返回值 public abstract class RecursiveAction extends ForkJoinTask<Void> {}
下面為一實現示例(求兩數之間所有數之和,如1-100——>5050):
class ForkJoinSumCalculate extends RecursiveTask<Long>{ private static final long serialVersionUID = -1812835340478767238L; private long start; private long end; private static final long THURSHOLD = 10000L; //臨界值 public ForkJoinSumCalculate(long start, long end) { this.start = start; this.end = end; } @Override protected Long compute() { long length = end - start; //小於臨界值,則不進行拆分,直接計算初始值到結束值之間所有數之和 if(length <= THURSHOLD){ long sum = 0L; for (long i = start; i <= end; i++) { sum += i; } return sum; }else{ //大於臨界值,取中間值進行拆分,遞歸調用 long middle = (start + end) / 2; ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle); left.fork(); //進行拆分,同時壓入線程隊列 ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle+1, end); right.fork(); // return left.join() + right.join(); } } }
測試1-50000000000的和:
public static void main(String[] args) { Instant start = Instant.now(); ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L, 50000000000L); Long sum = pool.invoke(task); System.out.println(sum); Instant end = Instant.now(); System.out.println("耗費時間為:" + Duration.between(start, end).toMillis()); }
結果:cpu利用率達到100%,耗時19.361s

和for循環累加比較一下:
@Test public void test1(){ Instant start = Instant.now(); long sum = 0L; for (long i = 0L; i <= 50000000000L; i++) { sum += i; } System.out.println(sum); Instant end = Instant.now(); System.out.println("耗費時間為:" + Duration.between(start, end).toMillis());//35-3142-15704 }
結果如下:耗時18.699s

由於fork/join框架在復雜邏輯時不易拆分,java8為fork/join進行了改進,代碼如下:
//java8 新特性 @Test public void test2(){ Instant start = Instant.now(); Long sum = LongStream.rangeClosed(0L, 50000000000L) .parallel() .reduce(0L, Long::sum); System.out.println(sum); Instant end = Instant.now(); System.out.println("耗費時間為:" + Duration.between(start, end).toMillis());//1536-8118 }
結果:耗時15.428s

測試了幾個值,發現效率方面: java8 > for循環 > fork/join
| 10000000000L | 50000000000L |
100000000000L | |
| java8 | 3320ms | 15428ms | 34770ms |
| for | 3902ms | 18699ms | 37858ms |
| fork/join | 4236ms | 19361ms | 40977ms |
按理來說,隨着計算量的增大,fork/join的效率會超過for循環,但是在本機測試出的結果如上,fork/join框架的效率始終不如貼近底層的for循環。這方面可能一方面在於compute方法設計中long類型的裝箱拆箱存在一定時間開銷,另一方面可能由於臨界值選擇不合理,測試時選擇10000,在測試10000000000L累加時,采取四個臨界值:5000、10000、20000、100000,結果還是臨界值為10000時效率最高。還是相信眼見為實吧。
