Java線程池詳解


一、介紹

  線程我們可以使用 new 的方式去創建,但如果並發的線程很多,每個線程執行的時間又不長,這樣頻繁的創建線程會大大的降低系統處理的效率,因為創建和銷毀進程都需要消耗資源,線程池就是用來解決類似問題。

  線程池實現了一個線程在執行完一段任務后,不銷毀,繼續執行下一段任務。用《Java並發編程藝術》提到線程池的優點:

  1、降低資源的消耗:使得線程可以重復使用,不需要在創建線程和銷毀線程上浪費資源

  2、提高響應速度:任務到達時,線程可以不需要創建即可以執行

  3、線程的可管理性:線程是稀缺資源,如果無限制的創建會嚴重影響系統效率,線程池可以對線程進行管理、監控、調優。

二、Excutor框架

  Excutor框架是線程池處理線程的核心,包括創建任務,傳遞任務,任務的執行三個方面

1、創建任務

  執行的任務需要實現 Runnable 或者 Callable 接口,然后重寫里面的 run 方法,這兩個接口區別下面會寫

2、傳遞任務

  以前執行線程都是直接創建線程然后調用 start() 方法去執行線程,現在我們需要把任務傳遞到線程池里面去,傳遞任務的核心接口就是 Excutor 接口,而它下面有幾個實現的方法,如圖所示:

 

 

  可以看到真正實現了功能的其實就是兩個類 ThreadPoolExecutor 和 ScheduledTreadPollExecutor,而其中用的最多的就是這個 ThreadPoolExecutor ,下面會詳細介紹,我們把任務通過這個方法的對象進行傳遞。

3、任務的執行及返回的結果

  任務傳遞給線程池執行完畢后,對於不同的傳遞方式,會有不同的返回策略,對於利用 excutor 方法傳遞的任務,不管執行的怎么樣都不會有值傳遞回來,而對於 submit 方法傳遞的任務會返回一個 FutureTask 對象,返回用戶希望接受的值。

 

Excutor 框架使用示意圖如圖:

 

 三、ThreadPoolExecutor 類介紹

  ThreadPoolExecutor 是線程池最為核心的一個類,而線程池為它提供了四個構造方法,我們先來看一下其中最原始的一個構造方法,其余三個都是由它衍生而來

 1  /**
 2      * 用給定的初始參數創建一個新的ThreadPoolExecutor。
 3      */
 4     public ThreadPoolExecutor(int corePoolSize,//線程池的核心線程數量
 5                               int maximumPoolSize,//線程池的最大線程數
 6                               long keepAliveTime,//當線程數大於核心線程數時,多余的空閑線程存活的最長時間
 7                               TimeUnit unit,//時間單位
 8                               BlockingQueue<Runnable> workQueue,//任務隊列,用來儲存等待執行任務的隊列
 9                               ThreadFactory threadFactory,//線程工廠,用來創建線程,一般默認即可
10                               RejectedExecutionHandler handler//拒絕策略,當提交的任務過多而不能及時處理時,我們可以定制策略來處理任務
11                                ) {
12         if (corePoolSize < 0 ||
13             maximumPoolSize <= 0 ||
14             maximumPoolSize < corePoolSize ||
15             keepAliveTime < 0)
16             throw new IllegalArgumentException();
17         if (workQueue == null || threadFactory == null || handler == null)
18             throw new NullPointerException();
19         this.corePoolSize = corePoolSize;
20         this.maximumPoolSize = maximumPoolSize;
21         this.workQueue = workQueue;
22         this.keepAliveTime = unit.toNanos(keepAliveTime);
23         this.threadFactory = threadFactory;
24         this.handler = handler;
25     }

  可以看到這里有6個參數,這些參數直接影響到線程池的效果,以下是具體分析每個參數的意義

1、corePoolSize:線程池最小創建線程的數目,默認情況下,線程池中是沒有線程的,也就是當沒有任務來臨的時候,初始化的線程池容量為0,而最小創建線程的數目則是在有線程來臨的時候,直接創建 corePoolSize 個線程

 

