jdk的線程池實現-ThreadPoolExecutor


前言

一直以來對線程池的概念都挺模糊的,想不明白線程池要如何實現,今天難得周末,就開始查閱資料,研究了一下jdk中的線程池實現,終於解開了我長久以來的疑惑,本文參考文章來自網絡,原文連接如下:

http://www.cnblogs.com/dolphin0520/p/3932921.html

參考連接針對jdk6,本文針對jdk8

疑惑

和線程池類似的有一個概念叫連接池,在數據庫連接中使用的非常多,連接池比較好理解,一般來說就是一個連接建立完成之后不去關閉它,需要的時候就看獲取這個連接的對象並給它的輸入流寫數據,並從輸出流讀取響應結果,但是在線程中,一旦某個線程的run方法運行結束之后,線程也就結束了,因此和連接池有很大的不同,這也給我留下了幾個疑惑:

  • 如何復用一個線程?
  • 多個線程如何管理?
  • 如何知道某個線程是目前正在運行還是在等待任務?

今天看過jdk中對線程池的實現,才真正明白,線程池和連接池是有很大不同的,使用上也完全不一樣。

  1. 連接池是每次需要時候的時候,從池里取出一個連接給我們,我們再使用這個連接來交換數據,而線程池並不是在使用的時候從池里取出一個線程對象給我們使用,而是將我們的任務交給線程池,由線程池自己調度任務決定什么時候執行這個任務。
  2. 連接池的連接用完之后,會直接放到池內,等待下一個請求連接,而線程池的線程一旦運行完,線程也就結束了,因此不存在放回池內的操作。
  3. 一個連接池要持有一定數量的連接,只要保證持有這些未關閉的連接的對象即可,而線程池要持有一定數量的線程,必須保證持有的線程的run方法不會運行結束。

了解了線程池和連接池的區別之后,我們就可以知道,雖然名字很像,但是實際上這兩者在原理和概念上是完全不一樣的。

jdk1.5以后,新增了一個並發包java.concurrent,這個包里就包含了線程池的實現,最核心的實現類是java.util.concurrent.ThreadPoolExecutor

實際上jdk從1.5到1.8,java.util.concurrent.ThreadPoolExecutor已經經過多次重構了,1.6和1.8在實現上已經有了很大的不同了,本文針對的是jdk8中的實現。

jdk的線程池實現

我們來看一下jdk中的線程池是如何實現的。

首先要先了解一下類結構,如下圖:

enter description here

類結構

最頂層的接口是java.util.concurrent.Executor:

public interface Executor { void execute(Runnable command); } 

只有一個接口,傳入一個Runnable對象,稱為指令,線程池就會幫你執行這個指令。

它的一級子接口是java.util.concurrent.ExecutorService:

public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } 

這個接口是執行器服務接口,聲明了關於執行器的許多管理方法,如:

void shutdown(); <T> Future<T> submit(Callable<T> task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; 

這些方法都是用於對線程池內的任務管理的接口。

接下來我們看java.util.concurrent.ExecutorService的抽象實現類java.util.concurrent.AbstractExecutorService,這里就不貼源碼了,這個抽象類實現了接口中的大部分方法,不過大部分的實現都依賴於Executor接口聲明的execute方法,而這里並沒有實現這個關鍵的方法,而是把這個方法的實現交給了子類,也就是java.util.concurrent.ThreadPoolExecutor來實現了。

接下來我們看一下最終的實現類java.util.concurrent.ThreadPoolExecutor,在類結構圖中,我們還能看到ThreadPoolExecutor有如下幾個內部類:

public static class DiscardPolicy implements RejectedExecutionHandler public static class CallerRunsPolicy implements RejectedExecutionHandler private final class Worker extends AbstractQueuedSynchronizer implements Runnable public static class AbortPolicy implements RejectedExecutionHandler public static class DiscardOldestPolicy implements RejectedExecutionHandler

這幾個類都是線程池實現需要的,這里最關鍵的類就是Worker,每一個Worker對象代表了一個線程,同時也是真正負責執行任務的對象。

線程池的實現原理

我們現在已經了解線程池的整體結構,現在讓我們來看看具體的實現,我們按照下面的幾個方面來閱讀jdk的源碼:

  1. 線程池的狀態
  2. 線程任務執行
  3. 線程池關閉
  4. 線程容量動態調整

線程池的狀態

我們先看看跟線程池狀態有關的幾個屬性:

AtomicInteger ctl; // 狀態計數器 int RUNNING; // 運行狀態 int SHUTDOWN ;// 關閉狀態 int STOP; // 停止狀態 int TIDYING; // 整理狀態 int TERMINATED; //結束狀態 

這里的狀態計數器ctl是用來標識線程池當前狀態和線程數的,這里要特別注意,這個屬性把兩個變量打包成一個變量了,通過這個屬性可以計算得出目前的線程數和線程池當前的狀態。

其他屬性都是常量,用來定義狀態的,從源碼的定義看,線程池的狀態是有大小關系的,分別是:

RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED 

各個狀態表示的含義如下:

  • RUNNING:正在處理任務和接受隊列中的任務。
  • SHUTDOWN:不再接受新的任務,但是會繼續處理完隊列中的任務。
  • STOP:不再接受新任務,也不繼續處理隊列中的任務,並且會中止正在處理的任務。
  • TIDYING:所有任務都已經處理結束,目前worker數為0,當線程池進入這個狀態的時候,會調用terminated()方法。
  • TERMINATED:線程池已經全部結束,並且terminated()方法執行完成。

注意:這里使用了AtomicInteger,因此也決定了線程池最大線程數不能超過(2^29)-1個線程,大約是500萬個線程,多數情況下是能滿足要求的,如果需要支持更高線程數的話,需要使用AtomicLong來標識。

這里還有幾個跟狀態有關的方法

static int runStateOf(int c) ; // 計算線程池當前狀態 static boolean runStateLessThan(int c, int s) ; // 計算當前狀態是否小於某個狀態 static boolean runStateAtLeast(int c, int s) ; // 計算當前狀態是否大於或等於某個狀態 static boolean isRunning(int c) ; // 計算當前狀態是否RUNNING 

線程任務執行

在了解線程任務執行的實現之前,我們需要先了解一些等一下會涉及到的成員變量:

private volatile int corePoolSize; private final BlockingQueue<Runnable> workQueue; private volatile int maximumPoolSize; private final ReentrantLock mainLock = new ReentrantLock(); private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; 

corePoolSize

這個屬性稱為核心線程數,實際上表示的是整個線程池中最少存活的線程數,實際上,線程池會在任務數比較多的情況下,創建一些新的臨時線程出來執行任務,等到任務數降下來之后,臨時線程就會逐漸減少,總的線程數會回落到核心線程數,需要注意的是,當線程池任務隊列中沒有任務的時候,所有線程都會處於等待狀態,但是臨時線程等待會超時,當超時之后,臨時線程數就會結束,而核心線程是不允許等待超時的,因此核心線程不會減少,這也保證了線程池的線程數量不會少於核心線程數。

workQueue

這個屬性稱為任務隊列,這個隊列用來存放任務,在任務量比較大,核心線程在執行任務的時候,會把新來的任務放到任務隊列中,如果任務隊列再滿了的話,會嘗試創建臨時線程來處理任務,如果臨時線程全部滿了的話,就會啟動拒絕策略。

這里任務隊列的poll()允許返回null,線程池判斷隊列是否為空時不會根據這個判斷,而是根據隊列的isEmpty()方法判斷的。

maximumPoolSize

最大線程數,我們已經知道,線程池默認會有一定數量的核心線程,當任務繁忙的時候,就會創建臨時線程來處理任務,這個最大線程數表示的就是線程池最大的線程數量,也就是說,臨時線程的數量最多不能超過最大線程數和核心線程數的差值。

mainLock

線程池主鎖,這個對象用於線程池內部某些需要同步的操作,在對線程池本身的一些狀態操作的時候,必須使用這個鎖。

handler

任務拒絕策略處理器,這個拒絕處理策略是可以由外界傳入的,具體的策略可以由使用者自己決定,當然,線程池默認使用的處理策略是java.util.concurrent.ThreadPoolExecutor.AbortPolicy,這個在前面我們已經看到了,就是ThreadPoolExecutor的內部類,這個默認策略是直接拋出RejectedExecutionException異常。

keepAliveTime

臨時線程等待超時時間,這個屬性表示創建的臨時線程在空閑的時候最長的等待時間,如果超過這個時間,線程就會結束掉。

了解上面的成員變量之后,我們來看一下線程池是如何執行任務的,下面是execute()的源碼:

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } 

我們先觀察這個邏輯,首先判斷了傳入的指令對象是否為空,為空就不用執行了,直接拋出異常。

如果指令對象不為空,那么就真正進入線程任務的邏輯,一共分為3步來處理:

  1. 檢查當前線程總數,如果低於核心線程數,則創建新的線程來執行這個任務
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } 

這里workerCountOf(c) 可以從計數器(clt)的結果中計算出當前線程數。

addWorker(command, true)會檢查線程池狀態和總線程數,並確定是否創建新線程,如果創建了新線程執行這個任務,則返回true,如果沒有創建新線程,則返回false,這個方法我們后邊再細說。

  1. 嘗試把任務放入任務隊列,並且重新檢查線程池狀態,如果線程池已經不接收新的任務,則移除這個任務,並轉入拒絕策略。
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } 

這里我們看到,第二步先檢查了線程池當前是否運行狀態,如果是運行狀態的話,則執行workQueue.offer(command)把任務放入任務隊列。

注意&&運算符的特性,如果線程池已經不在RUNNING狀態了,workQueue.offer(command)是不會執行的。

任務放入隊列之后,會復查線程池狀態是否RUNNING,這里需要做復查的主要原因是在前面的檢查中沒有加鎖,因此可能在添加任務隊列的過程,其他線程修改了線程池的狀態。

如果這個時候線程池狀態被修改了,那么就會把這次添加的任務移除remove(command),同時啟動拒絕策略reject(command),拒絕策略我們后邊再細講。

如果線程池狀態沒有被改變,則重新檢查當前核心線程數,如果為0則調用addWorker(null, false)去隊列中取任務並執行,如果不為0,則不做任何操作,等待線程執行完當前任務后自動去任務隊列中獲取新的任務並執行。

  1. 如果任務隊列已滿,則嘗試添加臨時線程,並把當然任務交給臨時線程處理,如果臨時線程也滿了,則啟動拒絕策略
else if (!addWorker(command, false)) reject(command); 

這里先通過addWorker(command, false)嘗試添加臨時線程,如果臨時線程創建成功則由臨時線程執行這個任務,如果臨時線程創建失敗,則會返回false,並轉入拒絕策略reject(command)

現在我們已經了解了,當我們把一個任務交給線程池的時候,線程池是如何處理這個任務的,接下來我們再來分析這個處理過程中涉及的兩個重要函數:

private boolean addWorker(Runnable firstTask, boolean core)

這個方法主要用於添加線程和啟動線程,源碼如下:

    private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } 

這里有兩個參數:

Runnable firstTask; // 表示新建的線程的第一個任務 boolean core; // 是否以核心線程數為邊界,如果傳入true,表示以核心線程數為邊界,當前線程超過核心線程數則不創建新線程,如果不使用核心線程數為邊界,則會以最大線程數為邊界 

進入這個函數的時候,第一步就是檢查線程池當前的狀態:

int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; 

如果已經不允許接受新任務了,這里就直接返回了,如果允許接受新任務的話,會繼續執行:

for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } 

這里需要好好理解一下,進入這個循環的時候,首先檢查是否要求使用核心線程數為邊界,如果不滿足條件,則直接返回false:

if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; 

如果不要求核心線程,則會執行compareAndIncrementWorkerCount(c)給計數器加1,同時跳出循環,執行下一步,如果計數器添加失敗,會再次計算線程池當前狀態是否RUNNING,如果線程池還在RUNNING狀態,則繼續重試:

if (runStateOf(c) != rs) continue retry; 

如果在前面已經添加了線程數了,那么下一步就開始創建新線程了:

w = new Worker(firstTask); final Thread t = w.thread; 

前面我們已經說了,一個Worker對象表示的是一個線程,這里我們看一下Worker的構造方法:

Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } 

可以看到,每創建一個Worker對象,就會創建一個新的線程,並以自己作為線程的運行對象(Worker自己也是Runnable的實現類)。

創建了Worker對象之后,需要對線程池做一系列的檢查,並將這個對象加入到線程池中:

final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } 

這里首先獲取線程池的主鎖,保證在添加線程的過程不受其他線程干擾。

mainLock.lock(); 

然后檢查線程池狀態和線程狀態,如果線程池各個狀態都是正常的,可以把線程加入到線程池中,則會把線程池加入線程池,並將線程添加狀態設置為true:

workers.add(w); // ... 省略若干代碼 workerAdded = true; 

最后釋放鎖:

finally { mainLock.unlock(); } 

這里要特別注意,鎖一定要在finally代碼塊中釋放,不然很容易造成死鎖,想知道為什么的同學可以查一下synchronized關鍵字和Lock對象獲取鎖的區別。

最后,判斷線程是否已經加入線程池中,如果已經加入線程池中,則啟動線程:

if (workerAdded) { t.start(); workerStarted = true; } 

如果前面的過程拋出了沒有捕獲的異常,會在最后判斷線程是否啟動,如果線程沒有啟動,則會做回滾:

finally { if (! workerStarted) addWorkerFailed(w); } 

final void reject(Runnable command)

這個方法是調用線程池的拒絕處理策略:

final void reject(Runnable command) { handler.rejectedExecution(command, this); } 

當然這個策略可以通過我們自己傳入的對象來處理,默認使用AbortPolicy處理,拋出異常。


現在我們已經知道線程池整個執行的過程了,那么線程池如何保證線程的存活呢?這里就涉及到Workerrun方法實現了。

public void run() { runWorker(this); } 

這里實際上是調用了java.util.concurrent.ThreadPoolExecutorrunWorker(Worker w)方法(jdk6中這個方法在Worker內部),我們來看下這個方法:

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } 

這里首先調用Workerunlock()方法,允許這個線程被中斷,然后進入一個循環,這個循環內部做了幾件事情:

  1. 獲取要執行的任務
while (task != null || (task = getTask()) != null) 

這里判斷是否有第一個任務,如果有第一個任務則使用第一個任務,如果沒有第一個任務,則使用getTask()獲得新任務,getTask()是一個重要的方法,如果獲取不到任務的話,這個方法會阻塞並等待任務,后邊我們再來詳細看這個方法。

  1. 獲取當前線程的鎖,檢查當前線程是否允許運行
w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); 
  1. 所有檢查通過之后,確認當前任務可以執行了,就開始執行任務
try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } 

執行任務的過程,先調用了beforeExecute(wt, task)做執行前處理,任務執行完成后,調用afterExecute(task, thrown)做執行完成后處理,這里主要是留給開發者擴展用的,默認不做任何處理,如果我們需要做一些處理,比如計算任務執行時間一類的,可以通過繼承java.util.concurrent.ThreadPoolExecutor並重寫這兩個方法來實現。

任務執行完成后,在finally中釋放了鎖並給完成任務計數器加1。

然后再回到循環的開頭,繼續取新的任務執行,如果沒有新的任務,線程就會阻塞在getTask()方法內,現在我們可以來看看這個方法如何實現了:

private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } 

這個方法已經很好理解了,這里就不再多做解釋,簡單而言就是檢查線程池狀態,只要線程池還沒有終止,這里就就會無限循環知道拋出異常或者線程池終止,線程的阻塞是用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)workQueue.take()實現的。

線程池關閉

線程池的關閉就比較簡單了,在java.util.concurrent.ExecutorService接口中提供了如下兩個方法:

void shutdown(); // 不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完后才終止,但再也不會接受新的任務 List<Runnable> shutdownNow(); // 立即終止線程池,並嘗試打斷正在執行的任務,並且清空任務緩存隊列,返回尚未執行的任務 

線程池容量的動態調整

動態調整線程池容量,在執行過程我們已經知道了,只要改變核心線程數和最大線程數即可:

void setCorePoolSize(int corePoolSize);// 調整核心線程數 void setMaximumPoolSize(int maximumPoolSize) ; // 調整動態線程數 

線程池使用

jdk8中提供了一個線程池的工程類java.util.concurrent.Executors,我們一般使用這個工廠類來創建線程池,而不是直接new線程池對象,這個工程類提供了多個創建線程池對象的靜態方法,其實使用的是線程池不同的構造器和傳了不同的參數而已,有興趣的童鞋請自行查閱API文檔,下面給出一個例子:

import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; public class Main { public static void main(String[] args) { ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(15); for(int i=0;i<15;i++){ final int index = i; executor.execute(new Runnable() { @Override public void run() { System.out.println("正在執行task "+index); try { Thread.currentThread().sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("task "+index+"執行完畢"); } }); System.out.println("線程池中線程數目:"+executor.getPoolSize()+",隊列中等待執行的任務數目:"+ executor.getQueue().size()+",已執行玩別的任務數目:"+executor.getCompletedTaskCount()); } executor.shutdown(); } } 

執行結果如下:

正在執行task 0
線程池中線程數目:1,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
線程池中線程數目:2,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 1
線程池中線程數目:3,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
線程池中線程數目:4,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
線程池中線程數目:6,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
線程池中線程數目:7,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
線程池中線程數目:8,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
線程池中線程數目:9,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 2
線程池中線程數目:10,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 3
線程池中線程數目:11,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 4
線程池中線程數目:12,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
線程池中線程數目:13,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 5
線程池中線程數目:14,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 6
線程池中線程數目:15,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 7
正在執行task 8
正在執行task 9
正在執行task 10
正在執行task 11
正在執行task 12
正在執行task 13
正在執行task 14
task 0執行完畢
task 9執行完畢
task 7執行完畢
task 6執行完畢
task 5執行完畢
task 3執行完畢
task 2執行完畢
task 4執行完畢
task 1執行完畢
task 11執行完畢
task 10執行完畢
task 8執行完畢
task 13執行完畢
task 12執行完畢
task 14執行完畢

合理配置線程池

知道了線程池的作用和實現,那么我們要怎么合理配置線程池大小呢?一般有如下規則:

  • 如果是CPU密集型任務,就需要盡量壓榨CPU,參考值可以設為 NCPU+1
  • 如果是IO密集型任務,參考值可以設置為2*NCPU

當然,這只是一個參考值,具體的設置還需要根據實際情況進行調整,比如可以先將線程池大小設置為參考值,再觀察任務運行情況和系統負載、資源利用率來進行適當調整。

 

  1. 線程池剛創建時,里面沒有一個線程。任務隊列是作為參數傳進來的。不過,就算隊列里面有任務,線程池也不會馬上執行它們。
  2. 當調用 execute() 方法添加一個任務時,線程池會做如下判斷:
    1. 如果正在運行的線程數量小於 corePoolSize,那么馬上創建線程運行這個任務;
    2. 如果正在運行的線程數量大於或等於 corePoolSize,那么將這個任務放入隊列。
    3. 如果這時候隊列滿了,而且正在運行的線程數量小於 maximumPoolSize,那么還是要創建線程運行這個任務;
    4. 如果隊列滿了,而且正在運行的線程數量大於或等於 maximumPoolSize,那么線程池會拋出異常,告訴調用者“我不能再接受任務了”。
  3. 當一個線程完成任務時,它會從隊列中取下一個任務來執行。
  4. 當一個線程無事可做,超過一定的時間(keepAliveTime)時,線程池會判斷,如果當前運行的線程數大於 corePoolSize,那么這個線程就被停掉。所以線程池的所有任務完成后,它最終會收縮到 corePoolSize 的大小。

這樣的過程說明,並不是先加入任務就一定會先執行。假設隊列大小為 10,corePoolSize 為 3,maximumPoolSize 為 6,那么當加入 20 個任務時,執行的順序就是這樣的:首先執行任務 1、2、3,然后任務 4~13 被放入隊列。這時候隊列滿了,任務 14、15、16 會被馬上執行,而任務 17~20 則會拋出異常。最終順序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM