多線程(七) 線程池的實現原理分析


什么是線程池

在 Java 中,如果每個請求到達就創建一個新線程,創建和銷毀線程花費的時間和消耗的系統
資源都相當大,甚至可能要比在處理實際的用戶請求的時間和資源要多的多。
 
如果在一個 Jvm 里創建太多的線程,可能會使系統由於過度消耗內存或“切換過度”而導致系統資源不足
為了解決這個問題,就有了線程池的概念,線程池的核心邏輯是提前創建好若干個線程放在一個容器中。
如果有任務需要處理,則將任務直接分配給線程池中的線程來執行就行,任務處理完以后這個線程不會被銷毀,
而是等待后續分配任務。同時通過線程池來重復管理線程還可以避免創建大量線程增加開銷。
 
線程池的優勢
合理的使用線程池,可以帶來一些好處
1. 降低創建線程和銷毀線程的性能開銷
2. 提高響應速度,當有新任務需要執行是不需要等待線程創建就可以立馬執行
3. 合理的設置線程池大小可以避免因為線程數超過硬件資源瓶頸帶來的問題
 
Java 中提供的線程池 API
我相信有很多同學或多或少都接觸過線程池,也可能自己也研究過線程池的原理。前面部分
的內容會相對簡單點,但是要想合理的使用線程池,那么勢必要對線程池的原理有比較深的理解
 
線程池的使用
要了解一個技術,我們仍然是從使用開始。JDK 為我們提供了幾種不同的線程池實現。我們
先來通過一個簡單的案例來引入線程池的基本使用在 Java 中怎么創建線程池呢?
下面這段代碼演示了創建三個固定線程數的線程池
package com.lf.threaddemo;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }

    static ExecutorService service = Executors.newFixedThreadPool(3);

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            service.execute(new Test());
        }
        service.shutdown();
    }
}

Java 中提供的線程池 Api

為了方便大家對於線程池的使用,在 Executors 里面提供了幾個線程池的工廠方法,這樣,
很多新手就不需要了解太多關於 ThreadPoolExecutor 的知識了,他們只需要直接使用
Executors 的工廠方法,就可以使用線程池:
newFixedThreadPool:該方法返回一個固定數量的線程池,線程數不變,當有一個任務提交
               時,若線程池中空閑,則立即執行,若沒有,則會被暫緩在一個任務隊列中,等待有空閑的線程去執行。
newSingleThreadExecutor: 創建一個線程的線程池,若空閑則執行,若沒有空閑線程則暫緩在任務隊列中。
newCachedThreadPool:返回一個可根據實際情況調整線程個數的線程池,不限制最大線程
            數量,若用空閑的線程則執行任務,若無任務則不創建線程。並且每一個空閑線程會在 60 秒后自動回收。
newScheduledThreadPool: 創建一個可以指定線程的數量的線程池,但是這個線程池還帶有
延遲和周期性執行任務的功能,類似定時器。
 
ThreadpoolExecutor 
上面提到的四種線程池的構建,都是基於 ThreadpoolExecutor 來構建的,小伙伴們打起精神
來了,接下來將一起了解一下面試官最喜歡問到的一道面試題“請簡單說下你知道的線程池和ThreadPoolThread 有哪些構造參數
 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
hreadpoolExecutor 有多個重載的構造方法,我們可以基於它最完整的構造方法來分析
先來解釋一下每個參數的作用,稍后我們在分析源碼的過程中再來詳細了解參數的意義、
public ThreadPoolExecutor(int corePoolSize, //核心線程數量
 int maximumPoolSize, //最大線程數
 long keepAliveTime, //超時時間,超出核心線程數量以外的線程空余存活時間
 TimeUnit unit, //存活時間單位
 BlockingQueue<Runnable> workQueue, //保存執行任務的隊列
ThreadFactory threadFactory,//創建新線程使用的工廠
RejectedExecutionHandler handler //當任務無法執行的時候的處理方式)
這個地方有很多同學問過我,線程池初始化以后做了什么事情
線程池初始化時是沒有創建線程的,線程池里的線程的初始化與其他線程一樣,但是在完成
任務以后,該線程不會自行銷毀,而是以掛起的狀態返回到線程池。直到應用程序再次向線
程池發出請求時,線程池里掛起的線程就會再度激活執行任務。這樣既節省了建立線程所造
成的性能損耗,也可以讓多個任務反復重用同一線程,從而在應用程序生存期內節約大量開銷。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool 的核心線程數和最大線程數都是指定值,也就是說當線程池中的線程數超
過核心線程數后,任務都會被放到阻塞隊列中。另外 keepAliveTime 為 0,也就是超出核心
線程數量以外的線程空余存活時間
而這里選用的阻塞隊列是 LinkedBlockingQueue,使用的是默認容量 Integer.MAX_VALUE,相當於沒有上限
這個線程池執行任務的流程如下:
1. 線程數少於核心線程數,也就是設置的線程數時,新建線程執行任務
2. 線程數等於核心線程數后,將任務加入阻塞隊列
3. 由於隊列容量非常大,可以一直添加
4. 執行完任務的線程反復去隊列中取任務執行
用途: FixedThreadPool 用於負載比較大的服務器,為了資源的合理利用,需要限制當前線程數量
 
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
 60L, TimeUnit.SECONDS,
 new SynchronousQueue<Runnable>());
}
CachedThreadPool 創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空
閑線程,若無可回收,則新建線程; 並且沒有核心線程,非核心線程數無上限,但是每個空閑
的時間只有 60 秒,超過后就會被回收。
它的執行流程如下:
1. 沒有核心線程,直接向 SynchronousQueue 中提交任務
2. 如果有空閑線程,就去取出任務執行;如果沒有空閑線程,就新建一個
3. 執行完任務的線程有 60 秒生存時間,如果在這個時間內可以接到新任務,就可以繼續活下去,否則就被回收
newSingleThreadExecutor
創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
 

線程池的實現原理分析

線程池的基本使用我們都清楚了,接下來我們來了解一下線程池的實現原理
ThreadPoolExecutor 是線程池的核心,提供了線程池的實現。
ScheduledThreadPoolExecutor 繼承了 ThreadPoolExecutor,並另外提供一些調度方法以支
持定時和周期任務。Executers 是工具類,主要用來創建線程池對象
我們把一個任務提交給線程池去處理的時候,線程池的處理過程是什么樣的呢?首先直接來看看定義 
 
線程池原理分析( FixedThreadPool

 

 

源碼分析
execute
基於源碼入口進行分析,先看 execute 方法 
public void execute(Runnable command) {
 if (command == null)
 throw new NullPointerException();
 int c = ctl.get();
 if (workerCountOf(c) < corePoolSize) {//1.當前池中線程比核心數少,新建一個線程執行任務
 if (addWorker(command, true))
 return;
 c = ctl.get();
 }
 if (isRunning(c) && workQueue.offer(command)) {//2.核心池已滿,但任務隊列未滿,添加到隊列中
 int recheck = ctl.get();
//任務成功添加到隊列以后,再次檢查是否需要添加新的線程,因為已存在的線程可能被銷毀了
 if (! isRunning(recheck) && remove(command))
 reject(command);//如果線程池處於非運行狀態,並且把當前的任務從任務隊列中移除成功,則拒絕該任務
 else if (workerCountOf(recheck) == 0)//如果之前的線程已被銷毀完,新建一個線程
 addWorker(null, false);
 }
 else if (!addWorker(command, false)) //3.核心池已滿,隊列已滿,試着創建一個新線程
 reject(command); //如果創建新線程失敗了,說明線程池被關閉或者線程池完全滿了,拒絕任務
}
ctl 的作用
在線程池中,ctl 貫穿在線程池的整個生命周期中
ctl:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));
它是一個原子類,主要作用是用來保存線程數量和線程池的狀態。我們來分析一下這段代碼,
其實比較有意思,他用到了位運算
一個 int 數值是 32 個 bit 位,這里采用高 3 位來保存運行狀態,低 29 位來保存線程數量。
我們來分析默認情況下,也就是 ctlOf(RUNNING)運行狀態,調用了 ctlOf(int rs,int wc)方法;
其中
private static int ctlOf(int rs, int wc) { return rs | wc; }
其中 RUNNING =-1 << COUNT_BITS ; -1 左移 29 位. -1 的二進制是 32 個 1(1111 1111 11111111 1111 1111 1111 1111) 
-1 的二進制計算方法
原碼是 1000001 . 高位 1 表示符號位。
然后對原碼取反,高位不變得到 1111110
然后對反碼進行+1 ,也就是補碼操作, 最后得到 11111111
那么-1 <<左移 29 位, 也就是 【111】 表示; rs | wc 。二進制的 111 | 000 。得到的結
果仍然是 111那么同理可得其他的狀態的 bit 位表示
private static final int COUNT_BITS = Integer.SIZE - 3; //32-3
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //將 1 的二進制
向右位移 29 位,再減 1 表示最大線程容量
//運行狀態保存在 int 值的高 3 位 (所有數值左移 29 位)
private static final int RUNNING = -1 << COUNT_BITS;// 接收新任務,並執行隊
列中的任務
private static final int SHUTDOWN = 0 << COUNT_BITS;// 不接收新任務,但是執行隊列中的任務
private static final int STOP = 1 << COUNT_BITS;// 不接收新任務,不執行隊列中的任務,中斷正在執行中的任務
private static final int TIDYING = 2 << COUNT_BITS; //所有的任務都已結束,
線程數量為 0,處於該狀態的線程池即將調用 terminated()方法
private static final int TERMINATED = 3 << COUNT_BITS;// terminated()方法執行完成

 

 

addWorker
如果工作線程數小於核心線程數的話,會調用 addWorker,顧名思義,其實就是要創建一個
工作線程。我們來看看源碼的實現
源碼比較長,看起來比較唬人,其實就做了兩件事。1)才用循環 CAS 操作來將線程數加 1;
2)新建一個線程並啟用
 retry: //goto 語句,避免死循環
 for (;;) {
 int c = ctl.get();
 int rs = runStateOf(c);
 // Check if queue empty only if necessary.
如果線程處於非運行狀態,並且 rs 不等於 SHUTDOWN 且 firstTask 不等於空且且
workQueue 為空,直接返回 false(表示不可添加 work 狀態)
1. 線程池已經 shutdown 后,還要添加新的任務,拒絕
2. (第二個判斷)SHUTDOWN 狀態不接受新任務,但仍然會執行已經加入任務隊列的任
務,所以當進入 SHUTDOWN 狀態,而傳進來的任務為空,並且任務隊列不為空的時候,是允許添加
新線程的,如果把這個條件取反,就表示不允許添加 worker
 if (rs >= SHUTDOWN &&
 ! (rs == SHUTDOWN &&
 firstTask == null &&
 ! workQueue.isEmpty()))
 return false;
 for (;;) { //自旋
 int wc = workerCountOf(c);//獲得 Worker 工作線程數
//如果工作線程數大於默認容量大小或者大於核心線程數大小,則直接返回 false 表示不能再添加 worker。
 if (wc >= CAPACITY || 
 wc >= (core ? corePoolSize : maximumPoolSize))
 return false;
 if (compareAndIncrementWorkerCount(c))//通過 cas 來增加工作線程數,如果 cas 失敗,則直接重試
 break retry;
 c = ctl.get(); // Re-read ctl //再次獲取 ctl 的值
 if (runStateOf(c) != rs) //這里如果不想等,說明線程的狀態發生了變化,繼續重試
 continue retry;
 // else CAS failed due to workerCount change; retry inner loop
 }
 }
 //上面這段代碼主要是對 worker 數量做原子+1 操作,下面的邏輯才是正式構建一個 worker
 boolean workerStarted = false; //工作線程是否啟動的標識
 boolean workerAdded = false; //工作線程是否已經添加成功的標識
 Worker w = null; 
 try {
 w = new Worker(firstTask); //構建一個 Worker,這個 worker 是什么呢?我們可以看到構造方法里面傳入了一個 Runnable 對象
 final Thread t = w.thread; //從 worker 對象中取出線程
 if (t != null) {
 final ReentrantLock mainLock = this.mainLock;
 mainLock.lock(); //這里有個重入鎖,避免並發問題
 try {
 // Recheck while holding lock.
 // Back out on ThreadFactory failure or if
 // shut down before lock acquired.
 int rs = runStateOf(ctl.get());
 //只有當前線程池是正在運行狀態,[或是 SHUTDOWN 且 firstTask 為空],才能添加到 workers 集合中
 if (rs < SHUTDOWN ||
 (rs == SHUTDOWN && firstTask == null)) {
//任務剛封裝到 work 里面,還沒 start,你封裝的線程就是 alive,幾個意思?肯定是要拋異常出去的
 if (t.isAlive()) // precheck that t is startable
 throw new IllegalThreadStateException();
 workers.add(w); //將新創建的 Worker 添加到 workers 集合中
 int s = workers.size();
//如果集合中的工作線程數大於最大線程數,這個最大線程數表示線程池曾經出現過的最大線程數
 if (s > largestPoolSize) 
 largestPoolSize = s; //更新線程池出現過的最大線程數
 workerAdded = true;//表示工作線程創建成功了
 }
 } finally {
 mainLock.unlock(); //釋放鎖
 }
 if (workerAdded) {//如果 worker 添加成功
 t.start();//啟動線程
 workerStarted = true;
 }
 }
 } finally {
 if (! workerStarted)
 addWorkerFailed(w); //如果添加失敗,就需要做一件事,就是遞減實際工作線程數(還記得我們最開始的時候增加了工作線程數嗎)
 }
 return workerStarted;//返回結果
}
Worker 類說明
我們發現 addWorker 方法只是構造了一個 Worker,並且把 firstTask 封裝到 worker 中,它是
做什么的呢?我們來看看
1. 每個 worker,都是一條線程,同時里面包含了一個 firstTask,即初始化時要被首先執行的任務.
2. 最終執行任務的,是 runWorker()方法
Worker 類繼承了 AQS,並實現了 Runnable 接口,注意其中的 firstTask 和 thread 屬性:
firstTask 用它來保存傳入的任務;thread 是在調用構造方法時通過 ThreadFactory 來創建的
線程,是用來處理任務的線程。
在調用構造方法時,需要傳入任務,這里通過 getThreadFactory().newThread(this);來新建
一個線程,newThread 方法傳入的參數是 this,因為 Worker 本身繼承了 Runnable 接口,
也就是一個線程,所以一個 Worker 對象在啟動的時候會調用 Worker 類中的 run 方法。
Worker 繼承了 AQS,使用 AQS 來實現獨占鎖的功能。為什么不使用 ReentrantLock 來實
現呢?可以看到 tryAcquire 方法,它是不允許重入的,而 ReentrantLock 是允許重入的:
lock 方法一旦獲取了獨占鎖,表示當前線程正在執行任務中;那么它會有以下幾個作用
1. 如果正在執行任務,則不應該中斷線程;
2. 如果該線程現在不是獨占鎖的狀態,也就是空閑的狀態,說明它沒有在處理任務,這時可
以對該線程進行中斷;3. 線程池在執行 shutdown 方法或 tryTerminate 方法時會調用 interruptIdleWorkers 方法來
中斷空閑的線程,interruptIdleWorkers 方法會使用 tryLock 方法來判斷線程池中的線程
是否是空閑狀態
4. 之所以設置為不可重入,是因為我們不希望任務在調用像 setCorePoolSize 這樣的線程池
控制方法時重新獲取鎖,這樣會中斷正在運行的線程
5. private final class Worker
 extends AbstractQueuedSynchronizer
 implements Runnable
{
 private static final long serialVersionUID = 6138294804551838833L;
 /** Thread this worker is running in. Null if factory fails. */
 final Thread thread; //注意了,這才是真正執行 task 的線程,從構造函數可知是由ThreadFactury 創建的
 /** Initial task to run. Possibly null. */
 Runnable firstTask; //這就是需要執行的 task
 /** Per-thread task counter */
 volatile long completedTasks; //完成的任務數,用於線程池統計
 Worker(Runnable firstTask) {
 setState(-1); //初始狀態 -1,防止在調用 runWorker(),也就是真正執行 task前中斷 thread。
 this.firstTask = firstTask;
 this.thread = getThreadFactory().newThread(this);
 }
 public void run() {
 runWorker(this);
 }
 protected boolean isHeldExclusively() {
 return getState() != 0;
 }
 protected boolean tryAcquire(int unused) {
 if (compareAndSetState(0, 1)) {
 setExclusiveOwnerThread(Thread.currentThread());
 return true;
 }
 return false;
 }
 protected boolean tryRelease(int unused) {
 setExclusiveOwnerThread(null);
 setState(0);
 return true;
 }
 public void lock() { acquire(1); }
 public boolean tryLock() { return tryAcquire(1); }
 public void unlock() { release(1); }
 public boolean isLocked() { return isHeldExclusively(); }
 void interruptIfStarted() {
 Thread t;
 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
 try {
 t.interrupt();
 } catch (SecurityException ignore) {
 }
 }
 } }
addWorkerFailed
addWorker 方法中,如果添加 Worker 並且啟動線程失敗,則會做失敗后的處理。
這個方法主要做兩件事
1. 如果 worker 已經構造好了,則從 workers 集合中移除這個 worker
2. 原子遞減核心線程數(因為在 addWorker 方法中先做了原子增加)
3. 嘗試結束線程池
private void addWorkerFailed(Worker w) {
 final ReentrantLock mainLock = this.mainLock;
 mainLock.lock();
 try {
 if (w != null)
 workers.remove(w);
 decrementWorkerCount();
 tryTerminate();
 } finally {
 mainLock.unlock();
 } }
runWorker 方法
前面已經了解了 ThreadPoolExecutor 的核心方法 addWorker,主要作用是增加工作線程,
而 Worker 簡單理解其實就是一個線程,里面重新了 run 方法,這塊是線程池中執行任務的
真正處理邏輯,也就是 runWorker 方法,這個方法主要做幾件事
1. 如果 task 不為空,則開始執行 task
2. 如果 task 為空,則通過 getTask()再去取任務,並賦值給 task,如果取到的 Runnable 不為空,則執行該任務
3. 執行完畢后,通過 while 循環繼續 getTask()取任務
4. 如果 getTask()取到的任務依然是空,那么整個 runWorker()方法執行完畢
final void runWorker(Worker w) {
 Thread wt = Thread.currentThread();
 Runnable task = w.firstTask;
 w.firstTask = null;
//unlock,表示當前 worker 線程允許中斷,因為 new Worker 默認的 state=-1,此處是調用
//Worker 類的 tryRelease()方法,將 state 置為 0, 而 interruptIfStarted()中只有 state>=0 才允許調用中斷
 w.unlock(); // allow interrupts
 boolean completedAbruptly = true;
 try {
//注意這個 while 循環,在這里實現了 [線程復用] // 如果 task 為空,則通過getTask 來獲取任務
 while (task != null || (task = getTask()) != null) {
 w.lock(); //上鎖,不是為了防止並發執行任務,為了在 shutdown()時不終止正在運行的 worker線程池為 stop 狀態時不接受新任務,不執行已經加入任務隊列的任務,還中斷正在執行的任務
//所以對於 stop 狀態以上是要中斷線程的
//(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP)確保線程中斷標志位為 true 且是 stop 狀態以上,接着清除了中斷標志
//!wt.isInterrupted()則再一次檢查保證線程需要設置中斷標志位
 if ((runStateAtLeast(ctl.get(), STOP) ||
 (Thread.interrupted() &&
 runStateAtLeast(ctl.get(), STOP))) &&
 !wt.isInterrupted())
 wt.interrupt();
 try {
 beforeExecute(wt, task);//這里默認是沒有實現的,在一些特定的場景中我們可以自己繼承 ThreadpoolExecutor 自己重寫
 Throwable thrown = null;
 try {
 task.run(); //執行任務中的 run 方法
 } catch (RuntimeException x) {
 thrown = x; throw x;
 } catch (Error x) {
 thrown = x; throw x;
 } catch (Throwable x) {
 thrown = x; throw new Error(x);
 } finally {
 afterExecute(task, thrown); //這里默認默認而也是沒有實現
 }
 } finally {
//置空任務(這樣下次循環開始時,task 依然為 null,需要再通過 getTask()取) + 記錄該 Worker 完成任務數量 + 解鎖
 task = null;
 w.completedTasks++;
 w.unlock();
 }
 }
 completedAbruptly = false;
 } finally {
 processWorkerExit(w, completedAbruptly);
//1.將入參 worker 從數組 workers 里刪除掉;
//2.根據布爾值 allowCoreThreadTimeOut 來決定是否補充新的 Worker 進數組workers
 } }
getTask
worker 線程會從阻塞隊列中獲取需要執行的任務,這個方法不是簡單的 take 數據,我們來分析下他的源碼實現
你也許好奇是怎樣判斷線程有多久沒有活動了,是不是以為線程池會啟動一個監控線程,專
門監控哪個線程正在偷懶?想太多,其實只是在線程從工作隊列 poll 任務時,加上了超時
限制,如果線程在 keepAliveTime 的時間內 poll 不到任務,那我就認為這條線程沒事做,
可以干掉了,看看這個代碼片段你就清楚了
private Runnable getTask() {
 boolean timedOut = false; // Did the last poll() time out?
 for (;;) {//自旋
 int c = ctl.get();
 int rs = runStateOf(c);
// * 對線程池狀態的判斷,兩種情況會 workerCount-1,並且返回 null
 //1. 線程池狀態為 shutdown,且 workQueue 為空(反映了 shutdown 狀態的線程池還是要執行 workQueue 中剩余的任務的)
 //2. 線程池狀態為 stop(shutdownNow()會導致變成 STOP)(此時不用考慮 workQueue的情況)
 // Check if queue empty only if necessary.
 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
 decrementWorkerCount();
 return null;//返回 null,則當前 worker 線程會退出
 }
 int wc = workerCountOf(c);
 // timed 變量用於判斷是否需要進行超時控制。
// allowCoreThreadTimeOut 默認是 false,也就是核心線程不允許進行超時;
 // wc > corePoolSize,表示當前線程池中的線程數量大於核心線程數量;
 // 對於超過核心線程數量的這些線程,需要進行超時控制
 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//1. 線程數量超過 maximumPoolSize 可能是線程池在運行時被調用了 setMaximumPoolSize()被改變了大小,否則已經 addWorker()成功不會超過 maximumPoolSize
//2. timed && timedOut 如果為 true,表示當前操作需要進行超時控制,並且上次從阻塞隊列中獲取任務發生了超時.其實就是體現了空閑線程的存活時間
 if ((wc > maximumPoolSize || (timed && timedOut))
 && (wc > 1 || workQueue.isEmpty())) {
 if (compareAndDecrementWorkerCount(c))
 return null;
 continue;
 }
 try {
//根據 timed 來判斷,如果為 true,則通過阻塞隊列 poll 方法進行超時控制,如果在
//keepaliveTime 時間內沒有獲取到任務,則返回 null.
//否則通過 take 方法阻塞式獲取隊列中的任務
 Runnable r = timed ?
 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
 workQueue.take();
 if (r != null)//如果拿到的任務不為空,則直接返回給 worker 進行處理
 return r;
 timedOut = true;//如果 r==null,說明已經超時了,設置 timedOut=true,在下次自旋的時候進行回收
 } catch (InterruptedException retry) {
 timedOut = false;// 如果獲取任務時當前線程發生了中斷,則設置 timedOut 為false 並返回循環重試
 }
 }}
這里重要的地方是第二個 if 判斷,目的是控制線程池的有效線程數量。由上文中的分析可以
知道,在執行 execute 方法時,如果當前線程池的線程數量超過了 corePoolSize 且小於
maximumPoolSize,並且 workQueue 已滿時,則可以增加工作線程,但這時如果超時沒有
獲取到任務,也就是 timedOut 為 true 的情況,說明 workQueue 已經為空了,也就說明了
當前線程池中不需要那么多線程來執行任務了,可以把多於 corePoolSize 數量的線程銷毀
掉,保持線程數量在 corePoolSize 即可。
什么時候會銷毀?當然是 runWorker 方法執行完之后,也就是 Worker 中的 run 方法執行完,由 JVM 自動回收。
getTask 方法返回 null 時,在 runWorker 方法中會跳出 while 循環,然后會執行processWorkerExit 方法。
processWorkerExit
runWorker 的 while 循環執行完畢以后,在 finally 中會調用 processWorkerExit,來銷毀工作線程。
到目前為止,我們已經從 execute 方法中輸入了 worker 線程的創建到執行以及最后到銷毀
的全部過程。那么我們繼續回到 execute 方法.我們只分析完
addWorker 這段邏輯,繼續來看后面的判斷 execute 后續邏輯分析
如果核心線程數已滿,說明這個時候不能再創建核心線程了,於是走第二個判斷
第二個判斷邏輯比較簡單,如果線程池處於運行狀態並且任務隊列沒有滿,則將任務添加到隊列中
第三個判斷,核心線程數滿了,隊列也滿了,那么這個時候創建新的線程也就是(非核心線程)
如果非核心線程數也達到了最大線程數大小,則直接拒絕任務。
if (isRunning(c) && workQueue.offer(command)) {//2.核心池已滿,但任務隊列未滿,添加到隊列中
 int recheck = ctl.get();
//任務成功添加到隊列以后,再次檢查是否需要添加新的線程,因為已存在的線程可能被銷毀了
 if (! isRunning(recheck) && remove(command))
 reject(command);//如果線程池處於非運行狀態,並且把當前的任務從任務隊列中移除成功,則拒絕該任務
 else if (workerCountOf(recheck) == 0)//如果之前的線程已被銷毀完,新建一個線程
 addWorker(null, false);
 }
else if (!addWorker(command, false)) //3.核心池已滿,隊列已滿,試着創建一個新線程
 reject(command); //如果創建新線程失敗了,說明線程池被關閉或者線程池完全滿了,拒絕任務
拒絕策略
1、 AbortPolicy:直接拋出異常,默認策略;
2、 CallerRunsPolicy:用調用者所在的線程來執行任務;
3、 DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
4、 DiscardPolicy:直接丟棄任務;
 
當然也可以根據應用場景實現 RejectedExecutionHandler 接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務
線程池的注意事項
分析完線程池以后,我們再來了解一下線程池的注意事項阿里開發手冊不建議使用線程池
不止一個同學問我說阿里開發手冊上不建議使用線程池?估計這些同學都是沒有認真看手冊的。手冊上是說
線程池的構建不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式。
分析完原理以后,大家自己一定要有一個答案。我來簡單分析下,用 Executors 使得用戶不
需要關心線程池的參數配置,意味着大家對於線程池的運行規則也會慢慢的忽略。這會導致
一個問題,
比如我們用 newFixdThreadPool 或者 singleThreadPool.允許的隊列長度為Integer.MAX_VALUE,如果使用不當會導致大量請求堆積到隊列中導致 OOM 的風險
而 newCachedThreadPool,允許創建線程數量為 Integer.MAX_VALUE,也可能會導致大量線程的創建出現 CPU 使用過高或者 OOM 的問題
而如果我們通過 ThreadPoolExecutor 來構造線程池的話,我們勢必要了解線程池構造中每個
參數的具體含義,使得開發者在配置參數的時候能夠更加謹慎。不至於像有些同學去面試的
時候被問到:構造一個線程池需要哪些參數,都回答不上來
 
如何合理配置線程池的大小
如何合理配置線程池大小,也是很多同學反饋給我的問題,我也簡單說一下。線程池大小不
是靠猜,也不是說越多越好。
在遇到這類問題時,先冷靜下來分析
1. 需要分析線程池執行的任務的特性: CPU 密集型還是 IO 密集型
2. 每個任務執行的平均時長大概是多少,這個任務的執行時長可能還跟任務處理邏輯是否涉
及到網絡傳輸以及底層系統資源依賴有關系
如果是 CPU 密集型,主要是執行計算任務,響應時間很快,cpu 一直在運行,這種任務 cpu
的利用率很高,那么線程數的配置應該根據 CPU 核心數來決定,CPU 核心數=最大同時執行
線程數,加入 CPU 核心數為 4,那么服務器最多能同時執行 4 個線程。過多的線程會導致上
下文切換反而使得效率降低。
那線程池的最大線程數可以配置為 cpu 核心數+1
 
如果是 IO 密集型,主要是進行 IO 操作,執行 IO 操作的時間較長,這是 cpu 出於空閑狀態,
導致 cpu 的利用率不高,這種情況下可以增加線程池的大小。這種情況下可以結合線程的等
待時長來做判斷,等待時間越高,那么線程數也相對越多。一般可以配置 cpu 核心數的 2 倍。
一個公式:
線程池設定最佳線程數目 = ((線程池設定的線程等待時間+線程 CPU 時間)/線程 CPU 時間 )* CPU 數目
這個公式的線程 cpu 時間是預估的程序單個線程在 cpu 上運行的時間(通常使用 loadrunner測試大量運行次數求出平均值)
 
線程池中的線程初始化
默認情況下,創建線程池之后,線程池中是沒有線程的,需要提交任務之后才會創建線程。
在實 際中如果需要 線程池創建之 后立即創建線 程,可以通過 以下兩個方法 辦到:
prestartCoreThread():初始化一個核心線程; prestartAllCoreThreads():初始化所有核心線程
ThreadPoolExecutor tpe=(ThreadPoolExecutor)service;
tpe.prestartAllCoreThreads();
線程池的關閉
ThreadPoolExecutor 提 供 了 兩 個 方 法 , 用 於 線 程 池 的 關 閉 , 分 別 是 shutdown() 和
shutdownNow(),其中: shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中
的任務都執行完后才終止,但再也不會接受新的任務 shutdownNow():立即終止線程池,並
嘗試打斷正在執行的任務,並且清空任務緩存隊列,返回尚未執行的任務
線程池容量的動態調整
ThreadPoolExecutor 提 供 了 動 態 調 整 線 程 池 容 量 大 小 的 方 法 : setCorePoolSize() 和
setMaximumPoolSize(),setCorePoolSize:設置核心池大小 setMaximumPoolSize:設置線程池最大能創建的線程數目大小
 
任務緩存隊列及排隊策略
在前面我們多次提到了任務緩存隊列,即 workQueue,它用來存放等待執行的任務。
workQueue 的類型為 BlockingQueue,通常可以取下面三種類型:
1. ArrayBlockingQueue:基於數組的先進先出隊列,此隊列創建時必須指定大小;
2. LinkedBlockingQueue:基於鏈表的先進先出隊列,如果創建時沒有指定此隊列大小,則默認為 Integer.MAX_VALUE;
3. SynchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。
 

線程池的監控

如果在項目中大規模的使用了線程池,那么必須要有一套監控體系,來指導當前線程池的狀
態,當出現問題的時候可以快速定位到問題。而線程池提供了相應的擴展方法,我們通過重
寫線程池的 beforeExecute、afterExecute 和 shutdown 等方式就可以實現對線程的監控,簡
單給大家演示一個案例
package com.lf.threaddemo;

import java.util.Date;
import java.util.concurrent.*;

public class Demo1 extends ThreadPoolExecutor {
    // 保存任務開始執行的時間,當任務結束時,用任務結束時間減去開始時間計算任務執行時間
    private ConcurrentHashMap<String, Date> startTimes;

    public Demo1(int corePoolSize, int maximumPoolSize, long
            keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue);
        this.startTimes = new ConcurrentHashMap<>();
    }

    @Override
    public void shutdown() {
        System.out.println("已經執行的任務數: " + this.getCompletedTaskCount() + "," +
                "當前活動線程數:" + this.getActiveCount() + ",當前排隊線程 數:" + this.getQueue().size());
        System.out.println();
        super.shutdown();
    }

    //任務開始之前記錄任務開始時間
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTimes.put(String.valueOf(r.hashCode()), new Date());
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
        Date finishDate = new Date();
        long diff = finishDate.getTime() - startDate.getTime();
        // 統計任務耗時、初始線程數、核心線程數、正在執行的任務數量、
        // 已完成任務數量、任務總數、隊列里緩存的任務數量、
        // 池中存在的最大線程數、最大允許的線程數、線程空閑時間、線程池是否關閉、線程池是否終止
        System.out.print("任務耗時:" + diff + "\n");
        System.out.print("初始線程數:" + this.getPoolSize() + "\n");
        System.out.print("核心線程數:" + this.getCorePoolSize() + "\n");
        System.out.print("正在執行的任務數量:" + this.getActiveCount() + "\n");
        System.out.print("已經執行的任務 數:" + this.getCompletedTaskCount() + "\n");
        System.out.print("任務總數:" + this.getTaskCount() + "\n");
        System.out.print("最大允許的線程數:" + this.getMaximumPoolSize() + "\n");
        System.out.print("線程空閑時 間:" + this.getKeepAliveTime(TimeUnit.MILLISECONDS) + "\n");
        System.out.println();
        super.afterExecute(r, t);
    }

    public static ExecutorService newCachedThreadPool() {
        return new Demo1(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new
                SynchronousQueue());
    }
}

測試

package com.lf.threaddemo;

import java.util.concurrent.ExecutorService;

public class ThreadPoolTest implements Runnable {
    private static ExecutorService es = Demo1.newCachedThreadPool();

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 100; i++) {
            es.execute(new ThreadPoolTest());
        }
        es.shutdown();
    }
}

 

Callable/Future 使用及原理分析
很多同學應該關注到了。線程池的執行任務有兩種方法,一種是 submit、一種是 execute;
這兩個方法是有區別的,那么基於這個區別我們再來看看。
execute 和 submit 區別
1. execute 只可以接收一個 Runnable 的參數
2. execute 如果出現異常會拋出
3. execute 沒有返回值
1. submit 可以接收 Runable 和 Callable 這兩種類型的參數,
2. 對於 submit 方法,如果傳入一個 Callable,可以得到一個 Future 的返回值
3. submit 方法調用不會拋異常,除非調用 Future.get
這里,我們重點了解一下 Callable/Future,可能很多同學知道他是一個帶返回值的線程,但是具體的實現可能不清楚。
 
Callable/Future 案例演示
Callable/Future 和 Thread 之類的線程構建最大的區別在於,能夠很方便的獲取線程執行完
以后的結果。首先來看一個簡單的例子 
package com.lf.threaddemo;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CallableDemo implements Callable<String> {
    @Override
    public String call() throws Exception {
        //Thread.sleep(3000);//阻塞案例演示
        return "hello world";
    }

    public static void main(String[] args) throws ExecutionException,
            InterruptedException {
        CallableDemo callableDemo = new CallableDemo();
        FutureTask futureTask = new FutureTask(callableDemo);
        new Thread(futureTask).start();
        System.out.println(futureTask.get());
    }
}
想一想我們為什么需要使用回調呢?那是因為結果值是由另一線程計算的,當前線程是不知
道結果值什么時候計算完成,所以它傳遞一個回調接口給計算線程,當計算完成時,調用這個回調接口,回傳結果值。
這個在很多地方有用到,比如 Dubbo 的異步調用,比如消息中間件的異步通信等等…
利用 FutureTask、Callable、Thread 對耗時任務(如查詢數據庫)做預處理,在需要計算結果之前就啟動計算。
所以我們來看一下 Future/Callable 是如何實現的
Callable/Future 原理分析
在剛剛實現的 demo 中,我們用到了兩個 api,分別是 Callable 和 FutureTask。
Callable 是一個函數式接口,里面就只有一個 call 方法。子類可以重寫這個方法,並且這個
方法會有一個返回值
@FunctionalInterface
public interface Callable<V> {
 /**
 * Computes a result, or throws an exception if unable to do so.
 *
 * @return computed result
 * @throws Exception if unable to compute a result
 */
 V call() throws Exception;
}
FutureTask
FutureTask 的類關系圖如下,它實現 RunnableFuture 接口,那么這個 RunnableFuture 接口的作用是什么呢。
在講解 FutureTask 之前,先看看 Callable, Future, FutureTask 它們之間的關系圖,如下:

 

 

public interface RunnableFuture<V> extends Runnable, Future<V> {
 /**
 * Sets this Future to the result of its computation
 * unless it has been cancelled.
 */
 void run();
}
RunnableFuture 是一個接口,它繼承了 Runnable 和 Future 這兩個接口,Runnable 太熟悉了,那么 Future 是什么呢?
Future 表示一個任務的生命周期,並提供了相應的方法來判斷是否已經完成或取消,以及獲取任務的結果和取消任務等。
public interface Future<V> {
 boolean cancel(boolean mayInterruptIfRunning);
 // 當前的 Future 是否被取消,返回 true 表示已取消
 boolean isCancelled();
 // 當前 Future 是否已結束。包括運行完成、拋出異常以及取消,都表示當前 Future 已結束
 boolean isDone();
 // 獲取 Future 的結果值。如果當前 Future 還沒有結束,那么當前線程就等待,
 // 直到 Future 運行結束,那么會喚醒等待結果值的線程的。
 V get() throws InterruptedException, ExecutionException;
// 獲取 Future 的結果值。與 get()相比較多了允許設置超時時間
 V get(long timeout, TimeUnit unit)
 throws InterruptedException, ExecutionException, TimeoutException;
}
分析到這里我們其實有一些初步的頭緒了,FutureTask 是 Runnable 和 Future 的結合,如果
我們把 Runnable 比作是生產者,Future 比作是消費者,那么 FutureTask 是被這兩者共享的,
生產者運行 run 方法計算結果,消費者通過 get 方法獲取結果。
作為生產者消費者模式,有一個很重要的機制,就是如果生產者數據還沒准備的時候,消費
者會被阻塞。當生產者數據准備好了以后會喚醒消費者繼續執行。
這個有點像我們上次可分析的阻塞隊列,那么在 FutureTask 里面是基於什么方式實現的呢?
state 的含義
表示 FutureTask 當前的狀態,分為七種狀態
private static final int NEW = 0; // NEW 新建狀態,表示這個 FutureTask
還沒有開始運行
// COMPLETING 完成狀態, 表示 FutureTask 任務已經計算完畢了
// 但是還有一些后續操作,例如喚醒等待線程操作,還沒有完成。
private static final int COMPLETING = 1;
// FutureTask 任務完結,正常完成,沒有發生異常
private static final int NORMAL = 2;
// FutureTask 任務完結,因為發生異常。
private static final int EXCEPTIONAL = 3;
// FutureTask 任務完結,因為取消任務
private static final int CANCELLED = 4;
// FutureTask 任務完結,也是取消任務,不過發起了中斷運行任務線程的中斷請求
private static final int INTERRUPTING = 5;
// FutureTask 任務完結,也是取消任務,已經完成了中斷運行任務線程的中斷請求
private static final int INTERRUPTED = 6;
run 方法
public void run() {
// 如果狀態 state 不是 NEW,或者設置 runner 值失敗
// 表示有別的線程在此之前調用 run 方法,並成功設置了 runner 值
// 保證了只有一個線程可以運行 try 代碼塊中的代碼。
 if (state != NEW ||
 !UNSAFE.compareAndSwapObject(this, runnerOffset,
 null, Thread.currentThread()))
 return;
 try {
 Callable<V> c = callable;
 if (c != null && state == NEW) {/ 只有 c 不為 null 且狀態 state 為 NEW 的情況
 V result;
 boolean ran;
 try {
 result = c.call(); //調用 callable 的 call 方法,並獲得返回結果
 ran = true;//運行成功
 } catch (Throwable ex) {
 result = null;
 ran = false;
 setException(ex); //設置異常結果,
 }
 if (ran)
 set(result);//設置結果
 }
 } finally {
 // runner must be non-null until state is settled to
 // prevent concurrent calls to run()
 runner = null;
 // state must be re-read after nulling runner to prevent
 // leaked interrupts
 int s = state;
 if (s >= INTERRUPTING)
 handlePossibleCancellationInterrupt(s);
 }
}
其實 run 方法作用非常簡單,就是調用 callable 的 call 方法返回結果值 result,根據是否發生
異常,調用 set(result)或 setException(ex)方法表示 FutureTask 任務完結。
不過因為 FutureTask 任務都是在多線程環境中使用,所以要注意並發沖突問題。注意在 run
方法中,我們沒有使用 synchronized 代碼塊或者 Lock 來解決並發問題,而是使用了 CAS 這
個樂觀鎖來實現並發安全,保證只有一個線程能運行 FutureTask 任務
 
get 方法
get 方法就是阻塞獲取線程執行結果,這里主要做了兩個事情
1. 判斷當前的狀態,如果狀態小於等於 COMPLETING,表示 FutureTask 任務還沒有完結,
所以調用 awaitDone 方法,讓當前線程等待。
2. report 返回結果值或者拋出異常
public V get() throws InterruptedException, ExecutionException {
 int s = state;
 if (s <= COMPLETING)
 s = awaitDone(false, 0L);
 return report(s);
}
awaitDone
如果當前的結果還沒有被執行完,把當前線程線程和插入到等待隊列
private int awaitDone(boolean timed, long nanos)
 throws InterruptedException {
 final long deadline = timed ? System.nanoTime() + 
nanos : 0L;
 WaitNode q = null;
 boolean queued = false; // 節點是否已添加
 for (;;) {
// 如果當前線程中斷標志位是 true,
 // 那么從列表中移除節點 q,並拋出 InterruptedException 異 常
 if (Thread.interrupted()) {
 removeWaiter(q);
 throw new InterruptedException();
 }
 int s = state;
 if (s > COMPLETING) { // 當狀態大於 COMPLETING 時,表示 FutureTask 任務已結束。
 if (q != null)
 q.thread = null; // 將節點 q 線程設置為 null,因為線程沒有阻塞等待
 return s;
 }// 表示還有一些后序操作沒有完成,那么當前線程讓出執行權
 else if (s == COMPLETING) // cannot time out yet
 Thread.yield();
//表示狀態是 NEW,那么就需要將當前線程阻塞等待。
 // 就是將它插入等待線程鏈表中,
 else if (q == null)
 q = new WaitNode();
 else if (!queued)
// 使用 CAS 函數將新節點添加到鏈表中,如果添加失敗,那么
queued 為 false// 下次循環時,會繼續添加,知道成功。
 queued = UNSAFE.compareAndSwapObject(this, 
waitersOffset,
 q.next = 
waiters, q);
 else if (timed) {// timed 為 true 表示需要設置超時
 nanos = deadline - System.nanoTime();
 if (nanos <= 0L) {
 removeWaiter(q);
 return state;
 }
 LockSupport.parkNanos(this, nanos); // 讓當前線程等待 nanos 時間
 }
 else
 LockSupport.park(this);
 } }

被阻塞的線程,會等到 run 方法執行結束之后被喚醒

 

report
report 方法就是根據傳入的狀態值 s,來決定是拋出異常,還是返回結果值。這個兩種情況都表示 FutureTask 完結了 
private V report(int s) throws ExecutionException {
 Object x = outcome;//表示 call 的返回值
 if (s == NORMAL) // 表示正常完結狀態,所以返回結果值
 return (V)x;
// 大於或等於 CANCELLED,都表示手動取消 FutureTask 任務,
// 所以拋出 CancellationException 異常
 if (s >= CANCELLED)
 throw new CancellationException();
// 否則就是運行過程中,發生了異常,這里就拋出這個異常
 throw new ExecutionException((Throwable)x);
}
線程池對於 Future/Callable 的執行
我們現在再來看線程池里面的 submit 方法,就會很清楚了。
public class CallableDemo implements Callable<String> {
 @Override
 public String call() throws Exception {
 //Thread.sleep(3000);//阻塞案例演示
 return "hello world";
 }
 public static void main(String[] args) throws ExecutionException, 
InterruptedException {
 ExecutorService es=Executors.newFixedThreadPool(1);
 CallableDemo callableDemo=new CallableDemo();
 Future future=es.submit(callableDemo);
 System.out.println(future.get());
 } }
AbstractExecutorService.submit
調用抽象類中的 submit 方法,這里其實相對於 execute 方法來說,只多做了一步操作,就是封裝了一個 RunnableFuture
public <T> Future<T> submit(Callable<T> task) {
 if (task == null) throw new NullPointerException();
 RunnableFuture<T> ftask = newTaskFor(task);
 execute(ftask);
 return ftask;
}
ThreadpoolExecutor.execute
然后調用 execute 方法,這里面的邏輯前面分析過了,會通過 worker 線程來調用過 ftask 的
run 方法。而這個 ftask 其實就是 FutureTask 里面最終實現的邏輯
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM