Executor線程池原理詳解


線程池

    線程池的目的就是減少多線程創建的開銷,減少資源的消耗,讓系統更加的穩定。在web開發中,服務器會為了一個請求分配一個線程來處理,如果每次請求都創建一個線程,請求結束就銷毀這個線程。那么在高並發的情況下,就會有大量線程創建和銷毀,這就會降低系統的效率。線程池的誕生就是為了讓線程得到重復使用,減少了線程創建和銷毀的開銷,減少了線程的創建和銷毀自然的就提高了系統的響應速度,與此同時還提高了線程的管理性,使線程可以得到統一的分配,監控和調優。

   線程創建和銷毀為什么會有開銷呢,因為我們java運行的線程是依賴於計算機內核的核心線程的。java創建的線程是用戶層的線程,要依賴於線程調度去是用內核層的線程來執行,在執行銷毀的時候會通過TSS在用戶層和核心層的切換,這個切換就是很大的一筆開銷。具體結構如下圖:

 

 

 

線程實現方式

線程主要通過實現Runnable或者Callable接口來實現.Runnable與Callable的區別在於后者有返回值,但是前者沒有返回值。

public interface Runnable {
    public abstract void run(); } publuic interface Callable<V>{ V call() throws Exception; }

下面我們來看一下測試代碼:

package com.test.excutor;

import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; public class ExcutorTest { public static void main(String[] args) { Thread t =new Thread( new RunTask()); t.start(); FutureTask<Object> ft=new FutureTask<Object>(new CallTask()); Thread f=new Thread(ft); f.start(); try { System.out.println("callTask output:"+(String)ft.get()); } catch (InterruptedException e) { // TODO Auto-generated catch block  e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block  e.printStackTrace(); } } } class RunTask implements Runnable{ @Override public void run() { System.out.println(" RunTask Thread Name is "+Thread.currentThread().getName()); } } class CallTask implements Callable<Object>{ @Override public Object call() throws Exception { TimeUnit.SECONDS.sleep(1); //System.out.println("This is callTask"); return "callTask answer"; } }
//運行結果:

RunTask Thread Name is Thread-0
callTask output:callTask answer

 

什么時候使用線程池:

1、單個任務處理時間比較短

2、需要處理的任務數量很大

Executor框架

Executor接口是Executor框架的一個最基本的接口,Executor框架的大部分類都直接或間接地實現了此接口。

它只有一個方法

void execute(Runnable command): 在未來某個時間執行給定的命令。該命令可能在新的線程、已入池的線程或者正調用的線程中執行,這由 Executor 實現決定。

以下是框架圖:

 

 從以上圖中可以看出ExecutorService就是繼承了Executor接口的一個重要接口類。在這個接口類中定義了線程池的具體行為:

1、execute(Runnable command):履行Ruannable類型的任務,
2、submit(task):可用來提交Callable或Runnable任務,並返回代表此任務的Future對象
3、shutdown():在完成已提交的任務后封閉辦事,不再接管新任務,
4、shutdownNow():停止所有正在履行的任務並封閉辦事。
5、isTerminated():測試是否所有任務都履行完畢了。
6、isShutdown():測試是否該ExecutorService已被關閉。 
7、awaitTermination(long timeout, TimeUnit unit):阻塞,直到關閉后所有任務都已完成執行。請求,或發生超時,或當前線程中斷,以先發生者為准。
8、submit(Callable<T> task):提交一個返回值的任務用於執行,返回一個表示任務的未決結果的 Future。 該 Future 的 get 方法在成功完成時將會返回該任務的結果。如果想立即阻塞任務的等待,則可以使用 result = exec.submit(aCallable).get(); 形式的構造。
9、submit(Runnable task, T result):提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。該 Future 的 get 方法在成功完成時將會返回給定的結果。
10、submit(Runnable task):提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。該 Future 的 get 方法在成功 完成時將會返回 null。

11、invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException:執行給定的任務,當所有任務完成時,返回保持任務狀態和結果的 Future 列表。返回列表的所有元素的 Future.isDone() 為 true。注意,可以正常地或通過拋出異常來終止已完成 任務。如果正在進行此操作時修改了給定的 collection,則此方法的結果是不確定的。

12、invokeAll(Collection<? extends Callable<T>> tasks, long timeout,TimeUnit unit) throws InterruptedException:超時等待,同上。

13、invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException,ExecutionException:與 invokeAll的區別是,任務列表里只要有一個任務完成了,就立即返回。而且一旦正常或異常返回后,則取消尚未完成的任務。

14、invokeAll(Collection<? extends Callable<T>> tasks,long timeout,TimeUnit unit) throws InterruptedException: 超時等待,同上

線程池的創建

 線程池的創建是通過Executors來創建的。比如說你需要創建一個固定大小的線程池我們可以使用Executors.newFixedThreadPool(n)來實現。當然還有很多其他的方法

我們來看一下:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
//只允許一個線程執行 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); } //可以無限的創建線程,會把創建的線程放在一個特殊的隊列中去排隊 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); } public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); }
//定時線程池 
public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
 
         

 

 
 
newFixedThreadPool(int nThreads)方法中的幾個參數如下:
nThreads:核心線程數量
nThreads:最大線程池的數量 0L: 如果線程已經工作完畢,沒有任務調用,該線程最大的存活時間
TimeUnit.MILLISECONDS:計時時間,單位為ms new LinkedBlockingQueue<Runnable>():當核心線程全都在工作,沒有空閑,此時會將多余的線程放到阻塞隊列中排隊,當核心線程執行完成以后再從阻塞隊列當中拿出來繼續執行。
下面我們來跑一段代碼:
public class ExecutorTest {    
    public static void main(String[] args) { Thread t =new Thread( new RunTask()); t.start(); FutureTask<Object> ft=new FutureTask<Object>(new CallTask()); Thread f=new Thread(ft); f.start(); try { System.out.println("callTask output:"+(String)ft.get()); } catch (InterruptedException e) { // TODO Auto-generated catch block  e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block  e.printStackTrace(); } System.out.println("Executor Test Start"); ExecutorService exe=Executors.newFixedThreadPool(5); for(int i=0;i<20;i++){ exe.execute(new RunTask()); exe.submit(new RunTask()); } /*//驗證線程創建慢 if(exe.isShutdown()){ System.out.println("Thread 已經stop,等待線程創建"); exe.execute(new RunTask()); System.out.println("Executor Test end"); }else{ exe.shutdownNow(); } */ } } //結果如下: RunTask Thread Name is Thread-0 callTask output:callTask answer Executor Test Start RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-2 RunTask Thread Name is pool-1-thread-2 RunTask Thread Name is pool-1-thread-2 RunTask Thread Name is pool-1-thread-2 RunTask Thread Name is pool-1-thread-2 RunTask Thread Name is pool-1-thread-2 RunTask Thread Name is pool-1-thread-2 RunTask Thread Name is pool-1-thread-2 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-2 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-4 RunTask Thread Name is pool-1-thread-3 RunTask Thread Name is pool-1-thread-2

根據代碼的測試結果我們可以看出線程1,線程2已經跑了好久了線程3,4,5才創建好,這個從側面驗證了線程的創建是很耗時的。

具體我們來看一下線程池類的工作流程和工作原理:

線程池線程的大小=核心線程+非核心線程

 

 線程池的工作原理如上圖所示已經非常清晰了,文字描述一下具體步驟:

1、啟動線程池執行任務的時候先創建核心線程來執行任務;

2、核心線程數量創建達到規定值以后,還有任務沒有線程執行的話就將任務放到阻塞隊列中取排隊等待;

3、等到隊列排滿了還有線程需要執行的話就創建非核心線程;

4、非核心線程還不夠執行任務的話就直接執行拒絕策略。

線程池的重要屬性

  在ThreadPoolExecutor中有以下幾個比較重要的屬性:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

  ctl是一個控制狀態,對線程池的運行狀態和線程池中的有效線程數量進行控制的一個字段。它包含兩部分的信息:

  1、線程池的運行狀態(runState),高3位保存運行狀態.相關方法是private static int runStateOf(int c){ return c& ~CAPACITY;}

  2、線程池內的有效線程數量(workerCount),ctl 是個Integer類型的數據,低29位保存;相關方法是 private static int workerCountOf(int c){ return c& ~CAPACITY;}

  3、控制狀態的方法是:private static int ctlOf(int rs,int wc){return rs|wc;}

private static final int COUNT_BITS = Integer.SIZE - 3;( Integer.SIZE=31

  count_bits=29,1<<29,也就是說workerCount最大值(2^29)-1(約5億)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

 容量=2^29-1。

線程池的5種狀態:

1、RUNNING:private static final int RUNNING = -1 << COUNT_BITS;高3位111

狀態說明:線程池處於Running狀態的時候,能夠接收新的任務,並且對已添加的任務進行處理,任務不能大於規定的最大任務數;

狀態切換:線程池的初始狀態是RUNNING,也就是說線程池一旦被創建就處於RUNNING狀態,並且線程池中的任務數量為0;

2、SHUTDOWN:private static final int SHUTDOWN = 0 << COUNT_BITS;高三位為000

狀態說明:線程池處於SHUTDOWN狀態的時候,不接收新的任務,但是可以處理已經添加的任務;

狀態切換:調用線程池的shutDown()方法的時候,線程狀態由RUNNING ---->>>>SHUTDOWN
3、 STOP :private static final int STOP = 1 << COUNT_BITS;高三位為001

狀態說明:線程池處於該狀態的時候,不接收新的任務,不處理已接收的任務,並且還會中斷正在處理的任務,中斷並不代表線程被殺死了,並且清空阻塞隊列。

狀態切換:線程池調用shutDownNow()接口的時候,線程池由RUNNING(SHUTDOWN)-------->>>STOP
4、TIDYING :private static final int TIDYING = 2 << COUNT_BITS;高三位為010

狀態說明:當所有的任務已經終止,ctl的值為0,線程池會變成TIDYING狀態,當線程池處於該狀態的時候會執行鈎子函數terminated().terminated()方法在ThreadPoolExecutor中是空的,用戶想在線程池變為TIDYING狀態的時候處理東西,可以通過重載terminated()方法實現。

狀態切換:當線程池處於SHUTDOWN狀態,並且阻塞隊列中的任務為0,就會SHUTDOWN----->>>TIDYING,當線程池處於STOP狀態下,線程池中執行任務數量為0,那么線程池狀態STOP----->>>TIDYING.
5、TERMINATED:private static final int TERMINATED = 3 << COUNT_BITS;高三位為011

狀態說明:線程池已經徹底終止,就會變成TERMINATED狀態

狀態切換:線程池處於TIDYING狀態,執行完terminated()方法以后,就會實現TIDYING----->>>TERMINATED

進入該狀態的條件如下:

1)線程池不是RUNNING狀態;

2)線程池不是TIDYING狀態或者TERMINATED狀態;

3)如果線程池狀態是SHUTDOWN,並且阻塞隊列中的任務數量為0;

4)workerCount=0;

5)設置TIDYING狀態成功;

 線程池狀態的切換如下圖所示:

 

線程池的默認實現-ThreadPoolExecutor

創建方法:

   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

 

任務的提交:

  public void execute():提交任務沒有返回值

public Future<?> submit():提交任務有返回值

參數解釋:

corePoolSize:核心線程數,當提交一個任務的時候,創建一個核心線程,直到達到核心線程的最大數量。

maximumPoolSize:線程池最大容量,即該線程池最大允許的線程數量,當當前的阻塞隊列滿的時候,還有任務繼續提交,則創建新的非核心線程繼續執行,但是線程總數不能大於最大容量maximumPoolSize。

keepAliveTime:空余線程存活時間,即當線程池的核心線程數達到最大的時候,沒有新的任務提交,那么非核心線程不會立即銷毀,而是等待,等待時間大於keepAliveTime才會進行銷毀。
unit:keepAliveTime的時間單位

BlockingQueue<Runnable> workQueue:阻塞隊列,當核心線程數達到最大值的時候,還有新任務提交則將這些線程放到該隊列中進行排隊等待,提交的任務必須實現Runnable接口。除了這個阻塞隊列以外,還有以下的阻塞隊列:

1、ArrayBlockingQueue:基於數組結構的有界隊列,遵循FIFO原則。
2、LinkedBlockingQueue:基於鏈表的有界隊列,遵循FIFO原則,吞吐量高於ArrayBlockingQueue。

3、SynchronousQueue:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQuene;

4、priorityBlockingQuene:具有優先級的無界阻塞隊列;

threadFactory:用來創建線程,默認使用Executors.defaultThreadFactory() 來創建線程使用默認的ThreadFactory來創建線程時,會使新創建的線程具有相同的NORM_PRIORITY優先級並且是非守護線程,同時也設置了線程的名稱。

RejectedExecutionHandler handler:線程池的拒絕策略,線程池的拒絕策略有4種:

1、AbortPolicy:直接拋出異常,這是默認策略;

2、CallerRunsPolicy:用調用者所在的線程來執行任務:

3、DiscardOldestPolicy:拋棄阻塞隊列中最靠前的任務,並執行當前任務;

4、DiscardPolicy:直接丟棄任務

線程池監控:

 public int getPoolSize():獲取當前線程池的大小

public int getActiveCount():線程池中正在執行任務的線程數量

public int getLargestPoolSize():獲取線程池中出現出現過的最大線程數量

public long getTaskCount():獲取線程池已執行和未執行的線程總數

 public long getCompletedTaskCount():獲取已經完成的任務的數量

 
        
 線程池的工作原理圖如下:

 

 詳細的工作原理圖請參考線程池的執行過程圖。線程池中創建線程並執行用的方法是addWorker(Runnable firstTask, boolean core)方法。其中的firstTask用於指定新增線程執行的第一個任務,如果沒有任務執行可以為null;boolean core 主要是判斷當前線程池中活動的核心線程數是否達到最大,如果達到最大的話就創建非核心線程,該值為false,如果沒有達到最大核心線程數量,則為true,創建核心線程。

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
       //獲取線程池的運行狀態
int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary.當rs的值>=SHUTDOWN說明線程池不再接收新的任務了,然后判斷以下三個條件:
      1、rs==SHUTDOWN 表示此時是關閉狀態,不再接收新的任務,但是可以繼續處理阻塞隊列中的任務
      2、firstTask==null:firstTask為空
      3、!workQueue.isEmpty():阻塞隊列不為空
      以上三個條件有一個不滿足,則返回false,不創建新線程。
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c);//獲取工作線程數 if (wc >= CAPACITY || //判斷線程數是否大於初始容量,即ctl的低29位都為1 wc >= (core ? corePoolSize : maximumPoolSize))//根據core的值,為true,則將wc與核心線程數來比較,如果為false,就跟線程池最大容量相比 return false; //如果以上兩個條件滿足則說明線程池已經滿了,不能創建新線程,返回false if (compareAndIncrementWorkerCount(c))//嘗試增加workerCount,增加成功跳出此層for循環 break retry; c = ctl.get(); // Re-read ctl 增加workerCount失敗,重新獲取ctl值 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask);//根據firstTask創建worker final Thread t = w.thread;//每個worker創建一個線程 if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get());//獲取線程池的運行狀態 if (rs < SHUTDOWN || //狀態是RUNNING狀態 (rs == SHUTDOWN && firstTask == null)) {//或者狀態處於SHUTDOWN,且firstTask為空(SHUTDOWN不接收新任務但是任然執行隊列中的任務) if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w);//向線程池中添加線程,workers是一個HashSet int s = workers.size();//獲取線程池數量 if (s > largestPoolSize)//largestPoolSize記錄着線程池中出現過的最大線程數量,如果此時的線程數量大於之前的largestPoolSize,則重新賦值 largestPoolSize = s; workerAdded = true;//線程添加成功 } } finally { mainLock.unlock(); } if (workerAdded) { t.start();//運行線程 workerStarted = true;//線程運行成功 } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

 Worker類

   worker類封裝了線程池中的每一個線程,線程池ThreadPool本質上維護的就是worker類,該類繼承了AQS,實現了Runnable接口。worker類包含屬性thread,firstTask,completedTasks三個屬性。

firstTask:保存傳入的任務

thread:執行任務創建的線程,在構造方法中通過 getThreadFactory().newThread(this)創建線程,傳入的是this,其實worker本身就實現了Runnable接口,所以worker本身就是一個線程,在線程啟動的時候就會調用worker類的run方法。Worker類繼承AQS,而不是ReetrantLo是因為AQS使用的是獨占鎖,是不可以重入的。lock一旦獲取到了獨占鎖,就表明線程在運行中,就不應該中斷。如果線程不是在獨占狀態,那么就說明該線程是空閑線程,可以進行中斷。線程池在執行shutdown方法或tryTerminate方法時會調用interruptIdleWorkers方法來中斷空閑的線interruptIdleWorker方法會使用tryLock方法來判斷線程池中的線程是否是空閑狀態;之所以設置為不可重入,是因為我們不希望任務在調用像setCorePoolSize這樣的線程池控制方法時重新獲取鎖。如果使用ReentrantLock,它是可重入的,這樣如果在任務中調用了如setCorePoolSize這類線程池控制的方法,會中斷正在運行的線程。
         final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker是因為AQS中默認的state是0,如果剛創建了一個Worker對象,還沒有執行任務時,這時就不應該被中斷
        
      this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
        }

在上述內容中我提到了,線程池中創建的線程主要是worker,線程的執行也是worker,worker在執行的時候調用的run方法,代碼里的run方法調用的就是runWorker()方法,下面我們就來解讀一下runWorker()方法:

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts 將state設置為0
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//任務不為空進入循環 getTask()從阻塞隊列中獲取任務
                w.lock();//線程運行上鎖
                /*如果線程正在停止,保證線程處於中斷狀態,如果不是的話保證當前線程不是中斷狀態;
           這里需要考慮執行If語句期間也執行了SHUTDOWNNOW方法,將狀態直接置為STOP,同時還會中斷線程池中的所有線程wt.interrupted()來判斷是否中斷,是為了確保在RUNNING和SHUTDOWN狀態的時候是處於非中斷狀態。
If pool is stopping, ensure thread is interrupted; if not, ensure thread is not interrupted. This requires a recheck in second case to deal with shutdownNow race while clearing interrupt
          */
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt();//復位中斷 try { beforeExecute(wt, task);//自己去實現,該類中此方法是空的 Throwable thrown = null; try { task.run();//運行任務 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++;//完成任務數量+1 w.unlock();//釋放鎖 } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

 根據上述的代碼以及注釋,現在來總結一下,線程運行的runWorker()方法總體的運行流程如下:

1、while循環,加鎖,從阻塞隊列中獲取任務(getTask())

2、如果線程正在停止,保證當前線程處於中斷狀態,如果不是則保證當前線程不是中斷狀態

3、運行任務內容

4、任務運行完成,釋放鎖。

5、當獲取的任務為空跳出while循環,執行 processWorkerExit(w, completedAbruptly)方法。

 除了runWorker之外,上述還有提到getTask(),從阻塞隊列中獲取任務:

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?表示上次從阻塞隊列中取值的時候是否有超時

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.線程池是SHUTDOWN以上的狀態的時候,判斷線程池是否停止,或者阻塞隊列是否為空,如果都是的話那么workerCout減1
        並且返回空值。因為在線程池在SHUTDOWN以上的狀態的時候應該,不接收新的任務,也不允許往阻塞隊列當中添加新的任務。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }        //獲取線程的數量 int wc = workerCountOf(c); // Are workers subject to culling?timed判斷是否需要進行超時控制。allowCoreThreadTimeOut參數是允許核心線程超時標識,默認是false.同時也判斷當前線程
數量是否是大於核心線程的數量的。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;        //當當前工作線程數量大於規定線程池最大容量,或者超時控制為真的時候,同時判斷當前線程數大於1或者阻塞隊列是空的兩個條件,當兩個條件中有一個為真,那么將workerCount-1,
c成功的話就返回null值,失敗的話就重試。該判斷比較重要,主要是當線程池中的線程數量處於大於corePoolSize,但是又小於maximumPoolSize的狀態下,獲取任務超時,說明阻塞隊列為空,也就說明
現在線程池不需要那么多線程來執行任務,需要把corePoolSize的線程銷毀,讓線程數量維持在corePoolSize即可。
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();//timed =true 那么就通過阻塞隊列的poll方法進行超時控制,否則的話就從隊列中獲取任務。 if (r != null) return r;//獲取任務成功,返回獲取的任務。 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }

在runWorker()方法中,getTask失敗,是會跳出while 循環,執行processWorkerExit()方法:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // 如果執行線程出現異常,那么workerCount-1
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;//completedTaskCount統計完成任務的線程數,同時移除已經執行完任務的worker
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {//線程的狀態是RUNNING或者SHUTDOWN進行以下判斷
            if (!completedAbruptly) {//如果線程非異常結束進行以下操作
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//如果允許核心線程超時,那么min=0,如果不允許那么min=核心線程的數量
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;//如果核心線程允許超時,min==0且阻塞隊列不為空的,min=1,即至少保證線程池中有1個worker
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);//如果worker異常結束那么就addWorker
        }
    }

綜上所述,線程池中線程的執行過程大致如下:

 

 也就是說,當線程池在執行Executors.newFixedThreadPool(n).execute(Runnable)的方法的時候,就進入到線程池ThreadPoolExecutor中去執行execute(Runnable)方法,該方法主要addWorker(),在執行addWorker的時候,worker類會創建線程getThreadFactory().newThread(this),創建好線程以后,線程會啟動,t.start()實際調用的就是worker類中的run()方法,該方法的實質是運行runWorker()方法,在執行該方法的時候就會從阻塞隊列中獲取任務,獲取任務成功以后執行線程,完成任務即可。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM