Spring中基於@Async的異步線程池構建與使用
在處理隊列中的請求或者與第三方系統的交互時,異步處理較為常見,為充分利用系統資源,常規多采用構建線程池的方式,但線程池的構建成本高、代碼維護困難;Spring 3.x 引入了@Async可完美解決這類異步處理難題,簡潔,易用,可讀性強。本文就以實際應用中,處理redis隊列中異步請求為例,結合前輩們的總結和自己的實際應用,簡要概述@Async在實際應用的特點。
關於異步調用
何為異步調用
同步就是整個處理過程順序執行,當各個過程都執行完畢,並返回結果。
異步調用則是只是發送了調用的指令,調用者無需等待被調用的方法完全執行完畢;而是繼續執行下面的流程。
常規的多線程處理均為異步調用,例如, 在某個調用中,需要順序調用 A, B, C三個過程方法;如他們都是同步調用,則需要將他們都順序執行完畢之后,方算作過程執行完畢; 如B為一個異步的調用方法,則在執行完A之后,調用B,並不等待B完成,而是執行開始調用C,待C執行完畢之后,就意味着這個過程執行完畢了。
常規的異步調用處理方式
在Java中,一般在處理類似的場景之時,都是基於創建獨立的線程去完成相應的異步調用邏輯,通過主線程和不同的線程之間的執行流程,從而在啟動獨立的線程之后,主線程繼續執行而不會產生停滯等待的情況。
@Async介紹
在Spring中,基於@Async標注的方法,稱之為異步方法;這些方法將在執行的時候,將會在獨立的線程中被執行,調用者無需等待它的完成,即可繼續其他的操作。
@Async的構建與使用
基於注解的使用方法
關於基於xml的使用方式,不做贅述,基於注解的使用方式包括如下三步:
- 啟動類加上@EnableAsync
- 配置類中完成異步線程池TaskExecutor的導入
- 需要異步調用的方法加上@Async
異步線程池TaskExecutor
異步線程池接口TaskExecutor繼承JDK的Executor,只是在Spring框架內部完成該並發執行接口的重新定義。其實現類與接口層級圖如下:
這里最常用的是ThreadPoolTaskExecutor ,其實質是對java.util.concurrent.ThreadPoolExecutor的包裝,推薦使用。
配置類中自定義異步線程池:
/** * 自定義異步線程池 * @return */ @Bean public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setThreadNamePrefix("Anno-Executor"); executor.setMaxPoolSize(10); // 設置拒絕策略 executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // ..... } }); // 使用預定義的異常處理類 // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; }
@Async定義異步任務
使用@Async定義異步任務包括如下三種方式:
- 最簡單的異步調用,返回值為void
- 帶參數的異步調用異步方法
- 異步調用返回Future
代碼示例:
@Component public class AsyncDemo { private static final Logger log = LoggerFactory.getLogger(AsyncDemo.class); /** * 最簡單的異步調用,返回值為void */ @Async public void asyncInvokeSimplest() { log.info("asyncSimplest"); } /** * 帶參數的異步調用 異步方法可以傳入參數 * * @param s */ @Async public void asyncInvokeWithParameter(String s) { log.info("asyncInvokeWithParameter, parementer={}", s); } /** * 異步調用返回Future * * @param i * @return */ @Async public Future<String> asyncInvokeReturnFuture(int i) { log.info("asyncInvokeReturnFuture, parementer={}", i); Future<String> future; try { Thread.sleep(1000 * 1); future = new AsyncResult<String>("success:" + i); } catch (InterruptedException e) { future = new AsyncResult<String>("error"); } return future; } }
調用示例:
asyncDemo.asyncInvokeSimplest(); asyncDemo.asyncInvokeWithException("test"); Future<String> future = asyncDemo.asyncInvokeReturnFuture(100); System.out.println(future.get());
對異步方法的異常處理
在調用方法時,可能出現方法中拋出異常的情況。在異步中主要有有兩種異常處理方法:
- 對於方法返回值是Futrue的異步方法:
- 在調用future的get時捕獲異常
- 在異常方法中直接捕獲異常
- 對於返回值是void的異步方法:通過AsyncUncaughtExceptionHandler處理異常
代碼如下:
/** * 帶參數的異步調用 異步方法可以傳入參數 * 對於返回值是void,異常會被AsyncUncaughtExceptionHandler處理掉 * @param s */ @Async public void asyncInvokeWithException(String s) { log.info("asyncInvokeWithParameter, parementer={}", s); throw new IllegalArgumentException(s); } /** * 異常調用返回Future * 對於返回值是Future,不會被AsyncUncaughtExceptionHandler處理,需要我們在方法中捕獲異常並處理 * 或者在調用方在調用Futrue.get時捕獲異常進行處理 * * @param i * @return */ @Async public Future<String> asyncInvokeReturnFuture(int i) { log.info("asyncInvokeReturnFuture, parementer={}", i); Future<String> future; try { Thread.sleep(1000 * 1); future = new AsyncResult<String>("success:" + i); throw new IllegalArgumentException("a"); } catch (InterruptedException e) { future = new AsyncResult<String>("error"); } catch(IllegalArgumentException e){ future = new AsyncResult<String>("error-IllegalArgumentException"); } return future; }
實現AsyncConfigurer接口對異常線程池更加細粒度的控制:
- 創建線程自己的線程池
- 對void方法拋出的異常處理的類AsyncUncaughtExceptionHandler
代碼如下:
@Service public class MyAsyncConfigurer implements AsyncConfigurer{ private static final Logger log = LoggerFactory.getLogger(MyAsyncConfigurer.class); @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor(); threadPool.setCorePoolSize(1); threadPool.setMaxPoolSize(1); threadPool.setWaitForTasksToCompleteOnShutdown(true); threadPool.setAwaitTerminationSeconds(60 * 15); threadPool.setThreadNamePrefix("MyAsync-"); threadPool.initialize(); return threadPool; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new MyAsyncExceptionHandler(); } /** * 自定義異常處理類 * @author hry * */ class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler { @Override public void handleUncaughtException(Throwable throwable, Method method, Object... obj) { log.info("Exception message - " + throwable.getMessage()); log.info("Method name - " + method.getName()); for (Object param : obj) { log.info("Parameter value - " + param); } } } }
@Async調用中的事務處理機制
在@Async標注的方法,同時也適用了@Transactional進行了標注;在其調用數據庫操作之時,將無法產生事務管理的控制,原因就在於其是基於異步處理的操作。
那該如何給這些操作添加事務管理呢?可以將需要事務管理操作的方法放置到異步方法內部,在內部被調用的方法上添加@Transactional.
例如: 方法A,使用了@Async/@Transactional來標注,但是無法產生事務控制的目的。方法B,使用了@Async來標注, B中調用了C、D,C/D分別使用@Transactional做了標注,則可實現事務控制的目的。
實際業務使用
目前的業務中,主要有兩塊需使用異步線程池完成快速、高效、簡便的異步處理,拋開業務,都是獲取隊列中請求,並發異步處理,再調用第三方接口返回數據,或者入庫用以后續生成報表。部分代碼如下:
啟動類添加@EnableAsync,完成對於Spring異步調用的支持,不做贅述。
創建異步線程池:
@Configuration public class ThreadPoolConfig { @Bean public TaskExecutor videoRetrievalPool() { ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor(); exec.setCorePoolSize(VideoSearchConstant.threadPoolSize); exec.setMaxPoolSize(VideoSearchConstant.threadPoolSize); exec.setAllowCoreThreadTimeOut(true); exec.setKeepAliveSeconds(100);
// 線程池常用處理策略 , 不做贅述 exec.setThreadNamePrefix("video-"); exec.initialize(); return exec; } }
異步任務耗時較長,為避免頻繁刷新隊列,暫時可采用定時作業的形式:
@Async("videoRetrievalPool") @Scheduled(fixedRate = 1000) public void scheduleRetrieval() { ... ... }