含源碼解析,深入Java 線程池原理


從池化技術到底層實現,一篇文章帶你貫通線程池技術。

1、池化技術簡介

在系統開發過程中,我們經常會用到池化技術來減少系統消耗,提升系統性能。
在編程領域,比較典型的池化技術有:
線程池、連接池、內存池、對象池等。

對象池通過復用對象來減少創建對象、垃圾回收的開銷;連接池(數據庫連接池、Redis連接池和HTTP連接池等)通過復用TCP連接來減少創建和釋放連接的時間。線程池通過復用線程提升性能。簡單來說,池化技術就是通過復用來提升性能。

線程、內存、數據庫的連接對象都是資源,在程序中,當你創建一個線程或者在堆上申請一塊內存的時候都涉及到很多的系統調用,也是非常消耗CPU的。如果你的程序需要很多類似的工作線程或者需要頻繁地申請釋放小塊內存,在沒有對這方面進行優化的情況下,這部分代碼很可能會成為影響你整個程序性能的瓶頸。

如果每次都是如此的創建線程->執行任務->銷毀線程,會造成很大的性能開銷。復用已創建好的線程可以提高系統的性能,借助池化技術的思想,通過預先創建好多個線程,放在池中,這樣可以在需要使用線程的時候直接獲取,避免多次重復創建、銷毀帶來的開銷。

(1)線程池的優點

  • 線程是稀缺資源,使用線程池可以減少創建和銷毀線程的次數,每個工作線程都可以重復使用。
  • 可以根據系統的承受能力,調整線程池中工作線程的數量,防止因為消耗過多內存導致服務器崩潰。

(2)線程池的風險

雖然線程池是構建多線程應用程序的強大機制,但使用它並不是沒有風險的。用線程池構建的應用程序容易遭受任何其它多線程應用程序容易遭受的所有並發風險,諸如同步錯誤和死鎖,它還容易遭受特定於線程池的少數其它風險,諸如與池有關的死鎖、資源不足和線程泄漏。

  • 死鎖

任何多線程應用程序都有死鎖風險。當一組進程或線程中的每一個都在等待一個只有該組中另一個進程才能引起的事件時,我們就說這組進程或線程 死鎖了。死鎖的最簡單情形是:線程 A 持有對象 X 的獨占鎖,並且在等待對象 Y 的鎖,而線程 B 持有對象 Y 的獨占鎖,卻在等待對象 X 的鎖。除非有某種方法來打破對鎖的等待(Java 鎖定不支持這種方法),否則死鎖的線程將永遠等下去。

  • 資源不足

線程池的一個優點在於:相對於其它替代調度機制(有些我們已經討論過)而言,它們通常執行得很好。但只有恰當地調整了線程池大小時才是這樣的。

線程消耗包括內存和其它系統資源在內的大量資源。除了
Thread 對象所需的內存之外,每個線程都需要兩個可能很大的執行調用堆棧。除此以外,JVM 可能會為每個 Java
線程創建一個本機線程,這些本機線程將消耗額外的系統資源。最后,雖然線程之間切換的調度開銷很小,但如果有很多線程,環境切換也可能嚴重地影響程序的性能。

如果線程池太大,那么被那些線程消耗的資源可能嚴重地影響系統性能。在線程之間進行切換將會浪費時間,而且使用超出比您實際需要的線程可能會引起資源匱乏問題,因為池線程正在消耗一些資源,而這些資源可能會被其它任務更有效地利用。

除了線程自身所使用的資源以外,服務請求時所做的工作可能需要其它資源,例如 JDBC 連接、套接字或文件,這些也都是有限資源,有太多的並發請求也可能引起失效,例如不能分配 JDBC 連接。

  • 並發錯誤

線程池和其它排隊機制依靠使用
wait() 和 notify()
方法,這兩個方法都難於使用。如果編碼不正確,那么可能丟失通知,導致線程保持空閑狀態,盡管隊列中有工作要處理。使用這些方法時,必須格外小心;即便是專家也可能在它們上面出錯。而最好使用現有的、已經知道能工作的實現,例如在
util.concurrent 包。

  • 線程泄漏

各種類型的線程池中一個嚴重的風險是線程泄漏,當從池中除去一個線程以執行一項任務,而在任務完成后該線程卻沒有返回池時,會發生這種情況。發生線程泄漏的一種情形出現在任務拋出一個 RuntimeException 或一個 Error 時。

如果池類沒有捕捉到它們,那么線程只會退出而線程池的大小將會永久減少一個。當這種情況發生的次數足夠多時,線程池最終就為空,而且系統將停止,因為沒有可用的線程來處理任務。

  • 請求過載

僅僅是請求就壓垮了服務器,這種情況是可能的。在這種情形下,我們可能不想將每個到來的請求都排隊到我們的工作隊列,因為排在隊列中等待執行的任務可能會消耗太多的系統資源並引起資源缺乏。在這種情形下決定如何做取決於您自己;在某些情況下,您可以簡單地拋棄請求,依靠更高級別的協議稍后重試請求,您也可以用一個指出服務器暫時很忙的響應來拒絕請求。

2、 如何配置線程池大小配置

一般需要根據任務的類型來配置線程池大小:

  • 如果是CPU密集型任務,就需要盡量壓榨CPU,參考值可以設為 NCPU+1
  • 如果是IO密集型任務,參考值可以設置為2*NCPU

當然,這只是一個參考值,具體的設置還需要根據實際情況進行調整,比如可以先將線程池大小設置為參考值,再觀察任務運行情況和系統負載、資源利用率來進行適當調整。

3、線程池的底層原理

(1)線程池的狀態

線程池和線程一樣擁有自己的狀態,在ThreadPoolExecutor類中定義了一個volatile變量runState來表示線程池的狀態,線程池有四種狀態,分別為RUNNING、SHURDOWN、STOP、TERMINATED。

  • 線程池創建后處於RUNNING狀態。
  • 調用shutdown后處於SHUTDOWN狀態,線程池不能接受新的任務,會等待緩沖隊列的任務完成。
  • 調用shutdownNow后處於STOP狀態,線程池不能接受新的任務,並嘗試終止正在執行的任務。
  • 當線程池處於SHUTDOWN或STOP狀態,並且所有工作線程已經銷毀,任務緩存隊列已經清空或執行結束后,線程池被設置為TERMINATED狀態。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

其中ctl這個AtomicInteger的功能很強大,其高3位用於維護線程池運行狀態,低29位維護線程池中線程數量

RUNNING:-1<<COUNT_BITS,即高3位為1,低29位為0,該狀態的線程池會接收新任務,也會處理在阻塞隊列中等待處理的任務

SHUTDOWN:0<<COUNT_BITS,即高3位為0,低29位為0,該狀態的線程池不會再接收新任務,但還會處理已經提交到阻塞隊列中等待處理的任務

STOP:1<<COUNT_BITS,即高3位為001,低29位為0,該狀態的線程池不會再接收新任務,不會處理在阻塞隊列中等待的任務,而且還會中斷正在運行的任務

TIDYING:2<<COUNT_BITS,即高3位為010,低29位為0,所有任務都被終止了,workerCount為0,為此狀態時還將調用terminated()方法

TERMINATED:3<<COUNT_BITS,即高3位為100,低29位為0,terminated()方法調用完成后變成此狀態

這些狀態均由int型表示,大小關系為 RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,這個順序基本上也是遵循線程池從 運行 到 終止這個過程。

  • runStateOf(int c) 方法:c & 高3位為1,低29位為0的~CAPACITY,用於獲取高3位保存的線程池狀態

  • workerCountOf(int c)方法:c & 高3位為0,低29位為1的CAPACITY,用於獲取低29位的線程數量

  • ctlOf(int rs, int wc)方法:參數rs表示runState,參數wc表示workerCount,即根據runState和workerCount打包合並成ctl

(2)為什么ctl負責兩種角色

在Doug Lea的設計中,ctl負責兩種角色可以避免多余的同步邏輯。

很多人會想,一個變量表示兩個值,就節省了存儲空間,但是這里很顯然不是為了節省空間而設計的,即使將這輛個值拆分成兩個Integer值,一個線程池也就多了4個字節而已,為了這4個字節而去大費周章地設計一通,顯然不是Doug Lea的初衷。

在多線程的環境下,運行狀態和有效線程數量往往需要保證統一,不能出現一個改而另一個沒有改的情況,如果將他們放在同一個AtomicInteger中,利用AtomicInteger的原子操作,就可以保證這兩個值始終是統一的。

(3)線程池工作流程

預先啟動一些線程,線程無限循環從任務隊列中獲取一個任務進行執行,直到線程池被關閉。如果某個線程因為執行某個任務發生異常而終止,那么重新創建一個新的線程而已,如此反復。

一個任務從提交到執行完畢經歷過程如下:

第一步:如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;

第二步:如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;

第三步:如果線程池中的線程數量大於等於corePoolSize,且隊列workQueue已滿,但線程池中的線程數量小於maximumPoolSize,則會創建新的線程來處理被添加的任務

第四步:如果當前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕策略進行處理;

流程圖如下:

4、ThreadPoolExecutor解析

ThreadPoolExecutor繼承自AbstractExecutorService,同時實現了ExecutorService接口,也是Executor框架默認的線程池實現類,一般我們使用線程池,如沒有特殊要求,直接創建ThreadPoolExecutor,初始化一個線程池,如果需要特殊的線程池,則直接繼承ThreadPoolExecutor,並實現特定的功能,如ScheduledThreadPoolExecutor,它是一個具有定時執行任務的線程池。

(1)Executor框架

在深入源碼之前先來看看J.U.C包中的線程池類圖:

它們的最頂層是一個Executor接口,它只有一個方法:

public interface Executor {
    void execute(Runnable command);
}

它提供了一個運行新任務的簡單方法,Java線程池也稱之為Executor框架。

ExecutorService擴展了Executor,添加了操控線程池生命周期的方法,如shutDown(),shutDownNow()等,以及擴展了可異步跟蹤執行任務生成返回值Future的方法,如submit()等方法。

(2)Worker解析

Worker類繼承了AQS,並實現了Runnable接口,它有兩個重要的成員變量:firstTask和thread。firstTask用於保存第一次新建的任務;thread是在調用構造方法時通過ThreadFactory來創建的線程,是用來處理任務的線程。

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

需要注意workers的數據結構為HashSet,非線程安全,所以操作workers需要加同步鎖。添加步驟做完后就啟動線程來執行任務了。

 /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

(3)如何在線程池中添加任務

線程池要執行任務,那么必須先添加任務,execute()雖說是執行任務的意思,但里面也包含了添加任務的步驟在里面,下面源碼:

public void execute(Runnable command) {
  // 如果添加訂單任務為空,則空指針異常
  if (command == null)
    throw new NullPointerException();
  // 獲取ctl值
  int c = ctl.get();
  // 1.如果當前有效線程數小於核心線程數,調用addWorker執行任務(即創建一條線程執行該任務)
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;
    c = ctl.get();
  }
  // 2.如果當前有效線程大於等於核心線程數,並且當前線程池狀態為運行狀態,則將任務添加到阻塞隊列中,等待空閑線程取出隊列執行
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
  }
  // 3.如果阻塞隊列已滿,則調用addWorker執行任務(即創建一條線程執行該任務)
  else if (!addWorker(command, false))
    // 如果創建線程失敗,則調用線程拒絕策略
    reject(command);
}

addWorker添加任務,方法源碼有點長,按照邏輯拆分成兩部分講解:

java.util.concurrent.ThreadPoolExecutor#addWorker:

retry:
for (;;) {
  int c = ctl.get();
  // 獲取線程池當前運行狀態
  int rs = runStateOf(c);

  // 如果rs大於SHUTDOWN,則說明此時線程池不在接受新任務了
  // 如果rs等於SHUTDOWN,同時滿足firstTask為空,且阻塞隊列如果有任務,則繼續執行任務
  // 也就說明了如果線程池處於SHUTDOWN狀態時,可以繼續執行阻塞隊列中的任務,但不能繼續往線程池中添加任務了
  if (rs >= SHUTDOWN &&
      ! (rs == SHUTDOWN &&
         firstTask == null &&
         ! workQueue.isEmpty()))
    return false;

  for (;;) {
    // 獲取有效線程數量
    int wc = workerCountOf(c);
    // 如果有效線程數大於等於線程池所容納的最大線程數(基本不可能發生),不能添加任務
    // 或者有效線程數大於等於當前限制的線程數,也不能添加任務
    // 限制線程數量有任務是否要核心線程執行決定,core=true使用核心線程執行任務
    if (wc >= CAPACITY ||
        wc >= (core ? corePoolSize : maximumPoolSize))
      return false;
    // 使用AQS增加有效線程數量
    if (compareAndIncrementWorkerCount(c))
      break retry;
    // 如果再次獲取ctl變量值
    c = ctl.get();  // Re-read ctl
    // 再次對比運行狀態,如果不一致,再次循環執行
    if (runStateOf(c) != rs)
      continue retry;
    // else CAS failed due to workerCount change; retry inner loop
  }
}

這里特別強調,firstTask是開啟線程執行的首個任務,之后常駐在線程池中的線程執行的任務都是從阻塞隊列中取出的,需要注意。

以上for循環代碼主要作用是判斷ctl變量當前的狀態是否可以添加任務,特別說明了如果線程池處於SHUTDOWN狀態時,可以繼續執行阻塞隊列中的任務,但不能繼續往線程池中添加任務了;同時增加工作線程數量使用了AQS作同步,如果同步失敗,則繼續循環執行。

// 任務是否已執行
boolean workerStarted = false;
// 任務是否已添加
boolean workerAdded = false;
// 任務包裝類,我們的任務都需要添加到Worker中
Worker w = null;
try {
  // 創建一個Worker
  w = new Worker(firstTask);
  // 獲取Worker中的Thread值
  final Thread t = w.thread;
  if (t != null) {
    // 操作workers HashSet 數據結構需要同步加鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
      // Recheck while holding lock.
      // Back out on ThreadFactory failure or if
      // shut down before lock acquired.
      // 獲取當前線程池的運行狀態
      int rs = runStateOf(ctl.get());
      // rs < SHUTDOWN表示是RUNNING狀態;
      // 如果rs是RUNNING狀態或者rs是SHUTDOWN狀態並且firstTask為null,向線程池中添加線程。
      // 因為在SHUTDOWN時不會在添加新的任務,但還是會執行workQueue中的任務
      // rs是RUNNING狀態時,直接創建線程執行任務
      // 當rs等於SHUTDOWN時,並且firstTask為空,也可以創建線程執行任務,也說說明了SHUTDOWN狀態時不再接受新任務
      if (rs < SHUTDOWN ||
          (rs == SHUTDOWN && firstTask == null)) {
        if (t.isAlive()) // precheck that t is startable
          throw new IllegalThreadStateException();
        workers.add(w);
        int s = workers.size();
        if (s > largestPoolSize)
          largestPoolSize = s;
        workerAdded = true;
      }
    } finally {
      mainLock.unlock();
    }
    // 啟動線程執行任務
    if (workerAdded) {
      t.start();
      workerStarted = true;
    }
  }
} finally {
  if (! workerStarted)
    addWorkerFailed(w);
}
return workerStarted;
}

以上源碼主要的作用是創建一個Worker對象,並將新的任務裝進Worker中,開啟同步將Worker添加進workers中,這里需要注意workers的數據結構為HashSet,非線程安全,所以操作workers需要加同步鎖。添加步驟做完后就啟動線程來執行任務了,繼續往下看。

(4)前置和后置鈎子

如果需要在任務執行前后插入邏輯,你可以實現ThreadPoolExecutor以下兩個方法:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }

這樣一來,就可以對任務的執行進行實時監控。

5、線程池總結

線程池原理關鍵技術:鎖(lock,cas)、阻塞隊列、hashSet(資源池)

所謂線程池本質是一個Worker對象的hashSet,多余的任務會放在阻塞隊列中,只有當阻塞隊列滿了后,才會觸發非核心線程的創建,非核心線程只是臨時過來打雜的,直到空閑,然后自己關閉。
線程池提供了兩個鈎子(beforeExecute,afterExecute)給我們,我們繼承線程池,在執行任務前后做一些事情。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM