阿里的面試官問了個問題,如果corePolllSize=10,MaxPollSize=20,如果來了25個線程 怎么辦,
先 達到 corePoolSize,然后 優先放入隊列,然后在到MaxPollSize;然后拒絕;
答案:
當一個任務通過execute(Runnable)方法欲添加到線程池時: 1、 如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。 2、 如果此時線程池中的數量等於 corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。 3、如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,再有新的線程,開始增加線程池的線程數量處理新的線程,直到maximumPoolSize; 4、 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那么通過 handler所指定的策略來處理此任務。也就是:處理任務的優先級為:核心線程corePoolSize、任務隊列workQueue、最大線程 maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。 5、 當線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。
當線程數小於corePoolSize時,提交一個任務創建一個線程(即使這時有空閑線程)來執行該任務。
當線程數大於等於corePoolSize,首選將任務添加等待隊列workQueue中(這里的workQueue是上面的BlockingQueue),等有空閑線程時,讓空閑線程從隊列中取任務。
當等待隊列滿時,如果線程數量小於maximumPoolSize則創建新的線程,否則使用拒絕線程處理器來處理提交的任務。
慢慢的啟動到10,然后把剩下的15個放到阻塞隊列里面,並開始在線程池里面創建線程,直到最大MaximumPoolSize;
當然是先放在阻塞隊列(如果數量為0,就一直等待,LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列,兩邊都可以進出的,那種,
參考:聊聊並發(七)——Java中的阻塞隊列)里面了,BlockingQueue,面試官想知道具體的處理流程,我掌握的不深,於是下定決心好好查查:
尤其是那個車間里工人的例子,好好看看,理解線程很有用:
在上一章中我們概述了一下線程池,這一章我們看一下創建newFixedThreadPool的源碼。例子還是我們在上一章中寫的那個例子。
創建newFixedThreadPool的方法:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
上面這兩個方法是創建固定數量的線程池的兩種方法,兩者的區別是:第二種創建方法多了一個線程工廠的方法。我們繼續看ThreadPoolExecutor這個類中的構造函數:
ThreadPoolExecutor的構造函數:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
ThreadPollExecutor中的所有的構造函數最終都會調用上面這個構造函數,接下來我們來分析一下這些參數的含義:
corePoolSize:
maxinumPoolSize:
線程池中能容納的最大線程數量,如果超出,則使用RejectedExecutionHandler拒絕策略處理。
keepAliveTime:
unit:
workQueue:
任務隊列。當線程池中的線程都處於運行狀態,而此時任務數量繼續增加,則需要一個容器來容納這些任務,這就是任務隊列。這個任務隊列是一個阻塞式的單端隊列。
newFixedThreadPool和newSingleThreadExector使用的是LinkedBlockingQueue的無界模式(美團面試題目)。
threadFactory:
handler:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue RejectedExecutionHandler handler) corePoolSize: 線程池維護線程的最少線程數,也是核心線程數,包括空閑線程 maximumPoolSize: 線程池維護線程的最大線程數 keepAliveTime: 線程池維護線程所允許的空閑時間 unit: 程池維護線程所允許的空閑時間的單位 workQueue: 線程池所使用的緩沖隊列 handler: 線程池對拒絕任務的處理策略


submit方法
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask);//執行任務 return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask);//執行任務 return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask);//執行任務 return ftask; }
這是三個重載方法,分別對應Runnable、帶結果的Runnable接口和Callable回調函數。其中的newTaskFor也是一個重載的方法,它通過層層的包裝,把Runnable接口包裝成了適配RunnableFuture的實現類,底層實現如下:
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); }
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
execute方法:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) {// if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); }
Java中的線程池
我們一般將任務(Task)提交到線程池中運行,對於一個線程池而言,需要關注的內容有以下幾點:
在什么樣的線程中執行任務
任務按照什么順序來執行(FIFO,LIFO,優先級)
最多有多少個任務能並發執行
最多有多個任務等待執行
如果系統過載則需要拒絕一個任務,如何通知任務被拒絕?
在執行一個任務之前或之后需要進行哪些操作
圍繞上面的問題,我們來研究一下java中的線程池
線程池的創建
Exectors.newFixedThreadPool(int size):創建一個固定大小的線程池。 每來一個任務創建一個線程,當線程數量為size將會停止創建。當線程池中的線程已滿,繼續提交任務,如果有空閑線程那么空閑線程去執行任務,否則將任務添加到一個無界的等待隊列中。
Exectors.newCachedThreadPool():創建一個可緩存的線程池。對線程池的規模沒有限制,當線程池的當前規模超過處理需求時(比如線程池中有10個線程,而需要處理的任務只有5個),那么將回收空閑線程。當需求增加時則會添加新的線程。
Exectors.newSingleThreadExcutor():創建一個單線程的Executor,它創建單個工作者線程來執行任務,如果這個線程異常結束,它會創建另一個線程來代替。
Exectors.newScheduledThreadPool():創建一個固定長度的線程池,而且以延遲或定時的方式來執行任務。
上面都是通過工廠方法來創建線程池,其實它們內部都是通過創建ThreadPoolExector對象來創建線程池的。下面是ThreadPoolExctor的構造函數。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... }
我們看到構造函數是public類型的,所以我們也可以自定義自己的線程池。
在什么樣的線程中執行任務?
java中對於任務的描述有兩種,一種是Runnable型的任務,一種是Callable型的任務。前者運行結束后不會返回任何東西,而后者可以返回我們需要的計算結果,甚至異常。
在沒有返回值的線程中運行
創建一個線程池,然后調用其execute方法,並將一個Runnable對象傳遞進去即可。
ExectorService exector = Exectors.newCachedThreadPool(); exector.execute(new Runnable(){ public void run(){ System.out.println("running..."); } });
在有返回值的線程中運行
ExectorService exector = Exectors.newCachedThreadPool(); Callable<Result> task = new Callable<Result>() { public Result call() { return new Computor().compute(); } }; Future<Result> future = exector.submit(task); result = future.get(); //改方法會一直阻塞,直到提交的任務被運行完畢
任務按照什么順序來執行(FIFO,優先級)
如果任務按照某種順序來執行的話,則任務一定是串行執行的。我們可以看到在ThreadPoolExecutor中第四個參數是BlockingQueue,提交的任務都先放到該隊列中。如果傳入不同的BlockQueue就可以實現不同的執行順序。傳入LinkedBlockingQueue則表示先來先服務,傳入PriorityBlockingQueue則使用優先級來處理任務
Exectors.newSingleThreadExcutor()使用的是先來先服務策略
最多有多少個任務能並發執行
線程池中的線程會不斷從workQueue中取任務來執行,如果沒任務可執行,則線程處於空閑狀態。
在ThreadPoolExecutor中有兩個參數corePoolSize和maximumPoolSize,前者被稱為基本大小,表示一個線程池初始化時,里面應該有的一定數量的線程。但是默認情況下,ThreadPoolExecutor在初始化是並不會馬上創建corePoolSize個線程對象,它使用的是懶加載模式。
- 當線程數小於corePoolSize時,提交一個任務創建一個線程(即使這時有空閑線程)來執行該任務。
- 當線程數大於等於corePoolSize,首選將任務添加等待隊列workQueue中(這里的workQueue是上面的BlockingQueue),等有空閑線程時,讓空閑線程從隊列中取任務。
- 當等待隊列滿時,如果線程數量小於maximumPoolSize則創建新的線程,否則使用拒絕線程處理器來處理提交的任務。
最多有多少的任務等待執行
這個問題和BlockingQueue相關。 BlockingQueue有三個子類,一個是ArrayBlockingQueue(有界隊列),一個是LinkedBlockingQueue(默認無界,但可以配置為有界),PriorityBlockingQueue(默認無界,可配置為有界)。所以,對於有多少個任務等待執行與傳入的阻塞隊列有關。
newFixedThreadPool和newSingleThreadExector使用的是LinkedBlockingQueue的無界模式。而newCachedThreadPool使用的是SynchronousQueue,這種情況下線程是不需要排隊等待的,SynchronousQueue適用於線程池規模無界。
如果系統過載則需要拒絕一個任務,如何通知任務被拒絕?
當有界隊列被填滿或者某個任務被提交到一個已關閉的Executor時將會啟動飽和策略,即使用RejectedExecutionHandler來處理。JDK中提供了幾種不同的RejectedExecutionHandler的實現:AbortPolicy,CallerRunsPolicy, DiscardPolicy和DiscardOldestPolicy。
AbortPolicy:默認的飽和策略。該策略將拋出未檢查的RejectedExcutionException,調用者可以捕獲這個異常,然后根據自己的需求來處理。
DiscardPolicy:該策略將會拋棄提交的任務
DiscardOldestPolicy:該策略將會拋棄下一個將被執行的任務(處於隊頭的任務),然后嘗試重新提交該任務到等待隊列
CallerRunsPolicy:該策略既不會拋棄任務也不會拋出異常,而是在調用execute()的線程中運行任務。比如我們在主線程中調用了execute(task)方法,但是這時workQueue已經滿了,並且也不會創建的新的線程了。這時候將會在主線程中直接運行execute中的task。
在執行一個任務之前或之后需要進行哪些操作
ThreadPoolExecutor是可擴展的,它提供了幾個可以重載的方法:beforeExecute,afterExecute和terminated,這里用到了面向的切面編程的思想。無論任務是從run中正常返回,還是拋出異常而返回,afterExectue都會被調用。如果 beforeExecute中拋出了一個 RunntimeException,那么任務將不會被執行,並且 afterExecute也不會被調用。
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class Test { public static void main(String[] args) { TimingThreadPool executor = new TimingThreadPool(5, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()); for (int i = 0; i < 5; i++) executor.execute(new Runnable() { @Override public void run() { System.out.println("running1...."); } }); executor.shutdown(); } } class TimingThreadPool extends ThreadPoolExecutor { private final ThreadLocal<Long> startTime = new ThreadLocal<Long>(); private final AtomicLong numTasks = new AtomicLong(); private final AtomicLong totalTime = new AtomicLong(); public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); startTime.set(System.nanoTime()); } @Override protected void afterExecute(Runnable r, Throwable t) { try { long endTime = System.nanoTime(); long taskTime = endTime - startTime.get(); numTasks.incrementAndGet(); totalTime.addAndGet(taskTime); } finally { super.afterExecute(r, t); } } @Override protected void terminated() { try { System.out.println(String.format("Terminated: arg time = %d", totalTime.get() / numTasks.get())); } finally { super.terminated(); } } }
上面的代碼統計任務平均執行時間,在每個線程中beforeExecute和afertExecute都會執行一次,而terminated等線程池關閉的時候執行
參考:Java多線程和線程池
參考:java中的線程池