Executor框架集對線程調度進行了封裝,將任務提交和任務執行解耦。
它提供了線程生命周期調度的所有方法,大大簡化了線程調度和同步的門檻。
Executor框架集的核心類圖如下:
從上往下,可以很清晰的看出框架集的各個類,以及它們之間的關系:
Executor,是一個可以提交可執行(Runnable)任務的Object,這個接口解耦了任務提交和執行細節(線程使用、調度等),Executor主要用來替代顯示的創建和運行線程;
ExecutorService提供了異步的管理一個或多個線程終止、執行過程(Future)的方法;
AbstractExecutorService提供了ExecutorService的一個默認實現,這個類通過RunnableFuture(實現類FutureTask)實現了submit, invokeAny, invokeAll幾個方法;
ThreadPoolExecutor是ExecutorService的一個可配置的線程池實現,它的兩個重要的配置參數:corePoolSize(線程池的基本大小,即在沒有任務需要執行的時候線程池的大小,並且只有在工作隊列滿了的情況下才會創建超出這個數量的線程。這里需要注意的是:在剛剛創建ThreadPoolExecutor的時候,線程並不會立即啟動,而是要等到有任務提交時才會啟動,除非調用了prestartCoreThread/prestartAllCoreThreads事先啟動核心線程。再考慮到keepAliveTime和allowCoreThreadTimeOut超時參數的影響,所以沒有任務需要執行的時候,線程池的大小不一定是corePoolSize。), maximumPoolSize(線程池中允許的最大線程數,線程池中的當前線程數目不會超過該值。如果隊列中任務已滿,並且當前線程個數小於maximumPoolSize,那么會創建新的線程來執行任務。這里值得一提的是largestPoolSize,該變量記錄了線程池在整個生命周期中曾經出現的最大線程個數。為什么說是曾經呢?因為線程池創建之后,可以調用setMaximumPoolSize() 改變運行的最大線程的數目。);
ScheduledExecutorService 是添加了調度特性(延遲或者定時執行)的ExecutorService;
ScheduledThreadPoolExecutor是具有調度特性的ExecutorService的池化實現;
Executors是一個Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, Callable的工具類,它能滿足大部分的日常應用場景。使用它創建線程池:
接下來,分析下ThreadPoolExecutor的實現。
ThreadPoolExecutor的作者Doug Lea,他將workerCount(線程池當前有效線程數)和runState(線程池當前所處狀態)放置到一個原子變量ctl(AtomicInteger)上,原子變量高三位保存runStatus,低29位保存workerCount。因此,ThreadPoolExecutor(JDK8)支持的最大線程數為2^29-1。線程池狀態有以下五中:
RUNNING(正常運行,-1): Accept new tasks and process queued tasks
SHUTDOWN(關閉,0): Don't accept new tasks, but process queued tasks
STOP(停止,1): Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks
TIDYING(整理中,2): All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
TERMINATED(終結,3): terminated() has completed
線程池狀態的變遷,並不嚴格按照數字增加變化:
RUNNING -> SHUTDOWN
On invocation of shutdown(), perhaps implicitly in finalize()
(RUNNING or SHUTDOWN) -> STOP
On invocation of shutdownNow()
SHUTDOWN -> TIDYING
When both queue and pool are empty
STOP -> TIDYING
When pool is empty
TIDYING -> TERMINATED
When the terminated() hook method has completed
Threads waiting in awaitTermination() will return when the
state reaches TERMINATED.
當前工作線程計數以及線程池的狀態變遷,通過ctl原子變量的CAS操作完成。
ThreadPoolExecutor會將所有提交的任務放置到workQueue中,它是一個BlockingQueue.
所有的工作線程集(workers,HashSet<Worker>)的獲取和預定,使用一個final的ReentrantLock(mainLock)控制,還有mainLock上的等待條件termination(Condition)。
largestPoolSize(最大池容量),completedTaskCount(已完成線程計數)等私有變量,通過mainLock控制訪問。
threadFactory(volatile,線程工廠,工廠模式的典型運用),所有的線程通過addWorker方法,間接調用這個工廠創建,以下為Executors中的DefaultThreadFactory類的默認構造方法(namePrefix非常熟悉)。
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
keepAliveTime,線程等待工作的空閑時間(當allowCoreThreadTimeOut設置或者工作線程workerCount大於corePoolSize時,會超時退出,否則線程講一直運行)
allowCoreThreadTimeOut,允許核心線程超時退出(默認為false)
corePoolSize,核心線程數目(如果沒有設置allowCoreThreadTimeOut,它將是線程池中,最少活躍的線程數)
類Worker主要維護線程執行任務時的狀態打斷和其它功能預定,它通過繼承AbstractQueuedSynchronizer來簡化任務執行時鎖的獲取和釋放,Worker沒有使用可重入鎖,而是實現了一個互斥鎖,因為我們不想工作線程訪問線程池控制變量時再次獲得鎖(如setCorePoolSize)。
接下來,我們看看addWorker方法,通過指定參數,它允許以核心線程運行任務。addWorker會首先檢查當前的線程池狀態,當前運行的線程數是否允許(添加新worker),前兩項檢查通過后,會嘗試設置ctl中的線程計數(因為活躍工作線程數存儲在ctl的低位,因此,直接自增ctl便可)。線程池計數器設置后,剩下的就是添並啟動Worker,Worker集合由mainLock控制,所有workers集的修改都是由mainLock控制的。只有當集合添加成功並且新添加的線程啟動成功時,線程池計數器的設置生效,否則,計數器將回退(由addWorkerFailed方法執行)。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
只有當新添加的worker線程啟動成功時,addWorker返回成功(此時worker線程啟動start(),它的run方法中調用了runWorker方法),其它情況返回失敗。
最后看一個方法,runWorker 方法:worker線程,不斷從BlockQueu中取出任務,執行它並處理執行過程中的各種情況(如線程池的狀態變化,已執行計數)。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker方法中,直接調用了task的run()方法,大致交互過程。