hello world
pom依賴:(我這里用的boot版本是2.4.2)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
測試Controller:
@RestController
public class TestController {
@Autowired
ThreadPoolTaskExecutor threadPoolTaskExecutor;
@RequestMapping("test")
public String test() throws InterruptedException {
threadPoolTaskExecutor.execute(()->{
System.out.println("開始");
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("結束");
});
return "hello";
}
}
啟動項目后,我們發現ThreadPoolTaskExecutor可以注入到TestController,說明這個類也是被boot自動配置的,並且該方法execute確實是異步執行的
@EnableAsync和@Async
無返回值異步
我們在啟動類上標注@EnableAsync注解
@EnableAsync
@SpringBootApplication
public class BootAsyncApplication {
public static void main(String[] args) {
SpringApplication.run(BootAsyncApplication.class, args);
}
}
編寫controller和service
@RestController
public class TestController {
@Autowired
private TestService testService;
@Async
@RequestMapping("test")
public void test() throws InterruptedException {
System.out.println("開始");
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("結束");
}
@RequestMapping("test2")
public String test2(){
testService.ok();
return "ok";
}
}
@Service
public class TestService {
@Async
public void ok() {
System.out.println("service開始");
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("service結束");
}
}
我在controller的一個方法和service的一個方法上加上了@Async注解,調用接口觀察效果
訪問接口后,立馬返回結果,后台線程異步執行並打印結果。
有返回值異步
上面測試的是無返回值異步調用。
下面介紹有返回值異步調用,並接受返回值
我們知道在juc中通過Future和Callable可以獲取異步調用返回值,spring中支持了AsyncResult類支持獲取方法返回值,但其實該類實現的也是Future。
修改test2方法:
@RequestMapping("test2")
public String test2() throws ExecutionException, InterruptedException {
Future<String> ok = testService.ok();
//Future的get會被阻塞,需要注意
return ok.get();
}
@Service
public class TestService {
@Async
public Future<String> ok() {
System.out.println("Execute method asynchronously - "
+ Thread.currentThread().getName());
try {
Thread.sleep(5000);
return new AsyncResult<>("hello world !!!!");
} catch (InterruptedException ignored) {
}
return null;
}
}
測試效果:
自動配置類:TaskExecutionAutoConfiguration
大致就是先創建一個TaskExecutorBuilder,然后通過它去創建一個ThreadPoolTaskExecutor。
注意了,如果容器中有Executor,那么默認的ThreadPoolTaskExecutor就不會自動創建了。
注意:這里有一個大坑!!!我們注意到,默認的ThreadPoolTaskExecutor的參數是從TaskExecutionProperties.Pool中取得的,點進去。
竟然發現阻塞隊列大小和線程數最大值都是Integer.MAX_VALUE,這種配置極其容易造成生產事故(生產任務大量堆積,OOM異常等)
所以一定不要使用springboot默認配置的線程池!!!
自定義springboot線程池
springboot給了我們兩種解決方案
配置參數
我們可以通過配置spring.task.execution來修改配置
spring.task.execution.pool.core-size=10
spring.task.execution.pool.max-size=100
spring.task.execution.pool.queue-capacity=30
自定義線程池
這種方式比上一種更好,我們可以自定義線程池,定制化更好,包括我們可以自定義線程異常處理器。
我們可以實現AsyncConfigurer接口,並重寫接口中的方法
@Slf4j
@EnableAsync
@Configuration
public class ThreadPoolConfig implements AsyncConfigurer {
@Bean("WjTaskExecutor")
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(5);
threadPoolTaskExecutor.setMaxPoolSize(50);
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
threadPoolTaskExecutor.setQueueCapacity(20);
threadPoolTaskExecutor.setKeepAliveSeconds(300);
threadPoolTaskExecutor.setThreadNamePrefix("wj-");
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//設置線程池關閉的時候等待所有任務都完成再繼續銷毀其他的bean
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
//設置線程池中任務的等待時間,如果超過這個時候還沒有就強制銷毀,以確保應用最后能夠關閉而不是阻塞住
threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
return threadPoolTaskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params)->{
log.error("調用異步方法異常: \n" + method, ex);
};
}
}
修改controller的test方法:
@Async
@RequestMapping("test")
public void test() throws InterruptedException {
System.out.println("開始");
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
int i = 1/0;
System.out.println("結束");
}
測試: