java線程池:ThreadPoolExecutor


 

jdk自帶線程池ThreadPoolExecutor包含了大量的信息,其中包括真正的線程池實現,工作隊列,線程池狀態,線程池的統計信息(工作線程數,完成任務數)以及為了使線程池適配各種各樣場合而產生的各種可調整參數以及鈎子方法。使用Executors種的各種便利工廠方法基本已經可以滿足日常情況下的需求。這里處於學習目的研究一下其工作機理。

 

線程池狀態是控制線程池生命周期至關重要的屬性,這里就以線程池狀態為出發點,進行研究。

需要說明的是,在查看jdk1.6的源碼和jdk1.7的源碼時發現jdk1.7對線程池做了略微的修改,線程池狀態由jdk1.6的RUNNING,SHUTDOWN,STOP,TERMINATED四個狀態變為了RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED五個狀態,並且修改線程池最大上限為2的29次方-1(而不是2的31次方-1),並將狀態值與工作線程數打包在一個整型中,用整形的上3位表示狀態,下29位表示工作線程數,這個屬性字段為"ctl",下面將直接使用ctl狀態”來描述當前線程池狀態,用“ctl計數”來描述當前線程池工作線程計數而不再解釋。(這個屬性為AtomicInteger ,主要是為了保持工作線程數的原子性)。因為手頭只有jdk1.7,這次的隨筆將以jdk1.7版本作為標准

這里先列出兩種可能的線程池狀態流程:1.RUNNING –>SHUTDONW –>TIDYING –>TERMINATED

                                                                              2.RUNNING –>STOP –>TIDYING –>TERMINATED

RUNNING

線程池初始狀態為RUNNING,這在ctl屬性初始化時進行了設置:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

RUNNING狀態為線程池的運行態,運行態時,RUNNING將提供豐富的操作來使用線程池,例如可以使用prestartAllCoreThreads()方法啟動所有核心線程,可以使用prestartCoreThread()啟動單個核心線程等等。大部分情況下,使用做多的是execute(Runnable)方法,通過調用線程池的execute(Runnable)方法,可以執行Runnable對象的run()方法而不用去關心具體是哪個線程去執行的這個方法,線程的調控由線程池負責。execute(Runnable)方法代碼如下:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

execute執行時:

1.調用workCountOf()方法獲取當前線程池中線程數量,判斷當前線程數量是否超過了核心線程數,若未超過,則直接添加一個核心線程,並用此線程完成當前提交的任務。

2.若當前線程數已經超過核心線程數且ctl狀態等於RUNNING且工作成功進入工作等待隊列,則我們進一步復查ctl,若ctl狀態依舊為RUNNING,且調用workCountOf()方法檢查發現此時的工作線程數為0時,將添加一個工作線程;若此時已經不為RUNNING,則嘗試移除任務,並調用拒絕任務方法:reject(command)。(這里之所以需要復查ctl狀態是由於在執行workQueue.offer(command)方法時,ctl狀態隨時可能由於調用shutDown方法或者shutDownNow方法而發生變化)。

3.如果上述兩種情況都不吻合,即此時已經有超過核心線程數的的線程在工作,且任務隊列也已堆滿,則嘗試增加一個工作線程(如果此時線程數達到限定最大線程數,則會失敗),若失敗則調用拒絕任務方法reject(command).

在execute的方法中添加工作線程的所調用的法為addWorker(Runnable runnable,Boolean core).該方法接受兩個參數,runnable對象為這個新建線程的第一個工作任務,可以為空。core指代新建的任務是否是核心線程。

addWorker,runWorker,getTask方法為線程池工作的主要方法,下面一一介紹。

 

addWorker的代碼如下所示:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

逐行查看:

1.循環中首個判斷條件:

if (rs >= SHUTDOWN &&
               ! (rs == SHUTDOWN &&
                  firstTask == null &&
                  ! workQueue.isEmpty()))
               return false;

這表示ctl狀態為RUNNING狀態或者為SHUTDOWN狀態且此時任務隊列仍有任務未執行完時,可以繼續調用addWorker添加工作線程,但不能新建任務,即firstTask參數必須為null.否則這里將返回false,即新建工作線程失敗。

2.內層循環:

for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

這部分對當前線程數量做了判斷,依據core參數,若core參數為true,即添加的為核心線程,則當前工作的線程數量不應當超過corePoolSize,否則返回false。若core參數為false,即添加的為普通線程,則當前工作的線程數量不應當超過maximumPoolSize,否則返回false。通過上述校驗,則調用compareAndIncrementWorkerCount(c)嘗試增加當前ctl計數,若成功則跳出外曾循環。若失敗則重復價檢查當前線程池ctl狀態(之所以檢查ctl是因為造成失敗的原因為ctl發生變化,這有兩種可能,一種是作為下29位的工作線程計數發生變化,一種是作為上3位的狀態標志位發生變化,這里檢查的就是上3位的變化),若是ctl狀態發生變化,則重新嘗試外層循環(這是有可能外層循環會由於ctl狀態的變化而直接return false)。若是ctl計數發生變化,則重新嘗試內層循環。

3.添加worker

       try{ 

       final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }

        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }

這部分獲取了線程池的主鎖mainLock。從而保證對共享資源workers的排他訪問。這里通過new Worker(firstTask) 新建了一個worker對象,該對象持有他的第一個任務firstTask。Woker類的構造方法,將通過ThreadFactory獲取一個thread對象,這里可能失敗,返回null,或者拋出異常。所以在上面的代碼段中對這種情況作了處理,若返回null,則workerStarted將為false。將執行addWorkerFailed方法處理失敗情況;若拋出異常同樣workerStarted將為false。將同樣執行addWorkerFailed方法。若成功通過ThreadFactory新建了一個持有當前worker對象的thread對象,則繼續往下recheck ctl狀態(規則與1沒有區別),通過校驗則將當前worker放入workers數組中,然后重新校正隊則池大小(largestPoolSize),置workerAdded標志位為true。最后通過wokerAdded標志位校驗,置workerStarted標志位為true,啟動線程,該線程持有當前worker對象,會執行worker對象的run方法,而woker對象的run方法又調用了自身的runWorker(this)方法。至此為止一個worker正式被添加進入workers數組並且正式開始運轉。

 

runWorker方法代碼如下所示:

  final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

逐行查看:

1.先說明一個變量completedAbruptly,這個布爾值被用於指代runWorker停止的原因,當completedAbruptly為true時代表worker停止是因為worker執行的外部業務邏輯代碼拋出異常引起的。當completedAbruptly為false時代表worker停止是線程池內部工作機制下的正常退出。

2.獲取當前執行線程,獲取當前worker的第一個任務置於task變量中,之后執行w.firstTask  = null。若不不將worker對象的firstTask置為空,則firstTask引用指向的task對象將不會被gc回收。將worker對象解鎖,從而允許其在尚未獲得task之前被中斷。進入while循環,這里將調用getTask()方法獲取任務task對象(可能造成線程等待),若因為各種原因(下面說明getTask()方法時會說到)返回null(即獲取不到任務)則停止while循環,將completedAbruptly置為false。

3.進入循環內部,代表已經獲取到可執行的task對象,則鎖定worker對象(保證不被shutDown方法中斷),接着做條件判斷,若ctl狀態超過STOP態,或者當前線程已經因為線程池內部狀態變化而被中斷(如何判斷中斷是因為線程池內部狀態變化而中斷的?重復檢查ctl狀態即可,若線程的中斷信號是在外部邏輯代碼中設置的(不使用線程池的shutDownNow方法,直接使用的thread.interrupt()方法),則ctl狀態不會為STOP,這樣就可判定為不是因為線程池內部狀態變化而引起的中斷),則設置該工作線程為中斷狀態(wt.interrupt())。否則執行beforeExecute(wt,task)鈎子方法,用戶可以重寫該方法而達到一些自定義需求(例如統計)。接着執行獲得task的run方法,至此該worker成功獲得並執行了一個任務。執行期間拋出的異常都有處理,不再多說。最終將置task為空,增加worker的完成任務數(completedTasks),隨后解鎖worker。

4.在最外層的finally塊中,執行了processWorkerExit(w,completedAbruptly)方法,該方法會處理處理統計信息,將worker的completedTask累加入線程池的completedTaskCount,並從線程池的workers中移除該worker,之后嘗試終止線程池(因為這個worker可能是當前線程池中最后一個worker,tryTerminate方法應該在所有可能終止當前線程的地方被調用)。最后根據completedAbruptly的值選擇策略,若為true,則直接調用addWorker增加一個新worker用以替代當前移除的worker;若為false則判斷當前線程池大小是否小於允許的最小線程池大小(可能是corePoolSize也可能小於),若小於則調用addWorker增加一個新worker,否則什么也不做。

 

getTask方法代碼如下所示:

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;      // Are workers subject to culling?

            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

逐行查看:

1.同樣首先說明一個變量,timeOut,這個布爾值指代當前getTask方法是否超時未能獲取到task對象。

2.外層for循環首先校驗ctl狀態,若ctl狀態大於SHUTDOWN或者等於SHUTDOWN且任務隊列workQueue為空,則返回null。否則進入內層循環,若當前線程數量小於maximumPoolSize且並未超時或者當前線程池不存在超時限制(由timed變量指代),則跳出循環,否則則嘗試減小ctl計數,並返回null。若嘗試減小ctl計數失敗(由於ctl值變化引起,上面addWorker中第2點已經提到過),則先檢查ctl狀態,若失敗是由ctl狀態變化引起,則嘗試外層循環,若失敗是由於ctl計數變化而引起則嘗試內層循環。

3.在第二點中,跳出循環之后,將根據timed的值做不同策略。若timed為true,即線程池存在超時限制,則執行workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),這個方法將只在keepAliveTime時間內等待獲取任務,一旦超過則返回null;若timed為false,則執行workQueue.take()方法,該方法將無限時等待任務,直至獲取任務或者出現中斷異常。在這兩個方法之一返回之后,若返回task為null,則設置timeOut為true,嘗試外層循環;若task不為null,則返回成功獲取的task對象。

4.如果在getTask期間出現中斷異常,則設置timeOut為false,並且重試外層循環,即InterruptedException並不會對getTask方法造成任何影響,真正能影響getTask方法的是ctl狀態的轉變。

 

 

上面詳細描述了線程池RUNNING狀態下的三個主要的方法,addWorker,getTask,runWorker,這三個方法構成了線程池的主要工作流程,如下所示:

addWorker—>runWoker—>getTask—>runTask—>getAnotherTask—>runTask….

以上就為RUNNING狀態下的基本流程,下面介紹RUNNING狀態是如何跳轉至SHUTDOWN和STOP狀態。

 

SHUTDOWN

 

SHUTDOWN狀態由shutdown()方法產生,代碼如下所示:

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

1.調用shutdown()方法,將獲得主鎖mainLock,並且查看是否有執行shutdown的權限,然后調用advanceRunState方法,將ctl狀態置為shutdown。調用interruptIdleWorkers()方法關閉尚未獲得task對象的worker(即還未執行到getTask()方法或者還未得到getTask()返回的worker)。之后調用onShutdown()鈎子方法,用戶可以在這里處理自定義邏輯。最后調用tryTerminate()方法嘗試終止線程池。

 

STOP

STOP狀態由shutdownNow()方法產生,代碼如下所示:

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

1.shutdownNow()方法將ctl狀態置為STOP狀態,並且調用 interruptWorkers()方法(注意不是interruptIdleWorkers方法,該方法不管worker是否獲取到task對象,都會將worker所在的線程置為interrupt狀態),之后移除任務隊列中的所有任務。最后調用tryTerminate()方法嘗試終止線程池。

 

至此線程池進入SHUTDOWN或者STOP狀態,在shutdown和shutdownNow,以及之前在runWorker中提到過的processWorkerExit方法中都調用了tryTerminate()方法,這個方法將使線程進入過渡狀態TIDYING,並且最終變為STOP狀態。

 

TIDYING,STOP

 

TIDYING,STOP狀態由tryTerminate()方法產生,代碼如下所示:

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

 

逐行查看:

1.tryTerminate()方法嘗試終止線程池活動,滿足終止條件的因素有兩個:首先,ctl狀態為STOP,或者為SHUTDOWN且任務隊列為空(STOP狀態之所以一直都不用判斷workQueue是因為上面講到的,shutdownNow()方法調用了drainQueue()方法清空了workQueue所以其必然為空,這里解釋一下);其次,ctl計數為0。第二個條件的滿足是由一連串連鎖反應保證的,shutdownNow()方法置ctl狀態為STOP,使得所有worker調用getTask()方法滿足rs>=SHUTDOWN條件從而調用decrementWorkerCount()方法,這將最終導致ctl計數為0,同時所有work都將從getTask()方法獲得null,最終導致runWorker()方法調用processWorkerExit()方法,將workers數組真正清空。shutdown()方法稍微復雜,它置ctl狀態為SHUTDOWN,但是線程池仍將繼續運行,直至所有workers將工作隊列中的任務全部完成,之后的邏輯和stop狀態下的完全一樣,不再多說。

2.在保證了上述兩個條件之后,tryTerminate()方法獲取住所mainLock,置ctl狀態為TIDYING,之后執行terminated鈎子方法,最后置ctl狀態為TERMINATED。

 

至此ctl狀態變為TERMINATED,且workers已清空,workQueue也已清空,線程池的生命周期到此結束。

線程池關於線程池工作信息的統計方法,拒絕任務請求方法,以及為了適應各種工作環境的構造方法由於過於瑣碎且和本文主線不符,這里就不再介紹。以上為本文全部,歡迎探討。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM