1.前言
日常開發中,對於串行化的任務適當解耦耗時操作和業務邏輯,在保證結果准確性的前提下,使用異步方法適當進行並行化改造,可以提高接口響應速度,提升使用體驗。
如下抽象的串行化工作流程:
業務查詢,首先登記記錄record
[cost 3s],之后依次執行searchA
[cost 1s]、searchB
[cost 2s]、searchC
[cost 2s]分別得到變量a、b、c,返回結果fx(a,b,c)
[計算耗時可忽略不記]。代碼如下:
import com.zang.async.service.AsyncCaseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.time.Duration;
import java.time.Instant;
@Slf4j
@RestController
public class AsyncCaseController {
@Resource
private AsyncCaseService asyncCaseService;
@PostMapping("/search/sync-test")
public int syncSearch(){
log.info("========test start=========");
Instant start = Instant.now();
asyncCaseService.record();
int a = asyncCaseService.searchA();
int b = asyncCaseService.searchB();
int c = asyncCaseService.searchC();
int result = a+b+c;
Instant end = Instant.now();
log.info("========test end=========cost time is {} seconds", Duration.between(start,end).getSeconds());
return result;
}
···
import org.springframework.stereotype.Service;
@Service
public class AsyncCaseServiceImpl implements AsyncCaseService{
@Override
public int searchA() {
try {
Thread.sleep(1000);//模擬業務處理耗時
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
@Override
public int searchB() {
//其他方法類似
執行結果:
2022-04-21 13:32:47.739 INFO 22764 --- [nio-8089-exec-2] com.zang.async.web.AsyncCaseController : ========test start=========
2022-04-21 13:32:55.762 INFO 22764 --- [nio-8089-exec-2] com.zang.async.web.AsyncCaseController : ========test end=========cost time is 8 seconds
經過分析,可以看到三個查詢方法可以並行執行,等待都產生結果執行fx(a,b,c)
,record
方法執行的順序和完成度不影響結果的返回,可以使用異步任務執行。改造邏輯抽象如下:
之后就代碼實現展開闡述。
2.SpringBoot中的異步方法支持
SpringBoot已經提供了異步方法支持注解,因此不需要我們自己去創建維護線程或者線程池來異步的執行方法。
主要依靠兩個注解:
@EnableAsync // 使用異步方法時需要提前開啟(在啟動類上或配置類上)
@Async // 被async注解修飾的方法由SpringBoot默認線程池(SimpleAsyncTaskExecutor)執行
2.1 獲取(有返回值)異步方法的返回值
對於有返回值的異步方法,可使用java.util.concurrent.Future
類及其子類來接收異步方法返回值。
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.Future;
@Service
public class AsyncCaseServiceImpl implements AsyncCaseService{
@Async
@Override
public Future<Integer> searchA() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new AsyncResult<>(1);
}
//略
無返回值異步方法的異常捕獲見3.3。
2.2 異步任務並行控制
接上節,在對Service中有返回值的方法進行異步改造的同時,業務處理側需要添加並行控制,使並行的異步都返回結果才進行下一步操作:
import com.zang.async.service.AsyncCaseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Future;
@Slf4j
@RestController
public class AsyncCaseController {
@Resource
private AsyncCaseService asyncCaseService;
@PostMapping("/search/async-test")
public int asyncSearch() {
log.info("========test start=========");
Instant start = Instant.now();
asyncCaseService.record();
Future<Integer> searchAFuture = asyncCaseService.searchA();
Future<Integer> searchBFuture = asyncCaseService.searchB();
Future<Integer> searchCFuture = asyncCaseService.searchC();
while (true) {
if (searchAFuture.isDone() && searchBFuture.isDone() && searchCFuture.isDone()) {
break;
}
if (searchAFuture.isCancelled() || searchBFuture.isCancelled() || searchCFuture.isCancelled()) {
log.info("async work has cancelled , break");
break;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int a = 0, b = 0, c = 0;
try {
a = searchAFuture.get();
b = searchBFuture.get();
c = searchCFuture.get();
} catch (Exception e) {
e.printStackTrace();
}
int result = a + b + c;
Instant end = Instant.now();
log.info("========test end=========cost time is {} seconds", Duration.between(start, end).getSeconds());
return result;
}
}
結果:
2022-04-21 14:23:35.486 INFO 19912 --- [nio-8089-exec-4] com.zang.async.web.AsyncCaseController : ========test start=========
2022-04-21 14:23:37.516 INFO 19912 --- [nio-8089-exec-4] com.zang.async.web.AsyncCaseController : ========test end=========cost time is 2 seconds
3.自定義線程池執行異步方法
@Async
使用了線程池org.springframework.core.task.SimpleAsyncTaskExecutor
來執行我們的異步方法,實際開發中我們也可以自定義自己的線程池,便於對線程池進行合理配置。
3.1 自定義線程池
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@EnableAsync
@Configuration
public class AsyncThreadPoolConfigure {
@Bean("asyncThreadPoolTaskExecutor")
public Executor asyncThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(4);
executor.setQueueCapacity(10);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("async-task-executor");
executor.setThreadGroupName("async-task-executor-group");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 所有任務結束后關閉線程池
//executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
}
3.2 在@Async注解上指定執行的線程池
@Async("asyncThreadPoolTaskExecutor")
@Override
public Future<Integer> searchA() {
try {
//略
以上,自定義線程池執行異步方法即完成。
3.3 自定義線程池監控
自定義的線程池配置的參數是否合理往往使人摸不着頭腦,實際上,線程池執行器org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
為Spring自帶的,在測試中可以創建新執行器,繼承該執行器,重寫submit
方法,對其增加監控,從而查看線程池狀態,得到合適的線程池配置。
public class MonitorThreadPoolExecutor extends ThreadPoolTaskExecutor {
public void monitor(){
log.info("**** getActiveCount=={},getPoolSize=={},getLargestPoolSize=={},getTaskCount=={},getCompletedTaskCount=={},getQueue=={} ***",this.getThreadPoolExecutor().getActiveCount(),this.getThreadPoolExecutor().getPoolSize(),this.getThreadPoolExecutor().getLargestPoolSize(),this.getThreadPoolExecutor().getTaskCount(),this.getThreadPoolExecutor().getCompletedTaskCount(),this.getThreadPoolExecutor().getQueue().size());
}
@Override
public <T> Future<T> submit(Callable<T> task) {
monitor();
return super.submit(task);
}
}
在3.1自定義線程池時創建該監控執行器即可。
3.3 無返回值異步方法的異常捕獲
以實現org.springframework.scheduling.annotation.AsyncConfigurer
接口的getAsyncExecutor
方法和getAsyncUncaughtExceptionHandler
方法改造配置類。
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
@EnableAsync
@Configuration
public class AsyncThreadPoolConfigure implements AsyncConfigurer {
//線程池創建方法為重寫 getAsyncExecutor
@Bean("asyncThreadPoolTaskExecutor")
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(4);
executor.setQueueCapacity(10);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("async-task-executor");
executor.setThreadGroupName("async-task-executor-group");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 所有任務結束后關閉線程池
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncExceptionHandler();
}
public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {
log.error("Exception message is {}", throwable.getMessage());
log.error("Method name is {} ", method.getName());
for (Object param : obj) {
log.error("Parameter value - {}", param);
}
}
}
表現如下:
@Async("asyncThreadPoolTaskExecutor")
@Override
public void record() {
try {
Thread.sleep(3000);
log.info("current thread name is {}",Thread.currentThread().getName());
throw new RuntimeException("network not connect ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
控制台:
2022-04-21 15:34:14.931 INFO 16596 --- [nio-8089-exec-1] com.zang.async.web.AsyncCaseController : ========test start=========
2022-04-21 15:34:16.965 INFO 16596 --- [nio-8089-exec-1] com.zang.async.web.AsyncCaseController : ========test end=========cost time is 2 seconds
2022-04-21 15:34:17.939 INFO 16596 --- [-task-executor1] c.z.async.service.AsyncCaseServiceImpl : current thread name is async-task-executor1
2022-04-21 15:34:17.940 ERROR 16596 --- [-task-executor1] c.z.a.c.AsyncThreadPoolConfigure : Exception message is network not connect
2022-04-21 15:34:17.941 ERROR 16596 --- [-task-executor1] c.z.a.c.AsyncThreadPoolConfigure : Method name is record
4.一些思考
異步方法的集成極為方便,可以有效提高接口響應速度,但是使用過程中要注意合理的分析業務邏輯及服務器資源承載能力,不可濫用。
對於強一致性的業務,需要注意,異步方法執行失敗對於前部分的已執行的非異步操作是無影響的,因此在該場景異步並不可靠;
此外,對於並發量過大的任務,異步線程池的隊列緩存也較為消耗服務器資源,需要合理規划,必要時建議采用更為可靠的消息隊列等中間件。