線程池原理


一、線程池的作用

線程池類似於數據庫鏈接池、Redis鏈接池等池化技術。池化技術的優點如下:

1. 統一管理資源,線程是操作系統一個重要監控管理指標,過多的線程會導致占用內存、上下文切換頻繁等問題,所以需要管理起來線程,而每處都用new Thread()方法來創建線程,那線程資源散落在應用程序各地,沒法管理。

2. 不需要每次要用到線程時都再次創建一個新的線程,可以做到線程重用。線程池默認初始化時是沒有創建線程的(也可以在創建線程池時自動創建好核心線程),線程池里的線程的初始化與其他線程一樣,但是在完成任務以后,該線程不會自行銷毀,而是以掛起的狀態返回到線程池。直到應用程序再次向線程池發出請求時,線程池里掛起的線程就會再度激活執行任務。這樣既節省了建立線程所造成的性能損耗,也可以讓多個任務反復重用同一線程,從而在應用程序生存期內節約大量開銷。

二、Java中提供的創建線程池的API

為了方便大家對於線程池的使用,在 Executors 里面提供了幾個線程池的工廠方法,這樣很多新手就不需要了解太多關於 ThreadPoolExecutor 的知識了,他們只需要直接使用 Executors 的工廠方法,就可以使用線程池。但是作為有目標的青年,還是要了解下里面的概念和坑。

先來解釋一下每個參數的作用,稍后我們在分析源碼的過程中再來詳細了解參數的意義。 

public ThreadPoolExecutor(int corePoolSize, // 核心線程數
                          int maximumPoolSize,// 最大線程數
                          long keepAliveTime,// 非核心線程數空閑時,回收時長
                          TimeUnit unit,// 回收時長單位
                          BlockingQueue<Runnable> workQueue,// 阻塞隊列
                          RejectedExecutionHandler handler/** 拒絕策略*/) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), handler);
}
1. FixedThreadPool:
核心線程數=最大線程數,阻塞隊列用的是LinkedBlockingQueue(且默認隊列長度是Integer.MAX_VALUE),這樣的話就造成了阻塞隊列是無界隊列,不會有非核心線程和拒絕策略。這個線程池執行任務的流程如下:

1. 線程數少於核心線程數(也就是設置的線程數)時,新建線程執行任務;

2. 線程數等於核心線程數后,將任務加入阻塞隊列;

3. 由於隊列容量非常大,所以可以一直添加;

4. 執行完任務的線程反復去隊列中取任務執行;

用途:FixedThreadPool 用於負載比較大的服務器,為了資源的合理利用,需要限制當前線程數量

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            // 不會有非核心線程,所以回收時間間隔為0
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>());
}

2. CachedThreadPool:核心線程數為0,然后任務進入SynchronousQueue阻塞隊列,最后在由非核心線程來處理其余的任務(在60秒內非核心線程處理完后可以繼續服用)。(先來的先做,后來的全部找外包干,來多少活就找多少外包,反正老子有的是錢,結果最后老板跑路,發生了著名的OOM事件)

最大線程數和非核心線程數是Integer.MAX_VALUE,不會有拒絕策略。所有線程執行完任務的線程有 60 秒生存時間,如果在這個時間內可以接到新任務,就可以繼續活下去,否則就被回收。

它的執行流程如下:
1. 沒有核心線程,直接向 SynchronousQueue 中提交任務
2. 如果有空閑的非核心線程,就去取出任務執行;如果沒有空閑的非核心線程,就新建一個
3. 執行完任務的非核心線程有 60 秒生存時間,如果在這個時間內可以接到新任務,就可以繼續活下去,否則就被回收 

/*
 * 核心線程數為0(沒有核心線程,直接向 SynchronousQueue 中提交任務),
 * 最大線程數是Integer.MAX_VALUE,不會有拒絕策略,導致大量線程的創建出現 CPU 使用過高或者 OOM 的問題
 * 執行完任務的線程有 60 秒生存時間,如果在這個時間內可以接到新任務,就可以繼續活下去,否則就被回收
 */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<>());
}

3. newSingleThreadExecutor():核心線程數=最大線程數=1,使用無界阻塞隊列。(老子就一個人,慢慢做,先來先做,后來的全部去排隊)

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>()));
}

三、線程池的實現原理分析:

我們先看下線程池原理分析(FixedThreadPool)。
舉個例子:好比現在有家賓館,一共有500個床位(核心線程數),200個預約名額(阻塞隊列),外加100個臨時床位以備不時之需(臨時線程數)。好了,現在賓館開業,先來了300個客人,OK,直接搞定。后來又來了300個客人,這下搞了,其中200個客人直接搞定,還有100個客人呢?我那100個臨時床位可是以備不時之需的,先不給他們,這100個客人給我排隊預約(進入阻塞隊列),后來又TM來了200個客人,這200個客人中有100個我可以讓他們去預約排隊,那還剩下100個人呢,我只得操家伙拿出壓箱底的那100個臨時床位來伺候了,那如果后面在來客人怎么辦?拒絕策略伺候!!!!!
這個例子大致意思描述對了,但是里面還是有些不准確的地方,線程池默認初始化時,里面是沒有現成的,而例子中賓館剛開業其實已經准備好床位了。但是這點不影響大家理解,湊活着用唄。

源碼分析:

ThreadPoolExecutor的execute()方法:

/**
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
* 線程池初始化時是沒有創建線程的,線程池里的線程的初始化與其他線程一樣,但是在完成任務以后,該線程不會自行銷毀,
* 而是以掛起的狀態返回到線程池。直到應用程序再次向線程池發出請求時,線程池里掛起的線程就會再度激活執行任務。
* 這樣既節省了建立線程所造成的性能損耗,也可以讓多個任務反復重用同一線程,從而在應用程序生存期內節約大量開銷
*
* 默認情況下,創建線程池之后,線程池中是沒有線程的,需要提交任務之后才會創建線程。
* 在實際中如果需要線程池創建之后立即創建線程,可以通過以下兩個方法辦到:
* prestartCoreThread():初始化一個核心線程
* prestartAllCoreThreads():初始化所有核心線程
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
*/
public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();

  int c = ctl.get();

  // 1.當前池中線程比核心數少,新建一個線程執行任務
  if (workerCountOf(c) < corePoolSize) {
    // 創建新的線程並執行任務,如果成功就返回
    if (addWorker(command, true))
      return;
    c = ctl.get();
  }

  // 2.核心池已滿但任務隊列未滿,將任務添加到隊列中
  if (isRunning(c) && workQueue.offer(command)) {
    //重新獲取ctl
    int recheck = ctl.get();
    //任務成功添加到隊列以后,再次檢查是否需要添加新的線程,因為已存在的線程可能被銷毀了
    //如果線程池處於非運行狀態,並且把當前的任務從任務隊列中移除成功,則拒絕該任務
    if (!isRunning(recheck) && remove(command))
      reject(command);
    //如果之前的線程已被銷毀完,新建一個線程
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
  /*
  * 如果執行到這里,有兩種情況:
  * 1. 線程池已經不是RUNNING狀態;
  * 2. 線程池是RUNNING狀態,但workerCount >= corePoolSize並且workQueue已滿。
  */
  } else if (!addWorker(command, false))
    // 如果線程池是非RUNNING狀態或者加入阻塞隊列失敗,則嘗試創建新非核心線程(外包)直到maxPoolSize
    // 創建非核心線程失敗,則啟動拒絕策略
    reject(command);
}

其中ctl就是一個AutomicInteger的變量,用於存儲線程數量(低29位)和線程池的狀態(高3位)。

線程池狀態如下:

/**
 * 即高3位為111,該狀態的線程池會接收新任務,並處理阻塞隊列中的任務;
 * 111 0 0000 0000 0000 0000 0000 0000 0000
 * -1 原碼:0000 ... 0001 反碼:1111 ... 1110 補碼:1111 ... 1111
 * 左移操作:后面補 0
 * 111 0 0000 0000 0000 0000 0000 0000 0000
 */
private static final int RUNNING = -1 << COUNT_BITS;
/**
 * 即高3位為000,該狀態的線程池不會接收新任務,但會處理阻塞隊列中的任務;
 * 000 0 0000 0000 0000 0000 0000 0000 0000
 */
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
 * 即高3位為001,該狀態的線程不會接收新任務,也不會處理阻塞隊列中的任務,而且會中斷正在運行的任務;
 * 001 0 0000 0000 0000 0000 0000 0000 0000
 */
private static final int STOP = 1 << COUNT_BITS;
/**
 * 即高3位為010,所有任務都已終止,workerCount為零,過渡到狀態TIDYING的線程將運行terminated()鈎子方法;
 * 010 0 0000 0000 0000 0000 0000 0000 0000
 */
private static final int TIDYING = 2 << COUNT_BITS;
/**
 * 即高3位為011,terminated()方法執行完畢;
 * 011 0 0000 0000 0000 0000 0000 0000 0000
 */
private static final int TERMINATED = 3 << COUNT_BITS;
ThreadPoolExecutor的addWorker():
1)采用循環 CAS 操作來將線程數加 1;
2)新建一個線程並啟用;
/**
 * firstTask參數用於表示怎么獲取線程處理的任務,true為傳入的任務,false表示從阻塞隊列獲取任務
 * core參數為true表示在新增線程時會判斷當前活動線程數是否少於corePoolSize,
 * false表示新增線程前需要判斷當前活動線程數是否少於maximumPoolSize
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    // 內嵌循環,通過CAS worker + 1
    retry:
    for (; ; ) {
        // 獲取當前線程池狀態與線程數
        int c = ctl.get();
        // 獲取當前線程狀態
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /**
         * 這個if判斷
         * 如果線程池處於SHUTDOWN,STOP,TIDYING,TERMINATED的時候,則表示此時不再接收新任務;
         * 接着判斷以下3個條件,只要有1個不滿足,則返回false:
         * 1. rs == SHUTDOWN,這時表示關閉狀態,不再接受新提交的任務,但卻可以繼續處理阻塞隊列中已保存的任務
         * 2. firsTask為空
         * 3. 阻塞隊列不為空
         *
         * 首先考慮rs == SHUTDOWN的情況
         * 這種情況下不會接受新提交的任務,所以在firstTask不為空的時候會返回false;
         * 然后,如果firstTask為空,並且workQueue也為空,則返回false,
         * 因為隊列中已經沒有任務了,不需要再添加線程了
         */
        if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN &&
                        // 不在接受新的任務
                        firstTask == null &&
                        // 隊列中已經沒有任務了,不需要再添加線程了
                        !workQueue.isEmpty()))
            return false;
        // 增加工作線程數
        for (; ; ) {
            // 線程數量
            int wc = workerCountOf(c);
            // 如果當前線程數大於線程最大上限CAPACITY  return false
            // 創建核心線程則與 corePoolSize 比較,否則與 maximumPoolSize 比較
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 嘗試增加workerCount,如果成功,則跳出第一個for循環
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // 創建一個新的線程並執行
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        // 新建線程,將線程封裝成Worker
        w = new Worker(firstTask);
        // 每一個Worker對象都會創建一個線程
        final Thread t = w.thread;
        if (t != null) {
            // 將任務添加到workers Queue中
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                // 線程池狀態
                int rs = runStateOf(c);

                // rs < SHUTDOWN表示是RUNNING狀態;
                if (rs < SHUTDOWN ||
                        // rs是SHUTDOWN狀態並且firstTask為null,向線程池中添加線程。
                        // 因為在SHUTDOWN時不會在添加新的任務,但還是會執行workQueue中的任務
                        (rs == SHUTDOWN && firstTask == null)) {
                    // 當前線程已經啟動,拋出異常
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers是一個HashSet<Worker>
                    workers.add(w);
                    // 設置最大的池大小largestPoolSize,workerAdded設置為true
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 啟動線程
            if (workerAdded) {
                // 啟動時會調用Worker類中的run方法,Worker本身實現了Runnable接口,所以一個Worker類型的對象也是一個線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 線程啟動失敗
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
Worker 類說明
1. 每個worker,都是一條線程,同時里面包含了一個firstTask,即初始化時要被首先執行的任務;2. 最終執行任務的,是 runWorker()方法;

Worker 類繼承了 AQS並實現了 Runnable 接口,注意其中的 firstTask 和 thread 屬性:

firstTask 用它來保存傳入的任務;thread 是在調用構造方法時通過 ThreadFactory 來創建的線程,是用來處理任務的線程。

在調用構造方法時,需要傳入任務,這里通過 getThreadFactory().newThread(this) 來新建一個線程,newThread 方法傳入的參數是 this,因為 Worker 本身繼承了 Runnable 接口,所以一個 Worker 對象在啟動的時候會調用 Worker 類中的 run 方法。

/**
 * 1. 如果 task 不為空,則開始執行 task
 * 2. 如果 task 為空則通過 getTask()再去取任務,並賦值給 task;如果取到的 Runnable 不為空則執行該任務
 * 3. 執行完畢后,通過 while 循環繼續 getTask()取任務
 * 4. 如果 getTask()取到的任務依然是空,那么整個 runWorker()方法執行完畢
 */
final void runWorker(Worker w) {
    // 獲取當前線程
    Thread wt = Thread.currentThread();
    // 獲取第一個任務
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 釋放鎖,運行中斷
    w.unlock(); // allow interrupts
    //是否突然完成,如果是由於異常導致的進入finally,那么completedAbruptly==true就是突然完成的
    boolean completedAbruptly = true;
    try {
        // 調用getTask()方法從阻塞隊列中獲取新任務,如果阻塞隊列為空則根據是否超時來判斷是否需要阻塞
        while (task != null || (task = getTask()) != null) {
            /**
             * !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
             * 上鎖,不是為了防止並發執行任務,為了在shutdown()時不終止正在運行的 worker
             * !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
             */
            w.lock();
            /**
             * 如果線程池正在停止,那么要保證當前線程是中斷狀態;
             * 如果不是的話,則要保證當前線程不是中斷狀態;
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    /**
                     * 線程池為 stop 狀態時不接受新任務,不執行已經加入任務隊列的任務,還中斷正在執
                     * 行的任務,所以對於 stop 狀態以上是要中斷線程的
                     */
                    (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                // 設置中斷標志
                wt.interrupt();
            try {
                //自定義實現
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 執行任務
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    //自定義實現
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 完成任務數 + 1
                w.completedTasks++;
                // 釋放鎖
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 獲取不到任務時,主動回收自己
        // 線程回收的工作是在processWorkerExit方法完成的
        processWorkerExit(w, completedAbruptly);
    }
}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM