目錄
線程池
什么是線程池
線程池一種線程使用模式,線程池會維護多個線程,等待着分配可並發執行的任務,當有任務需要線程執行時,從線程池中分配線程給該任務而不用主動的創建線程。
線程池的好處
如果在我們平時如果需要用到線程時,我們一般是這樣做的:創建線程(T1),使用創建的線程來執行任務(T2),任務執行完成后銷毀當前線程(T3),這三個階段是必須要有的。
而如果使用線程池呢?
線程池會預先創建好一定數量的線程,需要的時候申請使用,在一個任務執行完后也不需要將該線程銷毀,很明顯的節省了T1和T3這兩階段的時間。
同時我們的線程由線程池來統一進行管理,這樣也提高了線程的可管理性。
手寫一個自己的線程池
現在我們可以簡單的理解為線程池實際上就是存放多個線程的數組,在程序啟動是預先實例化一定得線程實例,當有任務需要時分配出去。現在我們先來寫一個自己的線程池來理解一下線程池基本的工作過程。
線程池需要些什么?
首先線程池肯定需要一定數量的線程,所以首先需要一個線程數組,當然也可以是一個集合。
線程數組是用來進行存放線程實例的,要使用這些線程就需要有任務提交過來。當任務量過大時,我們是不可能在同一時刻給所有的任務分配一個線程的,所以我們還需要一個用於存放任務的容器。
這里的預先初始化線程實例的數量也需要我們來根據業務確定。
同時線程實例的數量也不能隨意的定義,所以我們還需要設置一個最大線程數。
//線程池中允許的最大線程數
private static int MAXTHREDNUM = Integer.MAX_VALUE;
//當用戶沒有指定時默認的線程數
private int threadNum = 6;
//線程隊列,存放線程任務
private List<Runnable> queue;
private WorkerThread[] workerThreads;
線程池工作
線程池的線程一般需要預先進行實例化,這里我們通過構造函數來模擬這個過程。
public MyThreadPool(int threadNum) {
this.threadNum = threadNum;
if(threadNum > MAXTHREDNUM)
threadNum = MAXTHREDNUM;
this.queue = new LinkedList<>();
this.workerThreads = new WorkerThread[threadNum];
init();
}
//初始化線程池中的線程
private void init(){
for(int i=0;i<threadNum;i++){
workerThreads[i] = new WorkerThread();
workerThreads[i].start();
}
}
在線程池准備好了后,我們需要像線程池中提交工作任務,任務統一提交到隊列中,當有任務時,自動分發線程。
//提交任務
public void execute(Runnable task){
synchronized (queue){
queue.add(task);
//提交任務后喚醒等待在隊列的線程
queue.notifyAll();
}
}
我們的工作線程為了獲取任務,需要一直監聽任務隊列,當隊列中有任務時就由一個線程去執行,這里我們用到了前面提到的安全中斷。
private class WorkerThread extends Thread {
private volatile boolean on = true;
@Override
public void run() {
Runnable task = null;
//判斷是否可以取任務
try {
while(on&&!isInterrupted()){
synchronized (queue){
while (on && !isInterrupted() && queue.isEmpty()) {
//這里如果使用阻塞隊列來獲取在執行時就不會報錯
//報錯是因為退出時銷毀了所有的線程資源,不影響使用
queue.wait(1000);
}
if (on && !isInterrupted() && !queue.isEmpty()) {
task = queue.remove(0);
}
if(task !=null){
//取到任務后執行
task.run();
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
task = null;//任務結束后手動置空,加速回收
}
public void cancel(){
on = false;
interrupt();
}
}
當然退出時還需要對線程池中的線程等進行銷毀。
//銷毀線程池
public void shutdown(){
for(int i=0;i<threadNum;i++){
workerThreads[i].cancel();
workerThreads[i] = null;
}
queue.clear();
}
好了,到這里我們的一個簡易版的線程池就完成了,功能雖然不多但是線程池運行的基本原理差不多實現了,實際上非常簡單,我們來寫個程序測試一下:
public class ThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
// 創建3個線程的線程池
MyThreadPool t = new MyThreadPool(3);
CountDownLatch countDownLatch = new CountDownLatch(5);
t.execute(new MyTask(countDownLatch, "testA"));
t.execute(new MyTask(countDownLatch, "testB"));
t.execute(new MyTask(countDownLatch, "testC"));
t.execute(new MyTask(countDownLatch, "testD"));
t.execute(new MyTask(countDownLatch, "testE"));
countDownLatch.await();
Thread.sleep(500);
t.shutdown();// 所有線程都執行完成才destory
System.out.println("finished...");
}
// 任務類
static class MyTask implements Runnable {
private CountDownLatch countDownLatch;
private String name;
private Random r = new Random();
public MyTask(CountDownLatch countDownLatch, String name) {
this.countDownLatch = countDownLatch;
this.name = name;
}
public String getName() {
return name;
}
@Override
public void run() {// 執行任務
try {
countDownLatch.countDown();
Thread.sleep(r.nextInt(1000));
System.out.println("任務 " + name + " 完成");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getId()+" sleep InterruptedException:"
+Thread.currentThread().isInterrupted());
}
}
}
}
result:
任務 testA 完成
任務 testB 完成
任務 testC 完成
任務 testD 完成
任務 testE 完成
finished...
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at com.learn.threadpool.MyThreadPool$WorkerThread.run(MyThreadPool.java:75)
...
從結果可以看到我們提交的任務都被執行了,當所有任務執行完成后,我們強制銷毀了所有線程,所以會拋出異常。
JDK中的線程池
上面我們實現了一個簡易的線程池,稍微理解線程池的基本運作原理。現在我們來認識一些JDK中提供了線程池吧。
ThreadPoolExecutor
public class ThreadPoolExecutor extends AbstractExecutorService
ThreadPoolExecutor是一個ExecutorService ,使用可能的幾個合並的線程執行每個提交的任務,通常使用Executors工廠方法配置,通過Executors可以配置多種適合不同場景的線程池。
ThreadPoolExecutor中的主要參數
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
corePoolSize
線程池中的核心線程數,當外部提交一個任務時,線程池就創建一個新線程執行任務,直到當前線程數等於corePoolSize時不再創建新線程;
如果當前線程數為corePoolSize,繼續提交的任務被保存到阻塞隊列中,等待被執行;
如果執行了線程池的prestartAllCoreThreads()方法,線程池會提前創建並啟動所有核心線程。
maximumPoolSize
線程池中允許的最大線程數。如果當前阻塞隊列已滿,還在繼續提交任務,則創建新的線程執行任務,前提是當前線程數小於maximumPoolSize。
keepAliveTime
線程空閑時的存活時間,即當線程沒有任務執行時,繼續存活的時間。默認情況下,線程一般不會被銷毀,該參數只在線程數大於corePoolSize時才有用。
workQueue
workQueue必須是阻塞隊列。當線程池中的線程數超過corePoolSize的時候,線程會進入阻塞隊列進行等待。阻塞隊列可以使有界的也可以是無界的。
threadFactory
創建線程的工廠,通過自定義的線程工廠可以給每個新建的線程設置一個線程名。Executors靜態工廠里默認的threadFactory,線程的命名規則是“pool-{數字}-thread-{數字}”。
RejectedExecutionHandler
線程池的飽和處理策略,當阻塞隊列滿了,且沒有空閑的工作線程,如果繼續提交任務,必須采取一種策略處理該任務,線程池提供了4種策略:
- AbortPolicy:直接拋出異常,默認的處理策略
- CallerRunsPolicy:使用調用者所屬的線程來執行當前任務
- DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務
- DiscardPolicy:直接丟棄該任務
如果上述提供的處理策略無法滿足業務需求,也可以根據場景實現RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務。
ThreadPoolExecutor中的主要執行流程
//圖片來自網絡
- 線程池判斷核心線程池里的線程(corePoolSize)是否都在執行任務。如果不是,則創建一個新的工作線程來執行任務。如果核心線程池里的線程都在執行任務,則進入2。
- 線程池判斷工作隊列(workQueue)是否已滿。如果工作隊列沒有滿,則將新提交的任務存儲在該隊列里。如果工作隊列滿了,則進入3。
- 線程池判斷線程池的線程(maximumPoolSize)是否都處於工作狀態。如果沒有,則創建一個新的工作線程來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。
這里需要注意的是核心線程池大小指得是corePoolSize參數,而線程池工作線程數指的是maximumPoolSize。
Executor
實際上我們在使用線程池時,並不一定需要自己來定義上面介紹的參數的值,JDK為我們提供了一個調度框架。通過這個調度框架我們可以輕松的創建好線程池以及異步的獲取任務的執行結果。
調度框架的組成
任務
一般是指需要被執行的任務,多為使用者提供。被提交的任務需要實現Runnable接口或Callable接口。
任務的執行
Executor是任務執行機制的核心接口,其將任務的提交和執行分離開來。ExecutorService繼承了Executor並做了一些擴展,可以產生Future為跟蹤一個或多個異步任務執行。任務的執行主要是通過實現了Executor和ExecutorService接口的類來進行實現。例如:ThreadPoolExecutor和ScheduledThreadPoolExecutor。
結果獲取
對結果的獲取可以通過Future接口以及其子類接口來實現。Future接口提供了一系列諸如檢查是否就緒,是否執行完成,阻塞以及獲取結果等方法。
Executors工廠中的線程池
FixedThreadPool
new ThreadPoolExecutor(nThreads, nThreads, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
該線程池中corePoolSize和maximumPoolSize參數一致。同時使用無界阻塞隊列,將會導致maximumPoolSize和keepAliveTime已經飽和策略無效,因為隊列會一直接收任務,直到OOM。
SingleThreadExecutor
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())
該線程池中corePoolSize和maximumPoolSize都為1,表示始終只有一個線程在工作,適用於需要保證順序地執行各個任務;並且在任意時間點,不會有多個線程是活動的應用場景。同時使用無界阻塞隊列,當任務多時極有可能OOM。
CachedThreadPool
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>()
CachedThreadPool類型的線程池corePoolSize為0,表示任務將會提交給隊列,但是SynchronousQueue又是一個不包含任何容量的隊列。所以每一個任務提交過來都會創建一個新的線程來執行,該類型的線程池適用於執行很多的短期異步任務的程序,或者是負載較輕的服務器。如果當任務的提交速度一旦超過任務的執行速度,在極端情況下可能會因為創建過多線程而耗盡CPU和內存資源。
ScheduledThreadPool
對於定時任務類型的線程池,Executor可以創建兩種不同類型的線程池:ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor,前者是包含若干個線程的ScheduledThreadPoolExecutor,后者是只包含一個的ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor適用於需要多個后台線程執行周期任務,同時為了滿足資源管理的需求而需要限制后台線程的數量的應用場景。
SingleThreadScheduledExecutor適用於需要單個后台線程執行周期任務,同時需要保證順序地執行各個任務的應用場景。
new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
在對該類型線程池進行實例化時,我們可以看到maximumPoolSize設置為了Integer的最大值,所以很明顯在極端情況下和CachedThreadPool類型一樣可能會因為創建過多線程而耗盡CPU和內存資源。
DelayedWorkQueue是一種延時阻塞隊列,此隊列的特點為其中元素只能在其延遲到期時才被使用。ScheduledThreadPool類型在執行任務時和其他線程池有些不同。
- ScheduledThreadPool類型線程池中的線程(假設現在線程A開始取任務)從DelayedWorkQueue中取已經到期的任務。
- 線程A獲取到任務后開始執行。
- 任務執行完成后設置該任務下一次執行的時間。
- 將該任務重新放入到線程池中。
ScheduledThreadPool中存在着定時任務和延時任務兩種。
延時任務通過schedule(...)
方法以及重載方法和scheduleWithFixedDelay
實現,延時任務通過設置某個時間間隔后執行,schedule(...)
僅執行一次。
定時任務由scheduleAtFixedRate
實現。該方法創建並執行在給定的初始延遲之后,隨后以給定的時間段進行周期性動作,即固定時間間隔的任務。
特殊的scheduleWithFixedDelay
方法是創建並執行在給定的初始延遲之后首先啟用的定期動作,隨后在一個執行的終止和下一個執行的開始之間給定的延遲,即固定延時間隔的任務。
固定時間間隔的任務不論每次任務花費多少時間,下次任務開始執行時間是確定的。對於scheduleAtFixedRate
方法中,若任務處理時長超出設置的定時頻率時長,本次任務執行完才開始下次任務,下次任務已經處於超時狀態,會馬上開始執行。若任務處理時長小於定時頻率時長,任務執行完后,定時器等待,下次任務會在定時器等待頻率時長后執行。
固定延時間隔的任務是指每次執行完任務以后都等待一個固定的時間。由於操作系統調度以及每次任務執行的語句可能不同,所以每次任務執行所花費的時間是不確定的,也就導致了每次任務的執行周期存在一定的波動。
需要注意的是定時或延時任務中所涉及到時間、周期不能保證實時性及准確性,實際運行中會有一定的誤差。
Callable/Future
在介紹實現多線程的時候我們有簡單介紹過Runnable和Callable的,這兩者基本相同,不同在於Callable可以返回一個結果,而Runnable不返回結果。對於Callable接口的使用方法和Runnable基本相同,同時我們也可以選擇是否對結果進行接收處理。在Executors中提供了將Runnable轉換為Callable的api:Callable<Object> callable(Runnable task)
。
Future是一個用於接收Runnable和Callable計算結果的接口,當然它還提供了查詢任務狀態,中斷或者阻塞任務以及查詢結果的能力。
boolean cancel(boolean mayInterruptIfRunning) //嘗試取消執行此任務。
V get() //等待計算完成,然后檢索其結果。
V get(long timeout, TimeUnit unit) //等待最多在給定的時間,然后檢索其結果(如果可用)。
boolean isCancelled() //如果此任務在正常完成之前被取消,則返回 true 。
boolean isDone() //如果任務已完成返回true。
FutureTask是對Future的基本實現,具有啟動和取消計算的方法,查詢計算是否完整,並檢索計算結果。FutureTask對Future做了一定得擴展:
void run() //將此future設置為其計算結果,除非已被取消。
protected boolean runAndReset() //執行計算而不設置其結果,然后重置為初始狀態,如果計算遇到異常或被取消,則不執行此操作。
protected void set(V v) //將此Future的結果設置為給定值,除非Future已被設置或已被取消。
protected void setException(Throwable t) //除非已經設置了此 Future 或已將其取消,否則它將報告一個 ExecutionException,並將給定的 throwable 作為其原因。
FutureTask除了實現Future接口外,還實現了Runnable接口。所以FutureTask可以由Executor執行,也可以由調用線程直接執行futureTask.run()。
當FutureTask處於未啟動或已啟動狀態時,執行FutureTask.get()方法將導致調用線程阻塞;
當FutureTask處於已完成狀態時,執行FutureTask.get()方法將導致調用線程立即返回結果或拋出異常。
當FutureTask處於未啟動狀態時,執行FutureTask.cancel()方法將導致此任務永遠不會被執行;
當FutureTask處於已啟動狀態時,執行FutureTask.cancel(true)方法將以中斷執行此任務線程的方式來嘗試停止該任務;
當FutureTask處於已啟動狀態時,執行FutureTask.cancel(false)方法將不會對正在執行此任務的線程產生影響(讓正在執行的任務運行完成)。
關於是否使用Executors
在之前阿里巴巴出的java開發手冊中,有明確提出禁止使用Executors:
【強制】線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,
這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。
在上面我們分析過使用Executors創建的幾種線程池的使用場景和缺點,大多數情況下出問題在於可能導致OOM,在我實際使用中基本沒有遇到過這樣的情況。但是考慮到阿里巴巴這樣體量的並發請求,可能遇到這種情況的幾率較大。所以我們還是應該根據實際情況考慮是否使用,當然實際遵循阿里巴巴開發手冊來可能會更好一點,畢竟這是國類頂尖公司常年在生產中積累下的經驗。
最后,在本節中只是簡單介紹線程池及其基本原理,幫助更好的理解線程池。並不涉及具體如何使用。