ThreadPoolExecutor:=======這個是java自己實現的線程池執行類,基本上創建線程池都是通過這個類進行的創建。
ThreadPoolTaskExecutor:========這個是springboot基於ThreadPoolExecutor實現的一個線程池執行類,包裝類。
1、Spring默認的@Async用線程池名字為SimpleAsyncTaskExecutor。
2、Spring異步線程池的接口類是TaskExecutor,本質還是java.util.concurrent.Executor,沒有配置的情況下,默認使用的是simpleAsyncTaskExecutor。
3、標注@Async注解的方法和調用的方法一定不能在同一個類下,這樣的話注解會失效,具體原因不多說
注意:
在springboot當中,如果沒有配置線程池的話,springboot會自動配置一個ThreadPoolTaskExecutor線程池到bean當中,我們調用只需要
@Autowired
ThreadPoolTaskExecutor threadPoolTaskExecutor;
第一步: 首先在application啟動類添加@EnableAsync
@SpringBootApplication @EnableAsync //首先在application啟動類添加@EnableAsync public class ThreadpoolApplication { public static void main(String[] args) { SpringApplication.run(ThreadpoolApplication.class, args); } }
第二步:配置線程池,不配置的話使用springboot默認的線程池。
package com.aswatson.csc.task.conf; import java.util.concurrent.Executor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Configuration @EnableAsync public class AsyncThreadConfiguration { @Bean("kafkaThreadExecutor") public Executor asyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10);//核心線程數 executor.setMaxPoolSize(20);//最大線程數 executor.setKeepAliveSeconds(60);//空閑線程存活時間 executor.setThreadNamePrefix("kafkaThreadAsync-"); executor.initialize(); return executor; } }
第三步:測試1:在需要異步執行的方法上加上@Async注解。
@Service public class AsyncTest { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); @Async public void hello(String name){ customerEventLogMapper.insert(customerEventLog); logger.info("異步線程啟動 started."+name); } }
第四步:測試2:使用注入的模式:
package com.example.apidemo.completableFutrue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import java.time.LocalDateTime; @Service public class AsyncService { @Autowired ThreadPoolTaskExecutor threadPoolTaskExecutor; public void addEventLog(String buId, String status){ CustomerEventLogPO customerEventLog = new CustomerEventLogPO(); customerEventLog.setUuid(uuid); customerEventLog.setStatus(status); customerEventLog.setCreated(LocalDateTime.now()); customerEventLogMapper.insert(customerEventLog); threadPoolTaskExecutor.submit(new Thread(()->{ customerEventLogMapper.insert(customerEventLog); })); //submit有返回值 threadPoolTaskExecutor.execute(new Thread(()->{ customerEventLogMapper.insert(customerEventLog); })); //execute無返回值 } }
注意: 如果配置多個線程池,該如何指定線程池呢?
方式1: @Resources("kafkaThreadExecutor")。
方式2: 如果有多個線程池,但是在@Async注解里面沒有指定的話,會默認加載第一個配置的線程池。
======================================================================================================================================================================
另外需要注意的是:關於注解失效需要注意以下幾點:
- 注解的方法必須是public方法
- 方法一定要從另一個類中調用,也就是從類的外部調用,類的內部調用是無效的,因為@Transactional和@Async注解的實現都是基於Spring的AOP,而AOP的實現是基於動態代理模式實現的。那么注解失效的原因就很明顯了,有可能因為調用方法的是對象本身而不是代理對象,因為沒有經過Spring容器。
- 異步方法使用注解@Async的返回值只能為void或者Future
==================================================================================================================================================================================================
// @Bean()的拒絕策略:
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
拒絕策略:如果(總任務數 - 核心線程數 - 任務隊列數)-(最大線程數 - 核心線程數)> 0 的話,則會出現線程拒絕。舉例:( 12 - 5 - 2 ) - ( 8 - 5 ) > 0,會出現線程拒絕。線程拒絕又分為 4 種策略,分別為:
-
- CallerRunsPolicy():交由調用方線程運行,比如 main 線程。
- AbortPolicy():直接拋出異常。
- DiscardPolicy():直接丟棄。
- DiscardOldestPolicy():丟棄隊列中最老的任務。