對於spring異步注解@Async的使用:
對於異步方法調用,從Spring3開始提供了@Async注解,該注解可以被標注在方法上,以便異步地調用該方法。調用者將在調用時立即返回,方法的實際執行將提交給Spring TaskExecutor的任務中,由指定的線程池中的線程執行。
我們嘗試一次異步操作。
定義接口,我們采用有回調的future。
1 public interface AsyncService { 2 3 //涉及一個future的通道,以及一個計數使用的latch 4 Future<Integer> getFollowed(Integer times, CountDownLatch latch); 5 6 Future<Integer> getAtention(Integer times, CountDownLatch latch); 7 8 }
進行方法實現
其中一個我使用了默認線程池,另外一個指定自己的線程池稍后介紹區別
@Service public class AsyncServiceImpl implements AsyncService { @Override @Asyncpublic Future<Integer> getFollowed(Integer times, CountDownLatch latch) { Integer count = 0; //負責相關使用邏輯 for (int i = 0; i < times; i++) { count += i; }
latch.countDown(); return new AsyncResult<>(new Integer(count)); } @Override @Async("asyncExecutor") public Future<Integer> getAtention(Integer times, CountDownLatch latch) { Integer count = 0; //負責相關使用邏輯 for (int i = 0; i < times; i++) { count += i; } latch.countDown(); return new AsyncResult<>(new Integer(count)); }
在外部設置好計數器返回現有結果
long s = System.currentTimeMillis(); //計數 CountDownLatch latch = new CountDownLatch(2); Future<Integer> atention = asyncService.getAtention(times,latch); Future<Integer> followed = asyncService.getFollowed(times, latch); long e = System.currentTimeMillis(); System.out.println("asyncService spend: " + (e - s));
定義自己異步線程池
@Configuration public class AsyncThreadingConfig implements AsyncConfigurer { @Override @Bean(name = "asyncExecutor") public Executor getAsyncExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //最小線程數 taskExecutor.setCorePoolSize(5); //最大線程數 taskExecutor.setMaxPoolSize(10); //等待隊列 taskExecutor.setQueueCapacity(25); // 允許線程空閑時間(單位:默認為秒) taskExecutor.setKeepAliveSeconds(30); // 線程池名前綴 taskExecutor.setThreadNamePrefix("asyncExecutor-"); //設置拒絕策略 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //等待所有任務結束后再關閉線程池 taskExecutor.setWaitForTasksToCompleteOnShutdown(true); // 線程池中任務的等待時間,如果超過這個時候還沒有銷毀就強制銷毀,以確保應用最后能夠被關閉,而不是阻塞住 taskExecutor.setAwaitTerminationSeconds(60); taskExecutor.initialize(); return taskExecutor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return AsyncConfigurer.super.getAsyncUncaughtExceptionHandler(); } }
基本上就是異步實現的過程。
首先我們Spring 已經實現的線程池
1. SimpleAsyncTaskExecutor:不是真的線程池,這個類不重用線程,默認每次調用都會創建一個新的線程。
2. SyncTaskExecutor:這個類沒有實現異步調用,只是一個同步操作。只適用於不需要多線程的地方。
3. ConcurrentTaskExecutor:Executor的適配類,不推薦使用。如果ThreadPoolTaskExecutor不滿足要求時,才用考慮使用這個類。
4. SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的類。線程池同時被quartz和非quartz使用,才需要使用此類。
5. ThreadPoolTaskExecutor :最常使用,推薦。 其實質是對java.util.concurrent.ThreadPoolExecutor的包裝。
Spring應用默認的線程池,指在@Async注解在使用時,不指定線程池的名稱。查看源碼,@Async的默認執行器為SimpleAsyncTaskExecutor。
幾種線程池會出現的問題:
在線程池應用中,參考阿里巴巴java開發規范:線程池不允許使用Executors去創建,不允許使用系統默認的線程池,推薦通過ThreadPoolExecutor的方式,這樣的處理方式讓開發的工程師更加明確線程池的運行規則,規避資源耗盡的風險。Executors各個方法的弊端:
newFixedThreadPool和newSingleThreadExecutor:主要問題是堆積的請求處理隊列可能會耗費非常大的內存,甚至OOM。
newCachedThreadPool和newScheduledThreadPool:要問題是線程數最大數是Integer.MAX_VALUE,可能會創建數量非常多的線程,甚至OOM。
@Async默認異步配置使用的是SimpleAsyncTaskExecutor,該線程池默認來一個任務創建一個線程,若系統中不斷的創建線程,最終會導致系統占用內存過高,引發OutOfMemoryError錯誤。針對線程創建問題,SimpleAsyncTaskExecutor提供了限流機制,通過concurrencyLimit屬性來控制開關,當concurrencyLimit>=0時開啟限流機制,默認關閉限流機制即concurrencyLimit=-1,當關閉情況下,會不斷創建新的線程來處理任務。基於默認配置,SimpleAsyncTaskExecutor並不是嚴格意義的線程池,達不到線程復用的功能。
@Async應用自定義線程池(第四步)
自定義線程池,可對系統中線程池更加細粒度的控制,方便調整線程池大小配置,線程執行異常控制和處理。在設置系統自定義線程池代替默認線程池時,雖可通過多種模式設置,但替換默認線程池最終產生的線程池有且只能設置一個(不能設置多個類繼承AsyncConfigurer)。自定義線程池有如下模式:
- 重新實現接口AsyncConfigurer
- 繼承AsyncConfigurerSupport
- 配置由自定義的TaskExecutor替代內置的任務執行器
通過查看Spring源碼關於@Async的默認調用規則,會優先查詢源碼中實現AsyncConfigurer這個接口的類,實現這個接口的類為AsyncConfigurerSupport。但默認配置的線程池和異步處理方法均為空,所以,無論是繼承或者重新實現接口,都需指定一個線程池。且重新實現 public Executor getAsyncExecutor()方法。
Async可能會存在失效
沒有過去到代理類,本類調用時,直接自己內部調用,沒有走代理類
1.沒有在@SpringBootApplication啟動類當中添加注解@EnableAsync注解。
2.異步方法使用注解@Async的返回值只能為void或者Future。
3.沒有走Spring的代理類。因為@Transactional和@Async注解的實現都是基於Spring的AOP,而AOP的實現是基於動態代理模式實現的。那么注解失效的原因就很明顯了,有可能因為調用方法的是對象本身而不是代理對象,因為沒有經過Spring容器。
