線程池工作原理,任務拒接策略有哪幾種


轉載請標明原文鏈接:

  http://www.cnblogs.com/dolphin0520/p/3932921.html

 

  java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此如果要透徹地了解Java中的線程池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現源碼。

 

  在ThreadPoolExecutor類中提供了四個構造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public  class  ThreadPoolExecutor  extends  AbstractExecutorService {
     .....
     public  ThreadPoolExecutor( int  corePoolSize, int  maximumPoolSize, long  keepAliveTime,TimeUnit unit,
             BlockingQueue<Runnable> workQueue);
 
     public  ThreadPoolExecutor( int  corePoolSize, int  maximumPoolSize, long  keepAliveTime,TimeUnit unit,
             BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
     public  ThreadPoolExecutor( int  corePoolSize, int  maximumPoolSize, long  keepAliveTime,TimeUnit unit,
             BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
     public  ThreadPoolExecutor( int  corePoolSize, int  maximumPoolSize, long  keepAliveTime,TimeUnit unit,
         BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
     ...
}

   從上面的代碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,通過觀察每個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工作。

   下面解釋下一下構造器中各個參數的含義:

  • corePoolSize:核心池的大小,這個參數跟后面講述的線程池的實現原理有非常大的關系。在創建了線程池后,默認情況下,線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。默認情況下,在創建了線程池后,線程池中的線程數為0,當有任務來之后,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中;
  • maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程;
  • keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
  • unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
復制代碼
TimeUnit.DAYS;               //天 TimeUnit.HOURS; //小時 TimeUnit.MINUTES; //分鍾 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //納秒
復制代碼
  • workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;

  ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。

  • threadFactory:線程工廠,主要用來創建線程;
  • handler:表示當拒絕處理任務時的策略,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。 
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務 

   具體參數的配置與線程池的關系將在下一節講述。

  從上面給出的ThreadPoolExecutor類的代碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實現:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public  abstract  class  AbstractExecutorService  implements  ExecutorService {
 
     
     protected  <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
     protected  <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
     public  Future<?> submit(Runnable task) {};
     public  <T> Future<T> submit(Runnable task, T result) { };
     public  <T> Future<T> submit(Callable<T> task) { };
     private  <T> T doInvokeAny(Collection<?  extends  Callable<T>> tasks,
                             boolean  timed,  long  nanos)
         throws  InterruptedException, ExecutionException, TimeoutException {
     };
     public  <T> T invokeAny(Collection<?  extends  Callable<T>> tasks)
         throws  InterruptedException, ExecutionException {
     };
     public  <T> T invokeAny(Collection<?  extends  Callable<T>> tasks,
                            long  timeout, TimeUnit unit)
         throws  InterruptedException, ExecutionException, TimeoutException {
     };
     public  <T> List<Future<T>> invokeAll(Collection<?  extends  Callable<T>> tasks)
         throws  InterruptedException {
     };
     public  <T> List<Future<T>> invokeAll(Collection<?  extends  Callable<T>> tasks,
                                          long  timeout, TimeUnit unit)
         throws  InterruptedException {
     };
}

   AbstractExecutorService是一個抽象類,它實現了ExecutorService接口。

  我們接着看ExecutorService接口的實現:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public  interface  ExecutorService  extends  Executor {
 
     void  shutdown();
     boolean  isShutdown();
     boolean  isTerminated();
     boolean  awaitTermination( long  timeout, TimeUnit unit)
         throws  InterruptedException;
     <T> Future<T> submit(Callable<T> task);
     <T> Future<T> submit(Runnable task, T result);
     Future<?> submit(Runnable task);
     <T> List<Future<T>> invokeAll(Collection<?  extends  Callable<T>> tasks)
         throws  InterruptedException;
     <T> List<Future<T>> invokeAll(Collection<?  extends  Callable<T>> tasks,
                                   long  timeout, TimeUnit unit)
         throws  InterruptedException;
 
     <T> T invokeAny(Collection<?  extends  Callable<T>> tasks)
         throws  InterruptedException, ExecutionException;
     <T> T invokeAny(Collection<?  extends  Callable<T>> tasks,
                     long  timeout, TimeUnit unit)
         throws  InterruptedException, ExecutionException, TimeoutException;
}

   而ExecutorService又是繼承了Executor接口,我們看一下Executor接口的實現:

1
2
3
public  interface  Executor {
     void  execute(Runnable command);
}

   到這里,大家應該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關系了。

  Executor是一個頂層接口,在它里面只聲明了一個方法execute(Runnable),返回值為void,參數為Runnable類型,從字面意思可以理解,就是用來執行傳進去的任務的;

  然后ExecutorService接口繼承了Executor接口,並聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

  抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的所有方法;

  然后ThreadPoolExecutor繼承了類AbstractExecutorService。

  在ThreadPoolExecutor類中有幾個非常重要的方法:

1
2
3
4
execute()
submit()
shutdown()
shutdownNow()

   execute()方法實際上是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務,交由線程池去執行。

  submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並沒有對其進行重寫,這個方法也是用來向線程池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果,去看submit()方法的實現,會發現它實際上還是調用的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容將在下一篇講述)。

  shutdown()和shutdownNow()是用來關閉線程池的。

  還有很多其他的方法:

  比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關屬性的方法,有興趣的朋友可以自行查閱API。

  在上一節我們從宏觀上介紹了ThreadPoolExecutor,下面我們來深入解析一下線程池的具體實現原理,將從下面幾個方面講解:

 

  1.線程池狀態

  2.任務的執行

  3.線程池中的線程初始化

  4.任務緩存隊列及排隊策略

  5.任務拒絕策略

  6.線程池的關閉

  7.線程池容量的動態調整

 

1.線程池狀態

  在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀態:

1
2
3
4
5
volatile  int  runState;
static  final  int  RUNNING    =  0 ;
static  final  int  SHUTDOWN   =  1 ;
static  final  int  STOP       =  2 ;
static  final  int  TERMINATED =  3 ;

   runState表示當前線程池的狀態,它是一個volatile變量用來保證線程之間的可見性;

  下面的幾個static final變量表示runState可能的幾個取值。

  當創建線程池后,初始時,線程池處於RUNNING狀態;

  如果調用了shutdown()方法,則線程池處於SHUTDOWN狀態,此時線程池不能夠接受新的任務,它會等待所有任務執行完畢;

  如果調用了shutdownNow()方法,則線程池處於STOP狀態,此時線程池不能接受新的任務,並且會去嘗試終止正在執行的任務;

  當線程池處於SHUTDOWN或STOP狀態,並且所有工作線程已經銷毀,任務緩存隊列已經清空或執行結束后,線程池被設置為TERMINATED狀態。

2.任務的執行

  在了解將任務提交給線程池到任務執行完畢整個過程之前,我們先來看一下ThreadPoolExecutor類中其他的一些比較重要成員變量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private  final  BlockingQueue<Runnable> workQueue;               //任務緩存隊列,用來存放等待執行的任務
private  final  ReentrantLock mainLock =  new  ReentrantLock();    //線程池的主要狀態鎖,對線程池狀態(比如線程池大小
                                                               //、runState等)的改變都要使用這個鎖
private  final  HashSet<Worker> workers =  new  HashSet<Worker>();   //用來存放工作集
 
private  volatile  long   keepAliveTime;     //線程存貨時間   
private  volatile  boolean  allowCoreThreadTimeOut;    //是否允許為核心線程設置存活時間
private  volatile  int    corePoolSize;      //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列)
private  volatile  int    maximumPoolSize;    //線程池最大能容忍的線程數
 
private  volatile  int    poolSize;        //線程池中當前的線程數
 
private  volatile  RejectedExecutionHandler handler;  //任務拒絕策略
 
private  volatile  ThreadFactory threadFactory;    //線程工廠,用來創建線程
 
private  int  largestPoolSize;    //用來記錄線程池中曾經出現過的最大線程數
 
private  long  completedTaskCount;    //用來記錄已經執行完畢的任務個數

   每個變量的作用都已經標明出來了,這里要重點解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個變量。

  corePoolSize在很多地方被翻譯成核心池大小,其實我的理解這個就是線程池的大小。舉個簡單的例子:

  假如有一個工廠,工廠里面有10個工人,每個工人同時只能做一件任務。

  因此只要當10個工人中有工人是空閑的,來了任務就分配給空閑的工人做;

  當10個工人都有任務在做時,如果還來了任務,就把任務進行排隊等待;

  如果說新任務數目增長的速度遠遠大於工人做任務的速度,那么此時工廠主管可能會想補救措施,比如重新招4個臨時工人進來;

  然后就將任務也分配給這4個臨時工人做;

  如果說着14個工人做任務的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務或者拋棄前面的一些任務了。

  當這14個工人當中有人空閑時,而新任務增長的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。

 

  這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

  也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量突然過大時的一種補救措施。

  不過為了方便理解,在本文后面還是將corePoolSize翻譯成核心池大小。

  largestPoolSize只是一個用來起記錄作用的變量,用來記錄線程池中曾經有過的最大線程數目,跟線程池的容量沒有任何關系。

 

  下面我們進入正題,看一下任務從提交到最終執行完畢經歷了哪些過程。

  在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,雖然通過submit也可以提交任務,但是實際上submit方法里面最終調用的還是execute()方法,所以我們只需要研究execute()方法的實現原理即可:

1
2
3
4
5
6
7
8
9
10
11
12
public  void  execute(Runnable command) {
     if  (command ==  null )
         throw  new  NullPointerException();
     if  (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
         if  (runState == RUNNING && workQueue.offer(command)) {
             if  (runState != RUNNING || poolSize ==  0 )
                 ensureQueuedTaskHandled(command);
         }
         else  if  (!addIfUnderMaximumPoolSize(command))
             reject(command);  // is shutdown or saturated
     }
}

   上面的代碼可能看起來不是那么容易理解,下面我們一句一句解釋:

  首先,判斷提交的任務command是否為null,若是null,則拋出空指針異常;

  接着是這句,這句要好好理解一下:

1
if  (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

   由於是或條件運算符,所以先計算前半部分的值,如果線程池中當前線程數不小於核心池大小,那么就會直接進入下面的if語句塊了。

  如果線程池中當前線程數小於核心池大小,則接着執行后半部分,也就是執行

1
addIfUnderCorePoolSize(command)

  如果執行完addIfUnderCorePoolSize這個方法返回false,則繼續執行下面的if語句塊,否則整個方法就直接執行完畢了。

  如果執行完addIfUnderCorePoolSize這個方法返回false,然后接着判斷:

1
if  (runState == RUNNING && workQueue.offer(command))

   如果當前線程池處於RUNNING狀態,則將任務放入任務緩存隊列;如果當前線程池不處於RUNNING狀態或者任務放入緩存隊列失敗,則執行:

1
addIfUnderMaximumPoolSize(command)

  如果執行addIfUnderMaximumPoolSize方法失敗,則執行reject()方法進行任務拒絕處理。

  回到前面:

1
if  (runState == RUNNING && workQueue.offer(command))

   這句的執行,如果說當前線程池處於RUNNING狀態且將任務放入任務緩存隊列成功,則繼續進行判斷:

1
if  (runState != RUNNING || poolSize ==  0 )

   這句判斷是為了防止在將此任務添加進任務緩存隊列的同時其他線程突然調用shutdown或者shutdownNow方法關閉了線程池的一種應急措施。如果是這樣就執行:

1
ensureQueuedTaskHandled(command)

   進行應急處理,從名字可以看出是保證 添加到任務緩存隊列中的任務得到處理。

  我們接着看2個關鍵方法的實現:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private  boolean  addIfUnderCorePoolSize(Runnable firstTask) {
     Thread t =  null ;
     final  ReentrantLock mainLock =  this .mainLock;
     mainLock.lock();
     try  {
         if  (poolSize < corePoolSize && runState == RUNNING)
             t = addThread(firstTask);         //創建線程去執行firstTask任務   
         finally  {
         mainLock.unlock();
     }
     if  (t ==  null )
         return  false ;
     t.start();
     return  true ;
}

   這個是addIfUnderCorePoolSize方法的具體實現,從名字可以看出它的意圖就是當低於核心吃大小時執行的方法。下面看其具體實現,首先獲取到鎖,因為這地方涉及到線程池狀態的變化,先通過if語句判斷當前線程池中的線程數目是否小於核心池大小,有朋友也許會有疑問:前面在execute()方法中不是已經判斷過了嗎,只有線程池當前線程數目小於核心池大小才會執行addIfUnderCorePoolSize方法的,為何這地方還要繼續判斷?原因很簡單,前面的判斷過程中並沒有加鎖,因此可能在execute方法判斷的時候poolSize小於corePoolSize,而判斷完之后,在其他線程中又向線程池提交了任務,就可能導致poolSize不小於corePoolSize了,所以需要在這個地方繼續判斷。然后接着判斷線程池的狀態是否為RUNNING,原因也很簡單,因為有可能在其他線程中調用了shutdown或者shutdownNow方法。然后就是執行

1
t = addThread(firstTask);

   這個方法也非常關鍵,傳進去的參數為提交的任務,返回值為Thread類型。然后接着在下面判斷t是否為空,為空則表明創建線程失敗(即poolSize>=corePoolSize或者runState不等於RUNNING),否則調用t.start()方法啟動線程。

  我們來看一下addThread方法的實現:

1
2
3
4
5
6
7
8
9
10
11
12
private  Thread addThread(Runnable firstTask) {
     Worker w =  new  Worker(firstTask);
     Thread t = threadFactory.newThread(w);   //創建一個線程,執行任務   
     if  (t !=  null ) {
         w.thread = t;             //將創建的線程的引用賦值為w的成員變量       
         workers.add(w);
         int  nt = ++poolSize;      //當前線程數加1       
         if  (nt > largestPoolSize)
             largestPoolSize = nt;
     }
     return  t;
}

   在addThread方法中,首先用提交的任務創建了一個Worker對象,然后調用線程工廠threadFactory創建了一個新的線程t,然后將線程t的引用賦值給了Worker對象的成員變量thread,接着通過workers.add(w)將Worker對象添加到工作集當中。

  下面我們看一下Worker類的實現:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
private  final  class  Worker  implements  Runnable {
     private  final  ReentrantLock runLock =  new  ReentrantLock();
     private  Runnable firstTask;
     volatile  long  completedTasks;
     Thread thread;
     Worker(Runnable firstTask) {
         this .firstTask = firstTask;
     }
     boolean  isActive() {
         return  runLock.isLocked();
     }
     void  interruptIfIdle() {
         final  ReentrantLock runLock =  this .runLock;
         if  (runLock.tryLock()) {
             try  {
         if  (thread != Thread.currentThread())
         thread.interrupt();
             finally  {
                 runLock.unlock();
             }
         }
     }
     void  interruptNow() {
         thread.interrupt();
     }
 
     private  void  runTask(Runnable task) {
         final  ReentrantLock runLock =  this .runLock;
         runLock.lock();
         try  {
             if  (runState < STOP &&
                 Thread.interrupted() &&
                 runState >= STOP)
             boolean  ran =  false ;
             beforeExecute(thread, task);    //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現,用戶可以根據
             //自己需要重載這個方法和后面的afterExecute方法來進行一些統計信息,比如某個任務的執行時間等           
             try  {
                 task.run();
                 ran =  true ;
                 afterExecute(task,  null );
                 ++completedTasks;
             catch  (RuntimeException ex) {
                 if  (!ran)
                     afterExecute(task, ex);
                 throw  ex;
             }
         finally  {
             runLock.unlock();
         }
     }
 
     public  void  run() {
         try  {
             Runnable task = firstTask;
             firstTask =  null ;
             while  (task !=  null  || (task = getTask()) !=  null ) {
                 runTask(task);
                 task =  null ;
             }
         finally  {
             workerDone( this );    //當任務隊列中沒有任務時,進行清理工作       
         }
     }
}

   它實際上實現了Runnable接口,因此上面的Thread t = threadFactory.newThread(w);效果跟下面這句的效果基本一樣:

1
Thread t =  new  Thread(w);

   相當於傳進去了一個Runnable任務,在線程t中執行這個Runnable。

  既然Worker實現了Runnable接口,那么自然最核心的方法便是run()方法了:

1
2
3
4
5
6
7
8
9
10
11
12
public  void  run() {
     try  {
         Runnable task = firstTask;
         firstTask =  null ;
         while  (task !=  null  || (task = getTask()) !=  null ) {
             runTask(task);
             task =  null ;
         }
     finally  {
         workerDone( this );
     }
}

   從run方法的實現可以看出,它首先執行的是通過構造器傳進來的任務firstTask,在調用runTask()執行完firstTask之后,在while循環里面不斷通過getTask()去取新的任務來執行,那么去哪里取呢?自然是從任務緩存隊列里面去取,getTask是ThreadPoolExecutor類中的方法,並不是Worker類中的方法,下面是getTask方法的實現:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Runnable getTask() {
     for  (;;) {
         try  {
             int  state = runState;
             if  (state > SHUTDOWN)
                 return  null ;
             Runnable r;
             if  (state == SHUTDOWN)   // Help drain queue
                 r = workQueue.poll();
             else  if  (poolSize > corePoolSize || allowCoreThreadTimeOut)  //如果線程數大於核心池大小或者允許為核心池線程設置空閑時間,
                 //則通過poll取任務,若等待一定的時間取不到任務,則返回null
                 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
             else
                 r = workQueue.take();
             if  (r !=  null )
                 return  r;
             if  (workerCanExit()) {     //如果沒取到任務,即r為null,則判斷當前的worker是否可以退出
                 if  (runState >= SHUTDOWN)  // Wake up others
                     interruptIdleWorkers();    //中斷處於空閑狀態的worker
                 return  null ;
             }
             // Else retry
         catch  (InterruptedException ie) {
             // On interruption, re-check runState
         }
     }
}

   在getTask中,先判斷當前線程池狀態,如果runState大於SHUTDOWN(即為STOP或者TERMINATED),則直接返回null。

  如果runState為SHUTDOWN或者RUNNING,則從任務緩存隊列取任務。

  如果當前線程池的線程數大於核心池大小corePoolSize或者允許為核心池中的線程設置空閑存活時間,則調用poll(time,timeUnit)來取任務,這個方法會等待一定的時間,如果取不到任務就返回null。

  然后判斷取到的任務r是否為null,為null則通過調用workerCanExit()方法來判斷當前worker是否可以退出,我們看一下workerCanExit()的實現:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private  boolean  workerCanExit() {
     final  ReentrantLock mainLock =  this .mainLock;
     mainLock.lock();
     boolean  canExit;
     //如果runState大於等於STOP,或者任務緩存隊列為空了
     //或者  允許為核心池線程設置空閑存活時間並且線程池中的線程數目大於1
     try  {
         canExit = runState >= STOP ||
             workQueue.isEmpty() ||
             (allowCoreThreadTimeOut &&
              poolSize > Math.max( 1 , corePoolSize));
     finally  {
         mainLock.unlock();
     }
     return  canExit;
}

   也就是說如果線程池處於STOP狀態、或者任務隊列已為空或者允許為核心池線程設置空閑存活時間並且線程數大於1時,允許worker退出。如果允許worker退出,則調用interruptIdleWorkers()中斷處於空閑狀態的worker,我們看一下interruptIdleWorkers()的實現:

1
2
3
4
5
6
7
8
9
10
void  interruptIdleWorkers() {
     final  ReentrantLock mainLock =  this .mainLock;
     mainLock.lock();
     try  {
         for  (Worker w : workers)   //實際上調用的是worker的interruptIfIdle()方法
             w.interruptIfIdle();
     finally  {
         mainLock.unlock();
     }
}

   從實現可以看出,它實際上調用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:

1
2
3
4
5
6
7
8
9
10
11
12
void  interruptIfIdle() {
     final  ReentrantLock runLock =  this .runLock;
     if  (runLock.tryLock()) {     //注意這里,是調用tryLock()來獲取鎖的,因為如果當前worker正在執行任務,鎖已經被獲取了,是無法獲取到鎖的
                                 //如果成功獲取了鎖,說明當前worker處於空閑狀態
         try  {
     if  (thread != Thread.currentThread())  
     thread.interrupt();
         finally  {
             runLock.unlock();
         }
     }
}

    這里有一個非常巧妙的設計方式,假如我們來設計線程池,可能會有一個任務分派線程,當發現有線程空閑時,就從任務緩存隊列中取一個任務交給空閑線程執行。但是在這里,並沒有采用這樣的方式,因為這樣會要額外地對任務分派線程進行管理,無形地會增加難度和復雜度,這里直接讓執行完任務的線程去任務緩存隊列里面取任務來執行。

   我們再看addIfUnderMaximumPoolSize方法的實現,這個方法的實現思想和addIfUnderCorePoolSize方法的實現思想非常相似,唯一的區別在於addIfUnderMaximumPoolSize方法是在線程池中的線程數達到了核心池大小並且往任務隊列中添加任務失敗的情況下執行的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private  boolean  addIfUnderMaximumPoolSize(Runnable firstTask) {
     Thread t =  null ;
     final  ReentrantLock mainLock =  this .mainLock;
     mainLock.lock();
     try  {
         if  (poolSize < maximumPoolSize && runState == RUNNING)
             t = addThread(firstTask);
     finally  {
         mainLock.unlock();
     }
     if  (t ==  null )
         return  false ;
     t.start();
     return  true ;
}

   看到沒有,其實它和addIfUnderCorePoolSize方法的實現基本一模一樣,只是if語句判斷條件中的poolSize < maximumPoolSize不同而已。

  到這里,大部分朋友應該對任務提交給線程池之后到被執行的整個過程有了一個基本的了解,下面總結一下:

  1)首先,要清楚corePoolSize和maximumPoolSize的含義;

  2)其次,要知道Worker是用來起到什么作用的;

  3)要知道任務提交給線程池之后的處理策略,這里總結一下主要有4點:

  • 如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
  • 如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
  • 如果當前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕策略進行處理;
  • 如果線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;如果允許為核心池中的線程設置存活時間,那么核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。

3.線程池中的線程初始化

  默認情況下,創建線程池之后,線程池中是沒有線程的,需要提交任務之后才會創建線程。

  在實際中如果需要線程池創建之后立即創建線程,可以通過以下兩個方法辦到:

  • prestartCoreThread():初始化一個核心線程;
  • prestartAllCoreThreads():初始化所有核心線程

  下面是這2個方法的實現:

1
2
3
4
5
6
7
8
9
10
public  boolean  prestartCoreThread() {
     return  addIfUnderCorePoolSize( null );  //注意傳進去的參數是null
}
 
public  int  prestartAllCoreThreads() {
     int  n =  0 ;
     while  (addIfUnderCorePoolSize( null )) //注意傳進去的參數是null
         ++n;
     return  n;
}

   注意上面傳進去的參數是null,根據第2小節的分析可知如果傳進去的參數為null,則最后執行線程會阻塞在getTask方法中的

1
r = workQueue.take();

   即等待任務隊列中有任務。

4.任務緩存隊列及排隊策略

  在前面我們多次提到了任務緩存隊列,即workQueue,它用來存放等待執行的任務。

  workQueue的類型為BlockingQueue<Runnable>,通常可以取下面三種類型:

  1)ArrayBlockingQueue:基於數組的先進先出隊列,此隊列創建時必須指定大小;

  2)LinkedBlockingQueue:基於鏈表的先進先出隊列,如果創建時沒有指定此隊列大小,則默認為Integer.MAX_VALUE;

  3)synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。

5.任務拒絕策略

  當線程池的任務緩存隊列已滿並且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕策略,通常有以下四種策略:

1
2
3
4
ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

6.線程池的關閉

  ThreadPoolExecutor提供了兩個方法,用於線程池的關閉,分別是shutdown()和shutdownNow(),其中:

  • shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完后才終止,但再也不會接受新的任務
  • shutdownNow():立即終止線程池,並嘗試打斷正在執行的任務,並且清空任務緩存隊列,返回尚未執行的任務

7.線程池容量的動態調整

  ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize:設置核心池大小
  • setMaximumPoolSize:設置線程池最大能創建的線程數目大小

  當上述參數從小變大時,ThreadPoolExecutor進行線程賦值,還可能立即創建新的線程來執行任務。

  前面我們討論了關於線程池的實現原理,這一節我們來看一下它的具體使用:

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public  class  Test {
      public  static  void  main(String[] args) {   
          ThreadPoolExecutor executor =  new  ThreadPoolExecutor( 5 10 200 , TimeUnit.MILLISECONDS,
                  new  ArrayBlockingQueue<Runnable>( 5 ));
          
          for ( int  i= 0 ;i< 15 ;i++){
              MyTask myTask =  new  MyTask(i);
              executor.execute(myTask);
              System.out.println( "線程池中線程數目:" +executor.getPoolSize()+ ",隊列中等待執行的任務數目:" +
              executor.getQueue().size()+ ",已執行玩別的任務數目:" +executor.getCompletedTaskCount());
          }
          executor.shutdown();
      }
}
 
 
class  MyTask  implements  Runnable {
     private  int  taskNum;
     
     public  MyTask( int  num) {
         this .taskNum = num;
     }
     
     @Override
     public  void  run() {
         System.out.println( "正在執行task " +taskNum);
         try  {
             Thread.currentThread().sleep( 4000 );
         catch  (InterruptedException e) {
             e.printStackTrace();
         }
         System.out.println( "task " +taskNum+ "執行完畢" );
     }
}

   執行結果:

復制代碼
正在執行task 0
線程池中線程數目:1,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 線程池中線程數目:2,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 正在執行task 1 線程池中線程數目:3,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 正在執行task 2 線程池中線程數目:4,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 正在執行task 3 線程池中線程數目:5,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 正在執行task 4 線程池中線程數目:5,隊列中等待執行的任務數目:1,已執行玩別的任務數目:0 線程池中線程數目:5,隊列中等待執行的任務數目:2,已執行玩別的任務數目:0 線程池中線程數目:5,隊列中等待執行的任務數目:3,已執行玩別的任務數目:0 線程池中線程數目:5,隊列中等待執行的任務數目:4,已執行玩別的任務數目:0 線程池中線程數目:5,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 線程池中線程數目:6,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行task 10 線程池中線程數目:7,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行task 11 線程池中線程數目:8,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行task 12 線程池中線程數目:9,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行task 13 線程池中線程數目:10,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行task 14 task 3執行完畢 task 0執行完畢 task 2執行完畢 task 1執行完畢 正在執行task 8 正在執行task 7 正在執行task 6 正在執行task 5 task 4執行完畢 task 10執行完畢 task 11執行完畢 task 13執行完畢 task 12執行完畢 正在執行task 9 task 14執行完畢 task 8執行完畢 task 5執行完畢 task 7執行完畢 task 6執行完畢 task 9執行完畢
復制代碼

  從執行結果可以看出,當線程池中線程的數目大於5時,便將任務放入任務緩存隊列里面,當任務緩存隊列滿了之后,便創建新的線程。如果上面程序中,將for循環中改成執行20個任務,就會拋出任務拒絕異常了。

  不過在java doc中,並不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來創建線程池:

1
2
3
Executors.newCachedThreadPool();         //創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE
Executors.newSingleThreadExecutor();    //創建容量為1的緩沖池
Executors.newFixedThreadPool( int );     //創建固定容量大小的緩沖池

   下面是這三個靜態方法的具體實現;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public  static  ExecutorService newFixedThreadPool( int  nThreads) {
     return  new  ThreadPoolExecutor(nThreads, nThreads,
                                   0L, TimeUnit.MILLISECONDS,
                                   new  LinkedBlockingQueue<Runnable>());
}
public  static  ExecutorService newSingleThreadExecutor() {
     return  new  FinalizableDelegatedExecutorService
         ( new  ThreadPoolExecutor( 1 1 ,
                                 0L, TimeUnit.MILLISECONDS,
                                 new  LinkedBlockingQueue<Runnable>()));
}
public  static  ExecutorService newCachedThreadPool() {
     return  new  ThreadPoolExecutor( 0 , Integer.MAX_VALUE,
                                   60L, TimeUnit.SECONDS,
                                   new  SynchronousQueue<Runnable>());
}

  從它們的具體實現來看,它們實際上也是調用了ThreadPoolExecutor,只不過參數都已配置好了。

  newFixedThreadPool創建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

  newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置為1,也使用的LinkedBlockingQueue;

  newCachedThreadPool將corePoolSize設置為0,將maximumPoolSize設置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就創建線程運行,當線程空閑超過60秒,就銷毀線程。

  實際中,如果Executors提供的三個靜態方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。

  另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。

  本節來討論一個比較重要的話題:如何合理配置線程池大小,僅供參考。

 

  一般需要根據任務的類型來配置線程池大小:

  如果是CPU密集型任務,就需要盡量壓榨CPU,參考值可以設為 NCPU+1

  如果是IO密集型任務,參考值可以設置為2*NCPU

  當然,這只是一個參考值,具體的設置還需要根據實際情況進行調整,比如可以先將線程池大小設置為參考值,再觀察任務運行情況和系統負載、資源利用率來進行適當調整。

  參考資料:

  http://ifeve.com/java-threadpool/

  http://blog.163.com/among_1985/blog/static/275005232012618849266/

  http://developer.51cto.com/art/201203/321885.htm

  http://blog.csdn.net/java2000_wl/article/details/22097059

  http://blog.csdn.net/cutesource/article/details/6061229

  http://blog.csdn.net/xieyuooo/article/details/8718741


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM