四種線程池詳細解析


為什么會有線程池??

  之前的時候,我們每使用一次線程就去創建一個線程,這樣雖然實現起來非常簡便,但是會有一個問題,如果並發的線程數量很多,並且每個線程都是執行一個時間很短的任務就結束了,這樣頻繁創建線程就會大大降低系統的效率,因為頻繁創建線程和銷毀線程需要時間,而且還容易造成線程池溢出。於是人們想到了使用線程池的方法,這樣使得線程就可以被復用,就是說當一個線程執行完一個任務后,並不會被銷毀,而是放到線程池中,這樣再次需要使用線程時,就可以直接從線程池中拿取一個線程使用,從而降低資源消耗,提高利用效率。

  • 線程的創建和銷毀是需要時間的。加入一個服務完成需要的時間:T1表示創建線程時間,T2表示線程執行任務的耗時,T3表示線程消耗的時間,如果:T1+T3遠大於T2,得不償失。
  • 線程需要占用內存資源,大量線程的創建會占用寶貴內存資源,可以導致OOM(Out Of Memory)異常。
  • 大量的線程回收時也會給GC操作帶來很大的壓力,延長GC的停頓時間。
  • 線程搶占CPU資源,CPU不停的在各個線程中進行上下文切換,上下文的切換也是耗時的過程。

線程池溢出:頻繁創建多線程,非常占用CPU內存,一個線程占1MB內存。

線程池的目的:

  1.通過重復利用線程池中已創建的代碼,來降低創建和銷毀線程時的開銷,從而降低資源消耗,提高利用效率。

  2.提高響應速度。當任務到達時,任務不需要來等待線程創建就能立即執行,因為線程池中有已創建好的線程可以直接供使用。

  3.提高線程的可管理性。線程池時稀缺資源,如果無限制的創建,當線程數到達一定數量時,不僅會消耗系統資源,還會降低系統的穩定性,所以使用線程池,就可以進行統一分配,調優和監控。

  4. 提供定時執行、定期執行、單線程、並發數控制等功能。

 線程池的繼承:

  Java里面線程池的頂級接口是Executor,但是嚴格意義上講Executor並不是一個線程池,而只是一個執行線程的工具。真正的線程池接口是ExecutorService,ExecutorService接口繼承了Excutor接口,提供了一些對線程池操作的擴展方法:shoudown,submit,是真正的線程池接口。Executors是線程池的靜態工廠,其提供了快捷創建線程池的靜態方法。AbstractExecutorService抽象類實現了ExecutorService接口提供的大部分方法

 

比較重要的幾個類:

ExecutorService接口

真正的線程池接口。

ScheduledExecutorService接口

能和Timer/TimerTask類似,解決那些需要任務重復執行的問題。

ThreadPoolExecutor類

ExecutorService的默認實現。

 

ScheduledThreadPoolExecutor類

繼承ThreadPoolExecutor和實現ScheduledExecutorService接口,周期性任務調度的類實現。

 

 

Executors慎用:
在《阿里巴巴Java開發手冊》中有一條
【強制】線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。
Executors 返回的線程池對象的弊端如下:
FixedThreadPool 和 SingleThreadPool : 允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
CachedThreadPool 和 ScheduledThreadPool : 允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。

 

要配置一個線程池是比較復雜的,尤其是對於線程池的原理不是很清楚的情況下,很有可能配置的線程池不是較優的,因此在Executors類里面提供了一些靜態工廠,生成一些常用的線程池。

線程池的四種創建方式:

Java通過Executors(java1.5並發包中的類)來創建四種線程池,分別為:

newCachedThreadPool創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則創建線程。

newFixedThreadPool創建一個定長線程池,可控制線程池最大並發數,超出的線程會在隊列中等待。

newScheduledThreadPool創建一個 定時線程池,支持定時以及周期性任務執行。

newSingleThreadExecutor創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序執行。(FIFO,LIFO,優先級)

 

無論創建哪種線程池在其底層都 必須要調用ThreadPoolExecutor:

  •  corePoolSize: 核心線程數---線程池中實際運行的線程數
  •  maximumPoolSize:最大線程數---線程池創建線程的最大數量 。值得注意的是,如果使用了無界的任務隊列這個參數就沒什么效果。
  •  keepAliveTime: 線程池維護線程所允許的空閑時間 
  •  unit: 線程池維護線程所允許的空閑時間的單位 。可選的單位有天(DAYS)、小時(HOURS)、分鍾(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)。
  •  workQueue: 線程池所使用的緩沖隊列 
  •  handler: 線程池對拒絕任務的處理策略。當隊列和線程池都滿了,說明線程池處於飽和狀態,那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。在JDK 1.5中Java線程池框架提供了以下4種策略。
    ·AbortPolicy:直接拋出異常。
    ·CallerRunsPolicy:只用調用者所在線程來運行任務。
    ·DiscardOldestPolicy:丟棄隊列里最近的一個任務,並執行當前任務。
    ·DiscardPolicy:不處理,丟棄掉

 

線程池原理:

提交一個任務到線程池中,線程池的處理流程如下:

1.判斷線程池里的核心線程數是否都在執行任務,如果不是(核心線程空閑或者還有核心線程沒有被創建),則創建一個新的工作線程來執行任務。如果核心線程數都在執行任務,則進入下一個流程。

2.線程池判斷工作隊列是否已滿,如果工作隊列沒有滿,則將新提交的任務存儲在這個工作隊列里。如果工作隊列滿了,則進入下一個流程。

3.判斷線程池里的線程是否都處於工作狀態,如果沒有,則創建一個新的工作流程來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。

 

   如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;如果當前線程池中的線程數目大於等於corePoolSize,則每來一個任務,就會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般情況是因為任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務,如果此時線程池中總線程數小於maximumPoolSize,則會創建新的線程,如果當前線程池中的線程數目大於等於maximumPoolSize,則會采取任務拒絕策略進行處理。

   注意:線程數目統計的是正在在執行任務的線程,而execute提交的只是任務,不一定每次提交都會創建出線程來。

1:newCachedThreadPool創建一個可緩存線程池。

  ♦CachedThreadPool的corePoolSize被設置為0,即corePool為空;

  ♦maximumPoolSize被設置為Integer.MAX_VALUE,即maximum是無界的。

  ♦這里keepAliveTime設置為60秒,意味着空閑的線程最多可以等待任務60秒,否則將被回收。

  ♦CachedThreadPool使用沒有容量的SynchronousQueue作為主線程池的工作隊列,它是一個沒有容量的阻塞隊列。每個插入操作必須等待另一個線程的對應移除操作。這意味着,如果主線程提交任務的速度高於線程池中處理任務的速度時,CachedThreadPool會不斷創建新線程。極端情況下,CachedThreadPool會因為創建過多線程而耗盡CPU資源。

執行過程如下:

  1.首先執行SynchronousQueue.offer(Runnable task)。如果在當前的線程池中有空閑的線程正在執行SynchronousQueue.poll(),那么主線程執行的offer()插入操作與空閑線程執行的poll()彈出操作配對成功,主線程把任務交給空閑線程執行,則execute()方法執行成功,否則執行步驟2

  2.當線程池為空(初始maximumPool為空)或沒有空閑線程時,將沒有線程執行SynchronousQueue.poll操作,則配對失敗。這種情況下,線程池會創建一個新的線程執行任務。

  3.在創建完新的線程以后,將會執行poll操作,與執行SynchronousQueue.offer(Runnable task)的任務相匹配。當步驟2的線程執行完成后,將等待60秒,如果此時主線程提交了一個新任務,那么這個空閑線程將執行新任務,否則被回收。因此長時間不提交任務的CachedThreadPool不會占用系統資源。

  SynchronousQueue是一個不存儲元素阻塞隊列,每次要進行offer插入操作時必須等待poll彈出操作,否則不能繼續添加元素。

  【代碼演示】:耗時短,不需要考慮同步的情況。

 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 public class CachedThreadDemo {
 4     public static void main(String[] args) {
 5         ExecutorService cachedthreadpool=Executors.newCachedThreadPool();
 6         for (int i = 0; i <10; i++) {
 7             cachedthreadpool.execute(new Runnable() {
 8                 @Override
 9                 public void run() {
10                     System.out.println(Thread.currentThread().getName());
11                 }
12             });
13         }
14         cachedthreadpool.shutdown();
15     }
16 }

運行結果:

2:newFixedThreadPool創建一個定長線程池。(指定core和max,且core=max=nThraed)

      

  通過構造函數可知,該線程池的核心線程數和最大線程數是一樣的。

  • FixedThreadPool的corePoolSize的值和maxiumPoolSize的值都被設置為創建FixedThreadPool時指定的參數nThreads的大小。
  • 0L則表示當線程池中的線程數量超過核心線程的數量時,多余的線程將被立即停止
  • 最后一個參數表示FixedThreadPool使用了無界隊列LinkedBlockingQueue作為線程池的做工隊列,由於是無界的,當線程池的線程數達到corePoolSize后,新任務將在無界隊列中等待,因此線程池的線程數量不會超過maxiumPoolSize,同時maxiumPoolSize也就變成了一個無效的參數,並且運行中的線程池也並不會拒絕任務。

執行過程如下:

  1.如果當前工作中的線程數量少於corePool的數量,就創建新的線程來執行任務。

  2.當線程池的工作中的線程數量達到了corePool,則將任務加入LinkedBlockingQueue。

  3.線程執行完1中的任務后會從隊列中去取任務。

注意LinkedBlockingQueue是無界隊列,所以可以一直添加新任務到線程池。

  【代碼演示】:newFixedThreadPool創建一個定長線程池,可控制線程池最大並發數,超出的線程會在隊列中等待。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。

 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 public class FixedThreadDemo {
 4     public static void main(String[] args) {
 5         ExecutorService  fixedThreadpool= Executors.newFixedThreadPool(3);//參數是線程池中線程的數目
 6         for (int i = 0; i <10 ; i++) {
 7             int tmp=i;
 8             fixedThreadpool.execute(new Runnable() {
 9             //execute方法表示啟動線程
10                 @Override
11                 public void run() {
12                     System.out.println(Thread.currentThread().getName()+", "+tmp);
13                 }
14             });
15         }
16         fixedThreadpool.shutdown();//停止線程池的運行
17     }
18 }

運行結果:

 

   不論運行多少次,我們都可以看到,整個結果中其實只有3個線程,而且被反復利用着。

3:newScheduledThreadPool創建一個周期性的線程池。(指定core,max為max)

       

執行過程如下:

  1.如果當前工作中的線程數量少於corePool的數量,就創建一個新的線程來執行任務。

  2.當線程池的工作中的線程數量達到了corePool,則將任務加入DelayedWorkQueue。

  3.線程執行完1中的任務后會從隊列中去任務。

注意該線程池的最大線程數為MAX_VALUE。

  【代碼演示】:創建一個大小無限的線程池。此線程池支持定時以及周期性執行任務的需求。

 1 import java.util.concurrent.Executors;
 2 import java.util.concurrent.ScheduledExecutorService;
 3 import java.util.concurrent.TimeUnit;
 4 public class ScheduledThreadDemo {
 5     public static void main(String[] args) {
 6         ScheduledExecutorService scheduledThreadpool=Executors.newScheduledThreadPool(2);//核心線程數
 7         //注意返回的是ScheduledExecutorService 類型的
 8         for (int i = 0; i <4; i++) {
 9             int tmp=i;
10             /*scheduledThreadpool.schedule(new Runnable() {
11                 @Override
12                 public void run() {
13                     System.out.println(Thread.currentThread().getName()+", "+tmp);
14                 }
15             }, 2, TimeUnit.SECONDS);
16             //傳進一個線程,2表示線程開始時的延時*/
17             scheduledThreadpool.scheduleAtFixedRate(new Runnable() {
18                 @Override
19                 public void run() {
20                     System.out.println(Thread.currentThread().getName()+", "+tmp);
21                 }
22             }, 2,3, TimeUnit.SECONDS);
23             //傳進一個線程,2表示線程開始時的延遲時間,3表示周期即每3秒運行一次,TimeUnit.SECONDS
24         }
25         try {
26             Thread.sleep(30000);//先睡眠會,在關閉線程池,否則其他線程運行不了
27         } catch (InterruptedException e) {
28             e.printStackTrace();
29         }finally {
30             scheduledThreadpool.shutdown();//關閉線程池
31         }
32 
33     }
34 }

運行結果:

   通過結果我可以發現它是周期性運行的,且在開始打印時延時了2秒,然后每過3秒重復一次。

4:newSingleThreadExecutor創建一個單線程化的線程池。(core和max都默認為1)

 

 

 

  •    SingleThreadExecutor的corePoolSize和maxiumPoolSize都被設置1。

執行過程如下:

  1.如果當前工作中的線程數量少於corePool的數量,就創建一個新的線程來執行任務。

  2.當線程池的工作中的線程數量達到了corePool,則將任務加入LinkedBlockingQueue。

  3.線程執行完1中的任務后會從隊列中去任務。

注意:由於在線程池中只有一個工作線程,所以任務可以按照添加順序執行。

 【代碼演示】:創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當於單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。

 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 public class SingleThreadDemo {
 4     public static void main(String[] args) {
 5         ExecutorService singlethread=Executors.newSingleThreadExecutor();//單線程池
 6         for (int i = 0; i <5; i++) {
 7             singlethread.execute(new Runnable() {
 8                 @Override
 9                 public void run() {
10                     System.out.println(Thread.currentThread().getName());
11                 }
12             });
13         }
14         singlethread.shutdown();//關閉線程池
15     }
16 }

運行結果:

   通過運行結果可以發現,始終只有一個線程在被反復運行着。

 

線程池的狀態:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;  //32-3
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }//獲取高3位的線程狀態
private static int workerCountOf(int c) { return c & CAPACITY; }//獲取低29位的線程數量
private static int ctlOf(int rs, int wc) { return rs | wc; } //rs表示線程池狀態,wc表示線程池中線程數量,通過兩個參數可獲取ctl值

  其中ctl是一個AtomicInteger類型的屬性,其提供了線程池的狀態及線程池中線程的數量的統計,其中高3位用於維護線程池的運行狀態,低29位用來維護線程池中線程的數量

RUNNING = -1 << COUNT_BITS;即高3位為1,低29位0,該狀態的線程池可以接受新任務,也會處理阻塞隊列中的任務
SHUTDOWN = 0 << COUNT_BITS:即高3位是0,低29位為0,該狀態的線程池不會接受新的任務,但是會處理阻塞隊列中的任務
STOP = 1 << COUNT_BITS:即高3位為001,低29位為0,該狀態的線程池不會接受新任務,也不會處理阻塞隊列中的任務,並且還會終止正在執行的任務
TIDYING = 2 << COUNT_BITS:即高3位為010,低29位為0,所有的任務都被終止了,線程的數量為0,還要調用terminated()方法
TERMINATED = 3 << COUNT_BITS:即高3位為011,低29位為0,terminated()方法執行之后的狀態
這些狀態是int類型,大小關系為RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED

 

 

任務提交過程:
execute():提交任務

public void execute(Runnable command) {
  //提交的任務不能為null
  if (command == null)
    throw new NullPointerException();

  int c = ctl.get();
  //判斷工作線程數量小於核心線程數量
  if (workerCountOf(c) < corePoolSize) {
  //新創建一個線程來執行任務體====addWorker(command, true)
    if (addWorker(command, true))
    //添加成功,返回
    return;
    //添加任務失敗;線程狀態已經到showdown,不在接收新任務/工作線程數量大於核心線程數量
    c = ctl.get();
  }
  //線程是運行狀態,將任務添加到阻塞隊列
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
    }
  else if (!addWorker(command, false))
    reject(command);
  }

 

 

Java並發包中的阻塞隊列一共7個,當然他們都是線程安全的。 

DealyQueue:一個使用優先級隊列實現的無界阻塞隊列。  

·ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按FIFO(先進先出)原則對元素進行排序。
·LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
·SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於Linked-BlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
·PriorityBlockingQueue:一個具有優先級的無限阻塞隊列

 LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。 

 LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

1.submit和execute的區別。

execute()方法用於提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功。且execute()方法輸入的任務是一個Runnable類的實例。
submit()方法用於提交需要返回值的任務。線程池會返回一個future類型的對象,通過這個future對象可以判斷任務是否執行成功,並且可以通過future的get()方法來獲取返回值,get()方法會阻塞當前線程直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間后立即返回,這時候有可能任務沒有執行完
概括:

  submit即可以提交Runnable類型的,也可以提交Callable類型的,而execute只能提交Runnable類型的。

  submit方法具有返回值,Runnable返回的是null,Callable返回的是call方法的返回值。而execute方法不具有返回值,所以說execute方法也不能判斷線程是否執行完畢。

2.shutdown和shutdownNow的區別。

  可以通過調用線程池的shutdown或shutdownNow方法來關閉線程池。它們的原理是遍歷線程池中的工作線程,然后逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。但是它們存在一定的區別,shutdownNow首先將線程池的狀態設置成STOP,然后嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表,而shutdown只是將線程池的狀態設置成SHUTDOWN狀態,然后中斷所有沒有正在執行任務的線程。

 

  只要調用了這兩個關閉方法中的任意一個,isShutdown方法就會返回true。當所有的任務都已關閉后,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至於應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用shutdown方法來關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow方法。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM