ThreadPoolExecutor 線程池的源碼解析


 正文前先來一波福利推薦:

 福利一:

百萬年薪架構師視頻,該視頻可以學到很多東西,是本人花錢買的VIP課程,學習消化了一年,為了支持一下女朋友公眾號也方便大家學習,共享給大家。

福利二:

畢業答辯以及工作上各種答辯,平時積累了不少精品PPT,現在共享給大家,大大小小加起來有幾千套,總有適合你的一款,很多是網上是下載不到。

獲取方式:

微信關注 精品3分鍾 ,id為 jingpin3mins,關注后回復   百萬年薪架構師 ,精品收藏PPT  獲取雲盤鏈接,謝謝大家支持!

-----------------------正文開始---------------------------

 

1、背景介紹

  上一篇從整體上介紹了Executor接口,從上一篇我們知道了Executor框架的最頂層實現是ThreadPoolExecutor類,Executors工廠類中提供的newScheduledThreadPoolnewFixedThreadPoolnewCachedThreadPool方法

其實本質上也只是ThreadPoolExecutor的構造函數參數不同而已。通過傳入不同的參數,就可以構造出適用於不同應用場景下的線程池,那么它的底層原理是怎樣實現的呢,這篇就來介紹下ThreadPoolExecutor線程池的運行過程。

  java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此如果要透徹地了解Java中的線程池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現源碼。

  在ThreadPoolExecutor類中提供了四個構造方法:

復制代碼
public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}
復制代碼

 

從上面的代碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,通過觀察每個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工作。

ThreadPoolExecutor的完整構造方法的簽名是:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 

 

  • corePoolSize:核心池的大小,這個參數跟后面講述的線程池的實現原理有非常大的關系。在創建了線程池后,默認情況下,線程池中並沒有 任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者 prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建 corePoolSize個線程或者一個線程。
  • 默認情況下,在創建了線程池后,線程池中的線程數為0,當有任務來之后,就會創建一個線程去執行任務,當線 程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中;
  • maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程;
  • keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大於corePoolSize 時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize。即當線程池中的線程數大於corePoolSize 時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了 allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize 時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
  • unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
復制代碼
TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鍾
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒
復制代碼
  • workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇:
    ArrayBlockingQueue; //有界隊列
    LinkedBlockingQueue; //無界隊列
    SynchronousQueue; //特殊的一個隊列,只有存在等待取出的線程時才能加入隊列,可以說容量為1,是無界隊列
    PriorityBlockingQueue

     ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。

  • threadFactory:線程工廠,主要用來創建線程;
  • handler:表示當拒絕處理任務時的策略,有以下四種取值:
    ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 
    ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)
    ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務 

     

    ThreadPoolExecutorExecutors類的底層實現。

    在JDK幫助文檔中,有如此一段話:

    “強烈建議程序員使用較為方便的 Executors 工廠方法 Executors.newCachedThreadPool()(無界線程池,可以進行自動線程回收)、Executors.newFixedThreadPool(int)(固定大小線程池)Executors.newSingleThreadExecutor()(單個后台線程)

    它們均為大多數使用場景預定義了設置。”

    下面介紹一下幾個類的源碼:

    ExecutorService  es =  executor.newFixedThreadPool (int nThreads):固定大小線程池。

    可以看到,corePoolSize和maximumPoolSize的大小是一樣的(實際上,后面會介紹,如果使用無界queue的話 maximumPoolSize參數是沒有意義的),keepAliveTime和unit的設值表明什么?-就是該實現不想keep alive!最后的BlockingQueue選擇了LinkedBlockingQueue,該queue有一個特點,他是無界的,為什么是使用無界的LinkedBlockingqueue 是因為無法知道用戶設置多大的固定線程數量,但是數量肯定在無界的范圍內。

public static ExecutorService newFixedThreadPool(int nThreads) {   
          return new ThreadPoolExecutor(nThreads, nThreads,   
                                        0L, TimeUnit.MILLISECONDS,   
                                       new LinkedBlockingQueue<Runnable>());   
      }

  ExecutorService   newSingleThreadExecutor():單線程

復制代碼
 public static ExecutorService newSingleThreadExecutor() {   
          return new FinalizableDelegatedExecutorService   
              (new ThreadPoolExecutor(1, 1,   
                                      0L, TimeUnit.MILLISECONDS,   
                                      new LinkedBlockingQueue<Runnable>()));   
      }
復制代碼

ExecutorService newCachedThreadPool()無界線程池,可以進行自動線程回收

這個實現就有意思了。首先是無界的線程池,所以我們可以發現maximumPoolSize為big big。其次BlockingQueue的選擇上使用SynchronousQueue。可能對於該BlockingQueue有些陌生,簡單說:該 QUEUE中,每個插入操作必須等待另一個線程的對應移除操作。

public static ExecutorService newCachedThreadPool() {   
           return new ThreadPoolExecutor(0, Integer.MAX_VALUE,   
                                       60L, TimeUnit.SECONDS,   
                                        new SynchronousQueue<Runnable>());   
    }

 

從上面給出的ThreadPoolExecutor類的代碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService,AbstractExecutorService是一個抽象類,它實現了ExecutorService接口。

public abstract class AbstractExecutorService implements ExecutorService 

 

 而ExecutorService又是繼承了Executor接口

 

public interface ExecutorService extends Executor 

 

 Executor

|-- ExecutorService 
  |-- AbstractExecutorService
    |-- ThreadPoolExecutor

 

我們看一下Executor接口的實現:

public interface Executor {
    void execute(Runnable command);
}

 

到這里,大家應該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關系了。

  Executor是一個頂層接口,在它里面只聲明了一個方法execute(Runnable),返回值為void,參數為Runnable類型,從字面意思可以理解,就是用來執行傳進去的任務的;

  然后ExecutorService接口繼承了Executor接口,並聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

  抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的所有方法;

  然后ThreadPoolExecutor繼承了類AbstractExecutorService。

  在ThreadPoolExecutor類中有幾個非常重要的方法:

復制代碼
public void execute(Runnable command)

public <T> Future<T> submit(Callable<T> task)

public void shutdown()

public List<Runnable> shutdownNow()  //返回未執行的任務
復制代碼

execute()方法實際上是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務,交由線程池去執行。

  submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中 並沒有對其進行重寫,這個方法也是用來向線程池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果,去看submit()方法的 實現,會發現它實際上還是調用的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容已在其他文章寫過)。

  shutdown()和shutdownNow()是用來關閉線程池的。

 

2、具體源碼實現

既然要講運行過程,那么首先要了解下線程池的狀態分為哪些?

volatile int runState;
static final int RUNNING    = 0;
static final int SHUTDOWN   = 1;
static final int STOP       = 2;
static final int TERMINATED = 3;

ThreadPoolExecutor代碼中定義了上面幾個變量:定義了一個volatile變量runState,以及其他幾個表示狀態的常量。
runState:初始狀態,表示當前線程池的運行狀態,它的值就是上面的那4個常量值之一

RUNNING:線程池接受新任務並執行隊列任務中...

SHUTDOWN:不再接受新任務,但是會繼續執行等待隊列Queued中的任務。當調用了shutdown()方法,會從 RUNNING -> SHUTDOWN

STOP:不再接受新任務,同時也不執行等待隊列Queued中的任務,並且會嘗試終止正在執行中的任務。當調用了shutdownNow()方法, 會從(RUNNING or SHUTDOWN) -> STOP

TERMINATED:線程池中所有線程已經停止運行,其他行為同 STOP狀態。

  • 當等待隊列和線程池為空時,會從SHUTDOWN -> TERMINATED
  • 當線程池為空時,會從STOP -> TERMINATED

2.線程池運行任務

2.1變量介紹

在講解運行過程前,我們先看下ThreadPoolExecutor中的幾個比較重要的成員變量:

private final BlockingQueue<Runnable> workQueue; //任務緩存隊列,用來保存等待中的任務,等待worker線程空閑時執行任務
private final ReentrantLock mainLock = new ReentrantLock(); //更新 poolSize, corePoolSize,maximumPoolSize, runState, and workers set 時需要持有這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>(); //用來保存工作中的執行線程
private volatile long  keepAliveTime; //超過corePoolSize外的線程空閑存活之間
private volatile boolean allowCoreThreadTimeOut; //是否對corePoolSize內的線程設置空閑存活時間
private volatile int   corePoolSize; //核心線程數
private volatile int   maximumPoolSize; //最大線程數(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列)
private volatile int   poolSize; //線程池中的當前線程數
private volatile RejectedExecutionHandler handler; //任務拒絕策略
private volatile ThreadFactory threadFactory; //線程工廠,用來新建線程
private int largestPoolSize; //記錄線程池中出現過的最大線程數大小
private long completedTaskCount; //已經執行完的線程數

 

這邊重點解釋下 corePoolSizemaximumPoolSizeworkQueue兩個變量,這兩個變量涉及到線程池中創建線程個數的一個策略。
corePoolSize: 這個變量我們可以理解為線程池的核心大小,舉個例子來說明(corePoolSize假設等於10,maximumPoolSize等於20):

有一個部門,其中有10(corePoolSize)名工人,當有新任務來了后,領導就分配任務給工人去做,每個工人只能做一個任務。
當10個工人都在忙時,新來的任務就要放到隊列(workQueue)中等待。
當任務越積累越多,遠遠超過工人做任務的速度時,領導就想了一個辦法:從其他部門借10個工人來,借的數量有一個公式(maximumPoolSize - corePoolSize)來計算。然后把新來的任務分配給借來的工人來做。
但是如果速度還是還不急的話,可能就要采取措施來放棄一些任務了(RejectedExecutionHandler)。
等到一定時間后,任務都完成了,工人比較閑的情況下,就考慮把借來的10個工人還回去(根據keepAliveTime判斷)
也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量突然過大時的一種補救措施。

2.2線程執行過程

先看下前一篇文章中的一個例子:

ExecutorService executor = Executors.newFixedThreadPool(3);

        IntStream.range(0, 6).forEach(
      i -> executor.execute(() -> { String threadName = Thread.currentThread().getName(); System.out.println("finished: " + threadName); }
  )

);

 

上面代碼就是新建6個任務,然后扔到線程池中運行,輸出線程名稱,直到運行完畢。其中最核心的方法就是execute()方法,雖然submit()也可以執行任務,但它底層也是調用execute()方法,所以懂了execute()的實現原理即可:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {   //1.
            if (runState == RUNNING && workQueue.offer(command)) {    //2.
                if (runState != RUNNING || poolSize == 0)   //3.
                    ensureQueuedTaskHandled(command);  //4.
            }
            else if (!addIfUnderMaximumPoolSize(command))  //5.
                reject(command); // is shutdown or saturated //6
        }
}
 

上面的代碼看起來邏輯有點復雜,我們一個一個看,首先看上面1位置處:
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
是一個或表達式,它分成兩部分

  1. 首先判斷當前線程數是否大於等於核心線程數,是的話直接進入if語句塊中,否則判斷第二個部分
  2. 第二個部分addIfUnderCorePoolSize(command) ,這個方法是當線程數小於核心線程數時,用來新建線程執行任務(因為線程數小於corePoolSize時,直接新建線程來運行任務,不管當前線程池里有沒有空閑的線程)。如果新建失敗,那么進入if語句塊,成功了那么execute方法就執行結束了,因為線程已經新建成功了,任務已經開始在線程池中運行。

進入if語句塊后,看上面代碼2.if (runState == RUNNING && workQueue.offer(command))

  1. 判斷當前線程池狀態是否是RUNNING 而且 任務放入等待隊列中成功,那么直接進入if語句塊
  2. 否則到代碼5.處 if (!addIfUnderMaximumPoolSize(command)),判斷新任務用新線程執行是否成功(注:這里的新線程就是我們上面講的 “借來的工人” maximumPoolSize
  3. 如果“借來的工人”還是處理不了的話,執行任務拒絕策略

繼續進到代碼塊3 的if語句塊if (runState != RUNNING || poolSize == 0), 因為新任務加入到等待隊列中了,這句判斷是為了防止在將此任務添加進任務緩存隊列的同時其他線程突然調用shutdown或者shutdownNow方法關閉了線程池的一種應急措施。如果是的話,應急處理加入的新任務 ensureQueuedTaskHandled(command)


我們看下兩個關鍵方法的實現:
##### 1.addIfUnderCorePoolSize

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        return t != null;
    }

 

首先獲取鎖,因為涉及到線程池狀態的變化。然后再次判斷 if (poolSize < corePoolSize && runState == RUNNING),在execute()方法中我們已經判斷過一次,這邊再次判斷是為了防止其他線程又新增了新線程或者調用了shutdown、shutdownNow方法,這邊起到了雙重檢查的一個效果。如果為true的話,進行t = addThread(firstTask)新增線程執行任務。addThread方法里面比較簡單,就是通過線程工廠創建線程thread,然后封裝到Worker對象中,加入到 workers隊列中,並執行線程,可以把Worker對象看成是擁有一個線程的對象。

private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        boolean workerStarted = false;
        if (t != null) {
            w.thread = t;
            workers.add(w); //用來保存工作中執行的線程
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
            try {
                t.start(); //執行的是Worker對象中的run方法 Worker類實現了Runable接口
                workerStarted = true;
            }
        }
        return t;
    }

 

這里在介紹下Worker對象, 它實現了Runnable接口,你把它當成Runnable的一個代理類即可,最終也是執行它的run方法。只要注意一下Worker中的beforeExecuteafterExecute方法,這兩個方法在ThreadPoolExecutor中沒有具體實現,用戶可以重寫這個方法和后面的afterExecute方法來進行一些統計信息,比如某個任務的執行時間等,而afterExecute方法還有一個Throwable t參數,用戶可以用來記錄一些異常信息,因為新線程中的異常時捕獲不到的,需要在afterExecute中記錄。
看起來這個是不是和spring 切面有點像,可以看到 知識都是相通的。
看一下它的run方法:

public void run() {
            try {
                hasRun = true;
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {  //1
                    runTask(task);                     task = null;                 }
            } finally {
                workerDone(this);
            }
        }

 

注意代碼塊1,可以看到這邊在循環獲取任務,並執行,直到任務全部執行完畢。除了第一個任務,其他任務都是通過getTask()方法去取,這個方法是ThreadPoolExecutor中的一個方法。我們猜一下,整個類中只有任務緩存隊列中保存了任務,應該就是去緩存隊列中取了。

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll(); //取任務
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果線程數大於核心池大小或者允許為核心池線程設置空閑時間,
                //非核心線程通過poll取任務,若等待一定的時間取不到任務,則返回null
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); //制定時間過后再去進行poll取值 
            else
                r = workQueue.take(); //阻塞取值
            if (r != null)
                return r;
            if (workerCanExit()) {    //如果沒取到任務,即r為null,則判斷當前的worker是否可以退出
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();   //中斷處於空閑狀態的worker
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

 

這里有一個非常巧妙的設計方式,假如我們來設計線程池,可能會有一個任務分派線程,當發現有線程空閑時,就從任務緩存隊列中取一個任務交給 空閑線程執行。但是在這里,並沒有采用這樣的方式,因為這樣會要額外地對任務分派線程進行管理,無形地會增加難度和復雜度,這里直接讓執行完任務的線程Worker去任務緩存隊列里面取任務來執行,因為每一個Worker里面都包含了一個線程thread,如果取到則進行處理,如果取不到則會阻塞在那指導取到任務去執行;

工作隊列workQueue會一直去拿任務,屬於核心線程的會一直卡在 workQueue.take()方法,直到拿到Runnable 然后返回,非核心線程會 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,如果超時還沒有拿到,下一次循環判斷compareAndDecrementWorkerCount就會返回null,Worker對象的run()方法循環體的判斷為null,任務結束,然后線程被系統回收

下面看一下另一個方法 runTask()

private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock(); try {
                /*
                 * If pool is stopping ensure thread is interrupted;
                 * if not, ensure thread is not interrupted. This requires
                 * a double-check of state in case the interrupt was
                 * cleared concurrently with a shutdownNow -- if so,
                 * the interrupt is re-enabled.
                 */
         //當線程池的執行狀態為關閉等,則執行當前線程的interrupt()操作
                if ((runState >= STOP ||(Thread.interrupted() && runState >= STOP)) && hasRun)
                    thread.interrupt();
                /*
                 * Track execution state to ensure that afterExecute
                 * is called only if task completed or threw
                 * exception. Otherwise, the caught runtime exception
                 * will have been thrown by afterExecute itself, in
                 * which case we don't want to call it again.
                 */
                boolean ran = false;
                beforeExecute(thread, task); try {
                 //任務執行
 task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex); throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }

把上邊的方法放在一個整體代碼中看,理解起來更容易一些,代碼如下:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

 

還有一個方法是 每執行完一個線程任務 會調用 workerDone

 //記錄執行任務數量,將工作線程移除,當poolSize為0是則嘗試關閉線程池
    void workerDone(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w); if (--poolSize == 0)
                tryTerminate();
        } finally {
            mainLock.unlock();
        }

 

 


2. addIfUnderMaximumPoolSize

這個方法的實現思想和 addIfUnderCorePoolSize方法的實現思想非常相似,唯一的區別在於addIfUnderMaximumPoolSize方法是在線程 池中的線程數達到了核心池大小並且往任務隊列中添加任務失敗的情況下執行的:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        return t != null;
    }

 

到這里,大部分朋友應該對任務提交給線程池之后到被執行的整個過程有了一個基本的了解,下面總結一下:

  1. 首先,要清楚corePoolSize和maximumPoolSize的含義;
  2. 其次,要知道Worker是用來起到什么作用的;
  3. 要知道任務提交給線程池之后的處理策略,這里總結一下主要有4點:
  • 如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
  • 如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
  • 如果當前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕策略進行處理;
  • 如果線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於 corePoolSize;如果允許為核心池中的線程設置存活時間,那么核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。

 


我的博客即將搬運同步至騰訊雲+社區,邀請大家一同入駐:https://cloud.tencent.com/developer/support-plan?invite_code=2da2gtpfllwkk 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM