1. 執行器服務 ExecutorService
java.util.concurrent.ExecutorService 接口表示一個異步執行機制,使我們能夠在后台執行任務。因此一個 ExecutorService 很類似於一個線程池。實際上,存在於 java.util.concurrent 包里的 ExecutorService 實現就是一個線程池實現。
ExecutorService 例子
以下是一個簡單的 ExecutorService 例子:
ExecutorService executorService = Executors.newFixedThreadPool(10); executorService.execute(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); executorService.shutdown();
任務委派
下圖說明了一個線程是如何將一個任務委托給一個 ExecutorService 去異步執行的:
一個線程將一個任務委派給一個 ExecutorService 去異步執行。
一旦該線程將任務委派給 ExecutorService,該線程將繼續它自己的執行,獨立於該任務的執行。
ExecutorService 實現
既然 ExecutorService 是個接口,如果你想用它的話就得去使用它的實現類之一。
java.util.concurrent 包提供了 ExecutorService 接口的以下實現類:
- ThreadPoolExecutor(可以使用 Executors 工廠類來創建)
- ScheduledThreadPoolExecutor
創建一個 ExecutorService
ExecutorService 的創建依賴於你使用的具體實現。但是你也可以使用 Executors 工廠類來創建 ExecutorService 實例。
以下是幾個創建 ExecutorService 實例的例子:
// 創建單一線程池 ExecutorService executorService1 = Executors.newSingleThreadExecutor(); // 創建固定數目線程池 ExecutorService executorService2 = Executors.newFixedThreadPool(10); // 創建執行計划線程池 ExecutorService executorService3 = Executors.newScheduledThreadPool(10); //創建緩存線程池 ExecutorService executorService4 = Executors.newCachedThreadPool()
ExecutorService 使用
有幾種不同的方式來將任務委托給 ExecutorService 去執行:
- execute(Runnable)
- submit(Runnable)
- submit(Callable)
- invokeAny(...)
- invokeAll(...)
接下來我們挨個看一下這些方法。
execute(Runnable)
execute(Runnable) 方法要求一個 java.lang.Runnable 對象,然后對它進行異步執行。以下是使用 ExecutorService 執行一個 Runnable 的示例:
ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); executorService.shutdown();
沒有辦法得知被執行的 Runnable 的執行結果。如果有需要的話你得使用一個 Callable(以下將做介紹)。
submit(Runnable)
submit(Runnable) 方法也要求一個 Runnable 實現類,但它返回一個 Future 對象。這個 Future 對象可以用來檢查 Runnable 是否已經執行完畢。以下是 ExecutorService submit() 示例:
Future future = executorService.submit(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); future.get(); //returns null if the task has finished correctly
submit(Callable)
submit(Callable) 方法類似於 submit(Runnable) 方法,除了它所要求的參數類型之外。Callable 實例除了它的 call() 方法能夠返回一個結果之外和一個 Runnable 很相像。Runnable.run() 不能夠返回一個結果。Callable 的結果可以通過 submit(Callable) 方法返回的 Future 對象進行獲取。
以下是一個 ExecutorService Callable 示例:
Future future = executorService.submit(new Callable(){ public Object call() throws Exception { System.out.println("Asynchronous Callable"); return "Callable Result"; } }); System.out.println("future.get() = " + future.get());
以上代碼輸出:
Asynchronous Callable
future.get() = Callable Result
invokeAny()
invokeAny() 方法要求一系列的 Callable 或者其子接口的實例對象。調用這個方法並不會返回一個 Future,但它返回其中一個 Callable 對象的結果。無法保證返回的是哪個 Callable 的結果 - 只能表明其中一個已執行結束。
如果其中一個任務執行結束(或者拋了一個異常),其他 Callable 將被取消。以下是示例代碼:
ExecutorService executorService = Executors.newSingleThreadExecutor(); Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 1"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 2"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 3"; } }); String result = executorService.invokeAny(callables); System.out.println("result = " + result); executorService.shutdown();
上述代碼將會打印出給定 Callable 集合中的一個的執行結果。我自己試着執行了它幾次,結果始終在變。有時是 "Task 1",有時是 "Task 2" 等等。
invokeAll()
invokeAll() 方法將調用你在集合中傳給 ExecutorService 的所有 Callable 對象。invokeAll() 返回一系列的 Future 對象,通過它們你可以獲取每個 Callable 的執行結果。記住,一個任務可能會由於一個異常而結束,因此它可能沒有 "成功"。
無法通過一個 Future 對象來告知我們是兩種結束中的哪一種。以下是一個代碼示例:
ExecutorService executorService = Executors.newSingleThreadExecutor(); Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 1"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 2"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 3"; } }); List<Future<String>> futures = executorService.invokeAll(callables); for(Future<String> future : futures){ System.out.println("future.get = " + future.get()); } executorService.shutdown();
ExecutorService 關閉
使用完 ExecutorService 之后你應該將其關閉,以使其中的線程不再運行。
executorService.shutdown();
比如,如果你的應用是通過一個 main() 方法啟動的,之后 main 方法退出了你的應用,如果你的應用有一個活動的 ExexutorService 它將還會保持運行。ExecutorService 里的活動線程阻止了 JVM 的關閉。
要終止 ExecutorService 里的線程你需要調用 ExecutorService 的 shutdown() 方法。ExecutorService 並不會立即關閉,但它將不再接受新的任務,而且一旦所有線程都完成了當前任務的時候,ExecutorService 將會關閉。在 shutdown() 被調用之前所有提交給 ExecutorService 的任務都被執行。如果你想要立即關閉 ExecutorService,你可以調用 shutdownNow() 方法。這樣會立即嘗試停止所有執行中的任務,並忽略掉那些已提交但尚未開始處理的任務。無法擔保執行任務的正確執行。可能它們被停止了,也可能已經執行結束。
2. 線程池執行者 ThreadPoolExecutor
java.util.concurrent.ThreadPoolExecutor 是 ExecutorService 接口的一個實現。ThreadPoolExecutor 使用其內部池中的線程執行給定任務(Callable 或者 Runnable)。
ThreadPoolExecutor 包含的線程池能夠包含不同數量的線程。池中線程的數量由以下變量決定:
- corePoolSize
- maximumPoolSize
當一個任務委托給線程池時,如果池中線程數量低於 corePoolSize,一個新的線程將被創建,即使池中可能尚有空閑線程。如果內部任務隊列已滿,而且有至少 corePoolSize 正在運行,但是運行線程的數量低於 maximumPoolSize,一個新的線程將被創建去執行該任務。
ThreadPoolExecutor 圖解:

一個 ThreadPoolExecutor。
創建一個 ThreadPoolExecutor
ThreadPoolExecutor 有若干個可用構造子。比如:
int corePoolSize = 5; int maxPoolSize = 10; long keepAliveTime = 5000; ExecutorService threadPoolExecutor = new ThreadPoolExecutor( corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() );
但是,除非你確實需要顯式為 ThreadPoolExecutor 定義所有參數,使用 java.util.concurrent.Executors 類中的工廠方法之一會更加方便,正如 ExecutorService小節所述。
3. 定時執行者服務 ScheduledExecutorService
java.util.concurrent.ScheduledExecutorService 是一個 ExecutorService, 它能夠將任務延后執行,或者間隔固定時間多次執行。 任務由一個工作者線程異步執行,而不是由提交任務給 ScheduledExecutorService 的那個線程執行。
ScheduledExecutorService 例子
以下是一個簡單的 ScheduledExecutorService 示例:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(new Callable() { public Object call() throws Exception { System.out.println("Executed!"); return "Called!"; } }, 5, TimeUnit.SECONDS);
首先一個內置 5 個線程的 ScheduledExecutorService 被創建。之后一個 Callable 接口的匿名類示例被創建然后傳遞給 schedule() 方法。后邊的倆參數定義了 Callable 將在 5 秒鍾之后被執行。
ScheduledExecutorService 實現
既然 ScheduledExecutorService 是一個接口,你要用它的話就得使用 java.util.concurrent 包里對它的某個實現類。ScheduledExecutorService 具有以下實現類:ScheduledThreadPoolExecutor
創建一個 ScheduledExecutorService
如何創建一個 ScheduledExecutorService 取決於你采用的它的實現類。但是你也可以使用 Executors 工廠類來創建一個 ScheduledExecutorService 實例。比如:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
ScheduledExecutorService 使用
一旦你創建了一個 ScheduledExecutorService,你可以通過調用它的以下方法:
- schedule (Callable task, long delay, TimeUnit timeunit)
- schedule (Runnable task, long delay, TimeUnit timeunit)
- scheduleAtFixedRate (Runnable, long initialDelay, long period, TimeUnit timeunit)
- scheduleWithFixedDelay (Runnable, long initialDelay, long period, TimeUnit timeunit)
下面我們就簡單看一下這些方法。
schedule (Callable task, long delay, TimeUnit timeunit)
這個方法計划指定的 Callable 在給定的延遲之后執行。這個方法返回一個 ScheduledFuture,通過它你可以在它被執行之前對它進行取消,或者在它執行之后獲取結果。以下是一個示例
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(new Callable() { public Object call() throws Exception { System.out.println("Executed!"); return "Called!"; } }, 5, TimeUnit.SECONDS); System.out.println("result = " + scheduledFuture.get()); scheduledExecutorService.shutdown();
示例輸出結果:
Executed!
result = Called!
schedule (Runnable task, long delay, TimeUnit timeunit)
除了 Runnable 無法返回一個結果之外,這一方法工作起來就像以一個 Callable 作為一個參數的那個版本的方法一樣,因此 ScheduledFuture.get() 在任務執行結束之后返回 null。
scheduleAtFixedRate (Runnable, long initialDelay, long period, TimeUnit timeunit)
這一方法規划一個任務將被定期執行。該任務將會在首個 initialDelay 之后得到執行,然后每個 period 時間之后重復執行。如果給定任務的執行拋出了異常,該任務將不再執行。如果沒有任何異常的話,這個任務將會持續循環執行到 ScheduledExecutorService 被關閉。如果一個任務占用了比計划的時間間隔更長的時候,下一次執行將在當前執行結束執行才開始。計划任務在同一時間不會有多個線程同時執行。
scheduleWithFixedDelay (Runnable, long initialDelay, long period, TimeUnit timeunit)
除了 period 有不同的解釋之外這個方法和 scheduleAtFixedRate() 非常像。
scheduleAtFixedRate() 方法中,period 被解釋為前一個執行的開始和下一個執行的開始之間的間隔時間。而在本方法中,period 則被解釋為前一個執行的結束和下一個執行的結束之間的間隔。因此這個延遲是執行結束之間的間隔,而不是執行開始之間的間隔。
ScheduledExecutorService 關閉
正如 ExecutorService,在你使用結束之后你需要把 ScheduledExecutorService 關閉掉。否則他將導致 JVM 繼續運行,即使所有其他線程已經全被關閉。
你可以使用從 ExecutorService 接口繼承來的 shutdown() 或 shutdownNow() 方法將 ScheduledExecutorService 關閉。參見 ExecutorService 關閉部分以獲取更多信息。
4. 使用 ForkJoinPool 進行分叉和合並
ForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很相似,除了一點不同。ForkJoinPool 讓我們可以很方便地把任務分裂成幾個更小的任務,這些分裂出來的任務也將會提交給 ForkJoinPool。任務可以繼續分割成更小的子任務,只要它還能分割。可能聽起來有些抽象,因此本節中我們將會解釋 ForkJoinPool 是如何工作的,還有任務分割是如何進行的。
分叉和合並解釋
在我們開始看 ForkJoinPool 之前我們先來簡要解釋一下分叉和合並的原理。
分叉和合並原理包含兩個遞歸進行的步驟。兩個步驟分別是分叉步驟和合並步驟。
分叉
一個使用了分叉和合並原理的任務可以將自己分叉(分割)為更小的子任務,這些子任務可以被並發執行。如下圖所示
通過把自己分割成多個子任務,每個子任務可以由不同的 CPU 並行執行,或者被同一個 CPU 上的不同線程執行。只有當給的任務過大,把它分割成幾個子任務才有意義。把任務分割成子任務有一定開銷,因此對於小型任務,這個分割的消耗可能比每個子任務並發執行的消耗還要大。
什么時候把一個任務分割成子任務是有意義的,這個界限也稱作一個閥值。這要看每個任務對有意義閥值的決定。很大程度上取決於它要做的工作的種類。
合並
當一個任務將自己分割成若干子任務之后,該任務將進入等待所有子任務的結束之中。一旦子任務執行結束,該任務可以把所有結果合並到同一個結果。圖示如下:

當然,並非所有類型的任務都會返回一個結果。如果這個任務並不返回一個結果,它只需等待所有子任務執行完畢。也就不需要結果的合並啦。
ForkJoinPool
ForkJoinPool 是一個特殊的線程池,它的設計是為了更好的配合 分叉-和-合並 任務分割的工作。ForkJoinPool 也在 java.util.concurrent 包中,其完整類名為 java.util.concurrent.ForkJoinPool。
創建一個 ForkJoinPool
你可以通過其構造子創建一個 ForkJoinPool。作為傳遞給 ForkJoinPool 構造子的一個參數,你可以定義你期望的並行級別。並行級別表示你想要傳遞給 ForkJoinPool 的任務所需的線程或 CPU 數量。以下是一個 ForkJoinPool 示例:
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
這個示例創建了一個並行級別為 4 的 ForkJoinPool。
提交任務到 ForkJoinPool
就像提交任務到 ExecutorService 那樣,把任務提交到 ForkJoinPool。你可以提交兩種類型的任務。一種是沒有任何返回值的(一個 "行動"),另一種是有返回值的(一個"任務")。這兩種類型分別由 RecursiveAction 和 RecursiveTask 表示。接下來介紹如何使用這兩種類型的任務,以及如何對它們進行提交。
RecursiveAction
RecursiveAction 是一種沒有任何返回值的任務。它只是做一些工作,比如寫數據到磁盤,然后就退出了。一個 RecursiveAction 可以把自己的工作分割成更小的幾塊,這樣它們可以由獨立的線程或者 CPU 執行。你可以通過繼承來實現一個 RecursiveAction。示例如下:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.RecursiveAction; public class MyRecursiveAction extends RecursiveAction { private long workLoad = 0; public MyRecursiveAction(long workLoad) { this.workLoad = workLoad; } @Override protected void compute() { //if work is above threshold, break tasks up into smaller tasks if(this.workLoad > 16) { System.out.println("Splitting workLoad : " + this.workLoad); List<MyRecursiveAction> subtasks = new ArrayList<MyRecursiveAction>(); subtasks.addAll(createSubtasks()); for(RecursiveAction subtask : subtasks){ subtask.fork(); } } else { System.out.println("Doing workLoad myself: " + this.workLoad); } } private List<MyRecursiveAction> createSubtasks() { List<MyRecursiveAction> subtasks = new ArrayList<MyRecursiveAction>(); MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2); MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2); subtasks.add(subtask1); subtasks.add(subtask2); return subtasks; } }
MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24); forkJoinPool.invoke(myRecursiveAction);
RecursiveTask
RecursiveTask 是一種會返回結果的任務。它可以將自己的工作分割為若干更小任務,並將這些子任務的執行結果合並到一個集體結果。可以有幾個水平的分割和合並。以下是一個 RecursiveTask 示例:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.RecursiveTask; public class MyRecursiveTask extends RecursiveTask<Long> { private long workLoad = 0; public MyRecursiveTask(long workLoad) { this.workLoad = workLoad; } protected Long compute() { //if work is above threshold, break tasks up into smaller tasks if(this.workLoad > 16) { System.out.println("Splitting workLoad : " + this.workLoad); List<MyRecursiveTask> subtasks = new ArrayList<MyRecursiveTask>(); subtasks.addAll(createSubtasks()); for(MyRecursiveTask subtask : subtasks){ subtask.fork(); } long result = 0; for(MyRecursiveTask subtask : subtasks) { result += subtask.join(); } return result; } else { System.out.println("Doing workLoad myself: " + this.workLoad); return workLoad * 3; } } private List<MyRecursiveTask> createSubtasks() { List<MyRecursiveTask> subtasks = new ArrayList<MyRecursiveTask>(); MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2); MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2); subtasks.add(subtask1); subtasks.add(subtask2); return subtasks; } }
除了有一個結果返回之外,這個示例和 RecursiveAction 的例子很像。MyRecursiveTask 類繼承自 RecursiveTask<Long>,這也就意味着它將返回一個 Long 類型的結果。
MyRecursiveTask 示例也會將工作分割為子任務,並通過 fork() 方法對這些子任務計划執行。
此外,本示例還通過調用每個子任務的 join() 方法收集它們返回的結果。子任務的結果隨后被合並到一個更大的結果,並最終將其返回。對於不同級別的遞歸,這種子任務的結果合並可能會發生遞歸。
你可以這樣規划一個 RecursiveTask:
MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128); long mergedResult = forkJoinPool.invoke(myRecursiveTask); System.out.println("mergedResult = " + mergedResult);
注意是如何通過 ForkJoinPool.invoke() 方法的調用來獲取最終執行結果的。
轉載:https://www.jianshu.com/p/8e04a1b6e2a5