線程池的基本概念


線程池,是一種線程的使用模式,它為了降低線程使用中頻繁的創建和銷毀所帶來的資源消耗與代價。
通過創建一定數量的線程,讓他們時刻准備就緒等待新任務的到達,而任務執行結束之后再重新回來繼續待命。

這就是線程池最核心的設計思路,「復用線程,平攤線程的創建與銷毀的開銷代價」。

相比於來一個任務創建一個線程的方式,使用線程池的優勢體現在如下幾點:

  1. 避免了線程的重復創建與開銷帶來的資源消耗代價
  2. 提升了任務響應速度,任務來了直接選一個線程執行而無需等待線程的創建
  3. 線程的統一分配和管理,也方便統一的監控和調優

線程池的實現天生就實現了異步任務接口,允許你提交多個任務到線程池,線程池負責選用線程執行任務調度。

異步任務在上一篇文章中已經做過一點鋪墊介紹,那么本篇就在前一篇的基礎上深入的去探討一下異步任務與線程池的相關內容。

基本介紹

在正式介紹線程池相關概念之前,我們先看一張線程池相關接口的類圖結構,網上盜來的,但畫的還是很全面的。

線程池相關類圖

右上角的幾個接口可以先不看,等我們介紹到組合任務的時候會繼續說的,我們看左邊,Executor、ExecutorService 以及 AbstractExecutorService 都是我們熟悉的,它們抽象了任務執行者的基本模型。

ThreadPoolExecutor 是對線程池概念的抽象,它天生實現了任務執行的相關接口,也就是說,線程池也是一個任務的執行者,允許你向其中提交多個任務,線程池將負責分配線程與調度任務。

至於 Schedule 線程池,它是擴展了基礎的線程池實現,提供「計划調度」能力,定時調度任務,延時執行等。

線程池基本原理

ThreadPoolExecutor 的創建並不復雜,直接 new 就好,只不過構造函數有好久個重載,我們直接看最底層的那個,也就是參數最多的那個。

public ThreadPoolExecutor
(   int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler
)

創建一個線程池需要傳這么多參數?是不是覺得有點喪心病狂?

不要擔心,我說了,這是最復雜的一個構造函數重載,需要傳入最全面的構造參數。而你日常使用時,當然可以使用 ThreadPoolExecutor 中的其他較為簡便的構造函數,只不過有些你沒傳的參數將配置為默認值而已。

下面我們將從這些參數的含義出發,看看線程池 ThreadPoolExecutor 具備一個怎樣的構成結構。

1、線程池容量問題

構造函數中有這么幾個參數是用於配置線程池中線程容量與生命周期的:

  • corePoolSize
  • maximumPoolSize
  • keepAliveTime

corePoolSize 指定了線程池中的核心線程的個數,核心線程就是永遠不會被銷毀的線程,一旦被創建出來就將永遠存活在線程池之中。

maximumPoolSize 指定了線程池能夠創建的最大線程數量。

keepAliveTime 是用於控制非核心線程最長空閑等待時間,如果一個非核心線程處理完任務后回到線程池待命,超過這個指定時長依然沒有新任務的分配將導致線程被銷毀。

2、任務阻塞問題

ThreadPoolExecutor 中有這么一個字段:

private final BlockingQueue workQueue;

這個隊列的作用很明顯,就是當線程池中的線程不夠用的時候,讓任務排隊,等待有線程空閑再來取任務去執行。

3、線程工廠

線程工廠 ThreadFactory 中只定義了一個方法 newThread,子類實現它並按照自己的需求創建一個線程返回。

例如 DefaultThreadFactory 實現的該方法將創建一個線程,名稱格式: pool-<線程池編號>-thread-<線程編號>,設置線程的優先級為標准優先級,非守護線程等。

4、任務拒絕策略

構造函數中還有一個參數 handle 是必須傳的,它將為 ThreadPoolExecutor 中的同名字段賦值。

private volatile RejectedExecutionHandler handler;

RejectedExecutionHandler 中定義了一個 rejectedExecution 用於描述一種任務拒絕策略。那么哪種情況下才會觸發該方法的調用呢?

當線程池中的所有線程全部分配出去工作了,並且任務阻塞隊列也阻塞滿了,那么此時新提交的任務將觸發任務拒絕策略

而拒絕策略主要有以下四個子類實現,而它們都是定義在 ThreadPoolExecutor 的內部類,我們看一看都是哪四種策略:

  • AbortPolicy
  • CallerRunsPolicy
  • DiscardOldestPolicy
  • DiscardPolicy

AbortPolicy 是默認的拒絕策略,他的實現就是直接拋出 RejectedExecutionException 異常。

CallerRunsPolicy 暫停當前提交任務的線程返回,自己去執行自己提交過來的任務。

DiscardOldestPolicy 策略將從阻塞任務隊列對頭移除一個任務並將自己排到隊列尾部等待調度執行。

DiscardPolicy 是一種佛系策略,方法體的實現為空,什么也不做,也即忽略當前任務的提交。

這樣,我們零零散散的對線程池的內部有了一個基本的認識,下面我們要把這些都串起來,看一看源碼。從一個任務的提交,到分配到線程執行任務,一整個過程的相關邏輯做一個探究。

看一看源碼

先來看一看任務的提交方法,submit

submit

之前的文章我們也說過,這個 submit 方法有四個重載,分別允許你傳入不同類型的任務,Runnable 或是 Callable。我們這里就以前者為例。

這個 RunnableFuture 類型我們之前說過,他只不過是同時繼承了 Runnable 和 Future 接口,象征性的描述了「這是一個可監控的任務」。

然后你會發現,整個 submit 的核心邏輯在 execute 方法里面,也就是說 execute 方法才是真正向線程池提交任務的方法。我們重點看一看這個 execute 方法。

先看看 ThreadPoolExecutor 中定義幾個重要的字段:

image

ctl 是一個原子變量,它用了一個 32 位的整型描述了兩個重要信息。當前線程池運行狀態(runState)和當前線程池中有效的線程個數(workCount)。

runState 占用高 3 比特位,workCount 占用低 29 比特位。

接着我們來看 execute 方法的實現:

image

紅框部分:

如果當前線程池中的實際工作線程數還未達到配置的核心線程數量,那么將調用 addWorker 為當前任務創建一個新線程並啟動執行。

addWorker 方法代碼還是有點多的,這里就截圖出來進行分析了,因為並不難,我們總結下該方法的邏輯:

  1. 死循環中判斷線程池狀態是否正常,如果不正常被關閉了等,將直接返回 false
  2. 如果正常則 CAS 嘗試為 workerCount 增加一,並創建一個新的線程調用 start 方法執行任務。

不知道你留意到 addWorker 方法的第二個參數了沒有,這個參數用於指定線程池的上界。

如果傳的是 true,則說明使用 corePoolSize 作為上界,也就是此次為任務分配線程如果線程池中所有的工作線程數達到這個 corePoolSize 則將拒絕分配並返回添加失敗。

如果傳的是 false,則使用 maximumPoolSize 作為上界,道理是一樣的。

藍框部分:

從紅框出來,你可以認為任務分配線程失敗了,大概率是所有正常工作的線程數達到核心線程數量了。這部分做的事情就是:

  1. 如果線程池狀態正常,就嘗試將當前任務添加到任務阻塞隊列上。
  2. 再一次檢查線程池狀態,如果異常了,將撤回剛才添加的任務並根據我們設定的拒絕策略予以拒絕。
  3. 如果發現線程池自上次檢查后,所喲線程全部死亡,那么將創建一個空閑線程,適當的時候他會去從任務隊列取我們剛剛添加的任務的

黃框部分:

到達黃色部分必然說明線程池狀態異常或是隊列添加失敗,大概率是因為隊列滿了無法再添加了。

此時再次調用 addWorker 方法,不過這次傳入 false,意思是,我知道所有的核心線程都在忙並且任務隊列也排滿了,那么你就額外創建一個非核心線程來執行我的任務吧。

如果失敗了,執行拒絕策略。

我們總結一下任務的提交到分配線程,甚至阻塞到任務隊列這一系列過程:

一個任務過來,如果線程池中的線程數不足我們配置的核心線程數,那么會嘗試創建新線程來執行任務,否則會優先把任務往阻塞隊列上添加

如果阻塞隊列上滿員了,那么說明當前線程池中核心線程工作量有點大,將開始創建非核心線程共同執行任務,直到達到上限或是阻塞隊列不再滿員。

到這里呢,我們對於任務的提交與線程分配已經有了一個基本的認識了,相信你也一定好奇當一個線程的任務執行結束之后,他是如何去取下一個任務的。

這部分我們也來分析分析

線程池的內部定義了一個 Worker 內部類,這個類有兩個字段,一個用於保存當前的任務,一個用於保存用於執行該任務的線程。

addWorker 中會調用線程的 start 方法,進而會執行 Worker 實例的 run 方法,這個 run 方法是這樣的:

public void run() {
    runWorker(this);
}

runWorker 很長,就不截出來一點點分析了,我總結下他的實現邏輯:

  1. 如果自己內部的任務是空,則嘗試從阻塞隊列上獲取一個任務
  2. 執行任務
  3. 循環的執行 1和2 兩個步驟,直到阻塞隊列中沒有任務可獲取
  4. 調用 processWorkerExit 方法移除當前線程在線程池中的引用,也就相當於銷毀了一個線程,因為不久后會被 GC 回收

但是這里有一個細節和大家說一下,第一個步驟從任務隊列中取一個任務調用的是 getTask 方法。

這個方法設定了一個邏輯,如果線程池中正在工作的線程數大於設定的核心線程數,也就是說線程池中存在非核心線程,那么當前線程獲取任務時,如果超過指定時長依然沒有獲取,就將返回跳過循環執行我們 runWorker 的第四個步驟,移除對該線程的引用。

反之,如果此時有效工作線程數少於規定的核心線程數,則認定當前線程是一個核心線程,於是對於獲取任務失敗的處理是「阻塞到條件隊列上,等待其他線程喚醒」。

什么時候喚醒也很容易想到了,就是當任務隊列有新任務添加時,會喚醒所有的核心線程,他們會去隊列上取任務,沒搶到的依然回去阻塞。

至此,線程池相關的內容介紹完畢,有些方法的實現我只是總結了大概的邏輯,具體的尤待你們自己去探究,有問題也歡迎你和我討論。

關注公眾不迷路,一個愛分享的程序員。

公眾號回復「1024」加作者微信一起探討學習!

每篇文章用到的所有案例代碼素材都會上傳我個人 github

https://github.com/SingleYam/overview_java

歡迎來踩!

YangAM 公眾號


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM