ThreadPoolTaskExecutor使用詳解(轉)


當並發或者異步操作,都會用到ThreadPoolTaskExecutor。現在對線程池稍作理解。

/***
*@Auth dzb
*@Date 22:29 2018/8/29
*@Description: 線程池
*@Version 1.0
*/
@Configuration
public class AsynTaskExecutePool implements AsyncConfigurer {

@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);//核心池大小
executor.setMaxPoolSize(100);//最大線程數
executor.setQueueCapacity(1000);//隊列程度
executor.setKeepAliveSeconds(1000);//線程空閑時間
executor.setThreadNamePrefix("tsak-asyn");//線程前綴名稱
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());//配置拒絕策略
return executor;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {

return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... Object) {
System.err.println("error=================="+throwable.getMessage());
System.err.println("Method=================="+method.getName());
System.err.println("Object=================="+Object);
}
};

}
}
rejectedExecutionHandler字段用於配置拒絕策略,常用的拒絕策略如下:

AbortPolicy,用於被拒絕任務的處理程序,它將拋出RejectedExecutionException。
CallerRunsPolicy,用於被拒絕任務的處理程序,它直接在execute方法的調用線程中運行被拒絕的任務。
DiscardOldestPolicy,用於被拒絕任務的處理程序,它放棄最舊的未處理請求,然后重試execute。
DiscardPolicy,用於被拒絕任務的處理程序,默認情況下它將丟棄被拒絕的任務。
其他說明:
如果有特殊的業務需求,用戶可以選擇使用自定義策略,只需實現RejectedExecutionHandler接口即可。
配置threadNamePrefix屬性,出問題可以隨時排查。    
提交任務:
void execute(Runnable command)
  如果實現使用異步執行策略,則調用可能立即返回,或者在同步執行的情況下可能會阻塞。
public java.util.concurrent.Future <?> submit(java.lang.Runnable task)
提交Runnable任務以執行,接收表示該任務的Future。未來將null在完成后返回結果。
處理流程
當一個任務被提交到線程池時,首先查看線程池的核心線程是否都在執行任務,否就選擇一條線程執行任務,是就執行第二步。
查看核心線程池是否已滿,不滿就創建一條線程執行任務,否則執行第三步。
查看任務隊列是否已滿,不滿就將任務存儲在任務隊列中,否則執行第四步。
查看線程池是否已滿,不滿就創建一條線程執行任務,否則就按照策略處理無法執行的任務。
在ThreadPoolExecutor中表現為:

如果當前運行的線程數小於corePoolSize,那么就創建線程來執行任務(執行時需要獲取全局鎖)。
如果運行的線程大於或等於corePoolSize,那么就把task加入BlockQueue。
如果創建的線程數量大於BlockQueue的最大容量,那么創建新線程來執行該任務。
如果創建線程導致當前運行的線程數超過maximumPoolSize,就根據飽和策略來拒絕該任務。
關閉線程池
showdow()實現的代碼:

if (this.executor != null) {
if (this.waitForTasksToCompleteOnShutdown) {
this.executor.shutdown();//中斷未執行完的線程
} else {
Iterator var1 = this.executor.shutdownNow().iterator();
while(var1.hasNext()) {
Runnable remainingTask = (Runnable)var1.next();
this.cancelRemainingTask(remainingTask);//取消所有剩下需要執行的線程
}
}
調用shutdown或者shutdownNow,兩者都不會接受新的任務,而且通過調用要停止線程的interrupt方法來中斷線程,有可能線程永遠不會被中斷,不同之處在於shutdownNow會首先將線程池的狀態設置為STOP,然后嘗試停止所有線程(有可能導致部分任務沒有執行完)然后返回未執行任務的列表。而shutdown則只是將線程池的狀態設置為shutdown,然后中斷所有沒有執行任務的線程,並將剩余的任務執行完。

配置線程個數
如果是CPU密集型任務,那么線程池的線程個數應該盡量少一些,一般為CPU的個數+1條線程。
如果是IO密集型任務,那么線程池的線程可以放的很大,如2*CPU的個數。
對於混合型任務,如果可以拆分的話,通過拆分成CPU密集型和IO密集型兩種來提高執行效率;如果不能拆分的的話就可以根據實際情況來調整線程池中線程的個數。
監控線程池狀態
常用狀態:

taskCount:線程需要執行的任務個數。
completedTaskCount:線程池在運行過程中已完成的任務數。
largestPoolSize:線程池曾經創建過的最大線程數量。
getPoolSize獲取當前線程池的線程數量。
getActiveCount:獲取活動的線程的數量
通過繼承線程池,重寫beforeExecute,afterExecute和terminated方法來在線程執行任務前,線程執行任務結束,和線程終結前獲取線程的運行情況,根據具體情況調整線程池的線程數量。

 

rejectedExecutionHandler字段用於配置拒絕策略,常用的拒絕策略如下:

AbortPolicy,用於被拒絕任務的處理程序,它將拋出RejectedExecutionException。
CallerRunsPolicy,用於被拒絕任務的處理程序,它直接在execute方法的調用線程中運行被拒絕的任務。
DiscardOldestPolicy,用於被拒絕任務的處理程序,它放棄最舊的未處理請求,然后重試execute。
DiscardPolicy,用於被拒絕任務的處理程序,默認情況下它將丟棄被拒絕的任務。
其他說明:

為了實現某些特殊的業務需求,用戶可以選擇使用自定義策略,只需實現RejectedExecutionHandler接口即可。
建議配置threadNamePrefix屬性,出問題時可以更方便的進行排查。
提交任務
無返回值的任務使用execute(Runnable)
有返回值的任務使用submit(Runnable)
處理流程
當一個任務被提交到線程池時,首先查看線程池的核心線程是否都在執行任務,否就選擇一條線程執行任務,是就執行第二步。
查看核心線程池是否已滿,不滿就創建一條線程執行任務,否則執行第三步。
查看任務隊列是否已滿,不滿就將任務存儲在任務隊列中,否則執行第四步。
查看線程池是否已滿,不滿就創建一條線程執行任務,否則就按照策略處理無法執行的任務。
在ThreadPoolExecutor中表現為:

如果當前運行的線程數小於corePoolSize,那么就創建線程來執行任務(執行時需要獲取全局鎖)。
如果運行的線程大於或等於corePoolSize,那么就把task加入BlockQueue。
如果創建的線程數量大於BlockQueue的最大容量,那么創建新線程來執行該任務。
如果創建線程導致當前運行的線程數超過maximumPoolSize,就根據飽和策略來拒絕該任務。
關閉線程池
調用shutdown或者shutdownNow,兩者都不會接受新的任務,而且通過調用要停止線程的interrupt方法來中斷線程,有可能線程永遠不會被中斷,不同之處在於shutdownNow會首先將線程池的狀態設置為STOP,然后嘗試停止所有線程(有可能導致部分任務沒有執行完)然后返回未執行任務的列表。而shutdown則只是將線程池的狀態設置為shutdown,然后中斷所有沒有執行任務的線程,並將剩余的任務執行完。

配置線程個數
如果是CPU密集型任務,那么線程池的線程個數應該盡量少一些,一般為CPU的個數+1條線程。
如果是IO密集型任務,那么線程池的線程可以放的很大,如2*CPU的個數。
對於混合型任務,如果可以拆分的話,通過拆分成CPU密集型和IO密集型兩種來提高執行效率;如果不能拆分的的話就可以根據實際情況來調整線程池中線程的個數。
監控線程池狀態
常用狀態:

taskCount:線程需要執行的任務個數。
completedTaskCount:線程池在運行過程中已完成的任務數。
largestPoolSize:線程池曾經創建過的最大線程數量。
getPoolSize獲取當前線程池的線程數量。
getActiveCount:獲取活動的線程的數量
通過繼承線程池,重寫beforeExecute,afterExecute和terminated方法來在線程執行任務前,線程執行任務結束,和線程終結前獲取線程的運行情況,根據具體情況調整線程池的線程數量。

參考來源:

[1] https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.html

參考鏈接:https://blog.csdn.net/foreverling/article/details/78073105

參考鏈接:https://blog.csdn.net/sdmxdzb/article/details/82193847


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM