Java線程池ThreadPoolExecutor使用和分析(三) - 終止線程池原理


    相關文章目錄:

    Java線程池ThreadPoolExecutor使用和分析(一)

    Java線程池ThreadPoolExecutor使用和分析(二) - execute()原理

    Java線程池ThreadPoolExecutor使用和分析(三) - 終止線程池原理

 

    終止線程池主要有兩個方法:shutdown() 和 shutdownNow()。

    shutdown()后線程池將變成shutdown狀態,此時不接收新任務,但會處理完正在運行的 和 在阻塞隊列中等待處理的任務。

    shutdownNow()后線程池將變成stop狀態,此時不接收新任務,不再處理在阻塞隊列中等待的任務,還會嘗試中斷正在處理中的工作線程。

    下面是對線程池的幾種終止方式的分析,基於JDK 1.7

 

    以下是本文的目錄大綱:

    一、shutdown()  --  溫柔的終止線程池

        interruptIdleWorkers()  --  中斷空閑worker

        tryTerminate()  --  嘗試終止線程池

    二、shutdownNow()  --  強硬的終止線程池

        interruptWorkers()  --  中斷所有worker

    三、awaitTermination()  --  等待線程池終止

 

    若有不正之處請多多諒解,歡迎批評指正、互相討論。

    請尊重作者勞動成果,轉載請標明原文鏈接:

    http://www.cnblogs.com/trust-freedom/p/6693601.html

一、shutdown()  --  溫柔的終止線程池

/**
 * Initiates an orderly shutdown in which previously submitted
 * tasks are executed, but no new tasks will be accepted.
 * Invocation has no additional effect if already shut down.
 * 開始一個有序的關閉,在關閉中,之前提交的任務會被執行(包含正在執行的,在阻塞隊列中的),但新任務會被拒絕
 * 如果線程池已經shutdown,調用此方法不會有附加效應
 *
 * <p>This method does not wait for previously submitted tasks to
 * complete execution.  Use {@link #awaitTermination awaitTermination}
 * to do that.
 * 當前方法不會等待之前提交的任務執行結束,可以使用awaitTermination()
 *
 * @throws SecurityException {@inheritDoc}
 */
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); //上鎖
    
    try {
    	//判斷調用者是否有權限shutdown線程池
        checkShutdownAccess();
        
        //CAS+循環設置線程池狀態為shutdown
        advanceRunState(SHUTDOWN);
        
        //中斷所有空閑線程
        interruptIdleWorkers();
        
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } 
    finally {
        mainLock.unlock(); //解鎖
    }
    
    //嘗試終止線程池
    tryTerminate();
}

shutdown()執行流程:

1、上鎖,mainLock是線程池的主鎖,是可重入鎖,當要操作workers set這個保持線程的HashSet時,需要先獲取mainLock,還有當要處理largestPoolSize、completedTaskCount這類統計數據時需要先獲取mainLock

2、判斷調用者是否有權限shutdown線程池

3、使用CAS操作將線程池狀態設置為shutdown,shutdown之后將不再接收新任務

4、中斷所有空閑線程  interruptIdleWorkers()

5、onShutdown(),ScheduledThreadPoolExecutor中實現了這個方法,可以在shutdown()時做一些處理

6、解鎖

7、嘗試終止線程池  tryTerminate()

 

可以看到shutdown()方法最重要的幾個步驟是:更新線程池狀態為shutdown中斷所有空閑線程tryTerminated()嘗試終止線程池

那么,什么是空閑線程?interruptIdleWorkers() 是怎么中斷空閑線程的?

/**
 * Interrupts threads that might be waiting for tasks (as
 * indicated by not being locked) so they can check for
 * termination or configuration changes. Ignores
 * SecurityExceptions (in which case some threads may remain
 * uninterrupted).
 * 中斷在等待任務的線程(沒有上鎖的),中斷喚醒后,可以判斷線程池狀態是否變化來決定是否繼續
 *
 * @param onlyOne If true, interrupt at most one worker. This is
 * called only from tryTerminate when termination is otherwise
 * enabled but there are still other workers.  In this case, at
 * most one waiting worker is interrupted to propagate shutdown
 * signals in case(以免) all threads are currently waiting.
 * Interrupting any arbitrary thread ensures that newly arriving
 * workers since shutdown began will also eventually exit.
 * To guarantee eventual termination, it suffices to always
 * interrupt only one idle worker, but shutdown() interrupts all
 * idle workers so that redundant workers exit promptly, not
 * waiting for a straggler task to finish.
 * 
 * onlyOne如果為true,最多interrupt一個worker
 * 只有當終止流程已經開始,但線程池還有worker線程時,tryTerminate()方法會做調用onlyOne為true的調用
 * (終止流程已經開始指的是:shutdown狀態 且 workQueue為空,或者 stop狀態)
 * 在這種情況下,最多有一個worker被中斷,為了傳播shutdown信號,以免所有的線程都在等待
 * 為保證線程池最終能終止,這個操作總是中斷一個空閑worker
 * 而shutdown()中斷所有空閑worker,來保證空閑線程及時退出
 */
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); //上鎖
    try {
        for (Worker w : workers) { 
            Thread t = w.thread;
            
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock(); //解鎖
    }
}

interruptIdleWorkers() 首先會獲取mainLock鎖,因為要迭代workers set,在中斷每個worker前,需要做兩個判斷:

1、線程是否已經被中斷,是就什么都不做

2、worker.tryLock() 是否成功

第二個判斷比較重要,因為Worker類除了實現了可執行的Runnable,也繼承了AQS,本身也是一把鎖,具體可見 ThreadPoolExecutor內部類Worker解析

tryLock()調用了Worker自身實現的tryAcquire()方法,這也是AQS規定子類需要實現的嘗試獲取鎖的方法

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) { 
        setExclusiveOwnerThread(Thread.currentThread()); 
        return true;
    }
    return false;
}

tryAcquire()先嘗試將AQS的state從0-->1,返回true代表上鎖成功,並設置當前線程為鎖的擁有者

可以看到compareAndSetState(0, 1)只嘗試了一次獲取鎖,且不是每次state+1,而是0-->1,說明鎖不是可重入的

 

但是為什么要worker.tryLock()獲取worker的鎖呢?

這就是Woker類存在的價值之一,控制線程中斷

在runWorker()方法中每次獲取到task,task.run()之前都需要worker.lock()上鎖,運行結束后解鎖,即正在運行任務的工作線程都是上了worker鎖的

 

在interruptIdleWorkers()中斷之前需要先tryLock()獲取worker鎖,意味着正在運行的worker不能中斷,因為worker.tryLock()失敗,且鎖是不可重入的

故shutdown()只有對能獲取到worker鎖的空閑線程(正在從workQueue中getTask(),此時worker沒有加鎖)發送中斷信號

由此可以將worker划分為:
1、空閑worker:正在從workQueue阻塞隊列中獲取任務的worker
2、運行中worker:正在task.run()執行任務的worker

正阻塞在getTask()獲取任務的worker在被中斷后,會拋出InterruptedException,不再阻塞獲取任務

捕獲中斷異常后,將繼續循環到getTask()最開始的判斷線程池狀態的邏輯,當線程池是shutdown狀態,且workQueue.isEmpty時,return null,進行worker線程退出邏輯

 

某些情況下,interruptIdleWorkers()時多個worker正在運行,不會對其發出中斷信號,假設此時workQueue也不為空

那么當多個worker運行結束后,會到workQueue阻塞獲取任務,獲取到的執行任務,沒獲取到的,如果還是核心線程,會一直workQueue.take()阻塞住,線程無法終止,因為workQueue已經空了,且shutdown后不會接收新任務了

這就需要在shutdown()后,還可以發出中斷信號

Doug Lea大神巧妙的在所有可能導致線程池產終止的地方安插了tryTerminated()嘗試線程池終止的邏輯,並在其中判斷如果線程池已經進入終止流程,沒有任務等待執行了,但線程池還有線程,中斷喚醒一個空閑線程

shutdown()的最后也調用了tryTerminated()方法,下面看看這個方法的邏輯:

/**
 * Transitions to TERMINATED state if either (SHUTDOWN and pool
 * and queue empty) or (STOP and pool empty).  If otherwise
 * eligible to terminate but workerCount is nonzero, interrupts an
 * idle worker to ensure that shutdown signals propagate. This
 * method must be called following any action that might make
 * termination possible -- reducing worker count or removing tasks
 * from the queue during shutdown. The method is non-private to
 * allow access from ScheduledThreadPoolExecutor.
 * 
 * 在以下情況將線程池變為TERMINATED終止狀態
 * shutdown 且 正在運行的worker 和 workQueue隊列 都empty
 * stop 且  沒有正在運行的worker
 * 
 * 這個方法必須在任何可能導致線程池終止的情況下被調用,如:
 * 減少worker數量
 * shutdown時從queue中移除任務
 * 
 * 這個方法不是私有的,所以允許子類ScheduledThreadPoolExecutor調用
 */
final void tryTerminate() {
	//這個for循環主要是和進入關閉線程池操作的CAS判斷結合使用的
    for (;;) {
        int c = ctl.get();
        
        /**
         * 線程池是否需要終止
         * 如果以下3中情況任一為true,return,不進行終止
         * 1、還在運行狀態
         * 2、狀態是TIDYING、或 TERMINATED,已經終止過了
         * 3、SHUTDOWN 且 workQueue不為空
         */
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        
        /**
         * 只有shutdown狀態 且 workQueue為空,或者 stop狀態能執行到這一步
         * 如果此時線程池還有線程(正在運行任務,正在等待任務)
         * 中斷喚醒一個正在等任務的空閑worker
         * 喚醒后再次判斷線程池狀態,會return null,進入processWorkerExit()流程
         */
        if (workerCountOf(c) != 0) { // Eligible to terminate 資格終止
            interruptIdleWorkers(ONLY_ONE); //中斷workers集合中的空閑任務,參數為true,只中斷一個
            return;
        }

        /**
         * 如果狀態是SHUTDOWN,workQueue也為空了,正在運行的worker也沒有了,開始terminated
         */
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
        	//CAS:將線程池的ctl變成TIDYING(所有的任務被終止,workCount為0,為此狀態時將會調用terminated()方法),期間ctl有變化就會失敗,會再次for循環
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated(); //需子類實現
                } 
                finally {
                    ctl.set(ctlOf(TERMINATED, 0)); //將線程池的ctl變成TERMINATED
                    termination.signalAll(); //喚醒調用了 等待線程池終止的線程 awaitTermination() 
                }
                return;
            }
        }
        finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
        // 如果上面的CAS判斷false,再次循環
    }
}

tryTerminate() 執行流程:

1、判斷線程池是否需要進入終止流程(只有當shutdown狀態+workQueue.isEmpty 或 stop狀態,才需要)

2、判斷線程池中是否還有線程,有則 interruptIdleWorkers(ONLY_ONE) 嘗試中斷一個空閑線程(正是這個邏輯可以再次發出中斷信號,中斷阻塞在獲取任務的線程)

3、如果狀態是SHUTDOWN,workQueue也為空了,正在運行的worker也沒有了,開始terminated

    會先上鎖,將線程池置為tidying狀態,之后調用需子類實現的 terminated(),最后線程池置為terminated狀態,並喚醒所有等待線程池終止這個Condition的線程

二、shutdownNow()  --  強硬的終止線程池

/**
 * Attempts to stop all actively executing tasks, halts the
 * processing of waiting tasks, and returns a list of the tasks
 * that were awaiting execution. These tasks are drained (removed)
 * from the task queue upon return from this method.
 * 嘗試停止所有活動的正在執行的任務,停止等待任務的處理,並返回正在等待被執行的任務列表
 * 這個任務列表是從任務隊列中排出(刪除)的
 *
 * <p>This method does not wait for actively executing tasks to
 * terminate.  Use {@link #awaitTermination awaitTermination} to
 * do that.
 * 這個方法不用等到正在執行的任務結束,要等待線程池終止可使用awaitTermination()
 *
 * <p>There are no guarantees beyond best-effort attempts to stop
 * processing actively executing tasks.  This implementation
 * cancels tasks via {@link Thread#interrupt}, so any task that
 * fails to respond to interrupts may never terminate.
 * 除了盡力嘗試停止運行中的任務,沒有任何保證
 * 取消任務是通過Thread.interrupt()實現的,所以任何響應中斷失敗的任務可能永遠不會結束
 *
 * @throws SecurityException {@inheritDoc}
 */
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); //上鎖
    
    try {
    	//判斷調用者是否有權限shutdown線程池
        checkShutdownAccess();
        
        //CAS+循環設置線程池狀態為stop
        advanceRunState(STOP);
        
        //中斷所有線程,包括正在運行任務的
        interruptWorkers();
        
        tasks = drainQueue(); //將workQueue中的元素放入一個List並返回
    } 
    finally {
        mainLock.unlock(); //解鎖
    }
    
    //嘗試終止線程池
    tryTerminate();
    
    return tasks; //返回workQueue中未執行的任務
}

shutdownNow() 和 shutdown()的大體流程相似,差別是:

1、將線程池更新為stop狀態

2、調用 interruptWorkers() 中斷所有線程,包括正在運行的線程

3、將workQueue中待處理的任務移到一個List中,並在方法最后返回,說明shutdownNow()后不會再處理workQueue中的任務

 

interruptWorkers()

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

interruptWorkers() 很簡單,循環對所有worker調用 interruptIfStarted(),其中會判斷worker的AQS state是否大於0,即worker是否已經開始運作,再調用Thread.interrupt()

需要注意的是,對於運行中的線程調用Thread.interrupt()並不能保證線程被終止,task.run()內部可能捕獲了InterruptException,沒有上拋,導致線程一直無法結束

三、awaitTermination()  --  等待線程池終止

 

參數:
    timeout:超時時間
    unit:     timeout超時時間的單位
返回:
    true:線程池終止
    false:超過timeout指定時間
在發出一個shutdown請求后,在以下3種情況發生之前,awaitTermination()都會被阻塞
1、所有任務完成執行
2、到達超時時間
3、當前線程被中斷

 

/**
 * Wait condition to support awaitTermination
 */
private final Condition termination = mainLock.newCondition();

awaitTermination() 循環的判斷線程池是否terminated終止 或 是否已經超過超時時間,然后通過termination這個Condition阻塞等待一段時間

termination.awaitNanos() 是通過 LockSupport.parkNanos(this, nanosTimeout)實現的阻塞等待

阻塞等待過程中發生以下具體情況會解除阻塞(對上面3種情況的解釋):

1、如果發生了 termination.signalAll()(內部實現是 LockSupport.unpark())會喚醒阻塞等待,且由於ThreadPoolExecutor只有在 tryTerminated()嘗試終止線程池成功,將線程池更新為terminated狀態后才會signalAll(),故awaitTermination()再次判斷狀態會return true退出

2、如果達到了超時時間 termination.awaitNanos() 也會返回,此時nano==0,再次循環判斷return false,等待線程池終止失敗

3、如果當前線程被 Thread.interrupt(),termination.awaitNanos()會上拋InterruptException,awaitTermination()繼續上拋給調用線程,會以異常的形式解除阻塞

故終止線程池並需要知道其是否終止可以用如下方式:

executorService.shutdown();
try{
    while(!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
        LOGGER.debug("Waiting for terminate");
    }
} 
catch (InterruptedException e) {
    //中斷處理
}

 

參考資料:

    深入理解java線程池—ThreadPoolExecutor


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM