1. 概述
在日常開發中,為了提高主線程的效率,往往需要采用異步調用處理,例如系統日志等。在實際業務場景中,可以使用消息中間件如RabbitMQ、RocketMQ、Kafka等來解決。假如對高可用沒有太高的要求,也可以使用線程池或者隊列來解決。
2. 創建工程
- 創建Maven工程
- 修改配置文件
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.c3stones</groupId>
<artifactId>spring-boot-async-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-async-demo</name>
<description>Spring Boot Simple Demo</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
<maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
</properties>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.2-jre</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
- 示例1:創建線程池,異步調用
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class SimpleDemo {
// 創建固定數目線程的線程池
// 阿里巴巴Java開發手冊中提到可能存在OOM(OutOfMemory)內存溢出異常
// private static ExecutorService executorService = Executors.newFixedThreadPool(5);
// 推薦使用com.google.guava的ThreadFactoryBuilder來創建線程池
private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("simple-threadpool-%d")
.build();
/**
* java.util.concurrent.ThreadPoolExecutor.ThreadPoolExecutor(int corePoolSize,
* int maximumPoolSize, long keepAliveTime, TimeUnit unit,
* BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
* RejectedExecutionHandler handler)
*
* @param corePoolSize 線程池大小
* @param maximumPoolSize 最大線程數
* @param keepAliveTime 當線程大於線程池大小時,最長等待時間
* @param unit 時間單位
* @param workQueue 任務隊列
* @param threadFactory 指定線程工廠
* @param handler 當線程池界限和隊列容量時,阻止線程處理
*/
private static ExecutorService threadPool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), threadFactory, new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
// 提交線程到線程池
for (int i = 0; i < 5; i++) {
threadPool.execute(new SimpleThread1());
threadPool.execute(new SimpleThread2());
}
// 關閉
threadPool.shutdown();
}
}
class SimpleThread1 implements Runnable {
private static Logger logger = LoggerFactory.getLogger(SimpleThread1.class);
@Override
public void run() {
try {
logger.info("線程 SimpleThread1 被調用!");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class SimpleThread2 implements Runnable {
private static Logger logger = LoggerFactory.getLogger(SimpleThread2.class);
@Override
public void run() {
try {
logger.info("線程 SimpleThread2 被調用!");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
控制台打印:
14:06:05.009 [simple-threadpool-1] INFO com.c3stones.simple.SimpleThread2 - 線程 SimpleThread2 被調用!
14:06:05.009 [simple-threadpool-3] INFO com.c3stones.simple.SimpleThread2 - 線程 SimpleThread2 被調用!
14:06:05.010 [simple-threadpool-0] INFO com.c3stones.simple.SimpleThread1 - 線程 SimpleThread1 被調用!
14:06:05.009 [simple-threadpool-4] INFO com.c3stones.simple.SimpleThread1 - 線程 SimpleThread1 被調用!
14:06:05.009 [simple-threadpool-2] INFO com.c3stones.simple.SimpleThread1 - 線程 SimpleThread1 被調用!
14:06:06.017 [simple-threadpool-3] INFO com.c3stones.simple.SimpleThread2 - 線程 SimpleThread2 被調用!
14:06:06.018 [simple-threadpool-1] INFO com.c3stones.simple.SimpleThread1 - 線程 SimpleThread1 被調用!
14:06:07.017 [simple-threadpool-3] INFO com.c3stones.simple.SimpleThread2 - 線程 SimpleThread2 被調用!
14:06:08.017 [simple-threadpool-3] INFO com.c3stones.simple.SimpleThread1 - 線程 SimpleThread1 被調用!
14:06:08.018 [simple-threadpool-2] INFO com.c3stones.simple.SimpleThread2 - 線程 SimpleThread2 被調用!
問題:
(1) 當進程被異常關閉,會導致存儲在線程池或者隊列的線程丟失。
(2) 但是消息隊列中的消息不會因為JVM進程關閉而丟失,依然存儲在消息隊列所在服務器上。
3. 快速入門
- 開啟異步支持
/**
* 啟動類
*
* @author CL
*
*/
@SpringBootApplication
@EnableAsync // 開啟異步支持
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
- 編寫業務代碼
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;
/**
* 快速入門
*
* @author CL
*
*/
@Service
@Slf4j
public class TestQuickService {
@SneakyThrows
public String get() {
log.info("調用TestQuickService.get()!");
Thread.sleep(2000);
return "get";
}
@SneakyThrows
public String get2() {
log.info("調用TestQuickService.get2()!");
Thread.sleep(5000);
return "get2";
}
}
- 測試同步調用
/**
* 測試快速入門
*
* @author CL
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class TestQuick {
@Autowired
private TestQuickService quickService;
/**
* 測試同步調用
*/
@Test
public void testSync() {
LocalDateTime startTime = LocalDateTime.now();
quickService.get();
quickService.get2();
LocalDateTime endTime = LocalDateTime.now();
log.info("同步調用,總耗時:" + Duration.between(startTime, endTime).toMillis() + " ms");
}
}
控制台打印:
2020-05-28 16:39:36.853 INFO 6220 --- [ main] com.c3stones.quick.TestQuickService : 調用TestQuickService.get()!
2020-05-28 16:39:38.853 INFO 6220 --- [ main] com.c3stones.quick.TestQuickService : 調用TestQuickService.get2()!
2020-05-28 16:39:43.857 INFO 6220 --- [ main] com.c3stones.test.TestQuick : 同步調用,總耗時:7000 ms
可以看出總耗時為兩個業務方法的總耗時,並且在主線程中執行。
- 異步調用,添加 @Async 注解
@Async
public String asyncGet() {
return this.get();
}
@Async
public String asyncGet2() {
return this.get2();
}
- 測試異步調用
/**
* 測試異步調用
*/
@Test
public void testAsync() {
LocalDateTime startTime = LocalDateTime.now();
quickService.asyncGet();
quickService.asyncGet2();
LocalDateTime endTime = LocalDateTime.now();
log.info("異步調用,總耗時:" + Duration.between(startTime, endTime).toMillis() + " ms");
}
控制台打印:
2020-05-28 17:11:10.550 INFO 908 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-05-28 17:11:10.563 INFO 908 --- [ main] com.c3stones.test.TestQuick : 異步調用,總耗時:45 ms
2020-05-28 17:11:10.574 INFO 908 --- [ task-2] com.c3stones.quick.TestQuickService : 調用TestQuickService.get2()!
2020-05-28 17:11:10.575 INFO 908 --- [ task-1] com.c3stones.quick.TestQuickService : 調用TestQuickService.get()!
2020-05-28 17:11:10.587 INFO 908 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
可以看出總耗時不受業務方法時間的影響,並且都在異步線程池中執行的。
注意:實際兩個方法並沒有執行完成。
4. 異步調用但主線程阻塞
實際場景中肯定不可能出現上述的問題。肯定希望既能異步調用,並且主線程能阻塞,直到方法執行完成。
- 改進代碼,返回Futrue對象
@Async
public Future<String> asyncFutureGet() {
return AsyncResult.forValue(this.get());
}
@Async
public Future<String> asyncFutureGet2() {
return AsyncResult.forValue(this.get2());
}
- 測試異步調用並阻塞主線程
/**
* 測試異步調用並阻塞主線程
*/
@Test
@SneakyThrows
public void testAsyncFuture() {
LocalDateTime startTime = LocalDateTime.now();
// 執行
Future<String> getFuture = quickService.asyncFutureGet();
Future<String> get2Future = quickService.asyncFutureGet2();
// 阻塞等待執行結果
getFuture.get();
get2Future.get();
LocalDateTime endTime = LocalDateTime.now();
log.info("異步調用,總耗時:" + Duration.between(startTime, endTime).toMillis() + " ms");
}
控制台打印:
2020-05-28 17:14:57.434 INFO 5784 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-05-28 17:14:57.468 INFO 5784 --- [ task-2] com.c3stones.quick.TestQuickService : 調用TestQuickService.get2()!
2020-05-28 17:14:57.470 INFO 5784 --- [ task-1] com.c3stones.quick.TestQuickService : 調用TestQuickService.get()!
2020-05-28 17:15:02.474 INFO 5784 --- [ main] com.c3stones.test.TestQuick : 異步調用,總耗時:5066 ms
2020-05-28 17:15:02.511 INFO 5784 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
可以看出總耗時由耗時較長的方法決定,並且在異步線程池中執行。
5. Spring Task配置
- 編寫配置文件application.yml
spring:
task: # Spring執行器配置,對應TaskExecutionProperties配置類。對於Spring異步任務,會使用該執行器。
execution:
thread-name-prefix: task- # 線程池的線程名的前綴。默認為 task- ,根據自己應用來設置。
pool: # 線程池相關
core-size: 8 # 核心線程數,線程池創建時初始化的線程數。默認為 8。
max-size: 20 # 最大線程數,線程池最大的線程數,只有在緩沖隊列滿了之后,才會申請超過核心線程數的線程。默認為 Integer.MAX_VALUE。
keep-alive: 60s # 允許線程的空閑時間,當超過了核心線程之外的線程,在空閑時間到達之后會被銷毀。默認為 60 秒。
queue-capacity: 200 # 緩沖隊列大小,用來緩沖執行任務的隊列的大小。默認為 Integer.MAX_VALUE。
allow-core-thread-timeout: true # 是否允許核心線程超時,即開啟線程池的動態增長和縮小。默認為 true。
shutdown:
await-termination: true # 應用關閉時,是否等待定時任務執行完成。默認為 false ,建議設置為 true。
await-termination-period: 60 # 等待任務完成的最大時長,單位為秒。默認為 0。
在spring.task.execution
配置項Spring Task
調度任務的配置對應TaskExecutionProperties
配置類。
Spring Boot TaskExecutionAutoConfiguration
自動化配置類,實現Spring Task
的自動配置,創建ThreadPoolTaskExecutor
基於線程池的任務執行器。本質上ThreadPoolTaskExecutor
是基於 ThreadPoolExecutor
的封裝,主要增加了提交任務,返回ListenableFuture
對象的功能。
spring.task.execution.shutdown
配置項是為了實現Spring Task
異步任務的優雅關閉。
在異步任務執行過程中,如果應用開始關閉,把異步任務需要使用到的Spring Bean一並銷毀,例如數據庫連接池等,但是異步任務還在執行中,當需要訪問數據庫時,就會導致報錯。所以通過配置await-termination = true
來實現應用關閉時,等待異步任務執行完成。這樣應用在關閉時Spring 會優先等待ThreadPoolTaskSchedule 執行完任務之后,再開始Spring Bean的銷毀。
同時,又考慮到我們不可能無限等待異步任務全部執行結束,因此可以配置await-termination-period = 60
,等待任務完成的最大時長,單位為秒。具體設置需與根據業務場景決定。
6. 異步回調
- AsyncResult類,異步結果
- ListenableFuture接口,監聽Future
- Future接口,返回異步計算結果
org.springframework.scheduling.annotation.AsyncResult<V>
類實現了org.springframework.util.concurrent.ListenableFuture<T>
接口,org.springframework.util.concurrent.ListenableFuture<T>
又繼承了java.util.concurrent.Future<V>
接口。
Future表示一個異步計算任務,當任務完成時得到計算結果。如果希望執行完成后馬上得到結果,則需要另一個線程不斷的查詢監控狀態,毋庸置疑增加了一定的復雜度。ListenableFuture對java原生Future進行擴展增強,其實就是監聽Future是否執行完成,並自動調用回調方法,減少並發程序的復雜度。
源碼分析:
public interface Future<V> {
/**
* 1. 如果任務沒有開始,mayInterruptIfRunning = true/false,則直接返回false;<br/>
* 2. 如果任務正在執行中,mayInterruptIfRunning = true,則試圖中斷任務,如果成功中斷,則返回true;<br/>
* 3. 如果任務正在執行中,mayInterruptIfRunning = false,則不會對執行中的線程產生影響,則直接返回false;<br/>
* 4. 如果任務已經完成,mayInterruptIfRunning = true/false,則直接返回false;<br/>
*
* @param mayInterruptIfRunning 是否中斷執行中的線程
* @return true/false
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 如果任務被中斷,則返回true
*
* @return true/false
*/
boolean isCancelled();
/**
* 如果任務已經完成,無論是正常結束或是中端斷都返回 true
*
* @return true/false
*/
boolean isDone();
/**
* 獲得異步結果,如果任務還在執行中,則阻塞直到異步任務完成
*
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
V get() throws InterruptedException, ExecutionException;
/**
* 獲得異步結果,如果任務還在執行中,則阻塞直到異步任務完成<br/>
* 但是有時間限制,如果阻塞時間超過設定的timeout,則拋出異常。
*
* @param timeout 超時時間
* @param unit 時間單位枚舉
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
public interface ListenableFuture<T> extends Future<T> {
/**
* 回調方法,處理異常結果(不論成功還是異常)
*
* @param callback
*/
void addCallback(ListenableFutureCallback<? super T> callback);
/**
* 回調方法,分別處理成功和異常結果
*
* @param successCallback 成功回調
* @param failureCallback 異常回調
*/
void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
/**
* 轉換成JDK1.8提供的CompletableFuture類,該類提供了非常強大的Future擴展功能
*
* @return
*/
default CompletableFuture<T> completable() {
CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
addCallback(completable::complete, completable::completeExceptionally);
return completable;
}
}
public class AsyncResult<V> implements ListenableFuture<V> {
@Nullable
private final V value;
@Nullable
private final Throwable executionException;
public AsyncResult(@Nullable V value) {
this(value, null);
}
private AsyncResult(@Nullable V value, @Nullable Throwable ex) {
this.value = value;
this.executionException = ex;
}
/**
* AsyncResult表示異常結果,則表示取消失敗
*
* @param mayInterruptIfRunning 是否中斷執行中的線程
* @return false
*/
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
/**
* AsyncResult表示異常結果,則表示取消失敗
*
* @return false
*/
@Override
public boolean isCancelled() {
return false;
}
/**
* AsyncResult表示異常結果,則表示執行完成
*
* @return true
*/
@Override
public boolean isDone() {
return true;
}
/**
* 獲得異步結果,如果任務還在執行中,則阻塞直到異步任務完成
*
* @return
* @throws ExecutionException
*/
@Override
@Nullable
public V get() throws ExecutionException {
if (this.executionException != null) {
throw (this.executionException instanceof ExecutionException ? (ExecutionException) this.executionException
: new ExecutionException(this.executionException));
}
return this.value;
}
/**
* 獲得異步結果,如果任務還在執行中,則阻塞直到異步任務完成<br/>
* 但是有時間限制,如果阻塞時間超過設定的timeout,則拋出異常。
*
* @param timeout 超時時間
* @param unit 時間單位枚舉
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
@Override
@Nullable
public V get(long timeout, TimeUnit unit) throws ExecutionException {
return get();
}
/**
* 回調方法,處理成功和異常的結果
*
* @param callback
*/
@Override
public void addCallback(ListenableFutureCallback<? super V> callback) {
addCallback(callback, callback);
}
/**
* 回調方法,分別處理成功和異常的結果<br/>
* catch中忽略了回調中發生的異常。當多個對調方法執行時,不會因為某一個回調方法異常而影響其他的回調方法
*
* @param successCallback 成功回調
* @param failureCallback 異常回調
*/
@Override
public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
try {
if (this.executionException != null) {
failureCallback.onFailure(exposedException(this.executionException));
} else {
successCallback.onSuccess(this.value);
}
} catch (Throwable ex) {
// Ignore
}
}
/**
* 返回CompletableFuture
*
* @return
*/
@Override
public CompletableFuture<V> completable() {
if (this.executionException != null) {
CompletableFuture<V> completable = new CompletableFuture<>();
completable.completeExceptionally(exposedException(this.executionException));
return completable;
} else {
return CompletableFuture.completedFuture(this.value);
}
}
/**
* 執行成功,返回ListenableFuture
*
* @param <V>
* @param value
* @return
*/
public static <V> ListenableFuture<V> forValue(V value) {
return new AsyncResult<>(value, null);
}
/**
* 執行失敗,返回ListenableFuture,執行回調方法
*
* @param <V>
* @param ex
* @return
*/
public static <V> ListenableFuture<V> forExecutionException(Throwable ex) {
return new AsyncResult<>(null, ex);
}
/**
* 獲得具體的異常
*
* @param original
* @return
*/
private static Throwable exposedException(Throwable original) {
if (original instanceof ExecutionException) {
Throwable cause = original.getCause();
if (cause != null) {
return cause;
}
}
return original;
}
}
- ListenableFutureTask類,異步增強ListenableFuture
org.springframework.util.concurrent.ListenableFutureTask<T>
類繼承了java.util.concurrent.FutureTask<T>
類,實現了org.springframework.util.concurrent.ListenableFuture<T>
接口。
public class ListenableFutureTask<T> extends FutureTask<T> implements ListenableFuture<T> {
/**
* 暫存回調
*/
private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry<>();
public ListenableFutureTask(Callable<T> callable) {
super(callable);
}
public ListenableFutureTask(Runnable runnable, @Nullable T result) {
super(runnable, result);
}
/**
* 回調方法,處理成功和異常的結果
*
* @param callback
*/
@Override
public void addCallback(ListenableFutureCallback<? super T> callback) {
this.callbacks.addCallback(callback);
}
/**
* 回調方法,分別處理成功和異常的結果<br/>
* catch中忽略了回調中發生的異常。當多個對調方法執行時,不會因為某一個回調方法異常而影響其他的回調方法
*
* @param successCallback 成功回調
* @param failureCallback 異常回調
*/
@Override
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
this.callbacks.addSuccessCallback(successCallback);
this.callbacks.addFailureCallback(failureCallback);
}
/**
* 返回CompletableFuture
*
* @return
*/
@Override
public CompletableFuture<T> completable() {
CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
this.callbacks.addSuccessCallback(completable::complete);
this.callbacks.addFailureCallback(completable::completeExceptionally);
return completable;
}
/**
* 重寫FutureTask中的done方法
*/
@Override
protected void done() {
Throwable cause;
try {
// 獲得執行結果
T result = get();
// 執行成功,執行成功回調方法
this.callbacks.success(result);
return;
} catch (InterruptedException ex) { // 中斷異常,並返回
Thread.currentThread().interrupt();
return;
} catch (ExecutionException ex) { // 獲得具體的異常,並設置到cause中
cause = ex.getCause();
if (cause == null) {
cause = ex;
}
} catch (Throwable ex) { // 設置到cause中
cause = ex;
}
// 執行異常,執行異常回調方法
this.callbacks.failure(cause);
}
}
7. 測試異步回調
- 編寫業務代碼
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
/**
* 測試回調
*
* @author CL
*
*/
@Service
@Slf4j
public class TestCallbackService {
@SneakyThrows
public String get() {
log.info("調用TestCallbackService.get()!");
Thread.sleep(2000);
return "get";
}
@Async
public ListenableFuture<String> asyncCallback() {
try {
return AsyncResult.forValue(this.get());
} catch (Throwable ex) {
return AsyncResult.forExecutionException(ex);
}
}
}
- 測試異步回調
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
/**
* 測試異步回調
*
* @author CL
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class TestCallback {
@Autowired
private TestCallbackService callbackService;
/**
* 測試異步回調
*/
@Test
@SneakyThrows
public void testCallback() {
LocalDateTime startTime = LocalDateTime.now();
ListenableFuture<String> listenableFuture = callbackService.asyncCallback();
log.info("返回類型為:" + listenableFuture.getClass().getSimpleName());
// 分別增加成功和異常的回調
listenableFuture.addCallback(new SuccessCallback<String>() {
@Override
public void onSuccess(String result) {
log.info("執行成功!");
}
}, new FailureCallback() {
@Override
public void onFailure(Throwable ex) {
log.error("執行異常:" + ex);
}
});
// 增加統一的成功和異常回調
listenableFuture.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
log.info("執行成功!");
}
@Override
public void onFailure(Throwable ex) {
log.error("執行異常:" + ex);
}
});
// 阻塞主線程
listenableFuture.get();
LocalDateTime endTime = LocalDateTime.now();
log.info("異步調用,總耗時:" + Duration.between(startTime, endTime).toMillis() + " ms");
}
}
控制台打印:
2020-05-29 11:56:24.815 INFO 17920 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-05-29 11:56:24.822 INFO 17920 --- [ main] com.c3stones.test.TestCallback : 返回類型為:ListenableFutureTask
2020-05-29 11:56:24.835 INFO 17920 --- [ task-1] c.c3stones.callback.TestCallbackService : 調用TestCallbackService.get()!
2020-05-29 11:56:26.838 INFO 17920 --- [ task-1] com.c3stones.test.TestCallback : 執行成功!
2020-05-29 11:56:26.838 INFO 17920 --- [ task-1] com.c3stones.test.TestCallback : 執行成功!
2020-05-29 11:56:26.843 INFO 17920 --- [ main] com.c3stones.test.TestCallback : 異步調用,總耗時:2055 ms
2020-05-29 11:56:26.874 INFO 17920 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
8. 全局異步異常處理器
實現AsyncUncaughtExceptionHandler
接口即可實現異步調用的統一異常處理。
注意:AsyncUncaughtExceptionHandler
返回非Future類型 的異步調用方法。因此返回Future類型的異步方法只能使用回調方法處理。
從AsyncExecutionAspectSupport
的handleError
方法可以看出上述結論。
protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
// 返回類型為Future,直接拋出異常
if (Future.class.isAssignableFrom(method.getReturnType())) {
ReflectionUtils.rethrowException(ex);
}
// 否則,交給AsyncUncaughtExceptionHandler來處理
else {
// Could not transmit the exception to the caller with default executor
try {
this.exceptionHandler.obtain().handleUncaughtException(ex, method, params);
}
catch (Throwable ex2) {
logger.warn("Exception handler for async method '" + method.toGenericString() +
"' threw unexpected exception itself", ex2);
}
}
}
- 編寫全局異步異常處理器
import java.lang.reflect.Method;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
/**
* 全局異步異常處理器
*
* @author CL
*
*/
@Component
@Slf4j
public class GlobalAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
/**
* 處理未捕獲的異步調用異常
*/
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.error("方法:{}, 參數:{},調用異常:{}", method, params, ex.getMessage());
}
}
- 編寫異步配置類
import java.util.concurrent.Executor;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import com.c3stones.handle.GlobalAsyncUncaughtExceptionHandler;
@Configuration
@EnableAsync // 開啟異步支持(和Application啟動類中保留一個即可,建議保留在此處)
public class AsyncConfig implements AsyncConfigurer {
@Autowired
private GlobalAsyncUncaughtExceptionHandler asyncUncaughtExceptionHandler;
/**
* 返回 Spring Task異步任務的默認執行器。<br/>
* 返回了null,則使用TaskExecutionAutoConfiguration自動化配置類創建的ThreadPoolTaskExecutor任務執行器作為默認執行器
*/
@Override
public Executor getAsyncExecutor() {
return null;
}
/**
* 返回自定義的全局異步異常處理器
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return asyncUncaughtExceptionHandler;
}
}
- 測試全局異步異常處理
import java.time.Duration;
import java.time.LocalDateTime;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.c3stones.Application;
import com.c3stones.exceptionHandle.TestAsyncExceptionService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
/**
* 測試異步異常處理
*
* @author CL
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class TestAsyncException {
@Autowired
private TestAsyncExceptionService asyncExceptionService;
/**
* 測試異步異常
*/
@Test
@SneakyThrows
public void testAsyncException() {
LocalDateTime startTime = LocalDateTime.now();
asyncExceptionService.asyncException();
// 異步調用成功
Thread.sleep(5000);
LocalDateTime endTime = LocalDateTime.now();
log.info("異步異常調用,總耗時:" + Duration.between(startTime, endTime).toMillis() + " ms");
}
}
控制台打印:
2020-05-29 11:59:12.804 INFO 12724 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-05-29 11:59:12.821 ERROR 12724 --- [ task-1] .c.h.GlobalAsyncUncaughtExceptionHandler : 方法:public java.lang.String com.c3stones.exceptionHandle.TestAsyncExceptionService.asyncException(), 參數:[],調用異常:TestAsyncExceptionService.asyncException拋出異常!
2020-05-29 11:59:17.817 INFO 12724 --- [ main] com.c3stones.test.TestAsyncException : 異步異常調用,總耗時:5048 ms
2020-05-29 11:59:17.856 INFO 12724 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
9. 自定義任務執行器
使用Spring Boot TaskExecutionAutoConfiguration
自動化配置類,實現自動配置ThreadPoolTaskExecutor
任務執行器。
- 創建Maven工程
- 修改pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.c3stones</groupId>
<artifactId>spring-boot-async-demo2</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-async-demo2</name>
<description>Spring Boot Async Demo2</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
<maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
- 編寫application.yml
spring:
task: # Spring執行器配置,對應TaskExecutionProperties配置類。對於Spring異步任務,會使用該執行器。
execution-one:
thread-name-prefix: task-one- # 線程池的線程名的前綴。默認為 task- ,根據自己應用來設置。
pool: # 線程池相關
core-size: 8 # 核心線程數,線程池創建時初始化的線程數。默認為 8。
max-size: 20 # 最大線程數,線程池最大的線程數,只有在緩沖隊列滿了之后,才會申請超過核心線程數的線程。默認為 Integer.MAX_VALUE。
keep-alive: 60s # 允許線程的空閑時間,當超過了核心線程之外的線程,在空閑時間到達之后會被銷毀。默認為 60 秒。
queue-capacity: 200 # 緩沖隊列大小,用來緩沖執行任務的隊列的大小。默認為 Integer.MAX_VALUE。
allow-core-thread-timeout: true # 是否允許核心線程超時,即開啟線程池的動態增長和縮小。默認為 true。
shutdown:
await-termination: true # 應用關閉時,是否等待定時任務執行完成。默認為 false ,建議設置為 true。
await-termination-period: 60 # 等待任務完成的最大時長,單位為秒。默認為 0。
execution-two:
thread-name-prefix: task-two- # 線程池的線程名的前綴。默認為 task- ,根據自己應用來設置。
pool: # 線程池相關
core-size: 8 # 核心線程數,線程池創建時初始化的線程數。默認為 8。
max-size: 20 # 最大線程數,線程池最大的線程數,只有在緩沖隊列滿了之后,才會申請超過核心線程數的線程。默認為 Integer.MAX_VALUE。
keep-alive: 60s # 允許線程的空閑時間,當超過了核心線程之外的線程,在空閑時間到達之后會被銷毀。默認為 60 秒。
queue-capacity: 200 # 緩沖隊列大小,用來緩沖執行任務的隊列的大小。默認為 Integer.MAX_VALUE。
allow-core-thread-timeout: true # 是否允許核心線程超時,即開啟線程池的動態增長和縮小。默認為 true。
shutdown:
await-termination: true # 應用關閉時,是否等待定時任務執行完成。默認為 false ,建議設置為 true。
await-termination-period: 60 # 等待任務完成的最大時長,單位為秒。默認為 0。
- 配置執行器
import org.springframework.boot.autoconfigure.task.TaskExecutionProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.task.TaskExecutorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* 異步配置
*
* @author CL
*
*/
@Configuration
@EnableAsync // 開啟異步的支持
public class AsyncConfig {
/**
* 執行器1
*/
public static final String EXECUTOR_ONE = "executor-one";
/**
* 執行器2
*/
public static final String EXECUTOR_TWO = "executor-two";
/**
* 配置執行器1
*
* @author CL
*
*/
@Configuration
public static class ExecutorOneConfiguration {
@Bean(name = EXECUTOR_ONE + "-properties")
@ConfigurationProperties(prefix = "spring.task.execution-one")
@Primary
public TaskExecutionProperties taskExecutionProperties() {
return new TaskExecutionProperties();
}
@Bean(name = EXECUTOR_ONE)
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
// 通過TaskExecutorBuilder對象創建ThreadPoolTaskExecutor
TaskExecutorBuilder builder = createTaskExecutorBuilder(this.taskExecutionProperties());
return builder.build();
}
}
/**
* 配置執行器2
*
* @author CL
*
*/
@Configuration
public static class ExecutorTwoConfiguration {
@Bean(name = EXECUTOR_TWO + "-properties")
@ConfigurationProperties(prefix = "spring.task.execution-two")
public TaskExecutionProperties taskExecutionProperties() {
return new TaskExecutionProperties();
}
@Bean(name = EXECUTOR_TWO)
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
// 通過TaskExecutorBuilder對象創建ThreadPoolTaskExecutor
TaskExecutorBuilder builder = createTaskExecutorBuilder(this.taskExecutionProperties());
return builder.build();
}
}
/**
* 創建TaskExecutorBuilder
*
* @param properties 配置,從配置文件讀取
* @return
*/
private static TaskExecutorBuilder createTaskExecutorBuilder(TaskExecutionProperties properties) {
TaskExecutionProperties.Pool pool = properties.getPool();
TaskExecutorBuilder builder = new TaskExecutorBuilder();
// 配置屬性,與配置文件對應
// 其它基本屬性
builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
// Pool 屬性
builder = builder.corePoolSize(pool.getCoreSize());
builder = builder.maxPoolSize(pool.getMaxSize());
builder = builder.keepAlive(pool.getKeepAlive());
builder = builder.queueCapacity(pool.getQueueCapacity());
builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
// Shutdown 屬性
TaskExecutionProperties.Shutdown shutdown = properties.getShutdown();
builder = builder.awaitTermination(shutdown.isAwaitTermination());
builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
return builder;
}
}
- 編寫業務邏輯,指定執行器
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import com.c3stones.config.AsyncConfig;
import lombok.extern.slf4j.Slf4j;
/**
* 自定義執行器
*
* @author CL
*
*/
@Service
@Slf4j
public class TestExecutorService {
/**
* 指定執行器1
*
* @return
*/
@Async(AsyncConfig.EXECUTOR_ONE)
public String get() {
log.info("調用TestExecutorService.get()!");
return "get";
}
/**
* 指定執行器2
*
* @return
*/
@Async(AsyncConfig.EXECUTOR_TWO)
public String get2() {
log.info("調用TestExecutorService.get2()!");
return "get2";
}
}
- 編寫啟動類
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 啟動類
*
* @author CL
*
*/
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
- 測試自定義執行器
import java.time.Duration;
import java.time.LocalDateTime;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.c3stones.Application;
import com.c3stones.service.TestExecutorService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
/**
* 測試自定義執行器
*
* @author CL
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class TestExecutor {
@Autowired
private TestExecutorService executorService;
/**
* 自定義執行器
*/
@Test
@SneakyThrows
public void testExecute() {
LocalDateTime startTime = LocalDateTime.now();
executorService.get();
executorService.get2();
Thread.sleep(2000);
LocalDateTime endTime = LocalDateTime.now();
log.info("總耗時:" + Duration.between(startTime, endTime).toMillis() + " ms");
}
}
控制台打印:
2020-05-29 13:13:32.070 INFO 15564 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'executor-one'
2020-05-29 13:13:32.114 INFO 15564 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'executor-two'
2020-05-29 13:13:32.201 INFO 15564 --- [ main] com.c3stones.test.TestExecutor : Started TestExecutor in 1.427 seconds (JVM running for 2.584)
2020-05-29 13:13:32.576 INFO 15564 --- [ task-two-1] c.c3stones.service.TestExecutorService : 調用TestExecutorService.get2()!
2020-05-29 13:13:32.577 INFO 15564 --- [ task-one-1] c.c3stones.service.TestExecutorService : 調用TestExecutorService.get()!
2020-05-29 13:13:34.554 INFO 15564 --- [ main] com.c3stones.test.TestExecutor : 總耗時:2006 ms
2020-05-29 13:13:34.581 INFO 15564 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'executor-two'
2020-05-29 13:13:34.581 INFO 15564 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'executor-one'