線程復用:線程池



一、核心線程池內部實現
為了能夠更好地控制多線程,JDK提供了一套Executor框架,幫助開發人員有效地進行線程控制,其本質就是一個線程池。它的核心成員如圖

 

以上成員均在java.util.concurrent包中,是JDK並發包的核心類。其中ThreadPoolExecutor表示一個線程池。Executors類則扮演着線程池工廠的角色,通過Executors可以取得一個擁有特定功能的線程池。

Executor框架提供了各種類型的線程池,主要有以下工廠方法:
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

newFixedThreadPool()方法:該方法返回一個固定線程數量的線程池。該線程池中的線程數量始終不變。當有一個新的任務提交時,線程池中若有空閑線程,則立即執行。若沒有,則新的任務會被暫存在一個任務隊列中,待有線程空閑時,便處理在任務隊列中的任務。 •newSingleThreadExecutor()方法:該方法返回一個只有一個線程的線程池。若多余一個任務被提交到該線程池,任務會被保存在一個任務隊列中,待線程空閑,按先入先出的順序執行隊列中的任務。
newCachedThreadPool()方法:該方法返回一個可根據實際情況調整線程數量的線程池。線程池的線程數量不確定,但若有空閑線
程可以復用,則會優先使用可復用的線程。若所有線程均在工作,又有新的任務提交,則會創建新的線程處理任務。所有線程在當前任務執行完畢后,將返回線程池進行復用。
newSingleThreadScheduledExecutor()方法:該方法返回一個ScheduledExecutorSer-vice對象,線程池大小為1。ScheduledEx-ecutorService接口在ExecutorService接口之上擴展了在給定時間執行某任務的功能,如在某個固定的延時之后執行,或者周期性執行某個任務。
newScheduledThreadPool()方法:該方法也返回一個ScheduledExecutorService對象,但該線程池可以指定線程數量。


另外一個值得注意的方法是newSched-uledThreadPool()。它返回一個ScheduledExecu-torService對象,可以根據時間需要對線程進行調度。它會在指定的時間,對任務進行調度。
它的一些主要方法如下:
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay, long delay,TimeUnit unit);

 

對於FixedRate方式來說,任務調度的頻率是一定的。它是以上一個任務開始執行時間為起點,之后的period時間,調度下一次任務。而FixDelay則是在上一個任務結束后,再經過delay時間進行任務調度。
對於FixedRate 周期如果太短,那么任務就會在上一個任務結束后,立即被調用
另外一個值得注意的問題是,調度程序實際上並不保證任務會無限期的持續調用。如果任務本身拋出了異常,那么后續的所有執行都會被中斷,因此,如果你想讓你的任務持續穩定的執行,那么做好異常處理就非常重要,否則,你很有可能觀察到你的調度器無疾而終。

 

ThreadPoolExecutor最重要的構造函數:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler
函數的參數含義如下。
•corePoolSize:指定了線程池中的線程數量。
•maximumPoolSize:指定了線程池中的最大線程數量。
•keepAliveTime:當線程池線程數量超過corePoolSize時,多余的空閑線程的存活時間。即,超過corePoolSize的空閑線程,在多長時間內,會被銷毀。
•unit:keepAliveTime的單位。
•workQueue:任務隊列,被提交但尚未被執行的任務。
•threadFactory:線程工廠,用於創建線程,一般用默認的即可。
•handler:拒絕策略。當任務太多來不及處理,如何拒絕任務。

參數workQueue指被提交但未執行的任務隊列,它是一個BlockingQueue接口的對象,僅用於存放Runnable對象。根據隊列功能分類,在ThreadPoolExecutor的構造函數中可使用以下幾種BlockingQueue。
1.直接提交的隊列:該功能由Syn-chronousQueue對象提供。Syn-chronousQueue是一個特殊的Block-ingQueue。SynchronousQueue沒有容量,每一個插入操作都要等待一個相應的刪除操作,反之,每一個刪除操作都要等待對應的插入操作。如果使用SynchronousQueue,提交的任務不會被真實的保存,而總是將新任務提交給線程執行,如果沒有空閑的進程,則嘗試創建新的進程,如果進程數量已經達到最大值,則執行拒絕策略。因此,使用Syn-chronousQueue隊列,通常要設置很大的maximumPoolSize值,否則很容易執行拒絕策略。
2.有界的任務隊列:有界的任務隊列可以使用ArrayBlockingQueue實現。ArrayBlock-ingQueue的構造函數必須帶一個容量參數,表示該隊列的最大容量,如下所示。public ArrayBlockingQueue(int capacity)當使用有界的任務隊列時,若有新的任務需要執行,如果線程池的實際線程數小於core-PoolSize,則會優先創建新的線程,若大於corePoolSize,則會將新任務加入等待隊列。若等待隊列已滿,無法加入,則在總線程數不大於maximumPoolSize的前提下,創建新的進程執行任務。若大於maximumPoolSize,則執行拒絕策略。可見,有界隊列僅當在任務隊列裝滿時,才可能將線程數提升到core-PoolSize以上,換言之,除非系統非常繁忙,否則確保核心線程數維持在在corePoolSize。
3.無界的任務隊列:無界任務隊列可以通過LinkedBlockingQueue類實現。與有界隊列相比,除非系統資源耗盡,否則無界的任務隊列不存在任務入隊失敗的情況。當有新的任務到來,系統的線程數小於corePoolSize時,線程池會生成新的線程執行任務,但當系統的線程數達到corePoolSize后,就不會繼續增加。若后續仍有新的任務加入,而又沒有空閑的線程資源,則任務直接進入隊列等待。若任務創建和處理的速度差異很大,無界隊列會保持快速增長,直到耗盡系統內存。
4.優先任務隊列:優先任務隊列是帶有執行優先級的隊列。它通過PriorityBlockingQueue實現,可以控制任務的執行先后順序。它是一個特殊的無界隊列。無論是有界隊列Array-BlockingQueue,還是未指定大小的無界隊列LinkedBlockingQueue都是按照先進先出算法處理任務的。而PriorityBlockingQueue則可以根據任務自身的優先級順序先后執行,在確保系統性能的同時,也能有很好的質量保證(總是確保高優先級的任務先執

ThreadPoolExecutor線程池的核心調度代碼,這段代碼也充分體現了上述線程池的工作邏輯:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

代碼第5行的workerCountOf()函數取得了當前線程池的線程總數。當線程總數小於corePool-Size核心線程數時,會將任務通過addWorker()方法直接調度執行。否則,則在第10行代碼處(workQueue.offer())進入等待隊列。如果進入等待隊列失敗(比如有界隊列到達了上限,或者使用了SynchronousQueue),則會執行第17行,將任務直接提交給線程池。如果當前線程數已經達到maximumPoolSize,則提交失敗,就執行第18行的拒絕策略。

 

 


二、超負荷下的拒絕策略

 

JDK內置的拒絕策略如下:
•AbortPolicy策略:該策略會直接拋出異常,阻止系統正常工作。
•CallerRunsPolicy策略:只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務。顯然這樣做不會真的丟棄任務,但是,任務提交線程的性能極有可能會急劇下降。
•DiscardOledestPolicy策略:該策略將丟棄最老的一個請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務。 •DiscardPolicy策略:該策略默默地丟棄無法處理的任務,不予任何處理。如果允許任務丟失,我覺得這可能是最好的一種方案了吧!以 上內置的策略均實現了RejectedExecution-Handler接口,若以上策略仍無法滿足實際應用需要,完全可以自己擴展RejectedExecutionHandler接口。


三、ThreadFactory
ThreadFactory是一個接口,它只有一個方法,用來創建線程:Thread newThread(Runnable r);當線程池需要新建線程時,就會調用這個方法。
自定義線程池可以幫助我們做不少事。比如,我們可以跟蹤線程池究竟在何時創建了多少線程,也可以自定義線程的名稱、組以及優先級等信息,甚至可以任性地將所有的線程設置為守護線程。


四、ThreadPoolExecutor線程池擴展
ThreadPoolExecutor也是一個可以擴展的線程池。它提供了beforeExecute()、af-terExecute()和terminated()三個接口對線程池進行控制。
beforeExecute()、afterExecute()和ter-miniated()三個方法。這三個方法分別用於記錄一個任務的開始、結束和整個線程池的退出。
以beforeExecute()、afterExecute()為例,
在ThreadPoolExecutor.Worker.runTask()方法內部提供了這樣的實現:
boolean ran = false;
beforeExecute(thread, task);//運行前
try {
task.run(); //運行任務
ran = true;
afterExecute(task, null); //運行結束后
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);//運行結束
throw ex;}


五、優化線程池線程數量
在《Java Concurrency in Practice》一書中給出了一個估算線程池大小的經驗公式:
Ncpu=CPU的數量
Ucpu=目標CPU的使用率,0≤Ucpu≤1
W/C=等待時間與計算時間的比率為保持處理器達到期望的使用率,最優的池的大小等於:
Nthreads=Ncpu*Ucpu*(1+W/C)
在Java中,可以通過:Runtime.getRuntime().availableProcessors()取得可用的CPU數量。

 

六、分而治之:Fork/Join框架
在實際使用中,如果毫無顧忌地使用fork()開啟線程進行處理,那么很有可能導致系統開啟過多的線程而嚴重影響性能。所以,在JDK中,給出了一個ForkJoinPool線程池,對於fork()方法並不急着開啟線程,而是提交給ForkJoinPool線程池進行處理,以節省系統資源。fork()用來開啟線程,join()用來等待。
下ForkJoinPool的一個重要的接口:
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
你可以向ForkJoinPool線程池提交一個ForkJoinTask任務。所謂ForkJoinTask任務就是支持fork()分解以及join()等待的任務。ForkJoinTask有兩個重要的子類,RecursiveAction和Recur-siveTask。它們分別表示沒有返回值的任務和可以攜帶返回值的任務。

下面我們簡單地展示Fork/Join框架的使用,這里用來計算數列求和。
01 public class CountTask extends RecursiveTask<Long>{
02 private static final int THRESHOLD = 10000;
03 private long start;
04 private long end;
05
06 public CountTask(long start,long end){
07 this.start=start;
08 this.end=end;
09 }
10
11 public Long compute(){
12 long sum=0;
13 boolean canCompute = (end-start)<THRESHOLD;
14 if(canCompute){
15 for(long i=start;i<=end;i++){
16 sum +=i;
17 }
18 }else{
19 //分成100個小任務
20 long step=(start+end)/100;
21 ArrayList<CountTask> subTasks=new ArrayList<CountTask>();
22 long pos=start;
23 for(int i=0;i<100;i++){
24 long lastOne=pos+step;
25 if(lastOne>end)lastOne=end;
26 CountTask subTask=new CountTask(pos,lastOne);
27 pos+=step+1;
28 subTasks.add(subTask);
29 subTask.fork();
30 }
31 for(CountTask t:subTasks){
32 sum+=t.join();
33 }
34 }
35 return sum;
36 }
37
38 public static void main(String[]args){
39 ForkJoinPool forkJoinPool = new ForkJoinPool();
40 CountTask task = new CountTask(0,200000L);
41 ForkJoinTask<Long> result = forkJoinPool.submit(task);
42 try{
43 long res = result.get();
44 System.out.println("sum="+res);
45 }catch(InterruptedException e){
46 e.printStackTrace();
47 }catch(ExecutionException e){
48 e.printStackTrace();
49 }
50 }
51 }

由於計算數列的和必然是需要函數返回值的,因此選擇RecursiveTask作為任務的模型。上述代碼第39行,建立ForkJoinPool線程池。在第40行,構造一個計算1到200000求和的任務。在第41行將任務提交給線程池,線程池會返回一個攜帶結果的任務,通過get()方法可以得到最終結果(第43行)。如果在執行get()方法時,任務沒有結束,那么主線程就會在get()方法時等待。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM