以下文章來源於aoho求索 ,作者aoho
1. 什么是異步調用?
異步調用是相對於同步調用而言的,同步調用是指程序按預定順序一步步執行,每一步必須等到上一步執行完后才能執行,異步調用則無需等待上一步程序執行完即可執行。異步調用指,在程序在執行時,無需等待執行的返回值即可繼續執行后面的代碼。在我們的應用服務中,有很多業務邏輯的執行操作不需要同步返回(如發送郵件、冗余數據表等),只需要異步執行即可。
本文將介紹 Spring 應用中,如何實現異步調用。在異步調用的過程中,會出現線程上下文信息的丟失,我們該如何解決線程上下文信息的傳遞。
2. Spring應用中實現異步
Spring 為任務調度與異步方法執行提供了注解支持。通過在方法或類上設置 @Async注解,可使得方法被異步調用。調用者會在調用時立即返回,而被調用方法的實際執行是交給 Spring 的 TaskExecutor 來完成的。所以被注解的方法被調用的時候,會在新的線程中執行,而調用它的方法會在原線程中執行,這樣可以避免阻塞,以及保證任務的實時性。
2.1 引入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
引入Spring相關的依賴即可。在Spring5.0之后,Spring官方推薦使用Spring Boot構建項目。
2.2 入口類
@SpringBootApplication @EnableAsync public class AsyncApplication { public static void main(String[] args) { SpringApplication.run(AsyncApplication.class, args); } }
入口類增加了 @EnableAsync 注解,主要是為了掃描范圍包下的所有 @Async 注解。
2.3 對外的接口
這里寫了一個簡單的接口:
@RestController @Slf4j public class TaskController { @Autowired private TaskService taskService; @GetMapping("/task") public String taskExecute() { try { taskService.doTaskOne(); taskService.doTaskTwo(); taskService.doTaskThree(); } catch (Exception e) { log.error("error executing task for {}",e.getMessage()); } return "ok"; } }
調用 TaskService 執行三個異步方法。
2.4 Service 方法
@Component @Slf4j public class TaskService { @Async public void doTaskOne() throws Exception { log.info("開始做任務一"); long start = System.currentTimeMillis(); Thread.sleep(1000); long end = System.currentTimeMillis(); log.info("完成任務一,耗時:" + (end - start) + "毫秒"); } @Async public void doTaskTwo() throws Exception { log.info("開始做任務二"); long start = System.currentTimeMillis(); Thread.sleep(1000); long end = System.currentTimeMillis(); log.info("完成任務二,耗時:" + (end - start) + "毫秒"); } @Async public void doTaskThree() throws Exception { log.info("開始做任務三"); long start = System.currentTimeMillis(); Thread.sleep(1000); long end = System.currentTimeMillis(); log.info("完成任務三,耗時:" + (end - start) + "毫秒"); } }
@Async 可以用於類上,標識該類的所有方法都是異步方法,也可以單獨用於某些方法。每個方法都會 sleep 1000 ms。
2.5 結果展示
運行結果如下:
2020-01-16 14:14:55.677 INFO 15516 --- [ task-2] com.zh.sync.TaskService : 開始做任務二 2020-01-16 14:14:55.678 INFO 15516 --- [ task-3] com.zh.sync.TaskService : 開始做任務三 2020-01-16 14:14:55.678 INFO 15516 --- [ task-1] com.zh.sync.TaskService : 開始做任務一 2020-01-16 14:14:56.678 INFO 15516 --- [ task-2] com.zh.sync.TaskService : 完成任務二,耗時:1000 毫秒 2020-01-16 14:14:56.678 INFO 15516 --- [ task-1] com.zh.sync.TaskService : 完成任務一,耗時:1000 毫秒 2020-01-16 14:14:56.678 INFO 15516 --- [ task-3] com.zh.sync.TaskService : 完成任務三,耗時:1000 毫秒
可以看到 TaskService 中的三個方法是異步執行的,接口的結果快速返回,日志信息異步輸出。異步調用,通過開啟新的線程調用的方法,不影響主線程。異步方法實際的執行交給了 Spring 的 TaskExecutor 來完成。
3. Future:獲取異步執行的結果
在上面的測試中我們也可以發現主調用方法並沒有等到調用方法執行完就結束了當前的任務。如果想要知道調用的三個方法全部執行完該怎么辦呢,下面就可以用到異步回調。
異步回調就是讓每個被調用的方法返回一個 Future 類型的值,Spring 中提供了一個 Future 接口的子類:AsyncResult,所以我們可以返回 AsyncResult 類型的值。
public class AsyncResult<V> implements ListenableFuture<V> { private final V value; private final ExecutionException executionException; //... }
AsyncResult 實現了 ListenableFuture 接口,該對象內部有兩個屬性:返回值和異常信息。
public interface ListenableFuture<T> extends Future<T> { void addCallback(ListenableFutureCallback<? super T> var1); void addCallback(SuccessCallback<? super T> var1, FailureCallback var2); }
ListenableFuture 接口繼承自 Future,在此基礎上增加了回調方法的定義。Future 接口定義如下:
public interface Future<V> { // 是否可以打斷當前正在執行的任務 boolean cancel(boolean mayInterruptIfRunning); // 任務取消的結果 boolean isCancelled(); // 異步方法中最后返回的那個對象中的值 V get() throws InterruptedException, ExecutionException; // 用來判斷該異步任務是否執行完成,如果執行完成,則返回 true,如果未執行完成,則返回false boolean isDone(); // 與 get() 一樣,只不過這里參數中設置了超時時間 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
#get() 方法,在執行的時候是需要等待回調結果的,阻塞等待。如果不設置超時時間,它就阻塞在那里直到有了任務執行完成。我們設置超時時間,就可以在當前任務執行太久的情況下中斷當前任務,釋放線程,這樣就不會導致一直占用資源。
#cancel(boolean) 方法,參數是一個 boolean 類型的值,用來傳入是否可以打斷當前正在執行的任務。如果參數是 true 且當前任務沒有執行完成 ,說明可以打斷當前任務,那么就會返回 true;如果當前任務還沒有執行,那么不管參數是 true 還是 false,返回值都是 true;如果當前任務已經完成,那么不管參數是 true 還是 false,那么返回值都是 false;如果當前任務沒有完成且參數是 false,那么返回值也是 false。即:
- 如果任務還沒執行,那么如果想取消任務,就一定返回 true,與參數無關。
- 如果任務已經執行完成,那么任務一定是不能取消的,所以此時返回值都是false,與參數無關。
- 如果任務正在執行中,那么此時是否取消任務就看參數是否允許打斷(true/false)。
3.1 獲取異步方法返回值的實現
public Future<String> doTaskOne() throws Exception { log.info("開始做任務一"); long start = System.currentTimeMillis(); Thread.sleep(1000); long end = System.currentTimeMillis(); log.info("完成任務一,耗時:" + (end - start) + "毫秒"); return new AsyncResult<>("任務一完成,耗時" + (end - start) + "毫秒"); } //...其他兩個方法類似,省略
我們將 task 方法的返回值改為 Future<String>,將執行的時間拼接為字符串返回。
@GetMapping("/task") public String taskExecute() { try { Future<String> r1 = taskService.doTaskOne(); Future<String> r2 = taskService.doTaskTwo(); Future<String> r3 = taskService.doTaskThree(); while (true) { if (r1.isDone() && r2.isDone() && r3.isDone()) { log.info("execute all tasks"); break; } Thread.sleep(200); } log.info("\n" + r1.get() + "\n" + r2.get() + "\n" + r3.get()); } catch (Exception e) { log.error("error executing task for {}",e.getMessage()); } return "ok"; }
運行結果:
2020-01-16 14:22:01.503 INFO 7764 --- [ task-3] com.zh.sync.TaskService2 : 開始做任務三 2020-01-16 14:22:01.503 INFO 7764 --- [ task-1] com.zh.sync.TaskService2 : 開始做任務一 2020-01-16 14:22:01.504 INFO 7764 --- [ task-2] com.zh.sync.TaskService2 : 開始做任務二 2020-01-16 14:22:02.504 INFO 7764 --- [ task-2] com.zh.sync.TaskService2 : 完成任務二,耗時:1000 毫秒 2020-01-16 14:22:02.504 INFO 7764 --- [ task-1] com.zh.sync.TaskService2 : 完成任務一,耗時:1000 毫秒 2020-01-16 14:22:02.514 INFO 7764 --- [nio-8080-exec-1] com.zh.sync.TaskController : execute all tasks 2020-01-16 14:22:02.514 INFO 7764 --- [nio-8080-exec-1] com.zh.sync.TaskController : 完成任務一,耗時:1000毫秒 完成任務二,耗時:1000毫秒 完成任務三,耗時:1000毫秒
在調用異步方法之后,可以通過循環判斷異步方法是否執行完成。結果正如我們所預期,future 所 get 到的是 AsyncResult 返回的字符串。
4. 配置線程池
前面是最簡單的使用方法,使用默認的 TaskExecutor。如果想使用自定義的 Executor,可以結合 @Configuration 注解的配置方式。
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @Configuration public class TaskPoolConfig { @Bean("taskExecutor") // bean 的名稱,默認為首字母小寫的方法名 public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); // 核心線程數(默認線程數) executor.setMaxPoolSize(20); // 最大線程數 executor.setQueueCapacity(200); // 緩沖隊列數 executor.setKeepAliveSeconds(60); // 允許線程空閑時間(單位:默認為秒) executor.setThreadNamePrefix("taskExecutor-"); // 線程池名前綴 // 線程池對拒絕任務的處理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } }
線程池的配置很靈活,對核心線程數、最大線程數等屬性進行配置。其中,rejection-policy,當線程池已經達到最大線程數的時候,如何處理新任務。可選策略有 CallerBlocksPolicy、CallerRunsPolicy 等。CALLER_RUNS:不在新線程中執行任務,而是由調用者所在的線程來執行。我們驗證下,線程池的設置是否生效,在 TaskService 中,打印當前的線程名稱:
public Future<String> doTaskOne() throws Exception { log.info("開始做任務一"); long start = System.currentTimeMillis(); Thread.sleep(1000); long end = System.currentTimeMillis(); log.info("完成任務一,耗時:" + (end - start) + "毫秒"); log.info("當前線程為 {}", Thread.currentThread().getName()); return new AsyncResult<>("任務一完成,耗時" + (end - start) + "毫秒"); }
運行結果:
2020-01-16 14:30:37.447 INFO 17000 --- [taskExecutor--1] com.zh.sync.TaskService2 : 當前線程為 taskExecutor--1 2020-01-16 14:30:37.447 INFO 17000 --- [taskExecutor--2] com.zh.sync.TaskService2 : 當前線程為 taskExecutor--2 2020-01-16 14:30:37.447 INFO 17000 --- [taskExecutor--3] com.zh.sync.TaskService2 : 當前線程為 taskExecutor--3
通過結果可以看到,線程池配置的線程名前綴已經生效。在 Spring @Async 異步線程使用過程中,需要注意的是以下的用法會使 @Async 失效:
- 異步方法使用 static 修飾;
- 異步類沒有使用 @Component 注解(或其他注解)導致 Spring 無法掃描到異步類;
- 異步方法不能與被調用的異步方法在同一個類中;
- 類中需要使用 @Autowired 或 @Resource 等注解自動注入,不能手動 new 對象;
- 如果使用 Spring Boot 框架必須在啟動類中增加 @EnableAsync 注解。
5. 線程上下文信息傳遞
很多時候,在微服務架構中的一次請求會涉及多個微服務。或者一個服務中會有多個處理方法,這些方法有可能是異步方法。有些線程上下文信息,如請求的路徑,用戶唯一的 userId,這些信息會一直在請求中傳遞。如果不做任何處理,我們看下是否能夠正常獲取這些信息。
@GetMapping("/task") public String taskExecute() { try { Future<String> r1 = taskService.doTaskOne(); Future<String> r2 = taskService.doTaskTwo(); Future<String> r3 = taskService.doTaskThree(); ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); HttpServletRequest request = requestAttributes.getRequest(); log.info("當前線程為 {},請求方法為 {},請求路徑為:{}", Thread.currentThread().getName(), request.getMethod(),
request.getRequestURL().toString()); while (true) { if (r1.isDone() && r2.isDone() && r3.isDone()) { log.info("execute all tasks"); break; } Thread.sleep(200); } log.info("\n" + r1.get() + "\n" + r2.get() + "\n" + r3.get()); } catch (Exception e) { log.error("error executing task for {}", e.getMessage()); } return "ok"; }
在 Spring Boot Web 中我們可以通過 RequestContextHolder 很方便的獲取 request。在接口方法中,輸出請求的方法和請求的路徑。
public Future<String> doTaskOne() throws Exception { log.info("開始做任務一"); long start = System.currentTimeMillis(); Thread.sleep(1000); long end = System.currentTimeMillis(); log.info("完成任務一,耗時:" + (end - start) + "毫秒"); ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); HttpServletRequest request = requestAttributes.getRequest(); log.info("當前線程為 {},請求方法為 {},請求路徑為:{}", Thread.currentThread().getName(), request.getMethod(),
request.getRequestURL().toString()); return new AsyncResult<>("任務一完成,耗時" + (end - start) + "毫秒"); }
同時在 TaskService 中,驗證是不是也能輸出請求的信息。運行程序,結果如下:
2020-01-16 14:35:20.314 INFO 13272 --- [nio-8080-exec-1] com.zh.sync.TaskController : 當前線程為 http-nio-8080-exec-1,請求方法為 GET,請求路徑為:http://localhost:8080/task2 2020-01-16 14:35:20.320 INFO 13272 --- [taskExecutor--2] com.zh.sync.TaskService2 : 開始做任務二 2020-01-16 14:35:20.320 INFO 13272 --- [taskExecutor--3] com.zh.sync.TaskService2 : 開始做任務三 2020-01-16 14:35:20.320 INFO 13272 --- [taskExecutor--1] com.zh.sync.TaskService2 : 開始做任務一 2020-01-16 14:35:21.321 INFO 13272 --- [taskExecutor--3] com.zh.sync.TaskService2 : 完成任務三,耗時:1001毫秒 2020-01-16 14:35:21.321 INFO 13272 --- [taskExecutor--1] com.zh.sync.TaskService2 : 完成任務一,耗時:1001毫秒 2020-01-16 14:35:21.321 INFO 13272 --- [taskExecutor--2] com.zh.sync.TaskService2 : 完成任務二,耗時:1001毫秒 2020-01-16 14:35:21.521 INFO 13272 --- [nio-8080-exec-1] com.zh.sync.TaskController : execute all tasks 2020-01-16 14:35:21.548 ERROR 13272 --- [nio-8080-exec-1] com.zh.sync.TaskController : 報錯啦, java.util.concurrent.ExecutionException: java.lang.NullPointerException at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[na:1.8.0_11] at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[na:1.8.0_11]
在 TaskService 中,每個異步線程的方法獲取 RequestContextHolder 中的請求信息時,報了空指針異常。這說明了請求的上下文信息未傳遞到異步方法的線程中。RequestContextHolder 的實現,里面有兩個 ThreadLocal 保存當前線程下的 request。
//得到存儲進去的request private static final ThreadLocal<RequestAttributes> requestAttributesHolder = new NamedThreadLocal<RequestAttributes>("Request attributes"); //可被子線程繼承的request private static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder = new NamedInheritableThreadLocal<RequestAttributes>("Request context");
再看 #getRequestAttributes() 方法,相當於直接獲取 ThreadLocal 里面的值,這樣就使得每一次獲取到的 Request 是該請求的 request。如何將上下文信息傳遞到異步線程呢?Spring 中的 ThreadPoolTaskExecutor 有一個配置屬性 TaskDecorator,TaskDecorator 是一個回調接口,采用裝飾器模式。裝飾模式是動態的給一個對象添加一些額外的功能,就增加功能來說,裝飾模式比生成子類更為靈活。因此 TaskDecorator 主要用於任務的調用時設置一些執行上下文,或者為任務執行提供一些監視/統計
public interface TaskDecorator { Runnable decorate(Runnable runnable); }
#decorate 方法,裝飾給定的 Runnable,返回包裝的 Runnable 以供實際執行。
下面我們定義一個線程上下文拷貝的 TaskDecorator。
import org.springframework.core.task.TaskDecorator; import org.springframework.web.context.request.RequestAttributes; import org.springframework.web.context.request.RequestContextHolder; public class ContextDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { RequestAttributes context = RequestContextHolder.currentRequestAttributes(); return () -> { try { RequestContextHolder.setRequestAttributes(context); runnable.run(); } finally { RequestContextHolder.resetRequestAttributes(); } }; } }
實現較為簡單,將當前線程的 context 裝飾到指定的 Runnable,最后重置當前線程上下文。
在線程池的配置中,增加回調的 TaskDecorator 屬性的配置:
@Bean("taskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(200); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("taskExecutor-"); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setAwaitTerminationSeconds(60); // 增加 TaskDecorator 屬性的配置 executor.setTaskDecorator(new ContextDecorator()); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; }
經過如上配置,我們再次運行服務,並訪問接口,控制台日志信息如下:
2020-01-16 14:42:27.362 INFO 2712 --- [nio-8080-exec-1] com.zh.sync.TaskController : 當前線程為 http-nio-8080-exec-1,請求方法為 GET,請求路徑為:http://localhost:8080/task2 2020-01-16 14:42:27.369 INFO 2712 --- [taskExecutor--3] com.zh.sync.TaskService2 : 開始做任務三 2020-01-16 14:42:27.370 INFO 2712 --- [taskExecutor--2] com.zh.sync.TaskService2 : 開始做任務二 2020-01-16 14:42:27.370 INFO 2712 --- [taskExecutor--1] com.zh.sync.TaskService2 : 開始做任務一 2020-01-16 14:42:28.371 INFO 2712 --- [taskExecutor--3] com.zh.sync.TaskService2 : 完成任務三,耗時:1001毫秒 2020-01-16 14:42:28.371 INFO 2712 --- [taskExecutor--2] com.zh.sync.TaskService2 : 完成任務二,耗時:1001毫秒 2020-01-16 14:42:28.371 INFO 2712 --- [taskExecutor--3] com.zh.sync.TaskService2 : 當前線程為 taskExecutor--3,請求方法為 GET,請求路徑為:http://localhost:8080/task2 2020-01-16 14:42:28.371 INFO 2712 --- [taskExecutor--2] com.zh.sync.TaskService2 : 當前線程為 taskExecutor--2,請求方法為 GET,請求路徑為:http://localhost:8080/task2 2020-01-16 14:42:28.371 INFO 2712 --- [taskExecutor--1] com.zh.sync.TaskService2 : 完成任務一,耗時:1001毫秒 2020-01-16 14:42:28.372 INFO 2712 --- [taskExecutor--1] com.zh.sync.TaskService2 : 當前線程為 taskExecutor--1,請求方法為 GET,請求路徑為:http://localhost:8080/task2 2020-01-16 14:42:28.565 INFO 2712 --- [nio-8080-exec-1] com.zh.sync.TaskController : execute all tasks
由結果可知,線程的上下文信息傳遞成功。
小結
本文結合示例講解了 Spring 中實現異步方法,獲取異步方法的返回值。並介紹了配置 Spring 線程池的方式。最后介紹如何在異步多線程中傳遞線程上下文信息。線程上下文傳遞在分布式環境中會經常用到,比如分布式鏈路追蹤中需要一次請求涉及到的 TraceId、SpanId。簡單來說,需要傳遞的信息能夠在不同線程中。異步方法是我們在日常開發中用來多線程處理業務邏輯,這些業務邏輯不需要嚴格的執行順序。用好異步解決問題的同時,更要用對異步多線程的方式。