使用線程池的好處
引用自 http://ifeve.com/java-threadpool/ 的說明:
- 降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。
- 提高響應速度。當任務到達時,任務可以不需要的等到線程創建就能立即執行。
- 提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。
Java中的線程池是用ThreadPoolExecutor類來實現的. 本文就結合JDK 1.8對該類的源碼來分析一下這個類內部對於線程的創建, 管理以及后台任務的調度等方面的執行原理。ThreadPoolExecutor結構如下圖:

Executor接口
此接口提供了一種將任務提交與每個任務的運行機制分離的方法,包括線程使用,調度等的詳細信息。該接口中只有execute(Runnable command)方法,用來替代通常創建或啟動線程的方法。例如使用Thread創建線程
Thread thread = new Thread(); thread.start();
使用execute創建運行線程,具體的線程執行會由相應的實現類去執行(jdk默認線程池execute的實現是由ThreadPoolExecutor來實現的)
Thread thread = new Thread(); executor.execute(thread);
ExecutorService接口

ExecutorService接口提供管理終止的方法和可以生成Future的方法,用於跟蹤一個或多個異步任務的進度, 它繼承了Executor接口,同時增加了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。
shutDown() : 允許之前提交的任務繼續執行(執行完后shutDown,不會再接收新的任務) shutDownNow():立即停止正在執行的任務 invokeAll():執行給定的任務,當所有任務完成后返回任務狀態和結果的Futures列表
invokeAny():執行給定的任務,返回已完成的任務的結果 submit():提交線程
AbstractExecutorService類
ExecutorService接口的默認實現,同時也是線程池實現類ThreadPoolExecutor的父類,主要看下submit()方法與invokeAll()方法:
submit:
/**不管參數是Callable還是Runable, 執行方法都一樣,生成一個task,然后執行task,execute方法的具體實現在ThreadPoolExecutor中,后續分析**/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
invokeAll :
/**代碼很簡單,將給定的任務線程封裝成Future對象,等待所有任務執行完成,統一返回Future對象,如果出現異常,會將未完成的任務取消**/
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
/** 沒有完成,阻塞**/
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
ThreadPoolExecutor類
在關注ThreadPoolExecutor之前,先來了解下線程的基本狀態信息。
線程總的來說有NEW(初始)、RUNNABLE(運行)、WAITING(等待)、TIME_WAITING(超時等待)、BLOCKED(阻塞)、TERMINATED(終止)6種狀態。
NEW:初始狀態,線程被構建,但是還沒有調用 start 方法
RUNNABLED:運行狀態,JAVA 線程把操作系統中的就緒和運行兩種狀態統一稱為“運行中” BLOCKED:阻塞狀態,表示線程進入等待狀態,也就是線程因為某種原因放棄了 CPU 使用權,阻塞也分為幾種情況 等待阻塞:運行的線程執行 wait 方法,jvm 會把當前線程放入到等待隊列 同步阻塞:運行的線程在獲取對象的同步鎖時,若該同步鎖被其他線程鎖占用了,那么 jvm 會把當前的線程放入到鎖池中 其他阻塞:運行的線程執行 Thread.sleep 或者 Thread.join 方法,或者發出了 I/O請求時,JVM 會把當前線程設置為阻塞狀態,當 sleep 結束、join 線程終止、
io 處理完畢則線程恢復
WAITING:等待,需要主動喚醒 TIME_WAITING:超時等待狀態,超時以后自動返回. TERMINATED:終止狀態,表示當前線程執行完畢
具體的轉化關系如下圖:

對於線程池而言,也有五種種不同的狀態,分別為RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED
RUNNING:運行狀態,可以處理任務,並且接收任務(前提阻塞隊列處於未滿狀態,阻塞隊列一旦滿了,會根據相應的飽和策略進行不同的處理) SHUTDOWN:關閉狀態,不能接收新的任務,但是能處理隊列中的任務(shutdow方法) STOP:停止狀態,不能接收行的任務,不能處理隊列中的任務並且會中斷正在運行的任務(shutdownNow方法) TIDYING:所有的任務都終止了,workCount為0,會進入該狀態,將調用terminated方法進入TERMINATED狀態 TERMINATED:terminated()方法執行完成
各個狀態之間的轉化關系(借用這里的圖)

ThreadPoolExcecutor類有一些重要的屬性:
corePoolSize:線程池中核心線程的數量 maximumPoolSize:線程池中最大線程的數量 defaultHandler:默認的線程池飽和執行策略,一般是阻塞隊列滿了后且沒有空閑線程,再有任務提交是拋出異常,還是直接丟棄等,默認的策略是拋出異:
ctl:對線程池運行狀態以及線程池中有效線程數進行記錄的一個原子性int變量,主要記錄兩部分:線程池中的有效線程(workerCount);線程的狀態(runstate)包含運行,shutdown
等狀態。該變量的高3位用來記錄runstate,低29位用來記錄有效線程數(約5億條)(其實這個地方與ReentReadWriteLock中的state變量相似)
COUNT_BITS:workerCount計數位數,低29位
CAPACITY:workerCount的最大值2^29 - 1
飽和策略(內部類)
ThreadPoolExecutor中提供了四種可選擇的飽和策略(拒絕策略),用來處理阻塞隊列已滿且沒有空閑線程,后續新來任務的處理
AbortPolicy:直接拋出異常(默認策略) CallerRunsPolicy:用調用者所在的線程執行任務 CallerRunsPolicy:丟棄隊列中最靠前的任務,執行該任務 DiscardPolicy:直接丟棄
worker類(內部類)
worker類是實現線程池的重要類,它繼承了AQS類並實現了Runnable接口,結構如下:

Worker內部類主要是用來將運行線程封裝,維護運行任務線程中斷狀態的類,該類繼承了AQS類並實現了Runnable接口
變量:
firstTask: 提交的任務線程; thread: worker類封裝后的線程,用來處理任務線程; completeTasks: 完成的任務數;
構造方法:
Worker(Runnable firstTask) {
/**初始化鎖的獲取次數**/
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
獲取鎖、釋放鎖
從Worker類獲取鎖的方式可以看到worker類只會去獲取獨占鎖,也就是說不支持重入的,這也是為什么Worker不直接使用ReentrantLock的原因,ReentrantLock是可重入的;當worker獲取到鎖時表明工作線程正在運行,不允許中斷(可以在runWorker中查看);
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
構造方法
ThreadPoolExecutor總共有四種構造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
/**所有的構造方法調用的都是該方法**/
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
字段說明:
corePoolSize:線程池初始化核心線程數
maximumPoolSize:線程池最大線程數
keepAliveTime:空閑線程存活時間
workQueue:存放任務的隊列(阻塞隊列)
threadFactory:線程池的類型
handler:飽和處理策略
線程池的執行流程
下圖是個人理解的線程池執行的簡單流程(有不對的地方煩請指正)

execute方法
執行給定的任務,可能是用的是新創建的線程,也可能是已存在的線程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/**獲取ctl,記錄workCount以及runState, 為32**/
int c = ctl.get();
/**判斷線程池中的線程數是否小於核心線程數**/
if (workerCountOf(c) < corePoolSize) {
/**添加一個工作線程線程**/
if (addWorker(command, true))
return;
/**添加失敗重新獲取ctl**/
c = ctl.get();
}
/**線程池是運行狀態,並且線程成功添加到隊列(線程池中線程數大於核心線程或者小於核心線程且添加線程失敗)**/
if (isRunning(c) && workQueue.offer(command)) {
/**重新獲取ctl**/
int recheck = ctl.get();
/**該處的二次檢查是為了防止線程池被shutdown或者上次檢查后有線程死亡**/
/**重新判斷線程池是否是運行狀態,如果不是運行狀態,將成功添加到隊列中的線程從隊列中移除,同時通過對應的飽和策略處理**/
if (! isRunning(recheck) && remove(command))
/**執行拒絕策略**/
reject(command);
/**如果工作線程為0,執行添加工作線程操作**/
else if (workerCountOf(recheck) == 0)
/**添加一個工作線程但不啟動**/
addWorker(null, false);
}
/** 執行到這里說有存在兩種情況
* 1.線程池是running狀態,工作線程數大於核心線程數且阻塞隊列已滿導致添加任務失敗。
* 2.線程池不是工作狀態
**/
else if (!addWorker(command, false))
reject(command);
}
addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
/**獲取線程池的運行狀態**/
int c = ctl.get();
int rs = runStateOf(c);
/** 判斷是否需要添加新的線程(不在添加需要滿足兩個條件:rs >= shutdown; 第二個條件整體為false)
* 1.rs >= SHUTDOWN 即線程池是shutdown、stop、tidying、terminated狀態,表示線程池不在接收新的任務。
*
* 2.rs == SHUTDOWN 即線程池不在接收新的任務;firstTask == null 即提交執行的線程為空;!workQueue.isEmpty() 即阻塞隊列不為空只要三個條件有
* 一個不滿足,則返回false。
* 2.1. 能執行到這里表名rs一定是>=SHUTDOWN的,如果rs不是SHUTDOWN狀態,線程池不會接受新的任務,以及正在處理的任務一會停掉,所以不需要添加新的
* 工作線程。
* 2.2. fistTask為空,沒必要為該任務創建新的工作線程
* 2.3. 阻塞隊列為空,進行該判斷表明rs = SHUTDOWN且阻塞隊列中的任務已經處理完,不會創建新的工作線程
**/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
/**獲取線程池中的工作線程**/
int wc = workerCountOf(c);
/**判斷工作線程是否超限**/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
/**通過cas方法添加一個工作線程數**/
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
/**根據firstTask創建一個工作線程**/
w = new Worker(firstTask);
final Thread t = w.thread;
/**firstTask為null只創建,不啟動**/
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
/**1. 線程池是running狀態
*2. 線程池是shutdown狀態並且firstTask為null
*滿足上面任意一個條件,會去添加工作線程,對於第二個條件來說,不會去接收新的任務,但阻塞隊列可能沒有處理完,可以添加新的工作線程
**/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
/**線程是否已經啟動**/
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
/**啟動線程**/
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
/**添加工作線程失敗,進行回滾操作
*1.將添加的工作線程從工作線程集合中移除
*2.線程池工作線程數減一
*3.重新執行線程池的terminate狀態轉換
**/
addWorkerFailed(w);
}
return workerStarted;
}
runWorker方法(執行任務)
/**僅僅會在addWorker()成功時調用,內容比較簡單,需要注意三個地方getTask()、beforeExecute()、afterExecute()(后兩個可以自己重寫)**/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
/** 釋放鎖,對應於worker類構造方法中的setState(-1), 將state狀態恢復為0,允許中斷
* 線程池正在初始化任務線程時,會將鎖的初始值設置為-1,這樣做的目的是禁止執行前對任務進行中斷
**/
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
/**通過getTask()方法獲取任務**/
while (task != null || (task = getTask()) != null) {
w.lock();
/**判斷線程/線程池是否處於中斷/stop狀態**/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
/**獲得鎖並運行任務**/
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
/**釋放鎖,任務完成數加1**/
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask方法
private Runnable getTask() {
/**從阻塞隊列中獲取任務是否超時的變量設置**/
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/**如果線程池不是運行狀態
*1.線程是是否是stop、TIDYING、terminate狀態
*2.阻塞隊列是否為空
*滿足以上條件 1||2,表明線程池不處理任務,不接受新的任務,線程池任務線程數-1
**/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
/**allowCoreThreadTimeOut為false表示線程池中核心線程數不需要進行超時判斷**/
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
/**獲取任務(都會阻塞)
* 如果設置了核心線程運行超時,或者是線程池中任務線程數多於核心線程數,通過pool設置超時時間獲取任務。
* 沒事設置超時時間,通過take方法獲取任務
**/
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
shutdownNow方法
與shutdown方法相比,多了一個drainQueue清空阻塞隊列的方法,並且所有線程進行中斷操作
/**shutdown方法主要調用了四個方法**/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/**如果存在安全管理器,判斷是否有權限interrupt權限**/
checkShutdownAccess();
/**設置線程池運行狀態**/
advanceRunState(STOP);
/**中斷任務線程**/
interruptWorkers();
/**清空阻塞隊列**/
tasks = drainQueue();
} finally {
mainLock.unlock();
}
/**嘗試將線程池設置為terminate狀態**/
tryTerminate();
return tasks;
}
/**該方法是worker類中的方法,直接中斷,與shutdown方法相比,改方法是對所有的任務線程進行中斷操作,
*shutdown方法會去先嘗試獲取鎖,如果獲取鎖成功,表示當前線程正在等待任務,對於這種任務線程進行中斷操作**/
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
tryTerminate方法
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/**1.線程池是否是運行狀態
*2.線程池是都是Tidying、terminate狀態
*3.線程池是否是shutdown狀態,並且阻塞隊列不為空
*滿足上述3個條件任意一個立即返回:
*運行狀態,線程池允許任務的處理以及添加,不能直接轉換到terminate
*shutdown狀態,阻塞隊列不為空,表示還在處理任務,不能直接轉換到terminate
**/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
/**線程池為shutdown或者stop狀態,且阻塞隊列為空
*如果線程池工作線程數不為0,至少中斷一個工作線程, 此處可能存在getTask獲取任務是一直處於阻塞的任務線程,避免隊列為空,任務線程一直阻塞的情況
**/
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/**設置為tidying狀態**/
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
/**設置成terminated狀態**/
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
線程池的監控
getPoolSize() : 獲取當前線程池的工作線程數量 getQueue() : 獲取線程池中阻塞隊列(間接獲取阻塞隊列中任務的數量) getCompletedTaskCount() : 獲取也完成的任務數量 getTaskCount() : 獲取已運行、未運行的任務總數 getLargestPoolSize() : 線程池線程數最大值 getActiveCount():當前線程池中正在執行任務的線程數量。 getCorePoolSize() : 線程池核心線程數
常見的線程池(Executors)
Executors是線程池的工廠類,通過Executors可以創建四種不同的線程池 (newFixedThreadPool、newCachedThreadPool、newScheduledThreadPool、newSingleThreadExecutor、newWorkStealingPool(也是一種線程池,但不是通過ThreadPoolExecutor實現,不做討論))
阻塞隊列(引用這里)
SynchronousQueue:newCachedThreadPool
LinkedBlockingQueue(無界隊列):基於鏈表的阻塞隊列LinkedBlockingQueue。如果使用這種方式,那么線程池中能夠創建的最大線程數就是corePoolSize,
而maximumPoolSize就不會起作用了(后面也會說到)。當線程池中所有的核心線程都是RUNNING狀態時,這時一個新的任務提交就會放入等待隊列中。
newFixedThreadPool使用
ArrayBlockingQueue(有界隊列):使用該方式可以將線程池的最大線程數量限制為maximumPoolSize,這樣能夠降低資源的消耗,但同時這種方式也使得線程池對線程的調度變
得更困難,因為線程池和隊列的容量都是有限的值,所以要想使線程池處理任務的吞吐率達到一個相對合理的范圍,又想使線程調度相對簡單,並且還要盡可
能的降低線程池對資源的消耗,就需要合理的設置這兩個數量。
1. 如果要想降低系統資源的消耗(包括CPU的使用率,操作系統資源的消耗,上下文環境切換的開銷等), 可以設置較大的隊列容量和較小的線程池容量,
但這樣也會降低線程處理任務的吞吐量。
2. 如果提交的任務經常發生阻塞,那么可以考慮通過調用 setMaximumPoolSize() 方法來重新設定線程池的容量。
3. 如果隊列的容量設置的較小,通常需要將線程池的容量設置大一點,這樣CPU的使用率會相對的高一些。但如果線程池的容量設置的過大,則在提交的任
務數量太多的情況下,並發量會增加,那么線程之間的調度就是一個要考慮的問題,因為這樣反而有可能降低處理任務的吞吐量。
DelayedWorkQueue : ScheduledThreadPoolExecutor使用
newFixedThreadPool
固定線程數量的線程池,corePoolSize==maximumPoolSize 1.所有工作線程都在執行任務,新來任務需要在隊列中等待直到有空閑工作線程 2.工作線程在執行任務時被shutdown了,新來任務是會創建一個新的任務線程
newCachedThreadPool
可緩存線程池,corePoolSize==0, maximumPoolSize=Integer.MAX_VALUE 1.沒有核心任務處理線程 2.新來任務是如果有空閑的處理線程,直接使用已有的處理線程,否則創建一個處理線程 3.當超過60s工作線程沒有任務處理,將會被銷毀 該線程池適合處理執行時間短,數量多的任務
newScheduledThreadPool
調度線程池,jdk中單獨一個類實現,初始化對象時設置corePoolSize,maximumPoolSize=Integer.MAX_VALUE 用來設置給定延遲時間后執行
newSingleThreadExecutor
只有一個工作線程來處理任務的線程池,corePoolSize==maximumPoolSize==1