2、maximumPoolSize:線程池能創建的最大線程的數量,在核心線程都被占用的時候,繼續申請的任務會被擱置在等待隊列里面,而當等待隊列滿了的時候,線程池就會把線程數量創建至 maximumPoolSize 個。

 

3、workQueue:核心線程被占有時,任務被擱置在任務隊列

 

4、keepAliveTime:當線程池中的線程數量大於 corePoolSize 時這個參數就會生效,即當大於 corePoolSize 的線程在經過 keepAliveTime 仍然沒有任務執行,則銷毀線程

 

5、unit :參數keepAliveTime的時間單位

 

6、ThreadFactory:線程工廠:主要用來創建線程,一般默認即可

 

7、handler:飽和策略,即當線程池和等待隊列都達到最大負荷量時,下一個任務來臨時采取的策略

 

 

飽和策略的介紹: 即如果當前同時運行的線程數量達到最大線程數量並且隊列也已經被放滿了任務時,ThreadPoolTaskExecutor 定義一些策略:

 

  • ThreadPoolExecutor.AbortPolicy :拋出 RejectedExecutionException來拒絕新任務的處理。
  • ThreadPoolExecutor.CallerRunsPolicy :調用執行自己的線程運行任務,也就是直接在調用execute方法的線程中運行(run)被拒絕的任務,如果執行程序已關閉,則會丟棄該任務。因此這種策略會降低對於新任務提交速度,影響程序的整體性能。如果您的應用程序可以承受此延遲並且你要求任何一個任務請求都要被執行的話,你可以選擇這個策略。
  • ThreadPoolExecutor.DiscardPolicy :不處理新任務,直接丟棄掉。
  • ThreadPoolExecutor.DiscardOldestPolicy : 此策略將丟棄最早的未處理的任務請求。

四、實現一個簡單的線程池

1、創建任務

class worker implements Runnable{
    @Override
    public void run(){
        System.out.println(Thread.currentThread().getName());
    }
    
}

2、創建線程池,並將任務傳遞進去

 

 1 public class Test {
 2     private static final int corePoolSize = 4;
 3     private static final int maximumPoolSize = 6;
 4     private static final long keepAliveTime = 2;
 5     private static final TimeUnit unit = TimeUnit.SECONDS;
 6     private static final int QueueSize = 5;//將參數設定為固定值
 7     public static void main(String[] args) {
 8         ThreadPoolExecutor executor = new ThreadPoolExecutor(
 9                 corePoolSize,
10                 maximumPoolSize,
11                 keepAliveTime,
12                 unit,
13                 new ArrayBlockingQueue<>(QueueSize),
14                 new ThreadPoolExecutor.CallerRunsPolicy()
15         );//構造線程池
16         for(int i = 0; i < 10; i++){
17             worker w = new worker();
18             executor.execute(w);
19         }//傳入10個任務
20         executor.shutdown();//關閉線程池
21         while(!executor.isTerminated()){
22     }
23         System.out.println("finish");
24 }}

 

3、運行結果

 1 pool-1-thread-4
 2 pool-1-thread-2
 3 pool-1-thread-3
 4 pool-1-thread-1
 5 pool-1-thread-5
 6 pool-1-thread-1
 7 pool-1-thread-4
 8 pool-1-thread-5
 9 pool-1-thread-3
10 pool-1-thread-2
11 finish

  我們可以根據運行的結果就知道我傳進去的 10 個任務是通過 5 個線程復用所得到。

五、線程池相關問題

1、線程池狀態

在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀態:

1 volatile int runState;
2 static final int RUNNING    = 0;
3 static final int SHUTDOWN   = 1;
4 static final int STOP       = 2;
5 static final int TERMINATED = 3;

  runState表示當前線程池的狀態,它是一個volatile變量用來保證線程之間的可見性;

  下面的幾個static final變量表示runState可能的幾個取值。

  當創建線程池后,初始時,線程池處於RUNNING狀態;

  如果調用了shutdown()方法,則線程池處於SHUTDOWN狀態,此時線程池不能夠接受新的任務,它會等待所有任務執行完畢;

  如果調用了shutdownNow()方法,則線程池處於STOP狀態,此時線程池不能接受新的任務,並且會去嘗試終止正在執行的任務;

  當線程池處於SHUTDOWN或STOP狀態,並且所有工作線程已經銷毀,任務緩存隊列已經清空或執行結束后,線程池被設置為TERMINATED狀態。

2、線程池處理任務的策略

  • 如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
  • 如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
  • 如果當前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕策略進行處理;
  • 如果線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;如果允許為核心池中的線程設置存活時間,那么核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。

 3、Runnable 和 Callable的區別

   Runnable:

@FunctionalInterface
public interface Runnable {
   /**
    * 被線程執行,沒有返回值也無法拋出異常
    */
    public abstract void run();//單純的run
}

  Callable:

1 @FunctionalInterface
2 public interface Callable<V> {
3     /**
4      * 計算結果,或在無法這樣做時拋出異常。
5      * @return 計算得出的結果
6      * @throws 如果無法計算結果,則拋出異常
7      */
8     V call() throws Exception;//無法計算則拋出異常
9 }

 

4、Excutor 和 Submit的區別

  Excutor用於提交不需要返回值的任務,線程在執行完后既不會返回結果,也不會拋出異常

 

  Submit則是用於提交需要返回值的任務,在線程執行完Submit提交的任務后,會返回一個 future 對象,這個 future 對象可以用來判斷任務是否執行成功,並且可以用 future 的get()方法來獲取其返回值,如:

 1 ExecutorService executorService = Executors.newFixedThreadPool(3);
 2 
 3 Future<String> submit = executorService.submit(() -> {
 4     try {
 5         Thread.sleep(5000L);
 6     } catch (InterruptedException e) {
 7         e.printStackTrace();
 8     }
 9     return "abc";//帶有返回值
10 });
11 
12 String s = submit.get();//get返回值
13 System.out.println(s);
14 executorService.shutdown();

 

 六、幾種線程池比較

1、FixThreadPool

  稱為可重用固定線程池,其源碼:

 

1    /**
2      * 創建一個可重用固定數量線程的線程池
3      */
4     public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
5         return new ThreadPoolExecutor(nThreads, nThreads,
6                                       0L, TimeUnit.MILLISECONDS,
7                                       new LinkedBlockingQueue<Runnable>(),
8                                       threadFactory);
9     }

  從源碼可以看出,這個這個線程池的核心線程數 和 最大線程數量都設置為 nThreads ,這個參數是我們在創建線程池的時候傳遞的

  其運行示意圖,來源自《Java並發編程的藝術》:

 

 

 

上圖說明:

  1. 如果當前運行的線程數小於 corePoolSize, 如果再來新任務的話,就創建新的線程來執行任務;
  2. 當前運行的線程數等於 corePoolSize 后, 如果再來新任務的話,會將任務加入 LinkedBlockingQueue;
  3. 線程池中的線程執行完 手頭的任務后,會在循環中反復從 LinkedBlockingQueue 中獲取任務來執行;

 

 使用FixThreadPool的缺點

   在 FixThreadPool 中的等待隊列是使用的 LinkedBlockingQueue,這是一個無界隊列,最大的容量為 Integer.MAX_VALUE,可以無限制的接受任務,從而導致線程池的最大線程數參數生無效的,在核心線程全部被占有時,新來的任務就會被安置在無界隊列中,當任務很多的時候,線程池沒有拒絕任務的策略,就可能導致OOM(內存不足)

  1. 當線程池中的線程數達到核心線程數后,新任務將在無界隊列中等待,因此線程池中的線程數不會超過 corePoolSize;
  2. 由於使用無界隊列時 最大線程數 將是一個無效參數,因為不可能存在任務隊列滿的情況。所以,通過創建 的源碼可以看出創建的 FixThreadPool 的 核心線程數最大線程數 被設置為同一個值。
  3. 運行中的 FixThreadPool(未執行 shutdown()或 shutdownow())不會拒絕任務,在任務比較多的時候會導致 OOM(內存溢出)。

 

2、SingleThreadExcutor

 1  /**
 2      *返回只有一個線程的線程池
 3      */
 4     public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
 5         return new FinalizableDelegatedExecutorService
 6             (new ThreadPoolExecutor(1, 1,
 7                                     0L, TimeUnit.MILLISECONDS,
 8                                     new LinkedBlockingQueue<Runnable>(),
 9                                     threadFactory));
10     }

  可以看到特點是只有一個核心線程

 

 

 

 SingleThreadExcutor的缺點

  同樣的其等待隊列也是用的 LinkedBlockingQueue 無界隊列,同樣也可能出現 OOM。

3、CachedThreadPool

  /**
     * 創建一個線程池,根據需要創建新線程,但會在先前構建的線程可用時重用它。
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

  從源碼可以看到,其核心線程數為0,但最大線程數目為 Integer.MAX_VALUE,即是無界,其只要來任務需要線程就會創建線程,極端情況會造成線程耗盡的情況

 

其執行過程是當有任務傳遞給線程池會先查看現有的線程是否夠,如果不夠,則會創建新的線程。 其缺點就是可能創建大量線程,導致OOM

 

七、線程池大小的設置

 

線程池數量的確定一直是困擾着程序員的一個難題,大部分程序員在設定線程池大小的時候就是隨心而定。

 

很多人甚至可能都會覺得把線程池配置過大一點比較好!我覺得這明顯是有問題的。就拿我們生活中非常常見的一例子來說:並不是人多就能把事情做好,增加了溝通交流成本。你本來一件事情只需要 3 個人做,你硬是拉來了 6 個人,會提升做事效率嘛?我想並不會。 線程數量過多的影響也是和我們分配多少人做事情一樣,對於多線程這個場景來說主要是增加了上下文切換成本。不清楚什么是上下文切換的話,以看我下面的介紹。

 

上下文切換:

多線程編程中一般線程的個數都大於 CPU 核心的個數,而一個 CPU 核心在任意時刻只能被一個線程使用,為了讓這些線程都能得到有效執行,CPU 采取的策略是為每個線程分配時間片並輪轉的形式。當一個線程的時間片用完的時候就會重新處於就緒狀態讓給其他線程使用,這個過程就屬於一次上下文切換。概括來說就是:當前任務在執行完 CPU 時間片切換到另一個任務之前會先保存自己的狀態,以便下次再切換回這個任務時,可以再加載這個任務的狀態。任務從保存到再加載的過程就是一次上下文切換

上下文切換通常是計算密集型的。也就是說,它需要相當可觀的處理器時間,在每秒幾十上百次的切換中,每次切換都需要納秒量級的時間。所以,上下文切換對系統來說意味着消耗大量的 CPU 時間,事實上,可能是操作系統中時間消耗最大的操作。

Linux 相比與其他操作系統(包括其他類 Unix 系統)有很多的優點,其中有一項就是,其上下文切換和模式切換的時間消耗非常少。

 

類比於實現世界中的人類通過合作做某件事情,我們可以肯定的一點是線程池大小設置過大或者過小都會有問題,合適的才是最好。

如果我們設置的線程池數量太小的話,如果同一時間有大量任務/請求需要處理,可能會導致大量的請求/任務在任務隊列中排隊等待執行,甚至會出現任務隊列滿了之后任務/請求無法處理的情況,或者大量任務堆積在任務隊列導致 OOM。這樣很明顯是有問題的! CPU 根本沒有得到充分利用。

但是,如果我們設置線程數量太大,大量線程可能會同時在爭取 CPU 資源,這樣會導致大量的上下文切換,從而增加線程的執行時間,影響了整體執行效率。

 

 

 

 

 

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM