Executor線程池只看這一篇就夠了


線程池為線程生命周期的開銷和資源不足問題提供了解決方 案。通過對多個任務重用線程,線程創建的開銷被分攤到了多個任務上。

線程實現方式

Thread、Runnable、Callable

//實現Runnable接口的類將被Thread執行,表示一個基本任務
public interface Runnable {
    //run方法就是它所有內容,就是實際執行的任務
    public abstract void run();
}
//Callable同樣是任務,與Runnable接口的區別在於它接口泛型,同時它執行任務候帶有返回值;
//Callable的使用通過外層封裝成Future來使用
public interface Callable<V> {
    //相對於run方法,call方法帶有返回值
    V call() throws Exception;
}

注意:啟動Thread線程只能用start(JNI方法)來啟動,start方法通知虛擬機,虛擬機通過調用器映射到底層操作系統,通過操作系統來創建線程來執行當前任務的run方法

Executor框架

Executor接口是線程池框架中最基礎的部分,定義了一個用於執行Runnable的execute方法。
從圖中可以看出Exectuor下有一個重要的子接口ExecutorService,其中定義了線程池的具體行為:

  • execute(Runnable runnable):執行Runnable類型的任務
  • submit(task):用來提交Callable或者Runnable任務,並返回代表此任務的Future對象
  • shutdown():在完成已經提交的任務后封閉辦事,不在接管新的任務
  • shutdownNow():停止所有正在履行的任務並封閉辦事
  • isTerminated():是一個鈎子函數,測試是否所有任務都履行完畢了
  • isShutdown():是一個鈎子函數,測試是否該ExecutorService是否被關閉
    Executor框架

ExecutorService中的重點屬性:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

ctl:對線程池的運行狀態和線程池中有效線程的數量進行控制的一個字段,它包含兩部分信息:線程池的運行狀態(runState)和線程池內有效線程的數量(workerCount)。

這里可以看到,使用Integer類型來保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY 就是1左移29位減1(29個1),這個常量表示workerCount的上限值,大約是5億。

ctl相關方法:

//獲取運行狀態
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//獲取活動線程數
private static int workerCountOf(int c)  { return c & CAPACITY; }
//獲取運行狀態和活動線程數的值
private static int ctlOf(int rs, int wc) { return rs | wc; }

線程池的狀態:

RUNNING = ­1 << COUNT_BITS; //高3位為111
SHUTDOWN = 0 << COUNT_BITS; //高3位為000
STOP = 1 << COUNT_BITS; //高3位為001
TIDYING = 2 << COUNT_BITS; //高3位為010
TERMINATED = 3 << COUNT_BITS; //高3位為011

線程池運行狀態扭轉

1、RUNNING

  • 狀態說明:線程池處於RUNNING狀態,能夠接收新任務,以及對已添加的任務進行處理。
  • 狀態切換:線程池的初始化狀態是RUNNING。換句話說,線程池一旦被創建,就處於RUNNING狀態,並且線程池中的任務數為0。

2、SHUTDOWN

  • 狀態說明:線程池處於SHUTDOWN狀態,不接收新任務,能夠處理已經添加的任務。
  • 狀態切換:調用shutdown()方法時,線程池由RUNNING -> SHUTDOWN。

3、STOP

  • 狀態說明:線程池處於STOP狀態,不接收新任務,不處理已提交的任務,並且會中斷正在處理的任務。
  • 狀態切換:調用線程池中的shutdownNow()方法時,線程池由(RUNNING or SHUTDOWN) -> STOP。

4、TIDYING

  • 狀態說明:當所有的任務已經停止,ctl記錄“任務數量”為0,線程池會變為TIDYING狀態。當線程池處於TIDYING狀態時,會執行鈎子函數 terminated()。 terminated()在ThreadPoolExecutor類中是空, 的,若用戶想在線程池變為TIDYING時,進行相應處理,可以通過重載 terminated()函數來實現。
  • 狀態切換:當線程池在SHUTDOWN狀態下,阻塞隊列為空並且線程池中執行任務也為空時,就會由SHUTDOWN -> TIDYING。當線程池在STOP狀態下,線程池中執行的任務為空時,就會由STOP-> TIDYING。

5、TERMINATED

  • 狀態說明:線程池線程池徹底停止,線程池處於TERMINATED狀態,
  • 狀態切換:線程池處於TIDYING狀態時,執行完terminated()之后, 就會由TIDYING->TERMINATED。

線程池使用

RunTask類:

public class RunTask implements Runnable {
    public void run() {
        System.out.println("Thread name:"+Thread.currentThread().getName());
    }
}

ExecutorSample類:

public class ExecutorSample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i=0;i<20;i++){
            //提交任務無返回值
            executor.execute(new RunTask());
            //任務執行完成后有返回值
            Future<Object> future = executor.submit(new RunTask());
        }
    }
}

線程池的具體使用:

  • ThreadPoolExecutor 默認線程池
  • ScheduledThreadPoolExecutor 定時線程池

ThreadPoolExecutor

線程池的創建:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}
  • corePoolSize:線程池中的核心線程數。當提交一個任務時,線程池創建一個新線程執行任務,直到當前線程數等於corePoolSize;如果當前線程數為corePoolSize,繼續提交的任務被保存到阻塞隊列中,等待被執行;如果執行了線程池的prestartAllCoreThreads()方法,線程池會提前創建並啟動所有核心線程。
  • maximumPoolSize:線程池中允許的最大線程數。如果當前阻塞隊列滿了,且繼續提交任務,則創建新的線程執行任務,前提是當前線程數小於maximumPoolSize。
  • keepAliveTime:線程池維護線程所允許的空閑時間。當線程池中的線程數量大於corePoolSize時候,如果這時候沒有新的任務提交,核心線程外的線程不會立即被銷毀,而是會等待,直到等待的時間超過了keepAliveTime
    unit:keepAliveTime的單位時間
  • workQueue:用於保存等待被執行的任務的阻塞隊列,且任務必須實現Runnable接口,在JDK中提供了如下阻塞隊列:
    ArrayBlockingQueue:基於數組結構的有界阻塞隊列,按FIFO排序任務。
    LinkedBlockingQueue:基於鏈表結構的阻塞隊列,按FIFO排序任務,吞吐量通常要高於ArrayBlockingQueue。
    SynchronousQueue:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常高於LinkedBlockingQueue。
  • PriorityBlockingQueue:具有優先級的無界阻塞隊列。
  • threadFactory:ThreadFactory 類型的變量,用來創建新線程。默認使用ThreadFactory.defaultThreadFactory來創建線程, 會使新創建線程具有相同的NORM_PRIORITY優先級並且都是非守護線程,同時也設置了線程名稱。
  • handler:線程池的飽和策略。當阻塞隊列滿了,且沒有空閑的工作隊列,如果繼續提交任務,必須采用一種策略處理該任務.

線程池的監控:

public long getTaskCount() //線程池已執行與未執行的任務總數
public long getCompletedTaskCount() //已完成的任務數
public int getPoolSize() //線程池當前的線程數
public int getActiveCount() //線程池中正在執行任務的線程數量

線程池的原理

線程池執行示意圖

  • 如果當前運行的線程少於corePoolSize,則創建新線程來執行任務(注意這一個步驟需要獲取全局鎖)。
  • 如果運行的線程等於或多於corePoolSize,則將任務加入BlockingQueue。
  • 如果無法將任務加入BlockingQueue(隊列已滿),則創建新的線程來處理任務(注意這一個步驟需要獲取全局鎖)。
  • 如果創建的新線程將使當前運行的線程超出maximumPoolSize,任務將被執行飽和策略。
    ThreadPoolExecutor 采用上述的設計思路,是為執行execute()方法時,盡可能避免獲取全局鎖(一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱之后,幾乎所有的execute()方法調用都是在執行步驟2,而步驟2不需要獲取全局鎖。

還沒關注我的公眾號?

  • 掃文末二維碼關注公眾號【小強的進階之路】可領取如下:
  • 學習資料: 1T視頻教程:涵蓋Javaweb前后端教學視頻、機器學習/人工智能教學視頻、Linux系統教程視頻、雅思考試視頻教程;
  • 100多本書:包含C/C++、Java、Python三門編程語言的經典必看圖書、LeetCode題解大全;
  • 軟件工具:幾乎包括你在編程道路上的可能會用到的大部分軟件;
  • 項目源碼:20個JavaWeb項目源碼。
    小強的進階之路二維碼


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM