線程池之ThreadPoolExecutor線程池源碼分析筆記


1.線程池的作用

一方面當執行大量異步任務時候線程池能夠提供較好的性能,在不使用線程池的時候,每當需要執行異步任務時候是直接 new 一線程進行運行,而線程的創建和銷毀是需要開銷的。使用線程池時候,線程池里面的線程是可復用的,不會每次執行異步任務時候都重新創建和銷毀線程。

另一方面線程池提供了一種資源限制和管理的手段,比如可以限制線程的個數,動態新增線程等,每個 ThreadPoolExecutor 也保留了一些基本的統計數據,比如當前線程池完成的任務數目等。

 

 2.ThreadPoolExecutor 原理探究

類圖如下:

 

 

如上類圖,Executors 其實是個工具類,里面提供了好多靜態方法,根據用戶選擇返回不同的線程池實例。

ThreadPoolExecutor 繼承了 AbstractExecutorService,成員變量 ctl 是個 Integer 的原子變量用來記錄線程池狀態 和 線程池中線程個數,類似於 ReentrantReadWriteLock 使用一個變量存放兩種信息。

這里假設 Integer 類型是 32 位二進制標示,則其中高 3 位用來表示線程池狀態,后面 29 位用來記錄線程池線程個數。

//用來標記線程池狀態(高3位),線程個數(低29位)
//默認是RUNNING狀態,線程個數為0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//線程個數掩碼位數,並不是所有平台int類型是32位,所以准確說是具體平台下Integer的二進制位數-3后的剩余位數才是線程的個數,
private static final int COUNT_BITS = Integer.SIZE - 3;

//線程最大個數(低29位)00011111111111111111111111111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

線程池狀態:

//(高3位):11100000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;

//(高3位):00000000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

//(高3位):00100000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;

//(高3位):01000000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;

//(高3位):01100000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;
// 獲取高三位 運行狀態
private static int runStateOf(int c)     { return c & ~CAPACITY; }

//獲取低29位 線程個數
private static int workerCountOf(int c)  { return c & CAPACITY; }

//計算ctl新值,線程狀態 與 線程個數
private static int ctlOf(int rs, int wc) { return rs | wc; }

線程池狀態含義:

  • RUNNING:接受新任務並且處理阻塞隊列里的任務;

  • SHUTDOWN:拒絕新任務但是處理阻塞隊列里的任務;

  • STOP:拒絕新任務並且拋棄阻塞隊列里的任務,同時會中斷正在處理的任務;

  • TIDYING:所有任務都執行完(包含阻塞隊列里面任務)當前線程池活動線程為 0,將要調用 terminated 方法;

  • TERMINATED:終止狀態,terminated方法調用完成以后的狀態。

線程池狀態轉換:

       1.RUNNING -> SHUTDOWN:顯式調用 shutdown() 方法,或者隱式調用了 finalize(),它里面調用了 shutdown() 方法。

       2.RUNNING or SHUTDOWN -> STOP:顯式調用 shutdownNow() 方法時候。

       3.SHUTDOWN -> TIDYING:當線程池和任務隊列都為空的時候。

       4.STOP -> TIDYING:當線程池為空的時候。

       5.TIDYING -> TERMINATED:當 terminated() hook 方法執行完成時候。

 

線程池參數:

  • corePoolSize:線程池核心線程個數;

  • workQueue:用於保存等待執行的任務的阻塞隊列;比如基於數組的有界 ArrayBlockingQueue,基於鏈表的無界 LinkedBlockingQueue,最多只有一個元素的同步隊列 SynchronousQueue,優先級隊列 PriorityBlockingQueue 等。

  • maximunPoolSize:線程池最大線程數量。

  • ThreadFactory:創建線程的工廠。

  • RejectedExecutionHandler:飽和策略,當隊列滿了並且線程個數達到 maximunPoolSize 后采取的策略,比如 AbortPolicy (拋出異常),CallerRunsPolicy(使用調用者所在線程來運行任務),DiscardOldestPolicy(調用 poll 丟棄一個任務,執行當前任務),DiscardPolicy(默默丟棄,不拋出異常)。

  • keeyAliveTime:存活時間。如果當前線程池中的線程數量比核心線程數量要多,並且是閑置狀態的話,這些閑置的線程能存活的最大時間。

  • TimeUnit,存活時間的時間單位。

線程池類型:

      1.newFixedThreadPool:創建一個核心線程個數和最大線程個數都為 nThreads 的線程池,並且阻塞隊列長度為 Integer.MAX_VALUEkeeyAliveTime=0 說明只要線程個數比核心線程個數多並且當前空閑則回收。代碼如下:

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
 }
 //使用自定義線程創建工廠
 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
 }

 

2.newSingleThreadExecutor:創建一個核心線程個數和最大線程個數都為1的線程池,並且阻塞隊列長度為 Integer.MAX_VALUEkeeyAliveTime=0 說明只要線程個數比核心線程個數多並且當前空閑則回收。代碼如下:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    //使用自己的線程工廠
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

 

3.newCachedThreadPool:創建一個按需創建線程的線程池,初始線程個數為 0,最多線程個數為 Integer.MAX_VALUE,並且阻塞隊列為同步隊列,keeyAliveTime=60 說明只要當前線程 60s 內空閑則回收。這個特殊在於加入到同步隊列的任務會被馬上被執行,同步隊列里面最多只有一個任務。代碼如下:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    //使用自定義的線程工廠
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

 

如類圖,其中 mainLock 是獨占鎖,用來控制新增 Worker 線程時候的原子性,termination 是該鎖對應的條件隊列,在線程調用 awaitTermination 時候用來存放阻塞的線程。

Worker 繼承 AQS 和 Runnable 接口,是具體承載任務的對象,Worker 繼承了 AQS,自己實現了簡單不可重入獨占鎖,其中 status=0 標示鎖未被獲取狀態,state=1 標示鎖已經被獲取的狀態,state=-1 是創建 Worker 時候默認的狀態,創建時候狀態設置為 -1 是為了避免在該線程在運行 runWorker() 方法前被中斷,下面會具體講解到。其中變量 firstTask 記錄該工作線程執行的第一個任務,thread 是具體執行任務的線程。

DefaultThreadFactory 是線程工廠,newThread 方法是對線程的一個修飾,其中 poolNumber 是個靜態的原子變量,用來統計線程工廠的個數,threadNumber 用來記錄每個線程工廠創建了多少線程,這兩個值也作為線程池和線程的名稱的一部分。

 

3.源碼分析

1 public void execute(Runnable command):execute 方法是提交任務 command 到線程池進行執行,用戶線程提交任務到線程池的模型圖如下所示:

如上圖可知 ThreadPoolExecutor 的實現實際是一個生產消費模型,其中當用戶添加任務到線程池時候相當於生產者生產元素,workers 線程工作集中的線程直接執行任務或者從任務隊列里面獲取任務相當於消費者消費元素。用戶線程提交任務的 execute 方法具體代碼如下:

public void execute(Runnable command) {

    //(1) 如果任務為null,則拋出NPE異常
    if (command == null)
        throw new NullPointerException();

    //(2)獲取當前線程池的狀態+線程個數變量的組合值
    int c = ctl.get();

    //(3)當前線程池線程個數是否小於corePoolSize,小於則開啟新線程運行
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    //(4)如果線程池處於RUNNING狀態,則添加任務到阻塞隊列
    if (isRunning(c) && workQueue.offer(command)) {

        //(4.1)二次檢查
        int recheck = ctl.get();
        //(4.2)如果當前線程池狀態不是RUNNING則從隊列刪除任務,並執行拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);

        //(4.3)否者如果當前線程池線程空,則添加一個線程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //(5)如果隊列滿了,則新增線程,新增失敗則執行拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}

 

1.代碼(3)判斷如果當前線程池線程個數小於 corePoolSize,如上圖會在 workers 里面新增一個核心線程(core 線程)執行該任務。

2.如果當前線程池線程個數大於等於 corePoolSize 執行代碼(4),如果當前線程池處於 RUNNING 狀態則添加當前任務到任務隊列,這里需要判斷線程池狀態是因為有可能線程池已經處於非 RUNNING 狀態,而非 RUNNING 狀態下是拋棄新任務的。

3.如果任務添加任務隊列成功,則代碼(4.2)對線程池狀態進行二次校驗,這是因為添加任務到任務隊列后,執行代碼(4.2)前有可能線程池的狀態已經變化了,這里進行二次校驗,如果當前線程池狀態不是 RUNNING 了則把任務從任務隊列移除,移除后執行拒絕策略;如果二次校驗通過,則執行代碼(4.3)重新判斷當前線程池里面是否還有線程,如果沒有則新增一個線程。

4.如果代碼(4)添加任務失敗,則說明任務隊列滿了,則執行代碼(5)嘗試新開啟線程(如上圖 thread 3 和 thread 4)來執行該任務,如果當前線程池線程個數 > maximumPoolSize 則執行拒絕策略。

 

接下來看新增線程的 addWorkder 方法的源碼,如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //(6) 檢查隊列是否只在必要時為空
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        //(7)循環cas增加線程個數
        for (;;) {
            int wc = workerCountOf(c);

            //(7.1)如果線程個數超限則返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //(7.2)cas增加線程個數,同時只有一個線程成功
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //(7.3)cas失敗了,則看線程池狀態是否變化了,變化則跳到外層循環重試重新獲取線程池狀態,否者內層循環重新cas。
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    //(8)到這里說明cas成功了
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //(8.1)創建worker
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {

            //(8.2)加獨占鎖,為了workers同步,因為可能多個線程調用了線程池的execute方法。
            mainLock.lock();
            try {

                //(8.3)重新檢查線程池狀態,為了避免在獲取鎖前調用了shutdown接口
                int c = ctl.get();
                int rs = runStateOf(c);

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //(8.4)添加任務
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //(8.5)添加成功則啟動任務
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

 

如上代碼主要分兩部分,第一部分的雙重循環目的是通過 cas 操作增加線程池線程數,第二部分主要是並發安全的把任務添加到 workers 里面,並且啟動任務執行。

先看第一部分的代碼(6),如下所示:

rs >= SHUTDOWN &&
               ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty())

這樣看不好理解,我們展開!運算符后,相當於:

s >= SHUTDOWN &&
                (rs != SHUTDOWN ||//(1)
              firstTask != null ||//(2)
              workQueue.isEmpty())//(3)

 

如上代碼,也就是說代碼(6)在下面幾種情況下會返回 false:

    1.當前線程池狀態為 STOP,TIDYING,TERMINATED;

    2.當前線程池狀態為 SHUTDOWN 並且已經有了第一個任務;

    3.當前線程池狀態為 SHUTDOWN 並且任務隊列為空。

 

回到上面看新增線程的 addWorkder 方法,發現內層循環作用是使用 cas 增加線程,代碼(7.1)如果線程個數超限則返回 false,否者執行代碼(7.2)執行 CAS 操作設置線程個數,cas 成功則退出雙循環,CAS 失敗則執行代碼(7.3)看當前線程池的狀態是否變化了,如果變了,則重新進入外層循環重新獲取線程池狀態,否者進入內層循環繼續進行 cas 嘗試。

執行到第二部分的代碼(8)說明使用 CAS 成功的增加了線程個數,但是現在任務還沒開始執行,這里使用全局的獨占鎖來控制把新增的 Worker 添加到工作集 workers。代碼(8.1)創建了一個工作線程 Worker。

代碼(8.2)獲取了獨占鎖,代碼(8.3)重新檢查線程池狀態,這是為了避免在獲取鎖前其他線程調用了 shutdown 關閉了線程池,如果線程池已經被關閉,則釋放鎖,新增線程失敗,否者執行代碼(8.4)添加工作線程到線程工作集,然后釋放鎖,代碼(8.5)如果判斷如果工作線程新增成功,則啟動工作線程。

 

3.2 工作線程 Worker 的執行

當用戶線程提交任務到線程池后,具體是使用 worker 來執行的,先看下 Worker 的構造函數:

Worker(Runnable firstTask) {
    setState(-1); // 在調用runWorker前禁止中斷
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);//創建一個線程
}

 

如上代碼構造函數內首先設置 Worker 的狀態為 -1,是為了避免當前 worker 在調用 runWorker 方法前被中斷(當其它線程調用了線程池的 shutdownNow 時候,如果 worker 狀態 >= 0 則會中斷該線程)。這里設置了線程的狀態為 -1,所以該線程就不會被中斷了。如下代碼運行 runWorker 的代碼(9)時候會調用 unlock 方法,該方法把 status 變為了 0,所以這時候調用 shutdownNow 會中斷 worker 線程了。

 

接着我們再看看runWorker方法,代碼如下:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); //(9)status設置為0,允許中斷
        boolean completedAbruptly = true;
        try {
           //(10)
            while (task != null || (task = getTask()) != null) {

                 //(10.1)
                w.lock();
               ...
                try {
                    //(10.2)任務執行前干一些事情
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//(10.3)執行任務
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //(10.4)任務執行完畢后干一些事情
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    //(10.5)統計當前worker完成了多少個任務
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {

            //(11)執行清工作
            processWorkerExit(w, completedAbruptly);
        }
    }

 

如上代碼(10)如果當前 task==null 或者調用 getTask 從任務隊列獲取的任務返回 null,則跳轉到代碼(11)執行。如果 task 不為 null 則執行代碼(10.1)獲取工作線程內部持有的獨占鎖,然后執行擴展接口代碼(10.2)在具體任務執行前做一些事情,代碼(10.3)具體執行任務,代碼(10.4)在任務執行完畢后做一些事情,代碼(10.5)統計當前 worker 完成了多少個任務,並釋放鎖。

這里在執行具體任務期間加鎖,是為了避免任務運行期間,其他線程調用了 shutdown 或者 shutdownNow 命令關閉了線程池。

 

其中代碼(11)執行清理任務,其代碼如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
     ...代碼太長,這里就不展示了

    //(11.1)統計整個線程池完成的任務個數,並從工作集里面刪除當前woker
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    //(11.2)嘗試設置線程池狀態為TERMINATED,如果當前是shutdonw狀態並且工作隊列為空
    //或者當前是stop狀態當前線程池里面沒有活動線程
    tryTerminate();

    //(11.3)如果當前線程個數小於核心個數,則增加
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

 

如上代碼(11.1)統計線程池完成任務個數,可知在統計前加了全局鎖,把當前工作線程中完成的任務累加到全局計數器,然后從工作集中刪除當前 worker。

代碼(11.2)判斷如果當前線程池狀態是 shutdonw 狀態並且工作隊列為空或者當前是 stop 狀態當前線程池里面沒有活動線程則設置線程池狀態為 TERMINATED,如果設置為了 TERMINATED 狀態還需要調用條件變量 termination 的 signalAll() 方法激活所有因為調用線程池的 awaitTermination 方法而被阻塞的線程

代碼(11.3)則判斷當前線程里面線程個數是否小於核心線程個數,如果是則新增一個線程。

 

3.3 shutdown 操作:調用 shutdown 后,線程池就不會在接受新的任務了,但是工作隊列里面的任務還是要執行的,該方法立刻返回的,並不等待隊列任務完成在返回。代碼如下:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //(12)權限檢查
        checkShutdownAccess();

        //(13)設置當前線程池狀態為SHUTDOWN,如果已經是SHUTDOWN則直接返回
        advanceRunState(SHUTDOWN);

        //(14)設置中斷標志
        interruptIdleWorkers();
        onShutdown(); 
    } finally {
        mainLock.unlock();
    }
    //(15)嘗試狀態變為TERMINATED
    tryTerminate();
}

 

如上代碼(12)檢查如果設置了安全管理器,則看當前調用 shutdown 命令的線程是否有關閉線程的權限,如果有權限則還要看調用線程是否有中斷工作線程的權限,如果沒有權限則拋出 SecurityException 或者 NullPointerException 異常。

其中代碼(13)內容如下,如果當前狀態 >= SHUTDOWN 則直接返回,否者設置當前狀態為 SHUTDOWN:

private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

 

代碼(14)內容如下,設置所有空閑線程的中斷標志,這里首先加了全局鎖,同時只有一個線程可以調用 shutdown 設置中斷標志,然后嘗試獲取 worker 自己的鎖,獲取成功則設置中斷標識,由於正在執行的任務已經獲取了鎖,所以正在執行的任務沒有被中斷。這里中斷的是阻塞到 getTask() 方法,企圖從隊列里面獲取任務的線程,也就是空閑線程。

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            //如果工作線程沒有被中斷,並且沒有正在運行則設置設置中斷
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

 

代碼(15)判斷如果當前線程池狀態是 shutdonw 狀態並且工作隊列為空或者當前是 stop 狀態當前線程池里面沒有活動線程則設置線程池狀態為 TERMINATED,如果設置為了 TERMINATED 狀態還需要調用條件變量 termination 的 signalAll()方法激活所有因為調用線程池的 awaitTermination 方法而被阻塞的線程

 

3.4 shutdownNow 操作

調用 shutdownNow 后,線程池就不會在接受新的任務了,並且丟棄工作隊列里面里面的任務,正在執行的任務會被中斷,該方法是立刻返回的,並不等待激活的任務執行完成在返回。返回值為這時候隊列里面被丟棄的任務列表。代碼如下:

public List<Runnable> shutdownNow() {


    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();//(16)權限檢查
        advanceRunState(STOP);//(17) 設置線程池狀態為stop
        interruptWorkers();//(18)中斷所有線程
        tasks = drainQueue();//(19)移動隊列任務到tasks
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

 

如上代碼首先調用代碼(16)檢查權限,然后調用代碼(17)設置當前線程池狀態為 stop,然后執行代碼(18)中斷所有的工作線程,這里需要注意的是中斷所有的線程,包含空閑線程和正在執行任務的線程,代碼如下:

   private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

然后代碼(19)移動當前任務隊列里面任務到 tasks 列表。

3.4 awaitTermination 操作

當線程調用 awaitTermination 方法后,當前線程會被阻塞,知道線程池狀態變為了 TERMINATED 才返回,或者等待時間超時才返回,整個過程獨占鎖,代碼如下:

public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

 

如上代碼首先獲取了獨占鎖,然后無限循環內部首先判斷當前線程池狀態是否至少是 TERMINATED 狀態,如果是則直接返回。否者說明當前線程池里面還有線程在執行,則看設置的超時時間 nanos 是否小於 0,小於 0 則說明不需要等待,則直接返回;如果大於0則調用條件變量 termination 的 awaitNanos 方法等待 nanos 時間,期望在這段時間內線程池狀態內變為 TERMINATED 狀態。

在講解 shutdown 方法時候提到當線程池狀態變為 TERMINATED 后,會調用 termination.signalAll() 用來激活調用條件變量 termination 的 await 系列方法被阻塞的所有線程,所以如果在調用了 awaitTermination 之后調用了 shutdown 方法,並且 shutdown 內部設置線程池狀態為 TERMINATED 了,則 termination.awaitNanos 方法會返回。

另外在工作線程 Worker 的 runWorker 方法內當工作線程運行結束后,會調用 processWorkerExit 方法,processWorkerExit 方法內部也會調用 tryTerminate 方法測試當前是否應該把線程池設置為 TERMINATED 狀態,如果是,則也會調用 termination.signalAll() 用來激活調用線程池的 awaitTermination 方法而被阻塞的線程

另外當等待時間超時后,termination.awaitNanos 也會返回,這時候會重新檢查當前線程池狀態是否為 TERMINATED,如果是則直接返回,否者繼續阻塞掛起自己。

4、使用線程池需要注意的地方

4.1 創建線程池時候要指定與業務相關的名字,以便於追溯問題

日常開發中當一個應用中需要創建多個線程池時候最好給線程池根據業務類型設置具體的名字,以便在出現問題時候方便進行定位,下面就通過實例來說明不設置時候為何難以定位問題,以及如何進行設置。

下面通過簡單的代碼來說明不指定線程池名稱為何難定位問題,代碼如下:

package com.hjc;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by cong on 2019/5/26.
 */
public class ThreadPoolExecutorTest {
    static ThreadPoolExecutor executorOne = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
    static ThreadPoolExecutor executorTwo = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());

    public static void main(String[] args) {

        //接受用戶鏈接模塊
        executorOne.execute(new  Runnable() {
            public void run() {
                System.out.println("接受用戶鏈接線程");
                throw new NullPointerException();
            }
        });
        //具體處理用戶請求模塊
        executorTwo.execute(new  Runnable() {
            public void run() {
                System.out.println("具體處理業務請求線程");
            }
        });

        executorOne.shutdown();
        executorTwo.shutdown();
    }
}

運行代碼輸出如下結果:

 

同理我們並不知道是那個模塊的線程池拋出了這個異常,那么我們看下這個 pool-1-thread-1 是如何來的。其實是使用了線程池默認的 ThreadFactory,翻看線程池創建的源碼如下:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

   public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
   }

static class DefaultThreadFactory implements ThreadFactory {
        //(1)
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        //(2)
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        //(3)
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
           //(4)
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

 

如上代碼 DefaultThreadFactory 的實現可知:

     1.代碼(1)poolNumber 是 static 的原子變量用來記錄當前線程池的編號,它是應用級別的,所有線程池公用一個,比如創建第一個線程池時候線程池編號為1,創建第二個線程池時候線程池的編號為2,這里 pool-1-thread-1 里面的 pool-1 中的 1 就是這個值。

     2.代碼(2)threadNumber 是線程池級別的,每個線程池有一個該變量用來記錄該線程池中線程的編號,這里 pool-1-thread-1 里面的 thread - 1 中的 1 就是這個值。

     3.代碼(3)namePrefix是線程池中線程的前綴,默認固定為pool。

     4.代碼(4)具體創建線程,可知線程的名稱使用 namePrefix + threadNumber.getAndIncrement() 拼接的。

 

從上知道我們只需對 DefaultThreadFactory 的代碼中 namePrefix 的初始化做手腳,當需要創建線程池是傳入與業務相關的 namePrefix 名稱就可以了,代碼如下:

package com.hjc;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by cong on 2019/5/26.
 */
// 命名線程工廠
public class HjcThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    HjcThreadFactory(String name) {

        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        if (null == name || name.isEmpty()) {
            name = "pool";
        }

        namePrefix = name + "-" + poolNumber.getAndIncrement() + "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

 

然后創建線程池時候如下:

package com.hjc;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by cong on 2019/5/26.
 */
public class ThreadPoolExecutorTest {
    static ThreadPoolExecutor executorOne = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(),new HjcThreadFactory("ASYN-ACCEPT-POOL"));
    static ThreadPoolExecutor executorTwo = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(),new HjcThreadFactory("ASYN-PROCESS-POOL"));

    public static void main(String[] args) {

        //接受用戶鏈接模塊
        executorOne.execute(new  Runnable() {
            public void run() {
                System.out.println("接受用戶鏈接線程");
                throw new NullPointerException();
            }
        });
        //具體處理用戶請求模塊
        executorTwo.execute(new  Runnable() {
            public void run() {
                System.out.println("具體處理業務請求線程");
            }
        });

        executorOne.shutdown();
        executorTwo.shutdown();
    }
}

 

 然后運行執行結果如下:

 

 從運行結果拋出的異常,可以看到從 ASYN-ACCEPT-POOL-1-thread-1 就可以知道是接受鏈接線程池拋出的異常。

 

4.4 線程池中使用 ThreadLocal 導致的內存泄露

下面先看線程池中使用 ThreadLocal 的例子:

package com.hjc;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Created by cong on 2019/5/26.
 */
public class ThreadPoolTest {

    static class LocalVariable {
        private Long[] a = new Long[1024 * 1024];
    }

    // (1)
    final static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES,
            new LinkedBlockingQueue<>());
    // (2)
    final static ThreadLocal<LocalVariable> localVariable = new ThreadLocal<LocalVariable>();

    public static void main(String[] args) throws InterruptedException {
        // (3)
        for (int i = 0; i < 50; ++i) {
            poolExecutor.execute(new Runnable() {
                public void run() {
                    // (4)
                    localVariable.set(new LocalVariable());
                    // (5)
                    System.out.println("use local varaible");
                    //localVariable.remove();

                }
            });

            Thread.sleep(1000);
        }
        // (6)
        System.out.println("pool execute over");
    }
}

 

代碼(1)創建了一個核心線程數和最大線程數為 5 的線程池,這個保證了線程池里面隨時都有 5 個線程在運行。

代碼(2)創建了一個 ThreadLocal 的變量,泛型參數為 LocalVariable,LocalVariable 內部是一個 Long 數組。

代碼(3)向線程池里面放入 50 個任務

代碼(4)設置當前線程的 localVariable 變量,也就是把 new 的 LocalVariable 變量放入當前線程的 threadLocals 變量。

由於沒有調用線程池的 shutdown 或者 shutdownNow 方法所以線程池里面的用戶線程不會退出,進而 JVM 進程也不會退出。

 

運行當前代碼,使用 jconsole 監控堆內存變化如下圖:

 

然后解開 localVariable.remove() 注釋,然后在運行,觀察堆內存變化如下:

 

從運行結果一可知,當主線程處於休眠時候進程占用了大概 77M 內存,運行結果二則占用了大概 25M 內存,可知運行代碼一時候內存發生了泄露,下面分析下泄露的原因。

運行結果一的代碼,在設置線程的 localVariable 變量后沒有調用 localVariable.remove()方法,導致線程池里面的 5 個線程的 threadLocals 變量里面的 new LocalVariable() 實例沒有被釋放,雖然線程池里面的任務執行完畢了,但是線程池里面的 5 個線程會一直存在直到 JVM 進程被殺死。

這里需要注意的是由於 localVariable 被聲明了 static,雖然線程的 ThreadLocalMap 里面是對localVariable的弱引用,localVariable也不會被回收。

運行結果二的代碼由於線程在設置 localVariable 變量后及時調用了 localVariable.remove() 方法進行了清理,所以不會存在內存泄露。

總結:線程池里面設置了 ThreadLocal 變量一定要記得及時清理,因為線程池里面的核心線程是一直存在的,如果不清理,那么線程池的核心線程的 threadLocals 變量一直會持有 ThreadLocal 變量。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM