前言
在了解線程池之前,其實首先出現的疑問是:為什么要使用線程池,其次是了解什么是線程池,最后是如何使用線程池,帶着疑問去學習。
為什么要使用
前面多線程文章中,需要使用線程就開啟一個新線程,簡單方便,但是這樣在大量線程被開啟時:如果並發的線程數量很多,並且每個線程都是執行一個時間很短的任務就結束了,這樣頻繁創建線程就會大大降低系統的效率,因為頻繁創建線程和銷毀線程需要時間。
那么我們可不可以開啟適量的線程,執行完任務不被銷毀,繼續執行新的任務呢?
Java中,為我們提供了線程池來實現這個目標,所以先來了解,什么是線程池,線程池的實現原理是什么?
線程池
線程池為線程生命周期開銷問題和資源不足問題提供了解決方案。通過對多個任務重用線程,線程創建的開銷被分攤到了多個任務上。其好處是,因為在請求到達時線程已經存在,所以無意中也消除了線程創建所帶來的延遲。這樣,就可以立即為請求服務,使應用程序響應更快。而且,通過適當地調整線程池中的線程數目,也就是當請求的數目超過某個閾值時,就強制其它任何新到的請求一直等待,直到獲得一個線程來處理為止,從而可以防止資源不足
一. Java中的ThreadPoolExecutor類
了解線程池,從線程池最核心的類ThreadPoolExecutor類開始了解,盡管一萬個不願意,還是需要看源碼
在ThreadPoolExecutor類中提供了四個構造方法:
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
......
}
...
}
可以看出,前三個構造函數最終調用的是第四個構造函數進行初始化操作。
逐個解釋構造器中各個參數的含義:
-
corePoolSize
:核心池的大小,這個參數跟后面講述的線程池的實現原理有非常大的關系。在創建了線程池后,默認情況下,線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務。除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。默認情況下,在創建了線程池后,線程池中的線程數為0,當有任務來之后,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中; -
maximumPoolSize
:線程池最大線程數,這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程; -
keepAliveTime
:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0; -
unit
:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:- TimeUnit.DAYS; //天
- TimeUnit.HOURS; //小時
- TimeUnit.MINUTES; //分鍾
- TimeUnit.SECONDS; //秒
- TimeUnit.MILLISECONDS; //毫秒
- TimeUnit.MICROSECONDS; //微妙
- TimeUnit.NANOSECONDS; //納秒
-
workQueue
:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇- ArrayBlockingQueue:基於數組的先進先出隊列,此隊列創建時必須指定大小;
- LinkedBlockingQueue:基於鏈表的先進先出隊列,如果創建時沒有指定此隊列大小,則默認為Integer.MAX_VALUE;
- synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是直接新建一個線程來執行新來的任務。
- DelayQueue:DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue是一個沒有大小限制的隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。
- PriorityBlockingQueue:基於優先級的阻塞隊列(優先級的判斷通過構造函數傳入的Compator對象來決定),但需要注意的是PriorityBlockingQueue並不會阻塞數據生產者,而只會在沒有可消費的數據時,阻塞數據的消費者。因此使用的時候要特別注意,生產者生產數據的速度絕對不能快於消費者消費數據的速度,否則時間一長,會最終耗盡所有的可用堆內存空間。
-
threadFactory
:線程工廠,主要用來創建線程;一般不用自己實現,使用Executors.defaultThreadFactory() -
handler
:表示當拒絕處理任務時的策略,有以下四種取值- ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
- ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)
- ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
根據上面ThreadPoolExecutor類的代碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService,簡單看一下AbstractExecutorService的實現:
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {}
public Future<?> submit(Runnable task) {}
public <T> Future<T> submit(Runnable task, T result) {}
public <T> Future<T> submit(Callable<T> task) {}
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException {}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException {}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException {}
private static <T> void cancelAll(ArrayList<Future<T>> futures) {}
private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) {}
}
AbstractExecutorService是一個抽象類,它實現了ExecutorService接口
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
而ExecutorService又是繼承了Executor接口,我們看一下Executor接口的實現:
public interface Executor {
void execute(Runnable command);
}
通過上面的繼承與實現,能很明了的掌握ThreadPoolExecutor與AbstractExecutorService、ExecutorService、Executor之間的關系:
- Executor是一個頂層接口,在它里面只聲明了一個方法execute(Runnable),返回值為void,參數為Runnable類型,從字面意思可以理解,就是用來執行傳進去的任務的;
- ExecutorService接口繼承了Executor接口,並聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等
- 抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的所有方法
- 最后ThreadPoolExecutor繼承了類AbstractExecutorService
在ThreadPoolExecutor類中有幾個非常重要的方法:
execute()
submit()
shutdown()
shutdownNow()
execute()
方法實際上是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務,交由線程池去執行。
submit()
方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並沒有對其進行重寫,這個方法也是用來向線程池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果,去看submit()方法的實現,會發現它實際上還是調用的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容將在下一篇講述)。
shutdown()
和shutdownNow()
是用來關閉線程池的。
二.深入剖析線程池實現原理
第一節,簡單了解ThreadPoolExecutor,接下來深入分析線程池的具體實現原理,將從下面幾個方面講解:
1.線程池狀態
2.任務的執行
3.線程池中的線程初始化
4.任務緩存隊列及排隊策略
5.任務拒絕策略
6.線程池的關閉
7.線程池容量的動態調整
1.線程池狀態
在ThreadPoolExecutor中定義了幾個static final變量表示線程池的各個狀態:
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
-
RUNNING
(1) 狀態說明:線程池處在RUNNING狀態時,能夠接收新任務,以及對已添加的任務進行處理。
(2) 狀態切換:線程池的初始化狀態是RUNNING。換句話說,線程池被一旦被創建,就處於RUNNING狀態,並且線程池中的任務數為0! -
SHUTDOWN
(1) 狀態說明:線程池處在SHUTDOWN狀態時,不接收新任務,但能處理已添加的任務。
(2) 狀態切換:調用線程池的shutdown()接口時,線程池由RUNNING -> SHUTDOWN。 -
STOP
(1) 狀態說明:線程池處在STOP狀態時,不接收新任務,不處理已添加的任務,並且會中斷正在處理的任務。
(2) 狀態切換:調用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP。 -
TIDYING
(1) 狀態說明:當所有的任務已終止,ctl記錄的”任務數量”為0,線程池會變為TIDYING狀態。當線程池變為TIDYING狀態時,會執行鈎子函數terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變為TIDYING時,進行相應的處理;可以通過重載terminated()函數來實現。
(2) 狀態切換:當線程池在SHUTDOWN狀態下,阻塞隊列為空並且線程池中執行的任務也為空時,就會由 SHUTDOWN -> TIDYING。
當線程池在STOP狀態下,線程池中執行的任務為空時,就會由STOP -> TIDYING。 -
TERMINATED
(1) 狀態說明:線程池徹底終止,就變成TERMINATED狀態。
(2) 狀態切換:線程池處在TIDYING狀態時,執行完terminated()之后,就會由 TIDYING -> TERMINATED。
2.任務的執行
在了解將任務提交給線程池到任務執行完畢整個過程之前,我們先來看一下ThreadPoolExecutor類中其他的一些比較重要成員變量:
private final BlockingQueue<Runnable> workQueue; //任務緩存隊列,用來存放等待執行的任務
private final ReentrantLock mainLock = new ReentrantLock(); //線程池的主要狀態鎖,對線程池狀態(比如線程池大小
//、runState等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工作集
private volatile long keepAliveTime; //線程存貨時間
private volatile boolean allowCoreThreadTimeOut; //是否允許為核心線程設置存活時間
private volatile int corePoolSize; //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列)
private volatile int maximumPoolSize; //線程池最大能容忍的線程數
private volatile int poolSize; //線程池中當前的線程數
private volatile RejectedExecutionHandler handler; //拒絕策略的處理句柄。所謂拒絕策略,是指將任務添加到線程池中時,線程池拒絕該任務所采用的相應策略。
private volatile ThreadFactory threadFactory; //線程工廠,用來創建線程
private int largestPoolSize; //用來記錄線程池中曾經出現過的最大線程數
private long completedTaskCount; //用來記錄已經執行完畢的任務個數
每個變量的作用添加了解釋,這里要對corePoolSize、maximumPoolSize、largestPoolSize三個變量重點解釋
corePoolSize一般翻譯為核心池的大小,也可以理解就是線程池的大小
,舉個例子來說明:
假如有一個工廠,工廠里面有10個工人,每個工人同時只能做一件任務。
因此只要當10個工人中有工人是空閑的,來了任務就分配給空閑的工人做;
當10個工人都有任務在做時,如果還來了任務,就把任務進行排隊等待;
如果說新任務數目增長的速度遠遠大於工人做任務的速度,那么此時工廠主管可能會想補救措施,比如重新招4個臨時工人進來;
然后就將任務也分配給這4個臨時工人做;
如果說着14個工人做任務的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務或者拋棄前面的一些任務了。
當這14個工人當中有人空閑時,而新任務增長的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。
這里corePoolSize就是10,而maximumPoolSize就是14(10+4),也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量突然過大時的一種補救措施。
largestPoolSize只是一個用來起記錄作用的變量,用來記錄線程池中曾經有過的最大線程數目,跟線程池的容量沒有任何關系
ThreadPoolExecutor類中最核心的方法是任務提交execute(),雖然通過submit也可以提交任務,但是實際上submit方法里面最終調用的還是execute()方法,所以我們只需要研究execute()方法的實現原理即可
execute()
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
下面一行行對代碼進行解釋:
首先判斷任務command是否為null,若是null,則拋出空指針異常;
接着獲取ctl對應的int值。該int值保存了"線程池狀態"和"線程池中任務的數量"信息
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0))
int c = ctl.get();
接下來判斷 線程池中線程數量是否 < "核心線程池數量",也就是線程池中少於corePoolSize個任務
if (workerCountOf(c) < corePoolSize)
狀態1:小於
嘗試通過addWorker(command, true)
將創建新線程將任務添加到該線程,然后添加到工作集workers
。如果添加成功則啟動線程開始執行任務。
- 1.如果任務添加成功,則整個方法結束。
- 2.如果失敗,重新獲取ctl的int值,然后繼續進入狀態2
狀態2:大於
if (isRunning(c) && workQueue.offer(command))
如果線程池處於RUNNING
狀態,嘗試將任務添加到任務緩存隊列
如果添加成功,進入狀態3,否則狀態4
狀態3:再次確認“線程池狀態”,若線程池異常終止了,則刪除任務;然后通過reject()執行相應的拒絕策略的內容。
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
否則,如果"線程池中任務數量"為0,則通過addWorker(null, false)嘗試新建一個線程,新建線程對應的任務為null。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
狀態4:
最后再嘗試通過addWorker(command, false)新建一個線程,並將任務(command)添加到該線程中;然后,啟動該線程從而執行任務。如果還是失敗,則通過reject(command)執行相應的拒絕策略的內容。
else if (!addWorker(command, false))
reject(command);
梳理
至此,提交新任務就執行完畢,再來梳理一遍
-
1.如果 當前線程池線程數量 < corePoolSize,則嘗試開啟新線程添加任務並執行線程。如果成功,則整個方法結束
-
2.如果 當前線程池線程數量 > corePoolSize,並且允許將當前任務添加任務緩存隊列。
- 2.1這時候來重新檢查線程池運行狀態,如果不是
RUNNING
狀態,則嘗試將任務從緩存隊列刪除,如果成功然后通過reject
執行相應拒絕策略 - 2.2否則檢查線程池中任務數量是不是0,如果為0,則通過addWorker(null, false)嘗試新建一個線程,新建線程對應的任務為null
- 2.1這時候來重新檢查線程池運行狀態,如果不是
-
3.上面兩種都不符合,再嘗試通過addWorker(command, false)新建一個線程,並將任務(command)添加到該線程中;然后,啟動該線程從而執行任務。如果還是失敗,則通過reject(command)執行相應的拒絕策略的內容
繼續查看addWorker()源碼
addWorker()
private boolean addWorker(Runnable firstTask, boolean core) {
// 更新"線程池狀態和計數"標記,即更新ctl。
retry:
for (;;) {
//獲取ctl對應的int值。該int值保存了"線程池狀態"和"線程池中任務的數量"信息
int c = ctl.get();
//獲取線程池運行狀態
int rs = runStateOf(c);
// 檢查運行狀態是不是可運行狀態並且者緩存隊列是否為空,如果不合法,則返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//獲取線程池中任務數量
int wc = workerCountOf(c);
//如果數量超過默認容量或者最大線程池數量,返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 通過CAS函數將c的值+1。操作失敗的話,則退出循環。
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 檢查"線程池狀態",如果與之前的狀態不同,則從retry重新開始。
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
// 開啟新線程添加任務,然后加入線程池,並啟動任務所在的線程。
try {
// 新建Worker,並且指定firstTask為Worker的第一個任務。
w = new Worker(firstTask);
// 獲取Worker對應的線程。
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//獲取鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//獲取線程池運行狀態
int rs = runStateOf(ctl.get());
//再次確認運行狀態是不是"RUNNING"狀態
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//檢查線程是不是可以啟動
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//將任務添加到緩存隊列
workers.add(w);
//獲取緩存隊列數量
int s = workers.size();
//如果緩存隊列大於線程池出現過的最大線程數量
if (s > largestPoolSize)
//修改largestPoolSize的值
largestPoolSize = s;
//添加任務成功
workerAdded = true;
}
} finally {
//不要忘記釋放鎖
mainLock.unlock();
}
//如果任務添加成功
if (workerAdded) {
//運行線程
t.start();
//線程啟動成功
workerStarted = true;
}
}
} finally {
//如果線程未成功啟動
if (! workerStarted)
//任務啟動失敗,將任務從緩存隊列刪除
addWorkerFailed(w);
}
//返回任務啟動的狀態
return workerStarted;
}
梳理
addWorker(Runnable firstTask,boolean core)的作用是將任務(firstTask)添加到線程池中,並啟動該任務。
- core為true的話,則以corePoolSize為界限,若”線程池中已有任務數量>=corePoolSize”,則返回false;
- core為false的話,則以maximumPoolSize為界限,若”線程池中已有任務數量>=maximumPoolSize”,則返回false。
- addWorker()會先通過for循環不斷嘗試更新ctl狀態,ctl記錄了”線程池中任務數量和線程池狀態”。
- 更新成功之后,再通過try模塊來將任務添加到線程池中,並啟動任務所在的線程。
- 可以得到:線程池在添加任務時,會創建任務對應的Worker對象;而一個Worker對象包含一個Thread對象。
- 通過將worker對象添加到“線程的workers集合”中,從而實現將任務添加到線程池中。
- 通過啟動Worker對應的Thread線程,則執行該任務。
查看關閉線程池shutdown()
shutdown()
public void shutdown() {
//獲取鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//檢查是否有權限終止線程池中線程
checkShutdownAccess();
//將線程池運行狀態設為關閉
advanceRunState(SHUTDOWN);
//中斷線程池中空閑的線程
interruptIdleWorkers();
// 鈎子函數,在ThreadPoolExecutor中沒有任何動作。
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
//釋放鎖
mainLock.unlock();
}
//嘗試終止線程
tryTerminate();
}
3.線程池中的線程初始化
默認情況下,創建線程池之后,線程池中是沒有線程的,需要提交任務之后才會創建線程。
如果需要線程池創建之后立即創建線程,可以通過以下兩個方法:
- prestartCoreThread():初始化一個核心線程;
- prestartAllCoreThreads():初始化所有核心線程
public boolean prestartCoreThread() {
return addIfUnderCorePoolSize(null); //注意傳進去的參數是null
}
public int prestartAllCoreThreads() {
int n = 0;
while (addIfUnderCorePoolSize(null))//注意傳進去的參數是null
++n;
return n;
}
4.任務緩存隊列及排隊策略
前面我們提到了任務緩存隊列,即workQueue,它用來存放等待執行的任務
- ArrayBlockingQueue:基於數組的先進先出隊列,此隊列創建時必須指定大小;
- LinkedBlockingQueue:基於鏈表的先進先出隊列,如果創建時沒有指定此隊列大小,則默認為Integer.MAX_VALUE;
- synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是直接新建一個線程來執行新來的任務。
- DelayQueue:DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue是一個沒有大小限制的隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。
- PriorityBlockingQueue:基於優先級的阻塞隊列(優先級的判斷通過構造函數傳入的Compator對象來決定),但需要注意的是PriorityBlockingQueue並不會阻塞數據生產者,而只會在沒有可消費的數據時,阻塞數據的消費者。因此使用的時候要特別注意,生產者生產數據的速度絕對不能快於消費者消費數據的速度,否則時間一長,會最終耗盡所有的可用堆內存空間。
5.任務拒絕策略
當線程池的任務緩存隊列已滿並且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕策略,通常有以下四種策略:
- ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
- ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)
- ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
6.線程池的關閉
ThreadPoolExecutor提供了兩個方法,用於線程池的關閉,分別是shutdown()和shutdownNow(),其中:
- shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完后才終止,但再也不會接受新的任務
- shutdownNow():立即終止線程池,並嘗試打斷正在執行的任務,並且清空任務緩存隊列,返回尚未執行的任務
7.線程池容量的動態調整
ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
- setCorePoolSize:設置核心池大小
- setMaximumPoolSize:設置線程池最大能創建的線程數目大小
當上述參數從小變大時,ThreadPoolExecutor進行線程賦值,還可能立即創建新的線程來執行任務。
怎么使用線程池
下面來看看線程池的具體使用
class ThreadExecutor {
fun test(){
val executor = ThreadPoolExecutor(5,10,300,TimeUnit.MILLISECONDS,
ArrayBlockingQueue<Runnable>(5),ThreadPoolExecutor.DiscardOldestPolicy())
for (i in 0 until 20){
try {
val task = MyRunnable(i)
executor.execute(task)
System.out.println("線程池中線程數目:"+executor.poolSize +",隊列中等待執行的任務數目:"+
executor.queue.size+",已執行完別的任務數目:"+executor.completedTaskCount
)
} catch (e: Exception) {
e.printStackTrace()
}
}
executor.shutdown()
}
class MyRunnable constructor(private val number:Int) : Runnable {
override fun run() {
System.out.println("正在執行的task $number")
try {
Thread.sleep(2000)
} catch (e: Exception) {
e.printStackTrace()
}finally {
}
System.out.println("task $number 執行完畢");
}
}
}
查看輸出結果
01-21 19:50:14.756 24533-24533/com.t9.news I/System.out: 線程池中線程數目:1,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
01-21 19:50:14.757 24533-24605/com.t9.news I/System.out: 正在執行的task 0
01-21 19:50:14.757 24533-24533/com.t9.news I/System.out: 線程池中線程數目:2,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
01-21 19:50:14.758 24533-24533/com.t9.news I/System.out: 線程池中線程數目:3,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
01-21 19:50:14.758 24533-24607/com.t9.news I/System.out: 正在執行的task 2
01-21 19:50:14.759 24533-24533/com.t9.news I/System.out: 線程池中線程數目:4,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
01-21 19:50:14.759 24533-24533/com.t9.news I/System.out: 線程池中線程數目:5,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
01-21 19:50:14.759 24533-24608/com.t9.news I/System.out: 正在執行的task 3
01-21 19:50:14.759 24533-24533/com.t9.news I/System.out: 線程池中線程數目:5,隊列中等待執行的任務數目:1,已執行完別的任務數目:0
01-21 19:50:14.759 24533-24606/com.t9.news I/System.out: 正在執行的task 1
01-21 19:50:14.759 24533-24533/com.t9.news I/System.out: 線程池中線程數目:5,隊列中等待執行的任務數目:2,已執行完別的任務數目:0
01-21 19:50:14.760 24533-24609/com.t9.news I/System.out: 正在執行的task 4
01-21 19:50:14.760 24533-24533/com.t9.news I/System.out: 線程池中線程數目:5,隊列中等待執行的任務數目:3,已執行完別的任務數目:0
01-21 19:50:14.760 24533-24533/com.t9.news I/System.out: 線程池中線程數目:5,隊列中等待執行的任務數目:4,已執行完別的任務數目:0
01-21 19:50:14.760 24533-24533/com.t9.news I/System.out: 線程池中線程數目:5,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
01-21 19:50:14.760 24533-24533/com.t9.news I/System.out: 線程池中線程數目:6,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
01-21 19:50:14.764 24533-24610/com.t9.news I/System.out: 正在執行的task 10
01-21 19:50:14.764 24533-24533/com.t9.news I/System.out: 線程池中線程數目:7,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
01-21 19:50:14.765 24533-24533/com.t9.news I/System.out: 線程池中線程數目:8,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
01-21 19:50:14.766 24533-24533/com.t9.news I/System.out: 線程池中線程數目:9,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
01-21 19:50:14.767 24533-24614/com.t9.news I/System.out: 正在執行的task 13
01-21 19:50:14.767 24533-24533/com.t9.news I/System.out: 線程池中線程數目:10,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
01-21 19:50:14.767 24533-24615/com.t9.news I/System.out: 正在執行的task 14
01-21 19:50:14.767 24533-24533/com.t9.news I/System.out: 線程池中線程數目:10,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
01-21 19:50:14.767 24533-24613/com.t9.news I/System.out: 正在執行的task 12
01-21 19:50:14.767 24533-24533/com.t9.news I/System.out: 線程池中線程數目:10,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
01-21 19:50:14.768 24533-24533/com.t9.news I/System.out: 線程池中線程數目:10,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
01-21 19:50:14.771 24533-24612/com.t9.news I/System.out: 正在執行的task 11
01-21 19:50:16.759 24533-24605/com.t9.news I/System.out: task 0 執行完畢
01-21 19:50:16.759 24533-24607/com.t9.news I/System.out: task 2 執行完畢
01-21 19:50:16.760 24533-24605/com.t9.news I/System.out: 正在執行的task 15
01-21 19:50:16.760 24533-24607/com.t9.news I/System.out: 正在執行的task 16
01-21 19:50:16.760 24533-24608/com.t9.news I/System.out: task 3 執行完畢
01-21 19:50:16.760 24533-24609/com.t9.news I/System.out: task 4 執行完畢
01-21 19:50:16.760 24533-24606/com.t9.news I/System.out: task 1 執行完畢
01-21 19:50:16.761 24533-24608/com.t9.news I/System.out: 正在執行的task 17
01-21 19:50:16.761 24533-24609/com.t9.news I/System.out: 正在執行的task 18
01-21 19:50:16.763 24533-24606/com.t9.news I/System.out: 正在執行的task 19
01-21 19:50:16.765 24533-24610/com.t9.news I/System.out: task 10 執行完畢
01-21 19:50:16.767 24533-24614/com.t9.news I/System.out: task 13 執行完畢
01-21 19:50:16.767 24533-24613/com.t9.news I/System.out: task 12 執行完畢
01-21 19:50:16.767 24533-24615/com.t9.news I/System.out: task 14 執行完畢
01-21 19:50:16.771 24533-24612/com.t9.news I/System.out: task 11 執行完畢
01-21 19:50:18.761 24533-24605/com.t9.news I/System.out: task 15 執行完畢
01-21 19:50:18.761 24533-24608/com.t9.news I/System.out: task 17 執行完畢
01-21 19:50:18.762 24533-24607/com.t9.news I/System.out: task 16 執行完畢
01-21 19:50:18.762 24533-24609/com.t9.news I/System.out: task 18 執行完畢
01-21 19:50:18.763 24533-24606/com.t9.news I/System.out: task 19 執行完畢
從執行結果可以看出,當線程池中線程的數目大於5時,便將任務放入任務緩存隊列里面,當任務緩存隊列滿了之后,變丟棄最前面的任務,即5-9任務被丟棄。
不過在java doc中,並不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來創建線程池:
Executors.newCachedThreadPool(); //創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE
Executors.newSingleThreadExecutor(); //創建容量為1的緩沖池
Executors.newFixedThreadPool(int); //創建固定容量大小的緩沖池
下面是這三個靜態方法的具體實現:
//corePoolSize=maximumPoolSize,使用LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//corePoolSize和maximumPoolSize都設置為1,也使用的LinkedBlockingQueue
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
corePoolSize為0,將maximumPoolSize為Integer.MAX_VALUE,時間60s,使用的SynchronousQueue,也就是說來了任務就創建線程運行,當線程空閑超過60s,就銷毀線程
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
一般開發中,盡量使用Executors提供的靜態方式生成線程池,如果不滿足需求,再自己去實現。
總結
以上就是線程池的主要原理,核心是提交execute()任務、添加任務addWorker()、關閉線程池shutdown()以及線程池的使用方式