線程池的源碼及原理[JDK1.6實現]
1.線程池的包含的內容
2.線程池的數據結構【核心類ThreadPoolExecutor】:

worker:工作類,一個worker代表啟動了一個線程,它啟動后會
循環執行workQueue里面的所有任務
workQueue:任務隊列,用於存放待執行的任務
keepAliveTime:線程活動保持時間,線程池的工作線程空閑后,保持存活的時間。
線程池原理:預先啟動一些線程,線程無限循環從任務隊列中獲取一個任務進行執行,直到線程池被關閉。如果某個線程因為執行某個任務發生異常而終止,那么重新創建一個新的線程而已。如此反復。
3.線程池任務submit及執行流程

a.一個任務提交,如果線程池大小沒達到corePoolSize,則每次都啟動一個worker也就是一個線程來立即執行
b.如果來不及執行,則把多余的線程放到workQueue,等待已啟動的worker來循環執行
c.如果隊列
workQueue
都放滿了還沒有執行,則在maximumPoolSize下面啟動新的worker來循環執行workQueue
d.如果啟動到
maximumPoolSize還有任務進來,線程池已達到滿負載,此時就執行任務拒絕RejectedExecutionHandler
Java Code
線程池核心的代碼
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
// 流程就是:沒達到corePoolSize,創建worker執行,達到corePoolSize加入workQueue // workQueue滿了且在maximumPoolSize下,創建新worker,達到maximumPoolSize,執行reject public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 1:poolSize達到corePoolSize,執行3把任務加入workQueue // 2:poolSize沒達到,執行addIfUnderCorePoolSize()在corePoolSize內創建新worker立即執行任務 // 如果達到corePoolSize,則同上執行3 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { // 3:workQueue滿了,執行5 if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) { // 4:如果線程池關閉,執行拒絕策略 // 如果poolSize==0,新啟動一個線程執行隊列內任務 ensureQueuedTaskHandled(command); } // 5:在maximumPoolSize內創建新worker立即執行任務 // 如果達到maximumPoolSize,執行6拒絕策略 } else if (!addIfUnderMaximumPoolSize(command)) // 6:拒絕策略 reject(command); // is shutdown or saturated } } |
從上面代碼可以看出,一個任務提交什么時候
立即執行呢?
runState
==
RUNNING
&&
( poolSize
<
corePoolSize
||
(
workQueue.isFull() && poolSize < maxnumPoolSize
)
)
4.工作Worker的原理
上面講過線程池創建線程其實是委托給Worker這個對象完成的。worker會循環獲取工作隊列的任務來完成
Java Code
工作線程Worker執行
1
2 3 4 5 6 7 8 9 10 11 12 13 14 |
public
void run() {
try { Runnable task = firstTask; firstTask = null; // getTask()是從workQueue里面阻塞獲取任務,如果getTask()返回null則終結本線程 while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { // 走到這里代表這個worker或者說這個線程由於線程池關閉或超過aliveTime需要關閉了 workerDone( this); } } |
5.線程的銷毀
keepAliveTime:代表的就是線程空閑后多久后銷毀,線程的銷毀是通過worker的getTask()來實現的。
一般來說,Worker會循環獲取getTask(),
如果getTask()返回null則工作線程worker終結,那我們再看看什么時候getTask()返回null
Java Code Worker的getTask方法
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
Runnable getTask() {
for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) // 在poolSize大於corePoolSize或允許核心線程超時時 // 阻塞超時獲取有可能獲取到null,此時worker線程銷毀 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null) return r; // 這里是是否運行worker線程銷毀的判斷 if ( workerCanExit()) { if (runState >= SHUTDOWN) // STOP或TERMINATED狀態,終止空閑worker interruptIdleWorkers(); return null; // 這里返回null,代表工作線程worker銷毀 } // 其他:retry,繼續循環 } catch (InterruptedException ie) { // On interruption, re-check runState } } } |
6.線程池關閉
平緩關閉
shutdown
:已經啟動的任務全部執行完畢,同時不再接受新的任務
立即關閉
shutdownNow
:取消所有正在執行和未執行的任務
具體參考源碼
7.線程池的監控
通過線程池提供的參數進行監控。線程池里有一些屬性在監控線程池的時候可以使用
taskCount:線程池需要執行的任務數量。completedTaskCount:線程池在運行過程中已完成的任務數量。小於或等於taskCount。largestPoolSize:線程池曾經創建過的最大線程數量。通過這個數據可以知道線程池是否滿過。如等於線程池的最大大小,則表示線程池曾經滿了。getPoolSize:線程池的線程數量。如果線程池不銷毀的話,池里的線程不會自動銷毀,所以這個大小只增不+ getActiveCount:獲取活動的線程數。
通過擴展線程池進行監控。通過繼承線程池並重寫線程池的beforeExecute,afterExecute和terminated方法,我們可以在任務執行前,執行后和線程池關閉前干一些事情。如監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法在線程池里是空方法。
8.線程池調優[更多可參考:線程池與工作隊列]
調整線程池的大小 - 線程池的最佳大小取決於可用處理器的數目以及工作隊列中的任務的性質。
調整線程池的大小基本上就是
避免兩類錯誤:線程太少或線程太多。
a.CPU限制的任務,提高CPU利用率。
在運行於具有 N 個處理器機器上的計算限制的應用程序中,在線程數目接近 N 時添加額外的線程可能會改善總處理能力,而在線程數目超過 N 時添加額外的線程將不起作用。事實上,太多的線程甚至會降低性能,因為它會導致額外的環境切換開銷。
若在一個具有 N 個處理器的系統上只有一個工作隊列,其中全部是計算性質的任務,在線程池具有
N 或 N+1個線程時一般會獲得最大的 CPU 利用率。
b.I/O限制的任務(例如,從套接字讀取 HTTP 請求的任務)
需要讓池的大小超過可用處理器的數目,因為並不是所有線程都一直在工作。通過使用概要分析,您可以或得一些數據,並計算出大概的線程池大小。
Amdahl 法則提供很好的近似公式。用 WT 表示每項任務的平均等待時間,ST 表示每項任務的平均服務時間(計算時間)。則 WT/ST 是每項任務等待所用時間的百分比。對於 N 處理器系統,池中可以近似有
N*(1+WT/ST) 個線程。
c.綜合考慮線程池性能瓶頸
a.處理器利用率
b.隨着線程池的增長,您可能會碰到調度程序、可用內存方面的限制,或者其它系統資源方面的限制,例如套接字、打開的文件句柄或數據庫連接等的數目。
9.線程池擴展 - 延時線程池 ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor是在普通線程池的基礎上增加了兩個功能,
一是延時執行+定時執行,二是重復執行
定時Executor的流程在大體上與普通線程池一致,因此它繼承於ThreadPoolExecutor,
對於問題1,它采用了DelayedQueue來實現此功能。對於問題2,定時Executor每次執行完調用ThreadPoolExecutor.runAndReset()重置狀態,然后重新把任務加入到Delayed隊列中
定時Executor在外部Runnable的基礎上套了一個ScheduledFutureTask,其核心源碼如下:
Java Code
普通任務的外部封裝Future
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
// 加入的任務外部封裝了ScheduledFutureTask,繼承於FutureTask,因此也可以獲取任務結果 private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { // 省略部分代碼 // 周期性運行,執行完成就把任務加入到delay隊列中 private void runPeriodic() { // 這里重置線程池狀態 boolean ok = ScheduledFutureTask. super.runAndReset(); boolean down = isShutdown(); // Reschedule if not cancelled and not shutdown or policy allows if (ok && (!down || (getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isTerminating()))) { long p = period; if (p > 0) time += p; else time = now() - p; // 重復把任務加入到線程池delay隊列中 ScheduledThreadPoolExecutor. super.getQueue().add( this); } else if (down) interruptIdleWorkers(); } // 線程池調用的run方法 public void run() { if (isPeriodic()) runPeriodic(); else ScheduledFutureTask. super.run(); } } |
10.線程池擴展 - ExecutorCompletionService
ExecutorCompletionService描述的是:我們提交一堆任務到線程池,
線程池在任務執行完成后把結果FutureTask放到FIFO隊列中,然后我們就可以來對結果進行操作,比如說匯總,累加等等。
其實現原理很簡單,就是利用FutureTask的done()擴展方法,把此Future加入到queue,請看源碼
Java Code
ExecutorCompletionService核心代碼
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
public
class ExecutorCompletionService<V>
implements CompletionService<V> {
// 部分代碼省略 // 外部Future的封裝類 private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } // 這里把Future加入到 completionQueue protected void done() { completionQueue.add(task); } private final Future<V> task; } public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); // 對f外層又包了一層 QueueingFuture executor.execute( new QueueingFuture(f)); return f; } // 外部則可通過 completionQueue 來獲取已完成的任務Future public Future<V> take() throws InterruptedException { return completionQueue.take(); } } |