線程池全面總結


什么是線程池?

  諸如web服務器、數據庫服務器、文件服務器和郵件服務器等許多服務器應用都面向處理來自某些遠程來源的大量短小的任務。構建服務器應用程序的一個過於簡單的模型是:每當一個請求到達就創建一個新的服務對象,然后在新的服務對象中為請求服務。但當有大量請求並發訪問時,服務器不斷的創建和銷毀對象的開銷很大。所以提高服務器效率的一個手段就是盡可能減少創建和銷毀對象的次數,特別是一些很耗資源的對象創建和銷毀,這樣就引入了“池”的概念,“池”的概念使得人們可以定制一定量的資源,然后對這些資源進行復用,而不是頻繁的創建和銷毀。

  線程池是預先創建線程的一種技術。線程池在還沒有任務到來之前,創建一定數量的線程,放入空閑隊列中。這些線程都是處於睡眠狀態,即均為啟動,不消耗CPU,而只是占用較小的內存空間。當請求到來之后,緩沖池給這次請求分配一個空閑線程,把請求傳入此線程中運行,進行處理。當預先創建的線程都處於運行狀態,即預制線程不夠,線程池可以自由創建一定數量的新線程,用於處理更多的請求。當系統比較閑的時候,也可以通過移除一部分一直處於停用狀態的線程。

 

在面向對象編程中,創建和銷毀對象是很費時間的,因為創建一個對象要獲取內存資源或者其它更多資源。在Java中更是如此,虛擬機將試圖跟蹤每一個對象,以便能夠在對象銷毀后進行垃圾回收。

 

所以提高服務程序效率的一個手段就是盡可能減少創建和銷毀對象的次數,特別是一些很耗資源的對象創建和銷毀。如何利用已有對象來服務就是一個需要解決的關鍵問題,其實這就是一些”池化資源”技術產生的原因。

 

在開發程序的過程中,很多時候我們會遇到遇到批量執行任務的場景,當各個具體任務之間互相獨立並不依賴其他任務的時候,我們會考慮使用並發的方式,將各個任務分散到不同的線程中進行執行來提高任務的執行效率。

  我們會想到為每個任務都分配一個線程,但是這樣的做法存在很大的問題:

  1、資源消耗:首先當任務數量龐大的時候,大量線程會占據大量的系統資源,特別是內存,當線程數量大於CPU可用數量時,空閑線程會浪費造成內存的浪費,並加大GC的壓力,大量的線程甚至會直接導致程序的內存溢出,而且大量線程在競爭CPU的時候會帶來額外的性能開銷。如果CPU已經足夠忙碌,再多的線程不僅不會提高性能,反而會降低性能。

  2、線程生命周期的開銷:線程的創建和銷毀都是有代價的,線程的創建需要時間、延遲處理的請求、需要JVM和操作系統提供一些輔助操作。如果請求特別龐大,並且任務的執行特別輕量級(比如只是計算1+1),那么對比下來創建和銷毀線程代價就太昂貴了。

  3、穩定性:如資源消耗中所說如果程序因為大量的線程拋出OutOfMemoryEorror,會導致程序極大的不穩定。

  

  既然為每個任務分配一個線程的做法已經不可行,我們考慮的代替方法中就必須考慮到,1、線程不能不能無限制創建,數量必須有一個合適的上限。2、線程的創建開銷昂貴,那我們可以考慮重用這些線程。理所當然,池化技術是一項比較容易想到的替代方案(馬后炮),線程的池化管理就叫線程池

 

線程池簡介   

  多線程技術主要解決處理器單元內多個線程執行的問題,它可以顯著減少處理器單元的閑置時間,增加處理器單元的吞吐能力。       

  假設一個服務器完成一項任務所需時間為:T1 創建線程時間,T2 在線程中執行任務的時間,T3 銷毀線程時間。

  如果:T1 + T3 遠大於 T2,則可以采用線程池,以提高服務器性能。

     

一個線程池包括以下四個基本組成部分:

        1、線程池管理器(ThreadPool):用於創建並管理線程池,包括創建線程池,銷毀線程池,添加新任務;

        2、工作線程(PoolWorker):線程池中線程,在沒有任務時處於等待狀態,可以循環的執行任務;

        3、任務接口(Task):每個任務必須實現的接口,以供工作線程調度任務的執行,它主要規定了任務的入口,任務執行完后的收尾工作,任務的執行狀態等;

        4、任務隊列(taskQueue):用於存放沒有處理的任務。提供一種緩沖機制。   

             

  線程池技術正是關注如何縮短或調整T1,T3時間的技術,從而提高服務器程序性能的。它把T1,T3分別安排在服務器程序的啟動和結束的時間段或者一些空閑的時間段,這樣在服務器程序處理客戶請求時,不會有T1,T3的開銷了。

  線程池不僅調整T1,T3產生的時間段,而且它還顯著減少了創建線程的數目,看一個例子:

  假設一個服務器一天要處理50000個請求,並且每個請求需要一個單獨的線程完成。在線程池中,線程數一般是固定的,所以產生線程總數不會超過線程池中線程的數目,而如果服務器不利用線程池來處理這些請求則線程總數為50000。一般線程池大小是遠小於50000。所以利用線程池的服務器程序不會為了創建50000而在處理請求時浪費時間,從而提高效率。

  代碼實現中並沒有實現任務接口,而是把Runnable對象加入到線程池管理器(ThreadPool),然后剩下的事情就由線程池管理器(ThreadPool)來完成了。

 

為什么要用線程池:

1、減少了創建和銷毀線程的次數,每個工作線程都可以被重復利用,可執行多個任務。

2、可以根據系統的承受能力,調整線程池中工作線程的數目,防止因為消耗過多的內存而把服務器累趴下(每個線程需要大於1MB內存,線程開的越多,消耗的內存也就越大,最后死機)。

Java里面線程池的頂級接口是Executor,但是嚴格意義上講Executor並不是一個線程池,而只是一個執行線程的工具。真正的線程池接口是ExecutorService。

 

線程池的作用:

線程池作用就是限制系統中執行線程的數量。

     根據系統的環境情況,可以自動或手動設置線程數量,達到運行的最佳效果;少了浪費了系統資源,多了造成系統擁擠效率不高。用線程池控制線程數量,其他線程排隊等候。一個任務執行完畢,再從隊列的中取最前面的任務開始執行。若隊列中沒有等待進程,線程池的這一資源處於等待。當一個新任務需要運行時,如果線程池中有等待的工作線程,就可以開始運行了;否則進入等待隊列。

 

線程池的實現原理

  提交一個任務到線程池中,線程池的處理流程如下:

  1、判斷線程池里的核心線程是否都在執行任務,如果不是(核心線程空閑或者還有核心線程沒有被創建)則創建一個新的工作線程來執行任務。如果核心線程都在執行任務,則進入下個流程。

  2、線程池判斷工作隊列是否已滿,如果工作隊列沒有滿,則將新提交的任務存儲在這個工作隊列里。如果工作隊列滿了,則進入下個流程。

  3、判斷線程池里的線程是否都處於工作狀態,如果沒有,則創建一個新的工作線程來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。

 

使用場景

1、異步處理日志,這個是比較場景的采用線程池來解決的

2、定時任務,定時對數據庫備份、定時更新redis配置等,定時發送郵件

3、數據遷移

這些常見的一些場景我們就應該優先來考慮線程池來解決

線程池本質的概念就是一堆線程一起完成一件事情。

  

線程池技術要點

     從內部實現上看,線程池技術可主要划分為如下6個要點實現: 

工作者線程worker:即線程池中可以重復利用起來執行任務的線程,一個worker的生命周期內會不停的處理多個業務job。線程池“復用”的本質就是復用一個worker去處理多個job,“流控“的本質就是通過對worker數量的控制實現並發數的控制。通過設置不同的參數來控制 worker的數量可以實現線程池的容量伸縮從而實現復雜的業務需求。

待處理工作job的存儲隊列:工作者線程workers的數量是有限的,同一時間最多只能處理最多workers數量個job。對於來不及處理的job需要保存到等待隊列里,空閑的工作者work會不停的讀取空閑隊列里的job進行處理。基於不同的隊列實現,可以擴展出多種功能的線程池,如定制隊列出隊順序實現帶處理優先級的線程池、定制隊列為阻塞有界隊列實現可阻塞能力的線程池等。流控一方面通過控制worker數控制並發數和處理能力,一方面可基於隊列控制線程池處理能力的上限。

線程池初始化:即線程池參數的設定和多個工作者workers的初始化。通常有一開始就初始化指定數量的workers或者有請求時逐步初始化工作者兩種方式。前者線程池啟動初期響應會比較快但造成了空載時的少量性能浪費,后者是基於請求量靈活擴容但犧牲了線程池啟動初期性能達不到最優。

處理業務job算法:業務給線程池添加任務job時線程池的處理算法。有的線程池基於算法識別直接處理job還是增加工作者數處理job或者放入待處理隊列,也有的線程池會直接將job放入待處理隊列,等待工作者worker去取出執行。

workers的增減算法:業務線程數不是持久不變的,有高低峰期。線程池要有自己的算法根據業務請求頻率高低調節自身工作者workers的 數量來調節線程池大小,從而實現業務高峰期增加工作者數量提高響應速度,而業務低峰期減少工作者數來節省服務器資源。增加算法通常基於幾個維度進行:待處 理工作job數、線程池定義的最大最小工作者數、工作者閑置時間。

線程池終止邏輯:應用停止時線程池要有自身的停止邏輯,保證所有job都得到執行或者拋棄。

 

 

線程池的優點:

1.重用線程池中的線程,減少因對象創建,銷毀所帶來的性能開銷;

2.能有效的控制線程的最大並發數,提高系統資源利用率,同時避免過多的資源競爭,避免堵塞;

3.能夠多線程進行簡單的管理,使線程的使用簡單、高效。

4.減少頻繁的創建和銷毀線程(由於線程創建和銷毀都會耗用一定的內存)

5.線程池也是多線程,充分利用CPU,提高系統的效率

6.線程是稀缺資源,使用線程池可以減少創建和銷毀線程的次數,每個工作線程都可以重復使用。

7.可以根據系統的承受能力,調整線程池中工作線程的數量,防止因為消耗過多內存導致服務器崩潰。

8.線程復用

9.控制最大並發數

10.管理線程

 

線程池的工作過程如下:

 線程池剛創建時,里面沒有一個線程。任務隊列是作為參數傳進來的。不過,就算隊列里面有任務,線程池也不會馬上執行它們。

 當調用 execute() 方法添加一個任務時,線程池會做如下判斷:

 如果正在運行的線程數量小於 corePoolSize,那么馬上創建線程運行這個任務;

 如果正在運行的線程數量大於或等於 corePoolSize,那么將這個任務放入隊列;

 如果這時候隊列滿了,而且正在運行的線程數量小於 maximumPoolSize,那么還是要創建非核心線程立刻運行這個任務;

 如果隊列滿了,而且正在運行的線程數量大於或等於 maximumPoolSize,那么線程池會拋出異常RejectExecutionException。

 當一個線程完成任務時,它會從隊列中取下一個任務來執行。

 當一個線程無事可做,超過一定的時間(keepAliveTime)時,線程池會判斷,如果當前運行的線程數大於 corePoolSize,那么這個線程就被停掉。所以線程池的所有任務完成后,它最終會收縮到 corePoolSize 的大小。

 

執行流程

 調用ThreadPoolExecutor的execute提交線程,首先檢查CorePool,如果CorePool內的線程小於CorePoolSize,新創建線程執行任務。

 如果當前CorePool內的線程大於等於CorePoolSize,那么將線程加入到BlockingQueue。

 如果不能加入BlockingQueue,在小於MaxPoolSize的情況下創建線程執行任務。

 如果線程數大於等於MaxPoolSize,那么執行拒絕策略。

 

配置線程池的大小

一般需要根據任務的類型來配置線程池大小:

 如果是CPU密集型任務,就需要盡量壓榨CPU,參考值可以設為 NCPU+1;

 如果是IO密集型任務,參考值可以設置為2*NCPU。

當然,這只是一個參考值,具體的設置還需要根據實際情況進行調整,比如可以先將線程池大小設置為參考值,再觀察任務運行情況和系統負載、資源利用率來進行適當調整。

 

線程池的注意事項

  雖然線程池是構建多線程應用程序的強大機制,但使用它並不是沒有風險的。在使用線程池時需注意線程池大小與性能的關系,注意並發風險、死鎖、資源不足和線程泄漏等問題。

  (1)線程池大小。多線程應用並非線程越多越好,需要根據系統運行的軟硬件環境以及應用本身的特點決定線程池的大小。一般來說,如果代碼結構合理的話,線程數目與CPU 數量相適合即可。如果線程運行時可能出現阻塞現象,可相應增加池的大小;如有必要可采用自適應算法來動態調整線程池的大小,以提高CPU 的有效利用率和系統的整體性能。

  (2)並發錯誤。多線程應用要特別注意並發錯誤,要從邏輯上保證程序的正確性,注意避免死鎖現象的發生。

  (3)線程泄漏。這是線程池應用中一個嚴重的問題,當任務執行完畢而線程沒能返回池中就會發生線程泄漏現象。

 

是否使用線程池就一定比使用單線程高效呢?

答案是否定的,比如Redis就是單線程的,但它卻非常高效,基本操作都能達到十萬量級/s。從線程這個角度來看,部分原因在於:

 多線程帶來線程上下文切換開銷,單線程就沒有這種開銷

 鎖

當然“Redis很快”更本質的原因在於:Redis基本都是內存操作,這種情況下單線程可以很高效地利用CPU。而多線程適用場景一般是:存在相當比例的IO和網絡操作。

所以即使有上面的簡單估算方法,也許看似合理,但實際上也未必合理,都需要結合系統真實情況(比如是IO密集型或者是CPU密集型或者是純內存操作)和硬件環境(CPU、內存、硬盤讀寫速度、網絡狀況等)來不斷嘗試達到一個符合實際的合理估算值。

 

線程使用要點:

線程數量要點

 如果運行線程的數量少於核心線程數量,則創建新的線程處理請求

 如果運行線程的數量大於核心線程數量,小於最大線程數量,則當隊列滿的時候才創建新的線程

 如果核心線程數量等於最大線程數量,那么將創建固定大小的連接池

 如果設置了最大線程數量為無窮,那么允許線程池適合任意的並發數量

線程空閑時間要點:

 當前線程數大於核心線程數,如果空閑時間已經超過了,那該線程會銷毀

排隊策略要點

 同步移交:不會放到隊列中,而是等待線程執行它。如果當前線程沒有執行,很可能會新開一個線程執行。

 無界限策略:如果核心線程都在工作,該線程會放到隊列中。所以線程數不會超過核心線程數

 有界限策略:可以避免資源耗盡,但是一定程度上減低了吞吐量

當線程關閉或者線程數量滿了和隊列飽和了,就有拒絕任務的情況了:

拒絕任務策略:

 直接拋出異常

 使用調用者的線程來處理

 直接丟掉這個任務

 丟掉最老的任務

 

線程池結構

 

 

線程生命周期

 

 

在線程的生命周期中,它要經過新建(New)、就緒(Runnable)、運行(Running)、阻塞(Blocked)和死亡(Dead)5種狀態。

Thread通過new來新建一個線程,這個過程是是初始化一些線程信息,如線程名,id,線程所屬group等,可以認為只是個普通的對象。調用Thread的start()后Java虛擬機會為其創建方法調用棧和程序計數器,同時將hasBeenStarted為true,之后調用start方法就會有異常。

處於這個狀態中的線程並沒有開始運行,只是表示該線程可以運行了。至於該線程何時開始運行,取決於JVM里線程調度器的調度。當線程獲取cpu后,run()方法會被調用。不要自己去調用Thread的run()方法。之后根據CPU的調度在就緒——運行——阻塞間切換,直到run()方法結束或其他方式停止線程,進入dead狀態。

 

源碼解析

  Java中線程池主要是並發包java.util.concurrent 中 ThreadPoolExecutor這個類實現的。

 

線程池框架Executor

java中的線程池是通過Executor框架實現的,Executor 框架包括類

Executor

Executors

ExecutorService

ThreadPoolExecutor

Callable

Future

FutureTask

 

(1) Executor: 所有線程池的接口,只有一個方法。

public interface Executor {          
  void execute(Runnable command);
}

execute執行方法

execute執行方法分了三步,以注釋的方式寫在代碼上了~

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    //如果線程池中運行的線程數量<corePoolSize,則創建新線程來處理請求,即使其他輔助線程是空閑的。
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //如果線程池中運行的線程數量>=corePoolSize,且線程池處於RUNNING狀態,且把提交的任務成功放入阻塞隊列中,就再次檢查線程池的狀態,
    // 1.如果線程池不是RUNNING狀態,且成功從阻塞隊列中刪除任務,則該任務由當前 RejectedExecutionHandler 處理。
    // 2.否則如果線程池中運行的線程數量為0,則通過addWorker(null, false)嘗試新建一個線程,新建線程對應的任務為null。
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果以上兩種case不成立,即沒能將任務成功放入阻塞隊列中,且addWoker新建線程失敗,則該任務由當前 RejectedExecutionHandler 處理。
    else if (!addWorker(command, false))
        reject(command);
}
  •  task:是具體的線程執行任務,線程在追加線程池的時候沒有進行啟動;
  •  worker:任務的執行需要worker來支持的,可以運行的worker受到“corePoolSize”限制;
  •  reject:如果現在線程池已經滿了或者關閉了,那么就會出現拒絕新線程加入的可能性。

(2) ExecutorService: 增加Executor的行為,是Executor實現類的最直接接口。

(3) Executors: 提供了一系列工廠方法用於創先線程池,返回的線程池都實現了ExecutorService 接口。

      創建線程池使用類:java.util.concurrent.Executors

Executors幾個重要方法:

callable(Runnable task): 將 Runnable 的任務轉化成 Callable 的任務

newSingleThreadExecutor(): 產生一個ExecutorService對象,這個對象只有一個線程可用來執行任務,若任務多於一個,任務將按先后順序執行。

newCachedThreadPool(): 產生一個ExecutorService對象,這個對象帶有一個線程池,線程池的大小會根據需要調整,線程執行完任務后返回線程池,供執行下一次任務使用。

//創建無大小限制的線程池
public static ExecutorService  newCachedThreadPool() {    
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}

      newFixedThreadPool(int poolSize): 產生一個ExecutorService對象,這個對象帶有一個大小為 poolSize 的線程池,若任務數量大於 poolSize ,任務會被放在一個 queue 里順序執行。

//創建固定大小的線程池
public static ExecutorService  newFixedThreadPool(int nThreads) {    
  return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

      newSingleThreadScheduledExecutor(): 產生一個ScheduledExecutorService對象,這個對象的線程池大小為 1 ,若任務多於一個,任務將按先后順序執行。

//創建單線程池
public static ExecutorService newSingleThreadExecutor() {       
  return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

      newScheduledThreadPool(int poolSize): 產生一個ScheduledExecutorService對象,這個對象的線程池大小為 poolSize ,若任務數量大於 poolSize ,任務會在一個 queue 里等待執行。

//創建定時調度池
public static ScheduledExecutorService  newScheduledThreadPool(int corePoolSize) {    
  return new ScheduledThreadPoolExecutor(corePoolSize);
}

(4) ThreadPoolExecutor:線程池的具體實現類,一般用的各種線程池都是基於這個類實現的。

線程池可以解決兩個不同問題:由於減少了每個任務的調用開銷,在執行大量的異步任務時,它通常能夠提供更好的性能,並且還可以提供綁定和管理資源(包括執行集合任務時使用的線程)的方法。每個 ThreadPoolExecutor還維護着一些基本的統計數據,如完成的任務數。

  線程池做的其實可以看得很簡單,其實就是把你提交的任務(task)進行調度管理運行,但這個調度的過程以及其中的狀態控制是比較復雜的。

構造方法如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
    
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException();
this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

[1]corePoolSize:線程池的核心線程數,線程池中運行的線程數也永遠不會超過 corePoolSize 個,默認情況下可以一直存活。可以通過設置allowCoreThreadTimeOut為True,此時 核心線程數就是0,此時keepAliveTime控制所有線程的超時時間。

核心線程會一直存活,及時沒有任務需要執行,當線程數小於核心線程數時

即使有線程空閑,線程池也會優先創建新線程處理

設置allowCoreThreadTimeout=true(默認false)時,核心線程會超時關閉

 

[2]maximumPoolSize:線程池允許的最大線程數;

 線程池最大線程數(當workQueue都放不下時,啟動新線程)

 當線程數>=corePoolSize,且任務隊列已滿時。線程池會創建新線程來處理任務

 當線程數=maxPoolSize,且任務隊列已滿時,線程池會拒絕處理任務而拋出異常(飽和策略怎么處理)

 

[3]keepAliveTime: 指的是空閑線程結束的超時時間;

 超出corePoolSize數量的線程的保留時間,如果allowCoreThreadTimeout=true,則會直到線程數量=0

 jdk中的解釋是:當線程數大於核心時,此為終止前多余的空閑線程等待新任務的最長時間。

有點拗口,其實這個不難理解,在使用了“池”的應用中,大多都有類似的參數需要配置。比如數據庫連接池,DBCP中的maxIdle,minIdle參數。

什么意思?接着上面的解釋,后來向老板派來的工人始終是“借來的”,俗話說“有借就有還”,但這里的問題就是什么時候還了,如果借來的工人剛完成一個任務就還回去,后來發現任務還有,那豈不是又要去借?這一來一往,老板肯定頭也大死了。

 

合理的策略:既然借了,那就多借一會兒。直到“某一段”時間后,發現再也用不到這些工人時,便可以還回去了。這里的某一段時間便是keepAliveTime的含義,TimeUnit為keepAliveTime值的度量。

 

[4]unit :是一個枚舉,表示 keepAliveTime 的單位;

TimeUnit.DAYS;               //
TimeUnit.HOURS;             //小時 
TimeUnit.MINUTES;           //分鍾
TimeUnit.SECONDS;           //
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙 
TimeUnit.NANOSECONDS;       //納秒

[5]workQueue:表示存放任務的BlockingQueue<Runnable隊列。阻塞隊列(任務隊列容量),當線程數達到核心線程數時,新任務會放在隊列中排隊等待執行

 ArrayBlockingQueue:構造函數一定要傳大小

 LinkedBlockingQueue:構造函數不傳大小會默認為Integer.MAX_VALUE ,當大量請求任務時,容易造成 內存耗盡

 SynchronousQueue:同步隊列,一個沒有存儲空間的阻塞隊列 ,將任務同步交付給工作線程

 PriorityBlockingQueue : 優先隊列

 

BlockingQueue:阻塞隊列(BlockingQueue)是java.util.concurrent下的主要用來控制線程同步的工具。如果BlockQueue是空的,從BlockingQueue取東西的操作將會被阻斷進入等待狀態,直到BlockingQueue進了東西才會被喚醒。同樣,如果BlockingQueue是滿的,任何試圖往里存東西的操作也會被阻斷進入等待狀態,直到BlockingQueue里有空間才會被喚醒繼續操作。

阻塞隊列常用於生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。具體的實現類有LinkedBlockingQueue,ArrayBlockingQueued等。一般其內部的都是通過Lock和Condition(顯示鎖(Lock)及Condition的學習與使用)來實現阻塞和喚醒。

queue上的三種類型。

排隊有三種通用策略:

直接提交:工作隊列默認選項是SynchronousQueue,它將任務直接提交給線程而不保存它們。在此,如果不存在可用於立即運行任務的線程,則駛入把任務加入到隊列將失敗,因此將會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性請求集時出現鎖。直接提交通常要求無界maximumPoolSize以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程有增長的可能性。

無界隊列:使用無界隊列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,創建的線程就不會超過 corePoolSize。(因此,maximumPoolSize的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。

有界隊列。當使用有限的 maximumPoolSizes時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。

 

[6]threadFactory:線程工廠,主要用來創建線程

[7]rejectedExecutionHandler :任務拒絕處理器

兩種情況會拒絕處理任務

  • 當線程數已經達到maxPoolSize,切隊列已滿,會拒絕新任務
  • 當線程池被調用shutdown()后,會等待線程池里的任務執行完畢,再shutdown。如果在調用shutdown()和線程池真正shutdown之間提交任務,會拒絕新任務

 

線程池會調用rejectedExecutionHandler來處理這個任務。如果沒有設置默認是AbortPolicy,會拋出異常,ThreadPoolExecutor類有幾個內部實現類來處理這類情況

  •  AbortPolicy(默認):直接拋棄,拋運行時異常
  •  CallerRunsPolicy:用調用者的線程執行任務
  •  DiscardOldestPolicy:拋棄隊列中最久的任務
  •  DiscardPolicy:拋棄當前任務

實現RejectedExecutionHandler接口,可自定義處理器

另一種情況便是,即使向老板借了工人,但是任務還是繼續過來,還是忙不過來,這時整個隊伍只好拒絕接受了。

 

RejectedExecutionHandler接口提供了對於拒絕任務的處理的自定方法的機會。在ThreadPoolExecutor中已經默認包含了4中策略,因為源碼非常簡單,這里直接貼出來。

CallerRunsPolicy:線程調用運行該任務的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {           
  if (!e.isShutdown()) {
    r.run();
  }
}

AbortPolicy:處理程序遭到拒絕將拋出運行時RejectedExecutionException

  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {           
    throw new RejectedExecutionException();
  }

DiscardPolicy:不能執行的任務將被刪除

  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}

這種策略和AbortPolicy幾乎一樣,也是丟棄任務,只不過他不拋出異常。

DiscardOldestPolicy:如果執行程序尚未關閉,則位於工作隊列頭部的任務將被刪除,然后重試執行程序(如果再次失敗,則重復此過程)

  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {           
    if (!e.isShutdown()) {
      e.getQueue().poll();
      e.execute(r);
    }
  }

 

該策略就稍微復雜一些,在pool沒有關閉的前提下首先丟掉緩存在隊列中的最早的任務,然后重新嘗試運行該任務。這個策略需要適當小心。

設想:如果其他線程都還在運行,那么新來任務踢掉舊任務,緩存在queue中,再來一個任務又會踢掉queue中最老任務。

 

 線程池的拒絕策略

  池子有對象池如commons pool的GenericObjectPool(通用對象池技術)也有java里面的線程池ThreadPoolExecutor,但java里面的線程池引入了一個叫拒絕執行的策略模式,感覺比GenericObjectPool好一點,意思也就是說當池子滿的時候該如何執行還在不斷往里面添加的一些任務。 

  像GenericObjectPool只提供了,繼續等待和直接返回空的策略。而ThreadPoolExecutor則提供了一個接口,並內置了4中實現策略供用戶分場景使用。

  ThreadPoolExecutor.execute(Runnable command)提供了提交任務的入口,此方法會自動判斷如果池子滿了的話,則會調用拒絕策略來執行此任務,接口為RejectedExecutionHandler,內置的4中策略分別為AbortPolicy、DiscardPolicy、DiscardOldestPolicy、CallerRunsPolicy。

 

圖5 拒絕策略關系圖

  AbortPolicy

  為java線程池默認的阻塞策略,不執行此任務,而且直接拋出一個運行時異常,切記ThreadPoolExecutor.execute需要try catch,否則程序會直接退出。

  DiscardPolicy

  直接拋棄,任務不執行,空方法

  DiscardOldestPolicy

  從隊列里面拋棄head的一個任務,並再次execute 此task。

  CallerRunsPolicy

  在調用execute的線程里面執行此command,會阻塞入口。

  用戶自定義拒絕策略

  實現RejectedExecutionHandler,並自己定義策略模式。

  再次需要注意的是,ThreadPoolExecutor.submit() 函數,此方法內部調用的execute方法,並把execute執行完后的結果給返回,但如果任務並沒有執行的話(被拒絕了),則submit返回的future.get()會一直等到。 

  future 內部其實還是一個runnable,並把command給封裝了下,當command執行完后,future會返回一個值。

 

  handler:拒絕機制。

當線程池因為工作池已經飽和,准備拒絕任務時候。會調用RejectedExecutionHandler來拒絕該任務。Jdk提供了幾種不同的RejectedExecutionHandler實現,每種實現都包含不同的飽和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。

 Abort是默認的飽和策略,該策略會拋出未檢查的RejectedExecutionException。

 CallerRuns實現一種調節機制,將任務回退到調用者,讓調用者執行,從而降低了新任務的流量。webServer通過使用該策略使得在請求負載過高的情況下實現了性能的平緩降低。

 Discard實現了會悄悄拋棄該任務,DiscardOldestPolicy會拋棄隊列中拋棄下一個即將被執行的任務。如果是在優先隊列里,DiscardOldestPolicy會拋棄優先級最高的任務。

 

  ThreadLocalPool的池的大小設置,《Java並發編程實戰》書中給了一個推薦的設置值。

  Ncpu為CPU的數量,Ucpu為CPU的利用率,W/C為任務的等待時間 / 任務的計算時間。在這種情況下,一般線程池的最優大小: 

N=Ncpu*Ucpu*(1+W/C)

線程池有五種運行狀態

ThreadPoolExecutor中有一個ctl變量。ctl是一個32位的二級制數,其中高3位用於表示線程池的狀態,低29位表示線程池中的活動線程。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

  如上代碼所示,線程池有五種狀態。RUNNINGSHUTDOWNSTOPTIDYINGTERMINNATED。幸好ThreadPoolExecutor的代碼上有對應注釋,看着這些注釋能對ThreadPoolExecutor的狀態作用和狀態流轉能有一個大致的了解。

  RUNNING:在線程池創建的時候,線程池默認處於RUNNING狀態。當線程池處於RUNNING狀態的時候,任務隊列可以接受任務,並且可以執行QUEUE中任務。

  SHUTDOWN:不接受新任務,但是會繼續執行QUEUE中的任務。

  STOP:不接受新任務,也不執行QUEUE中的任務。

  TIDYING:所有的任務都中止了,沒有活動中的線程。當線程池進行該狀態時候,會執行鈎子方法terminated() 。

 

Executors的執行

當Executors創建完成了線程池之后可以返回“ExecutorService”接口對象,而這個對象里面有兩個方法來接收線程的執行:

//接收Callable:
public <T> Future<T> submit(Callable<T> task);
//接收Runnable:
public Future<?> submit(Runnable task);

范例:創建無限量線程池

package so.strong.mall.concurrent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExecutorDemo {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool(); //創建一個線程池
        for (int i = 0; i < 5; i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+"執行操作");
                }
            });
        }
        service.shutdown(); //線程池執行完畢后需要關閉
    }
}
//無限量大小的線程池會根據內部線程的執行狀況來進行線程對象個數的控制。

submit()方法是可以接收Callable接口對象的

 

package so.strong.mall.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            Future<?> future = service.submit(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    return Thread.currentThread().getName() + "執行操作";
                }
            });
            System.out.println(future.get()); 
        }
        service.shutdown();
    }
}

 

Future線程模型設計的優勢在於:可以進行線程數據的異步控制,但是在之前編寫的過程嚴格來講並不好,相當於啟動了一個線程就獲得了一個返回值,於是為了方便這些線程池中線程對象的管理,可以使用如下方法進行統一返回:

 

public interface ExecutorService extends Executor {
      public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
}

范例:使用invokeAny()方法

 

package so.strong.mall.concurrent;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorDemo {
    public static void main(String[] args) throws Exception {
        Set<Callable<String>> tasks = new HashSet<>(); //所有任務
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            tasks.add(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    return Thread.currentThread().getName() + "執行任務,i=" + temp;
                }
            });
        }
        ExecutorService service = Executors.newCachedThreadPool(); //創建一個線程池
        String invokeAny = service.invokeAny(tasks); //執行任務
        System.out.println("返回結果:" + invokeAny);
        service.shutdown();

    }
}
//返回結果:pool-1-thread-2執行任務,i=4
//使用invokeAny()方法只會返回一個任務的執行操作

 

 

CompletionService線程池異步交互

 

package java.util.concurrent; 
public interface CompletionService<V> {
    Future<V> submit(Callable<V> task);
    Future<V> submit(Runnable task, V result);
    Future<V> take() throws InterruptedException;
    Future<V> poll();
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

線程池異步交互:CompletionService

 將生產新的異步任務與使用已完成任務的結果分離開來的服務。生產者submit()執行的任務,使用者take()已完成的任務,並按照完成任務的順序處理它們的結果。

 CompletionService依賴於一個單獨的Executor來實際執行任務,在這種情況下,CompletionService只管理一個內部完成隊列,在CompletionService接口里面提供有如下兩個方法:

  設置Callable: 

      public Future<V> submit(Callable<V> task);

           設置Runnable:

      public Future<V> submit(Runnable task, V result); 

       CompletionService是一個接口,如果要想使用這個接口可以采用ExecutorCompletionService這個子類

      public class ExecutorCompletionService<V> implements CompletionService<V>

     ExecutorCompletionService的構造方法:

 

public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

    CompletionService來控制所有線程池的操作以及數據返回,則應該使用這個類來進行線程池的提交處理。

     提交線程

       Future<V> submit(Callable<V> task);

     獲取返回內容

      Future<V> take() throws InterruptedException;  

    范例:使用CompletionService工具類

 

package so.strong.mall.concurrent;
import java.util.concurrent.*;

public class ExecutorDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newCachedThreadPool();
        CompletionService<String> completions = new ExecutorCompletionService<>(service);
        for (int i = 0; i < 5; i++) {
            final int temp = i;
            completions.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    return Thread.currentThread().getName() + "- i =" + temp;
                }
            });
        }
        for (int i = 0; i < 5; i++) {
            System.out.println(completions.take().get());
        }
        service.shutdown();
    }
}

    CompletionService操作接口的主要目的是可以去隱藏ExecutorService接口執行線程池的處理,不在需要關心novkeAny(), invokeAll()的執行方法了。

    創建一個定時調度池,這個調度池主要是以時間間隔調度為主。如果要創建調度池則使用ScheduledExecutorService接口完成,該接口之中包含有如下的兩個方法:

       延遲啟動

        public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);

       間隔調度

        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);

    范例:創建調度池

 

package so.strong.mall.concurrent;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorDemo {
    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        for (int i = 0; i < 5; i++) {
            service.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+"執行操作");
                }
            },2, TimeUnit.SECONDS);
        }
        service.shutdown();
    }
}

    在ExecutorService接口里面的確提供有接收Runnable接口對象的方法,但是這個方法為了統一使用的是submit()。submit()重載了許多次,可以接收Runnable:

      public Future<?> submit(Runnale task) 

 

線程池的創建和使用

生成線程池采用了工具類Executors的靜態方法,以下是幾種常見的線程池。

 

SingleThreadExecutor:單個后台線程 (其緩沖隊列是無界的)

 

public static ExecutorService newSingleThreadExecutor() {        
    return new FinalizableDelegatedExecutorService (
        new ThreadPoolExecutor(1, 1,                                    
        0L, TimeUnit.MILLISECONDS,                                    
        new LinkedBlockingQueue<Runnable>()));   
}

創建一個單線程的線程池。這個線程池只有一個核心線程在工作,也就是相當於單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。

 

FixedThreadPool:只有核心線程的線程池,大小固定 (其緩沖隊列是無界的) 。

 

public static ExecutorService newFixedThreadPool(int nThreads) {         
        return new ThreadPoolExecutor(nThreads, nThreads,                                       
            0L, TimeUnit.MILLISECONDS,                                         
            new LinkedBlockingQueue<Runnable>());     
}

創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。

 

CachedThreadPool:無界線程池,可以進行自動線程回收。

 

public static ExecutorService newCachedThreadPool() {         
    return new ThreadPoolExecutor(0,Integer.MAX_VALUE,                                           
           60L, TimeUnit.SECONDS,                                       
           new SynchronousQueue<Runnable>());     
}

如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴於操作系統(或者說JVM)能夠創建的最大線程大小。SynchronousQueue是一個是緩沖區為1的阻塞隊列。

 

ScheduledThreadPool:核心線程池固定,大小無限的線程池。此線程池支持定時以及周期性執行任務的需求。

 

public static ExecutorService newScheduledThreadPool(int corePoolSize) {         
    return new ScheduledThreadPool(corePoolSize, 
              Integer.MAX_VALUE,                                                  
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,                                                    
              new DelayedWorkQueue());    
}

 

創建一個周期性執行任務的線程池。如果閑置,非核心線程池會在DEFAULT_KEEPALIVEMILLIS時間內回收。

 

線程池最常用的提交任務的方法有兩種:

execute:

  ExecutorService.execute(Runnable runable);

submit:

FutureTask task = ExecutorService.submit(Runnable runnable);

FutureTask<T> task = ExecutorService.submit(Runnable runnable,T Result); FutureTask<T> task = ExecutorService.submit(Callable<T> callable);

 

submit(Callable callable)的實現,submit(Runnable runnable)同理。

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    FutureTask<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

 

可以看出submit開啟的是有返回結果的任務,會返回一個FutureTask對象,這樣就能通過get()方法得到結果。submit最終調用的也是execute(Runnable runable),submit只是將Callable對象或Runnable封裝成一個FutureTask對象,因為FutureTask是個Runnable,所以可以在execute中執行。關於Callable對象和Runnable怎么封裝成FutureTask對象,見Callable和Future、FutureTask的使用。

 

擴展ThreadPoolExecutor:

ThreadPoolExecutor提供了以下3個生命周期的鈎子方法讓子類擴展:

(1).beforeExecute:

任務執行前,線程會調用該方法,可以用來添加日志、監控或者信息收集統計。

若beforeExcute方法拋出了RuntimeException,線程的任務將不被執行,afterExecute方法也不會被調用。

 

(2).afterExecute:

任務執行結束后,線程會調用該方法,可以用來添加日志、監控或者信息收集統計。

無論任務正常返回或者拋出異常(拋出Error不能被調用),該方法都會被調用。

 

(3).terminate:

線程池完成關閉動作后調用,可以用來釋放資源、發出通知、記錄日志或者完成統計信息等。

一個擴展ThreadPoolExecutor的例子代碼如下:

 

public class TimingThreadPool extends ThreadPoolExecutor{  
    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();  
    private final Logger log = Logger.getLogger(TimingThreadPool.class.getClassName());  
    private final AtomicLong numTasks = new AtomicLong();  
    private final AtomicLong totalTime = new AtomicLong();  
 
    protected void beforeExecute(Thread t, Runnable r){  
        super.beforeExecute(t, r);  
        log.fine(String.format(“Thread %s: start %s”, t, r));  
        startTime.set(System.nanoTime());  
    }  

    protected void afterExecute(Runnable r, Throwable t){  
        try{  
            long endTime = System.nanoTime();  
            long taskTime = endTime - startTime.get();  
            numTasks.incrementAndGet();  
            totalTime.addAndGet(taskTime);  
            log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));  
        }finally{  
            super.afterExecute(r, t);  
        }  
    }  

    protected void terminated(){  
        try{  
            log.info(String.format("Terminated: avg time=%dns",   
                    totalTime.get() / numTasks.get()));  
        }finally{  
            super.terminated();  
        }  
    }  
}  

 

 

 

 

總線程數 = 排隊線程數 + 活動線程數 + 執行完成的線程數。

 

下面給出一個線程池使用示例,及教你獲取線程池狀態。

 

private static ExecutorService es = new ThreadPoolExecutor(50, 100, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(100000));
public static void main(String[] args) throws Exception {
    for (int i = 0; i < 100000; i++) {
        es.execute(() -> {
            System.out.print(1);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    ThreadPoolExecutor tpe = ((ThreadPoolExecutor) es);
    while (true) {
        System.out.println();
        int queueSize = tpe.getQueue().size();
        System.out.println("當前排隊線程數:" + queueSize);
        int activeCount = tpe.getActiveCount();
        System.out.println("當前活動線程數:" + activeCount);
        long completedTaskCount = tpe.getCompletedTaskCount();
        System.out.println("執行完成線程數:" + completedTaskCount);
        long taskCount = tpe.getTaskCount();
        System.out.println("總線程數:" + taskCount);
        Thread.sleep(3000);
    }
}

 

 

線程池提交了 100000 個任務,但同時只有 50 個線程在執行工作,我們每陋 3 秒來獲取當前線程池的運行狀態。

第一次程序輸出:

當前排隊線程數:99950 當前活動線程數:50 執行完成線程數:0 總線程數(排隊線程數 + 活動線程數 +  執行完成線程數):100000

第二次程序輸出:

當前排隊線程數:99800 當前活動線程數:50 執行完成線程數:150 總線程數(排隊線程數 + 活動線程數 +  執行完成線程數):100000

活動線程數和總線程數是不變的,排隊中的線程數和執行完成的線程數不斷在變化,直到所有任務執行完畢,最后輸出:

當前排隊線程數:0 當前活動線程數:0 執行完成線程數:100000 總線程數(排隊線程數 + 活動線程數 +  執行完成線程數):100000

這樣,你了解了這些 API 的使用方法,你想監控線程池的狀態就非常方便了。

 

Java 8 增強的線程池

為了充分利用多CPU的優勢、多核CPU的性能優勢。可以考多個小任務,把小任務放到多個處理器核心上並行執行;當多個小任務執行完成之后,再將這些執行結果合並起來即可。Java 7提供了ForkJoinPool來支持這個功能。

 ForkJoinPool是ExecutorService的實現類,因此是一種特殊的線程池。提供了如下兩個常用的構造器

  •  ForkJoinPool(int parallelism):創建一個包含parallelism個並行線程的ForkJoinPool.
  •  ForkJoinPool():以Runtime.availableProssesors()方法的返回值作為paralelism參數來創建ForkJoinPool.

Java 8進一步拓展了ForkJoinPool的功能,Java 8增加了通用池功能。ForkJoinPool通過如下兩個方法提供通用池功能。

  •  ForkJoinPool commonPool():該方法返回一個通用池,通用池的狀態不會受shutdown()或shutdownNow()方法的影響。
  •  int getCommonPoolParallelism():該方法返回通用池的並行級別。

創建了通用池ForkJoinPool實例之后,就可調用ForkJoinPool的submit(ForkJoinTask task)或invoke(ForkJoinTask task)方法來執行指定任務了。其中,ForkJoinTask代表一個並行,合並的任務。

ForkJoinTask是一個抽象類,它還有兩個抽象子類:RecursiveAction和recursiveTask。其中RecursiveAction代表沒有返回值的任務,RecursiveTask代表有返回值的任務。

下面程序將一個大任務(打印0~500)的數值分成多個小任務,並將任務交給ForkJoinPool來執行。

 

package com.gdut.thread.threadPool;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

class PrintTask extends RecursiveAction{
    private static final int THRESHOLD = 50;
    private int start;
    private int end;
    public PrintTask(int start,int end) {
      this.start = start;
      this.end = end;
    }

    @Override
    protected void compute() {
        if(end-start<THRESHOLD){
            for (int i = start; i <end ; i++) {
                System.out.println(Thread.currentThread().getName()+"的i值"+i);
            }
        }else{
            //當end與start的差大於THRESHOLD時,即要打印的數超過50時,將大任務分成兩個小任務
            int middle = (end+start)/2;
            PrintTask left = new PrintTask(start,middle);
            PrintTask right = new PrintTask(middle,end);
            left.fork();
            right.fork();
        }
    }
}
public class ForkJoinPoolTest{
    public static void main(String[] args) throws InterruptedException{
        ForkJoinPool pool = new ForkJoinPool();
        pool.submit(new PrintTask(0,500));
        pool.awaitTermination(2, TimeUnit.SECONDS);
        pool.shutdown();
    }

}

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM