上文介紹了基於 @Async
注解的 異步調用編程,本文將繼續引入 Spring Boot
的 WebAsyncTask
進行更靈活異步任務處理,包括 異步回調,超時處理 和 異常處理。
正文
1. 處理線程和異步線程
在開始下面的講解之前,在這里先區別下兩個概念:
處理線程:處理線程 屬於 web 服務器線程,負責 處理用戶請求,采用 線程池 管理。
異步線程:異步線程 屬於 用戶自定義的線程,可采用 線程池管理。
Spring 提供了對 異步任務 API,采用 WebAsyncTask 類即可實現 異步任務。對異步任務設置相應的 回調處理,如當 任務超時、異常拋出 等。異步任務通常非常實用,比如:當一筆訂單支付完成之后,開啟異步任務查詢訂單的支付結果。
2. 環境准備
配置gradle依賴
利用 Spring Initializer
創建一個 gradle
項目 spring-boot-web-async-task
,創建時添加相關依賴。得到的初始 build.gradle
如下:
buildscript { ext { springBootVersion = '2.0.3.RELEASE' } repositories { mavenCentral() } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") } } apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'org.springframework.boot' apply plugin: 'io.spring.dependency-management' group = 'io.ostenant.springboot.sample' version = '0.0.1-SNAPSHOT' sourceCompatibility = 1.8 repositories { mavenCentral() } dependencies { compile('org.springframework.boot:spring-boot-starter-web') testCompile('org.springframework.boot:spring-boot-starter-test') }
配置服務類
配置一個用於異步任務調度的 Mock
服務。
@Service public class WebAsyncService { public String generateUUID() { return UUID.randomUUID().toString(); } }
配置異步處理控制器並注入以上服務 Bean
。
@RestController public class WebAsyncController { private final WebAsyncService asyncService; private final static String ERROR_MESSAGE = "Task error"; private final static String TIME_MESSAGE = "Task timeout"; @Autowired public WebAsyncController(WebAsyncService asyncService) { this.asyncService = asyncService; } }
3. 正常異步任務
配置一個正常的 WebAsyncTask
任務對象,設置任務 超時時間 為 10s
。異步任務執行采用 Thread.sleep(long)
模擬,這里設置 異步線程 睡眠時間為 5s
。
@GetMapping("/completion") public WebAsyncTask<String> asyncTaskCompletion() { // 打印處理線程名 out.println(format("請求處理線程:%s", currentThread().getName())); // 模擬開啟一個異步任務,超時時間為10s WebAsyncTask<String> asyncTask = new WebAsyncTask<>(10 * 1000L, () -> { out.println(format("異步工作線程:%s", currentThread().getName())); // 任務處理時間5s,不超時 sleep(5 * 1000L); return asyncService.generateUUID(); }); // 任務執行完成時調用該方法 asyncTask.onCompletion(() -> out.println("任務執行完成")); out.println("繼續處理其他事情"); return asyncTask; }
@GetMapping("/completion") public WebAsyncTask<String> asyncTaskCompletion() { // 打印處理線程名 out.println(format("請求處理線程:%s", currentThread().getName())); // 模擬開啟一個異步任務,超時時間為10s WebAsyncTask<String> asyncTask = new WebAsyncTask<>(10 * 1000L, () -> { out.println(format("異步工作線程:%s", currentThread().getName())); // 任務處理時間5s,不超時 sleep(5 * 1000L); return asyncService.generateUUID(); }); // 任務執行完成時調用該方法 asyncTask.onCompletion(() -> out.println("任務執行完成")); out.println("繼續處理其他事情"); return asyncTask; }
啟動 Spring Boot
項目,訪問 http://localhost:8080/completion ,發起 正常 的異步任務請求。
觀察控制台輸出,可以驗證 WebAsyncTask
的異步處理流程正常
請求處理線程:http-nio-8080-exec-2 繼續處理其他事情 異步工作線程:MvcAsync1 任務執行完成
Web
頁面正常響應,頁面響應消息如下:
注意:WebAsyncTask.onCompletion(Runnable) :在當前任務執行結束以后,無論是執行成功還是異常中止,onCompletion的回調最終都會被調用。
4. 拋出異常異步任務
配置一個 錯誤 的 WebAsyncTask
任務對象,設置任務 超時時間 為 10s
。在異步任務執行方法中 拋出異常。
@GetMapping("/exception") public WebAsyncTask<String> asyncTaskException() { // 打印處理線程名 out.println(format("請求處理線程:%s", currentThread().getName())); // 模擬開啟一個異步任務,超時時間為10s WebAsyncTask<String> asyncTask = new WebAsyncTask<>(10 * 1000L, () -> { out.println(format("異步工作線程:%s", currentThread().getName())); // 任務處理時間5s,不超時 sleep(5 * 1000L); throw new Exception(ERROR_MESSAGE); }); // 任務執行完成時調用該方法 asyncTask.onCompletion(() -> out.println("任務執行完成")); asyncTask.onError(() -> { out.println("任務執行異常"); return ERROR_MESSAGE; }); out.println("繼續處理其他事情"); return asyncTask; }
@GetMapping("/exception") public WebAsyncTask<String> asyncTaskException() { // 打印處理線程名 out.println(format("請求處理線程:%s", currentThread().getName())); // 模擬開啟一個異步任務,超時時間為10s WebAsyncTask<String> asyncTask = new WebAsyncTask<>(10 * 1000L, () -> { out.println(format("異步工作線程:%s", currentThread().getName())); // 任務處理時間5s,不超時 sleep(5 * 1000L); throw new Exception(ERROR_MESSAGE); }); // 任務執行完成時調用該方法 asyncTask.onCompletion(() -> out.println("任務執行完成")); asyncTask.onError(() -> { out.println("任務執行異常"); return ERROR_MESSAGE; }); out.println("繼續處理其他事情"); return asyncTask; }
啟動 Spring Boot
項目,訪問 http://localhost:8080/exception ,發起 異常 的異步任務請求。
Web
頁面響應異常信息如下:
觀察控制台輸出,可以驗證 WebAsyncTask
對於 異常請求 的異步處理過程。
請求處理線程:http-nio-8080-exec-1 繼續處理其他事情 異步工作線程:MvcAsync2 2018-06-18 21:12:10.110 ERROR 89875 --- [nio-8080-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] threw exception java.lang.Exception: Task error at io.ostenant.springboot.sample.controller.WebAsyncController.lambda$asyncTaskException$2(WebAsyncController.java:55) ~[classes/:na] at org.springframework.web.context.request.async.WebAsyncManager.lambda$startCallableProcessing$4(WebAsyncManager.java:317) ~[spring-web-5.0.7.RELEASE.jar:5.0.7.RELEASE] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_172] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_172] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_172] 2018-06-18 21:12:10.111 ERROR 89875 --- [nio-8080-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.Exception: Task error] with root cause java.lang.Exception: Task error at io.ostenant.springboot.sample.controller.WebAsyncController.lambda$asyncTaskException$2(WebAsyncController.java:55) ~[classes/:na] at org.springframework.web.context.request.async.WebAsyncManager.lambda$startCallableProcessing$4(WebAsyncManager.java:317) ~[spring-web-5.0.7.RELEASE.jar:5.0.7.RELEASE] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_172] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_172] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_172] 任務執行異常 2018-06-18 21:12:10.144 WARN 89875 --- [nio-8080-exec-2] o.apache.catalina.core.AsyncContextImpl : onError() failed for listener of type [org.apache.catalina.core.AsyncListenerWrapper] java.lang.IllegalArgumentException: Cannot dispatch without an AsyncContext at org.springframework.util.Assert.notNull(Assert.java:193) ~[spring-core-5.0.7.RELEASE.jar:5.0.7.RELEASE] at org.springframework.web.context.request.async.StandardServletAsyncWebRequest.dispatch(StandardServletAsyncWebRequest.java:131) ~[spring-web-5.0.7.RELEASE.jar:5.0.7.RELEASE] at org.springframework.web.context.request.async.WebAsyncManager.setConcurrentResultAndDispatch(WebAsyncManager.java:353) ~[spring-web-5.0.7.RELEASE.jar:5.0.7.RELEASE] at org.springframework.web.context.request.async.WebAsyncManager.lambda$startCallableProcessing$2(WebAsyncManager.java:304) ~[spring-web-5.0.7.RELEASE.jar:5.0.7.RELEASE] at org.springframework.web.context.request.async.StandardServletAsyncWebRequest.lambda$onError$0(StandardServletAsyncWebRequest.java:146) ~[spring-web-5.0.7.RELEASE.jar:5.0.7.RELEASE] at java.util.ArrayList.forEach(ArrayList.java:1257) ~[na:1.8.0_172] at org.springframework.web.context.request.async.StandardServletAsyncWebRequest.onError(StandardServletAsyncWebRequest.java:146) ~[spring-web-5.0.7.RELEASE.jar:5.0.7.RELEASE] at org.apache.catalina.core.AsyncListenerWrapper.fireOnError(AsyncListenerWrapper.java:49) ~[tomcat-embed-core-8.5.31.jar:8.5.31] at org.apache.catalina.core.AsyncContextImpl.setErrorState(AsyncContextImpl.java:397) ~[tomcat-embed-core-8.5.31.jar:8.5.31] at org.apache.catalina.connector.CoyoteAdapter.asyncDispatch(CoyoteAdapter.java:239) [tomcat-embed-core-8.5.31.jar:8.5.31] at org.apache.coyote.AbstractProcessor.dispatch(AbstractProcessor.java:232) [tomcat-embed-core-8.5.31.jar:8.5.31] at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:53) [tomcat-embed-core-8.5.31.jar:8.5.31] at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790) [tomcat-embed-core-8.5.31.jar:8.5.31] at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1468) [tomcat-embed-core-8.5.31.jar:8.5.31] at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.31.jar:8.5.31] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_172] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_172] at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.31.jar:8.5.31] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_172] 任務執行完成
注意:WebAsyncTask.onError(Callable
5. 超時異步任務
配置一個正常的 WebAsyncTask
任務對象,設置任務 超時時間 為 10s
。異步任務執行采用 Thread.sleep(long)
模擬,這里設置 異步線程 睡眠時間為 15s
,引發異步任務超時。
@GetMapping("/timeout") public WebAsyncTask<String> asyncTaskTimeout() { // 打印處理線程名 out.println(format("請求處理線程:%s", currentThread().getName())); // 模擬開啟一個異步任務,超時時間為10s WebAsyncTask<String> asyncTask = new WebAsyncTask<>(10 * 1000L, () -> { out.println(format("異步工作線程:%s", currentThread().getName())); // 任務處理時間5s,不超時 sleep(15 * 1000L); return TIME_MESSAGE; }); // 任務執行完成時調用該方法 asyncTask.onCompletion(() -> out.println("任務執行完成")); asyncTask.onTimeout(() -> { out.println("任務執行超時"); return TIME_MESSAGE; }); out.println("繼續處理其他事情"); return asyncTask; }
啟動 Spring Boot
項目,訪問 http://localhost:8080/timeout ,發起 超時 的異步任務請求。
觀察控制台輸出,可以驗證 WebAsyncTask
的異步超時處理的過程。
請求處理線程:http-nio-8080-exec-1 繼續處理其他事情 異步工作線程:MvcAsync3 任務執行超時 任務執行完成
Web
頁面常響應超時提示信息,頁面響應消息如下:
注意:WebAsyncTask.onTimeout(Callable
6. 線程池異步任務
上面的三種情況中的 異步任務 默認不是采用 線程池機制 進行管理的。
也就是說,一個請求進來,雖然釋放了處理線程,但是系統依舊會為每個請求創建一個 異步任務線程,也就是上面看到的 MvcAsync 開頭的 異步任務線程。
后果就是開銷嚴重,所以通常采用 線程池 進行統一的管理,直接在 WebAsyncTask 類構造器傳入一個 ThreadPoolTaskExecutor 對象實例即可。
構造一個線程池 Bean 對象:
@Configuration public class TaskConfiguration { @Bean("taskExecutor") public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(5); taskExecutor.setMaxPoolSize(10); taskExecutor.setQueueCapacity(10); taskExecutor.setThreadNamePrefix("asyncTask"); return taskExecutor; } }
在控制器中注入 ThreadPoolTaskExecutor
對象,重新配置基於 線程池 的 異步任務處理。
@Autowired @Qualifier("taskExecutor") private ThreadPoolTaskExecutor executor; @GetMapping("/threadPool") public WebAsyncTask<String> asyncTaskThreadPool() { return new WebAsyncTask<>(10 * 1000L, executor, () -> { out.println(format("異步工作線程:%s", currentThread().getName())); return asyncService.generateUUID(); }); }
並發地請求 http://localhost:8080/threadPool ,觀察控制台輸出的 異步線程 信息,可以發現 異步任務 直接從 線程池 中獲取 異步線程。
異步工作線程:asyncTask1
異步工作線程:asyncTask2
異步工作線程:asyncTask3
異步工作線程:asyncTask4
異步工作線程:asyncTask5
異步工作線程:asyncTask1
異步工作線程:asyncTask2
異步工作線程:asyncTask3
異步工作線程:asyncTask4
異步工作線程:asyncTask5
小結
本文介紹了 Spring Boot
提供的 WebAsyncTask
的異步編程 API
。相比上問介紹的 @Async
注解,WebAsyncTask
提供更加健全的 超時處理 和 異常處理 支持。