前言
SpringBoot使用異步線程池:
1、編寫線程池配置類,自定義一個線程池;
2、定義一個異步服務;
3、使用@Async注解指向定義的線程池;
這里以我工作中使用過的一個案例來做描述,我所在公司是醫療行業,敏感數據需要上報到某監管平台,所以有一個定時任務在流量較小時(一般是凌晨后)執行上報行為。但特殊時期會存在一定要在工作時間大批量上報數據的情況,且要求短時間內就要完成,此時就考慮寫一個專門的異步上報接口手動執行,利用線程池上報,極大提高了速度。
-
編寫線程池配置類
import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * 類名稱:ExecutorConfig * ******************************** * <p> * 類描述:線程池配置 * * @author guoj * @date 2021-09-07 09:00 */ @Configuration @EnableAsync @Slf4j public class ExecutorConfig { /** * 定義數據上報線程池 * @return */ @Bean("dataCollectionExecutor") public Executor dataCollectionExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 核心線程數量:當前機器的核心數 executor.setCorePoolSize( Runtime.getRuntime().availableProcessors()); // 最大線程數 executor.setMaxPoolSize( Runtime.getRuntime().availableProcessors() * 2); // 隊列大小 executor.setQueueCapacity(Integer.MAX_VALUE); // 線程池中的線程名前綴 executor.setThreadNamePrefix("sjsb-"); // 拒絕策略:直接拒絕 executor.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy()); // 執行初始化 executor.initialize(); return executor; } }
PS:
1)、需要注意,這里一定要自己定義ThreadPoolTaskExecutor線程池,否則springboot的異步注解會執行默認線程池,存在線程阻塞導致CPU飆高及內存溢出的風險。這一點可以參考阿里開發手冊,線程池定義這塊明確提到了這一點;
2)、在@Bean注解中定義線程池名稱,后面異步注解會用到。
-
編寫異步服務
/** * 異步方法的服務, 不影響主程序運行。 */ @Service public class AsyncService { private final Logger log = LoggerFactory.getLogger(AsyncService.class); /** * 發送短信 */ @Async("sendMsgExecutor") public void sendMsg(String access_token, Consult item, Map<String, String> configMap) { // 此處編寫發送短信業務 // 1、buildConsultData(); // 2、sendMsg(); } /** * 發送微信訂閱消息 */ @Async public void sendSubscribeMsg(String access_token, Consult item, Map<String, String> configMap) { // 此處編寫發送微信訂閱消息業務 // 1、buildConsultData(); // 2、sendSubscribeMsg(); } /** * 數據並上報 */ @Async("dataCollectionExecutor") public void buildAndPostData(String access_token, Consult item, Map<String, String> configMap) { // 此處編寫上報業務,如拼接數據,然后執行上報。 // 1、buildConsultData(); // 2、postData(); } }
-
異步批量上報數據
@Autowired private AsyncService asyncService; /** * 手動上報問診記錄,線程池方式。 */ public void manualUploadConsultRecordsAsync(String channel, Date startTime, Date endTime) { // 查詢指定時間內的問診記錄 List<Consult> consultList = consultService .findPaidListByChannelAndTime(channel, startTime, endTime, configMap.get("serviceId")); if (!CollectionUtils.isEmpty(consultList)) { log.debug("[SendWZDataService][manualUploadConsultRecordsAsync]>>>> 手動上報問診記錄, 一共[{}]條", consultList.size()); consultList.forEach((item) -> { try { // 異步調用,使用線程池。 asyncService.buildAndPostData(access_token, item, configMap); } catch (Exception ex) { log.error("[SendWZDataService][manualUploadConsultRecordsAsync]>>>> 手動上報問診記錄發生異常: ", ex); } }); } }
-
總結
以上方式已經在生產環境運行,在工作時間內執行過很多次,一次數萬條記錄基本是幾分鍾內就全部上報完畢,而正常循環遍歷時一次大概需要半個小時左右。 線程池的使用方式往往來源於業務場景,如果類似的業務不存在緊急處理的情況,大體還是以任務調度執行為主,因為更安全。如果存在緊急處理的情況,那么使用SpringBoot+線程池的方式不僅能節省非常多的時間,且不占用主線程的執行空間。 喜歡就點個關注吧~~
-