等待其他資源,可能會產生線程飢餓死鎖
在線程池中如果一個任務依賴於其它任務的執行,就可能產生死鎖.在一個單線程化的Executor中,提交兩個任務,任務二滯留在工作隊列中等待第一個任務完成,但是第一個任務不會完成,因為它在等待第二個任務的完成(需要第二個任務執行的結果進行運算),這就會發生死鎖.
在一個大的線程池中,如果所有線程執行的任務都阻塞在線程池中,等待着仍然處於同一工作隊列中的其它任務,那么會發生同樣的問題.這被稱作線程飢餓死鎖(thread starvation deadlock)
產生死鎖的情況: 只要池任務開始了無限期的阻塞,其目的是等待一些資源或條件,此時只有另一個池任務的活動才能使那些條件成立,比如等待返回值.除非你能保證這個池足夠大,否則會產生線程飢餓死鎖.
池任務等待另一個池任務的結果,可能會發生死鎖:
public class ThreadDeadLock {
ExecutorService exec = Executors.newSingleThreadExecutor();
public class Task implements Callable{
@Override
public Object call() throws Exception {
//等待另一個池任務的結果
Future<String> future1 = exec.submit(new LockTask());
Future<String> future2 = exec.submit(new LockTask());
//可能發生死鎖
return future1.get()+future2.get();
}
}
無論何時,你提交了一個非獨立的Executor任務,要明確出現線程飢餓死鎖的可能性,並且,在代碼或者配置文件以及其他可以配置Executor的地方,任何有關池的大小和配置約束都要寫入文檔
耗時的任務,設置超時時間
如果你的線程池不夠大,又有很多耗時的任務,這會影響服務的響應性.這時候你可以限定任務等待資源的時間,而不是無限制地等下去.
耗時的任務可能會死鎖或者響應的很慢
大多數平台類庫中的阻塞方法,都有限時和非限時兩個版本.例如Blocking.put.如果超時你可以把任務標記為失敗,終止或者把他重新返回隊列,准備之后執行.這樣無論每個任務的最終結果是成功還是失敗,都保證了任務會向前發展,這樣可以更快地將線程從任務中解放出來.(如果線程池頻頻被阻塞的任務充滿,這同樣可能是池太小的一個信號).
定制線程池的大小
不要硬編碼線程池的大小
線程池合理的長度取決於未來提交的任務類型和所部屬系統的特征.池的長度應該由某種配置機制來提供,或者利用Runtime.availableProcessors(獲取你電腦的處理器數量),動態進行計算
線程池過大&過小的壞處
線程池過大: 線程對稀缺的CPU和內存資源的競爭,會導致內存的高使用量.線程間切換也會消耗資源
線程池過小:由於存在很多可用的處理器資源沒用,會對吞吐量造成損失
制定線程池大小依據的內容
正確的定制線程池的長度,需要理解你的計算環境、資源預算和任務的自身特性.
部署系統中安裝了多少個CPU?多少內存?任務主要執行的是計算、I/O還是一些混合操作?它們是否需要像JDBC Connection 這樣的稀缺資源?
如果你有不同類別的任務,它們擁有差別很大行為,那么請考慮使用多個線程池,這樣每個線程池可以根據不同任務的工作負載進行調節.
計算密集型和I/O密集型的線程選擇
計算密集型:一直在計算,cpu利用率高,過多的線程沒有意義,反而切換線程會消耗額外的資源.
I/O密集型:例如查找數據庫,等待數據造成的阻塞,CPU利用率低,多個線程可以提高響應速度.
對於計算密集型的任務,一個有N個處理器的系統通常使用一個N+1個線程的線程池來獲得最優的利用率(計算密集型的線程恰好在某時因為發生一個頁錯誤或者因其它原因而暫停,剛好有一個"額外"的線程,可以確保在這種情況下CPU周期不會中斷工作).
對於包含了I/O和其他阻塞操作的任務,不是所有的線程都會在所有的時間被調度,因此你需要一個更大的池.
在一個基准負載下,可以使用不同大小的線程池運行你的應用程序,並觀察CPU利用率的水平.
計算線程池大小的公式
N = CPU的數量
U = 目標CPU的使用率,介於0-1之間
W/C = 等待時間(wait)和計算時間(calculate)的比率
為保持處理器到達期望的使用率,最優的池的大小等於:
num(線程數) = N * U * (1 + W/C);
你可以使用Runtime來獲得CPU的數目:
int nCpus = Runtime.getRuntime().availableProcessors();
簡單的例子
通過Runtime.getRuntime().availableProcessors();得到我的電腦cpu數是4, 我期望cpu的使用率是100%,假設等待時間是10秒,計算時間是1秒.那么我最優的池大小就是:
4 * 100% * (1+10/1) = 44
線程池的長度和資源池的長度互相影響
當任務需要使用池化的資源時,比如數據庫鏈接,線程池的長度和資源池的長度會互相影響.
如果每一個任務都需要一個數據庫鏈接,那么連接池的大小就限制了線程池的有效大小;類似地,當線程池中的任務是連接池的唯一消費者時,那么線程池的大小反而又會限制了連接池的有效大小.
配置ThreadPoolExecutor
靈活配置ThreadPoolExecutor
使用Executors工廠方法可以創建各種類型的線程池,newCachedThreadPool、newFixedThreadPool和newScheduledThreadExecutor等.如果這些執行策略不能滿足你的需求,你可以 new ThreadPoolExecutor(傳遞各種參數)來配置.
ThreadPoolExecutor有很多構造函數
最后一個構造函數是功能最全的,也是最常用的,源碼:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
它有五個參數分別是:
- corePoolSize 核心池大小
- maximumPoolSize 最大池大小
- keepAliveTime 存活時間
- TimeUnit 時間單元
- BlockingQueue
工作隊列 - ThreadFactory 線程工廠
- RejectedExecutionHandler 拒絕執行處理器
注意源碼,限定了設置這幾個值的范圍,不滿足就會報非法參數異常,當時博主就是將核心池的值設置比最大池的值大,報了這個異常,看了源碼才曉得:
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
corePoolSize(核心池大小)、maximum pool size(最大池的大小)和存活時間(keep-alive time)共同管理着線程的創建與銷毀.
corePoolSize: 線程池的實現試圖維護池的大小(Eexcutors.newSingleThreadExecutor就是一種實現);即時沒有執行任務,池的大小也等於核心池的大小,工作隊列充滿后會創建更多的線程.(Executors.newCacheThreadPool池的大小就是不固定的,隨着任務增減線程的數量).
maximumPoolSize:最大池的大小制約着線程池可以同時執行的的最大線程數(限制並發數量),如果一個線程閑置的時間超過了存活時間就會被回收.並且同時運行的線程的數量不能超過核心池大小,線程池會終止超過的數量.
keep alive time & TimeUniht: 存活時間保證了空閑的線程會被回收,這樣釋放了這部分被占有的資源可以做更有用的事情.
兩種特殊的線程池實現
Executors.newFixedThreadPool和newCachedThreadPool是兩種特殊的實現.
newFixedThreadPool設置了核心池和最大池的大小,而且永遠不會超時(不會回收線程).
newCachedThreadPool將最大池設置為了Integer.MAX_VALUE,核心池設置為0,超時設置為一分鍾,這樣創建出來的可無限擴大的線程池,會在需求量減少的情況下減少線程數量.
不要將核心池大小設置為0
我們自己手動創建ThreadPoolExecutor的時候,不要將corePoolSize設置為0,除非你用的是SynchronousQueue隊列(newCachedThreadPool用的就是),否則隊列不充滿,就不會執行.
管理任務隊列
用定長的線程池去替代為每個任務都創建一個線程的方式,在高負載的情況下,使得程序更不容易崩潰了,暫時沒時間處理的任務會放進阻塞隊列里,這是一個更優的方案,但是如果傳遞進來的任務超過處理的速速,程序仍然有可能崩潰.
過多的請求會使程序崩潰或者響應很慢
過多的請求會導致兩個問題:
- 耗盡內存
- 響應速度很慢(后請求的用戶需要等到前面的請求執行完)
隊列可以緩和上述問題.
隊列有助於緩和瞬時的任務激增,但是最終你還需要遏制請求的達到速率.
ThreadPoolExecutor的第五個參數是一個BlockingQueue阻塞隊列.隊列有三種:無限隊列、有限隊列和同步移交.隊列的選擇和很多其他的配置參數都有關系,比如池的大小.
newFixedThreadPool 和 newSingleThreadPool默認使用的是無界的LinkedBlockingQueue.任務無限多,隊列無限長.最后就是程序崩潰.
所以我們最終的選擇是使用有界隊列,例如ArrayBlockingQueue或者有限的LinkedBlockingQueue以及PriorityBlockingQueue(自定義優先級的隊列),有界隊列滿了以后有飽和策略可以處理那些沒放進隊列中的請求.
池的大小和隊列的大小相輔相成
一個大隊列加一個小池,可以控制對內存和CPU的使用,可以減少上下文切換,不過會影響吞吐量.
SynchronousQueue
對於龐大或者無限的池可以使用SynchronousQueue,這不是一個真正的隊列,原來的隊列可以理解為把任務存在一個容器里,線程放進去,線程取出來,而SynchronousQueue相當於是一個手遞手傳遞.
兩種情況可以使用SynchronousQueue:
- 任務可以被拒絕
- 池無限大
newCachedThreadPool使用的就是這個隊列.
newCachedThreadPool和定長線程池之間的選擇
如果不會因為負載過大導致程序崩潰就是用newCachedThreadPool,因為它的隊列用的是SynchronousQueue,它有更佳的吞吐量.
還有一點要特別注意,如果任務之間相互依賴(一個任務依賴於另一個任務的結果,像這樣的)最好使用newCachedThreadPool,否則可能產生線程飢餓死鎖.
反之,像互聯網應用程序還是應該使用定長的線程池.
飽和策略:處理未放入隊列的任務
ThreadPoolExecutor的第七個參數,
可以用過調用setRejectedExecutionHandler來修改(如果線程池關閉了也會用到飽和策略).
Java提供了幾種RejectedExecutionHandler實現:
- AbortPolicy
- CallerRunsPolicy
- DiscardPolicy
- DiscardOldestPolicy
默認的AbortPolicy會引起execute拋出未檢查的RejectedExecutionException;調用者可以捕獲這個異常,然后編寫滿足自己需求的處理代碼(例如:持久化這個任務,一會再執行).
DiscardPolicy策略會默認放棄這個任務;
DiscardOldestPolicy會放棄本該接下來執行的任務,同時還會嘗試去重新提交新任務.(無法和優先級隊列同時使用,遺棄的任務是優先級最高的)
CallerRunsPolicy(調用者運行)既不會丟棄任務也不會拋出異常,它會把任務推送會調用者那里,以減緩新任務流,它不會再池線程中執行最新提交的任務,會在調用線程池execute或submit方法的線程中執行.
public class ThreadPool {
private static class Worker implements Runnable{
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" is running");
}
}
public static void main(String [] args){
//Executors.newSingleThreadExecutor()
ExecutorService executorService = new ThreadPoolExecutor(2, 2, 1,
TimeUnit.MICROSECONDS, new ArrayBlockingQueue<Runnable>(2), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r);
}
},new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 5; i++) {
executorService.submit(new Worker());
}
}
}
打印輸出:
main is running
Thread-1 is running
Thread-0 is running
Thread-0 is running
Thread-1 is running
證明在main方法的線程中執行了最新的任務.
執行100次代碼中的循環會循環輸出,main 和 Thread-1 Thread-0,相當於加上主線程三個線程並發執行.
當所有的池線程都被占用,而且工作隊列已充滿后,下一個任務會在主線程中執行.主線程執行任務的時候會花費一些時間,這時候主線程是不能提交任何任務的,所以這也給工作者線程一些時間來追趕進度.
這期間主線程不會調用accept接受新的請求,而會在TCP層的隊列中等候.如果持續高負載的話,最終會由TCP層判斷它的鏈接請求隊列是否已經排滿,如果已滿就開始丟棄請求任務.
當服務器過載的時候,首先線程池里的所有線程都忙碌了起來,然后阻塞隊列滿了,接着TCP層滿了,最終就是用戶請求失敗.
這使得服務器在高負載下可以平緩地劣化(graceful degradation).
線程工廠
ThreadPoolExecutor構造方法的第六個參數.
ThreadFactory接口只有一個方法 newThread,在線程池需要創建一個新線程時使用的.
定制這個工廠的用處:
- 為池線程指明一個UncaughtExceptionHandler(上篇博客有解釋)
- 實例化一個定制Thread類的實例(例如給線程添加一個新的名稱)
- 修改線程的優先級(不要這樣做)
- 修改后台狀態(不要這樣做)
自定義線程池:
public class CustomThreadFactory implements ThreadFactory {
private String poolName;
public CustomThreadFactory(String poolName) {
this.poolName = poolName;
}
@Override
public Thread newThread(Runnable r) {
return new MyThread(r,poolName);
}
}
自定義線程:
public class MyThread extends Thread {
private final Logger log = Logger.getAnonymousLogger();
public MyThread(Runnable target,String name) {
super(target,name);
//線程異常終止的時候會得到記錄.
setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
log.info("Uncaught in thread"+t.getName()+e);
}
});
}
@Override
public void run(){
//可以做一些額外的日志記錄
super.run();
}
}
重新設置線程池的參數
ThreadPoolExecutor的屬性可以在創建后,通過setters方法重新設置.
如果線程池是通過Executors工廠方法創建的(除newSingleThreadExecutor以外),可以先轉型為ThreadPoolExecutor,然后在設置.
public class UpdateExecutor {
public static void main(String [] args){
Executor executor = Executors.newCachedThreadPool();
((ThreadPoolExecutor)executor).setCorePoolSize(111);
}
}
如果你不想你的線程池被修改可以使用Executors.unconfigurableExecutorService()方法.使線程池無法被修改
Executor executor = Executors.unconfigurableExecutorService(Executors.newCachedThreadPool());
擴展ThreadPoolExecutor
ThreadPoolExecutor提供了幾個函數讓子類去覆寫來擴展ThreadPoolExecutor
- beforeExecute
- afterExecute
- terminate
beforeExecute在任務執行前調用,afterExecute在任務執行后調用,可以用它們來寫日志.
無論任務是正常地從run返回,還是拋出一個異常,afterExecutor都會被調用(如果任務完成后拋出一個Error,afterExecute不會被調用).如果beforeExecutor拋出一個RuntimeException,任務不會被執行,afterExecutor也不會被調用.
terminated會在線程池關閉后調用.也就是當所有任務都已完成並且所有工作者線程都已經關閉后,會執行terminated.
示例:
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("beforeExecute方法執行了");
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("afterExecute方法執行了");
}
@Override
protected void terminated() {
System.out.println("terminated方法執行了");
}
public static void main(String[] args) throws InterruptedException {
CustomThreadPoolExecutor customThreadPoolExecutor = new CustomThreadPoolExecutor(1,
1, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));
customThreadPoolExecutor.submit(()-> System.out.println("任務執行了"));
customThreadPoolExecutor.shutdown();
customThreadPoolExecutor.awaitTermination(1,TimeUnit.SECONDS);
}
}
輸出:
beforeExecute方法執行了
任務執行了
afterExecute方法執行了
terminated方法執行了
總結
對於並發執行的任務,Executor框架是強大且靈活的.它提供了大量可調節的選項,比如創建和關閉線程的策略,處理隊列任務的策略,並且提供了幾個鈎子函數用於擴展它的行為.然而,和大多數的框架一樣,草率地將一些設定組合在一起,並不能很好地工作;一些類型的任務需要特定的執行策略,而一些參數組合在一起后可能產生意外的后果.
下一篇會更新關於死鎖的博客.
再見.