Spring Boot從入門到實戰(十):異步處理


原文地址:http://blog.jboost.cn/springboot-async.html

 

在業務開發中,有時候會遇到一些非核心的附加功能,比如短信或微信模板消息通知,或者一些耗時比較久,但主流程不需要立即獲得其結果反饋的操作,比如保存圖片、同步數據到其它合作方等等。如果將這些操作都置於主流程中同步處理,勢必會對核心流程的性能造成影響,甚至由於第三方服務的問題導致自身服務不可用。這時候就應該將這些操作異步化,以提高主流程的性能,並與第三方解耦,提高主流程的可用性。

在Spring Boot中,或者說在Spring中,我們實現異步處理一般有以下幾種方式:

1. 通過 @EnableAsync 與 @Asyc 注解結合實現
2. 通過異步事件實現
3. 通過消息隊列實現

1. 基於注解實現

我們以前在Spring中提供異步支持一般是在配置文件 applicationContext.xml 中添加類似如下配置

<task:annotation-driven executor="executor" />
<task:executor id="executor" pool-size="10-200" queue-capacity="2000"/>

 

Spring的 @EnableAsync 注解的功能與<task:annotation-driven/>類似,將其添加於一個 @Configuration 配置類上,可對Spring應用的上下文開啟異步方法支持。 @Async 注解可以標注在方法或類上,表示某個方法或某個類里的所有方法需要通過異步方式來調用。 

我們以一個demo來示例具體用法,demo地址:https://github.com/ronwxy/springboot-demos/tree/master/springboot-async

 

1. 添加 @EnableAsync 注解

在一個 @Configuration 配置類上添加 @EnableAysnc 注解,我們一般可以添加到啟動類上,如

@SpringBootApplication
@EnableAsync
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

 

2. 配置相關的異步執行線程池 

@Configuration
public class AsyncConfig implements AsyncConfigurer {


    @Value("${async.corePoolSize:10}")
    private int corePoolSize;

    @Value("${async.maxPoolSize:200}")
    private int maxPoolSize;

    @Value("${async.queueCapacity:2000}")
    private int queueCapacity;

    @Value("${async.keepAlive:5}")
    private int keepAlive;

    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAlive);
        executor.setThreadNamePrefix("async-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setDaemon(false); //以用戶線程模式運行
        executor.initialize();
        return executor;
    }

    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new MyAsyncUncaughtExceptionHandler();
    }

    public static class MyAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {

        public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
            System.out.println("catch exception when invoke " + method.getName());
            throwable.printStackTrace();
        }
    }
}

 

可通過配置類的方式對異步線程池進行配置,並提供異步執行時出現異常的處理方法,如

這里我們通過實現 AsyncConfigurer 接口提供了一個異步執行線程池對象,各參數的說明可以參考【線程池的基本原理,看完就懂了】,里面有很詳細的介紹。且通過實現 AsyncUncaughtExceptionHandler 接口提供了一個異步執行過程中未捕獲異常的處理類。 

 

3. 定義異步方法

異步方法的定義只需要在類(類上注解表示該類的所有方法都異步執行)或方法上添加 @Async 注解即可,如

@Service
public class AsyncService {

    @Async
    public void asyncMethod(){
        System.out.println("2. running in thread: " + Thread.currentThread().getName());
    }

    @Async
    public void asyncMethodWithException() {
        throw new RuntimeException("exception in async method");
    }
}

 

4. 測試 

我們可以通過如下測試類來對異步方法進行測試

@RunWith(SpringRunner.class)
@SpringBootTest
public class AnnotationBasedAsyncTest {

    @Autowired
    private AsyncService asyncService;

    @Test
    public void testAsync() throws InterruptedException {
        System.out.println("1. running in thread: " + Thread.currentThread().getName());
        asyncService.asyncMethod();

        Thread.sleep(3);
    }

    @Test
    public void testAysncWithException() throws InterruptedException {
        System.out.println("1. running in thread: " + Thread.currentThread().getName());
        asyncService.asyncMethodWithException();

        Thread.sleep(3);
    }
}

因為異步方法在一個新的線程中執行,可能在主線程執行完后還沒來得及處理,所以通過sleep來等待它執行完成。具體執行結果讀者可自行嘗試運行,這里就不貼圖了。

 

2. 基於事件實現

第二種方式是通過Spring框架的事件監聽機制實現,但Spring的事件監聽默認是同步執行的,所以實際上還是需要借助 @EnableAsync 與 @Async 來實現異步。

1. 添加 @EnableAsync 注解

與上同,可添加到啟動類上。

2. 自定義事件類
通過繼承 ApplicationEvent 來自定義一個事件

public class MyEvent extends ApplicationEvent {

    private String arg;

    public MyEvent(Object source, String arg) {
        super(source);
        this.arg = arg;
    }
    
    //getter/setter
}

 

3. 定義事件處理類
支持兩種形式,一是通過實現 ApplicationListener 接口,如下

@Component
@Async
public class MyEventHandler implements ApplicationListener<MyEvent> {

    public void onApplicationEvent(MyEvent event) {
        System.out.println("2. running in thread: " + Thread.currentThread().getName());
        System.out.println("2. arg value: " + event.getArg());
    }
}

二是通過 @EventListener 注解,如下

@Component
public class MyEventHandler2 {

    @EventListener
    @Async
    public void handle(MyEvent event){
        System.out.println("3. running in thread: " + Thread.currentThread().getName());
        System.out.println("3. arg value: " + event.getArg());
    }
}

注意兩者都需要添加 @Async 注解,否則默認是同步方式執行。 

 

4. 定義事件發送類
可以通過實現 ApplicationEventPublisherAware 接口來使用 ApplicationEventPublisher 的 publishEvent()方法發送事件,

@Component
public class MyEventPublisher implements ApplicationEventPublisherAware {

    protected ApplicationEventPublisher applicationEventPublisher;

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    public void publishEvent(ApplicationEvent event){
        this.applicationEventPublisher.publishEvent(event);
    }
}

 

 

5. 測試

 

可以通過如下測試類來進行測試,

@RunWith(SpringRunner.class)
@SpringBootTest
public class EventBasedAsyncTest {

    @Autowired
    private MyEventPublisher myEventPublisher;

    @Test
    public void testAsync() throws InterruptedException {
        System.out.println("1. running in thread: " + Thread.currentThread().getName());
        myEventPublisher.publishEvent(new MyEvent(this,"testing event based async"));
        Thread.sleep(3);
    }
}

 

運行后發現兩個事件處理類都執行了,因為兩者都監聽了同一個事件 MyEvent 。 

 

3. 基於消息隊列實現

以上兩種方式都是基於服務器本機運行,如果服務進程出現異常退出,可能導致異步執行中斷。如果需要保證任務執行的可靠性,可以借助消息隊列的持久化與重試機制。阿里雲上的消息隊列服務提供了幾種類型的消息支持,如順序消息、定時/延時消息、事務消息等(詳情可參考:https://help.aliyun.com/document_detail/29532.html?spm=5176.234368.1278132.btn4.6f43db25Rn8oey ),如果項目是基於阿里雲部署的,可以考慮使用其中一類消息服務來實現業務需求。

 

4. 總結

本文對spring boot下異步處理的幾種方法進行了介紹,如果對任務執行的可靠性要求不高,則推薦使用第一種方式,如果可靠性要求較高,則推薦使用自建消息隊列或雲消息隊列服務的方式。
本文demo源碼地址:https://github.com/ronwxy/springboot-demos/tree/master/springboot-async/src/main/java/cn/jboost/async


我的個人博客地址:http://blog.jboost.cn
我的微信公眾號:jboost-ksxy (一個不只有技術干貨的公眾號,歡迎關注,及時獲取更新內容)
———————————————————————————————————————————————————————————————
微信公眾號

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM