線程池ThreadPoolExecutor——Worker源碼解析


線程池任務運行的主流程如下:

線程池調用execute提交任務
—>創建Worker(設置屬性thead、firstTask)
—>worker.thread.start()
—>實際上調用的是worker.run()
—>線程池的runWorker(worker)
—>worker.firstTask.run();

可以看到,在ThreadPoolExecutor中以Worker為單位對工作線程進行管理,下面分析一下Worker的執行原理:

1. Worker源碼

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;//執行任務的線程
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;//要執行的任務
        /** Per-thread task counter */
        volatile long completedTasks;//完成任務的數量

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //調用線程工廠創建線程
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            //實際是調用 ThreadPoolExecutor.runWorker()方法
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            //CAS獲取鎖,不會有阻塞
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

java.util.concurrent.ThreadPoolExecutor.Worker就是線程池中執行任務的類,其繼承了AQS並實現Runnable,所以它可以擁有AQS與Runnable的作用。

1.1 AQS作用

Worker繼承了AbstractQueuedSynchronizer,主要目的有兩個:

  • 將鎖的粒度細化到每個工Worker。
    • 如果多個Worker使用同一個鎖,那么一個Worker Running持有鎖的時候,其他Worker就無法執行,這顯然是不合理的。
  • 直接使用CAS獲取,避免阻塞。
    • 如果這個鎖使用阻塞獲取,那么在多Worker的情況下執行shutDown。如果這個Worker此時正在Running無法獲取到鎖,那么執行shutDown()線程就會阻塞住了,顯然是不合理的。

1.2 Runnable作用

Worker還實現了Runnable,它有兩個屬性thead、firstTask。根據整體流程:

線程池調用execute—>創建Worker(設置屬性thead、firstTask)—>worker.thread.start()—>實際上調用的是worker.run()—>線程池的runWorker(worker)—>worker.firstTask.run()(如果firstTask為null就從等待隊列中拉取一個)。

轉了一大圈最終調用最開始傳進來的任務的run方法,不過通過等待隊列可以重復利用worker與worker中的線程,變化的只是firstTask。下面我們對線程池的runWorker方法進行探究。

2. Worker.run源碼

2.1 runWorker方法

Worker實現了Runnable,其run()方法中最終是走到了線程池的runWorker()方法。

public void run() {
    //實際是調用 ThreadPoolExecutor.runWorker()方法
    runWorker(this);
}
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        //任務是否正常執行完成
        boolean completedAbruptly = true;
        try {
            //如果task為null就通過getTask方法獲取阻塞隊列中的下一個任務
            //getTask方法一般不會返回null,所以這個while類似於一個無限循環
            //worker對象就通過這個方法的持續運行來不斷處理新的任務
            while (task != null || (task = getTask()) != null) {
                //每一次任務的執行都必須獲取鎖來保證下方臨界區代碼的線程安全
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //如果狀態值大於等於STOP(狀態值是有序的,即STOP、TIDYING、TERMINATED)且當前線程還沒有被中斷,則主動中斷線程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //執行任務前處理操作,默認是一個空實現;在子類中可以通過重寫來改變任務執行前的處理行為
                    beforeExecute(wt, task);
                    //保存任務執行過程中拋出的異常,提供給下面finally塊中的afterExecute方法使用
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        //異常包裝為Error
                        thrown = x; throw new Error(x);
                    } finally {
                        //任務后處理,同beforeExecute
                        afterExecute(task, thrown);
                    }
                } finally {
                    //將循環變量task設置為null,表示已處理完成
                    task = null;
                    //加當前worker已經完成的任務數
                    w.completedTasks++;
                    w.unlock();
                }
            }
            //將completedAbruptly變量設置為false,表示任務正常處理完成
            completedAbruptly = false;
        } finally {
            //銷毀當前的worker對象,並完成一些諸如完成任務數量統計之類的輔助性工作
            //在線程池當前狀態小於STOP的情況下會創建一個新的worker來替換被銷毀的worker
            processWorkerExit(w, completedAbruptly);
        }
    }

runWorker方法的源代碼中有兩個比較重要的方法調用,一個是while條件中對getTask方法的調用,一個是在方法的最后對processWorkerExit方法的調用。

2.2 getTask方法

private Runnable getTask() {
    // 通過timeOut變量表示線程是否空閑時間超時了
    boolean timedOut = false;

    // 無限循環
    for (;;) {
        // 獲取線程池狀態
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 如果 線程池狀態>=STOP
        //    或者 (線程池狀態==SHUTDOWN && 阻塞隊列為空)
        // 則直接減少一個worker計數並返回null(返回null會導致當前worker被銷毀)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        // 獲取線程池中的worker計數
        int wc = workerCountOf(c);

        // 判斷當前線程是否會被超時銷毀
        // 會被超時銷毀的情況:線程池允許核心線程超時 或 當前線程數大於核心線程數
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 如果 (當前線程數大於最大線程數 或 (允許超時銷毀 且 當前發生了空閑時間超時))
        //   且 (當前線程數大於1 或 阻塞隊列為空) —— 該條件在阻塞隊列不為空的情況下保證至少會保留一個線程繼續處理任務
        // 則 減少worker計數並返回null(返回null會導致當前worker被銷毀)
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 從阻塞隊列中取出一個任務(如果隊列為空會進入阻塞等待狀態)
            // 如果允許空閑超時銷毀線程的話則帶有一個等待的超時時間
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // 如果獲取到了任務就直接返回該任務,返回后會開始執行該任務
            if (r != null)
                return r;
            // 如果任務為null,則說明發生了等待超時,將空閑時間超時標志設置為true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果等待被中斷了,那說明空閑時間(等待任務的時間)還沒有超時
            timedOut = false;
        }
    }
}

getTask方法在阻塞隊列中有待執行的任務時會從隊列中彈出一個任務並返回,如果阻塞隊列為空,那么就會阻塞等待新的任務提交到隊列中直到超時(在一些配置下會一直等待而不超時),如果在超時之前獲取到了新的任務,那么就會將這個任務作為返回值返回。所以一般getTask方法是不會返回null的,只會阻塞等待下一個任務並在之后將這個新任務作為返回值返回。

當getTask方法返回null時會導致當前Worker退出,當前線程被銷毀。在以下情況下getTask方法才會返回null:

  1. 當前線程池中的線程數超過了最大線程數。這是因為運行時通過調用setMaximumPoolSize修改了最大線程數而導致的結果;
  2. 線程池處於STOP狀態。這種情況下所有線程都應該被立即回收銷毀;
  3. 線程池處於SHUTDOWN狀態,且阻塞隊列為空。這種情況下已經不會有新的任務被提交到阻塞隊列中了,所以線程應該被銷毀;
  4. 線程可以被超時回收的情況下等待新任務超時。線程被超時回收一般有以下兩種情況:
    • 超出核心線程數部分的線程等待任務超時
    • 允許核心線程超時(線程池配置)的情況下線程等待任務超時

2.3 processWorkerExit方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果completedAbruptly為true則表示任務執行過程中拋出了未處理的異常
    // 所以還沒有正確地減少worker計數,這里需要減少一次worker計數
    if (completedAbruptly)
        decrementWorkerCount();

    // 獲取線程池的主鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 把將被銷毀的線程已完成的任務數累計到線程池的完成任務總數上
        completedTaskCount += w.completedTasks;
        // 從worker集合中去掉將會銷毀的worker
        workers.remove(w);
    } finally {
        // 釋放線程池主鎖
        mainLock.unlock();
    }

    // 嘗試結束線程池
    // 這里是為了在關閉線程池時等到所有worker都被回收后再結束線程池
    tryTerminate();

    int c = ctl.get();
    // 如果線程池狀態 < STOP,即RUNNING或SHUTDOWN
    // 則需要考慮創建新線程來代替被銷毀的線程
    if (runStateLessThan(c, STOP)) {
        // 如果worker是正常執行完的,則要判斷一下是否已經滿足了最小線程數要求
        // 否則直接創建替代線程
        if (!completedAbruptly) {
            // 如果允許核心線程超時則最小線程數是0,否則最小線程數等於核心線程數
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果阻塞隊列非空,則至少要有一個線程繼續執行剩下的任務
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 如果當前線程數已經滿足最小線程數要求
            // 那么就不創建替代線程了
            if (workerCountOf(c) >= min)
                return;
        }

        // 重新創建一個worker來代替被銷毀的線程
        addWorker(null, false);
    }
}

processWorkerExit方法會銷毀當前線程對應的Worker對象,並執行一些累加總處理任務數等輔助操作,但在線程池當前狀態小於STOP的情況下會創建一個新的Worker來替換被銷毀的Worker。

 

 

 

參考:

https://segmentfault.com/a/1190000018630751

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM