本文關鍵字:
線程
,線程池
,單線程
,多線程
,線程池的好處
,線程回收
,創建方式
,核心參數
,底層機制
,拒絕策略
,參數設置
,動態監控
,線程隔離
線程和線程池相關的知識,是Java學習或者面試中一定會遇到的知識點,本篇我們會從線程和進程,並行與並發,單線程和多線程等,一直講解到線程池,線程池的好處,創建方式,重要的核心參數,幾個重要的方法,底層實現,拒絕策略,參數設置,動態調整,線程隔離等等。主要的大綱如下:
線程池的好處
線程池,使用了池化思想來管理線程,池化技術就是為了最大化效益,最小化用戶風險,將資源統一放在一起管理的思想。這種思想在很多地方都有使用到,不僅僅是計算機,比如金融,企業管理,設備管理等。
為什么要線程池?如果在並發的場景,編碼人員根據需求來創建線程池,可能會有以下的問題:
- 我們很難確定系統有多少線程在運行,如果使用就創建,不使用就銷毀,那么創建和銷毀線程的消耗也是比較大的
- 假設來了很多請求,可能是爬蟲,瘋狂創建線程,可能把系統資源耗盡。
實現線程池有什么好處呢?
- 降低資源消耗:池化技術可以重復利用已經創建的線程,降低線程創建和銷毀的損耗。
- 提高響應速度:利用已經存在的線程進行處理,少去了創建線程的時間
- 管理線程可控:線程是稀缺資源,不能無限創建,線程池可以做到統一分配和監控
- 拓展其他功能:比如定時線程池,可以定時執行任務
其實池化技術,用在比較多地方,比如:
- 數據庫連接池:數據庫連接是稀缺資源,先創建好,提高響應速度,重復利用已有的連接
- 實例池:先創建好對象放到池子里面,循環利用,減少來回創建和銷毀的消耗
線程池相關的類
下面是與線程池相關的類的繼承關系:
Executor
Executor
是頂級接口,里面只有一個方法execute(Runnable command)
,定義的是調度線程池來執行任務,它定義了線程池的基本規范,執行任務是它的天職。
ExecutorService
ExecutorService
繼承了Executor
,但是它仍然是一個接口,它多了一些方法:
void shutdown()
:關閉線程池,會等待任務執行完。List<Runnable> shutdownNow()
:立刻關閉線程池,嘗試停止所有正在積極執行的任務,停止等待任務的處理,並返回一個正在等待執行的任務列表(還沒有執行的)。boolean isShutdown()
:判斷線程池是不是已經關閉,但是可能線程還在執行。boolean isTerminated()
:在執行shutdown/shutdownNow之后,所有的任務已經完成,這個狀態就是true。boolean awaitTermination(long timeout, TimeUnit unit)
:執行shutdown之后,阻塞等到terminated狀態,除非超時或者被打斷。<T> Future<T> submit(Callable<T> task)
: 提交一個有返回值的任務,並且返回該任務尚未有結果的Future,調用future.get()方法,可以返回任務完成的時候的結果。<T> Future<T> submit(Runnable task, T result)
:提交一個任務,傳入返回結果,這個result沒有什么作用,只是指定類型和一個返回的結果。Future<?> submit(Runnable task)
: 提交任務,返回Future<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
:批量執行tasks,獲取Future的list,可以批量提交任務。<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
:批量提交任務,並指定超時時間<T> T invokeAny(Collection<? extends Callable<T>> tasks)
: 阻塞,獲取第一個完成任務的結果值,<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
:阻塞,獲取第一個完成結果的值,指定超時時間
可能有同學對前面的<T> Future<T> submit(Runnable task, T result)
有疑問,這個reuslt有什么作用?
其實它沒有什么作用,只是持有它,任務完成后,還是調用 future.get()
返回這個結果,用result
new 了一個 ftask
,其內部其實是使用了Runnable的包裝類 RunnableAdapter
,沒有對result做特殊的處理,調用 call()
方法的時候,直接返回這個結果。(Executors 中具體的實現)
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
// 返回傳入的結果
return result;
}
}
還有一個方法值得一提:invokeAny()
: 在 ThreadPoolExecutor
中使用ExecutorService
中的方法 invokeAny()
取得第一個完成的任務的結果,當第一個任務執行完成后,會調用 interrupt()
方法將其他任務中斷。
注意,ExecutorService
是接口,里面都是定義,並沒有涉及實現,而前面的講解都是基於它的名字(規定的規范)以及它的普遍實現來說的。
可以看到 ExecutorService
定義的是線程池的一些操作,包括關閉,判斷是否關閉,是否停止,提交任務,批量提交任務等等。
AbstractExecutorService
AbstractExecutorService
是一個抽象類,實現了 ExecutorService
接口,這是大部分線程池的基本實現,定時的線程池先不關注,主要的方法如下:
不僅實現了submit
,invokeAll
,invokeAny
等方法,而且提供了一個 newTaskFor
方法用於構建 RunnableFuture
對象,那些能夠獲取到任務返回結果的對象都是通過 newTaskFor
來獲取的。不展開里面所有的源碼的介紹,僅以submit()方法為例:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 封裝任務
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 執行任務
execute(ftask);
// 返回 RunnableFuture 對象
return ftask;
}
但是在 AbstractExecutorService
是沒有對最最重要的方法進行實現的,也就是 execute()
方法。線程池具體是怎么執行的,這個不同的線程池可以有不同的實現,一般都是繼承 AbstractExecutorService
(定時任務有其他的接口),我們最最常用的就是ThreadPoolExecutor
。
ThreadPoolExecutor
重點來了!!! ThreadPoolExecutor
一般就是我們平時常用到的線程池類,所謂創建線程池,如果不是定時線程池,就是使用它。
先看ThreadPoolExecutor
的內部結構(屬性):
public class ThreadPoolExecutor extends AbstractExecutorService {
// 狀態控制,主要用來控制線程池的狀態,是核心的遍歷,使用的是原子類
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 用來表示線程數量的位數(使用的是位運算,一部分表示線程的數量,一部分表示線程池的狀態)
// SIZE = 32 表示32位,那么COUNT_BITS就是29位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 線程池的容量,也就是27位表示的最大值
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 狀態量,存儲在高位,32位中的前3位
// 111(第一位是符號位,1表示負數),線程池運行中
private static final int RUNNING = -1 << COUNT_BITS;
// 000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001
private static final int STOP = 1 << COUNT_BITS;
// 010
private static final int TIDYING = 2 << COUNT_BITS;
// 011
private static final int TERMINATED = 3 << COUNT_BITS;
// 取出運行狀態
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 取出線程數量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 用運行狀態和線程數獲取ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 任務等待隊列
private final BlockingQueue<Runnable> workQueue;
// 可重入主鎖(保證一些操作的線程安全)
private final ReentrantLock mainLock = new ReentrantLock();
// 線程的集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll(),
// 傳統線程的通信方式,Condition都可以實現,Condition和傳統的線程通信沒什么區別,Condition的強大之處在於它可以為多個線程間建立不同的Condition
private final Condition termination = mainLock.newCondition();
// 最大線程池大小
private int largestPoolSize;
// 完成的任務數量
private long completedTaskCount;
// 線程工廠
private volatile ThreadFactory threadFactory;
// 任務拒絕處理器
private volatile RejectedExecutionHandler handler;
// 非核心線程的存活時間
private volatile long keepAliveTime;
// 允許核心線程的超時時間
private volatile boolean allowCoreThreadTimeOut;
// 核心線程數
private volatile int corePoolSize;
// 工作線程最大容量
private volatile int maximumPoolSize;
// 默認的拒絕處理器(丟棄任務)
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
// 運行時關閉許可
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
// 上下文
private final AccessControlContext acc;
// 只有一個線程
private static final boolean ONLY_ONE = true;
}
線程池狀態
從上面的代碼可以看出,用一個32位的對象保存線程池的狀態以及線程池的容量,高3位是線程池的狀態,而剩下的29位,則是保存線程的數量:
// 狀態量,存儲在高位,32位中的前3位
// 111(第一位是符號位,1表示負數),線程池運行中
private static final int RUNNING = -1 << COUNT_BITS;
// 000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001
private static final int STOP = 1 << COUNT_BITS;
// 010
private static final int TIDYING = 2 << COUNT_BITS;
// 011
private static final int TERMINATED = 3 << COUNT_BITS;
各種狀態之間是不一樣的,他們的狀態之間變化如下:
- RUNNING:運行狀態,可以接受任務,也可以處理任務
- SHUTDOWN:不可以接受任務,但是可以處理任務
- STOP:不可以接受任務,也不可以處理任務,中斷當前任務
- TIDYING:所有線程停止
- TERMINATED:線程池的最后狀態
Worker 實現
線程池,肯定得有池子,並且是放線程的地方,在 ThreadPoolExecutor
中表現為 Worker
,這是內部類:
線程池其實就是 Worker
(打工人,不斷的領取任務,完成任務)的集合,這里使用的是 HashSet
:
private final HashSet<Worker> workers = new HashSet<Worker>();
Worker
怎么實現的呢?
Worker
除了繼承了 AbstractQueuedSynchronizer
,也就是 AQS
, AQS
本質上就是個隊列鎖,一個簡單的互斥鎖,一般是在中斷或者修改 worker
狀態的時候使用。
內部引入AQS
,是為了線程安全,線程執行任務的時候,調用的是runWorker(Worker w)
,這個方法不是worker的方法,而是 ThreadPoolExecutor
的方法。從下面的代碼可以看出,每次修改Worke
r的狀態的時候,都是線程安全的。Worker
里面,持有了一個線程Thread
,可以理解為是對線程的封裝。
至於runWorker(Worker w)
是怎么運行的?先保持這個疑問,后面詳細講解。
// 實現 Runnable,封裝了線程
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 序列化id
private static final long serialVersionUID = 6138294804551838833L;
// worker運行的線程
final Thread thread;
// 初始化任務,有可能是空的,如果任務不為空的時候,其他進來的任務,可以直接運行,不在添加到任務隊列
Runnable firstTask;
// 線程任務計數器
volatile long completedTasks;
// 指定一個任務讓工人忙碌起來,這個任務可能是空的
Worker(Runnable firstTask) {
// 初始化AQS隊列鎖的狀態
setState(-1); // 禁止中斷直到 runWorker
this.firstTask = firstTask;
// 從線程工廠,取出一個線程初始化
this.thread = getThreadFactory().newThread(this);
}
// 實際上運行調用的是runWorker
public void run() {
// 不斷循環獲取任務進行執行
runWorker(this);
}
// 0表示沒有被鎖
// 1表示被鎖的狀態
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 獨占,嘗試獲取鎖,如果成功返回true,失敗返回false
protected boolean tryAcquire(int unused) {
// CAS 樂觀鎖
if (compareAndSetState(0, 1)) {
// 成功,當前線程獨占鎖
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 獨占方式,嘗試釋放鎖
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 上鎖,調用的是AQS的方法
public void lock() { acquire(1); }
// 嘗試上鎖
public boolean tryLock() { return tryAcquire(1); }
// 解鎖
public void unlock() { release(1); }
// 是否鎖住
public boolean isLocked() { return isHeldExclusively(); }
// 如果開始可就中斷
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
任務隊列
除了放線程池的地方,要是任務很多,沒有那么多線程,肯定需要一個地方放任務,充當緩沖作用,也就是任務隊列,在代碼中表現為:
private final BlockingQueue<Runnable> workQueue;
拒絕策略和處理器
計算機的內存總是有限的,我們不可能一直往隊列里面增加內容,所以線程池為我們提供了選擇,可以選擇多種隊列。同時當任務實在太多,占滿了線程,並且把任務隊列也占滿的時候,我們需要做出一定的反應,那就是拒絕還是拋出錯誤,丟掉任務?丟掉哪些任務,這些都是可能需要定制的內容。
如何創建線程池
關於如何創建線程池,其實 ThreadPoolExecutor
提供了構造方法,主要參數如下,不傳的話會使用默認的:
- 核心線程數:核心線程數,一般是指常駐的線程,沒有任務的時候通常也不會銷毀
- 最大線程數:線程池允許創建的最大的線程數量
- 非核心線程的存活時間:指的是沒有任務的時候,非核心線程能夠存活多久
- 時間的單位:存活時間的單位
- 存放任務的隊列:用來存放任務
- 線程工廠
- 拒絕處理器:如果添加任務失敗,將由該處理器處理
// 指定核心線程數,最大線程數,非核心線程沒有任務的存活時間,時間單位,任務隊列
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// 指定核心線程數,最大線程數,非核心線程沒有任務的存活時間,時間單位,任務隊列,線程池工廠
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
// 指定核心線程數,最大線程數,非核心線程沒有任務的存活時間,時間單位,任務隊列,拒絕任務處理器
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
// 最后其實都是調用了這個方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
其實,除了顯示的指定上面的參數之外,JDK也封裝了一些直接創建線程池的方法給我們,那就是Executors
:
// 固定線程數量的線程池,無界的隊列
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// 單個線程的線程池,無界的隊列,按照任務提交的順序,串行執行
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
// 動態調節,沒有核心線程,全部都是普通線程,每個線程存活60s,使用容量為1的阻塞隊列
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// 定時任務線程池
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
但是一般是不推薦使用上面別人封裝的線程池的哈!!!
線程池的底層參數以及核心方法
看完上面的創建參數大家可能會有點懵,但是沒關系,一一為大家道來:
可以看出,當有任務進來的時候,先判斷核心線程池是不是已經滿了,如果還沒有,將會繼續創建線程。注意,如果一個任務進來,創建線程執行,執行完成,線程空閑下來,這時候再來一個任務,是會繼續使用之前的線程,還是重新創建一個線程來執行呢?
答案是重新創建線程,這樣線程池可以快速達到核心線程數的規模大小,以便快速響應后面的任務。
如果線程數量已經到達核心線程數,來了任務,線程池的線程又都不是空閑狀態,那么就會判斷隊列是不是滿的,倘若隊列還有空間,那么就會把任務放進去隊列中,等待線程領取執行。
如果任務隊列已經滿了,放不下任務,那么就會判斷線程數是不是已經到最大線程數了,要是還沒有到達,就會繼續創建線程並執行任務,這個時候創建的是非核心部分線程。
如果已經到達最大線程數,那么就不能繼續創建線程了,只能執行拒絕策略,默認的拒絕策略是丟棄任務,我們可以自定義拒絕策略。
值得注意的是,倘若之前任務比較多,創建出了一些非核心線程,那么任務少了之后,領取不到任務,過了一定時間,非核心線程就會銷毀,只剩下核心線程池的數量的線程。這個時間就是前面說的keepAliveTime
。
提交任務
提交任務,我們看execute()
,會先獲取線程池的狀態和個數,要是線程個數還沒達到核心線程數,會直接添加線程,否則會放到任務隊列,如果任務隊列放不下,會繼續增加線程,但是不是增加核心線程。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 獲取狀態和個數
int c = ctl.get();
// 如果個數小於核心線程數
if (workerCountOf(c) < corePoolSize) {
// 直接添加
if (addWorker(command, true))
return;
// 添加失敗則繼續獲取
c = ctl.get();
}
// 判斷線程池狀態是不是運行中,任務放到隊列中
if (isRunning(c) && workQueue.offer(command)) {
// 再次檢查
int recheck = ctl.get();
// 判斷線程池是不是還在運行
if (! isRunning(recheck) && remove(command))
// 如果不是,那么就拒絕並移除任務
reject(command);
else if (workerCountOf(recheck) == 0)
// 如果線程數為0,並且還在運行,那么就直接添加
addWorker(null, false);
}else if (!addWorker(command, false))
// 添加任務隊列失敗,拒絕
reject(command);
}
上面的源碼中,調用了一個重要的方法:addWorker(Runnable firstTask, boolean core)
,該方法主要是為了增加工作的線程,我們來看看它是如何執行的:
private boolean addWorker(Runnable firstTask, boolean core) {
// 回到當前位置重試
retry:
for (;;) {
// 獲取狀態
int c = ctl.get();
int rs = runStateOf(c);
// 大於SHUTDOWN說明線程池已經停止
// ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) 表示三個條件至少有一個不滿足
// 不等於SHUTDOWN說明是大於shutdown
// firstTask != null 任務不是空的
// workQueue.isEmpty() 隊列是空的
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 工作線程數
int wc = workerCountOf(c);
// 是否符合容量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 添加成功,跳出循環
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// cas失敗,重新嘗試
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 前面線程計數增加成功
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 創建了一個worker,包裝了任務
w = new Worker(firstTask);
final Thread t = w.thread;
// 線程創建成功
if (t != null) {
// 獲取鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 再次確認狀態
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果線程已經啟動,失敗
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 新增線程到集合
workers.add(w);
// 獲取大小
int s = workers.size();
// 判斷最大線程池數量
if (s > largestPoolSize)
largestPoolSize = s;
// 已經添加工作線程
workerAdded = true;
}
} finally {
// 解鎖
mainLock.unlock();
}
// 如果已經添加
if (workerAdded) {
// 啟動線程
t.start();
workerStarted = true;
}
}
} finally {
// 如果沒有啟動
if (! workerStarted)
// 失敗處理
addWorkerFailed(w);
}
return workerStarted;
}
處理任務
前面在介紹Worker
這個類的時候,我們講解到其實它的run()
方法調用的是外部的runWorker()
方法,那么我們來看看runWorkder()
方法:
首先,它會直接處理自己的firstTask,這個任務並沒有在任務隊列里面,而是它自己持有的:
final void runWorker(Worker w) {
// 當前線程
Thread wt = Thread.currentThread();
// 第一個任務
Runnable task = w.firstTask;
// 重置為null
w.firstTask = null;
// 允許打斷
w.unlock();
boolean completedAbruptly = true;
try {
// 任務不為空,或者獲取的任務不為空
while (task != null || (task = getTask()) != null) {
// 加鎖
w.lock();
//如果線程池停止,確保線程被中斷;
//如果不是,確保線程沒有被中斷。這
//在第二種情況下需要復查處理
// shutdown - now競賽同時清除中斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 執行之前回調方法(可以由我們自己實現)
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 執行之后回調方法
afterExecute(task, thrown);
}
} finally {
// 置為null
task = null;
// 更新完成任務
w.completedTasks++;
w.unlock();
}
}
// 完成
completedAbruptly = false;
} finally {
// 處理線程退出相關工作
processWorkerExit(w, completedAbruptly);
}
}
上面可以看到如果當前的任務是null,會去獲取一個task,我們看看getTask()
,里面涉及到了兩個參數,一個是是不是允許核心線程銷毀,另外一個是線程數是不是大於核心線程數,如果滿足條件,就從隊列中取出任務,如果超時取不到,那就返回空,表示沒有取到任務,沒有取到任務,就不會執行前面的循環,就會觸發線程銷毀processWorkerExit()
等工作。
private Runnable getTask() {
// 是否超時
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// SHUTDOWN狀態繼續處理隊列中的任務,但是不接收新的任務
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 線程數
int wc = workerCountOf(c);
// 是否允許核心線程超時或者線程數大於核心線程數
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 減少線程成功,就返回null,后面由processWorkerExit()處理
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果允許核心線程關閉,或者超過了核心線程,就可以在超時的時間內獲取任務,或者直接取出任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 如果能取到任務,那就肯定可以執行
if (r != null)
return r;
// 否則就獲取不到任務,超時了
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
銷毀線程
前面提到,如果線程當前任務為空,又允許核心線程銷毀,或者線程超過了核心線程數,等待了一定時間,超時了卻沒有從任務隊列獲取到任務的話,就會跳出循環執行到后面的線程銷毀(結束)程序。那銷毀線程的時候怎么做呢?
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果是突然結束的線程,那么之前的線程數是沒有調整的,這里需要調整
if (completedAbruptly)
decrementWorkerCount();
// 獲取鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 完成的任務數
completedTaskCount += w.completedTasks;
// 移除線程
workers.remove(w);
} finally {
// 解鎖
mainLock.unlock();
}
// 試圖停止
tryTerminate();
// 獲取狀態
int c = ctl.get();
// 比stop小,至少是shutdown
if (runStateLessThan(c, STOP)) {
// 如果不是突然完成
if (!completedAbruptly) {
// 最小值要么是0,要么是核心線程數,要是允許核心線程超時銷毀,那么就是0
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果最小的是0或者隊列不是空的,那么保留一個線程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 只要大於等於最小的線程數,就結束當前線程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 否則的話,可能還需要新增工作線程
addWorker(null, false);
}
}
如何停止線程池
停止線程池可以使用shutdown()
或者shutdownNow()
,shutdown()
可以繼續處理隊列中的任務,而shutdownNow()
會立即清理任務,並返回未執行的任務。
public void shutdown() {
// 獲取鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 檢查停止權限
checkShutdownAccess();
// 更新狀態
advanceRunState(SHUTDOWN);
// 中斷所有線程
interruptIdleWorkers();
// 回調鈎子
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
// 立刻停止
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
// 獲取鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 檢查停止權限
checkShutdownAccess();
// 更新狀態到stop
advanceRunState(STOP);
// 中斷所有線程
interruptWorkers();
// 清理隊列
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
// 返回任務列表(未完成)
return tasks;
}
execute()和submit()方法
execute()
方法可以提交不需要返回值的任務,無法判斷任務是否被線程池執行是否成功submit()
方法用於提交需要返回值的任務。線程池會返回一個future類型的對象,通過這個對象,我們調用get()
方法就可以阻塞,直到獲取到線程執行完成的結果,同時我們也可以使用有超時時間的等待方法get(long timeout,TimeUnit unit)
,這樣不管線程有沒有執行完成,如果到時間,也不會阻塞,直接返回null。返回的是RunnableFuture
對象,繼承了Runnable, Future<V>
兩個接口:
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
線程池為什么使用阻塞隊列?
阻塞隊列,首先是一個隊列,肯定具有先進先出的屬性。
而阻塞,則是這個模型的演化,一般隊列,可以用在生產消費者模型,也就是數據共享,有人往里面放任務,有人不斷的往里面取出任務,這是一個理想的狀態。
但是倘若不理想,產生任務和消費任務的速度不一樣,要是任務放在隊列里面比較多,消費比較慢,還可以慢慢消費,或者生產者得暫停一下產生任務(阻塞生產者線程)。可以使用 offer(E o, long timeout, TimeUnit unit)
設定等待的時間,如果在指定的時間內,還不能往隊列中加入BlockingQueue
,則返回失敗,也可以使用put(Object)
,將對象放到阻塞隊列里面,如果沒有空間,那么這個方法會阻塞到有空間才會放進去。
如果消費速度快,生產者來不及生產,獲取任務的時候,可以使用poll(time)
,有數據則直接取出來,沒數據則可以等待time
時間后,返回null
。也可以使用take()
取出第一個任務,沒有任務就會一直阻塞到隊列有任務為止。
上面說了阻塞隊列的屬性,那么為啥要用呢?
- 如果產生任務,來了就往隊列里面放,資源很容易被耗盡。
- 創建線程需要獲取鎖,這個一個線程池的全局鎖,如果各個線程不斷的獲取鎖,解鎖,線程上下文切換之類的開銷也比較大,不如在隊列為空的時候,然一個線程阻塞等待。
常見的阻塞隊列
- ArrayBlockingQueue:基於數組實現,內部有一個定長的數組,同時保存着隊列頭和尾部的位置。
- LinkedBlockingQueue:基於鏈表的阻塞對壘,生產者和消費者使用獨立的鎖,並行能力強,如果不指定容量,默認是無效容量,容易系統內存耗盡。
- DelayQueue:延遲隊列,沒有大小限制,生產數據不會被阻塞,消費數據會,只有指定的延遲時間到了,才能從隊列中獲取到該元素。
- PriorityBlockingQueue:基於優先級的阻塞隊列,按照優先級進行消費,內部控制同步的是公平鎖。
- SynchronousQueue:沒有緩沖,生產者直接把任務交給消費者,少了中間的緩存區。
線程池如何復用線程的?執行完成的線程怎么處理
前面的源碼分析,其實已經講解過這個問題了,線程池的線程調用的run()
方法,其實調用的是runWorker()
,里面是死循環,除非獲取不到任務,如果沒有了任務firstTask並且從任務隊列中獲取不到任務,超時的時候,會再判斷是不是可以銷毀核心線程,或者超過了核心線程數,滿足條件的時候,才會讓當前的線程結束。
否則,一直都在一個循環中,不會結束。
我們知道start()
方法只能調用一次,因此調用到run()
方法的時候,調用外面的runWorker()
,讓其在runWorker()
的時候,不斷的循環,獲取任務。獲取到任務,調用任務的run()
方法。
執行完成的線程會調用processWorkerExit()
,前面有分析,里面會獲取鎖,把線程數減少,從工作線程從集合中移除,移除掉之后,會判斷線程是不是太少了,如果是,會再加回來,個人以為是一種補救。
如何配置線程池參數?
一般而言,有個公式,如果是計算(CPU)密集型的任務,那么核心線程數設置為處理器核數-1
,如果是io密集型(很多網絡請求),那么就可以設置為2*處理器核數
。但是這並不是一個銀彈,一切要從實際出發,最好就是在測試環境進行壓測,實踐出真知,並且很多時候一台機器不止一個線程池或者還會有其他的線程,因此參數不可設置得太過飽滿。
一般 8 核的機器,設置 10-12 個核心線程就差不多了,這一切必須按照業務具體值進行計算。設置過多的線程數,上下文切換,競爭激烈,設置過少,沒有辦法充分利用計算機的資源。
計算(CPU)密集型消耗的主要是 CPU 資源,可以將線程數設置為 N(CPU 核心數)+1,比 CPU 核心數多出來的一個線程是為了防止線程偶發的缺頁中斷,或者其它原因導致的任務暫停而帶來的影響。一旦任務暫停,CPU 就會處於空閑狀態,而在這種情況下多出來的一個線程就可以充分利用 CPU 的空閑時間。
io密集型系統會用大部分的時間來處理 I/O 交互,而線程在處理 I/O 的時間段內不會占用 CPU 來處理,這時就可以將 CPU 交出給其它線程使用。因此在 I/O 密集型任務的應用中,我們可以多配置一些線程,具體的計算方法是 2N。
為什么不推薦默認的線程池創建方式?
阿里的編程規范里面,不建議使用默認的方式來創建線程,是因為這樣創建出來的線程很多時候參數都是默認的,可能創建者不太了解,很容易出問題,最好通過new ThreadPoolExecutor()
來創建,方便控制參數。默認的方式創建的問題如下:
- Executors.newFixedThreadPool():無界隊列,內存可能被打爆
- Executors.newSingleThreadExecutor():單個線程,效率低,串行。
- Executors.newCachedThreadPool():沒有核心線程,最大線程數可能為無限大,內存可能還會爆掉。
使用具體的參數創建線程池,開發者必須了解每個參數的作用,不會胡亂設置參數,減少內存溢出等問題。
一般體現在幾個問題:
- 任務隊列怎么設置?
- 核心線程多少個?
- 最大線程數多少?
- 怎么拒絕任務?
- 創建線程的時候沒有名稱,追溯問題不好找。
線程池的拒絕策略
線程池一般有以下四種拒絕策略,其實我們可以從它的內部類看出來:
- AbortPolicy: 不執行新的任務,直接拋出異常,提示線程池已滿
- DisCardPolicy:不執行新的任務,但是也不會拋出異常,默默的
- DisCardOldSetPolicy:丟棄消息隊列中最老的任務,變成新進來的任務
- CallerRunsPolicy:直接調用當前的execute來執行任務
一般而言,上面的拒絕策略都不會特別理想,一般要是任務滿了,首先需要做的就是看任務是不是必要的,如果非必要,非核心,可以考慮拒絕掉,並報錯提醒,如果是必須的,必須把它保存起來,不管是使用mq消息,還是其他手段,不能丟任務。在這些過程中,日志是非常必要的。既要保護線程池,也要對業務負責。
線程池監控與動態調整
線程池提供了一些API,可以動態獲取線程池的狀態,並且還可以設置線程池的參數,以及狀態:
查看線程池的狀態:
修改線程池的狀態:
關於這一點,美團的線程池文章講得很清楚,甚至做了一個實時調整線程池參數的平台,可以進行跟蹤監控,線程池活躍度、任務的執行Transaction(頻率、耗時)、Reject異常、線程池內部統計信息等等。這里我就不展開了,原文:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html ,這是我們可以參考的思路。
線程池隔離
線程隔離,很多同學可能知道,就是不同的任務放在不同的線程里面運行,而線程池隔離,一般是按照業務類型來隔離,比如訂單的處理線程放在一個線程池,會員相關的處理放在一個線程池。
也可以通過核心和非核心來隔離,核心處理流程放在一起,非核心放在一起,兩個使用不一樣的參數,不一樣的拒絕策略,盡量保證多個線程池之間不影響,並且最大可能保住核心線程的運行,非核心線程可以忍受失敗。
Hystrix
里面運用到這個技術,Hystrix
的線程隔離技術,來防止不同的網絡請求之間的雪崩,即使依賴的一個服務的線程池滿了,也不會影響到應用程序的其他部分。
關於作者
秦懷,公眾號【秦懷雜貨店】作者,技術之路不在一時,山高水長,縱使緩慢,馳而不息。個人寫作方向:Java源碼解析,JDBC,Mybatis,Spring,redis,分布式,劍指Offer,LeetCode等,認真寫好每一篇文章,不喜歡標題黨,不喜歡花里胡哨,大多寫系列文章,不能保證我寫的都完全正確,但是我保證所寫的均經過實踐或者查找資料。遺漏或者錯誤之處,還望指正。