線程五個狀態(生命周期):

線程運行時間
假設一個服務器完成一項任務所需時間為:T1 創建線程時間,T2 在線程中執行任務的時間,T3 銷毀線程時間。
如果:T1 + T3 遠大於 T2,則可以采用線程池,以提高服務器性能。
線程池技術
一個線程池包括以下四個基本組成部分
1、線程池管理器(ThreadPool):用於創建並管理線程池,包括 創建線程池,銷毀線程池,添加新任務;
2、工作線程(PoolWorker):線程池中線程,在沒有任務時處於等待狀態,可以循環的執行任務;
3、任務接口(Task):每個任務必須實現的接口,以供工作線程調度任務的執行,它主要規定了任務的入口,任務執行完后的收尾工作,任務的執行狀態等;
4、任務隊列(taskQueue):用於存放沒有處理的任務。提供一種緩沖機制。
線程池技術正是關注如何縮短或調整T1,T3時間的技術,從而提高服務器程序性能的。它把T1,T3分別安排在服務器程序的啟動和結束的時間段或者一些空閑的時間段,這樣在服務器程序處理客戶請求時,不會有T1,T3的開銷了。
線程池不僅調整T1,T3產生的時間段,而且它還顯著減少了創建線程的數目,看一個例子:
假設一個服務器一天要處理50000個請求,並且每個請求需要一個單獨的線程完成。在線程池中,線程數一般是固定的,所以產生線程總數不會超過線程池中線程的數目,而如果服務器不利用線程池來處理這些請求則線程總數為50000。一般線程池大小是遠小於50000。所以利用線程池的服務器程序不會為了創建50000而在處理請求時浪費時間,從而提高效率。
java並發編程:線程池的使用
我們使用線程的時候就去創建一個線程,這樣實現起來非常簡便,但是就會有一個問題:
如果並發的線程數量很多,並且每個線程都是執行一個時間很短的任務就結束了,這樣頻繁創建線程就會大大降低系統的效率,因為頻繁創建線程和銷毀線程需要時間。
那么有沒有一種辦法使得線程可以復用,就是執行完一個任務,並不被銷毀,而是可以繼續執行其他的任務?
在Java中可以通過線程池來達到這樣的效果。下面我們就來詳細講解一下Java的線程池,首先我們從最核心的ThreadPoolExecutor類中的方法講起,然后再講述它的實現原理,接着給出了它的使用示例,最后討論了一下如何合理配置線程池的大小。
本文的目錄大綱:
一.Java中的ThreadPoolExecutor類
二.深入剖析線程池實現原理
三.使用示例
四.如何合理配置線程池的大小
若有不正之處請多多諒解,並歡迎批評指正。
一.Java中的ThreadPoolExecutor類
java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此如果要透徹地了解Java中的線程池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現源碼。
在ThreadPoolExecutor類中提供了四個構造方法:
public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
從上面的代碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,通過觀察每個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工作。
下面解釋下一下構造器中各個參數的含義:
- corePoolSize:核心池的大小,這個參數與后面講述的線程池的實現原理有非常大的關系。在創建了線程池后,默認情況下,線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。默認情況下,在創建了線程池后,線程池中的線程數為0,當有任務來之后,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中;
- maximumPoolSize:線程池最大線程數,它表示在線程池中最多能創建多少個線程;
- keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize:即當線程池中的線程數大於corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize;但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
- unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性,以及TimeUtil的源碼展示:
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小時 TimeUnit.MINUTES; //分鍾 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //納秒
public enum TimeUnit { NANOSECONDS { public long toNanos(long d) { return d; } public long toMicros(long d) { return d/(C1/C0); } public long toMillis(long d) { return d/(C2/C0); } public long toSeconds(long d) { return d/(C3/C0); } public long toMinutes(long d) { return d/(C4/C0); } public long toHours(long d) { return d/(C5/C0); } public long toDays(long d) { return d/(C6/C0); } public long convert(long d, TimeUnit u) { return u.toNanos(d); } int excessNanos(long d, long m) { return (int)(d - (m*C2)); } }, MICROSECONDS { public long toNanos(long d) { return x(d, C1/C0, MAX/(C1/C0)); } public long toMicros(long d) { return d; } public long toMillis(long d) { return d/(C2/C1); } public long toSeconds(long d) { return d/(C3/C1); } public long toMinutes(long d) { return d/(C4/C1); } public long toHours(long d) { return d/(C5/C1); } public long toDays(long d) { return d/(C6/C1); } public long convert(long d, TimeUnit u) { return u.toMicros(d); } int excessNanos(long d, long m) { return (int)((d*C1) - (m*C2)); } }, MILLISECONDS { public long toNanos(long d) { return x(d, C2/C0, MAX/(C2/C0)); } public long toMicros(long d) { return x(d, C2/C1, MAX/(C2/C1)); } public long toMillis(long d) { return d; } public long toSeconds(long d) { return d/(C3/C2); } public long toMinutes(long d) { return d/(C4/C2); } public long toHours(long d) { return d/(C5/C2); } public long toDays(long d) { return d/(C6/C2); } public long convert(long d, TimeUnit u) { return u.toMillis(d); } int excessNanos(long d, long m) { return 0; } }, SECONDS { public long toNanos(long d) { return x(d, C3/C0, MAX/(C3/C0)); } public long toMicros(long d) { return x(d, C3/C1, MAX/(C3/C1)); } public long toMillis(long d) { return x(d, C3/C2, MAX/(C3/C2)); } public long toSeconds(long d) { return d; } public long toMinutes(long d) { return d/(C4/C3); } public long toHours(long d) { return d/(C5/C3); } public long toDays(long d) { return d/(C6/C3); } public long convert(long d, TimeUnit u) { return u.toSeconds(d); } int excessNanos(long d, long m) { return 0; } }, MINUTES { public long toNanos(long d) { return x(d, C4/C0, MAX/(C4/C0)); } public long toMicros(long d) { return x(d, C4/C1, MAX/(C4/C1)); } public long toMillis(long d) { return x(d, C4/C2, MAX/(C4/C2)); } public long toSeconds(long d) { return x(d, C4/C3, MAX/(C4/C3)); } public long toMinutes(long d) { return d; } public long toHours(long d) { return d/(C5/C4); } public long toDays(long d) { return d/(C6/C4); } public long convert(long d, TimeUnit u) { return u.toMinutes(d); } int excessNanos(long d, long m) { return 0; } }, HOURS { public long toNanos(long d) { return x(d, C5/C0, MAX/(C5/C0)); } public long toMicros(long d) { return x(d, C5/C1, MAX/(C5/C1)); } public long toMillis(long d) { return x(d, C5/C2, MAX/(C5/C2)); } public long toSeconds(long d) { return x(d, C5/C3, MAX/(C5/C3)); } public long toMinutes(long d) { return x(d, C5/C4, MAX/(C5/C4)); } public long toHours(long d) { return d; } public long toDays(long d) { return d/(C6/C5); } public long convert(long d, TimeUnit u) { return u.toHours(d); } int excessNanos(long d, long m) { return 0; } }, DAYS { public long toNanos(long d) { return x(d, C6/C0, MAX/(C6/C0)); } public long toMicros(long d) { return x(d, C6/C1, MAX/(C6/C1)); } public long toMillis(long d) { return x(d, C6/C2, MAX/(C6/C2)); } public long toSeconds(long d) { return x(d, C6/C3, MAX/(C6/C3)); } public long toMinutes(long d) { return x(d, C6/C4, MAX/(C6/C4)); } public long toHours(long d) { return x(d, C6/C5, MAX/(C6/C5)); } public long toDays(long d) { return d; } public long convert(long d, TimeUnit u) { return u.toDays(d); } int excessNanos(long d, long m) { return 0; } }; static final long C0 = 1L; static final long C1 = C0 * 1000L; static final long C2 = C1 * 1000L; static final long C3 = C2 * 1000L; static final long C4 = C3 * 60L; static final long C5 = C4 * 60L; static final long C6 = C5 * 24L; static final long MAX = Long.MAX_VALUE; static long x(long d, long m, long over) { if (d > over) return Long.MAX_VALUE; if (d < -over) return Long.MIN_VALUE; return d * m; } public long convert(long sourceDuration, TimeUnit sourceUnit) { throw new AbstractMethodError(); } public long toNanos(long duration) { throw new AbstractMethodError(); } public long toMicros(long duration) { throw new AbstractMethodError(); } public long toMillis(long duration) { throw new AbstractMethodError(); } public long toSeconds(long duration) { throw new AbstractMethodError(); } public long toMinutes(long duration) { throw new AbstractMethodError(); } public long toHours(long duration) { throw new AbstractMethodError(); } public long toDays(long duration) { throw new AbstractMethodError(); } public void timedWait(Object obj, long timeout) throws InterruptedException { if (timeout > 0) { long ms = toMillis(timeout); int ns = excessNanos(timeout, ms); obj.wait(ms, ns); } } public void timedJoin(Thread thread, long timeout) throws InterruptedException { if (timeout > 0) { long ms = toMillis(timeout); int ns = excessNanos(timeout, ms); thread.join(ms, ns); } } public void sleep(long timeout) throws InterruptedException { if (timeout > 0) { long ms = toMillis(timeout); int ns = excessNanos(timeout, ms); Thread.sleep(ms, ns); } } }
- workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇會對線程池的運行過程產生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。
- threadFactory:線程工廠,主要用來創建線程;
- handler:表示當拒絕處理任務時的策略,有以下四種取值:
ThreadPoolExecutor.AbortPolicy;//丟棄任務並拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy;//也是丟棄任務,但是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy;//丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程) ThreadPoolExecutor.CallerRunsPolicy;//由調用線程處理該任務
具體參數的配置與線程池的關系將在下一節講述。
public class ThreadPoolExecutor extends AbstractExecutorService { ....... public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } /** * Creates a new {@code ThreadPoolExecutor} with the given initial parameters and default rejected execution handler.*/ public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } /** * Creates a new {@code ThreadPoolExecutor} with the given initial parameters and default thread factory.*/ public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } /** * Creates a new {@code ThreadPoolExecutor} with the given initial parameters.*/ public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } ....... }
從ThreadPoolExecutor類的代碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的源碼:
public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { }; protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { }; public Future<?> submit(Runnable task) {}; public <T> Future<T> submit(Runnable task, T result) { }; public <T> Future<T> submit(Callable<T> task) { }; private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { }; public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { }; public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { }; public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { }; public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { }; }
AbstractExecutorService是一個抽象類,它實現了ExecutorService接口。
我們接着看看ExecutorService接口的源代碼:
public interface ExecutorService extends Executor { void shutdown(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
而ExecutorService又是繼承了Executor接口,我們看一下Executor接口的實現:
public interface Executor { void execute(Runnable command); }
到這里,我們就應該明白了ThreadPoolExecutor類、AbstractExecutorService類、ExecutorService接口和Executor接口幾個之間的關系了。
Executor是最頂層接口,在它里面只聲明了一個方法execute(Runnable),返回值為void,參數為Runnable類型,從字面意思可以理解,就是用來執行傳進去的任務的;
ExecutorService接口繼承了Executor接口,並聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
AbstractExecutorService抽象類實現了ExecutorService接口,基本實現了ExecutorService接口中聲明的所有方法;
ThreadPoolExecutor類繼承了抽象類AbstractExecutorService。
在ThreadPoolExecutor類中有幾個非常重要的方法:
public void execute(Runnable command) {...........} public void shutdown() {........} public List<Runnable> shutdownNow() {..............}
繼承AbstractExecutorService抽象類中的方法
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
execute()方法實際上是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務,交由線程池去執行。
submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並沒有對其進行重寫,這個方法也是用來向線程池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果,去看submit()方法的實現,會發現它實際上還是調用的execute()方法,只不過它利用了Future來獲取任務執行結果。
shutdown()和shutdownNow()是用來關閉線程池的。
還有很多其他的方法:
比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關屬性的方法,有興趣的朋友可以自行查閱API。
二.剖析線程池實現原理
以上內容,我們從宏觀上介紹了ThreadPoolExecutor,下面我們來深入分析一下線程池的具體實現原理,將從下面幾個方面講解:
1.線程池狀態 2.任務的執行 3.線程池中的線程初始化 4.任務緩存隊列及排隊策略
5.任務拒絕策略 6.線程池的關閉 7.線程池容量的動態調整
1.線程池狀態
在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀態:
// runState is stored in the high-order bits
volatile int runState;
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
runState表示當前線程池的狀態,它是一個volatile變量用來保證線程之間的可見性;
下面的幾個static final變量表示runState可能的幾個取值。
當創建線程池后,初始時,線程池處於RUNNING狀態;
如果調用了shutdown()方法,則線程池處於SHUTDOWN狀態,此時線程池不能夠接受新的任務,它會等待所有任務執行完畢;
如果調用了shutdownNow()方法,則線程池處於STOP狀態,此時線程池不能接受新的任務,並且會去嘗試終止正在執行的任務;
當線程池處於SHUTDOWN或STOP狀態,並且所有工作線程已經銷毀,任務緩存隊列已經清空或執行結束后,線程池被設置為TERMINATED狀態。
2.任務的執行
在了解將任務提交給線程池到任務執行完畢整個過程之前,我們先來看一下ThreadPoolExecutor類中其他的一些比較重要成員變量:
private final BlockingQueue<Runnable> workQueue; //任務緩存隊列,用來存放等待執行的任務 private final ReentrantLock mainLock = new ReentrantLock(); //線程池的主要狀態鎖,對線程池狀態(比如線程池大小 //、runState等)的改變都要使用這個鎖 private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工作集 private volatile long keepAliveTime; //線程存貨時間 private volatile boolean allowCoreThreadTimeOut; //是否允許為核心線程設置存活時間 private volatile int corePoolSize; //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列) private volatile int maximumPoolSize; //線程池最大能容忍的線程數 private volatile int poolSize; //線程池中當前的線程數 private volatile RejectedExecutionHandler handler; //任務拒絕策略 private volatile ThreadFactory threadFactory; //線程工廠,用來創建線程 private int largestPoolSize; //用來記錄線程池中曾經出現過的最大線程數 private long completedTaskCount; //用來記錄已經執行完畢的任務個數
每個變量都有其各自的作用,這里要重點解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個變量。
corePoolSize在很多地方被翻譯成核心池大小,其實我的理解這個就是線程池的大小。舉個簡單的例子:
假如有一個工廠,工廠里面有10個工人,每個工人同時只能做一件任務。
因此只要當10個工人中有工人是空閑的,來了任務就分配給空閑的工人做;
當10個工人都有任務在做時,如果還來了任務,就把任務進行排隊等待;
如果說新任務數目增長的速度遠遠大於工人做任務的速度,那么此時工廠主管可能會想補救措施,比如重新招4個臨時工人進來;
然后就將任務也分配給這4個臨時工人做;
如果說着14個工人做任務的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務或者拋棄前面的一些任務了。
當這14個工人當中有人空閑時,而新任務增長的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。
這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。
也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量突然過大時的一種補救措施。
不過為了方便理解,在本文后面還是將corePoolSize翻譯成核心池大小。
largestPoolSize只是一個用來起記錄作用的變量,用來記錄線程池中曾經有過的最大線程數目,跟線程池的容量沒有任何關系。
下面我們看一下任務從提交到最終執行完畢經歷了哪些過程。
JDK1.5 execute()方法實現原理
在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,雖然通過submit也可以提交任務,但是實際上submit方法里面最終調用的還是execute()方法,所以我們只需要研究execute()方法的實現原理即可:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }
上面的代碼可能看起來不是那么容易理解,下面我們一句一句解釋:
首先,判斷提交的任務command是否為null,若是null,則拋出空指針異常;
接着是這句,這句要好好理解一下:
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
由於是或條件運算符,所以先計算前半部分的值,如果線程池中當前線程數不小於核心池大小,那么就會直接進入下面的if語句塊了。
如果線程池中當前線程數小於核心池大小,則接着執行后半部分,也就是執行
addIfUnderCorePoolSize(command)
如果執行完addIfUnderCorePoolSize這個方法返回false,則繼續執行下面的if語句塊,否則整個方法就直接執行完畢了。
如果執行完addIfUnderCorePoolSize這個方法返回false,然后接着判斷:
if (runState == RUNNING && workQueue.offer(command))
如果當前線程池處於RUNNING狀態,則將任務放入任務緩存隊列;如果當前線程池不處於RUNNING狀態或者任務放入緩存隊列失敗,則執行:
addIfUnderMaximumPoolSize(command)
如果執行addIfUnderMaximumPoolSize方法失敗,則執行reject()方法進行任務拒絕處理。
回到前面:
if (runState == RUNNING && workQueue.offer(command))
這句的執行,如果說當前線程池處於RUNNING狀態且將任務放入任務緩存隊列成功,則繼續進行判斷:
if (runState != RUNNING || poolSize == 0)
這句判斷是為了防止在將此任務添加進任務緩存隊列的同時其他線程突然調用shutdown或者shutdownNow方法關閉了線程池的一種應急措施。如果是這樣就執行:
ensureQueuedTaskHandled(command)
進行應急處理,從名字可以看出是保證 添加到任務緩存隊列中的任務得到處理。
我們接着看2個關鍵方法的實現:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:
private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); //創建線程去執行firstTask任務 } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
這個是addIfUnderCorePoolSize方法的具體實現,從名字可以看出它的意圖就是當低於核心池大小時執行的方法。下面看其具體實現,首先獲取到鎖,因為這地方涉及到線程池狀態的變化,先通過if語句判斷當前線程池中的線程數目是否小於核心池大小,有朋友也許會有疑問:前面在execute()方法中不是已經判斷過了嗎,只有線程池當前線程數目小於核心池大小才會執行addIfUnderCorePoolSize方法的,為何這地方還要繼續判斷?原因很簡單,前面的判斷過程中並沒有加鎖,因此可能在execute方法判斷的時候poolSize小於corePoolSize,而判斷完之后,在其他線程中又向線程池提交了任務,就可能導致poolSize不小於corePoolSize了,所以需要在這個地方繼續判斷。然后接着判斷線程池的狀態是否為RUNNING,原因也很簡單,因為有可能在其他線程中調用了shutdown或者shutdownNow方法。然后就是執行
t = addThread(firstTask);
這個方法也非常關鍵,傳進去的參數為提交的任務,返回值為Thread類型。然后接着在下面判斷t是否為空,為空則表明創建線程失敗(即poolSize>=corePoolSize或者runState不等於RUNNING),否則調用t.start()方法啟動線程。
我們來看一下addThread方法的實現:
private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); //創建一個線程,執行任務 if (t != null) { w.thread = t; //將創建的線程的引用賦值為w的成員變量 workers.add(w); int nt = ++poolSize; //當前線程數加1 if (nt > largestPoolSize) largestPoolSize = nt; } return t; }
在addThread方法中,首先用提交的任務創建了一個Worker對象,然后調用線程工廠threadFactory創建了一個新的線程t,然后將線程t的引用賦值給了Worker對象的成員變量thread,接着通過workers.add(w)將Worker對象添加到工作集當中。
JDK1.7版本execute()方法實現略有不同
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
下面我們一句一句理解:
首先,判斷提交的任務command是否為null,若是null,則拋出空指針異常;
接着是這句,這句要好好理解一下:
if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); }
如果線程池中當前線程數<核心池大小,就么就不進入執行以下代碼,如果小於核心池大小,線程正在運行,嘗試將給定的命令作為首要任務啟動一個新線程。通知addworker自動檢查runstate和workercount,所以添加線程時,防止誤報,它不應該,通過返回false。
官方這么解釋
/*If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task. The call to addWorker atomic ally checks runState and workerCount, and so prevents false alarms that would add threads when it shouldn't, by returning false.
*/
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }
如果線程池中當前線程數>=核心池大小,就么就不進入執行以上代碼,官方解釋:
/* If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*/
else if (!addWorker(command, false)) reject(command);
官方解釋
/*If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task
*/
下面我們看一下Worker類的源碼,在1.5JDK中Worker類只實現了Runnable接口:
public class ThreadPoolExecutor extends AbstractExecutorService {
.........省略部分代碼....... private final class Worker implements Runnable { private final ReentrantLock runLock = new ReentrantLock(); private Runnable firstTask; volatile long completedTasks; Thread thread; Worker(Runnable firstTask) { this.firstTask = firstTask; } boolean isActive() { return runLock.isLocked(); } void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) { try { if (thread != Thread.currentThread()) thread.interrupt(); } finally { runLock.unlock(); } } } void interruptNow() { thread.interrupt(); } private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { if (runState < STOP && Thread.interrupted() && runState >= STOP) boolean ran = false; beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現,用戶可以根據 //自己需要重載這個方法和后面的afterExecute方法來進行一些統計信息,比如某個任務的執行時間等 try { task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } } public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); //當任務隊列中沒有任務時,進行清理工作 } } }
..........省略部分代碼........ }
它實際上實現了Runnable接口,因此上面的Thread t = threadFactory.newThread(w);效果跟下面這句的效果基本一樣:
Thread t = new Thread(w);
相當於傳進去了一個Runnable任務,在線程t中執行這個Runnable。
既然Worker實現了Runnable接口,那么自然最核心的方法便是run()方法了:
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } }
從run方法的實現可以看出,它首先執行的是通過構造器傳進來的任務firstTask,在調用runTask()執行完firstTask之后,在while循環里面不斷通過getTask()去取新的任務來執行,那么去哪里取呢?自然是從任務緩存隊列里面去取,getTask是ThreadPoolExecutor類中的方法,並不是Worker類中的方法,下面是getTask方法的實現:
Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果線程數大於核心池大小或者允許為核心池線程設置空閑時間, //則通過poll取任務,若等待一定的時間取不到任務,則返回null r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null) return r; if (workerCanExit()) { //如果沒取到任務,即r為null,則判斷當前的worker是否可以退出 if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); //中斷處於空閑狀態的worker return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }
在getTask中,先判斷當前線程池狀態,如果runState大於SHUTDOWN(即為STOP或者TERMINATED),則直接返回null。
如果runState為SHUTDOWN或者RUNNING,則從任務緩存隊列取任務。
如果當前線程池的線程數大於核心池大小corePoolSize或者允許為核心池中的線程設置空閑存活時間,則調用poll(time,timeUnit)來取任務,這個方法會等待一定的時間,如果取不到任務就返回null。
然后判斷取到的任務r是否為null,為null則通過調用workerCanExit()方法來判斷當前worker是否可以退出,我們看一下workerCanExit()的實現:
private boolean workerCanExit() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); boolean canExit; //如果runState大於等於STOP,或者任務緩存隊列為空了 //或者 允許為核心池線程設置空閑存活時間並且線程池中的線程數目大於1 try { canExit = runState >= STOP || workQueue.isEmpty() || (allowCoreThreadTimeOut && poolSize > Math.max(1, corePoolSize)); } finally { mainLock.unlock(); } return canExit; }
也就是說如果線程池處於STOP狀態、或者任務隊列已為空或者允許為核心池線程設置空閑存活時間並且線程數大於1時,允許worker退出。如果允許worker退出,則調用interruptIdleWorkers()中斷處於空閑狀態的worker,我們看一下interruptIdleWorkers()的實現:
void interruptIdleWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) //實際上調用的是worker的interruptIfIdle()方法 w.interruptIfIdle(); } finally { mainLock.unlock(); } }
從實現可以看出,它實際上調用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:
void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) { //注意這里,是調用tryLock()來獲取鎖的,因為如果當前worker正在執行任務,鎖已經被獲取了,是無法獲取到鎖的 //如果成功獲取了鎖,說明當前worker處於空閑狀態 try { if (thread != Thread.currentThread()) thread.interrupt(); } finally { runLock.unlock(); } } }
這里有一個非常巧妙的設計方式,假如我們來設計線程池,可能會有一個任務分派線程,當發現有線程空閑時,就從任務緩存隊列中取一個任務交給空閑線程執行。但是在這里,並沒有采用這樣的方式,因為這樣會要額外地對任務分派線程進行管理,無形地會增加難度和復雜度,這里直接讓執行完任務的線程去任務緩存隊列里面取任務來執行。
我們再看addIfUnderMaximumPoolSize方法的實現,這個方法的實現思想和addIfUnderCorePoolSize方法的實現思想非常相似,唯一的區別在於addIfUnderMaximumPoolSize方法是在線程池中的線程數達到了核心池大小並且往任務隊列中添加任務失敗的情況下執行的:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
看到沒有,其實它和addIfUnderCorePoolSize方法的實現基本一模一樣,只是if語句判斷條件中的poolSize < maximumPoolSize不同而已。
到這里,大部分朋友應該對任務提交給線程池之后到被執行的整個過程有了一個基本的了解,下面總結一下:
1)首先,要清楚corePoolSize和maximumPoolSize的含義;
2)其次,要知道Worker是用來起到什么作用的;
3)要知道任務提交給線程池之后的處理策略,這里總結一下主要有4點:
- 如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
- 如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
- 如果當前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕策略進行處理;
- 如果線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;如果允許為核心池中的線程設置存活時間,那么核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。
JDK1.7 Worker作為ThreadPoolExecutor在繼承AbstractQueuedSynchronizer類也實現了Runnable接口
public class ThreadPoolExecutor extends AbstractExecutorService {
........................ private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks;
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
...................... }
3.線程池中的線程初始化
默認情況下,創建線程池之后,線程池中是沒有線程的,需要提交任務之后才會創建線程。
在實際中如果需要線程池創建之后立即創建線程,可以通過以下兩個方法辦到:
- prestartCoreThread():初始化一個核心線程;
- prestartAllCoreThreads():初始化所有核心線程
下面是這2個方法的實現:
public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); //注意傳進去的參數是null } public int prestartAllCoreThreads() { int n = 0; while (addIfUnderCorePoolSize(null))//注意傳進去的參數是null ++n; return n; }
注意上面傳進去的參數是null,根據第2小節的分析可知如果傳進去的參數為null,則最后執行線程會阻塞在getTask方法中的
r = workQueue.take();
即等待任務隊列中有任務。
4.任務緩存隊列及排隊策略
在前面我們多次提到了任務緩存隊列,即workQueue,它用來存放等待執行的任務。
workQueue的類型為BlockingQueue<Runnable>,通常可以取下面三種類型:
- ArrayBlockingQueue:基於數組的先進先出隊列,此隊列創建時必須指定大小;
- LinkedBlockingQueue:基於鏈表的先進先出隊列,如果創建時沒有指定此隊列大小,則默認為Integer.MAX_VALUE;
- synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。
5.任務拒絕策略
當線程池的任務緩存隊列已滿並且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕策略,通常有以下四種策略:
ThreadPoolExecutor.AbortPolicy;//丟棄任務並拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy;//也是丟棄任務,但是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy;//丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程) ThreadPoolExecutor.CallerRunsPolicy;//由調用線程處理該任務
6.線程池的關閉
ThreadPoolExecutor提供了兩個方法,用於線程池的關閉,分別是shutdown()和shutdownNow(),其中:
- shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完后才終止,但再也不會接受新的任務
- shutdownNow():立即終止線程池,並嘗試打斷正在執行的任務,並且清空任務緩存隊列,返回尚未執行的任務
7.線程池容量的動態調整
ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
- setCorePoolSize:設置核心池大小
- setMaximumPoolSize:設置線程池最大能創建的線程數目大小
當上述參數從小變大時,ThreadPoolExecutor進行線程賦值,還可能立即創建新的線程來執行任務。
三.使用示例
前面我們討論了關於線程池的實現原理,這一節我們來看一下它的具體使用:
public class Test { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5)); for(int i=0;i<15;i++){ MyTask myTask = new MyTask(i); executor.execute(myTask); System.out.println("線程池中線程數目:"+executor.getPoolSize()+",隊列中等待執行的任務數目:"+ executor.getQueue().size()+",已執行玩別的任務數目:"+executor.getCompletedTaskCount()); } executor.shutdown(); } } class MyTask implements Runnable { private int taskNum; public MyTask(int num) { this.taskNum = num; } @Override public void run() { System.out.println("正在執行task "+taskNum); try { Thread.currentThread().sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("task "+taskNum+"執行完畢"); } }
執行結果:
線程池中線程數目:1,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
線程池中線程數目:2,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 1
線程池中線程數目:3,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 2
線程池中線程數目:4,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 3
線程池中線程數目:5,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 4
線程池中線程數目:5,隊列中等待執行的任務數目:1,已執行玩別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:2,已執行玩別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:3,已執行玩別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:4,已執行玩別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
線程池中線程數目:6,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 10
線程池中線程數目:7,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 11
線程池中線程數目:8,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 12
線程池中線程數目:9,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
線程池中線程數目:10,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 14
正在執行task 13
task 0執行完畢
正在執行task 5
task 1執行完畢
正在執行task 6
task 4執行完畢
正在執行task 7
task 2執行完畢
正在執行task 8
task 10執行完畢
正在執行task 9
task 3執行完畢
task 12執行完畢
task 11執行完畢
task 13執行完畢
task 14執行完畢
task 5執行完畢
task 6執行完畢
task 8執行完畢
task 7執行完畢
task 9執行完畢
從執行結果可以看出,當線程池中線程的數目大於5時,便將任務放入任務緩存隊列里面,當任務緩存隊列滿了之后,便創建新的線程。如果上面程序中,將for循環中改成執行20個任務,就會拋出任務拒絕異常了。
不過在java doc中,並不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來創建線程池:
Executors.newCachedThreadPool(); //創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE Executors.newSingleThreadExecutor(); //創建容量為1的緩沖池 Executors.newFixedThreadPool(int); //創建固定容量大小的緩沖池
下面是這三個靜態方法的具體實現;
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
從它們的具體實現來看,它們實際上也是調用了ThreadPoolExecutor,只不過參數都已配置好了。
newFixedThreadPool創建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置為1,也使用的LinkedBlockingQueue;
newCachedThreadPool將corePoolSize設置為0,將maximumPoolSize設置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就創建線程運行,當線程空閑超過60秒,就銷毀線程。
實際中,如果Executors提供的三個靜態方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。
另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。
四.如何合理配置線程池的大小
本節來討論一個比較重要的話題:如何合理配置線程池大小,僅供參考。
一般需要根據任務的類型來配置線程池大小:
如果是CPU密集型任務,就需要盡量壓榨CPU,參考值可以設為 NCPU+1
如果是IO密集型任務,參考值可以設置為2*NCPU
當然,這只是一個參考值,具體的設置還需要根據實際情況進行調整,比如可以先將線程池大小設置為參考值,再觀察任務運行情況和系統負載、資源利用率來進行適當調整。
五.線程池的使用舉例
代碼實現中並沒有實現任務接口,而是把Runnable對象加入到線程池管理器(ThreadPool),然后剩下的事情就由線程池管理器(ThreadPool)來完成了
package mine.util.thread; import java.util.LinkedList; import java.util.List; /** * 線程池類,線程管理器:創建線程,執行任務,銷毀線程,獲取線程基本信息 */ public final class ThreadPool { // 線程池中默認線程的個數為5 private static int worker_num = 5; // 工作線程 private WorkThread[] workThrads; // 未處理的任務 private static volatile int finished_task = 0; // 任務隊列,作為一個緩沖,List線程不安全 private List<Runnable> taskQueue = new LinkedList<Runnable>(); private static ThreadPool threadPool; // 創建具有默認線程個數的線程池 private ThreadPool() { this(5); } // 創建線程池,worker_num為線程池中工作線程的個數 private ThreadPool(int worker_num) { ThreadPool.worker_num = worker_num; workThrads = new WorkThread[worker_num]; for (int i = 0; i < worker_num; i++) { workThrads[i] = new WorkThread(); workThrads[i].start();// 開啟線程池中的線程 } } // 單態模式,獲得一個默認線程個數的線程池 public static ThreadPool getThreadPool() { return getThreadPool(ThreadPool.worker_num); } // 單態模式,獲得一個指定線程個數的線程池,worker_num(>0)為線程池中工作線程的個數 // worker_num<=0創建默認的工作線程個數 public static ThreadPool getThreadPool(int worker_num1) { if (worker_num1 <= 0) worker_num1 = ThreadPool.worker_num; if (threadPool == null) threadPool = new ThreadPool(worker_num1); return threadPool; } // 執行任務,其實只是把任務加入任務隊列,什么時候執行有線程池管理器覺定 public void execute(Runnable task) { synchronized (taskQueue) { taskQueue.add(task); taskQueue.notify(); } } // 批量執行任務,其實只是把任務加入任務隊列,什么時候執行有線程池管理器覺定 public void execute(Runnable[] task) { synchronized (taskQueue) { for (Runnable t : task) taskQueue.add(t); taskQueue.notify(); } } // 批量執行任務,其實只是把任務加入任務隊列,什么時候執行有線程池管理器覺定 public void execute(List<Runnable> task) { synchronized (taskQueue) { for (Runnable t : task) taskQueue.add(t); taskQueue.notify(); } } // 銷毀線程池,該方法保證在所有任務都完成的情況下才銷毀所有線程,否則等待任務完成才銷毀 public void destroy() { while (!taskQueue.isEmpty()) {// 如果還有任務沒執行完成,就先睡會吧 try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } // 工作線程停止工作,且置為null for (int i = 0; i < worker_num; i++) { workThrads[i].stopWorker(); workThrads[i] = null; } threadPool=null; taskQueue.clear();// 清空任務隊列 } // 返回工作線程的個數 public int getWorkThreadNumber() { return worker_num; } // 返回已完成任務的個數,這里的已完成是只出了任務隊列的任務個數,可能該任務並沒有實際執行完成 public int getFinishedTasknumber() { return finished_task; } // 返回任務隊列的長度,即還沒處理的任務個數 public int getWaitTasknumber() { return taskQueue.size(); } // 覆蓋toString方法,返回線程池信息:工作線程個數和已完成任務個數 @Override public String toString() { return "WorkThread number:" + worker_num + " finished task number:" + finished_task + " wait task number:" + getWaitTasknumber(); } /** * 內部類,工作線程 */ private class WorkThread extends Thread { // 該工作線程是否有效,用於結束該工作線程 private boolean isRunning = true; /* * 關鍵所在啊,如果任務隊列不空,則取出任務執行,若任務隊列空,則等待 */ @Override public void run() { Runnable r = null; while (isRunning) {// 注意,若線程無效則自然結束run方法,該線程就沒用了 synchronized (taskQueue) { while (isRunning && taskQueue.isEmpty()) {// 隊列為空 try { taskQueue.wait(20); } catch (InterruptedException e) { e.printStackTrace(); } } if (!taskQueue.isEmpty()) r = taskQueue.remove(0);// 取出任務 } if (r != null) { r.run();// 執行任務 } finished_task++; r = null; } } // 停止工作,讓該線程自然執行完run方法,自然結束 public void stopWorker() { isRunning = false; } } }
測試代碼:
package mine.util.thread; //測試線程池 public class TestThreadPool { public static void main(String[] args) { // 創建3個線程的線程池 ThreadPool t = ThreadPool.getThreadPool(3); t.execute(new Runnable[] { new Task(), new Task(), new Task() }); t.execute(new Runnable[] { new Task(), new Task(), new Task() }); System.out.println(t); t.destroy();// 所有線程都執行完成才destory System.out.println(t); } // 任務類 static class Task implements Runnable { private static volatile int i = 1; @Override public void run() {// 執行任務 System.out.println("任務 " + (i++) + " 完成"); } } }
運行結果
WorkThread number:3 finished task number:0 wait task number:6 任務 1 完成 任務 2 完成 任務 3 完成 任務 4 完成 任務 5 完成 任務 6 完成 WorkThread number:3 finished task number:6 wait task number:0
分析:由於並沒有任務接口,傳入的可以是自定義的任何任務,所以線程池並不能准確的判斷該任務是否真正的已經完成(真正完成該任務是這個任務的run方法執行完畢),只能知道該任務已經出了任務隊列,正在執行或者已經完成。
JAVA類庫中提供的線程池簡介:
java提供的線程池更加強大,相信理解線程池的工作原理,看類庫中的線程池就不會感到陌生了。


executors類源碼
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
.......還有一些內部類.............
executorService接口繼承了executor接口
excutors接口定義了返回executorService對象的靜態方法
此文章感謝作者--海子:http://www.cnblogs.com/dolphin0520/p/3932921.html
--Hsuxu:http://blog.csdn.net/hsuxu/article/details/8985931
