Java多線程之ThreadPoolExecutor詳解使用


1、概述

我將講解JAVA原生線程池的基本使用,並由此延伸出JAVA中和線程管理相關的類結構體系,然后我們詳細描述JAVA原生線程池的結構和工作方式

2、為什么要使用線程池

這里寫圖片描述

前文我們已經講到,線程是一個操作系統概念。操作系統負責這個線程的創建、掛起、運行、阻塞和終結操作。而操作系統創建線程、切換線程狀態、終結線程都要進行CPU調度。

另一方面,目前大多數生產環境我們所面臨問題的技術背景一般是:處理某一次請求的時間是非常短暫的,但是請求數量是巨大的。這種技術背景下,如果我們為每一個請求都單獨創建一個線程,那么物理機的所有資源基本上都被操作系統創建線程、切換線程狀態、銷毀線程這些操作所占用,用於業務請求處理的資源反而減少了。所以最理想的處理方式是,將處理請求的線程數量控制在一個范圍,既保證后續的請求不會等待太長時間,又保證物理機將足夠的資源用於請求處理本身。

另外,一些操作系統是有最大線程數量限制的。當運行的線程數量逼近這個值的時候,操作系統會變得不穩定。這也是我們要限制線程數量的原因。

3、線程池的基本使用方式

JAVA語言為我們提供了兩種基礎線程池的選擇:ScheduledThreadPoolExecutor和ThreadPoolExecutor。它們都實現了ExecutorService接口(注意,ExecutorService接口本身和“線程池”並沒有直接關系,它的定義更接近“執行器”,而“使用線程管理的方式進行實現”只是其中的一種實現方式)。這篇文章中,我們主要圍繞ThreadPoolExecutor類進行講解。

3-1、簡單使用

首先我們來看看ThreadPoolExecutor類的最簡單使用方式:

  1.  
    package test.thread.pool;
  2.  
     
  3.  
    import java.util.concurrent.SynchronousQueue;
  4.  
    import java.util.concurrent.ThreadPoolExecutor;
  5.  
    import java.util.concurrent.TimeUnit;
  6.  
     
  7.  
    import org.apache.commons.logging.Log;
  8.  
    import org.apache.commons.logging.LogFactory;
  9.  
    import org.apache.log4j.BasicConfigurator;
  10.  
     
  11.  
    public class PoolThreadSimple {
  12.  
     
  13.  
    static {
  14.  
    BasicConfigurator.configure();
  15.  
    }
  16.  
     
  17.  
    public static void main(String[] args) throws Throwable {
  18.  
     
  19.  
    /*
  20.  
    * corePoolSize:核心大小,線程池初始化的時候,就會有這么大
  21.  
    * maximumPoolSize:線程池最大線程數
  22.  
    * keepAliveTime:如果當前線程池中線程數大於corePoolSize。
  23.  
    * 多余的線程,在等待keepAliveTime時間后如果還沒有新的線程任務指派給它,它就會被回收
  24.  
    *
  25.  
    * unit:等待時間keepAliveTime的單位
  26.  
    *
  27.  
    * workQueue:等待隊列。這個對象的設置是本文將重點介紹的內容
  28.  
    * */
  29.  
    ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor( 5, 10, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>());
  30.  
    for( int index = 0 ; index < 10 ; index ++) {
  31.  
    poolExecutor.submit( new PoolThreadSimple.TestRunnable(index));
  32.  
    }
  33.  
     
  34.  
    // 沒有特殊含義,只是為了保證main線程不會退出
  35.  
    synchronized (poolExecutor) {
  36.  
    poolExecutor.wait();
  37.  
    }
  38.  
    }
  39.  
     
  40.  
    /**
  41.  
    * 這個就是測試用的線程
  42.  
    * @author yinwenjie
  43.  
    */
  44.  
    private static class TestRunnable implements Runnable {
  45.  
     
  46.  
    /**
  47.  
    * 日志
  48.  
    */
  49.  
    private static Log LOGGER = LogFactory.getLog(TestRunnable.class);
  50.  
     
  51.  
    /**
  52.  
    * 記錄任務的唯一編號,這樣在日志中好做識別
  53.  
    */
  54.  
    private Integer index;
  55.  
     
  56.  
    public TestRunnable(int index) {
  57.  
    this.index = index;
  58.  
    }
  59.  
     
  60.  
    /**
  61.  
    * @return the index
  62.  
    */
  63.  
    public Integer getIndex() {
  64.  
    return index;
  65.  
    }
  66.  
     
  67.  
    @Override
  68.  
    public void run() {
  69.  
    /*
  70.  
    * 線程中,就只做一件事情:
  71.  
    * 等待60秒鍾的事件,以便模擬業務操作過程
  72.  
    * */
  73.  
    Thread currentThread = Thread.currentThread();
  74.  
    TestRunnable.LOGGER.info( "線程:" + currentThread.getId() + " 中的任務(" + this.getIndex() + ")開始執行===");
  75.  
    synchronized (currentThread) {
  76.  
    try {
  77.  
    currentThread.wait( 60000);
  78.  
    } catch (InterruptedException e) {
  79.  
    TestRunnable.LOGGER.error(e.getMessage(), e);
  80.  
    }
  81.  
    }
  82.  
     
  83.  
    TestRunnable.LOGGER.info( "線程:" + currentThread.getId() + " 中的任務(" + this.getIndex() + ")執行完成");
  84.  
    }
  85.  
     
  86.  
    }
  87.  
    }

隨后的文章中我們將對線程池中的corePoolSize、maximumPoolSize、keepAliveTime、timeUnit、workQueue、threadFactory、handler參數和一些常用/不常用的設置項進行逐一講解。

3-2、ThreadPoolExecutor邏輯結構和工作方式

在上面的代碼中,我們創建線程池的時候使用了ThreadPoolExecutor中最簡單的一個構造函數:

  1.  
    public ThreadPoolExecutor(int corePoolSize,
  2.  
    int maximumPoolSize,
  3.  
    long keepAliveTime,
  4.  
    TimeUnit unit,
  5.  
    BlockingQueue<Runnable> workQueue)
  • 構造函數中需要傳入的參數包括corePoolSize、maximumPoolSize、keepAliveTime、timeUnit和workQueue。要明確理解這些參數(和后續將要介紹的參數)的含義,就首先要搞清楚ThreadPoolExecutor線程池的邏輯結構。

這里寫圖片描述

一定要注意一個概念,即存在於線程池中容器的一定是Thread對象,而不是您要求運行的任務(所以叫線程池而不叫任務池也不叫對象池,更不叫游泳池);您要求運行的任務將被線程池分配給某一個空閑的Thread運行。

從上圖中,我們可以看到構成線程池的幾個重要元素:

  • 等待隊列:顧名思義,就是您調用線程池對象的submit()方法或者execute()方法,要求線程池運行的任務(這些任務必須實現Runnable接口或者Callable接口)。但是出於某些原因線程池並沒有馬上運行這些任務,而是送入一個隊列等待執行(這些原因后文馬上講解)。

  • 核心線程:線程池主要用於執行任務的是“核心線程”,“核心線程”的數量是您創建線程時所設置的corePoolSize參數決定的。如果不進行特別的設定,線程池中始終會保持corePoolSize數量的線程數(不包括創建階段)。

  • 非核心線程:一旦任務數量過多(由等待隊列的特性決定),線程池將創建“非核心線程”臨時幫助運行任務。您設置的大於corePoolSize參數小於maximumPoolSize參數的部分,就是線程池可以臨時創建的“非核心線程”的最大數量。這種情況下如果某個線程沒有運行任何任務,在等待keepAliveTime時間后,這個線程將會被銷毀,直到線程池的線程數量重新達到corePoolSize。

  • 要重點理解上一條描述中黑體字部分的內容。也就是說,並不是所謂的“非核心線程”才會被回收;而是誰的空閑時間達到keepAliveTime這個閥值,就會被回收。直到線程池中線程數量等於corePoolSize為止。

  • maximumPoolSize參數也是當前線程池允許創建的最大線程數量。那么如果您設置的corePoolSize參數和您設置的maximumPoolSize參數一致時,線程池在任何情況下都不會回收空閑線程。keepAliveTime和timeUnit也就失去了意義。

  • keepAliveTime參數和timeUnit參數也是配合使用的。keepAliveTime參數指明等待時間的量化值,timeUnit指明量化值單位。例如keepAliveTime=1,timeUnit為TimeUnit.MINUTES,代表空閑線程的回收閥值為1分鍾。

說完了線程池的邏輯結構,下面我們討論一下線程池是怎樣處理某一個運行任務的。下圖描述了一個完整的任務處理過程:

這里寫圖片描述

1、首先您可以通過線程池提供的submit()方法或者execute()方法,要求線程池執行某個任務。線程池收到這個要求執行的任務后,會有幾種處理情況:

1.1、如果當前線程池中運行的線程數量還沒有達到corePoolSize大小時,線程池會創建一個新的線程運行您的任務,無論之前已經創建的線程是否處於空閑狀態。

1.2、如果當前線程池中運行的線程數量已經達到設置的corePoolSize大小,線程池會把您的這個任務加入到等待隊列中。直到某一個的線程空閑了,線程池會根據您設置的等待隊列規則,從隊列中取出一個新的任務執行。

1.3、如果根據隊列規則,這個任務無法加入等待隊列。這時線程池就會創建一個“非核心線程”直接運行這個任務。注意,如果這種情況下任務執行成功,那么當前線程池中的線程數量一定大於corePoolSize。

1.4、如果這個任務,無法被“核心線程”直接執行,又無法加入等待隊列,又無法創建“非核心線程”直接執行,且您沒有為線程池設置RejectedExecutionHandler。這時線程池會拋出RejectedExecutionException異常,即線程池拒絕接受這個任務。(實際上拋出RejectedExecutionException異常的操作,是ThreadPoolExecutor線程池中一個默認的RejectedExecutionHandler實現:AbortPolicy,這在后文會提到)

2、一旦線程池中某個線程完成了任務的執行,它就會試圖到任務等待隊列中拿去下一個等待任務(所有的等待任務都實現了BlockingQueue接口,按照接口字面上的理解,這是一個可阻塞的隊列接口),它會調用等待隊列的poll()方法,並停留在哪里。

3、當線程池中的線程超過您設置的corePoolSize參數,說明當前線程池中有所謂的“非核心線程”。那么當某個線程處理完任務后,如果等待keepAliveTime時間后仍然沒有新的任務分配給它,那么這個線程將會被回收。線程池回收線程時,對所謂的“核心線程”和“非核心線程”是一視同仁的,直到線程池中線程的數量等於您設置的corePoolSize參數時,回收過程才會停止。

3-3、不常用的設置

在ThreadPoolExecutor線程池中,有一些不常用的設置。我建議如果您在應用場景中沒有特殊的要求,就不需要使用這些設置:

3-3-1、 allowCoreThreadTimeOut:

前文我們討論到,線程池回收線程只會發生在當前線程池中線程數量大於corePoolSize參數的時候;當線程池中線程數量小於等於corePoolSize參數的時候,回收過程就會停止。

allowCoreThreadTimeOut設置項可以要求線程池:將包括“核心線程”在內的,沒有任務分配的任何線程,在等待keepAliveTime時間后全部進行回收:

  1.  
    ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor( 5, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>( 1));
  2.  
     
  3.  
    poolExecutor.allowCoreThreadTimeOut( true);

以下是設置前的效果:

這里寫圖片描述

以下是設置后的效果:

這里寫圖片描述

3-3-2 prestartAllCoreThreads

前文我們還討論到,當線程池中的線程還沒有達到您設置的corePoolSize參數值的時候,如果有新的任務到來,線程池將創建新的線程運行這個任務,無論之前已經創建的線程是否處於空閑狀態。這個描述可以用下面的示意圖表示出來:

這里寫圖片描述

prestartAllCoreThreads設置項,可以在線程池創建,但還沒有接收到任何任務的情況下,先行創建符合corePoolSize參數值的線程數:

1、概述

我將講解JAVA原生線程池的基本使用,並由此延伸出JAVA中和線程管理相關的類結構體系,然后我們詳細描述JAVA原生線程池的結構和工作方式

2、為什么要使用線程池

這里寫圖片描述

前文我們已經講到,線程是一個操作系統概念。操作系統負責這個線程的創建、掛起、運行、阻塞和終結操作。而操作系統創建線程、切換線程狀態、終結線程都要進行CPU調度。

另一方面,目前大多數生產環境我們所面臨問題的技術背景一般是:處理某一次請求的時間是非常短暫的,但是請求數量是巨大的。這種技術背景下,如果我們為每一個請求都單獨創建一個線程,那么物理機的所有資源基本上都被操作系統創建線程、切換線程狀態、銷毀線程這些操作所占用,用於業務請求處理的資源反而減少了。所以最理想的處理方式是,將處理請求的線程數量控制在一個范圍,既保證后續的請求不會等待太長時間,又保證物理機將足夠的資源用於請求處理本身。

另外,一些操作系統是有最大線程數量限制的。當運行的線程數量逼近這個值的時候,操作系統會變得不穩定。這也是我們要限制線程數量的原因。

3、線程池的基本使用方式

JAVA語言為我們提供了兩種基礎線程池的選擇:ScheduledThreadPoolExecutor和ThreadPoolExecutor。它們都實現了ExecutorService接口(注意,ExecutorService接口本身和“線程池”並沒有直接關系,它的定義更接近“執行器”,而“使用線程管理的方式進行實現”只是其中的一種實現方式)。這篇文章中,我們主要圍繞ThreadPoolExecutor類進行講解。

3-1、簡單使用

首先我們來看看ThreadPoolExecutor類的最簡單使用方式:

  1.  
    package test.thread.pool;
  2.  
     
  3.  
    import java.util.concurrent.SynchronousQueue;
  4.  
    import java.util.concurrent.ThreadPoolExecutor;
  5.  
    import java.util.concurrent.TimeUnit;
  6.  
     
  7.  
    import org.apache.commons.logging.Log;
  8.  
    import org.apache.commons.logging.LogFactory;
  9.  
    import org.apache.log4j.BasicConfigurator;
  10.  
     
  11.  
    public class PoolThreadSimple {
  12.  
     
  13.  
    static {
  14.  
    BasicConfigurator.configure();
  15.  
    }
  16.  
     
  17.  
    public static void main(String[] args) throws Throwable {
  18.  
     
  19.  
    /*
  20.  
    * corePoolSize:核心大小,線程池初始化的時候,就會有這么大
  21.  
    * maximumPoolSize:線程池最大線程數
  22.  
    * keepAliveTime:如果當前線程池中線程數大於corePoolSize。
  23.  
    * 多余的線程,在等待keepAliveTime時間后如果還沒有新的線程任務指派給它,它就會被回收
  24.  
    *
  25.  
    * unit:等待時間keepAliveTime的單位
  26.  
    *
  27.  
    * workQueue:等待隊列。這個對象的設置是本文將重點介紹的內容
  28.  
    * */
  29.  
    ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor( 5, 10, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>());
  30.  
    for( int index = 0 ; index < 10 ; index ++) {
  31.  
    poolExecutor.submit( new PoolThreadSimple.TestRunnable(index));
  32.  
    }
  33.  
     
  34.  
    // 沒有特殊含義,只是為了保證main線程不會退出
  35.  
    synchronized (poolExecutor) {
  36.  
    poolExecutor.wait();
  37.  
    }
  38.  
    }
  39.  
     
  40.  
    /**
  41.  
    * 這個就是測試用的線程
  42.  
    * @author yinwenjie
  43.  
    */
  44.  
    private static class TestRunnable implements Runnable {
  45.  
     
  46.  
    /**
  47.  
    * 日志
  48.  
    */
  49.  
    private static Log LOGGER = LogFactory.getLog(TestRunnable.class);
  50.  
     
  51.  
    /**
  52.  
    * 記錄任務的唯一編號,這樣在日志中好做識別
  53.  
    */
  54.  
    private Integer index;
  55.  
     
  56.  
    public TestRunnable(int index) {
  57.  
    this.index = index;
  58.  
    }
  59.  
     
  60.  
    /**
  61.  
    * @return the index
  62.  
    */
  63.  
    public Integer getIndex() {
  64.  
    return index;
  65.  
    }
  66.  
     
  67.  
    @Override
  68.  
    public void run() {
  69.  
    /*
  70.  
    * 線程中,就只做一件事情:
  71.  
    * 等待60秒鍾的事件,以便模擬業務操作過程
  72.  
    * */
  73.  
    Thread currentThread = Thread.currentThread();
  74.  
    TestRunnable.LOGGER.info( "線程:" + currentThread.getId() + " 中的任務(" + this.getIndex() + ")開始執行===");
  75.  
    synchronized (currentThread) {
  76.  
    try {
  77.  
    currentThread.wait( 60000);
  78.  
    } catch (InterruptedException e) {
  79.  
    TestRunnable.LOGGER.error(e.getMessage(), e);
  80.  
    }
  81.  
    }
  82.  
     
  83.  
    TestRunnable.LOGGER.info( "線程:" + currentThread.getId() + " 中的任務(" + this.getIndex() + ")執行完成");
  84.  
    }
  85.  
     
  86.  
    }
  87.  
    }

隨后的文章中我們將對線程池中的corePoolSize、maximumPoolSize、keepAliveTime、timeUnit、workQueue、threadFactory、handler參數和一些常用/不常用的設置項進行逐一講解。

3-2、ThreadPoolExecutor邏輯結構和工作方式

在上面的代碼中,我們創建線程池的時候使用了ThreadPoolExecutor中最簡單的一個構造函數:

  1.  
    public ThreadPoolExecutor(int corePoolSize,
  2.  
    int maximumPoolSize,
  3.  
    long keepAliveTime,
  4.  
    TimeUnit unit,
  5.  
    BlockingQueue<Runnable> workQueue)
  • 構造函數中需要傳入的參數包括corePoolSize、maximumPoolSize、keepAliveTime、timeUnit和workQueue。要明確理解這些參數(和后續將要介紹的參數)的含義,就首先要搞清楚ThreadPoolExecutor線程池的邏輯結構。

這里寫圖片描述

一定要注意一個概念,即存在於線程池中容器的一定是Thread對象,而不是您要求運行的任務(所以叫線程池而不叫任務池也不叫對象池,更不叫游泳池);您要求運行的任務將被線程池分配給某一個空閑的Thread運行。

從上圖中,我們可以看到構成線程池的幾個重要元素:

  • 等待隊列:顧名思義,就是您調用線程池對象的submit()方法或者execute()方法,要求線程池運行的任務(這些任務必須實現Runnable接口或者Callable接口)。但是出於某些原因線程池並沒有馬上運行這些任務,而是送入一個隊列等待執行(這些原因后文馬上講解)。

  • 核心線程:線程池主要用於執行任務的是“核心線程”,“核心線程”的數量是您創建線程時所設置的corePoolSize參數決定的。如果不進行特別的設定,線程池中始終會保持corePoolSize數量的線程數(不包括創建階段)。

  • 非核心線程:一旦任務數量過多(由等待隊列的特性決定),線程池將創建“非核心線程”臨時幫助運行任務。您設置的大於corePoolSize參數小於maximumPoolSize參數的部分,就是線程池可以臨時創建的“非核心線程”的最大數量。這種情況下如果某個線程沒有運行任何任務,在等待keepAliveTime時間后,這個線程將會被銷毀,直到線程池的線程數量重新達到corePoolSize。

  • 要重點理解上一條描述中黑體字部分的內容。也就是說,並不是所謂的“非核心線程”才會被回收;而是誰的空閑時間達到keepAliveTime這個閥值,就會被回收。直到線程池中線程數量等於corePoolSize為止。

  • maximumPoolSize參數也是當前線程池允許創建的最大線程數量。那么如果您設置的corePoolSize參數和您設置的maximumPoolSize參數一致時,線程池在任何情況下都不會回收空閑線程。keepAliveTime和timeUnit也就失去了意義。

  • keepAliveTime參數和timeUnit參數也是配合使用的。keepAliveTime參數指明等待時間的量化值,timeUnit指明量化值單位。例如keepAliveTime=1,timeUnit為TimeUnit.MINUTES,代表空閑線程的回收閥值為1分鍾。

說完了線程池的邏輯結構,下面我們討論一下線程池是怎樣處理某一個運行任務的。下圖描述了一個完整的任務處理過程:

這里寫圖片描述

1、首先您可以通過線程池提供的submit()方法或者execute()方法,要求線程池執行某個任務。線程池收到這個要求執行的任務后,會有幾種處理情況:

1.1、如果當前線程池中運行的線程數量還沒有達到corePoolSize大小時,線程池會創建一個新的線程運行您的任務,無論之前已經創建的線程是否處於空閑狀態。

1.2、如果當前線程池中運行的線程數量已經達到設置的corePoolSize大小,線程池會把您的這個任務加入到等待隊列中。直到某一個的線程空閑了,線程池會根據您設置的等待隊列規則,從隊列中取出一個新的任務執行。

1.3、如果根據隊列規則,這個任務無法加入等待隊列。這時線程池就會創建一個“非核心線程”直接運行這個任務。注意,如果這種情況下任務執行成功,那么當前線程池中的線程數量一定大於corePoolSize。

1.4、如果這個任務,無法被“核心線程”直接執行,又無法加入等待隊列,又無法創建“非核心線程”直接執行,且您沒有為線程池設置RejectedExecutionHandler。這時線程池會拋出RejectedExecutionException異常,即線程池拒絕接受這個任務。(實際上拋出RejectedExecutionException異常的操作,是ThreadPoolExecutor線程池中一個默認的RejectedExecutionHandler實現:AbortPolicy,這在后文會提到)

2、一旦線程池中某個線程完成了任務的執行,它就會試圖到任務等待隊列中拿去下一個等待任務(所有的等待任務都實現了BlockingQueue接口,按照接口字面上的理解,這是一個可阻塞的隊列接口),它會調用等待隊列的poll()方法,並停留在哪里。

3、當線程池中的線程超過您設置的corePoolSize參數,說明當前線程池中有所謂的“非核心線程”。那么當某個線程處理完任務后,如果等待keepAliveTime時間后仍然沒有新的任務分配給它,那么這個線程將會被回收。線程池回收線程時,對所謂的“核心線程”和“非核心線程”是一視同仁的,直到線程池中線程的數量等於您設置的corePoolSize參數時,回收過程才會停止。

3-3、不常用的設置

在ThreadPoolExecutor線程池中,有一些不常用的設置。我建議如果您在應用場景中沒有特殊的要求,就不需要使用這些設置:

3-3-1、 allowCoreThreadTimeOut:

前文我們討論到,線程池回收線程只會發生在當前線程池中線程數量大於corePoolSize參數的時候;當線程池中線程數量小於等於corePoolSize參數的時候,回收過程就會停止。

allowCoreThreadTimeOut設置項可以要求線程池:將包括“核心線程”在內的,沒有任務分配的任何線程,在等待keepAliveTime時間后全部進行回收:

  1.  
    ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor( 5, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>( 1));
  2.  
     
  3.  
    poolExecutor.allowCoreThreadTimeOut( true);
  • 1
  • 2
  • 3

以下是設置前的效果:

這里寫圖片描述

以下是設置后的效果:

這里寫圖片描述

3-3-2 prestartAllCoreThreads

前文我們還討論到,當線程池中的線程還沒有達到您設置的corePoolSize參數值的時候,如果有新的任務到來,線程池將創建新的線程運行這個任務,無論之前已經創建的線程是否處於空閑狀態。這個描述可以用下面的示意圖表示出來:

這里寫圖片描述

prestartAllCoreThreads設置項,可以在線程池創建,但還沒有接收到任何任務的情況下,先行創建符合corePoolSize參數值的線程數:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM