相關文章目錄:
Java線程池ThreadPoolExecutor使用和分析(一)
Java線程池ThreadPoolExecutor使用和分析(二) - execute()原理
Java線程池ThreadPoolExecutor使用和分析(三) - 終止線程池原理
終止線程池主要有兩個方法:shutdown() 和 shutdownNow()。
shutdown()后線程池將變成shutdown狀態,此時不接收新任務,但會處理完正在運行的 和 在阻塞隊列中等待處理的任務。
shutdownNow()后線程池將變成stop狀態,此時不接收新任務,不再處理在阻塞隊列中等待的任務,還會嘗試中斷正在處理中的工作線程。
下面是對線程池的幾種終止方式的分析,基於JDK 1.7
以下是本文的目錄大綱:
interruptIdleWorkers() -- 中斷空閑worker
interruptWorkers() -- 中斷所有worker
三、awaitTermination() -- 等待線程池終止
若有不正之處請多多諒解,歡迎批評指正、互相討論。
請尊重作者勞動成果,轉載請標明原文鏈接:
http://www.cnblogs.com/trust-freedom/p/6693601.html
一、shutdown() -- 溫柔的終止線程池
/** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. * Invocation has no additional effect if already shut down. * 開始一個有序的關閉,在關閉中,之前提交的任務會被執行(包含正在執行的,在阻塞隊列中的),但新任務會被拒絕 * 如果線程池已經shutdown,調用此方法不會有附加效應 * * <p>This method does not wait for previously submitted tasks to * complete execution. Use {@link #awaitTermination awaitTermination} * to do that. * 當前方法不會等待之前提交的任務執行結束,可以使用awaitTermination() * * @throws SecurityException {@inheritDoc} */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //上鎖 try { //判斷調用者是否有權限shutdown線程池 checkShutdownAccess(); //CAS+循環設置線程池狀態為shutdown advanceRunState(SHUTDOWN); //中斷所有空閑線程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); //解鎖 } //嘗試終止線程池 tryTerminate(); }
shutdown()執行流程:
1、上鎖,mainLock是線程池的主鎖,是可重入鎖,當要操作workers set這個保持線程的HashSet時,需要先獲取mainLock,還有當要處理largestPoolSize、completedTaskCount這類統計數據時需要先獲取mainLock
2、判斷調用者是否有權限shutdown線程池
3、使用CAS操作將線程池狀態設置為shutdown,shutdown之后將不再接收新任務
4、中斷所有空閑線程 interruptIdleWorkers()
5、onShutdown(),ScheduledThreadPoolExecutor中實現了這個方法,可以在shutdown()時做一些處理
6、解鎖
7、嘗試終止線程池 tryTerminate()
可以看到shutdown()方法最重要的幾個步驟是:更新線程池狀態為shutdown、中斷所有空閑線程、tryTerminated()嘗試終止線程池
那么,什么是空閑線程?interruptIdleWorkers() 是怎么中斷空閑線程的?
/** * Interrupts threads that might be waiting for tasks (as * indicated by not being locked) so they can check for * termination or configuration changes. Ignores * SecurityExceptions (in which case some threads may remain * uninterrupted). * 中斷在等待任務的線程(沒有上鎖的),中斷喚醒后,可以判斷線程池狀態是否變化來決定是否繼續 * * @param onlyOne If true, interrupt at most one worker. This is * called only from tryTerminate when termination is otherwise * enabled but there are still other workers. In this case, at * most one waiting worker is interrupted to propagate shutdown * signals in case(以免) all threads are currently waiting. * Interrupting any arbitrary thread ensures that newly arriving * workers since shutdown began will also eventually exit. * To guarantee eventual termination, it suffices to always * interrupt only one idle worker, but shutdown() interrupts all * idle workers so that redundant workers exit promptly, not * waiting for a straggler task to finish. * * onlyOne如果為true,最多interrupt一個worker * 只有當終止流程已經開始,但線程池還有worker線程時,tryTerminate()方法會做調用onlyOne為true的調用 * (終止流程已經開始指的是:shutdown狀態 且 workQueue為空,或者 stop狀態) * 在這種情況下,最多有一個worker被中斷,為了傳播shutdown信號,以免所有的線程都在等待 * 為保證線程池最終能終止,這個操作總是中斷一個空閑worker * 而shutdown()中斷所有空閑worker,來保證空閑線程及時退出 */ private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //上鎖 try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); //解鎖 } }
interruptIdleWorkers() 首先會獲取mainLock鎖,因為要迭代workers set,在中斷每個worker前,需要做兩個判斷:
1、線程是否已經被中斷,是就什么都不做
2、worker.tryLock() 是否成功
第二個判斷比較重要,因為Worker類除了實現了可執行的Runnable,也繼承了AQS,本身也是一把鎖,具體可見 ThreadPoolExecutor內部類Worker解析
tryLock()調用了Worker自身實現的tryAcquire()方法,這也是AQS規定子類需要實現的嘗試獲取鎖的方法
protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
tryAcquire()先嘗試將AQS的state從0-->1,返回true代表上鎖成功,並設置當前線程為鎖的擁有者
可以看到compareAndSetState(0, 1)只嘗試了一次獲取鎖,且不是每次state+1,而是0-->1,說明鎖不是可重入的
但是為什么要worker.tryLock()獲取worker的鎖呢?
這就是Woker類存在的價值之一,控制線程中斷
在runWorker()方法中每次獲取到task,task.run()之前都需要worker.lock()上鎖,運行結束后解鎖,即正在運行任務的工作線程都是上了worker鎖的
在interruptIdleWorkers()中斷之前需要先tryLock()獲取worker鎖,意味着正在運行的worker不能中斷,因為worker.tryLock()失敗,且鎖是不可重入的
故shutdown()只有對能獲取到worker鎖的空閑線程(正在從workQueue中getTask(),此時worker沒有加鎖)發送中斷信號
由此可以將worker划分為: |
正阻塞在getTask()獲取任務的worker在被中斷后,會拋出InterruptedException,不再阻塞獲取任務
捕獲中斷異常后,將繼續循環到getTask()最開始的判斷線程池狀態的邏輯,當線程池是shutdown狀態,且workQueue.isEmpty時,return null,進行worker線程退出邏輯
某些情況下,interruptIdleWorkers()時多個worker正在運行,不會對其發出中斷信號,假設此時workQueue也不為空
那么當多個worker運行結束后,會到workQueue阻塞獲取任務,獲取到的執行任務,沒獲取到的,如果還是核心線程,會一直workQueue.take()阻塞住,線程無法終止,因為workQueue已經空了,且shutdown后不會接收新任務了
這就需要在shutdown()后,還可以發出中斷信號
Doug Lea大神巧妙的在所有可能導致線程池產終止的地方安插了tryTerminated()嘗試線程池終止的邏輯,並在其中判斷如果線程池已經進入終止流程,沒有任務等待執行了,但線程池還有線程,中斷喚醒一個空閑線程
shutdown()的最后也調用了tryTerminated()方法,下面看看這個方法的邏輯:
/** * Transitions to TERMINATED state if either (SHUTDOWN and pool * and queue empty) or (STOP and pool empty). If otherwise * eligible to terminate but workerCount is nonzero, interrupts an * idle worker to ensure that shutdown signals propagate. This * method must be called following any action that might make * termination possible -- reducing worker count or removing tasks * from the queue during shutdown. The method is non-private to * allow access from ScheduledThreadPoolExecutor. * * 在以下情況將線程池變為TERMINATED終止狀態 * shutdown 且 正在運行的worker 和 workQueue隊列 都empty * stop 且 沒有正在運行的worker * * 這個方法必須在任何可能導致線程池終止的情況下被調用,如: * 減少worker數量 * shutdown時從queue中移除任務 * * 這個方法不是私有的,所以允許子類ScheduledThreadPoolExecutor調用 */ final void tryTerminate() { //這個for循環主要是和進入關閉線程池操作的CAS判斷結合使用的 for (;;) { int c = ctl.get(); /** * 線程池是否需要終止 * 如果以下3中情況任一為true,return,不進行終止 * 1、還在運行狀態 * 2、狀態是TIDYING、或 TERMINATED,已經終止過了 * 3、SHUTDOWN 且 workQueue不為空 */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; /** * 只有shutdown狀態 且 workQueue為空,或者 stop狀態能執行到這一步 * 如果此時線程池還有線程(正在運行任務,正在等待任務) * 中斷喚醒一個正在等任務的空閑worker * 喚醒后再次判斷線程池狀態,會return null,進入processWorkerExit()流程 */ if (workerCountOf(c) != 0) { // Eligible to terminate 資格終止 interruptIdleWorkers(ONLY_ONE); //中斷workers集合中的空閑任務,參數為true,只中斷一個 return; } /** * 如果狀態是SHUTDOWN,workQueue也為空了,正在運行的worker也沒有了,開始terminated */ final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //CAS:將線程池的ctl變成TIDYING(所有的任務被終止,workCount為0,為此狀態時將會調用terminated()方法),期間ctl有變化就會失敗,會再次for循環 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); //需子類實現 } finally { ctl.set(ctlOf(TERMINATED, 0)); //將線程池的ctl變成TERMINATED termination.signalAll(); //喚醒調用了 等待線程池終止的線程 awaitTermination() } return; } } finally { mainLock.unlock(); } // else retry on failed CAS // 如果上面的CAS判斷false,再次循環 } }
tryTerminate() 執行流程:
1、判斷線程池是否需要進入終止流程(只有當shutdown狀態+workQueue.isEmpty 或 stop狀態,才需要)
2、判斷線程池中是否還有線程,有則 interruptIdleWorkers(ONLY_ONE) 嘗試中斷一個空閑線程(正是這個邏輯可以再次發出中斷信號,中斷阻塞在獲取任務的線程)
3、如果狀態是SHUTDOWN,workQueue也為空了,正在運行的worker也沒有了,開始terminated
會先上鎖,將線程池置為tidying狀態,之后調用需子類實現的 terminated(),最后線程池置為terminated狀態,並喚醒所有等待線程池終止這個Condition的線程
二、shutdownNow() -- 強硬的終止線程池
/** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks * that were awaiting execution. These tasks are drained (removed) * from the task queue upon return from this method. * 嘗試停止所有活動的正在執行的任務,停止等待任務的處理,並返回正在等待被執行的任務列表 * 這個任務列表是從任務隊列中排出(刪除)的 * * <p>This method does not wait for actively executing tasks to * terminate. Use {@link #awaitTermination awaitTermination} to * do that. * 這個方法不用等到正在執行的任務結束,要等待線程池終止可使用awaitTermination() * * <p>There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. This implementation * cancels tasks via {@link Thread#interrupt}, so any task that * fails to respond to interrupts may never terminate. * 除了盡力嘗試停止運行中的任務,沒有任何保證 * 取消任務是通過Thread.interrupt()實現的,所以任何響應中斷失敗的任務可能永遠不會結束 * * @throws SecurityException {@inheritDoc} */ public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //上鎖 try { //判斷調用者是否有權限shutdown線程池 checkShutdownAccess(); //CAS+循環設置線程池狀態為stop advanceRunState(STOP); //中斷所有線程,包括正在運行任務的 interruptWorkers(); tasks = drainQueue(); //將workQueue中的元素放入一個List並返回 } finally { mainLock.unlock(); //解鎖 } //嘗試終止線程池 tryTerminate(); return tasks; //返回workQueue中未執行的任務 }
shutdownNow() 和 shutdown()的大體流程相似,差別是:
1、將線程池更新為stop狀態
2、調用 interruptWorkers() 中斷所有線程,包括正在運行的線程
3、將workQueue中待處理的任務移到一個List中,並在方法最后返回,說明shutdownNow()后不會再處理workQueue中的任務
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }
interruptWorkers() 很簡單,循環對所有worker調用 interruptIfStarted(),其中會判斷worker的AQS state是否大於0,即worker是否已經開始運作,再調用Thread.interrupt()
需要注意的是,對於運行中的線程調用Thread.interrupt()並不能保證線程被終止,task.run()內部可能捕獲了InterruptException,沒有上拋,導致線程一直無法結束
三、awaitTermination() -- 等待線程池終止
參數:
timeout:超時時間
unit: timeout超時時間的單位
返回:
true:線程池終止
false:超過timeout指定時間
在發出一個shutdown請求后,在以下3種情況發生之前,awaitTermination()都會被阻塞
1、所有任務完成執行
2、到達超時時間
3、當前線程被中斷
/** * Wait condition to support awaitTermination */ private final Condition termination = mainLock.newCondition();
awaitTermination() 循環的判斷線程池是否terminated終止 或 是否已經超過超時時間,然后通過termination這個Condition阻塞等待一段時間
termination.awaitNanos() 是通過 LockSupport.parkNanos(this, nanosTimeout)實現的阻塞等待
阻塞等待過程中發生以下具體情況會解除阻塞(對上面3種情況的解釋):
1、如果發生了 termination.signalAll()(內部實現是 LockSupport.unpark())會喚醒阻塞等待,且由於ThreadPoolExecutor只有在 tryTerminated()嘗試終止線程池成功,將線程池更新為terminated狀態后才會signalAll(),故awaitTermination()再次判斷狀態會return true退出
2、如果達到了超時時間 termination.awaitNanos() 也會返回,此時nano==0,再次循環判斷return false,等待線程池終止失敗
3、如果當前線程被 Thread.interrupt(),termination.awaitNanos()會上拋InterruptException,awaitTermination()繼續上拋給調用線程,會以異常的形式解除阻塞
故終止線程池並需要知道其是否終止可以用如下方式:
executorService.shutdown(); try{ while(!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) { LOGGER.debug("Waiting for terminate"); } } catch (InterruptedException e) { //中斷處理 }
參考資料: