使用線程池的好處
引用自 http://ifeve.com/java-threadpool/ 的說明:
- 降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。
- 提高響應速度。當任務到達時,任務可以不需要的等到線程創建就能立即執行。
- 提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。
Java中的線程池是用ThreadPoolExecutor類來實現的. 本文就結合JDK 1.8對該類的源碼來分析一下這個類內部對於線程的創建, 管理以及后台任務的調度等方面的執行原理。ThreadPoolExecutor結構如下圖:
Executor接口
此接口提供了一種將任務提交與每個任務的運行機制分離的方法,包括線程使用,調度等的詳細信息。該接口中只有execute(Runnable command)方法,用來替代通常創建或啟動線程的方法。例如使用Thread創建線程
Thread thread = new Thread(); thread.start();
使用execute創建運行線程,具體的線程執行會由相應的實現類去執行(jdk默認線程池execute的實現是由ThreadPoolExecutor來實現的)
Thread thread = new Thread(); executor.execute(thread);
ExecutorService接口
ExecutorService接口提供管理終止的方法和可以生成Future的方法,用於跟蹤一個或多個異步任務的進度, 它繼承了Executor接口,同時增加了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。
shutDown() : 允許之前提交的任務繼續執行(執行完后shutDown,不會再接收新的任務) shutDownNow():立即停止正在執行的任務 invokeAll():執行給定的任務,當所有任務完成后返回任務狀態和結果的Futures列表
invokeAny():執行給定的任務,返回已完成的任務的結果 submit():提交線程
AbstractExecutorService類
ExecutorService接口的默認實現,同時也是線程池實現類ThreadPoolExecutor的父類,主要看下submit()方法與invokeAll()方法:
submit:
/**不管參數是Callable還是Runable, 執行方法都一樣,生成一個task,然后執行task,execute方法的具體實現在ThreadPoolExecutor中,后續分析**/ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
invokeAll :
/**代碼很簡單,將給定的任務線程封裝成Future對象,等待所有任務執行完成,統一返回Future對象,如果出現異常,會將未完成的任務取消**/ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { for (Callable<T> t : tasks) { RunnableFuture<T> f = newTaskFor(t); futures.add(f); execute(f); } for (int i = 0, size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { try { /** 沒有完成,阻塞**/ f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true; return futures; } finally { if (!done) for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }
ThreadPoolExecutor類
在關注ThreadPoolExecutor之前,先來了解下線程的基本狀態信息。
線程總的來說有NEW(初始)、RUNNABLE(運行)、WAITING(等待)、TIME_WAITING(超時等待)、BLOCKED(阻塞)、TERMINATED(終止)6種狀態。
NEW:初始狀態,線程被構建,但是還沒有調用 start 方法
RUNNABLED:運行狀態,JAVA 線程把操作系統中的就緒和運行兩種狀態統一稱為“運行中” BLOCKED:阻塞狀態,表示線程進入等待狀態,也就是線程因為某種原因放棄了 CPU 使用權,阻塞也分為幾種情況 等待阻塞:運行的線程執行 wait 方法,jvm 會把當前線程放入到等待隊列 同步阻塞:運行的線程在獲取對象的同步鎖時,若該同步鎖被其他線程鎖占用了,那么 jvm 會把當前的線程放入到鎖池中 其他阻塞:運行的線程執行 Thread.sleep 或者 Thread.join 方法,或者發出了 I/O請求時,JVM 會把當前線程設置為阻塞狀態,當 sleep 結束、join 線程終止、
io 處理完畢則線程恢復
WAITING:等待,需要主動喚醒 TIME_WAITING:超時等待狀態,超時以后自動返回. TERMINATED:終止狀態,表示當前線程執行完畢
具體的轉化關系如下圖:
對於線程池而言,也有五種種不同的狀態,分別為RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED
RUNNING:運行狀態,可以處理任務,並且接收任務(前提阻塞隊列處於未滿狀態,阻塞隊列一旦滿了,會根據相應的飽和策略進行不同的處理) SHUTDOWN:關閉狀態,不能接收新的任務,但是能處理隊列中的任務(shutdow方法) STOP:停止狀態,不能接收行的任務,不能處理隊列中的任務並且會中斷正在運行的任務(shutdownNow方法) TIDYING:所有的任務都終止了,workCount為0,會進入該狀態,將調用terminated方法進入TERMINATED狀態 TERMINATED:terminated()方法執行完成
各個狀態之間的轉化關系(借用這里的圖)
ThreadPoolExcecutor類有一些重要的屬性:
corePoolSize:線程池中核心線程的數量 maximumPoolSize:線程池中最大線程的數量 defaultHandler:默認的線程池飽和執行策略,一般是阻塞隊列滿了后且沒有空閑線程,再有任務提交是拋出異常,還是直接丟棄等,默認的策略是拋出異:
ctl:對線程池運行狀態以及線程池中有效線程數進行記錄的一個原子性int變量,主要記錄兩部分:線程池中的有效線程(workerCount);線程的狀態(runstate)包含運行,shutdown
等狀態。該變量的高3位用來記錄runstate,低29位用來記錄有效線程數(約5億條)(其實這個地方與ReentReadWriteLock中的state變量相似)
COUNT_BITS:workerCount計數位數,低29位
CAPACITY:workerCount的最大值2^29 - 1
飽和策略(內部類)
ThreadPoolExecutor中提供了四種可選擇的飽和策略(拒絕策略),用來處理阻塞隊列已滿且沒有空閑線程,后續新來任務的處理
AbortPolicy:直接拋出異常(默認策略) CallerRunsPolicy:用調用者所在的線程執行任務 CallerRunsPolicy:丟棄隊列中最靠前的任務,執行該任務 DiscardPolicy:直接丟棄
worker類(內部類)
worker類是實現線程池的重要類,它繼承了AQS類並實現了Runnable接口,結構如下:
Worker內部類主要是用來將運行線程封裝,維護運行任務線程中斷狀態的類,該類繼承了AQS類並實現了Runnable接口
變量:
firstTask: 提交的任務線程; thread: worker類封裝后的線程,用來處理任務線程; completeTasks: 完成的任務數;
構造方法:
Worker(Runnable firstTask) {
/**初始化鎖的獲取次數**/ setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
獲取鎖、釋放鎖
從Worker類獲取鎖的方式可以看到worker類只會去獲取獨占鎖,也就是說不支持重入的,這也是為什么Worker不直接使用ReentrantLock的原因,ReentrantLock是可重入的;當worker獲取到鎖時表明工作線程正在運行,不允許中斷(可以在runWorker中查看);
protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
構造方法
ThreadPoolExecutor總共有四種構造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) /**所有的構造方法調用的都是該方法**/ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } 字段說明: corePoolSize:線程池初始化核心線程數 maximumPoolSize:線程池最大線程數 keepAliveTime:空閑線程存活時間 workQueue:存放任務的隊列(阻塞隊列) threadFactory:線程池的類型 handler:飽和處理策略
線程池的執行流程
下圖是個人理解的線程池執行的簡單流程(有不對的地方煩請指正)
execute方法
執行給定的任務,可能是用的是新創建的線程,也可能是已存在的線程
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /**獲取ctl,記錄workCount以及runState, 為32**/ int c = ctl.get(); /**判斷線程池中的線程數是否小於核心線程數**/ if (workerCountOf(c) < corePoolSize) { /**添加一個工作線程線程**/ if (addWorker(command, true)) return; /**添加失敗重新獲取ctl**/ c = ctl.get(); } /**線程池是運行狀態,並且線程成功添加到隊列(線程池中線程數大於核心線程或者小於核心線程且添加線程失敗)**/ if (isRunning(c) && workQueue.offer(command)) { /**重新獲取ctl**/ int recheck = ctl.get(); /**該處的二次檢查是為了防止線程池被shutdown或者上次檢查后有線程死亡**/ /**重新判斷線程池是否是運行狀態,如果不是運行狀態,將成功添加到隊列中的線程從隊列中移除,同時通過對應的飽和策略處理**/ if (! isRunning(recheck) && remove(command)) /**執行拒絕策略**/ reject(command); /**如果工作線程為0,執行添加工作線程操作**/ else if (workerCountOf(recheck) == 0) /**添加一個工作線程但不啟動**/ addWorker(null, false); } /** 執行到這里說有存在兩種情況 * 1.線程池是running狀態,工作線程數大於核心線程數且阻塞隊列已滿導致添加任務失敗。 * 2.線程池不是工作狀態 **/ else if (!addWorker(command, false)) reject(command); }
addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { /**獲取線程池的運行狀態**/ int c = ctl.get(); int rs = runStateOf(c); /** 判斷是否需要添加新的線程(不在添加需要滿足兩個條件:rs >= shutdown; 第二個條件整體為false) * 1.rs >= SHUTDOWN 即線程池是shutdown、stop、tidying、terminated狀態,表示線程池不在接收新的任務。 * * 2.rs == SHUTDOWN 即線程池不在接收新的任務;firstTask == null 即提交執行的線程為空;!workQueue.isEmpty() 即阻塞隊列不為空只要三個條件有 * 一個不滿足,則返回false。 * 2.1. 能執行到這里表名rs一定是>=SHUTDOWN的,如果rs不是SHUTDOWN狀態,線程池不會接受新的任務,以及正在處理的任務一會停掉,所以不需要添加新的 * 工作線程。 * 2.2. fistTask為空,沒必要為該任務創建新的工作線程 * 2.3. 阻塞隊列為空,進行該判斷表明rs = SHUTDOWN且阻塞隊列中的任務已經處理完,不會創建新的工作線程 **/ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { /**獲取線程池中的工作線程**/ int wc = workerCountOf(c); /**判斷工作線程是否超限**/ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; /**通過cas方法添加一個工作線程數**/ if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { /**根據firstTask創建一個工作線程**/ w = new Worker(firstTask); final Thread t = w.thread; /**firstTask為null只創建,不啟動**/ if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); /**1. 線程池是running狀態 *2. 線程池是shutdown狀態並且firstTask為null *滿足上面任意一個條件,會去添加工作線程,對於第二個條件來說,不會去接收新的任務,但阻塞隊列可能沒有處理完,可以添加新的工作線程 **/ if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { /**線程是否已經啟動**/ if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { /**啟動線程**/ t.start(); workerStarted = true; } } } finally { if (! workerStarted) /**添加工作線程失敗,進行回滾操作 *1.將添加的工作線程從工作線程集合中移除 *2.線程池工作線程數減一 *3.重新執行線程池的terminate狀態轉換 **/ addWorkerFailed(w); } return workerStarted; }
runWorker方法(執行任務)
/**僅僅會在addWorker()成功時調用,內容比較簡單,需要注意三個地方getTask()、beforeExecute()、afterExecute()(后兩個可以自己重寫)**/ final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; /** 釋放鎖,對應於worker類構造方法中的setState(-1), 將state狀態恢復為0,允許中斷 * 線程池正在初始化任務線程時,會將鎖的初始值設置為-1,這樣做的目的是禁止執行前對任務進行中斷 **/ w.unlock(); // allow interrupts boolean completedAbruptly = true; try { /**通過getTask()方法獲取任務**/ while (task != null || (task = getTask()) != null) { w.lock(); /**判斷線程/線程池是否處於中斷/stop狀態**/ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { /**獲得鎖並運行任務**/ beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { /**釋放鎖,任務完成數加1**/ task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
getTask方法
private Runnable getTask() { /**從阻塞隊列中獲取任務是否超時的變量設置**/ boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); /**如果線程池不是運行狀態 *1.線程是是否是stop、TIDYING、terminate狀態 *2.阻塞隊列是否為空 *滿足以上條件 1||2,表明線程池不處理任務,不接受新的任務,線程池任務線程數-1 **/ // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); /**allowCoreThreadTimeOut為false表示線程池中核心線程數不需要進行超時判斷**/ // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } /**獲取任務(都會阻塞) * 如果設置了核心線程運行超時,或者是線程池中任務線程數多於核心線程數,通過pool設置超時時間獲取任務。 * 沒事設置超時時間,通過take方法獲取任務 **/ try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
shutdownNow方法
與shutdown方法相比,多了一個drainQueue清空阻塞隊列的方法,並且所有線程進行中斷操作
/**shutdown方法主要調用了四個方法**/ public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { /**如果存在安全管理器,判斷是否有權限interrupt權限**/ checkShutdownAccess(); /**設置線程池運行狀態**/ advanceRunState(STOP); /**中斷任務線程**/ interruptWorkers(); /**清空阻塞隊列**/ tasks = drainQueue(); } finally { mainLock.unlock(); } /**嘗試將線程池設置為terminate狀態**/ tryTerminate(); return tasks; } /**該方法是worker類中的方法,直接中斷,與shutdown方法相比,改方法是對所有的任務線程進行中斷操作, *shutdown方法會去先嘗試獲取鎖,如果獲取鎖成功,表示當前線程正在等待任務,對於這種任務線程進行中斷操作**/ void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
tryTerminate方法
final void tryTerminate() { for (;;) { int c = ctl.get(); /**1.線程池是否是運行狀態 *2.線程池是都是Tidying、terminate狀態 *3.線程池是否是shutdown狀態,並且阻塞隊列不為空 *滿足上述3個條件任意一個立即返回: *運行狀態,線程池允許任務的處理以及添加,不能直接轉換到terminate *shutdown狀態,阻塞隊列不為空,表示還在處理任務,不能直接轉換到terminate **/ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; /**線程池為shutdown或者stop狀態,且阻塞隊列為空 *如果線程池工作線程數不為0,至少中斷一個工作線程, 此處可能存在getTask獲取任務是一直處於阻塞的任務線程,避免隊列為空,任務線程一直阻塞的情況 **/ if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { /**設置為tidying狀態**/ if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { /**設置成terminated狀態**/ ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
線程池的監控
getPoolSize() : 獲取當前線程池的工作線程數量 getQueue() : 獲取線程池中阻塞隊列(間接獲取阻塞隊列中任務的數量) getCompletedTaskCount() : 獲取也完成的任務數量 getTaskCount() : 獲取已運行、未運行的任務總數 getLargestPoolSize() : 線程池線程數最大值 getActiveCount():當前線程池中正在執行任務的線程數量。 getCorePoolSize() : 線程池核心線程數
常見的線程池(Executors)
Executors是線程池的工廠類,通過Executors可以創建四種不同的線程池 (newFixedThreadPool、newCachedThreadPool、newScheduledThreadPool、newSingleThreadExecutor、newWorkStealingPool(也是一種線程池,但不是通過ThreadPoolExecutor實現,不做討論))
阻塞隊列(引用這里)
SynchronousQueue:newCachedThreadPool
LinkedBlockingQueue(無界隊列):基於鏈表的阻塞隊列LinkedBlockingQueue。如果使用這種方式,那么線程池中能夠創建的最大線程數就是corePoolSize,
而maximumPoolSize就不會起作用了(后面也會說到)。當線程池中所有的核心線程都是RUNNING狀態時,這時一個新的任務提交就會放入等待隊列中。
newFixedThreadPool使用
ArrayBlockingQueue(有界隊列):使用該方式可以將線程池的最大線程數量限制為maximumPoolSize,這樣能夠降低資源的消耗,但同時這種方式也使得線程池對線程的調度變
得更困難,因為線程池和隊列的容量都是有限的值,所以要想使線程池處理任務的吞吐率達到一個相對合理的范圍,又想使線程調度相對簡單,並且還要盡可
能的降低線程池對資源的消耗,就需要合理的設置這兩個數量。
1. 如果要想降低系統資源的消耗(包括CPU的使用率,操作系統資源的消耗,上下文環境切換的開銷等), 可以設置較大的隊列容量和較小的線程池容量,
但這樣也會降低線程處理任務的吞吐量。
2. 如果提交的任務經常發生阻塞,那么可以考慮通過調用 setMaximumPoolSize() 方法來重新設定線程池的容量。
3. 如果隊列的容量設置的較小,通常需要將線程池的容量設置大一點,這樣CPU的使用率會相對的高一些。但如果線程池的容量設置的過大,則在提交的任
務數量太多的情況下,並發量會增加,那么線程之間的調度就是一個要考慮的問題,因為這樣反而有可能降低處理任務的吞吐量。
DelayedWorkQueue : ScheduledThreadPoolExecutor使用
newFixedThreadPool
固定線程數量的線程池,corePoolSize==maximumPoolSize 1.所有工作線程都在執行任務,新來任務需要在隊列中等待直到有空閑工作線程 2.工作線程在執行任務時被shutdown了,新來任務是會創建一個新的任務線程
newCachedThreadPool
可緩存線程池,corePoolSize==0, maximumPoolSize=Integer.MAX_VALUE 1.沒有核心任務處理線程 2.新來任務是如果有空閑的處理線程,直接使用已有的處理線程,否則創建一個處理線程 3.當超過60s工作線程沒有任務處理,將會被銷毀 該線程池適合處理執行時間短,數量多的任務
newScheduledThreadPool
調度線程池,jdk中單獨一個類實現,初始化對象時設置corePoolSize,maximumPoolSize=Integer.MAX_VALUE 用來設置給定延遲時間后執行
newSingleThreadExecutor
只有一個工作線程來處理任務的線程池,corePoolSize==maximumPoolSize==1