Java線程池,你了解多少?


一、前言

   隨着業務的發展,單線程已經遠遠不能滿足,隨即就有多線程的出現。多線程雖然能解決單線程解決不了的事情,但是它也會給你帶來額外的問題。比如成千上萬甚至上百萬的線程時候,你系統就會出現響應延遲、卡機、甚至直接卡死的情況。為什么會出現這樣的原因呢?因為為每個請求創建一個新線程的開銷很大:在創建和銷毀線程上花費的時間和消耗的系統資源要比花在處理實際的用戶請求的時間和資源更多

  除了創建和銷毀線程的開銷之外,活動的線程也消耗系統資源。在一個 JVM里創建太多的線程可能會導致系統由於過度消耗內存而用完內存或“切換過度”。所以為了防止資源不足,服務器應用程序需要一些辦法來限制任何給定時刻處理的請求數目。而線程池為線程生命周期開銷問題和資源不足問題提供了解決方案。

二、那么線程池有哪些作用呢?

  1、降低資源消耗,防止資源不足。合理配置線程池中的線程大小,防止請求線程猛增;另外通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。
  2、提高響應速度。線程池可以通過對多個任務重用線程,在請求到達時線程已經存在(如果有空閑線程時),所以無意中也消除了線程創建所帶來的延遲。這樣,就可以立即為請求服務,使應用程序響應更快。
  3、提高線程的可管理性。使用線程池可以統一分配、調優和監控線程。

  上面知道了線程池的作用,那么線程池它是如何工作的呢?其使用核心類是哪一個呢?所以要做到合理利用線程池,必須對其實現原理了如指掌。

三、線程池中核心類:ThreadPoolExecutor

  java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,所以必須了解這個類的用法及其內部原理,下面我們來看下ThreadPoolExecutor類的具體源碼解析。

3.1  繼承關系

  通過類的繼承關系可以得知哪些方法源於哪里(具體請看代碼),下面直接給出類的繼承結構的圖:

 
        

3.2 構造方法   

  在ThreadPoolExecutor類中提供了四個構造方法:

 1         // 五個參數的構造函數
 2     public class ThreadPoolExecutor extends AbstractExecutorService {
 3         public ThreadPoolExecutor(int corePoolSize,
 4                               int maximumPoolSize,
 5                               long keepAliveTime,
 6                               TimeUnit unit,
 7                               BlockingQueue<Runnable> workQueue) {
 8         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
 9              Executors.defaultThreadFactory(), defaultHandler);
10     }
11     // 六個參數的構造函數-1
12     public ThreadPoolExecutor(int corePoolSize,
13                               int maximumPoolSize,
14                               long keepAliveTime,
15                               TimeUnit unit,
16                               BlockingQueue<Runnable> workQueue,
17                               ThreadFactory threadFactory) {
18         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
19              threadFactory, defaultHandler);
20     }
21 
22     //六個參數的構造函數 -2
23     public ThreadPoolExecutor(int corePoolSize,
24                               int maximumPoolSize,
25                               long keepAliveTime,
26                               TimeUnit unit,
27                               BlockingQueue<Runnable> workQueue,
28                               RejectedExecutionHandler handler) {
29         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
30              Executors.defaultThreadFactory(), handler);
31     }
32     // 七個參數的構造函數
33     public ThreadPoolExecutor(int corePoolSize,
34                               int maximumPoolSize,
35                               long keepAliveTime,
36                               TimeUnit unit,
37                               BlockingQueue<Runnable> workQueue,
38                               ThreadFactory threadFactory,
39                               RejectedExecutionHandler handler) {
40         if (corePoolSize < 0 ||
41             maximumPoolSize <= 0 ||
42             maximumPoolSize < corePoolSize ||
43             keepAliveTime < 0)
44             throw new IllegalArgumentException();
45         if (workQueue == null || threadFactory == null || handler == null)
46             throw new NullPointerException();
47         this.corePoolSize = corePoolSize;
48         this.maximumPoolSize = maximumPoolSize;
49         this.workQueue = workQueue;
50         this.keepAliveTime = unit.toNanos(keepAliveTime);
51         this.threadFactory = threadFactory;
52         this.handler = handler;
53     }
View Code

  從源代碼中發現前面三個構造器都是調用的第四個構造器進行的初始化工作,那就以第四個構造函數為例,解釋下其中各個參數的含義(留意源碼中每個字段上的注釋):

  1. int corePoolSize核心線程數
    • 在創建了線程池后,默認情況下線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務;
    • 當提交一個任務到線程池時,線程池會創建一個線程來執行任務,即使有其他空閑的基本線程能夠執行新任務也會創建線程(比方說:coreSize=5時,一開始只有一個任務會創建一個線程,等執行完后又來了一個任務時,依然會創建一個線程不會使用第一個線程)
    • 當線程池中的線程數目達到corePoolSize后就不再創建線程,會把到達的任務放到緩存隊列當中等待執行。如果調用了線程池的prestartAllCoreThreads方法,線程池會提前創建並啟動所有基本線程,另外prestartCoreThread方法也會啟動核心線程,不過每次只能啟動一個。

  2. int maximumPoolSize線程池允許創建的最大線程數。

    • 如果隊列滿了,並且已創建的線程數小於最大線程數,則線程池會再創建新的線程執行任務。值得注意的是如果使用了無界的任務隊列這個參數就沒什么效果

  3. long keepAliveTime空閑線程等待超時的時間

    • 默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。
    • 但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的空閑線程數為0;
    • 如果任務很多且每個任務執行的時間比較短,則可以調大時間,提高線程利用率。

  4. TimeUnit unit參數keepAliveTime的時間單位。共有七種單位,如下:

public enum TimeUnit {
    /**
     * 納秒=千分之一微妙
     */
    NANOSECONDS {...},

    /**
     * 微妙=千分之一毫秒
     */
    MICROSECONDS {...},

    /**
     * 毫秒
     */
    MILLISECONDS {...},

    /**
     * 秒
     */
    SECONDS {...},

    /**
     * 分鍾
     */
    MINUTES {...},

    /**
     * 小時
     */
    HOURS {...},

    /**
     * 天
     */
    DAYS {...};
}
View Code

  5. BlockingQueue<Runnable> workQueue: 任務隊列,用於保存等待執行任務的阻塞隊列。隊列也有好幾種詳細請看這里,這里就不做解釋了。

  6. ThreadFactory threadFactory:線程工廠,主要用於創建線程。其中可以指定線程名字(千萬別忽略這件小事,有意義的名字能讓你快速定位到源碼中的線程類)

  7. RejectedExecutionHandler handler:飽和策略,當隊列和線程池都滿了,說明線程處於飽和狀態,那么后續進來的任務需要一種策略處理。默認情況下是AbortPolicy:表示無法處理新任務時拋出異常。線程池框架提供了以下4中策略(當然也可以自己自定義策略:通過實現RejectedExecutionHandler接口自定義策略):

    • AbortPolicy:不處理新任務,拋出異常
    • CallerRunsPolicy:只用調用者所在的線程來運行任務。
    • DiscardOldestPolicy:丟棄隊列里最近的一個任務, 並執行當前任務。
    • DiscardPolicy:不處理,丟棄掉。

 3.3 重要參數方法和方法解讀

  1. 線程池狀態

     // 初始值 -536870912
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 初始值 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 初始值 536870911
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    // RUNNING狀態:接受新任務並處理排隊任務
    private static final int RUNNING    = -1 << COUNT_BITS;
    // SHUTDOWN狀態:不接受新任務,但處理排隊任務
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // STOP狀態:不接受新任務,不處理排隊任務,並中斷正在進行的任務
    private static final int STOP       =  1 << COUNT_BITS;
    // All tasks have terminated, workerCount is zero,  the thread transitioning to state TIDYING will run the terminated() hook method
    private static final int TIDYING    =  2 << COUNT_BITS;
    // TERMINATED: terminated() has completed
    private static final int TERMINATED =  3 << COUNT_BITS;
    // 獲取線程池狀態,取前三位
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 獲取當前正在工作的worker,主要是取后面29位
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 生成ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }

  當創建線程池后,初始時,線程池處於RUNNING狀態;

  RUNNING -> SHUTDOWNN:如果調用了shutdown()方法,則線程池處於SHUTDOWN狀態,此時線程池不能夠接受新的任務,它會等待所有任務執行完畢;

  (RUNNING or SHUTDOWN) -> STOP:如果調用了shutdownNow()方法,則線程池處於STOP狀態,此時線程池不能接受新的任務,並且會去嘗試終止正在執行的任務;

  SHUTDOWN -> TIDYING or STOP -> TIDYING :當線程池處於SHUTDOWN或STOP狀態,並且所有工作線程已經銷毀,任務緩存隊列已經清空或執行結束后,線程池被設置為TERMINATED狀態。

  2. 線程池中的線程初始化

  在說corePoolSize參數時有說到初始化線程池的兩個方法,其實在默認情況下,創建線程池之后線程池中是沒有線程的,需要提交任務之后才會創建線程。所以如果想在創建線程池之后就創建線程的話,可以通過下面兩個方法創建:

/**
    * 單個創建核心線程
    */
    public boolean prestartCoreThread() {
        return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);
    }
    /**
    * 啟動所有核心線程
    */
    public int prestartAllCoreThreads() {
        int n = 0;
        // 添加工作線程
        while (addWorker(null, true))
            ++n;
        return n;
    }    

  3. 創建線程:addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // 獲取運行狀態
            int rs = runStateOf(c);
            /**
             * 如果當前的線程池的狀態>SHUTDOWN 那么拒絕Worker的add 如果=SHUTDOWN
             * 那么此時不能新加入不為null的Task,如果在WorkCount為empty的時候不能加入任何類型的Worker,
             * 如果不為empty可以加入task為null的Worker,增加消費的Worker
             */
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;


            for (;;) {
                // 獲取有效線程數,並判斷//如果當前的數量超過了CAPACITY,或者超過了corePoolSize和maximumPoolSize(試core而定),則直接返回
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // //CAS嘗試增加線程數,如果失敗,證明有競爭,那么重新到retry。
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 繼續判斷當前線程池的運行狀態
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        /**
         * 新建任務
         */
        Worker w = new Worker(firstTask);
        Thread t = w.thread;

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int c = ctl.get();
            int rs = runStateOf(c);
            /**
             * rs!=SHUTDOWN ||firstTask!=null
             *
             * 同樣檢測當rs>SHUTDOWN時直接拒絕減小Wc,同時Terminate,如果為SHUTDOWN同時firstTask不為null的時候也要Terminate
             */
            if (t == null ||
                    (rs >= SHUTDOWN &&
                            ! (rs == SHUTDOWN &&
                                    firstTask == null))) {
                decrementWorkerCount();
                tryTerminate();
                return false;
            }

            workers.add(w);

            int s = workers.size();
            if (s > largestPoolSize)
                largestPoolSize = s;
        } finally {
            mainLock.unlock();
        }

        t.start();
        //Stop或線程Interrupt的時候要中止所有的運行的Worker
        if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
            t.interrupt();
        return true;
    }

  從上面可以看出:

    在rs>SHUTDOWN時,拒絕一切線程的增加,因為STOP是會終止所有的線程,同時移除Queue中所有的待執行的線程的,所以也不需要增加first=null的Worker了。

    其次,在SHUTDOWN狀態時,是不能增加first!=null的Worker的,同時即使first=null,但是此時Queue為Empty也是不允許增加Worker的,SHUTDOWN下增加的Worker主要用於消耗Queue中的任務。

    SHUTDOWN狀態時,是不允許向workQueue中增加線程的,isRunning(c) && workQueue.offer(command) 每次在offer之前都要做狀態檢測,也就是線程池狀態變為>=SHUTDOWN時不允許新線程進入線程池了。

 

 

  
   4、執行任務:execute()
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         * 原注釋已經講的很清楚了,主要分三步進行:
         */
        int c = ctl.get();
        // 1、如果線程數小於基本線程數,則創建線程並執行當前任務
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2、如果任務可以排隊,則會重新檢查看是否可以啟動新的任務還是拒絕任務
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 3、如果我們無法排隊任務,那么我們嘗試添加一個新線程。 如果失敗,我們知道我們已關閉或飽和,因此拒絕該任務。
        else if (!addWorker(command, false))
            reject(command);
    }

  注意:該方法是沒有返回值的,如果想獲取線程執行后的結果可以調用submit方法(當然它底層也是調用execute()方法)

  5、線程池關閉:

  ThreadPoolExecutor提供了兩個方法,用於線程池的關閉,分別是shutdown()和shutdownNow(),其中:

  • shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完后才終止,但再也不會接受新的任務
  • shutdownNow():立即終止線程池,並嘗試打斷正在執行的任務,並且清空任務緩存隊列,返回尚未執行的任務

四、線程池工作流程圖

  從上線的源碼分析后,應該知道線程池處理任務的大概流程了,下面統一梳理下當線程池接到任務時的處理流程:

  1、線程池首先會判斷核心線程池是否已滿(核心線程數是否超過corePoolSize),若沒有則創建新的核心線程來處理任務,否則進行第二步;

  2、接着會判斷阻塞隊列是否已滿(所以推薦使用有界隊列),如果沒有滿則進入阻塞隊列等待執行,否則進行第三步;

  3、然后線程池會判斷整個線程池是否已滿(整個線程數是否超過maximunPoolSize),若沒有則創建新線程處理任務,否則交個飽和策略處理新的任務。

五、關於線程池使用的注意事項

  1、創建線程或線程池時請指定有意義的線程名稱,方便回溯。來源《阿里巴巴 Java開發手冊》

  2、線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣 的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。來源《阿里巴巴 Java開發手冊》

    說明:Executors 返回的線程池對象的弊端如下:

    1)FixedThreadPool 和 SingleThreadPool: 允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。

     2)CachedThreadPool 和 ScheduledThreadPool: 允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。

  3、合理配置線程池大小,可以從以下幾個角度來進行分析:

    • 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
    • 任務的優先級:高,中和低。
    • 任務的執行時間:長,中和短。
    • 任務的依賴性:是否依賴其他系統資源,如數據庫連接。

    比方說:對於 CPU 密集型的計算場景,理論上“線程的數量 = CPU核數”是最合適的。

    如果是IO密集型任務,參考值可以設置為CPU 核數 * [ 1 +(I/O 耗時 / CPU耗時)]

    注意:以上值僅供參考,需要根據具體實際情況(壓測)而定。

  4、建議使用有界隊列

    • 有界隊列起碼會是maximumPoolSize參數生效。
    • 有界隊列能增加系統的穩定性和預警能力,根據需要設置隊列長度(隊列長度最好也不要太大,太大可能會堆積大量的請求,從而導致OOM)

   5、合理設置空閑線程等待時間。

    如果任務很多且每個任務執行的時間比較短,則可以調大時間,提高線程利用率。

六、參考資料

https://www.cnblogs.com/dolphin0520/p/3932921.html

http://ifeve.com/java-threadpool/

《Java並發編程的藝術》

《阿里巴巴Java開發手冊》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM