淺析如何使用Spring的@Async異步任務、自定義線程池及異常處理


  在項目中,當訪問其他人的接口較慢或者做耗時任務時,不想程序一直卡在耗時任務上,想程序能夠並行執行,我們可以使用多線程來並行的處理任務,也可以使用spring提供的異步處理方式@Async。

  Spring 是通過任務執行器(TaskExecutor)來實現多線程和並發編程,使用 ThreadPoolTaskExecutor 來創建一個基於線程池的 TaskExecutor。在使用線程池的大多數情況下都是異步非阻塞的。我們配置注解 @EnableAsync 可以開啟異步任務,然后在實際執行的方法上配置注解 @Async 上聲明是異步任務。通過 @Async 注解表明該方法是異步方法,如果注解在類上,那表明這個類里面的所有方法都是異步的。

一、如何使用

1、開啟異步支持  ——  使用 @EnableAsync 啟用異步注解

@Configuration @EnableAsync public class SpringAsyncConfig { ... }

2、異步處理方式  ——  @Async注解使用

(1)無返回值  ——  調用之后,不返回任何數據。無返回值的話,和常規寫法沒什么不同。

@Async public void asyncMethodWithVoidReturnType() {   System.out.println("Execute method asynchronously. " + Thread.currentThread().getName()); }

(2)有返回值  ——  調用之后,返回數據,通過Future來獲取返回數據。有返回值的話,需要將返回值包在 Future 對象中。Future 對象是專門存放異步響應的一個接口。

  異步調用返回數據,Future表示在未來某個點獲取執行結果,返回數據類型可以自定義

 @Async public Future<String> dealHaveReturnTask() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } JSONObject jsonObject = new JSONObject(); jsonObject.put("thread", Thread.currentThread().getName()); jsonObject.put("time", System.currentTimeMillis()); return new AsyncResult<String>(jsonObject.toJSONString()); }

  測試類用 isCancelled 判斷異步任務是否取消,isDone 判斷任務是否執行結束

 @Test public void testDealHaveReturnTask() throws Exception { Future<String> future = asyncTask.dealHaveReturnTask(); log.info("begin to deal other Task!"); while (true) { if(future.isCancelled()){ log.info("deal async task is Cancelled"); break; } if (future.isDone() ) { log.info("deal async task is Done"); log.info("return result is " + future.get()); break; } log.info("wait async task to end ..."); Thread.sleep(1000); } }

  日志打印如下,我們可以看出任務一直在等待異步任務執行完畢,用 future.get() 來獲取異步任務的返回結果。

begin to deal other Task! wait async task to end ... wait async task to end ... wait async task to end ... wait async task to end ... deal async task is Done return result is {"thread":"AsyncExecutorThread-1","time":1499752617330}

二、定義線程池

1、第一步,先在Spring Boot主類中定義一個線程池,比如

@SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @EnableAsync @Configuration class TaskPoolConfig { @Bean("taskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(200); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("taskExecutor-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } } }

  上面我們通過使用 ThreadPoolTaskExecutor 創建了一個線程池,同時設置了以下這些參數:

  • 核心線程數10:線程池創建時候初始化的線程數
  • 最大線程數20:線程池最大的線程數,只有在緩沖隊列滿了之后才會申請超過核心線程數的線程
  • 緩沖隊列200:用來緩沖執行任務的隊列
  • 允許線程的空閑時間60秒:當超過了核心線程出之外的線程在空閑時間到達之后會被銷毀
  • 線程池名的前綴:設置好了之后可以方便我們定位處理任務所在的線程池
  • 線程池對拒絕任務的處理策略:這里采用了CallerRunsPolicy策略,當線程池沒有處理能力的時候,該策略會直接在 execute 方法的調用線程中運行被拒絕的任務;如果執行程序已關閉,則會丟棄該任務

2、如何使用線程池

  在定義了線程池之后,我們如何讓異步調用的執行任務使用這個線程池中的資源來運行呢?方法非常簡單,我們只需要在@Async注解中指定線程池名即可,比如

    @Async("taskExecutor") public void doTaskOne() throws Exception { log.info("開始做任務一"); long start = System.currentTimeMillis(); Thread.sleep(random.nextInt(10000)); long end = System.currentTimeMillis(); log.info("完成任務一,耗時:" + (end - start) + "毫秒"); }

三、異常處理

  默認的,打開異步開關后,Spring 會使用一個 SimpleAsyncTaskExecutor 作為線程池,該線程默認的並發數是不受限制的。所以每次異步方法來,都會獲取一個新線程去運行它。

1、AsyncConfigurer 接口

  Spring 4 中,對異步方法可以做一些配置,將配置類實現 AsyncConfigurer 接口后,可以實現自定義線程池的功能,和統一處理異步方法的異常。

(1)如果不限制並發數,可能會造成系統壓力。AsyncConfigurer 接口中的方法 Executor getAsyncExecutor() 實現自定義線程池,控制並發數。

(2)AsyncConfigurer 接口中的方法 public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() 用於處理異步方法的異常。

  AsyncUncaughtExceptionHandler 接口,只有一個方法:void handleUncaughtException(Throwable ex, Method method, Object… params);

  因此,AsyncUncaughtExceptionHandler 接口可以認為是一個函數式接口,可以用拉姆達表達式實現該接口。

2、代碼示例

(1)我們可以實現AsyncConfigurer接口,也可以繼承AsyncConfigurerSupport類來實現,在方法getAsyncExecutor()中創建線程池的時候,必須使用 executor.initialize(),不然在調用時會報線程池未初始化的異常。

  如果使用threadPoolTaskExecutor()來定義bean,則不需要初始化。

@Configuration @EnableAsync @Slf4j public class AsyncConfig implements AsyncConfigurer { // @Bean // public ThreadPoolTaskExecutor threadPoolTaskExecutor(){ // ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // executor.setCorePoolSize(10); // executor.setMaxPoolSize(100); // executor.setQueueCapacity(100); // return executor; // }
  // 自定義線程池,控制並發數,將線程池的大小設置成只有10個線程 @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(100); executor.setQueueCapacity(100); executor.setThreadNamePrefix("AsyncExecutorThread-"); executor.initialize(); //如果不初始化,導致找到不到執行器
        return executor; }
  // 統一處理異常 @Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncExceptionHandler(); } }

(2)異步異常處理類:

@Slf4j public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler { @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { log.info("Async method: {} has uncaught exception,params:{}", method.getName(), JSON.toJSONString(params)); if (ex instanceof AsyncException) { AsyncException asyncException = (AsyncException) ex; log.info("asyncException:{}",asyncException.getErrorMessage()); } log.info("Exception :"); ex.printStackTrace(); } } @Data @AllArgsConstructor public class AsyncException extends Exception { private int code; private String errorMessage; }

  2.1、在無返回值的異步調用中,異步處理拋出異常,AsyncExceptionHandler的handleUncaughtException()會捕獲指定異常,原有任務還會繼續運行,直到結束。

  2.2、在有返回值的異步調用中,異步處理拋出異常,會直接拋出異常,異步任務結束,原有處理結束執行。

參考文章:https://segmentfault.com/a/1190000010142962


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM