java線程池技術(二): 核心ThreadPoolExecutor介紹


版權聲明:本文出自汪磊的博客,轉載請務必注明出處。

Java線程池技術屬於比較“古老”而又比較基礎的技術了,本篇博客主要作用是個人技術梳理,沒什么新玩意。

一、Java線程池技術的由來

我們平時使用線程來進行異步操作時,線程的創建,銷毀等相對來說都是比較消耗資源的,試想這樣一個業務情景:高並發請求,但是每次請求的時間非常短。如果我們為每一個請求都單獨創建一個線程來執行,就會消耗大量設備資源,使設備處於高負荷狀態,顯然這樣的處理就有很大問題了。這時候我們就可以用線程池技術來解決了,我們在線程池中創建若干條線程,當有任務需要執行時就從該線程池中獲取一個線程來執行任務,如果一時間任務過多,超出線程池的線程數量,那么后面的線程任務就進入一個等待隊列進行等待,直到線程池有線程處於空閑時才從等待隊列獲取要執行的任務進行處理,這樣就減少了線程創建和銷毀的開銷,實現了線程的復用。

二、Executor框架介紹

首先對整體框架有個大概了解,如圖:

 

 

Executor是個接口,就定義了一個void execute(Runnable command)方法。

ExecutorService同樣是一個接口並且繼承自Executor接口,對其方法進行擴展,其中最重要的是<T> Future<T> submit(Callable<T> task)方法,關於CallableFutureFutureTask不是本篇重點,有時間會單獨寫一篇博客介紹。也可以自行搜索了解,比較簡單。

AbstractExecutorService抽象類,實現了ExecutorService接口,主要實現了submit,ivokeAny方法。

ScheduledExecutorService同樣是一個接口,繼承自ExecutorService接口,對其進行擴展,主要就是schedule等方法。

ThreadPoolExecutor具體線程池實現類,繼承自AbstractExecutorService抽象類,我們使用的時候大部分就是使用這個類,后面會具體講到。

ScheduledThreadPoolExecutor 具有調度能力的線程池實現類,繼承自ThreadPoolExecutor類,且實現ScheduledExecutorService接口,其主要功能就是調用schedule方法,可以延時或者周期的執行某一任務,而ThreadPoolExecutor是沒有這一共能的。

三、ThreadPoolExecutor構造函數參數介紹

在我們使用ThreadPoolExecutor的時候會發現構造函數中有很多參數,如下:

 1 public ThreadPoolExecutor(int corePoolSize,
 2                               int maximumPoolSize,
 3                               long keepAliveTime,
 4                               TimeUnit unit,
 5                               BlockingQueue<Runnable> workQueue) {
 6         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
 7              Executors.defaultThreadFactory(), defaultHandler);
 8     }
 9 
10 public ThreadPoolExecutor(int corePoolSize,
11                               int maximumPoolSize,
12                               long keepAliveTime,
13                               TimeUnit unit,
14                               BlockingQueue<Runnable> workQueue,
15                               ThreadFactory threadFactory) {
16         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
17              threadFactory, defaultHandler);
18     }
19 
20 public ThreadPoolExecutor(int corePoolSize,
21                               int maximumPoolSize,
22                               long keepAliveTime,
23                               TimeUnit unit,
24                               BlockingQueue<Runnable> workQueue,
25                               RejectedExecutionHandler handler) {
26         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
27              Executors.defaultThreadFactory(), handler);
28     }
29 
30 
31 public ThreadPoolExecutor(int corePoolSize,
32                               int maximumPoolSize,
33                               long keepAliveTime,
34                               TimeUnit unit,
35                               BlockingQueue<Runnable> workQueue,
36                               ThreadFactory threadFactory,
37                               RejectedExecutionHandler handler) {
38         if (corePoolSize < 0 ||
39             maximumPoolSize <= 0 ||
40             maximumPoolSize < corePoolSize ||
41             keepAliveTime < 0)
42             throw new IllegalArgumentException();
43         if (workQueue == null || threadFactory == null || handler == null)
44             throw new NullPointerException();
45         this.corePoolSize = corePoolSize;
46         this.maximumPoolSize = maximumPoolSize;
47         this.workQueue = workQueue;
48         this.keepAliveTime = unit.toNanos(keepAliveTime);
49         this.threadFactory = threadFactory;
50         this.handler = handler;
51     }

前3個構造函數都是調用第4個構造函數,只不過有些參數使用默認的罷了。

接下來我們看下第4個構造函數每個參數意義:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,RejectedExecutionHandler handler)

參數 說明
corePoolSize 線程池中核心線程數量
maximumPoolSize 線程池中最大線程數量
keepAliveTime 非核心線程存活時間
unit keepAliveTime的時間單位
workQueue 存放任務的隊列
threadFactory 用來生產線程的工廠
handler 當線程池中不能再放入任務時執行的handler

如果有一個corePoolSize為5,maximumPoolSize為10的線程池,可用下圖形象展示:

 

這里要說明一下:所謂核心線程非核心線程只是一個數量的說明,並不是說核心線程非核心線程有本質上的不同,它們都是普通的線程而已,並且線程特性都一樣,不是說核心線程有特殊標記,線程池能“認”出來這是核心線程,對其有特殊操作。

四、線程池處理任務的策略

我們調用線程池的submit()或execute()方法,向線程池中放入一個任務執行的時候線程池到底是怎么按照其策略來執行的呢?接下來,我們對其執行策略進行詳細介紹,介紹完會對構造函數中每個參數有更深刻印象的。

1,調用線程池的submit()或execute()方法向線程池中放入一個任務,線程池內部會檢查運行的線程數量是否達到corePoolSize數量,如果沒有達到,則創建一個線程執行放入的任務,不管已經創建的線程是否處於空閑狀態,創建線程的任務由threadFactory來完成,關於ThreadFactory可以參考我的另一篇博客來學習:java線程池技術(一):ThreadFactory與BlockingQueue。

2,我們繼續向線程池中放入任務,此時線程池中運行的線程數量已經達到corePoolSize數量,則新加入的任務將會被放入workQueue中,直到有線程處於空閑狀態,則從workQueue中取出任務執行。

3,繼續向線程池中放入任務,此時線程池中運行的線程數量已經達到corePoolSize數量,並且workQueue中已經放滿任務不能再放入新的任務,那么這時候就繼續創建新的線程,注意此時線程池中線程數量已經多余corePoolSize數量,多出來的線程就叫做非核心線程。用非核心線程來執行新放入的任務。當線程池中的線程超過你設置的corePoolSize參數,說明當前線程池中有所謂的“非核心線程”。當某個線程處理完任務后,如果等待keepAliveTime時間后仍然沒有新的任務分配給它,那么這個線程將會被回收。線程池回收線程時,對所謂的“核心線程”和“非核心線程”是一視同仁的,直到線程池中線程的數量等於你設置的corePoolSize參數時,回收過程才會停止。

4繼續向線程池中放入任務,此時線程池中運行的線程數量已經達到maximumPoolSize數量,並且workQueue中已經放滿任務不能再放入新的任務,由於線程池中運行的線程

已經達到maximumPoolSize數量,所以無法再創建線程執行新放入的任務,此時handler參數就起作用了,在使用的時候相信大部分開發者都沒用過這個參數,我們看下系統默認怎么處理的,

系統默認闖入傳入的是defaultHandler,如下:

1 public ThreadPoolExecutor(int corePoolSize,
2                               int maximumPoolSize,
3                               long keepAliveTime,
4                               TimeUnit unit,
5                               BlockingQueue<Runnable> workQueue,
6                               ThreadFactory threadFactory) {
7         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
8              threadFactory, defaultHandler);
9     }

初始化如下:

1     private static final RejectedExecutionHandler defaultHandler =
2         new AbortPolicy();

接下來看下AbortPolicy這個類吧:

 1     public static class AbortPolicy implements RejectedExecutionHandler {
 2         /**
 3          * Creates an {@code AbortPolicy}.
 4          */
 5         public AbortPolicy() { }
 6 
 7         /**
 8          * Always throws RejectedExecutionException.
 9          *
10          * @param r the runnable task requested to be executed
11          * @param e the executor attempting to execute this task
12          * @throws RejectedExecutionException always
13          */
14         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
15             throw new RejectedExecutionException("Task " + r.toString() +
16                                                  " rejected from " +
17                                                  e.toString());
18         }
19     }

AbortPolicy實現了RejectedExecutionHandler接口,在rejectedExecution方法中拋出RejectedExecutionException異常。

所以如果線程池中運行的線程數量已經達到maximumPoolSize數量,並且workQueueworkQueue中已經放滿任務不能再放入新的任務,系統默認情況下就會拋出

RejectedExecutionException異常,我們也可以自己實現RejectedExecutionHandler接口,在rejectedExecution方法中實現自己策略。比如我自己寫的網絡請求框架就自己定義了

RejectedExecutionHandler,如下:

 1 public class RejectedPolicy implements RejectedExecutionHandler{
 2 
 3     @Override
 4     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
 5          try {
 6              taskQuene.put(r);
 7         } catch (InterruptedException e) {
 8              e.printStackTrace();
 9         }
10     }
11 }

好了,以上就是線程池具體執行一個新任務的大體策略,是不是有了更深的認識???

以上分析中涉及的ThreadFactory與BlockingQueue如果你不是太了解,可以參考我的另一篇博客了解一下:java線程池技術(一):ThreadFactory與BlockingQueue。

好了,關於線程池大體介紹就到此為止,希望對你有用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM