SpringBoot異步方法優化處理提高響應速度


1.前言

日常開發中,對於串行化的任務適當解耦耗時操作和業務邏輯,在保證結果准確性的前提下,使用異步方法適當進行並行化改造,可以提高接口響應速度,提升使用體驗。

如下抽象的串行化工作流程:

業務查詢,首先登記記錄record[cost 3s],之后依次執行searchA[cost 1s]、searchB[cost 2s]、searchC[cost 2s]分別得到變量a、b、c,返回結果fx(a,b,c)[計算耗時可忽略不記]。代碼如下:

import com.zang.async.service.AsyncCaseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.time.Duration;
import java.time.Instant;

@Slf4j
@RestController
public class AsyncCaseController {

    @Resource
    private AsyncCaseService asyncCaseService;

    @PostMapping("/search/sync-test")
    public int syncSearch(){
        log.info("========test start=========");
        Instant start = Instant.now();
        asyncCaseService.record();
        int a = asyncCaseService.searchA();
        int b = asyncCaseService.searchB();
        int c = asyncCaseService.searchC();
        int result = a+b+c;
        Instant end = Instant.now();
        log.info("========test end=========cost time is {} seconds", Duration.between(start,end).getSeconds());
        return result;
    }
    ···
import org.springframework.stereotype.Service;

@Service
public class AsyncCaseServiceImpl implements AsyncCaseService{

    @Override
    public int searchA() {
        try {
            Thread.sleep(1000);//模擬業務處理耗時
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1;
    }

    @Override
    public int searchB() {
		//其他方法類似

執行結果:

2022-04-21 13:32:47.739  INFO 22764 --- [nio-8089-exec-2] com.zang.async.web.AsyncCaseController   : ========test start=========
2022-04-21 13:32:55.762  INFO 22764 --- [nio-8089-exec-2] com.zang.async.web.AsyncCaseController   : ========test end=========cost time is 8 seconds

經過分析,可以看到三個查詢方法可以並行執行,等待都產生結果執行fx(a,b,c)record方法執行的順序和完成度不影響結果的返回,可以使用異步任務執行。改造邏輯抽象如下:

之后就代碼實現展開闡述。

2.SpringBoot中的異步方法支持

SpringBoot已經提供了異步方法支持注解,因此不需要我們自己去創建維護線程或者線程池來異步的執行方法。

主要依靠兩個注解:

@EnableAsync // 使用異步方法時需要提前開啟(在啟動類上或配置類上)
@Async // 被async注解修飾的方法由SpringBoot默認線程池(SimpleAsyncTaskExecutor)執行

2.1 獲取(有返回值)異步方法的返回值

對於有返回值的異步方法,可使用java.util.concurrent.Future類及其子類來接收異步方法返回值。

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.Future;

@Service
public class AsyncCaseServiceImpl implements AsyncCaseService{

    @Async
    @Override
    public Future<Integer> searchA() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new AsyncResult<>(1);
    }
    //略

無返回值異步方法的異常捕獲見3.3。

2.2 異步任務並行控制

接上節,在對Service中有返回值的方法進行異步改造的同時,業務處理側需要添加並行控制,使並行的異步都返回結果才進行下一步操作:

import com.zang.async.service.AsyncCaseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Future;

@Slf4j
@RestController
public class AsyncCaseController {

    @Resource
    private AsyncCaseService asyncCaseService;

    @PostMapping("/search/async-test")
    public int asyncSearch() {
        log.info("========test start=========");
        Instant start = Instant.now();
        asyncCaseService.record();
        Future<Integer> searchAFuture = asyncCaseService.searchA();
        Future<Integer> searchBFuture = asyncCaseService.searchB();
        Future<Integer> searchCFuture = asyncCaseService.searchC();
        while (true) {
            if (searchAFuture.isDone() && searchBFuture.isDone() && searchCFuture.isDone()) {
                break;
            }
            if (searchAFuture.isCancelled() || searchBFuture.isCancelled() || searchCFuture.isCancelled()) {
                log.info("async work has cancelled , break");
                break;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        int a = 0, b = 0, c = 0;
        try {
            a = searchAFuture.get();
            b = searchBFuture.get();
            c = searchCFuture.get();
        } catch (Exception e) {
            e.printStackTrace();
        }

        int result = a + b + c;
        Instant end = Instant.now();
        log.info("========test end=========cost time is {} seconds", Duration.between(start, end).getSeconds());
        return result;
    }
}

結果:

2022-04-21 14:23:35.486  INFO 19912 --- [nio-8089-exec-4] com.zang.async.web.AsyncCaseController   : ========test start=========
2022-04-21 14:23:37.516  INFO 19912 --- [nio-8089-exec-4] com.zang.async.web.AsyncCaseController   : ========test end=========cost time is 2 seconds

3.自定義線程池執行異步方法

@Async使用了線程池org.springframework.core.task.SimpleAsyncTaskExecutor來執行我們的異步方法,實際開發中我們也可以自定義自己的線程池,便於對線程池進行合理配置。

3.1 自定義線程池

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@EnableAsync
@Configuration
public class AsyncThreadPoolConfigure {

    @Bean("asyncThreadPoolTaskExecutor")
    public Executor asyncThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(4);
        executor.setQueueCapacity(10);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("async-task-executor");
        executor.setThreadGroupName("async-task-executor-group");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 所有任務結束后關閉線程池
        //executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
}

3.2 在@Async注解上指定執行的線程池

    @Async("asyncThreadPoolTaskExecutor")
    @Override
    public Future<Integer> searchA() {
        try {
        //略

以上,自定義線程池執行異步方法即完成。

3.3 自定義線程池監控

自定義的線程池配置的參數是否合理往往使人摸不着頭腦,實際上,線程池執行器org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor為Spring自帶的,在測試中可以創建新執行器,繼承該執行器,重寫submit方法,對其增加監控,從而查看線程池狀態,得到合適的線程池配置。

public class MonitorThreadPoolExecutor extends ThreadPoolTaskExecutor {

    public void monitor(){
       log.info("**** getActiveCount=={},getPoolSize=={},getLargestPoolSize=={},getTaskCount=={},getCompletedTaskCount=={},getQueue=={} ***",this.getThreadPoolExecutor().getActiveCount(),this.getThreadPoolExecutor().getPoolSize(),this.getThreadPoolExecutor().getLargestPoolSize(),this.getThreadPoolExecutor().getTaskCount(),this.getThreadPoolExecutor().getCompletedTaskCount(),this.getThreadPoolExecutor().getQueue().size());
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        monitor();
        return super.submit(task);
    }
}

在3.1自定義線程池時創建該監控執行器即可。

3.3 無返回值異步方法的異常捕獲

以實現org.springframework.scheduling.annotation.AsyncConfigurer接口的getAsyncExecutor方法和getAsyncUncaughtExceptionHandler方法改造配置類。

import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Slf4j
@EnableAsync
@Configuration
public class AsyncThreadPoolConfigure implements AsyncConfigurer {

    //線程池創建方法為重寫 getAsyncExecutor
    @Bean("asyncThreadPoolTaskExecutor")
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(4);
        executor.setQueueCapacity(10);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("async-task-executor");
        executor.setThreadGroupName("async-task-executor-group");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 所有任務結束后關閉線程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncExceptionHandler();
    }

    public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
        @Override
        public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {
            log.error("Exception message is {}", throwable.getMessage());
            log.error("Method name is {} ", method.getName());
            for (Object param : obj) {
                log.error("Parameter value - {}", param);
            }
        }
    }

表現如下:

@Async("asyncThreadPoolTaskExecutor")
    @Override
    public void record() {
        try {
            Thread.sleep(3000);
            log.info("current thread name is {}",Thread.currentThread().getName());
            throw new RuntimeException("network not connect ");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

控制台:

2022-04-21 15:34:14.931  INFO 16596 --- [nio-8089-exec-1] com.zang.async.web.AsyncCaseController   : ========test start=========
2022-04-21 15:34:16.965  INFO 16596 --- [nio-8089-exec-1] com.zang.async.web.AsyncCaseController   : ========test end=========cost time is 2 seconds
2022-04-21 15:34:17.939  INFO 16596 --- [-task-executor1] c.z.async.service.AsyncCaseServiceImpl   : current thread name is async-task-executor1
2022-04-21 15:34:17.940 ERROR 16596 --- [-task-executor1] c.z.a.c.AsyncThreadPoolConfigure         : Exception message is network not connect 
2022-04-21 15:34:17.941 ERROR 16596 --- [-task-executor1] c.z.a.c.AsyncThreadPoolConfigure         : Method name is record 

4.一些思考

異步方法的集成極為方便,可以有效提高接口響應速度,但是使用過程中要注意合理的分析業務邏輯及服務器資源承載能力,不可濫用。

對於強一致性的業務,需要注意,異步方法執行失敗對於前部分的已執行的非異步操作是無影響的,因此在該場景異步並不可靠;

此外,對於並發量過大的任務,異步線程池的隊列緩存也較為消耗服務器資源,需要合理規划,必要時建議采用更為可靠的消息隊列等中間件。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM