Java Concurrency (8): ThreadPoolExecutor Explained


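A long time ago (2020-10-23) I planned to study thread pools and write a blog post about them. Partway through I could no longer follow what I was writing, so I let it drop. Now I'm picking it up again and rewriting it (and relearning it along the way).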

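The advantages of a thread pool are well-worn territory:

  1. Less thread-creation overhead, because threads are reused (this matters when there are more tasks than threads)
  2. Centralized management of a set of threads (resources)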

Before getting to ThreadPoolExecutor, let's look at what its parent types provide.

Executor inheritance hierarchy

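Executor is an object that executes submitted Runnable tasks. It decouples task submission from when (and how) each task is run.
execute is the only method of the Executor interface.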

    // The task runs at some point in the future: possibly in a new thread, in a pooled thread, or in the calling thread.
    void execute(Runnable command);

ExecutorService inheritance hierarchy

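ExecutorService is an Executor that adds two kinds of methods: methods for managing termination, and methods (submit) that return a Future for tracking asynchronous tasks.
The two termination methods:

  • shutdown(): no new tasks are accepted. Running tasks continue, and tasks already in the queue are still executed.
  • shutdownNow(): also attempts to stop running tasks (typically by interrupting their threads). It returns the queued tasks that never started.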

AbstractExecutorService inheritance hierarchy

AbstractExecutorService provides default implementations of ExecutorService's submit, invokeAny, and invokeAll.

ThreadPoolExecutor inheritance hierarchy

Introduction

🐱‍🏍 Main elements of a thread pool

(Figure: main elements of a thread pool)

Underlying variables

ctl

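Let's start with ctl (the main pool control state). It packs two pieces of information into one int:

  1. The pool's run state (the highest three bits)
  2. The pool's workerCount, i.e. the number of effective threads (the low 29 bits)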
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
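The snippet above leaves out the helper methods that pack and unpack ctl. The sketch below is a standalone copy for experimenting. The constants come from the snippet above. ctlOf, runStateOf and workerCountOf are written the way JDK 8 defines them; they are private in ThreadPoolExecutor, so they are re-declared here. The class name CtlDemo is hypothetical.

```java
/** Standalone copy of ThreadPoolExecutor's ctl bit layout (as in JDK 8), for experimenting only. */
class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;          // 29 low bits hold workerCount
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;     // mask with the low 29 bits set

    // runState lives in the high 3 bits
    static final int RUNNING    = -1 << COUNT_BITS;          // 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;          // 000
    static final int STOP       =  1 << COUNT_BITS;          // 001
    static final int TIDYING    =  2 << COUNT_BITS;          // 010
    static final int TERMINATED =  3 << COUNT_BITS;          // 011

    // Packing and unpacking, written as in the JDK 8 source
    static int ctlOf(int rs, int wc)  { return rs | wc; }
    static int runStateOf(int c)      { return c & ~CAPACITY; }
    static int workerCountOf(int c)   { return c & CAPACITY; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(Integer.toBinaryString(c));   // "111", then 26 zeros, then "101"
        System.out.println(runStateOf(c) == RUNNING);    // true
        System.out.println(workerCountOf(c));            // 5
        System.out.println(c < SHUTDOWN);                // true: a RUNNING ctl is always negative
    }
}
```

RUNNING is the only negative state. That is why a check such as isRunning(c) can be a plain c < SHUTDOWN comparison, whatever the worker count is.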

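🐱‍🏍 Thread pool state transition diagram

(Figure: thread pool state transitions)

Here is what each state means:

  1. RUNNING: accepts new tasks and processes the tasks in the blocking queue
  2. SHUTDOWN: rejects new tasks, but still processes the tasks in the blocking queue
  3. STOP: rejects new tasks, discards the tasks in the blocking queue, and interrupts running tasks
  4. TIDYING: all tasks (including queued ones) have finished and the pool's active thread count is 0. The terminated() hook is about to be called.
  5. TERMINATED: the final state, reached after terminated() has completed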
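For reference, the JDK javadoc describes the transitions as follows:

- RUNNING -> SHUTDOWN on shutdown()
- RUNNING or SHUTDOWN -> STOP on shutdownNow()
- SHUTDOWN -> TIDYING when both the queue and the pool are empty
- STOP -> TIDYING when the pool is empty
- TIDYING -> TERMINATED when the terminated() hook has completed

The run state itself is private. The sketch below (the class name is hypothetical) observes part of that path through the public isShutdown() and isTerminated() methods and an overridden terminated() hook.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class LifecycleDemo {
    /** Runs one task, shuts down, and reports what the public API observed along the way. */
    static String observe() {
        AtomicBoolean hookRan = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>()) {
            @Override
            protected void terminated() {        // called while the pool is in TIDYING
                super.terminated();
                hookRan.set(true);
            }
        };
        pool.execute(() -> { });
        boolean runningBefore = !pool.isShutdown();   // RUNNING
        pool.shutdown();                               // RUNNING -> SHUTDOWN
        boolean shutdownAfter = pool.isShutdown();
        try {
            // Queue and pool drain, then SHUTDOWN -> TIDYING -> TERMINATED
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return runningBefore + " " + shutdownAfter + " " + pool.isTerminated() + " " + hookRan.get();
    }

    public static void main(String[] args) {
        System.out.println(observe());   // true true true true
    }
}
```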

workers

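Every worker thread is added to this set. Think of it as a central registry: through workers the pool can count its live threads, interrupt all of them, and so on. The JDK only accesses this set while holding mainLock.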

    private final HashSet<Worker> workers = new HashSet<Worker>();

Configurable variables

Constructor parameters

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
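                              long keepAliveTime, // how long threads beyond corePoolSize may stay idle waiting for a task before they are terminated
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue, // blocking queue that holds tasks waiting to run
                              ThreadFactory threadFactory, // the executor uses threadFactory to create new threads
                              RejectedExecutionHandler handler) // rejection policy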
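Here is a usage sketch of the seven-argument constructor that passes every parameter explicitly, including a ThreadFactory that names its threads. The pool sizes, the queue capacity and the "demo-pool-" prefix are arbitrary values for illustration, not recommendations. The class name is hypothetical.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class ConstructorDemo {
    static ThreadPoolExecutor newPool() {
        AtomicInteger seq = new AtomicInteger(1);
        // Hypothetical naming scheme: demo-pool-1, demo-pool-2, ...
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "demo-pool-" + seq.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
        return new ThreadPoolExecutor(
                2,                                    // corePoolSize
                4,                                    // maximumPoolSize
                60L, TimeUnit.SECONDS,                // keepAliveTime and its unit
                new ArrayBlockingQueue<>(100),        // bounded workQueue
                factory,                              // threadFactory
                new ThreadPoolExecutor.AbortPolicy()  // handler (also the default)
        );
    }

    /** Returns the name of the thread that ran a submitted task. */
    static String nameOfWorkerThread() {
        ThreadPoolExecutor pool = newPool();
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        try {
            return pool.submit(whoAmI).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(nameOfWorkerThread());   // demo-pool-1
    }
}
```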

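How corePoolSize, maximumPoolSize, and workQueue relate:

  • While fewer than corePoolSize threads are running, each new task gets a new thread, even if other worker threads are idle.
  • Once corePoolSize threads are running (and there are fewer than maximumPoolSize), new tasks go into workQueue as long as it is not full. A new thread is created only when workQueue is full.
  • When workQueue is full and the thread count has reached maximumPoolSize, the rejection policy (handler) is applied.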
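To see the three rules in action, the sketch below uses an intentionally tiny pool: corePoolSize 1, maximumPoolSize 2, and a queue of capacity 1. Every task blocks on a latch, so none of them finishes early. The counting rejection handler is a hypothetical stand-in for a real policy, and the class name is also hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class PoolSizingDemo {
    static List<String> simulate() {
        CountDownLatch release = new CountDownLatch(1);       // keeps every task busy until released
        AtomicInteger rejected = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS,                    // core = 1, max = 2
                new ArrayBlockingQueue<>(1),                   // queue holds a single task
                Executors.defaultThreadFactory(),
                (r, executor) -> rejected.incrementAndGet());  // count instead of throwing
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> snapshots = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            pool.execute(blocker);
            snapshots.add("poolSize=" + pool.getPoolSize()
                    + ", queued=" + pool.getQueue().size()
                    + ", rejected=" + rejected.get());
        }
        release.countDown();
        pool.shutdown();
        return snapshots;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

The four snapshots are `poolSize=1, queued=0, rejected=0`, `poolSize=1, queued=1, rejected=0`, `poolSize=2, queued=1, rejected=0` and `poolSize=2, queued=1, rejected=1`:

- The first task starts a core thread.
- The second task is queued.
- The third task finds the queue full and starts a second thread.
- The fourth task is rejected.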

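Rejection policies (the built-in ones, all nested classes of ThreadPoolExecutor):

  1. AbortPolicy: throws RejectedExecutionException (this is the default handler)
  2. CallerRunsPolicy: runs the task in the thread that called execute (the task is discarded if the pool has been shut down)
  3. DiscardPolicy: silently discards the task
  4. DiscardOldestPolicy: discards the task at the head of the queue (the oldest one), then retries execute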
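The sketch below saturates a pool that has one thread and a one-slot queue, then observes three of the built-in policies. The class and method names are hypothetical; the policies are the real ThreadPoolExecutor nested classes.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

class RejectionDemo {
    /** A task that blocks until the latch opens, to keep the single worker busy. */
    static Runnable blocker(CountDownLatch release) {
        return () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    /** core = max = 1 and a queue of capacity 1: the next execute() call is rejected. */
    static ThreadPoolExecutor saturatedPool(RejectedExecutionHandler handler, CountDownLatch release) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), handler);
        pool.execute(blocker(release));   // occupies the only worker
        pool.execute(blocker(release));   // fills the queue
        return pool;
    }

    static boolean callerRunsInCallerThread() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = saturatedPool(new ThreadPoolExecutor.CallerRunsPolicy(), release);
        AtomicReference<Thread> ranIn = new AtomicReference<>();
        pool.execute(() -> ranIn.set(Thread.currentThread()));   // rejected, so it runs right here
        release.countDown();
        pool.shutdown();
        return ranIn.get() == Thread.currentThread();
    }

    static boolean discardOldestReplacesQueueHead() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = saturatedPool(new ThreadPoolExecutor.DiscardOldestPolicy(), release);
        Runnable newest = () -> { };
        pool.execute(newest);             // drops the queued blocker, then enqueues newest
        boolean result = pool.getQueue().size() == 1 && pool.getQueue().peek() == newest;
        release.countDown();
        pool.shutdown();
        return result;
    }

    static boolean abortThrows() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = saturatedPool(new ThreadPoolExecutor.AbortPolicy(), release);
        try {
            pool.execute(() -> { });
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callerRunsInCallerThread() + " "
                + discardOldestReplacesQueueHead() + " " + abortThrows());   // true true true
    }
}
```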

allowCoreThreadTimeOut

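Controls whether core threads are also terminated after staying idle for keepAliveTime. It is false by default, in which case only threads beyond corePoolSize time out.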
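Here is a sketch of the effect. It assumes a short keepAliveTime of 50 ms, chosen only to keep the demo fast. With allowCoreThreadTimeOut(true), even core workers exit once they have been idle that long. Note that the setter throws IllegalArgumentException if keepAliveTime is not positive. The class name is hypothetical.

```java
import java.util.concurrent.*;

class CoreTimeoutDemo {
    /** Returns {pool size right after two submissions, pool size once idle workers have exited}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 50, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true);            // core threads now obey keepAliveTime too
        pool.execute(() -> { });
        pool.execute(() -> { });
        int before = pool.getPoolSize();              // 2: each execute started a core worker
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
            try {
                Thread.sleep(20);                     // wait for the idle workers to time out
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        int after = pool.getPoolSize();
        pool.shutdown();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] sizes = run();
        System.out.println("before=" + sizes[0] + ", after=" + sizes[1]);   // before=2, after=0
    }
}
```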

Exploring the source code

The most important method is execute.

It executes the submitted task at some point in the future.

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */

        // Read workerCount and runState (both are packed in ctl)
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

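Following the comment above, we'll go through execute in three parts:

  • When fewer than corePoolSize threads are running
  • When at least corePoolSize threads are running, the task is queued
  • When the queue is full

When fewer than corePoolSize threads are running

        // execute, part 1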
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

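addWorker creates a worker thread.
It doesn't add a worker blindly. It first checks runState and workerCount to avoid creating threads it doesn't need. If the checks pass, it creates the thread, adds the worker to workers, and starts the thread.

    // firstTask: the first task the thread runs when it starts (may be null)
    // core: if true, corePoolSize is the bound; otherwise maximumPoolSize
    private boolean addWorker(Runnable firstTask, boolean core) {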
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

That method is long, so let's split it into two parts.

// addWorker(), part 1
// This part mainly increments workerCount via CAS
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
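            // Applying De Morgan's law, the condition below is equivalent to:
            // if (rs >= SHUTDOWN &&
            //      (rs != SHUTDOWN ||
            //        firstTask != null ||
            //         workQueue.isEmpty()))
            // Read it together with the pool states!

            // Case 1: the pool is STOP, TIDYING, or TERMINATED.
            // Case 2: the pool is SHUTDOWN and firstTask is not null; only a RUNNING pool accepts new tasks.
            // Case 3: the pool is SHUTDOWN, firstTask is null, and the queue is empty.
            // In all of these cases there is no need to create a new worker (thread).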
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // CAS incremented workerCount: go on to part 2
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // The run state changed: continue the outer loop and re-check the pool state
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
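The inner loop is a classic bounded CAS increment. The sketch below isolates that pattern with a plain AtomicInteger. The class BoundedCasDemo is hypothetical, and the outer retry on a run-state change is left out. However many threads race, the counter never passes the bound.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedCasDemo {
    private final AtomicInteger count = new AtomicInteger();   // plays the role of workerCount
    private final int bound;                                   // plays corePoolSize / maximumPoolSize

    BoundedCasDemo(int bound) { this.bound = bound; }

    /** Same shape as addWorker's inner loop: read, check the bound, CAS, retry if another thread won. */
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= bound)
                return false;                        // like "wc >= (core ? corePoolSize : maximumPoolSize)"
            if (count.compareAndSet(c, c + 1))
                return true;                         // like compareAndIncrementWorkerCount(c)
            // CAS failed because count changed concurrently: re-read and retry
        }
    }

    /** Starts `threads` threads that each call tryIncrement once; returns how many succeeded. */
    static int successes(int bound, int threads) {
        BoundedCasDemo counter = new BoundedCasDemo(bound);
        AtomicInteger ok = new AtomicInteger();
        List<Thread> started = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                if (counter.tryIncrement()) ok.incrementAndGet();
            });
            started.add(t);
            t.start();
        }
        for (Thread t : started) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return ok.get();
    }

    public static void main(String[] args) {
        System.out.println(successes(3, 10));   // 3: never more successes than the bound
    }
}
```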

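After the first part, workerCount has been incremented by one via CAS. Next, addWorker creates a Worker and adds it to workers. It then starts the thread that the threadFactory created for that Worker.

// addWorker(), part 2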

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    // Case 1: rs is RUNNING
                    // Case 2: rs is SHUTDOWN and firstTask is null; a thread is created to process the tasks remaining in workQueue (the blocking queue)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
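                        // isAlive() means this thread was already started. My guess is that the threadFactory returned a thread that had already been started (e.g. the same Thread twice); I'm not sure whether there are other causes.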
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
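            // Creating the Worker above may have failed, or the thread factory returned null,
            // or starting the thread threw an OutOfMemoryError
            if (! workerStarted)
                // Roll back: remove the worker from workers and decrement workerCount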
                addWorkerFailed(w);
        }
        return workerStarted;

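Having gone through addWorker, you'll notice the Worker class in the code. It looks like a thread, but isn't quite one. Let's look at its structure.

Worker
This class has two main jobs: it maintains the interrupt control state of the thread running tasks, and it records how many tasks each thread has completed.

Overall structure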

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable{

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread count of completed tasks */
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // The Worker itself (a Runnable) is passed to the threadFactory, so starting the thread runs Worker.run()
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            // Delegates the main loop to the outer method runWorker, covered in detail below.

            // This is the core code that runs tasks ✨
            runWorker(this);
        }

        // Implements AQS's exclusive-mode methods; this lock is not reentrant.
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        // Interrupt only once the thread has started running (state >= 0)
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {}
            }
        }
    }
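Why not just use ReentrantLock? The JDK source comment gives the reason: a non-reentrant lock means worker tasks cannot reacquire it when they call pool control methods such as setCorePoolSize. interruptIdleWorkers also relies on w.tryLock() failing for a worker that is busy running a task. The sketch below re-creates the same AQS pattern outside the pool and compares it with ReentrantLock when the same thread locks twice. The class names are hypothetical.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

class NonReentrantLockDemo {
    /** Same AQS pattern as Worker: state 0 = unlocked, 1 = locked, no reentrancy. */
    static final class Mutex extends AbstractQueuedSynchronizer {
        @Override
        protected boolean isHeldExclusively() { return getState() != 0; }

        @Override
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {     // only succeeds from the unlocked state
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;                       // even the owner cannot acquire again
        }

        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        void lock()       { acquire(1); }
        boolean tryLock() { return tryAcquire(1); }
        void unlock()     { release(1); }
    }

    /** The same thread tries to lock twice, first with Mutex, then with ReentrantLock. */
    static String compare() {
        Mutex mutex = new Mutex();
        boolean m1 = mutex.tryLock();
        boolean m2 = mutex.tryLock();           // false: not reentrant
        mutex.unlock();

        ReentrantLock reentrant = new ReentrantLock();
        boolean r1 = reentrant.tryLock();
        boolean r2 = reentrant.tryLock();       // true: hold count becomes 2
        reentrant.unlock();
        reentrant.unlock();
        return m1 + " " + m2 + " " + r1 + " " + r2;
    }

    public static void main(String[] args) {
        System.out.println(compare());          // true false true true
    }
}
```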

Now let's look at runWorker. Its main job is to keep taking tasks from the blocking queue and running them. If firstTask is not null, it runs that task first.

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
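            // getTask blocks waiting for a task, or returns null so that idle threads can be cleaned up after a timeout
            // getTask is extremely important ✨ and is covered later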
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // The second case re-checks the pool state, because another thread may call shutdownNow at this point
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // before execution
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // run the task (firstTask first, then tasks taken from the queue)
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // after execution
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // handle the worker's exit
            processWorkerExit(w, completedAbruptly);
        }
    }
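beforeExecute and afterExecute are empty protected hooks in ThreadPoolExecutor, so a subclass can observe each step of runWorker. Here is a minimal sketch (the class name is hypothetical) that records the order of events for one successful task.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

class HookDemo {
    static List<String> trace() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {   // runs right before task.run()
                super.beforeExecute(t, r);
                events.add("before");
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) { // runs in runWorker's finally
                super.afterExecute(r, t);
                events.add("after(thrown=" + t + ")");
            }
        };
        pool.execute(() -> events.add("run"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace());   // [before, run, after(thrown=null)]
    }
}
```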

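🐱‍🏍 The rough sequence of methods executed when a worker thread starts

(Figure: the methods actually executed)

getTask: let's see how it blocks, or waits with a timeout, for a task.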

Performs blocking or timed wait for a task, depending on current configuration settings, or returns null if this worker must exit because of any of:

  1. There are more than maximumPoolSize workers (due to a call to setMaximumPoolSize).
  2. The pool is stopped.
  3. The pool is shutdown and the queue is empty.
  4. This worker timed out waiting for a task, and timed-out workers are subject to termination (that is, allowCoreThreadTimeOut || workerCount > corePoolSize) both before and after the timed wait, and if the queue is non-empty, this worker is not the last thread in the pool. 🐱‍🏍 (In other words: a worker that timed out waiting for a task is terminated if allowCoreThreadTimeOut || wc > corePoolSize holds both before and after the timed wait.)

Returns:
task, or null if the worker must exit, in which case workerCount is decremented

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check whether a task should be returned at all; the answer follows from the meaning of each state
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Is this worker subject to culling (timeout)?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

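            // Together with timed: after a timeout, return null when wc > corePoolSize,
            // or when allowCoreThreadTimeOut is true; also when wc > maximumPoolSize.
            // (wc > 1 || workQueue.isEmpty()) keeps the last worker alive while tasks remain queued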
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // r can only be null when timed == true (poll timed out); take() blocks until a task is available
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
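getTask returns null in two places, and the conditions behind them can be restated as a pure function that is easy to try out. This is a simplified, hypothetical restatement. The real method also needs its CAS on workerCount to succeed, and loops when it fails. TIDYING and TERMINATED are folded into STOP here.

```java
/** Hypothetical, simplified restatement of the two "return null" checks in getTask() (JDK 8). */
class GetTaskDecision {
    // TIDYING and TERMINATED behave like STOP for this purpose
    enum RunState { RUNNING, SHUTDOWN, STOP }

    static boolean workerShouldExit(RunState rs, boolean queueEmpty, int wc,
                                    int corePoolSize, int maximumPoolSize,
                                    boolean allowCoreThreadTimeOut, boolean timedOut) {
        // Check 1: no more tasks will be handed out (STOP, or SHUTDOWN with an empty queue)
        if (rs != RunState.RUNNING && (rs == RunState.STOP || queueEmpty))
            return true;
        // Check 2: too many workers, or a timed-out worker that is subject to culling,
        // but never remove the last worker while the queue still holds tasks
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        return (wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || queueEmpty);
    }

    public static void main(String[] args) {
        // A non-core worker (wc 3 > core 2) whose poll timed out: exits
        System.out.println(workerShouldExit(RunState.RUNNING, true, 3, 2, 4, false, true));    // true
        // wc == corePoolSize with the default allowCoreThreadTimeOut = false: stays
        System.out.println(workerShouldExit(RunState.RUNNING, true, 2, 2, 4, false, true));    // false
        // Last worker, queue not empty: stays even though it timed out
        System.out.println(workerShouldExit(RunState.RUNNING, false, 1, 2, 4, true, true));    // false
        // SHUTDOWN with tasks left keeps draining; with an empty queue it exits
        System.out.println(workerShouldExit(RunState.SHUTDOWN, false, 1, 2, 4, false, false)); // false
        System.out.println(workerShouldExit(RunState.SHUTDOWN, true, 1, 2, 4, false, false));  // true
    }
}
```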

Next, let's see how the pool handles a worker's exit when getTask returns null.

According to runWorker's code, when getTask returns null the loop exits normally, so completedAbruptly = false.

processWorkerExit

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 1. On an abrupt exit (a task threw), workerCount is decremented here
        // 2. On a normal exit, getTask already decremented it, so there is nothing to do
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        // Add this worker's completed-task count to completedTaskCount
        // and remove the worker from workers
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // Check whether the pool can now transition to TERMINATED
        tryTerminate();

        int c = ctl.get();
        // Continue only if the state is RUNNING or SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // 1. Abrupt exit: always addWorker() a replacement
            // 2. Normal exit: addWorker() only if workerCount is below the minimum number of threads
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
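The decision at the end of processWorkerExit can likewise be restated as a small hypothetical function. Here wc is the worker count with the exiting worker already subtracted, which is what the JDK code sees when it reads ctl at that point.

```java
/** Hypothetical restatement of the "start a replacement?" decision at the end of processWorkerExit (JDK 8). */
class ReplacementDecision {
    /**
     * @param atLeastStop  true if runState is STOP, TIDYING or TERMINATED
     * @param wc           workerCount with the exiting worker already subtracted
     */
    static boolean needsReplacement(boolean atLeastStop, boolean completedAbruptly,
                                    boolean allowCoreThreadTimeOut, int corePoolSize,
                                    boolean queueEmpty, int wc) {
        if (atLeastStop)
            return false;                 // the pool is winding down: never replace
        if (completedAbruptly)
            return true;                  // a task threw: always replace the dead thread
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        if (min == 0 && !queueEmpty)
            min = 1;                      // someone must still drain the queue
        return wc < min;                  // below the minimum: add a worker
    }

    public static void main(String[] args) {
        System.out.println(needsReplacement(false, true, false, 2, true, 5));   // true
        System.out.println(needsReplacement(false, false, false, 2, true, 1));  // true
        System.out.println(needsReplacement(false, false, false, 2, true, 2));  // false
        System.out.println(needsReplacement(false, false, true, 2, false, 0));  // true
        System.out.println(needsReplacement(false, false, true, 2, true, 0));   // false
    }
}
```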

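getTask is what keeps existing threads alive. When a worker is not subject to timeout, getTask calls the blocking queue's take method, which blocks until a task is available.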
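To make the idea that a blocking take keeps the thread alive more concrete, here is a heavily simplified, hypothetical pool. It has a fixed number of threads, no run states, no rejection, and a sentinel task that stops the workers. It is a teaching sketch, not how ThreadPoolExecutor is implemented. For example, the real pool lets a thread whose task threw die and then replaces it, whereas this sketch simply catches RuntimeException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, heavily simplified pool: fixed size, no run states, no rejection. */
class MiniPool {
    private static final Runnable POISON = () -> { };      // sentinel that tells a worker to exit
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();

    MiniPool(int size) {
        for (int i = 0; i < size; i++) {
            Thread t = new Thread(this::workerLoop, "mini-" + i);
            workers.add(t);
            t.start();
        }
    }

    private void workerLoop() {
        try {
            for (;;) {
                Runnable task = queue.take();   // idle workers block here, like getTask() when not timed
                if (task == POISON)
                    return;
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // Unlike ThreadPoolExecutor, keep this thread alive instead of replacing it
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void execute(Runnable task) { queue.add(task); }

    /** Queues one sentinel per worker (after all real tasks) and waits for the workers to exit. */
    void shutdownAndJoin() throws InterruptedException {
        for (int i = 0; i < workers.size(); i++)
            queue.add(POISON);
        for (Thread t : workers)
            t.join();
    }

    /** Runs `tasks` counting tasks on `threads` worker threads and returns how many ran. */
    static int runTasks(int threads, int tasks) {
        MiniPool pool = new MiniPool(threads);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++)
            pool.execute(done::incrementAndGet);
        try {
            pool.shutdownAndJoin();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(3, 100));   // 100
    }
}
```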

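When at least corePoolSize threads are running, the task is queued

// execute, part 2
        // The pool is RUNNING (only a RUNNING pool accepts new tasks)
        // At this point workerCount >= corePoolSize (or part 1's addWorker failed), so enqueue the task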
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // The pool may have been shut down in the meantime.
            // If so, remove the task just added; if it hasn't run yet it never will, and it is rejected
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // If there are no workers now, start one to process the tasks in the queue
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
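The workerCountOf(recheck) == 0 branch matters most for a pool with corePoolSize = 0. Part 1 is skipped, and the offer to an unbounded queue always succeeds, so part 3 never runs. Without this branch the task would sit in the queue with no thread to run it. Here is a small sketch (the class name is hypothetical).

```java
import java.util.concurrent.*;

class ZeroCoreDemo {
    /** corePoolSize = 0 and an unbounded queue: only the recheck branch can start a worker. */
    static boolean taskRuns() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 1, 1L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        CountDownLatch ran = new CountDownLatch(1);
        try {
            pool.execute(ran::countDown);              // queued, then addWorker(null, false)
            return ran.await(5, TimeUnit.SECONDS);     // true if a worker picked the task up
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(taskRuns());   // true
    }
}
```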

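When the queue is full

        // execute, part 3
        // Passing false as addWorker's second argument makes maximumPoolSize the bound
        else if (!addWorker(command, false))
            // workerCount has reached maximumPoolSize (or the pool is not RUNNING): apply the rejection policy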
            reject(command);

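That covers execute(). If you're interested, read the shutdown methods shutdown() and shutdownNow() to see how they differ, or dig into the source of the other methods.

Exploring a few small questions

  1. Why does runWorker rethrow exceptions like this?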
    try {
        task.run();
    } catch (RuntimeException x) {
        thrown = x; throw x;
    } catch (Error x) {
        thrown = x; throw x;
    } catch (Throwable x) {
        thrown = x; throw new Error(x);
    } finally {
    ...
    }

We separately handle RuntimeException, Error (both of which the specs guarantee that we trap) and arbitrary Throwables. Because we cannot rethrow Throwables within Runnable.run, we wrap them within Errors on the way out (to the thread's UncaughtExceptionHandler). Any thrown exception also conservatively causes thread to die.

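Roughly: RuntimeException, Error, and arbitrary Throwables are handled separately. Runnable.run cannot rethrow an arbitrary Throwable, so such Throwables are wrapped in an Error on the way out (to the thread's UncaughtExceptionHandler). Any thrown exception also causes the worker thread to die; while the pool is RUNNING or SHUTDOWN, processWorkerExit then starts a replacement.
Runnable.run cannot throw arbitrary Throwables because it declares no checked exceptions, and an overriding method may not throw broader checked exceptions than the method it overrides.
UncaughtExceptionHandler handles these "escaping" exceptions; it is worth reading up on.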
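The sketch below (the class name is hypothetical) shows both exception paths:

- With execute, the exception is passed to afterExecute, then escapes the worker thread and reaches its UncaughtExceptionHandler.
- With submit, the FutureTask wrapper catches the exception. afterExecute sees null, and the exception surfaces from Future.get() as the cause of an ExecutionException.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

class ExceptionPathDemo {
    static String name(Throwable t) { return t == null ? "null" : t.getClass().getSimpleName(); }

    static String run() {
        List<String> afterExecuteSaw = Collections.synchronizedList(new ArrayList<>());
        AtomicReference<Throwable> handlerSaw = new AtomicReference<>();
        CountDownLatch handlerCalled = new CountDownLatch(1);
        // Each worker thread gets a handler that records the exception escaping Worker.run()
        ThreadFactory factory = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> {
                handlerSaw.set(ex);
                handlerCalled.countDown();
            });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), factory) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                afterExecuteSaw.add(name(t));
            }
        };
        Runnable failing = () -> { throw new IllegalStateException("boom"); };
        String futureCause = "none";
        try {
            pool.execute(failing);                      // afterExecute sees it, then the thread dies
            handlerCalled.await(5, TimeUnit.SECONDS);
            Future<?> f = pool.submit(failing);         // FutureTask captures the exception instead
            try {
                f.get();
            } catch (ExecutionException e) {
                futureCause = name(e.getCause());
            }
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "afterExecute=" + afterExecuteSaw + " handler=" + name(handlerSaw.get())
                + " futureCause=" + futureCause;
    }

    public static void main(String[] args) {
        // afterExecute=[IllegalStateException, null] handler=IllegalStateException futureCause=IllegalStateException
        System.out.println(run());
    }
}
```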

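  2. Prefer creating thread pools by hand (calling the ThreadPoolExecutor constructor directly), with parameters tailored to your system
    (Figure: basis for choosing the thread count)
    The thread-count strategy in the figure is only a starting point. The next post will look at tuning the thread count in detail.

  3. Why is creating a thread expensive?
    Starting a thread involves a lot of work:

  • A large block of memory must be allocated and initialized for the thread's stack.
  • A native thread must be created and registered with the host OS.
  • Descriptors must be created, initialized, and added to JVM-internal data structures.

Starting one thread doesn't take long or consume much, but small costs add up. It's like the source code Doug Lea wrote: some fine-grained optimizations look unnecessary, but once the request volume grows, those details are what make the difference.
When you need many threads and the tasks are independent of one another, consider using a thread pool.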

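Summary

Overall flow of the thread pool
(Figure: overall flow of the thread pool)

How does a thread created by the pool keep getting tasks? It uses the take method of the blocking queue (BlockingQueue), which blocks the thread until a task is available. Threads subject to keepAliveTime use a timed poll instead.

That brings this post to an end. Studying thread pools and writing them up dragged on for a long time. I'll close with a line I came across recently, for you and for myself:

Often the hardest thing and the thing you most ought to do are one and the same.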
