在Java中,我們如果想同時做多件事情,則需要將不同事情以任務的形式抽象出來(即實現了Runnable接口的類),將不同的任務交給線程來驅動,以完成同時執行多件事情的效果。創建任務很容易,new一個類就可以了,但是要跑起來還需要線程啊,線程可是稀缺資源啊,怎么獲取呢?
前面在Java線程機制一文中我們簡單介紹了線程創建的幾種方法,但這只是作為學習使用的,在生產環境中一般是不會直接通過新建線程來獲取線程資源的。因為Java中的線程是和操作系統底層的線程掛鈎的,創建線程是一個很消耗時間和資源的事情,如果頻繁創建和銷毀線程就可能會導致資源耗盡;而且如果創建了大量線程,也會導致線程之間的頻繁切換,這也是很耗時間的操作。因此,JDK中提供了線程池來幫助我們獲取和管理線程資源。
有了線程池,我們無需直接創建線程,只需將需要執行的任務交給線程池就好了,線程池會幫我們分配線程來執行任務。
使用線程池,有如下好處:
- 線程池幫我們管理線程,使得我們無需關心這些細節,可以更專注於任務的實現,解耦;
- 線程池通過統一管理創建的線程,實現線程的復用,避免線程的頻繁創建和銷毀,減少了在創建和銷毀線程上所花的時間以及系統資源的開銷,資源利用率更高;
- 當需要執行大量的異步任務時,由線程池統一管理和調配線程資源,可以獲得更好的性能;
本文我們會從如下幾個方面來進行總結:
1. Executor框架
既然線程池這么好,我們就來看看JDK中提供了哪些線程池供我們使用吧。Java中提供線程池工具的是Executor框架,如下是其類圖,我們看一下其基本組成:
1.1 Eecutor
處於最頂部的是Executor,這是一個基礎接口,只定義了一個唯一方法execute(),用於提交任務:
void execute(Runnable command);
1.2 ExecutorService
ExecutorService則提供了更多功能,包括service的管理功能如shutdown等方法,還包括不同於execute的更全面的提交任務機制,如返回Future的submit方法。因為Runnable是執行工作的獨立任務,但是它不返回任何值,如果希望任務在完成時能夠返回一個值,那么可以讓任務實現Callable接口而不是Runnable接口,並且必須使用ExecutorService.submit()方法提交任務,看一個demo吧:
// 定義一個帶返回值的任務,實現Callable接口
class TaskWithResult implements Callable<String>{ private int id; public TaskWithResult(int id){ this.id = id; }
// 這個就是提供返回值的方法,當獲取返回值時實際會調用這個方法 public String call(){ return "result of TaskWithResult " + id; } } public class CallableDemo{ public static void main(String[] args){ ExecutorService exec = Executors.newCachedThreadPool(); ArrayList<Futrue<String>> results = new ArrayList<Future<String>>(); for(int i = 0; i<10 ; i++){
// 提交任務之后會返回一個Future,可以通過它的get方法獲取任務計算返回的結果 results.add(exec.submit(new TaskWithResult(i))); } for(Future<String> fs : results){ try{ // 調用get()方法時必要的話(計算任務未完成)會阻塞 System.out.println(fs.get()); }catch(InterruptedException e){ System.out.println(e); return; }catch(ExecutionExecution e){ System.out.println(e); return; }finally{ exec.shutdown(); } } } } /** output: result of TaskWithResult 0 result of TaskWithResult 1 ... result of TaskWithResult 9 */
1.3 線程池實現
JDK提供了幾種線程池基礎實現,分別是ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool。通過不同的構造參數,我們可以產生多種不同特性的線程池以滿足復雜多變的實際應用場景。后面我們會進一步分析其構造函數部分源碼,來剖析這個靈活性的源頭。
1.4 Executors
借助Executors提供的靜態工廠方法,我們可以方便地創建出不同配置的線程池,Executors目前主要提供了如下幾種不同的線程池創建方式:
-
newCachedThreadPool(),它是一種用來處理大量短時間工作任務的線程池,它會試圖緩存線程並重用,當無緩存線程可用時,就會創建新的工作線程;如果線程閑置的時間超過60秒,則被終止並移出緩存;長時間閑置時,這種線程池,不會消耗什么資源。其內部使用 SynchronousQueue作為工作隊列。
-
newFixedThreadPool(int nThreads),重用指定數目(nThreads)的線程,其底層使用的是無界的工作隊列,任何時候最多有nThreads個工作線程是活動的。這意味着,如果任務數量超過了活動隊列數目,將在工作隊列中等待空閑線程出現;如果有工作線程退出,將會有新的工作線程被創建,以補足指定的數目 nThreads。
-
newSingleThreadExecutor(),它的特點在於工作線程數目被限制為1,操作一個無界的工作隊列,所以它保證了所有任務的都是被順序執行,最多會有一個任務處於活動狀態,並且不允許使用者改動線程池實例,因此可以避免其改變線程數目。
-
newSingleThreadScheduledExecutor()和newScheduledThreadPool(int corePoolSize),創建的是一個ScheduledExecutorService,可以進行定時或周期性的工作調度,區別在於單一工作線程還是多個工作線程。
-
newWorkStealingPool(int parallelism),這是一個經常被人忽略的線程池,java8才加入這個創建方法,其內部會構建ForkJoin Pool,利用Work-Stealing算法,並行地處理任務,不保證處理順序。
2. 線程池使用
利用這些工廠方法,常見的線程池創建方式如下:
ExecutorService threadPool1 = Executors.newCachedThreadPool(); ExecutorService threadPool2 = Executors.newFixedThreadPool(10); ExecutorService threadPool3 = Executors.newSingleThreadExecutor(); ExecutorService threadPool4 = Executors.newScheduledThreadPool(10); ExecutorService threadPool5 = Executors.newWorkStealingPool();
在大多數應用場景下,使用Executors提供的靜態工廠方法就足夠了,但是仍然可能需要直接利用ThreadPoolExecutor等構造函數線程池創建(其實如上5種方式除了newWorkStealingPool之外,其余都是通過ThreadPoolExecutor類的構造函數來實現的),比如:
ExecutorService service = new ThreadPoolExecutor(1,1, 60L,TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
為什么需要這樣做呢?因為這樣做可以根據我們的實際使用場景靈活調整線程池參數。這需要對線程池構造方式有進一步的了解,需要明白線程池的設計和結構。因為大部分線程池的構造函數都是調用的ThreadPoolExecutor的構造器,所以在本文以及后面的原理分析的文章中我們都是針對ThreadPoolExecutor,JDK為1.8,我們先來看一下ThreadPoolExecutor的構造函數:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
當然ThreadPoolExecutor還有很多構造函數,但是底層也都是調用的這個構造函數,只是傳的參數是默認參數而已,這里就不一一列出了,占空間。線程池的構造函數有一堆的參數,這個還是有必要看一下的:
-
corePoolSize:核心線程數量,常駐線程數量,包括空閑線程;
-
maximumPoolSize:最大的線程數量,常駐+臨時線程數量;
-
workQueue:多余任務等待隊列,此隊列僅保持由 execute方法提交的 Runnable任務,必須是BlockingQueue;
-
keepAliveTime:非核心線程空閑時間,即當線程數大於核心數時,多余的空閑線程等待新任務的最長時間;
-
unit:keepAliveTime 參數的時間單位;
-
threadFactory:執行程序創建新線程時使用的工廠,這里用到了抽象工廠模式,Executors提供了一個默認的線程工廠實現DefaultThreadFactory;
-
handler:線程池拒絕策略,當任務實在是太多,沒有空閑線程,等待隊列也滿了,如果還有任務怎么辦?默認是不處理,拋出異常告訴任務提交者,我這忙不過來了,你提交了也處理不了;
通過配置不同的參數,我們就可以創建出行為特性各異的線程池,而這,就是線程池高度靈活性的基石。
3. 線程池結構及狀態
到這里我們知道線程的優點,學習了怎樣創建線程池以及通過構造器部分的源碼我們知道了線程池靈活性的根源,是時候再進一步了。我們可以把線程池理解成為一個容器,幫我們創建線程,接受我們提交給它的任務,並幫我們執行任務。那我們就有必要詳細來看一下線程池內部是如何保存我們的任務以及線程,並通過什么方式來表征線程池自身的狀態的。
我們進入源碼,首先映入眼簾的便是如下這一堆代碼:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; // 工作線程的理論上限,大約5億多個線程 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; //11100000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; //0 private static final int STOP = 1 << COUNT_BITS; //00100000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; //01000000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; //01100000000000000000000000000000 // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl,即線程池的控制狀態,這是一個原子類,在這個整型數中封裝了兩層意思(限於表達能力,只能這樣表達):
- workerCount,即有效線程數量(也可以說是worker的數量);
- runState,你線程池的運行狀態;
我們來看一下Doug Lea大神是如何在一個整型變量中表達兩層含義的呢?
3.1 線程數量
我們知道Java中的int型整數是32位的,在線程池中利用整型的高3位來表征線程池的運行狀態,用剩下的低29位來表達有效線程數量,2的29次方是什么數量級,大概5億吧,在目前以及未來很長一段時間,單機上是很難達到這個級別的線程數量的(即便未來存在問題,也可以通過Long類型來解決),所以線程數量問題就滿足了,多出來的高三位就可以用來表達線程池運行狀態了。
3.2 線程池狀態
對照代碼來看,上面COUNT_BITS實際為29,CAPACITY表示最大有效線程數量,大概是2的29次方。線程的狀態和其對應的位的值如下:
- RUNNING:高三位為111,運行狀態,可以接受任務執行隊列里的任務;
- SHUTDOWN:高三位為000,指調用了 shutdown() 方法,不再接受新任務了,但是隊列里的任務得執行完畢;
- STOP:高三位為001,指調用了 shutdownNow() 方法,不再接受新任務,同時拋棄阻塞隊列里的所有任務並中斷所有正在執行任務;
- TIDYING:高三位為010,所有任務都執行完畢,在調用 shutdown()/shutdownNow() 中都會嘗試更新為這個狀態;
- TERMINATED:高三位為011,終止狀態,當執行 terminated() 后會更新為這個狀態;
這些狀態之間是會互相轉變的,它們之間的轉換時機如下:
- RUNNING -> SHUTDOWN,調用線程池的shutdown()方法;
- (RUNNING or SHUTDOWN) -> STOP,調用線程池的shutdownNow()方法時;
- SHUTDOWN -> TIDYING,當任務隊列和線程池(保存線程的一個hashSet)都為空時;
- STOP -> TIDYING,當任務隊列為空時;
- TIDYING -> TERMINATED,調用線程池的terminated()方法並執行完畢之后;
說了這么多,還是上張圖吧:
3.3 為什么這么設計
但是看上面那堆代碼,因為一個整型變量表示兩種含義,每次要使用的時候都要通過一些位運算來將需要的信息提取出來,為什么不直接用兩個變量來表示?難道是節約空間?嗯,起先我也是這樣認為的,后來才發現是自己too young了。。。一個整型總共才占用4個字節,兩個才多了4個字節,為了這4個字節需要這么大費周章嗎!后來才知道這是因為在多線程環境下,運行狀態和有效線程數量往往需要保證統一,不能出現一個改而另一個沒有改動的情況,如果將他們放在同一個AtmocInteger中,利用AtomicInteger的原子操作,就可以保證這兩個值始終是統一的,嗯,對Doug大神並發的理解真是出神入化。后面我們在源碼分析中可以有更直觀的體會。
3.4 線程池核心數據結構
我們接着看源碼,主要有兩個地方需要注意:
// 保存任務的阻塞隊列
private final BlockingQueue<Runnable> workQueue;
// 保存工作線程的set,即真正的池 private final HashSet<Worker> workers = new HashSet<Worker>();
對於這里,比較簡單:
- 工作隊列負責存儲用戶提交的任務,容量可以指定,必須為BlockingQueue
- 這個works才是真正的“線程池”,用來保存工作線程的集合,原來所謂的線程池中的線程都是保存在一個HashSet中。線程池的工作線程被抽象為靜態內部類Worker,是基於AQS實現,后面會詳細分析其原理。
4. 總結
1. 使用線程池有很多好處:
-
降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗;
-
提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行;
-
提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控;
- 解耦,用戶不用關心線程的創建,只需提交任務即可;
2. JDK中Executor框架提供如ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool等線程池的基本實現,可以通過Executors提供的靜態工廠方法創建多種線程池,也可使用ThreadPoolExecutor提供的構造函數定制化符合業務需求的線程池;
3. 線程通過一個整型變量ctl表示存活線程數量和線程池運行狀態;
4. 用戶提交的任務是保存在一個阻塞隊列中,線程池創建的工作線程是保存在一個HashSet中;
在本文中我們從線程池優點開始,再到了解整個Executor框架,通過一些簡單demo了解了線程池的基本使用,再結合源碼初步分析了線程池的內部數據結構以及狀態表征,關於線程池進一步的運行原理,有興趣的同學可以關注后面的文章。總結不易,覺得有幫助就點個贊吧^_^