前面兩篇講解了線程池中線程創建后的運行情況,其中有一系列的策略來保證線程正常運行。但是我們知道線程池是可以設置容量的,而且這容量的設置也是至關重要的,如果容量設置的太小,那么將會影響系統的運行效率,如果設置的過大,也可能造成無止盡的線程堆積,最終造成系統內存溢出。對於此,線程池也提供了一些設置來防止這些現象。下面我們將會介紹。
線程初始化
當我們創建線程池后,如果沒有新任務進來的話,默認是沒有線程的,提交任務后線程池才會創建新的線程。如果你想創建線程池時就初始化corePoolSize數量的線程的話,線程池提供了以下兩個方法:
prestartCoreThread(): 立即初始化一個線程prestartAllCoreThreads():立即初始化corePoolSize數量的線程
以下是具體方法實現:
public int prestartAllCoreThreads() {
int n = 0;
while (addIfUnderCorePoolSize(null))
++n;
return n;
}
public boolean prestartCoreThread() {
return addIfUnderCorePoolSize(null);
}
底層都是調用 addIfUnderCorePoolSize() 方法,上一篇有講過,如果傳入的參數為null的話,則最后執行線程會阻塞在getTask方法中的,因為要等待堵塞隊列中有任務到達。
任務堵塞隊列
當線程池池創建的線程數量大於 corePoolSize 后,新來的任務將會加入到堵塞隊列(workQueue)中等待有空閑線程來執行。workQueue的類型為BlockingQueue
- ArrayBlockingQueue:基於數組的FIFO隊列,是有界的,創建時必須指定大小
- LinkedBlockingQueue: 基於鏈表的FIFO隊列,是無界的,默認大小是
Integer.MAX_VALUE - synchronousQueue:一個比較特殊的隊列,雖然它是無界的,但它不會保存任務,每一個新增任務的線程必須等待另一個線程取出任務,也可以把它看成容量為0的隊列
所有 BlockingQueue 都可用於傳輸和保持提交的任務。可以使用此隊列與池大小進行交互:
如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。(如果當前運行的線程小於corePoolSize,則任務根本不會存放,添加到queue中,而是直接抄家伙(thread)開始運行)
如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。
如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。
排隊有三種通用策略:
直接提交。工作隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在 此,如果不存在可用於立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現 鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線 程具有增長的可能性。
無界隊列。使用無界隊列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所 有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,創建的線程就不會超過 corePoolSize。(因 此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界隊列;例如, 在 Web 頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。
有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列 (如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型 池可以最大限度地降低 CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊 界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降 低吞吐量。
BlockingQueue****的選擇。
例子一:使用直接提交策略,也即SynchronousQueue。
首先SynchronousQueue是無界的,也就是說他存數任務的能力是沒有限制的,但是由於該Queue本身的特性,在某次添加元素后必須等待其他線程取走后才能繼續添加。在這里不是核心線程便是新創建的線程,但是我們試想一樣下,下面的場景。
我們使用一下參數構造ThreadPoolExecutor:
new ThreadPoolExecutor(
2, 3, 30, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new RecorderThreadFactory("CookieRecorderPool"),
new ThreadPoolExecutor.CallerRunsPolicy());
當核心線程已經有2個正在運行.
- 此時繼續來了一個任務(A),根據前面介紹的“如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。”,所以A被添加到queue中。
- 又來了一個任務(B),且核心2個線程還沒有忙完,OK,接下來首先嘗試1中描述,但是由於使用的SynchronousQueue,所以一定無法加入進去。
- 此時便滿足了上面提到的“如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出maximumPoolSize,在這種情況下,任務將被拒絕。”,所以必然會新建一個線程來運行這個任務。
- 暫時還可以,但是如果這三個任務都還沒完成,連續來了兩個任務,第一個添加入queue中,后一個呢?queue中無法插入,而線程數達到了maximumPoolSize,所以只好執行異常策略了。
所以在使用SynchronousQueue通常要求maximumPoolSize是無界的,這樣就可以避免上述情況發生(如果希望限制就直接使 用有界隊列)。對於使用SynchronousQueue的作用jdk中寫的很清楚:此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。
什么意思?如果你的任務A1,A2有內部關聯,A1需要先運行,那么先提交A1,再提交A2,當使用SynchronousQueue我們可以保證,A1必定先被執行,在A1么有被執行前,A2不可能添加入queue中。
例子二:使用無界隊列策略,即****LinkedBlockingQueue
這個就拿newFixedThreadPool來說,根據前文提到的規則:
如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。那么當任務繼續增加,會發生什么呢?
如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。OK,此時任務變加入隊列之中了,那什么時候才會添加新線程呢?
如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。這里就很有意思了, 可能會出現無法加入隊列嗎?不像SynchronousQueue那樣有其自身的特點,對於無界隊列來說,總是可以加入的(資源耗盡,當然另當別論)。換 句說,永遠也不會觸發產生新的線程!corePoolSize大小的線程數會一直運行,忙完當前的,就從隊列中拿任務開始運行。所以要防止任務瘋長,比如 任務運行的實行比較長,而添加任務的速度遠遠超過處理任務的時間,而且還不斷增加,不一會兒就爆了。
例子三:有界隊列,使用ArrayBlockingQueue。
這個是最為復雜的使用,所以JDK不推薦使用也有些道理。與上面的相比,最大的特點便是可以防止資源耗盡的情況發生。
舉例來說,請看如下構造方法:
new ThreadPoolExecutor(
2, 4, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2),
new RecorderThreadFactory("CookieRecorderPool"),
new ThreadPoolExecutor.CallerRunsPolicy());
假設,所有的任務都永遠無法執行完。
對於首先來的A,B來說直接運行,接下來,如果來了C,D,他們會被放到queue中,如果接下來再來E,F,則增加線程運行E,F。但是如果再來任務,隊列無法再接受了,線程數也到達最大的限制了,所以就會使用拒絕策略來處理。
keepAliveTime
jdk中的解釋是:當線程數大於核心時,此為終止前多余的空閑線程等待新任務的最長時間。
有點拗口,其實這個不難理解,在使用了“池”的應用中,大多都有類似的參數需要配置。比如數據庫連接池,DBCP中的maxIdle,minIdle參數。
什么意思?接着上面的解釋,后來向老板派來的工人始終是“借來的”,俗話說“有借就有還”,但這里的問題就是什么時候還了,如果借來的工人剛完成一個任務就還回去,后來發現任務還有,那豈不是又要去借?這一來一往,老板肯定頭也大死了。
合理的策略:既然借了,那就多借一會兒。直到“某一段”時間后,發現再也用不到這些工人時,便可以還回去了。這里的某一段時間便是keepAliveTime的含義,TimeUnit為keepAliveTime值的度量。
任務拒絕策略
線程池堵塞隊列容量滿之后,將會直接新建線程,數量等於 maximumPoolSize 后,將會執行任務拒絕策略不在接受任務,有以下四種拒絕策略:
- ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
- ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)
- ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
線程池的關閉
ThreadPoolExecutor提供了兩個方法,用於線程池的關閉,分別是shutdown()和shutdownNow(),其中:
- shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完后才終止,但再也不會接受新的任務
- shutdownNow():立即終止線程池,並嘗試打斷正在執行的任務,並且清空任務緩存隊列,返回尚未執行的任務
線程池容量的動態調整
ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
- setCorePoolSize:設置核心池大小
- setMaximumPoolSize:設置線程池最大能創建的線程數目大小
當上述參數從小變大時,ThreadPoolExecutor進行線程賦值,還可能立即創建新的線程來執行任務。
