一. 線程的概念
在單線程情況下,計算機中存在一個控制權,並按照順序依次執行指令。單線程好像是一個只有一個隊長指揮的小隊,整個小隊同一個時間只能執行一個任務。在多線程情境下,計算機中有多個控制權。多個控制權可以同時進行,每個控制權依次執行一系列的指令。多線程好像是一個小隊中的成員同時執行不同的任務。
多個線程可以並存於同一個進程空間。在JVM的一個進程空間中,一個棧(stack)代表了方法調用的次序。對於多線程來說,進程空間中需要有多個棧,以記錄不同線程的調用次序。多個棧互不影響,但所有的線程將共享堆(heap)中的對象。
二. 線程的創建方式
1.繼承Thread類
通過繼承Thread類創建線程類的具體步驟和具體代碼如下:
• 定義一個繼承Thread類的子類,並重寫該類的run()方法;
• 創建Thread子類的實例,即創建了線程對象;
• 調用該線程對象的start()方法啟動線程。
class SomeThead extends Thraad { public void run() { //do something here } } public static void main(String[] args){ SomeThread thread1 = new SomeThread(); 步驟3:啟動線程: thread1.start(); }
2.實現Runnable接口
通過實現Runnable接口創建線程類的具體步驟和具體代碼如下:
• 定義Runnable接口的實現類,並重寫該接口的run()方法;
• 創建Runnable實現類的實例,並以此實例作為Thread的target對象,即該Thread對象才是真正的線程對象。
class SomeRunnable implements Runnable { public void run() { //do something here } } Runnable run1 = new SomeRunnable(); Thread thread1 = new Thread(run1); thread1.start();
三. 線程池
線程池是多線程編程中的核心概念,簡單來說就是一組可以執行任務的空閑線程。線程池維護着多個線程,等待着監督管理者分配可並發執行的任務。這避免了在處理短時間任務時創建與銷毀線程的代價。通常,線程池的線程數量取CPU+2。
- 重用存在的線程,省去線程的創建銷毀過程,性能佳。
- 有效控制最大並發線程數。提高了使用率並避免了競爭。
- 定時執行,定期執行,單線程,並發控制等功能。
盡管這帶來了諸多優勢,首當其沖的就是程序性能提高,但多線程編程也有缺點 —— 增加了代碼復雜度、同步問題、非預期結果和增加創建線程的開銷。接下來了解一下如何使用 Java 線程池來緩解這些問題。
Java 通過 executor 對象來實現自己的線程池模型。可以使用 executor 接口或其他線程池的實現,它們都允許細粒度的控制。
java.util.concurrent 包中有以下接口:
►Executor —— 執行任務的簡單接口
►ExecutorService —— 一個較復雜的接口,包含額外方法來管理任務和 executor 本身
►ScheduledExecutorService —— 擴展自 ExecutorService,增加了執行任務的調度方法
除了這些接口,這個包中也提供了 Executors 類直接獲取實現了這些接口的 executor 實例。一般來說,一個 Java 線程池包含以下部分:
►工作線程的池子,負責管理線程
►線程工廠,負責創建新線程
►等待執行的任務隊列
在下面的章節,讓我們仔細看一看 Java 類和接口如何為線程池提供支持。
Executors 類和 Executor 接口
Executors 類包含工廠方法創建不同類型的線程池,Executor 是個簡單的線程池接口,只有一個execute() 方法。
我們通過一個例子來結合使用這兩個類(接口),首先創建一個單線程的線程池,然后用它執行一個簡單的語句:
Executor executor = Executors.newSingleThreadExecutor(); executor.execute(() -> System.out.println("Single thread pool test"));
注意語句寫成了 lambda 表達式,會被自動推斷成 Runnable 類型。
如果有工作線程可用,execute() 方法將執行語句,否則就把 Runnable 任務放進隊列,等待線程可用。
基本上,executor 代替了顯式創建和管理線程。
Executors 類里的工廠方法可以創建很多類型的線程池:
►newSingleThreadExecutor():包含單個線程和無界隊列的線程池,同一時間只能執行一個任務
►newFixedThreadPool():包含固定數量線程並共享無界隊列的線程池;當所有線程處於工作狀態,有新任務提交時,任務在隊列中等待,直到一個線程變為可用狀態
►newCachedThreadPool():只有需要時創建新線程的線程池
►newWorkStealingThreadPool():基於工作竊取(work-stealing)算法的線程池,后面章節詳細說明
接下來,讓我們看一下 ExecutorService 接口提供了哪些新功能?
ExecutorService
創建 ExecutorService 方式之一便是通過 Excutors 類的工廠方法。
ExecutorService executor = Executors.newFixedThreadPool(10);
Besides the execute() method, this interface also defines a similar submit() method that can return a Future object:
除了 execute() 方法,接口也定義了相似的 submit() 方法,這個方法可以返回一個 Future 對象。
Callable<Double> callableTask = () -> { return employeeService.calculateBonus(employee); }; Future<Double> future = executor.submit(callableTask); // execute other operations try { if (future.isDone()) { double result = future.get(); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
從上面的例子可以看到,Future 接口可以返回 Callable 類型任務的結果,而且能顯示任務的執行狀態。
當沒有任務等待執行時,ExecutorService 並不會自動銷毀,所以你可以使用 shutdown() 或shutdownNow() 來顯式關閉它。
executor.shutdown();
ScheduledExecutorService
這是 ExecutorService 的一個子接口,增加了調度任務的方法
ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
schedule() 方法的參數指定執行的方法、延時和 TimeUnit
Future<Double> future = executor.schedule(callableTask, 2, TimeUnit.MILLISECONDS);
另外,這個接口定義了其他兩個方法:
xecutor.scheduleAtFixedRate( () -> System.out.println("Fixed Rate Scheduled"), 2, 2000, TimeUnit.MILLISECONDS); executor.scheduleWithFixedDelay( () -> System.out.println("Fixed Delay Scheduled"), 2, 2000, TimeUnit.MILLISECONDS);
scheduleAtFixedRate() 方法延時 2 毫秒執行任務,然后每 2 秒重復一次。相似的,scheduleWithFixedDelay() 方法延時 2 毫秒后執行第一次,然后在上一次執行完成 2 秒后再次重復執行。
在下面的章節,我們來看一下 ExecutorService 接口的兩個實現:ThreadPoolExecutor 和ForkJoinPool。
ThreadPoolExecutor
這個線程池的實現增加了配置參數的能力。創建 ThreadPoolExecutor 對象
最方便的方式就是通過Executors 工廠方法:
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
這種情況下,線程池按照默認值預配置了參數。線程數量由以下參數控制:
►corePoolSize 和 maximumPoolSize:表示線程數量的范圍
►keepAliveTime:決定了額外線程存活時間
我們深入了解一下這些參數如何使用。
當一個任務被提交時,如果執行中的線程數量小於 corePoolSize,一個新的線程被創建。如果運行的線程數量大於 corePoolSize,但小於 maximumPoolSize,並且任務隊列已滿時,依然會創建新的線程。如果多於 corePoolSize 的線程空閑時間超過 keepAliveTime,它們會被終止。
上面那個例子中,newFixedThreadPool() 方法創建的線程池,corePoolSize=maximumPoolSize=10 並且 keepAliveTime 為 0 秒。
如果你使用 newCachedThreadPool() 方法,創建的線程池 maximumPoolSize 為Integer.MAX_VALUE,並且 keepAliveTime 為 60 秒。
ThreadPoolExecutor cachedPoolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
The parameters can also be set through a constructor or through setter methods:
這些參數也可以通過構造函數或setter方法設置:
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 6, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); executor.setMaximumPoolSize(8);
ThreadPoolExecutor 的一個子類便是 ScheduledThreadPoolExecutor,它實現了ScheduledExecutorService 接口。你可以通過 newScheduledThreadPool() 工廠方法來創建這種類型的線程池。
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(5);
上面語句創建了一個線程池,corePoolSize 為 5,maximumPoolSize 無限制,keepAliveTime 為 0 秒。
ForkJoinPool
另一個線程池的實現是 ForkJoinPool 類。它實現了 ExecutorService 接口,並且是 Java 7 中 fork/join 框架的重要組件。
fork/join 框架基於“工作竊取算法”。簡而言之,意思就是執行完任務的線程可以從其他運行中的線程“竊取”工作。
ForkJoinPool 適用於任務創建子任務的情況,或者外部客戶端創建大量小任務到線程池。
這種線程池的工作流程如下:
►創建 ForkJoinTask 子類
►根據某種條件將任務切分成子任務
►調用執行任務
►將任務結果合並
►實例化對象並添加到池中
創建一個 ForkJoinTask,你可以選擇 RecursiveAction 或 RecursiveTask 這兩個子類,后者有返回值。
我們來實現一個繼承 RecursiveTask 的類,計算階乘,並把任務根據閾值划分成子任務。
public class FactorialTask extends RecursiveTask<BigInteger> { private int start = 1; private int n; private static final int THRESHOLD = 20; // standard constructors @Override protected BigInteger compute() { if ((n - start) >= THRESHOLD) { return ForkJoinTask.invokeAll(createSubtasks()) .stream() .map(ForkJoinTask::join) .reduce(BigInteger.ONE, BigInteger::multiply); } else { return calculate(start, n); } } }
這個類需要實現的主要方法就是重寫 compute() 方法,用於合並每個子任務的結果。
具體划分任務邏輯在 createSubtasks() 方法中:
private Collection<FactorialTask> createSubtasks() { List<FactorialTask> dividedTasks = new ArrayList<>(); int mid = (start + n) / 2; dividedTasks.add(new FactorialTask(start, mid)); dividedTasks.add(new FactorialTask(mid + 1, n)); return dividedTasks;}
最后,calculate() 方法包含一定范圍內的乘數。
private BigInteger calculate(int start, int n) { return IntStream.rangeClosed(start, n) .mapToObj(BigInteger::valueOf) .reduce(BigInteger.ONE, BigInteger::multiply);}
接下來,任務可以添加到線程池:
ForkJoinPool pool = ForkJoinPool.commonPool(); BigInteger result = pool.invoke(new FactorialTask(100));
ThreadPoolExecutor 與 ForkJoinPool 對比
初看上去,似乎 fork/join 框架帶來性能提升。但是這取決於你所解決問題的類型。
當選擇線程池時,非常重要的一點是牢記創建、管理線程以及線程間切換執行會帶來的開銷。
ThreadPoolExecutor 可以控制線程數量和每個線程執行的任務。這很適合你需要在不同的線程上執行少量巨大的任務。
相比較而言,ForkJoinPool 基於線程從其他線程“竊取”任務。正因如此,當任務可以分割成小任務時可以提高效率。
為了實現工作竊取算法,fork/join 框架使用兩種隊列:
►包含所有任務的主要隊列
►每個線程的任務隊列
當線程執行完自己任務隊列中的任務,它們試圖從其他隊列獲取任務。為了使這一過程更加高效,線程任務隊列使用雙端隊列(double ended queue)數據結構,一端與線程交互,另一端用於“竊取”任務。
來自The H Developer的圖很好的表現出了這一過程:

和這種模型相比,ThreadPoolExecutor 只使用一個主要隊列。
最后要注意的一點 ForkJoinPool 只適用於任務可以創建子任務。否則它和 ThreadPoolExecutor沒區別,甚至開銷更大。
