【JDK源碼分析】線程池ThreadPoolExecutor原理解析


前言

一般情況下使用線程池都是通過Executors的工廠方法得到的,這些工廠方法又基本上是調用的ThreadPoolExecutor的構造器。也就是說常用到的線程池基本用到的是ThreadPoolExecutor。ThreadPoolExecutor的大概原理是先規定一個線程池的容量,然后給提交過來的任務創建執行線程,任務執行完畢后放在池子中等待新的任務提交過來,當然ThreadPoolExecutor的內部細節比這要復雜得多。下面就通過源碼來理解它的原理。

源碼

先從其屬性及構造器開始看

ThreadPoolExecutor類屬性及構造器

為了便於好理解,先將一些需要注意的地方列出: 
1. 原子整型變量ctl是它一個比較重要的屬性,它用來存儲線程池的運行狀態(運行狀態、停止狀態等)以及當前活動的線程數;它的前3位用來表示線程池的運行狀態,后29位用來表示當前活動的線程數,

運行狀態名稱 二進制數值
RUNNING 1110 0000 0000 0000 0000 0000 0000 0000
SHUTDOWN 0000 0000 0000 0000 0000 0000 0000 0000
STOP 0010 0000 0000 0000 0000 0000 0000 0000
TIDYING 0100 0000 0000 0000 0000 0000 0000 0000
TERMINATED 0110 0000 0000 0000 0000 0000 0000 0000

常量CAPACITY用於表示線程池最大線程數量不能超過該值,它的二進制數值為 0001 1111 1111 1111 1111 1111 1111 1111,了解了這些后獲取當前線程池運行狀態runStateOf、當前活動的線程workerCountOf方法、以及生成ctl的ctlOf方法就很好理解了。 
2. 最大線程數maximumPoolSize,該值小於等於常量CAPACITY的;核心線程數corePoolSize,表示一個線程池在沒有任務在隊列中等待時最大活動的線程數,即當新任務提交時,如果活動的線程小於核心線程數corePoolSize,則會創建新的線程來執行任務即使在池中有空閑的線程,如果活動的線程大於核心線程數corePoolSize且小於最大線程數maximumPoolSize,則僅當任務隊列滿時才會創建新的線程來執行;如果活動的線程等於maximumPoolSize時,此時會執行拒絕策略來拒絕提交的任務。 
3. 線程池的運行狀態

運行狀態名稱 狀態說明
RUNNING 接受新提交的任務並且處理任務隊列中的任務
SHUTDOWN 不接受新提交的任務,但是處理任務隊列中的任務
STOP 不接受新提交的任任務,不處理任務隊列中的任務,同時中斷正在運行中的任務
TIDYING 所有任務被終止, 活動的線程數workCount為0,此狀態下還會執行terminated鈎子方法
TERMINATED terminated鈎子方法已執行

線程池的狀態變化有如下幾種:

運行狀態變化 發生條件
RUNNING -> SHUTDOWN 調用shutdown方法
(RUNNING or SHUTDOWN) -> STOP 調用shutdownNow方法
SHUTDOWN -> TIDYING 任務隊列和線程池都為空
STOP -> TIDYING 線程池為空
TIDYING -> TERMINATED terminated 鈎子方法已執行

4. 拒絕策略RejectedExecutionHandler,該策略會在任務被拒絕時執行,JDK提供了4種實現,分別為AbortPolicy(默認的拒絕策略,會在拒絕任務時拋出運行時異常)、CallerRunsPolicy(直接在 execute 方法的調用線程中運行被拒絕的任務,線程池關閉時則直接丟棄任務)、DiscardOldestPolicy(放棄最久未被處理的,然后提交給線程池重試,線程池關閉時則直接丟棄任務)、DiscardPolicy(直接丟棄被拒絕的任務),當然也可以自己按需實現自定義的拒絕策略。

ThreadPoolExecutor屬性

// 繼承了AbstractExecutorService,AbstractExecutorService定義了基本的任務提交、執行等方法
public class ThreadPoolExecutor extends AbstractExecutorService {
    // 用於表示線程池的運行狀態以及當前活動的線程
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 一個Integer是32bit, 3位用於表示運行狀態,其余用於表示活動線程數
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 線程數最大值
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 任務隊列
    private final BlockingQueue<Runnable> workQueue;
    // 顯示鎖,用於控制workers的訪問等
    private final ReentrantLock mainLock = new ReentrantLock(); 
    // 存儲所有的工作線程,只有在mainLock下才能訪問
    private final HashSet<Worker> workers = new HashSet<Worker>();     
    // 等待中斷條件
    private final Condition termination = mainLock.newCondition();
    // 用於記錄線程池中最大的活動線程數
    private int largestPoolSize;  
    // 任務完成數量
    private long completedTaskCount;    
    // 線程工廠
    private volatile ThreadFactory threadFactory;
    // 拒絕策略
    private volatile RejectedExecutionHandler handler;
    // 空閑線程等待工作時間
    private volatile long keepAliveTime;
    // 為true時使用keepAliveTime作為超時等待,為false所有核心線程(包含空閑的)一直保持存活
    private volatile boolean allowCoreThreadTimeOut;    
    // 核心線程數
    private volatile int corePoolSize;
    // 最大線程數
    private volatile int maximumPoolSize;
    // 默認任務拒絕策略,對拒絕的任務拋出異常
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();  
    // 安全控制訪問(主要用於shutdown和 shutdownNow方法)
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");
    ...
}  

ThreadPoolExecutor構造器

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        // 控制參數合法
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        // 任務隊列、線程工廠、拒絕策略不能為空
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        // 使用時間單位unit將keepAliveTime轉成納秒
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        // 使用默認的線程工廠、默認的拒絕策略                             
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        // 使用默認的拒絕策略                
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        // 使用默認的線程工廠                              
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 運行的線程少於corePoolSize個
        if (workerCountOf(c) < corePoolSize) {
            // 添加任務
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 此時表示線程池處於運行中,並且將任務添加到隊列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次檢查線程池狀態
            if (! isRunning(recheck) && remove(command))
                // 如果線程池不處於運行狀態,將任務從隊列中移除
                // 拒絕任務
                reject(command);
            else if (workerCountOf(recheck) == 0)
                // 運行中的任務數為0,有以下2種情況:
                // 1.線程池處於運行中(加一個空任務是為了保證在隊列里等待的任務可以被喚醒后執行)
                // 2.線程池不處於運行中,remove(command)失敗
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command); 
    }

內部類Worker

Worker類繼承了AQS,實現了Runnable接口,Worker是線程池中用於執行任務的線程。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        // 線程
        final Thread thread;
        // 需要執行的任務
        Runnable firstTask;
        // 當前線程完成的任務數
        volatile long completedTasks;

        // worker構造器
        Worker(Runnable firstTask) {
            // 將AQS的state設置為-1
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        // 實現runnable了run接口
        public void run() {
            // 執行任務
            runWorker(this);
        }

        // 是否鎖定
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        // 實現AQS的抽象方法,嘗試獲取鎖,將state狀態置為1
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // 實現AQS的抽象方法,嘗試釋放鎖,將state狀態置為0
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // 獲取鎖
        public void lock()        { acquire(1); }
        // 嘗試獲取鎖
        public boolean tryLock()  { return tryAcquire(1); }
        // 釋放鎖
        public void unlock()      { release(1); }
        // 是否鎖定
        public boolean isLocked() { return isHeldExclusively(); }
        // 中斷已啟動線程
        void interruptIfStarted() {
            Thread t;
            // 線程處於活動狀態
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

主要方法

1. execute方法

先從執行任務方法開始,提交任務submit方法中也是調用的該方法進行任務的執行的。此方法會在活動線程數超過核心線程數corePoolSize時將任務放在任務隊列中等待活動的線程空閑。

    public void execute(Runnable command) {
        // 防止提交null任務
        if (command == null)
            throw new NullPointerException();

        // 獲取ctl
        int c = ctl.get();
        // 活動的線程小於核心線程數
        if (workerCountOf(c) < corePoolSize) {
            // 添加新的線程執行任務,第2個參數為true表示活動的線程數不要超過核心線程數corePoolSize
            if (addWorker(command, true))
                return;
            // 再次獲取ctl
            c = ctl.get();
        }
        // 執行到這里表示活動的線程數大於等於核心線程數
        // isRunning先判斷線程池是否處於運行狀態,然后將任務放入任務隊列等待
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次獲取ctl
            int recheck = ctl.get(); 
            if (! isRunning(recheck) && remove(command))
                // 此時線程池狀態不處於運行狀態中,將任務從隊列中移除
                // 調用任務拒絕策略拒絕任務
                reject(command);
            else if (workerCountOf(recheck) == 0)
                // 執行到線程處於運行狀態或者不處於運行狀態時任務不在任務隊列中,並且線程池的活動線程數為0
                // 添加新的線程來執行一個空的任務,用來喚醒等待中的任務被線程執行
                addWorker(null, false);
        }
        // 執行到這里表示線程池狀態不處於運行狀態或者任務隊列已滿
        else if (!addWorker(command, false))
            reject(command);
    }

再來看看addWorker方法,此方法是用來根據需要創建新的線程執行任務,

    // 參數core,為true時表示活動的線程數不能超過核心線程數corePoolSize,反之則不能超過maximumPoolSize
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            // 獲取ctl
            int c = ctl.get();
            // 當前線程池的狀態
            int rs = runStateOf(c);

            /*  此處的表達式可以轉化成如下所示的表達式
                rs >= SHUTDOWN && rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()
                此表達式為真的情況有如下幾種:
                1. 線程池運行狀態大於SHUTDOWN(即為STOP、TIDYING、TERMINATED)
                2. 線程池運行狀態為SHUTDOWN且任務不為null
                3. 線程池運行狀態為SHUTDOWN且任務為null,任務隊列為空

                也就是之前提到的當狀態為SHUTDOWN時,不再允許添加新的任務,但是會執行已在任務隊列中的任務;
                當狀態為STOP、TIDYING、TERMINATED時表示不會再處理任務
            */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            /*  執行到此處的情況為以下幾種:
                1. 線程池狀態為RUNNING狀態
                2. 運行狀態為SHUTDOWN, 任務為null,且任務隊列不為空
            */
            // 自旋
            for (;;) {
                // 獲取當前活動的線程數
                int wc = workerCountOf(c);
                // 當core為true時,只要當前線程數大於等於核心線程數corePoolSize,就不再往下執行
                // 為core為false,只要當前線程數大於等於核心線程數maximumPoolSize,就不再往下執行
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 通過CAS原子操作給ctl增加1
                if (compareAndIncrementWorkerCount(c))
                    //原子操作執行成功后直接跳出外部的循環
                    break retry;
                // 原子操作失敗后,重新獲取ctl
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    // 由於運行狀態變化導致之前的CAS原子操作失敗,回到外循環判斷此時狀態是否需要退出
                    continue retry;
                // 執行到這里表示是其它線程提交任務導致CAS原子操作失敗,通過內部循環再次操作
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // 執行到這里表示已通過原子操作改變了ctl的值,遞增了一次ctl的值
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 構造新的worker對象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 同步
                mainLock.lock();
                try {

                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    // 獲取當前線程池運行狀態
                    int rs = runStateOf(ctl.get());

                    /*
                     下列表達式為真的情況:
                     1. 線程池狀態為RUNNING
                     2. 線程池狀態為SHUTDOWN且提交的任務為null
                    */
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 當前worker中的線程已被啟動,則拋出宜昌
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 將worker放入集合中
                        workers.add(w);
                        // 獲取當前worker的數量
                        int s = workers.size();
                        // 如果當前worker的數量超過largestPoolSize,則需要更新largestPoolSize的值
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 新的worker線程已創建
                        workerAdded = true;
                    }
                } finally {
                    // 釋放同步
                    mainLock.unlock();
                }
                // 新的worker線程已創建
                if (workerAdded) {
                    // 啟動當前worker線程,會執行worker實現的run方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                // 任務啟動失敗, 回滾操作
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Woker的run方法會去執行runWorker方法,注意runWorker方法會先執行提交給Worker中的任務firstTask,如果firstTask為null則會去任務隊列中取任務來執行,且當任務執行后,會再去隊列中取任務來執行。

        // worker實現的run方法
        public void run() {
            // 調用線程池的方法runWorker
            runWorker(this);
        }
    // 執行任務
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        // 將worker中的任務置為null
        w.firstTask = null;
        // 釋放worker的狀態,允許被其它線程打斷
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 執行任務,直到隊列中的任務被取完
            while (task != null || (task = getTask()) != null) {
                // work加鎖
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 線程池為RUNNING、SHUTDOWN、STOP
                // 線程池為RUNNING、SHUTDOWN、STOP且當前線程已中斷
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    // 中斷線程
                    wt.interrupt();
                try {
                    // 在task執行任務前執行的方法,該方法為空方法,需要時由子類來重寫實現邏輯
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 執行task任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 在task執行任務后執行的方法,該方法為空方法,需要時由子類來重寫實現邏輯
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 任務執行完后將task置為null,用來執行任務隊列中的任務
                    task = null;
                    // 該worker線程執行的任務遞增一次
                    w.completedTasks++;
                    // 解鎖
                    w.unlock();
                }
            }
            // 所有任務執行完
            completedAbruptly = false;
        } finally {
            // 當任務隊列沒有任務執行或者線程池被關閉shutdown或shutdownNow時調用
            processWorkerExit(w, completedAbruptly);
        }
    }  

    // 從隊列中獲取任務
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 線程池狀態為STOP、TIDYING、TERMINATED或者線程為RUNNING、SHUTDOWN且任務隊列為空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 遞減線程數
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 當線程使用keepAliveTime超時等待獲取任務或者活躍線程數已超過核心線程數
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            /*當下列情況出現時:
                1. 活躍的線程數大於maximumPoolSize且活躍線程大於1或任務隊列為空
                2. 從任務隊列獲取任務超時且活躍線程大於1或者任務隊列為空
            */
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                // 遞減活躍線程數
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // timed 為真時
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                // 表示r 為null
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }        

再看processWorkerExit方法

    // 處理
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 線程被中斷
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            // 遞減活動線程的數量
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 計算線程池完成的任務數
            completedTaskCount += w.completedTasks;
            // 從worker集合中移除w
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        //嘗試終止
        tryTerminate();

        int c = ctl.get();
        // 當線程池狀態為TIDYING、TERMINATED
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                // completedAbruptly為false表示任務隊列里的任務被執行完
                // allowCoreThreadTimeOut為true則min為0,反之為corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // min為0,且任務隊列不為空,表示還有任務需要執行
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // 若活動的線程大於min
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            /* 執行到這里表示
               1. completedAbruptly為true,表示在任務執行時中斷過,保證線程執行任務
               2. 活動線程小於min,保證任務隊列中的任務被執行或者讓核心線程處於空閑等待狀態
            */
            addWorker(null, false);
        }
    }

再來看addWorkerFailed,該方法用來將已創建的worker線程回滾到之前的狀態

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                // worker不為null時,將其從workers集合中移除
                workers.remove(w);
            // 遞減活動線程數量
            decrementWorkerCount();
            // 嘗試中斷
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

    // 通過遞減ctl來遞減活動線程數量
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    // 嘗試終止,執行此方法后線程池的狀態會先STOP變為TIDYING,再變為TERMINATED
    final void tryTerminate() {
        // 自旋
        for (;;) {
            int c = ctl.get(); 
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                // 此時的狀態為RUNNING、SHUTDOWN、STOP、TIDYING或狀態為SHUTDOWN且任務隊列不為空
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                // 活躍的線程大於0時,中斷其中一個空閑worker的線程
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // CAS原子操作,將ctl狀態置為TIDYING,且worker數量置為0
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 執行terminated,默認是空方法,子類可以重寫實現邏輯
                        terminated();
                    } finally {
                        // 將ctl狀態置為TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

2. shutdown方法

shutdown方法的作用是不再接收新任務,但是還是會處理任務隊列中的任務,並且不會中斷已在中執行中的任務線程,執行此方法后線程池的狀態會被設置為SHUTDOWN

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 檢查是否有每個worker的訪問權限
            checkShutdownAccess();
            // 自旋設置線程池的狀態為SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 中斷所有空閑的worker線程
            interruptIdleWorkers();
            // 空的方法,由子類實現重寫實現邏輯,比如ScheduledThreadPoolExecutor就實現了
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 嘗試終止
        tryTerminate();
    }

 

再來看advanceRunState方法,此方法自旋保證設置成目標狀態(目前狀態為SHUTDOWN或STOP)

    // 設置線程池的狀態
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            // 當線程的狀態大於目標狀態或者將當前狀態設置成目標狀態時跳出循環
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }
interruptIdleWorkers用來中斷所有空閑的線程

    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    // 根據參數來判斷是否中斷單個進程或者中斷所有進程
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 遍歷所有活動線程
            for (Worker w : workers) {
                Thread t = w.thread;
                // 線程t未中斷
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        // 中斷線程t
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                // 若為true中斷一個線程后直接跳出
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }   

3. shutdownNow方法

shutdownNow方法的作用是不再接收新任務,並且會清除任務隊列中的任務,會中斷所有任務線程包括已在執行任務的線程,執行此方法后線程池的狀態會被設置為STOP

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 檢查是否有每個worker的訪問權限
            checkShutdownAccess();
            // 自旋設置線程池的狀態為STOP
            advanceRunState(STOP);
            // 中斷所有線程
            interruptWorkers();
            // 清除任務隊列
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // 嘗試終止
        tryTerminate();
        return tasks;
    }

drainQueue方法用來清除任務隊列中的任務,返回值為隊列中的任務

    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        // 將任務隊列中任務清除,然后將其添加到taskList待返回
        q.drainTo(taskList);
        // 這里再次判斷下任務隊列中是否已為空,主要防止BlockingQueue的實現為延遲隊列
        // 此時會將其一個個的遍歷清除
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

到此源碼分析已完畢。

接下來看下Executors怎么利用工廠方法來創建常見的線程池

比如創建一個根據需要創建新線程的線程池,此線程池將corePoolSize設置為0,maximumPoolSize設置為Integer.MAX_VALUE,空閑線程等待時間為60秒,使用的是SynchronousQueue阻塞隊列。 
就是說每提交一個新任務都是先進入SynchronousQueue隊列,此隊列不會保存任務,它將任務直接提交給線程,然后如果有空閑線程在60秒內能獲取到任務,則用該線程執行這個獲取到的任務,沒有空閑線程時會創建新的線程來執行任務。空閑線程在60秒后如果沒有獲取到任務時會被從線程池中清除掉。最大活動線程的數量設置的是Integer.MAX_VALUE,而實際是小於CAPACITY(即229-1)。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

創建一個固定線程數量的的線程池,此線程池是設置corePoolSize、maximumPoolSize一樣為nThreads個,使用的無界隊列LinkedBlockingQueue(容量為Integer.MAX_VALUE)。 
就是說每提交一個新任務都是創建新的線程,直到線程數量超過corePoolSize,然后提交的任務會進入無界隊列中等待,在等待的任務會在有空閑線程時才會被執行。線程一旦創建就不會被清除,會一直存在於線程池中以復用來執行任務,直到調用shutdown方法或者由於執行任務時出錯中斷。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

總結

通過以上分析,需要根據實際需要創建合適的線程池,如果對線程池的原理比較了解,最好直接使用線程池的構造方法來構造線程池,那樣你才能更為准確的控制核心線程數,最大線程數,線程空閑多少時間被清除,以及合適的拒絕策略,讓線程池更可控。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM