Spring中基於@Async的異步線程池構建與使用


Spring中基於@Async的異步線程池構建與使用

在處理隊列中的請求或者與第三方系統的交互時,異步處理較為常見,為充分利用系統資源,常規多采用構建線程池的方式,但線程池的構建成本高、代碼維護困難;Spring 3.x 引入了@Async可完美解決這類異步處理難題,簡潔,易用,可讀性強。本文就以實際應用中,處理redis隊列中異步請求為例,結合前輩們的總結和自己的實際應用,簡要概述@Async在實際應用的特點。

關於異步調用

何為異步調用

  同步就是整個處理過程順序執行,當各個過程都執行完畢,並返回結果。

  異步調用則是只是發送了調用的指令,調用者無需等待被調用的方法完全執行完畢;而是繼續執行下面的流程。

  常規的多線程處理均為異步調用,例如, 在某個調用中,需要順序調用 A, B, C三個過程方法;如他們都是同步調用,則需要將他們都順序執行完畢之后,方算作過程執行完畢; 如B為一個異步的調用方法,則在執行完A之后,調用B,並不等待B完成,而是執行開始調用C,待C執行完畢之后,就意味着這個過程執行完畢了。

常規的異步調用處理方式

  在Java中,一般在處理類似的場景之時,都是基於創建獨立的線程去完成相應的異步調用邏輯,通過主線程和不同的線程之間的執行流程,從而在啟動獨立的線程之后,主線程繼續執行而不會產生停滯等待的情況。

@Async介紹

  在Spring中,基於@Async標注的方法,稱之為異步方法;這些方法將在執行的時候,將會在獨立的線程中被執行,調用者無需等待它的完成,即可繼續其他的操作。

@Async的構建與使用

基於注解的使用方法

  關於基於xml的使用方式,不做贅述,基於注解的使用方式包括如下三步:

  1. 啟動類加上@EnableAsync
  2. 配置類中完成異步線程池TaskExecutor的導入
  3. 需要異步調用的方法加上@Async

異步線程池TaskExecutor

  異步線程池接口TaskExecutor繼承JDK的Executor,只是在Spring框架內部完成該並發執行接口的重新定義。其實現類與接口層級圖如下:

  這里最常用的是ThreadPoolTaskExecutor ,其實質是對java.util.concurrent.ThreadPoolExecutor的包裝,推薦使用。

  配置類中自定義異步線程池:

    /**
     * 自定義異步線程池
     * @return
     */
    @Bean
    public TaskExecutor taskExecutor() {  
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
        executor.setThreadNamePrefix("Anno-Executor");
        executor.setMaxPoolSize(10);  
 
        // 設置拒絕策略
        executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // .....
            }
        });
        // 使用預定義的異常處理類
        // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
 
        return executor;  
    } 

@Async定義異步任務

  使用@Async定義異步任務包括如下三種方式:

  1. 最簡單的異步調用,返回值為void
  2. 帶參數的異步調用異步方法
  3. 異步調用返回Future

  代碼示例:

@Component
public class AsyncDemo {
    private static final Logger log = LoggerFactory.getLogger(AsyncDemo.class);
 
    /**
     * 最簡單的異步調用,返回值為void
     */
    @Async
    public void asyncInvokeSimplest() {
        log.info("asyncSimplest");
    }
 
    /**
     * 帶參數的異步調用 異步方法可以傳入參數
     * 
     * @param s
     */
    @Async
    public void asyncInvokeWithParameter(String s) {
        log.info("asyncInvokeWithParameter, parementer={}", s);
    }
 
    /**
     * 異步調用返回Future
     * 
     * @param i
     * @return
     */
    @Async
    public Future<String> asyncInvokeReturnFuture(int i) {
        log.info("asyncInvokeReturnFuture, parementer={}", i);
        Future<String> future;
        try {
            Thread.sleep(1000 * 1);
            future = new AsyncResult<String>("success:" + i);
        } catch (InterruptedException e) {
            future = new AsyncResult<String>("error");
        }
        return future;
    }
}

  調用示例:

asyncDemo.asyncInvokeSimplest();
asyncDemo.asyncInvokeWithException("test");
Future<String> future = asyncDemo.asyncInvokeReturnFuture(100);
System.out.println(future.get());

對異步方法的異常處理

  在調用方法時,可能出現方法中拋出異常的情況。在異步中主要有有兩種異常處理方法:

  1. 對於方法返回值是Futrue的異步方法:
    1. 在調用future的get時捕獲異常
    2. 在異常方法中直接捕獲異常
  2. 對於返回值是void的異步方法:通過AsyncUncaughtExceptionHandler處理異常

  代碼如下:

    /**
     * 帶參數的異步調用 異步方法可以傳入參數
     *  對於返回值是void,異常會被AsyncUncaughtExceptionHandler處理掉
     * @param s
     */
    @Async
    public void asyncInvokeWithException(String s) {
        log.info("asyncInvokeWithParameter, parementer={}", s);
        throw new IllegalArgumentException(s);
    }
 
    /**
     * 異常調用返回Future
     *  對於返回值是Future,不會被AsyncUncaughtExceptionHandler處理,需要我們在方法中捕獲異常並處理
     *  或者在調用方在調用Futrue.get時捕獲異常進行處理
     * 
     * @param i
     * @return
     */
    @Async
    public Future<String> asyncInvokeReturnFuture(int i) {
        log.info("asyncInvokeReturnFuture, parementer={}", i);
        Future<String> future;
        try {
            Thread.sleep(1000 * 1);
            future = new AsyncResult<String>("success:" + i);
            throw new IllegalArgumentException("a");
        } catch (InterruptedException e) {
            future = new AsyncResult<String>("error");
        } catch(IllegalArgumentException e){
            future = new AsyncResult<String>("error-IllegalArgumentException");
        }
        return future;
    }

  實現AsyncConfigurer接口對異常線程池更加細粒度的控制:

  1. 創建線程自己的線程池
  2. 對void方法拋出的異常處理的類AsyncUncaughtExceptionHandler

  代碼如下:

@Service
public class MyAsyncConfigurer implements AsyncConfigurer{
    private static final Logger log = LoggerFactory.getLogger(MyAsyncConfigurer.class);
 
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();  
        threadPool.setCorePoolSize(1);  
        threadPool.setMaxPoolSize(1);  
        threadPool.setWaitForTasksToCompleteOnShutdown(true);  
        threadPool.setAwaitTerminationSeconds(60 * 15);  
        threadPool.setThreadNamePrefix("MyAsync-");
        threadPool.initialize();
        return threadPool;  
    }
 
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
         return new MyAsyncExceptionHandler();  
    }
 
    /**
     * 自定義異常處理類
     * @author hry
     *
     */
    class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {  
 
        @Override  
        public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {  
            log.info("Exception message - " + throwable.getMessage());  
            log.info("Method name - " + method.getName());  
            for (Object param : obj) {  
                log.info("Parameter value - " + param);  
            }  
        }  
 
    } 
 
}

@Async調用中的事務處理機制

  在@Async標注的方法,同時也適用了@Transactional進行了標注;在其調用數據庫操作之時,將無法產生事務管理的控制,原因就在於其是基於異步處理的操作。

  那該如何給這些操作添加事務管理呢?可以將需要事務管理操作的方法放置到異步方法內部,在內部被調用的方法上添加@Transactional.

  例如:  方法A,使用了@Async/@Transactional來標注,但是無法產生事務控制的目的。方法B,使用了@Async來標注,  B中調用了C、D,C/D分別使用@Transactional做了標注,則可實現事務控制的目的。

實際業務使用

  目前的業務中,主要有兩塊需使用異步線程池完成快速、高效、簡便的異步處理,拋開業務,都是獲取隊列中請求,並發異步處理,再調用第三方接口返回數據,或者入庫用以后續生成報表。部分代碼如下:

  啟動類添加@EnableAsync,完成對於Spring異步調用的支持,不做贅述。

  創建異步線程池:

@Configuration
public class ThreadPoolConfig {

    @Bean
    public TaskExecutor videoRetrievalPool() {
        ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
        exec.setCorePoolSize(VideoSearchConstant.threadPoolSize);
        exec.setMaxPoolSize(VideoSearchConstant.threadPoolSize);
        exec.setAllowCoreThreadTimeOut(true);
        exec.setKeepAliveSeconds(100);
     // 線程池常用處理策略 , 不做贅述 exec.setThreadNamePrefix(
"video-"); exec.initialize(); return exec; } }

  異步任務耗時較長,為避免頻繁刷新隊列,暫時可采用定時作業的形式:

    @Async("videoRetrievalPool")
    @Scheduled(fixedRate = 1000)
    public void scheduleRetrieval() { ... ... }

 參考資料

  https://blog.csdn.net/wudiyong22/article/details/80747084

  https://blog.csdn.net/blueheart20/article/details/44648667


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM