在面向對象編程中,對象創建和銷毀是很費時間的,因為創建一個對象要獲取內存資源或者其它更多資源。在Java中更是如此,虛擬機將試圖跟蹤每一個對象,以便能夠在對象銷毀后進行垃圾回收。所以提高服務程序效率的一個手段就是盡可能減少創建和銷毀對象的次數,特別是對一些很耗資源的對象創建和銷毀。如何利用已有對象來服務就是一個需要解決的關鍵問題,其實這就是一些"池化資源"技術產生的原因。比如大家所熟悉的數據庫連接池就是遵循這一思想而產生的,下面將介紹的線程池技術同樣符合這一思想。
多線程技術主要解決處理器單元內多個線程執行的問題,它可以顯著減少處理器單元的閑置時間,增加處理器單元的吞吐能力。但如果對多線程應用不當,會增加對單個任務的處理時間。可以舉一個簡單的例子:
假設一台服務器完成一項任務的時間為T
T1 創建線程的時間
T2 在線程中執行任務的時間,包括線程間同步所需時間
T3 線程銷毀的時間
顯然T = T1+T2+T3。注意這是一個極度簡化的假設。
可以看出T1,T3是多線程本身附加的開銷,用戶希望減少T1,T3所用的時間,從而減少T的時間。但一些線程的使用者並沒有注意到這一點,所以在應用程序中頻繁的創建或銷毀線程,這導致T1和T3在T中占有非常大的比例。
線程池技術正是關注如何縮短或調整T1,T3時間的技術,從而提高服務器程序性能的。它把T1,T3分別安排在服務器程序的啟動和結束的時間段或者一些空閑的時間段,這樣在服務器程序處理客戶請求時,不會有T1,T3的開銷了,線程池不僅調整T1,T3產生的時間,而且它還顯著減少了創建線程的數目。在看一個例子:
假設一台服務器每天大約要處理100000個請求,並且每個請求需要一個單獨的線程完成,這是一個很常用的場景。在線程池中,線程數量一般是固定的,所以產生線程總數不會超過線程池中線程的數目或者上限,而如果服務器不利用線程池來處理這些請求則線程總數為100000。一般線程池尺寸是遠小於100000。所以利用線程池的服務器程序不會為了創建100000而在處理請求時浪費時間,從而提高效率。
線程池是一種多線程處理方法,處理過程中將任務添加到隊列,然后在創建線程后自動啟動這些任務。線程池線程都是后台線程,每個線程都使用默認的堆棧大小,以默認的優先級運行,並處於多線程單元中。如果某個線程處於空閑狀態,則線程池將會調度一個任務給它,.如果所有線程都始終保持繁忙,但將任務放入到一個隊列中,則線程池將在一段時間后創建另一個輔助線程,但線程的數目永遠不會超過最大值。超過最大值的線程可以排隊,但他們要等到其他線程完成后才啟動
線程池主要有如下幾個應用范圍:
1. 需要大量的線程來完成任務,且完成任務的時間比較短,如WEB服務器完成網頁請求這樣的任務。因為單個任務小,而任務數量巨大,比如一個熱門網站的點擊次數。 但對於長時間的任務,比如一個ftp連接請求,線程池的優點就不明顯了。因為ftp會話時間相對於線程的創建時間長多了。
2. 對性能要求苛刻的應用,比如要求服務器迅速相應客戶請求。
3. 接受突發性的大量請求,但不至於使服務器因此產生大量線程的應用。突發性大量客戶請求,在沒有線程池情況下,將產生大量線程,雖然理論上大部分操作系統線程數目最大值不是問題,短時間內產生大量線程可能使內存到達極限。
下面將討論線程池的簡單實現,以說明線程技術優點及應用領域。
線程池的簡單實現
一般一個簡單線程池至少包含下列組成部分。
1. 線程池管理器(ThreadPoolManager):用於創建並管理線程池
2. 工作線程(WorkThread): 線程池中線程
3. 任務接口(Task):每個任務必須實現的接口,以供工作線程調度任務的執行。
4. 任務隊列:用於存放沒有處理的任務。提供一種緩沖機制。
線程池管理器至少有下列功能:創建線程池,銷毀線程池,添加新任務 創建線程池的部分代碼如下:
public class ThreadPoolManager {
private int threadCount; //啟動的線程數
private WorkThread[] handlers; //線程數組
private ArrayList<Runnable> taskVector = new ArrayList<Runnable>(); //任務隊列
ThreadPoolManager(int threadCount) {
this.threadCount = threadCount;
for (int i = 0; i < threadCount; i++) {
handlers[i] = new WorkThread();
handlers[i].start();
}
}
void shutdown() {
synchronized (taskVector) {
while (!taskVector.isEmpty())
taskVector.remove(0); //清空任務隊列
}
for (int i = 0; i < threadCount; i++) {
handlers[i] = new WorkThread();
handlers[i].interrupt(); //結束線程
}
}
void execute(Runnable newTask) { //增加新任務
synchronized (taskVector) {
taskVector.add(newTask);
taskVector.notifyAll();
}
}
private class WorkThread extends Thread {
public void run() {
Runnable task = null;
for (;;) {
synchronized (taskVector) {//獲取一個新任務
if (taskVector.isEmpty())
try {
taskVector.wait();
task = taskVector.remove(0);
} catch (InterruptedException e) {
break;
}
}
task.run();
}
}
}
}
ThreadPoolManager構造函數允許用戶設置啟動的線程數量,並且需要的創建線程。Shutdown函數主要關閉打開的線程和清空還沒有執行的任務,execute函數將任務加入到工作隊列,並且喚醒等待的線程。WorkThread就是實際的工作線程,工作線程是一個可以循環執行任務的線程,在沒有任務時將等待,當有任務時,會被喚醒。任務接口是為所有任務提供統一的接口,以便工作線程處理,在這里我們采用java定義的Runnable接口,用戶可以實現這個接口來完成想要的事務。實現一個線程池需要了解線程的同步機制,這部分將在后面介紹。
Java自帶線程池
自從Java1.5之后,Java 提供了自己的線程池ThreadPoolExecutor和ScheduledThreadPoolExecutor,我們先看類之間的結構圖。
關於線程池的主要類有如下幾部分:
接口::Executor、ExecutorService、ScheduledExecutorService
類:Executors、AbstractExecutorService、ThreadPoolExecutor、ScheduledThreadPoolExecutor
ThreadPoolExecutor
首先看看ThreadPoolExecutor的構造函數,ThreadPoolExecutor提供了幾個構造函數,我們先來參數最全構造函數的含義。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize 線程池維護線程的最少數量
maximumPoolSize 線程池維護線程的最大數量
keepAliveTime 線程池維護線程所允許的空閑時間 ,所以如果任務很多,並且每個任務執行的時間比較短,可以適當調大這個參數來提高線程的利用率。
unit keepAliveTime 參數的單位,可選的單位:天(DAYS),小時(HOURS),分鍾(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS)和納秒(NANOSECONDS)
workQueue 任務隊列,用來存放我們所定義的任務處理線程,BlockingQueue是一種帶鎖的阻塞隊列,我們將在后面專門講解這種數據結構,BlockingQueue有四種選擇:(1)ArrayBlockingQueue,是一種基於數組的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行操作;(2)LinkedBlockingQueue,是一個基於鏈表的阻塞隊列,此隊列也按FIFO (先進先出)對元素進行操作,吞吐量通常要高於ArrayBlockingQueue, Executors.newFixedThreadPool()使用了這種隊列;(3)SynchronousQueue;是一種不存儲元素的阻塞隊列,每個插入操作必須等另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,Executors.newCachedThreadPool使用了這個隊列;(4)PriorityBlockingQueue,是一種具有優先權的阻塞隊列,優先級大的任務可以先執行,用戶由此可以控制任務的執行順序。這四種阻塞隊列都有自己的使用場景,用戶可以根據需要自己決定使用。
threadFactory 創建新線程時使用的工廠,threadFactory有兩種選擇:(1)DefaultThreadFactory,將創建一個同線程組且默認優先級的線程;(2)PrivilegedThreadFactory,使用訪問權限創建一個權限控制的線程。ThreadPoolExecutor默認采用DefaultThreadFactory
handler 由於超出線程范圍和隊列容量而使執行被阻塞時所使用的處理策略,handler有四個選擇:(1)ThreadPoolExecutor.AbortPolicy(),將拋出RejectedExecutionException異常;(2)ThreadPoolExecutor.CallerRunsPolicy(),將重試添加當前的任務,重復調用execute()方法;(3)ThreadPoolExecutor.DiscardOldestPolicy(),將拋棄舊任務;(4)ThreadPoolExecutor.DiscardPolicy,將直接拋棄任務。ThreadPoolExecutor默認采用AbortPolicy。
一個任務通過execute(Runnable)方法被添加到線程池,任務必須是一個 Runnable類型的對象,任務的執行方法就是調用Runnable類型對象的run()方法。當一個任務通過execute(Runnable)方法欲添加到線程池時,會做一下幾步:
1. 如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。
2. 如果此時線程池中的數量大於等於corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。
3. 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的線程來處理添加的任務。
4. 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那么通過 handler所指定的策略來處理此任務。也就是處理任務的優先級為:核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
5. 當線程池中的線程數量大於corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。
讀者可以參考下面的源代碼,分析execute函數執行的流程:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // 執行handler策略
}
}
當數量少於corePoolSize時的主要流程:
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask); //創建新線程
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
當數量大於corePoolSize,小於maximumPoolSize,且阻塞隊列不能存儲任務時,執行的主要流程:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
如果想在多線程環境中定期執行去執行任務,或者做一些其他事情,用戶可以通過Timer來實現,但是Timer有幾種缺陷:(1)Timer是基於絕對時間的,容易受系統時鍾的影響;(2)imer只新建了一個線程來執行所有的TimeTask,所有TimeTask可能會相關影響;(3)Timer不會捕獲TimerTask的異常,只是簡單地停止。這樣勢必會影響其他TimeTask的執行。JDK提供了一種定時功能的線程池:ScheduledThreadPoolExecutor,它繼承了ThreadPoolExecutor,並且實現了ScheduledExecutorService接口,此接口有如下幾個方法:
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
創建並執行在給定延遲后啟用的一次性操作
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
創建並執行在給定延遲后啟用的一次性操作
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
創建並執行一個在給定初始延遲后首次啟用的定期操作,后續操作具有給定的周期;也就是經過period 后開始執行,即在 initialDelay+period 后執行,接着在 initialDelay + 2 * period 后執行,依此類推。如果任務的任何一個執行遇到異常,則后續執行都會被取消。否則,只能通過執行程序的取消或終止方法來終止該任務。如果此任務的任何一個執行要花費比其周期更長的時間,則將推遲后續執行,但不會同時執行。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
創建並執行一個在給定初始延遲后首次啟用的定期操作,隨后在每一次執行終止和下一次執行開始之間都存在給定的延遲。如果任務的任一執行遇到異常,就會取消后續執行。否則,只能通過執行程序的取消或終止方法來終止該任務。
ScheduledThreadPoolExecutor也提供了幾個構造函數,下面列出的是其中最簡單的一個,只有corePoolSize一個參數。ScheduledThreadPoolExecutor的構造函數僅做的一件事就是調用ThreadPoolExecutor的構造函數,它使用一種帶有延時標記的等待隊列DelayedWorkQueue。DelayedWorkQueue內部使用concurrent包里的DelayQueue,DelayQueue是一個無界的BlockingQueue,用於放置延時Delayed接口的對象,對象只能在其到期時才能從隊列中取走,我們將在專門講解這種數據結構。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
要配置一個線程池相對比較復雜,需要了解相關的參數,尤其是對於線程池的原理不是很清楚的情況下,很有可能配置的線程池不是較優的,因此在Executors類里面提供了一些靜態工廠,生成一些常用的線程池:
public static ExecutorService newSingleThreadExecutor()
創建僅有一個線程工作的線程池,相當於單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么將創建有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。
public static ExecutorService newCachedThreadPool()
創建一個緩存線程池,如果線程池的大小超過了任務所需要的線程,那么就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又動態添加新線程來處理任務。此線程池沒有對線程池大小做限制,線程池大小完全依賴於操作系統(或者說JVM)所能夠創建的最大線程大小。
public static ExecutorService newFixedThreadPool(int nThreads)
創建指定大小的線程池。每次提交一個任務就創建一個線程,直到線程數量達到線程池的最大值。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
類似於newCachedThreadPool,創建一個緩存線程池,此線程池還支持定時以及周期性執行任務。
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
類似於newSingleThreadExecutor,創建一個單線程的線程池,此線程池還支持定時以及周期性執行任務。
下面用兩個例子介紹線程池的使用方法,第一個例子會創建一個固定大小的線程池,第二個例子會創建基於時間線程池。
第一個例子
ExecutorService pool = Executors.newFixedThreadPool(2);
//創建四個任務
Thread t1 = new Task1();
Thread t2 = new Task2();
Thread t3 = new Task3();
Thread t4 = new Task4();
//放入線程池
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.shutdown(); //關閉線程池
第二個例子:
ExecutorService pool = Executors.newScheduledThreadPool(4);
Thread t = new Task();
pool.scheduleAtFixedRate(t,1, 5, TimeUnit.SECONDS);
總結:
1. FixedThreadPool是一個典型且優秀的線程池,它具有線程池的高效率和節省創建線程時所耗的開銷的優點。但是在線程池空閑時,即線程池中沒有可運行任務時,它不會釋放工作線程,還會占用一定的系統資源。
2. CachedThreadPool的特點就是在線程池空閑時,即線程池中沒有可運行任務時,它會釋放工作線程,從而釋放工作線程所占用的資源。但是當出現新任務時,又要創建新的工作線程,這會帶來一定的系統開銷。並且在使用CachedThreadPool時,一定要注意控制任務的數量,否則大量線程同時運行,可能會造成系統癱瘓。