Java線程池ThreadPoolExector的源碼分析


 前言:線程是我們在學習java過程中非常重要的也是繞不開的一個知識點,它的重要程度可以說是java的核心之一,線程具有不可輕視的作用,對於我們提高程序的運行效率、壓榨CPU處理能力、多條線路同時運行等都是強有力的殺手鐧工具。線程是如此的重要,那么我們來思考這樣一個問題。假設我們有一個高並發,多線程的項目,多條線程在運行的時候,來一個任務我們new一個線程,任務結束了,再把它銷毀結束,這樣看似沒有問題,適合於低並發的場景,可是當我們的項目投入到生產環境,一下涌入千條任務的時候,線程不斷的new執行,JVM不斷的回收,這樣重復這個過程,並且任務過多,一下子new不過來,任務就會有遺漏,可能發生這樣的情況,任務執行了但是卻沒有返回結果,並且很容易發生宕機。這是我們絕對不願意看到的場景。所以,我們為什么不思考一下,然后用一個管理池把所有的線程管理起來呢?那么線程池就應運而生了。本期的博客就來聚焦線程池的源碼,走進線程池的內部,看看它究竟是如何工作的,使用線程池到底能帶給我們什么好處?我們為什么要使用它?

本篇博客的目錄:

一:線程池的介紹

二:使用線程池的好處

三:線程池的分類

四:ThreadPooolExector類的實現源碼分析

五:關於線程池的總結

一:線程池的介紹

  1: 前言中我們已經說明了單線程創建處理任務的時候的場景的弊端,線程池的出現,就是專門為了解決這一問題應運而生的,它解決了由於線程創建太過頻繁而發生的性能損耗,采用了線程處理完任務而繼續保持留在線程池內的機制,而勉去重新創建新線程的這一特別耗費時間和資源的開銷。線程池的定義如下:線程池是指管理一組工作線程的資源池,線程池是與工作隊列密切相關的,其中在工作隊列中宏保存了所有等待執行的任務,從工作隊列中獲取一個任務,執行任務,然后在線程池中並等待一下個任務。

2:線程池的特點:線程池是專門用來管理線程的,首先呢,它本身創建出來的時候,里面是沒有線程的。只有當有任務進來的時候,它才會去創建線程去處理任務線程,這時候線程池的大小就處於一個增長的過程。在這里注意,有兩個線程的概念,一個是工作線程,一個是任務線程。當然我們建立一個池,不能讓它無限制的增長,因此它具有以下幾個屬性:coreSize、maxiumSize、BlockingQueue,其中coreSize我們暫且先翻譯為核心池大小,它是一個int的數值類型,表示的是創建線程的個數,這是一個臨界值,當線程池中的數量超過coreSize個大小的時候,它就會將任務放在阻塞隊列中,等到線程中的數目降到coreSize大小之下,然后它會從隊列中取,繼續執行任務。當然也會有任務數量比較多的情況,這時候就會創建maxiumSize個線程數了,注意:maxiumSize是它的最大值,超過這個數,線程池就要對其進行采取拒絕策略了。

我們用圖來表示一下這個過程:

二:使用線程池的好處

我總結了一下,主要有以下幾點:

1:線程統一管理,線程池具有創建線程和銷毀線程的能力,線程集中在一起比起分散開來,更加便於管理

2:重用現有的線程而不是創建新線程,可以在處理多個請求的時候分攤線程創建和銷毀過程中產生的巨大開銷

3:當請求到達的時候,工作線程通常已經存在,因此不會由於等待創建線程而延遲任務的執行,從而提高了響應性

4:可以創建足夠多的線程便於處理器保持忙碌狀態,而對這些足夠多多線程數量又進行了限制,會防止其溢出,耗盡處理器內存

5:可以防止多線程相互爭奪資源而使應用程序而產生的並發問題

三:線程池的分類

    從jdk1.5開始中,api就提供了靈活的線程池以及一些配置,我們可以調用Executors中的靜態工廠方法來創建一個線程池,Executors是一個接口,它的子類是ExecutorService,然后有一個具體的實現類:ThreadPoolExector.關於線程池,它有以下幾種不同的分類:

1:newFixedThreadPool

     Fixed這個詞的意思是固定不變的,newFixedThreadPool將創建一個固定長度的線程池,每當提交一個任務時就創建一個線程,直到達到線程池的數量,這時線程池的大小將不會變化。如果線程在執行的過程中發生異常,這個時候線程池就會補充一個線程

2: newCachedThreadPool

  newCachedThreadPool將會創建一個可緩存的線程池,如果線程池的當前數量超過了線程池,那么線程池將回收空閑的線程,如果任務很多的時候,它將會繼續創建線程而不會存在任何的限制。

   3: newSingleThreadExector

newSingleThreadExector將只會創建一個線程來執行任務,這點有點類似於node.js,采用單線程的機制,如果這個線程異常結束,那么會再重新創建一個線程,它始終保證在線程池里的只有一個線程,這樣就可以保證任務在隊列總是按照預定的順序執行。

    4: newScheduledThreadPool

newScheduledThreadPool創建一個固定長度的線程池,而且是以延遲或者定時的方式來執行任務,這點有點類似於Timer類

 

四:ThreadPooolExector類的實現源碼分析

1:為什么要講ThreadPoolExector類?

Exector是ThreadPoolExector的祖父類接口,ThreadPoolExector的直接父類接口是ExectorService,而我們所講的第三點,其中的不同線程池的分類其實都是Exector中的方法,而在ThreadPoollExector中得到了實現,所以我們要構建的不同種類的線程池主要還是依賴這個類完成,接下來我們就聚焦ThreadPoolExector來看其具體的實現方法

2:ThreadPoolExector的源碼講解

2.1:同樣的,它是一個類,首先我們來看它有哪些全局變量

   private final BlockingQueue<Runnable> workQueue;//存放線程的隊列。這里是一個阻塞隊列

    
    private final ReentrantLock mainLock = new ReentrantLock();//維持一個可重入鎖

    
    private final Condition termination = mainLock.newCondition();//獲取鎖的狀態

    
    private final HashSet<Worker> workers = new HashSet<Worker>();//存放Worker類的集合

    
    private volatile long  keepAliveTime;//保持存活的時間

    
    private volatile boolean allowCoreThreadTimeOut;//是否允許核心線程超時

    
    private volatile int   corePoolSize;//核心池大小

    
    private volatile int   maximumPoolSize;//最大線程池的大小
    
 
    private volatile int   poolSize;//池的大小

    
    private volatile RejectedExecutionHandler handler;//注入執行的處理器

    
    private volatile ThreadFactory threadFactory;//線程工廠,主要是用來生產線程

    
    private int largestPoolSize;//最大線程大小

    
    private long completedTaskCount;//已執行完成的任務數量

    
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();                      //拒絕策略類

 

從中,我們也可以看出我們在線程池介紹中談到的關於coreSize和maxiumSize等參數,這些int值對線程池的中的線程池數量進行了限制,還有一些關於鎖ReentrantLock
 

的類,這是一個可重入鎖,它的主要目的是鎖住其操作,因為線程的操作要保證其原子性,防止沖突發生,所以在其源碼中很多都對其進行了上鎖操作。還有一個很重要的值的全局的變量state:

volatile int runState;//運行狀態
    static final int RUNNING    = 0;//0表示正在運行中
    static final int SHUTDOWN   = 1;//1表示關閉
    static final int STOP       = 2;//2表示停止
    static final int TERMINATED = 3;//3表示結束

這些狀態值是線程池目前所處環境的狀態的體現,它采用int數字來表現,記住這些值很重要,因為后面有很多方法調用線程池的運行狀態,有很多對其值進行判斷。采用volatile修飾,也保證其在線程池中是隨時保證對其他線程的可見,防止並發過程中未知異常的發生。

 

2.2:ThreadPoolExector的構造函數

     public ThreadPoolExecutor(int corePoolSize,                  //核心池大小
                              int maximumPoolSize,               //池的最大大小
                              long keepAliveTime,                //保持活動的時間
                              TimeUnit unit,        //時間單位
                              BlockingQueue<Runnable> workQueue) {         //阻塞隊列
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    
    public ThreadPoolExecutor(int corePoolSize,//核心池大小
                              int maximumPoolSize,//池的最大大小
                              long keepAliveTime,//保持存活的時間
                              TimeUnit unit,//時間單位
                              BlockingQueue<Runnable> workQueue,//阻塞隊列
                              ThreadFactory threadFactory) {//線程工廠
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,//調用其他構造函數
             threadFactory, defaultHandler);
    }

    
    public ThreadPoolExecutor(int corePoolSize,//核心池的大小
                              int maximumPoolSize,//最大池的大小
                              long keepAliveTime,//保持喚醒的時間
                              TimeUnit unit,//時間單元
                              BlockingQueue<Runnable> workQueue,//阻塞隊列
                              RejectedExecutionHandler handler) {//注入的執行處理器
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,//調用其他函數
             Executors.defaultThreadFactory(), handler);
    }

    
    public ThreadPoolExecutor(int corePoolSize,//核心池的大小
                              int maximumPoolSize,//最大池大小
                              long keepAliveTime,//保持存活的時間
                              TimeUnit unit,//時間單位
                              BlockingQueue<Runnable> workQueue,//阻塞隊列
                              ThreadFactory threadFactory,//線程工廠
                              RejectedExecutionHandler handler) {//注入的執行處理器
        if (corePoolSize < 0 ||                     //如果核心池的大小小於0
            maximumPoolSize <= 0 ||                //最大池大小小於0
            maximumPoolSize < corePoolSize ||       //最大池大小不小於核心池大小
            keepAliveTime < 0)  //保持喚醒的時間小於0
            throw new IllegalArgumentException();//拋出主題違法異常
        if (workQueue == null || threadFactory == null || handler == null)//如果隊列為null或者線程工廠為null或者處理器為null
            throw new NullPointerException();//拋出空指針異常
        this.corePoolSize = corePoolSize;//構造核心池大小
        this.maximumPoolSize = maximumPoolSize;//最大池的大小
        this.workQueue = workQueue;//工作隊列
        this.keepAliveTime = unit.toNanos(keepAliveTime);//以保持的存活時間構造存活時間
        this.threadFactory = threadFactory;//構造線程工廠
        this.handler = handler;//處理器
    }

可以看出ThreadPoolExector一共有四個構造函數,但是最后調用的都是最后一個,我們可以只看最后一個,它主要有核心池大小、最大池大小、存活時間、時間單位、阻塞隊列、線程工廠這幾個參數,其中又對其進行了值范圍的檢查,如果參數違法就拋出異常,然后構造進去。關於這幾個參數,隨着后面我們對其方法的講解,會理解越來越深刻的。

2.3:ThreadPool的主要方法講解

2.3.1:我們先來看一下最重要的方法,執行任務的方法:

    public void execute(Runnable command) {//執行方法
        if (command == null)//如果任務線程為null
            throw new NullPointerException();//拋出空指針異常
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {//如果線程池的大小大於核心池的大小或者線程池沒有保證在coreSize下
            if (runState == RUNNING && workQueue.offer(command)) {//如果運行狀態是正在運行並且阻塞隊列中把線程任務放進去
                if (runState != RUNNING || poolSize == 0)//如果運行狀態不是正在運行或者池的大小是0
                    ensureQueuedTaskHandled(command);//確保隊列中的任務被處理
            }
            else if (!addIfUnderMaximumPoolSize(command))//沒有在保證最大的線程池大小下添加線程任務
                reject(command); // 調用注入方法
        }
    }

這是執行任務的方法,其中可以看出當任務線程為null的時候,就拋出異常,然后如果池的大小大於核心池的大小,也就是說此時它要將任務放在阻塞隊列中,這個時候它采用的是BlockingQueue的offer方法,將任務放進去,同時保證狀態是正在運行。因為線程池的狀態隨時可能被修改,所以處處得判斷。添加進去以后,如果狀態不是在運行中,或者池里沒有線程,就調用ensureQueueTaskHandled處理任務,我們來看看這個方法的源代碼:

    private void ensureQueuedTaskHandled(Runnable command) {//確保隊列中的任務被處理
        final ReentrantLock mainLock = this.mainLock;//主鎖
        mainLock.lock();//上鎖
        boolean reject = false;//設置reject為false
        Thread t = null;//聲明一個線程t
        try {
            int state = runState;//獲取運行狀態
            if (state != RUNNING && workQueue.remove(command))//如果狀態不是在運行中,在隊列中移除線程
                reject = true;//注入設置為true
            else if (state < STOP &&//如果狀態小於停止(這時候int的值作用就發揮了,看一下小於stop的有幾?stop是2,小於它也就是0和1,就表示運行和結束,上面的if說明了不是0,那么這里就表示1,也就是結束) 並且 如果池的大小大於1並且隊列不是空,也就是還存在任務在隊列中沒有執行完
                     poolSize < Math.max(corePoolSize, 1) &&
                     !workQueue.isEmpty())
                t = addThread(null);//添加一個線程,設置參數為null
        } finally {
            mainLock.unlock();//最后解鎖
        }
        if (reject)//如果注入為ture
            reject(command);//注入線程
        else if (t != null)//如果線程不為null
            t.start();//線程開始運行
    }

其中可以看到如果線程池的狀態是關閉的時候,這時候隊列也會移除任務,然后調用addThread方法,並且參數為null:

    
    private Thread addThread(Runnable firstTask) {//添加線程
        Worker w = new Worker(firstTask);//工作類新構造一個線程
        Thread t = threadFactory.newThread(w);//用線程工廠生產一個新類
        if (t != null) {//如果生產出來的類不是null
            w.thread = t;//設置一個線程t
            workers.add(w);//工作線程添加
            int nt = ++poolSize;//池的大小增加
            if (nt > largestPoolSize)//如果線程池的大小大於最大池的大小
                largestPoolSize = nt;//把增加后的池的大小賦給最大池大小
        }
        return t;//返回該線程
    }

我們來到addThread方法,此時參數為null,Worker類此時會構建一個null,我們再來看Worker的構造方法,然后看threadFactory,也就是線程工廠的方法

Worker(Runnable firstTask) {//構造一個任務線程
            this.firstTask = firstTask;
        }

 

從中可以看出,ThreadFactory只是進行了一個對創建線程的封裝,如果傳入的參數為null,那么它也會返回null,那么addThread就會返回null,我們再回去看看,也就是說如果其線程池狀態為停止,那么阻塞隊列會移除其中的任務,並不再執行任務線程。好了,我們再回過頭,看看按照正常邏輯怎么走。看一下addIfUnderCorePoolSize的源碼:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {//保證在核心池的大小下添加任務
        Thread t = null;//線程為null
        final ReentrantLock mainLock = this.mainLock;//主鎖
        mainLock.lock();//開始上鎖
        try {
            if (poolSize < corePoolSize && runState == RUNNING)//如果池的大小小於核心池大小並且是正在運行狀態
                t = addThread(firstTask);//把第一個任務添加進去
        } finally {
            mainLock.unlock();//最后解鎖
        }
        if (t == null)//如果線程是null的話
            return false;//返回false
        t.start();//線程開始執行
        return true;//返回true
    }

這個方法主要是說如果線程數還未超過coreSize的時候,此時就會通過線程工廠生產出一個線程然后執行任務,這是絕大多數的線程的狀態。那么此時就還剩一種狀態,也就是在maxiumSize下,這個時候,會調用addIfUnderMaximumPoolSize方法,我們來分析一下這個方法的源碼:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {//添加任務在未超過池的最大容量時
        Thread t = null;//聲明一個線程
        final ReentrantLock mainLock = this.mainLock;//開啟一個主鎖
        mainLock.lock();//開始上鎖
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)//如果池的大小小於最大池的大小並且運行狀態是在運行中
                t = addThread(firstTask);//調用添加線程的方法添加線程
        } finally {
            mainLock.unlock();//最后一定解鎖
        }
        if (t == null)//如果線程為null
            return false;//返回false
        t.start();//線程開始運行
        return true;//返回true
    }

這里也是對其池的大小進行了判斷,如果其符合小於最大池大小並且線程池狀態是正在運行,那么就生產線程去執行任務。

總結:execute方法主要是對線程數在不同的大小的判斷,並且是時刻判斷線程池的運行狀太,如果是正在運行,那么就會生產線程去執行任務,一旦超過coresize就放在阻塞隊列中去,未超過maxiumSize也會繼續執行,如果線程池狀態是停止的話,那么此時阻塞隊列會移除任務,並且不再創建線程去執行任務。

 2.3.2:ThreadPoolExector中的內部類Worker的講解

Worker作為ThreadPoolExector中的一個內部類,它也繼承了Runnable接口,所以其本質上是一個線程,按照我們分析源碼的技巧,先來看看它的全局變量:

       private final ReentrantLock runLock = new ReentrantLock();//新建一個可重入鎖

        
        private Runnable firstTask;//第一個任務線程

        
        volatile long completedTasks;//已經完成的任務數

        
        Thread thread;//聲明一個線程

從中可以看出它的任務,而Woker其實也就是線程池中運行的工作線程,這個類中主要是對線程的打斷和對線程池的關閉等一些列的方法操作,這里我們拿出兩個方法來講,一個是shutdown方法,顧名思義也就是關閉線程池,我們來看一下具體的代碼:

public void shutdown() {   //關閉方法
        
    SecurityManager security = System.getSecurityManager();//獲取系統安全管理器
    if (security != null)//如果安全接口不為null
            security.checkPermission(shutdownPerm);//檢查權限

        final ReentrantLock mainLock = this.mainLock;//獲取可重入鎖
        mainLock.lock();//開始上鎖
        try {
            if (security != null) { //如果安全接口不為null
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            }

            int state = runState;
            if (state < SHUTDOWN)//如果狀態小於關閉,那么肯定是running或者介於running和關閉之間
                runState = SHUTDOWN;//狀態設置為關閉

            try {
                for (Worker w : workers) {//遍歷整個工作線程
                    w.interruptIfIdle();//如果是空閑的就打斷
                }
            } catch (SecurityException se) { // Try to back out
                runState = state;
                // tryTerminate() here would be a no-op
                throw se;
            }

            tryTerminate(); // Terminate now if pool and queue empty
        } finally {
            mainLock.unlock();
        }
    }

可以看出結束方法首先調用的是系統安全接口,這主要的目的是為了在系統層面對線程池進行保護,防止其發生意外。比如中斷系統進程等,獲取了安全管理器之后接下來再對其進行權限檢查,然后就是加鎖控制。接下來就是取狀態進行判斷了,接着遍歷循環整個線程池里的工作線程,一旦發現空閑的就進行打斷終止。然后是調用tryTerminate方法,我們來看看其中的源碼:

  
    private void tryTerminate() { //結束
        if (poolSize == 0) {                      
            int state = runState; //取當前運行狀態
            if (state < STOP && !workQueue.isEmpty()) {//如果運行狀態小於stop,也就是有運行和關閉的可能並且隊列不為空
                state = RUNNING; //運行狀態設置為Running
                Thread t = addThread(null);//不創造線程
                if (t != null)//
                    t.start();
            }
            if (state == STOP || state == SHUTDOWN) {
                runState = TERMINATED;
                termination.signalAll();
                terminated();
            }
        }
    }

如果線程池里沒有線程了,就取當前的運行狀態,然后再阻塞隊列中還有線程的情況下,可以看到不創建線程了,此時傳入addThread的參數為null,我們來具體看一下:

private Thread addThread(Runnable firstTask) {//添加線程
        Worker w = new Worker(firstTask);//工作類新構造一個線程
        Thread t = threadFactory.newThread(w);//用線程工廠生產一個新類
        if (t != null) {//如果生產出來的類不是null
            w.thread = t;//設置一個線程t
            workers.add(w);//工作線程添加
            int nt = ++poolSize;//池的大小增加
            if (nt > largestPoolSize)//如果線程池的大小大於最大池的大小
                largestPoolSize = nt;//把增加后的池的大小賦給最大池大小
        }
        return t;//返回該線程
    }

如果參數為null,那會就會返回一個null的線程,可以看出這里將不會產生新的線程,隊列中的任務有不會進行處理。然后是通過Conditon.signal方法喚醒所有等待線程,接下來再調用terminated()方法。

 protected void terminated() { }

目前的jdk1.6這個方法是空的,也就是不執行任何操作,那么這個方法就結束了。總結來說就是線程關閉的時候,檢查隊列中是否還有任務,如果有任務,不再繼續創建線程,用原來的線程把其執行完后關閉線程池,狀態設置為Terminted,如果沒有任務,檢查線程池是不是空,如果不是空,把其空間線程全部中斷,等待工作的線程處理完成以后關閉整個線程池!

 五:總結

       本篇博客主要分析了線程池的實現原理,在線程池中很重要和關鍵的一部分就是對部分參數的理解,還有它的運行原理,作為設計到線程部分的知識,可以看出線程池的每步操作基本上都有一個ReentrantLock可重入鎖,這是一種保護機制,它類似於synchronized,但是要比它更強大,並且是建立在java類的基礎上,實現更靈活,比如它可以實現中斷等待鎖、性能更加優良等特性,還有對其狀態的控制和操作,主要是通過Worker這個內部類來實現的,具體的可以參考jdk代碼,我這里只是起到一個拋磚引玉的作用,本篇博客暫時寫到這里。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM