1. @Async可以開啟異步,但是要在 main 中EnableAsync
2.@Async既可以注解在方法上,也可以注解到類上
3.使用@Async時,請注意一定要對應bean name,否則或調用系統默認的SampleTaskExecutor,容易造成OOM
4.本人使用的SpringBoot 2.3.4 ,默認值 maxPoolSize = 2147483647,queueCapacity = 2147483647, 建議在初始化時設置corePoolSize即可(百度到的例子中,大多數沒有講這一塊)
5.線程池對拒絕任務的處理策略處理,默認為 new ThreadPoolExecutor.CallerRunsPolicy(),建議使用 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
6.如果Executor后台線程池還沒有完成Callable的計算,這時調用返回Future對象的get()方法,會阻塞直到計算完成。
我為什么要在這里重點提第四點和第五點,目前百度到的大多文章都是相互抄的,在定義executor主動定義了queueCapacity ,maxPoolSize 並沒有去看源碼中對於queueCapacity ,maxPoolSize 的處理。
我的建議是,這倆值無需自定義,為了提高多線程的並發效率,可以考慮直接放大corePoolSize。
關於executort的使用代碼我就不在此處多講,各位可以用此代碼,測試系統中指定bean的taskExecutor中到底有多少任務在執行。
getBean見 https://www.jianshu.com/p/3cd2d4e73eb7
使用方式如下
@Component @Slf4j public class TaskSchedule { @Autowired ApplicationContextProvider applicationContextProvider; // @Scheduled(fixedRate = 2000L, initialDelay = 5) public void getTaskExecutorState(){ Class<ThreadPoolTaskExecutor> clas = ThreadPoolTaskExecutor.class; ThreadPoolTaskExecutor threadPoolTaskExecutor = applicationContextProvider.getBean("taskExecutor", clas); ThreadPoolExecutor threadPoolExecutor = threadPoolTaskExecutor.getThreadPoolExecutor(); log.info("{}, taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}], MaximumPoolSize[{}], largestPoolSize[{}]", threadPoolTaskExecutor.getThreadNamePrefix(), threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size(), threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getLargestPoolSize()); } }
controller
@Autowired
private AsyncTask task;
@Autowired
private TaskSchedule taskSchedule;
@PostMapping("/consume") @ResponseBody public JSONObject consume(@RequestBody JSONObject params) throws InterruptedException, ExecutionException { count ++; JSONObject jsonObject = new JSONObject(); log.info("params flag {} ",params.getString("flag")); log.info("名稱 {}", params.getString("loginid")); jsonObject.put("loginidis",params.getString("loginid")); jsonObject.put("count", count); Future<String> task4 = task.task4(count); taskSchedule.getTaskExecutorState(); // task.task4(); // log.info("Future<String> {}", task4.get()); //調用返回Future對象的get()方法,會阻塞直到計算完成 // task.getTest1(); return jsonObject; }
import cn.hutool.core.util.RandomUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Component; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * 功能描述:異步任務業務類(@Async也可添加在方法上) */ @Component @Async("taskExecutor") @Slf4j public class AsyncTask { //獲取異步結果 public Future<String> task4(int index) throws InterruptedException { log.info("開始執行任務 task4 index:{}",index); long begin = System.currentTimeMillis(); // Thread.sleep(1000L*60*2); // int sleepTime = RandomUtil.randomInt(1000*60*3, 1000*60*5); int sleepTime = RandomUtil.randomInt(1000*30, 1000*60); log.info(" sleepTime is {}",sleepTime); Thread.sleep(sleepTime); long end = System.currentTimeMillis(); log.info("任務4執行完畢 index:"+index+" 耗時=" + (end - begin)); return new AsyncResult<String>("任務4"); } }
各位可以在代碼中注釋掉
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
或者使用不同的拒絕策略測試效果。
如本人設置的參數core=3, max=5, queue=10, 通過postman構造對應的請求,會在第16個請求開始阻塞,由接收請求的線程本身http-nio-80-exec負責執行任務,其執行時間即postman請求消耗的時間
http-nio-80-exec即SpringBoot中tomcat本身默認的executor。
關於拒絕策略可參考:https://www.jianshu.com/p/f3322daa2ad0
import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; @Configuration @Slf4j public class ThreadPoolTaskConfig { private static final int corePoolSize = 2; // 核心線程數(默認線程數)線程池創建時候初始化的線程數 private static final int maxPoolSize = 5; // 最大線程數 線程池最大的線程數,只有在緩沖隊列滿了之后才會申請超過核心線程數的線程 private static final int keepAliveTime = 10; // 允許線程空閑時間(單位:默認為秒)當超過了核心線程之外的線程在空閑時間到達之后會被銷毀 private static final int queueCapacity = 10; // 緩沖隊列數 用來緩沖執行任務的隊列 private static final String threadNamePrefix = "Async-Service-"; // 線程池名前綴 方便我們定位處理任務所在的線程池 @Bean("taskExecutor") // bean的名稱,默認為首字母小寫的方法名 // public ThreadPoolTaskExecutor taskExecutor(){ public ThreadPoolTaskExecutor taskExecutor(){ // public AsyncTaskExecutor taskExecutor(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); // executor.setKeepAliveSeconds(keepAliveTime); executor.setThreadNamePrefix(threadNamePrefix); // 線程池對拒絕任務的處理策略 采用了CallerRunsPolicy策略,當線程池沒有處理能力的時候,該策略會直接在 execute 方法的調用線程中運行被拒絕的任務;如果執行程序已關閉,則會丟棄該任務 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // executor.setRejectedExecutionHandler( // new RejectedExecutionHandler(){ // @Override // public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // try { // //繼續加入阻塞隊列執行,可自定義 // log.info("繼續加入阻塞隊列執行,可自定義"); // executor.getQueue().put(r); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } // } // // ); // 初始化 executor.initialize(); return executor; } }