Java並發編程之線程池的使用


1. 為什么要使用多線程?

隨着科技的進步,現在的電腦及服務器的處理器數量都比較多,以后可能會越來越多,比如我的工作電腦的處理器有8個,怎么查看呢?

計算機右鍵--屬性--設備管理器,打開屬性窗口,然后點擊“設備管理器”,在“處理器”下可看到所有的處理器:

也可以通過以下Java代碼獲取到處理器的個數:

System.out.println("CPU個數:" + Runtime.getRuntime().availableProcessors());

運行結果如下所示:

CPU個數:8

既然處理器的個數增加了,如果還使用傳統的串行編程,就有點浪費資源了,因此,為了提高資源利用率,讓各個處理器都忙碌起來,就需要引入並發編程,要引入並發編程,就引入了多線程。

可以說,使用多線程的最直接目的就是為了提高資源利用率,資源的利用率提高了,系統的吞吐率也就相應提高了。

2. 為什么要使用線程池?

在一定的范圍內,增加線程可以提高應用程序的吞吐率,但線程並不是越多越好(因為線程的創建與銷毀都需要很大的開銷),如果超過了某個范圍,不僅會降低應用程序的執行速度,嚴重的話,應用程序甚至會崩潰,以至於不得不重啟應用程序。

為了避免這種問題,就需要對應用程序可以創建的線程數量進行限制,確保在線程數量達到限制時,程序也不會耗盡資源,線程池就是為了解決這種問題而出現的。

線程池:管理一組工作線程的資源池。

線程池與工作隊列密切相關,工作隊列中保存了所有等待執行的任務。

工作者線程的任務就是從工作隊列中獲取一個任務,執行任務,然后返回線程池並等待下一個任務。

使用線程池可以帶來以下好處:

  1. 通過重用現有的線程而不是創建新線程,可以在處理多個任務時減少在線程創建與銷毀過程中產生的巨大開銷。
  2. 當任務到達時,工作線程通常已經存在,因此不會由於等待創建線程而延遲任務的執行,從而提高了響應性。
  3. 可以通過調整線程池的大小,創建足夠多的線程使處理器保持忙碌狀態,同時還可以防止過多線程相互競爭資源而使應用程序耗盡內存或崩潰。

3. 創建線程池

3.1 使用Executors靜態方法創建(不推薦)

Executors類提供了以下4個靜態方法來快速的創建線程池:

  1. newFixedThreadPool
  2. newCachedThreadPool
  3. newSingleThreadExecutor
  4. newScheduledThreadPool

首先看下newFixedThreadPool()方法的使用方式:

ExecutorService threadPool = Executors.newFixedThreadPool(10);

它的源碼為:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

說明:newFixedThreadPool將創建一個固定長度的線程池,每當提交一個任務時就創建一個線程,直到達到線程池的最大數量,這時線程池的規模將不再變化(如果某個線程由於發生了未預期的Exception而結束,那么線程池會補充一個新的線程)。

然后看下newCachedThreadPool()方法的使用方式:

ExecutorService threadPool = Executors.newCachedThreadPool();

它的源碼為:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

說明:newCachedThreadPool將創建一個可緩存的線程池,如果線程池的規模超過了處理需求時,那么將回收空閑的線程,而當需求增加時,則添加新的線程,線程池的最大規模為Integer.MAX_VALUE。

然后看下newSingleThreadExecutor()方法的使用方式:

ExecutorService threadPool = Executors.newSingleThreadExecutor();

它的源碼為:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

說明:newSingleThreadExecutor是一個單線程的Executor,它創建單個工作者線程來執行任務,如果這個線程異常結束,就創建一個新的線程來替代。

newSingleThreadExecutor可以確保依照任務在隊列中的順序來串行執行。

最后看下newScheduledThreadPool()方法的使用方式:

ExecutorService threadPool = Executors.newScheduledThreadPool(10);

它的源碼為:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

super指向如下代碼:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

說明:newScheduledThreadPool將創建一個固定長度的線程池,而且以延遲或者定時的方式來執行任務,類似於Timer。

可以發現,以上4種方式最終都指向了ThreadPoolExecutor的以下構造函數,只是很多參數沒讓你指定,傳遞了默認值而已:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 省略具體的代碼
}

雖然使用這4個方法可以快速的創建線程池,但還是不推薦使用,第一,很多參數都設置了默認值,不便於你理解各個參數的具體含義,第二,參數的默認值可能會造成一定的問題,最好是由使用者根據自己的需求自行指定。

那么這7個參數分別代表什么含義呢?請接着往下看。

3.2 使用ThreadPoolExecutor構造函數創建(推薦)

ThreadPoolExecutor共有以下4個構造函數,推薦使用這種方式來創建線程池:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

以上3個也都指向參數最全的第4個構造函數:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 省略具體的代碼
}

以下為各個參數的講解:

  • corePoolSize:核心線程數。

  • maximumPoolSize:最大線程數。

    最大線程數=核心線程數+非核心線程數。

  • keepAliveTime:非核心線程閑置超時時間。

    一個非核心線程,如果不干活(閑置狀態)的時長超過這個參數所設定的時長,就會被銷毀掉,如果設置了allowCoreThreadTimeOut = true,則會作用於核心線程。

  • unit:參數keepAliveTime的時間單位,如秒、分、小時。

  • workQueue:工作隊列,即要執行的任務隊列,里面存儲等待執行的任務。

    這里的阻塞隊列可選擇的有:LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue、DelayedWorkQueue。

    newFixedThreadPool()方法默認使用的LinkedBlockingQueue,

    newCachedThreadPool()方法默認使用的SynchronousQueue,

    newSingleThreadExecutor()方法默認使用的LinkedBlockingQueue,

    newScheduledThreadPool()方法默認使用的DelayedWorkQueue。

  • threadFactory:線程工廠,用來用來創建線程。

  • handler:飽和策略/拒絕處理任務時的策略。

    當workQueue已滿,並且線程池的線程數已達到maximumPoolSize,此時新提交的任務會交由RejectedExecutionHandler handler處理,主要有以下4種策略:

    AbortPolicy:中止策略,拋棄任務並拋出未檢查的RejectedExecutionException,這也是默認的飽和策略。

    DiscardPolicy:拋棄策略,直接拋棄任務,但不拋出異常。

    DiscardOldestPolicy:拋棄最舊的策略,拋棄下一個將被執行的任務,然后嘗試重新提交新的任務。

    CallerRunsPolicy:調用者運行策略,將任務回退到調用者,在調用者所在的線程執行該任務。

4. 線程池的運行原理

可以通過下面2張圖來理解線程池的運行原理:

1)如果線程池中的線程小於corePoolSize,則創建新線程來處理任務,這時創建的線程為核心線程。

2)如果線程中的線程等於或者大於corePoolSize,則將任務放到工作隊列中,即上圖中的BlockingQueue。

3)如果工作隊列已滿,無法將任務加入到BlockingQueue,則創建新的線程來處理任務,這時創建的線程為非核心線程,非核心線程在空閑一段時間后會被回收銷毀掉(keepAliveTime和unit就是用來定義這個空閑的時間是多少)。

4)如果創建新線程導致線程池中的線程數超過了maximumPoolSize,任務將被拒絕,並調用RejectedExecutionHandler.rejectedExecution()方法。

5. ThreadPoolExecutor示例

新建如下示例代碼,創建1個corePoolSize為2,maximumPoolSize為3的線程池:

import java.util.concurrent.*;

public class ThreadPoolExecutorTest {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1));

        threadPoolExecutor.execute(() -> {
            try {
                Thread.sleep(3 * 1000);
                System.out.println("任務1執行線程:" + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        threadPoolExecutor.execute(() -> {
            System.out.println("任務2執行線程:" + Thread.currentThread().getName());
        });
    }
}

運行結果為:

任務2執行線程:pool-1-thread-2

任務1執行線程:pool-1-thread-1

可以看出,因為線程池中的線程數小於corePoolSize,線程池創建了2個核心線程來分別執行任務1和任務2。

修改代碼為如下所示,開啟3個任務:

import java.util.concurrent.*;

public class ThreadPoolExecutorTest {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1));

        threadPoolExecutor.execute(() -> {
            try {
                Thread.sleep(3 * 1000);
                System.out.println("任務1執行線程:" + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        threadPoolExecutor.execute(() -> {
            try {
                Thread.sleep(5 * 1000);
                System.out.println("任務2執行線程:" + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        threadPoolExecutor.execute(() -> System.out.println("任務3執行線程:" + Thread.currentThread().getName()));
    }
}

運行結果為:

任務1執行線程:pool-1-thread-1

任務3執行線程:pool-1-thread-1

任務2執行線程:pool-1-thread-2

可以看出,執行任務3時並沒有新建線程,而是先放入了工作隊列,最后由線程1執行完成。

在上面的代碼中新增個任務4:

threadPoolExecutor.execute(() -> System.out.println("任務4執行線程:" + Thread.currentThread().getName()));

此時運行結果為:

任務4執行線程:pool-1-thread-3

任務3執行線程:pool-1-thread-3

任務1執行線程:pool-1-thread-1

任務2執行線程:pool-1-thread-2

可以看出,任務3是先放入了工作隊列,任務4放不到工作隊列(空間已滿),所以創建了第3個線程來執行,執行完畢后從隊列里獲取到任務3執行,任務1和任務2分別由線程1和線程2執行。

修改下任務4的代碼,並添加任務5:

threadPoolExecutor.execute(() -> {
    try {
        Thread.sleep(2 * 1000);
        System.out.println("任務4執行線程:" + Thread.currentThread().getName());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

threadPoolExecutor.execute(() -> System.out.println("任務5執行線程:" + Thread.currentThread().getName()));

此時運行結果為:

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task ThreadPoolExecutorTest$$Lambda$5/935044096@179d3b25 rejected from java.util.concurrent.ThreadPoolExecutor@254989ff[Running, pool size = 3, active threads = 3, queued tasks = 1, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:37)
任務4執行線程:pool-1-thread-3

任務3執行線程:pool-1-thread-3

任務1執行線程:pool-1-thread-1

任務2執行線程:pool-1-thread-2

可以看出,當提交任務5時,由於工作隊列已滿, 且線程池中的線程數已經為3,所以該任務被拋棄並拋出了java.util.concurrent.RejectedExecutionException異常。

如果你看到了這里,是否會好奇參數maximumPoolSize設置為多少合適呢?

這個問題,我們下次講解,歡迎持續關注,哈哈!

6. 源碼及參考

Brian Goetz《Java並發編程實戰》

怎么查看處理器(cpu)的核數

ThreadPoolExecutor使用方法

Java線程池-ThreadPoolExecutor原理分析與實戰

深入理解 Java 多線程核心知識:跳槽面試必備

互聯網大廠Java面試題:使用無界隊列的線程池會導致內存飆升嗎?【石杉的架構筆記】


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM