SpringBoot異步調用--@Async詳解


1. 概述

  在日常開發中,為了提高主線程的效率,往往需要采用異步調用處理,例如系統日志等。在實際業務場景中,可以使用消息中間件如RabbitMQ、RocketMQ、Kafka等來解決。假如對高可用沒有太高的要求,也可以使用線程池或者隊列來解決。

2. 創建工程

  • 創建Maven工程
  • 修改配置文件
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.c3stones</groupId>
	<artifactId>spring-boot-async-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>spring-boot-async-demo</name>
	<description>Spring Boot Simple Demo</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.6.RELEASE</version>
		<relativePath />
	</parent>

	<properties>
		<java.version>1.8</java.version>
		<maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>com.google.guava</groupId>
			<artifactId>guava</artifactId>
			<version>28.2-jre</version>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

</project>
  • 示例1:創建線程池,異步調用
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class SimpleDemo {

	// 創建固定數目線程的線程池
	// 阿里巴巴Java開發手冊中提到可能存在OOM(OutOfMemory)內存溢出異常
//	private static ExecutorService executorService = Executors.newFixedThreadPool(5);

	// 推薦使用com.google.guava的ThreadFactoryBuilder來創建線程池
	private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("simple-threadpool-%d")
			.build();
	/**
	 * java.util.concurrent.ThreadPoolExecutor.ThreadPoolExecutor(int corePoolSize,
	 * int maximumPoolSize, long keepAliveTime, TimeUnit unit,
	 * BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
	 * RejectedExecutionHandler handler)
	 * 
	 * @param corePoolSize    線程池大小
	 * @param maximumPoolSize 最大線程數
	 * @param keepAliveTime   當線程大於線程池大小時,最長等待時間
	 * @param unit            時間單位
	 * @param workQueue       任務隊列
	 * @param threadFactory   指定線程工廠
	 * @param handler         當線程池界限和隊列容量時,阻止線程處理
	 */
	private static ExecutorService threadPool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS,
			new LinkedBlockingQueue<Runnable>(1024), threadFactory, new ThreadPoolExecutor.AbortPolicy());

	public static void main(String[] args) {
		// 提交線程到線程池
		for (int i = 0; i < 5; i++) {
			threadPool.execute(new SimpleThread1());
			threadPool.execute(new SimpleThread2());
		}

		// 關閉
		threadPool.shutdown();
	}
}

class SimpleThread1 implements Runnable {

	private static Logger logger = LoggerFactory.getLogger(SimpleThread1.class);

	@Override
	public void run() {
		try {
			logger.info("線程 SimpleThread1 被調用!");
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

class SimpleThread2 implements Runnable {

	private static Logger logger = LoggerFactory.getLogger(SimpleThread2.class);

	@Override
	public void run() {
		try {
			logger.info("線程 SimpleThread2 被調用!");
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

控制台打印:

14:06:05.009 [simple-threadpool-1] INFO com.c3stones.simple.SimpleThread2 - 線程 SimpleThread2 被調用!
14:06:05.009 [simple-threadpool-3] INFO com.c3stones.simple.SimpleThread2 - 線程 SimpleThread2 被調用!
14:06:05.010 [simple-threadpool-0] INFO com.c3stones.simple.SimpleThread1 - 線程 SimpleThread1 被調用!
14:06:05.009 [simple-threadpool-4] INFO com.c3stones.simple.SimpleThread1 - 線程 SimpleThread1 被調用!
14:06:05.009 [simple-threadpool-2] INFO com.c3stones.simple.SimpleThread1 - 線程 SimpleThread1 被調用!
14:06:06.017 [simple-threadpool-3] INFO com.c3stones.simple.SimpleThread2 - 線程 SimpleThread2 被調用!
14:06:06.018 [simple-threadpool-1] INFO com.c3stones.simple.SimpleThread1 - 線程 SimpleThread1 被調用!
14:06:07.017 [simple-threadpool-3] INFO com.c3stones.simple.SimpleThread2 - 線程 SimpleThread2 被調用!
14:06:08.017 [simple-threadpool-3] INFO com.c3stones.simple.SimpleThread1 - 線程 SimpleThread1 被調用!
14:06:08.018 [simple-threadpool-2] INFO com.c3stones.simple.SimpleThread2 - 線程 SimpleThread2 被調用!

  問題:
  (1) 當進程被異常關閉,會導致存儲在線程池或者隊列的線程丟失。
  (2) 但是消息隊列中的消息不會因為JVM進程關閉而丟失,依然存儲在消息隊列所在服務器上。

3. 快速入門

  • 開啟異步支持
/**
 * 啟動類
 * 
 * @author CL
 *
 */
@SpringBootApplication
@EnableAsync // 開啟異步支持
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

}
  • 編寫業務代碼
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;

/**
 * 快速入門
 * 
 * @author CL
 *
 */
@Service
@Slf4j
public class TestQuickService {

	@SneakyThrows
	public String get() {
		log.info("調用TestQuickService.get()!");
		Thread.sleep(2000);
		return "get";
	}

	@SneakyThrows
	public String get2() {
		log.info("調用TestQuickService.get2()!");
		Thread.sleep(5000);
		return "get2";
	}
	
}
  • 測試同步調用
/**
 * 測試快速入門
 * 
 * @author CL
 *
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class TestQuick {

	@Autowired
	private TestQuickService quickService;

	/**
	 * 測試同步調用
	 */
	@Test
	public void testSync() {
		LocalDateTime startTime = LocalDateTime.now();

		quickService.get();
		quickService.get2();

		LocalDateTime endTime = LocalDateTime.now();

		log.info("同步調用,總耗時:" + Duration.between(startTime, endTime).toMillis() + " ms");
	}

}

控制台打印:

2020-05-28 16:39:36.853  INFO 6220 --- [           main] com.c3stones.quick.TestQuickService      : 調用TestQuickService.get()!
2020-05-28 16:39:38.853  INFO 6220 --- [           main] com.c3stones.quick.TestQuickService      : 調用TestQuickService.get2()!
2020-05-28 16:39:43.857  INFO 6220 --- [           main] com.c3stones.test.TestQuick              : 同步調用,總耗時:7000 ms

  可以看出總耗時為兩個業務方法的總耗時,並且在主線程中執行。

  • 異步調用,添加 @Async 注解
@Async
public String asyncGet() {
	return this.get();
}

@Async
public String asyncGet2() {
	return this.get2();
}
  • 測試異步調用
/**
 * 測試異步調用
 */
@Test
public void testAsync() {
	LocalDateTime startTime = LocalDateTime.now();

	quickService.asyncGet();
	quickService.asyncGet2();

	LocalDateTime endTime = LocalDateTime.now();

	log.info("異步調用,總耗時:" + Duration.between(startTime, endTime).toMillis() + " ms");
}

控制台打印:

2020-05-28 17:11:10.550  INFO 908 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-05-28 17:11:10.563  INFO 908 --- [           main] com.c3stones.test.TestQuick              : 異步調用,總耗時:45 ms
2020-05-28 17:11:10.574  INFO 908 --- [         task-2] com.c3stones.quick.TestQuickService      : 調用TestQuickService.get2()!
2020-05-28 17:11:10.575  INFO 908 --- [         task-1] com.c3stones.quick.TestQuickService      : 調用TestQuickService.get()!
2020-05-28 17:11:10.587  INFO 908 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'

  可以看出總耗時不受業務方法時間的影響,並且都在異步線程池中執行的。
  注意:實際兩個方法並沒有執行完成。

4. 異步調用但主線程阻塞

  實際場景中肯定不可能出現上述的問題。肯定希望既能異步調用,並且主線程能阻塞,直到方法執行完成。

  • 改進代碼,返回Futrue對象
@Async
public Future<String> asyncFutureGet() {
	return AsyncResult.forValue(this.get());
}

@Async
public Future<String> asyncFutureGet2() {
	return AsyncResult.forValue(this.get2());
}
  • 測試異步調用並阻塞主線程
/**
 * 測試異步調用並阻塞主線程
 */
@Test
@SneakyThrows
public void testAsyncFuture() {
	LocalDateTime startTime = LocalDateTime.now();

	// 執行
	Future<String> getFuture = quickService.asyncFutureGet();
	Future<String> get2Future = quickService.asyncFutureGet2();

	// 阻塞等待執行結果
	getFuture.get();
	get2Future.get();

	LocalDateTime endTime = LocalDateTime.now();

	log.info("異步調用,總耗時:" + Duration.between(startTime, endTime).toMillis() + " ms");
}

控制台打印:

2020-05-28 17:14:57.434  INFO 5784 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-05-28 17:14:57.468  INFO 5784 --- [         task-2] com.c3stones.quick.TestQuickService      : 調用TestQuickService.get2()!
2020-05-28 17:14:57.470  INFO 5784 --- [         task-1] com.c3stones.quick.TestQuickService      : 調用TestQuickService.get()!
2020-05-28 17:15:02.474  INFO 5784 --- [           main] com.c3stones.test.TestQuick              : 異步調用,總耗時:5066 ms
2020-05-28 17:15:02.511  INFO 5784 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'

  可以看出總耗時由耗時較長的方法決定,並且在異步線程池中執行。

5. Spring Task配置

  • 編寫配置文件application.yml
spring:
   task: # Spring執行器配置,對應TaskExecutionProperties配置類。對於Spring異步任務,會使用該執行器。
      execution:
         thread-name-prefix: task- # 線程池的線程名的前綴。默認為 task- ,根據自己應用來設置。
         pool: # 線程池相關
            core-size: 8 # 核心線程數,線程池創建時初始化的線程數。默認為 8。
            max-size: 20 # 最大線程數,線程池最大的線程數,只有在緩沖隊列滿了之后,才會申請超過核心線程數的線程。默認為 Integer.MAX_VALUE。
            keep-alive: 60s # 允許線程的空閑時間,當超過了核心線程之外的線程,在空閑時間到達之后會被銷毀。默認為 60 秒。
            queue-capacity: 200 # 緩沖隊列大小,用來緩沖執行任務的隊列的大小。默認為 Integer.MAX_VALUE。
            allow-core-thread-timeout: true # 是否允許核心線程超時,即開啟線程池的動態增長和縮小。默認為 true。
         shutdown:
            await-termination: true # 應用關閉時,是否等待定時任務執行完成。默認為 false ,建議設置為 true。
            await-termination-period: 60 # 等待任務完成的最大時長,單位為秒。默認為 0。

  在spring.task.execution配置項Spring Task調度任務的配置對應TaskExecutionProperties配置類。
  Spring Boot TaskExecutionAutoConfiguration自動化配置類,實現Spring Task的自動配置,創建ThreadPoolTaskExecutor 基於線程池的任務執行器。本質上ThreadPoolTaskExecutor是基於 ThreadPoolExecutor的封裝,主要增加了提交任務,返回ListenableFuture 對象的功能。
  spring.task.execution.shutdown配置項是為了實現Spring Task異步任務的優雅關閉。
  在異步任務執行過程中,如果應用開始關閉,把異步任務需要使用到的Spring Bean一並銷毀,例如數據庫連接池等,但是異步任務還在執行中,當需要訪問數據庫時,就會導致報錯。所以通過配置await-termination = true來實現應用關閉時,等待異步任務執行完成。這樣應用在關閉時Spring 會優先等待ThreadPoolTaskSchedule 執行完任務之后,再開始Spring Bean的銷毀。
  同時,又考慮到我們不可能無限等待異步任務全部執行結束,因此可以配置await-termination-period = 60,等待任務完成的最大時長,單位為秒。具體設置需與根據業務場景決定。

6. 異步回調

  • AsyncResult類,異步結果
  • ListenableFuture接口,監聽Future
  • Future接口,返回異步計算結果
      org.springframework.scheduling.annotation.AsyncResult<V>類實現了org.springframework.util.concurrent.ListenableFuture<T>接口,org.springframework.util.concurrent.ListenableFuture<T>又繼承了java.util.concurrent.Future<V>接口。
      Future表示一個異步計算任務,當任務完成時得到計算結果。如果希望執行完成后馬上得到結果,則需要另一個線程不斷的查詢監控狀態,毋庸置疑增加了一定的復雜度。ListenableFuture對java原生Future進行擴展增強,其實就是監聽Future是否執行完成,並自動調用回調方法,減少並發程序的復雜度。
    源碼分析:
public interface Future<V> {

	/**
	 * 1. 如果任務沒有開始,mayInterruptIfRunning = true/false,則直接返回false;<br/>
	 * 2. 如果任務正在執行中,mayInterruptIfRunning = true,則試圖中斷任務,如果成功中斷,則返回true;<br/>
	 * 3. 如果任務正在執行中,mayInterruptIfRunning = false,則不會對執行中的線程產生影響,則直接返回false;<br/>
	 * 4. 如果任務已經完成,mayInterruptIfRunning = true/false,則直接返回false;<br/>
	 * 
	 * @param mayInterruptIfRunning 是否中斷執行中的線程
	 * @return true/false
	 */
	boolean cancel(boolean mayInterruptIfRunning);

	/**
	 * 如果任務被中斷,則返回true
	 * 
	 * @return true/false
	 */
	boolean isCancelled();

	/**
	 * 如果任務已經完成,無論是正常結束或是中端斷都返回 true
	 * 
	 * @return true/false
	 */
	boolean isDone();

	/**
	 * 獲得異步結果,如果任務還在執行中,則阻塞直到異步任務完成
	 * 
	 * @return
	 * @throws InterruptedException
	 * @throws ExecutionException
	 */
	V get() throws InterruptedException, ExecutionException;

	/**
	 * 獲得異步結果,如果任務還在執行中,則阻塞直到異步任務完成<br/>
	 * 但是有時間限制,如果阻塞時間超過設定的timeout,則拋出異常。
	 * 
	 * @param  timeout 超時時間
	 * @param  unit    時間單位枚舉
	 * @return
	 * @throws InterruptedException
	 * @throws ExecutionException
	 */
	V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
public interface ListenableFuture<T> extends Future<T> {

	/**
	 * 回調方法,處理異常結果(不論成功還是異常)
	 * 
	 * @param callback
	 */
	void addCallback(ListenableFutureCallback<? super T> callback);

	/**
	 * 回調方法,分別處理成功和異常結果
	 * 
	 * @param successCallback 成功回調
	 * @param failureCallback 異常回調
	 */
	void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);

	/**
	 * 轉換成JDK1.8提供的CompletableFuture類,該類提供了非常強大的Future擴展功能
	 * 
	 * @return
	 */
	default CompletableFuture<T> completable() {
		CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
		addCallback(completable::complete, completable::completeExceptionally);
		return completable;
	}

}
public class AsyncResult<V> implements ListenableFuture<V> {

	@Nullable
	private final V value;

	@Nullable
	private final Throwable executionException;

	public AsyncResult(@Nullable V value) {
		this(value, null);
	}

	private AsyncResult(@Nullable V value, @Nullable Throwable ex) {
		this.value = value;
		this.executionException = ex;
	}

	/**
	 * AsyncResult表示異常結果,則表示取消失敗
	 * 
	 * @param mayInterruptIfRunning 是否中斷執行中的線程
	 * @return false
	 */
	@Override
	public boolean cancel(boolean mayInterruptIfRunning) {
		return false;
	}

	/**
	 * AsyncResult表示異常結果,則表示取消失敗
	 * 
	 * @return false
	 */
	@Override
	public boolean isCancelled() {
		return false;
	}

	/**
	 * AsyncResult表示異常結果,則表示執行完成
	 * 
	 * @return true
	 */
	@Override
	public boolean isDone() {
		return true;
	}

	/**
	 * 獲得異步結果,如果任務還在執行中,則阻塞直到異步任務完成
	 * 
	 * @return
	 * @throws ExecutionException
	 */
	@Override
	@Nullable
	public V get() throws ExecutionException {
		if (this.executionException != null) {
			throw (this.executionException instanceof ExecutionException ? (ExecutionException) this.executionException
					: new ExecutionException(this.executionException));
		}
		return this.value;
	}

	/**
	 * 獲得異步結果,如果任務還在執行中,則阻塞直到異步任務完成<br/>
	 * 但是有時間限制,如果阻塞時間超過設定的timeout,則拋出異常。
	 * 
	 * @param timeout 超時時間
	 * @param unit    時間單位枚舉
	 * @return
	 * @throws InterruptedException
	 * @throws ExecutionException
	 */
	@Override
	@Nullable
	public V get(long timeout, TimeUnit unit) throws ExecutionException {
		return get();
	}

	/**
	 * 回調方法,處理成功和異常的結果
	 * 
	 * @param callback
	 */
	@Override
	public void addCallback(ListenableFutureCallback<? super V> callback) {
		addCallback(callback, callback);
	}

	/**
	 * 回調方法,分別處理成功和異常的結果<br/>
	 * catch中忽略了回調中發生的異常。當多個對調方法執行時,不會因為某一個回調方法異常而影響其他的回調方法
	 * 
	 * @param successCallback 成功回調
	 * @param failureCallback 異常回調
	 */
	@Override
	public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
		try {
			if (this.executionException != null) {
				failureCallback.onFailure(exposedException(this.executionException));
			} else {
				successCallback.onSuccess(this.value);
			}
		} catch (Throwable ex) {
			// Ignore
		}
	}

	/**
	 * 返回CompletableFuture
	 * 
	 * @return
	 */
	@Override
	public CompletableFuture<V> completable() {
		if (this.executionException != null) {
			CompletableFuture<V> completable = new CompletableFuture<>();
			completable.completeExceptionally(exposedException(this.executionException));
			return completable;
		} else {
			return CompletableFuture.completedFuture(this.value);
		}
	}

	/**
	 * 執行成功,返回ListenableFuture
	 * 
	 * @param <V>
	 * @param value
	 * @return
	 */
	public static <V> ListenableFuture<V> forValue(V value) {
		return new AsyncResult<>(value, null);
	}

	/**
	 * 執行失敗,返回ListenableFuture,執行回調方法
	 * 
	 * @param <V>
	 * @param ex
	 * @return
	 */
	public static <V> ListenableFuture<V> forExecutionException(Throwable ex) {
		return new AsyncResult<>(null, ex);
	}

	/**
	 * 獲得具體的異常
	 * 
	 * @param original
	 * @return
	 */
	private static Throwable exposedException(Throwable original) {
		if (original instanceof ExecutionException) {
			Throwable cause = original.getCause();
			if (cause != null) {
				return cause;
			}
		}
		return original;
	}

}
  • ListenableFutureTask類,異步增強ListenableFuture
      org.springframework.util.concurrent.ListenableFutureTask<T>類繼承了java.util.concurrent.FutureTask<T>類,實現了org.springframework.util.concurrent.ListenableFuture<T>接口。
public class ListenableFutureTask<T> extends FutureTask<T> implements ListenableFuture<T> {

	/**
	 * 暫存回調
	 */
	private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry<>();

	public ListenableFutureTask(Callable<T> callable) {
		super(callable);
	}

	public ListenableFutureTask(Runnable runnable, @Nullable T result) {
		super(runnable, result);
	}

	/**
	 * 回調方法,處理成功和異常的結果
	 * 
	 * @param callback
	 */
	@Override
	public void addCallback(ListenableFutureCallback<? super T> callback) {
		this.callbacks.addCallback(callback);
	}

	/**
	 * 回調方法,分別處理成功和異常的結果<br/>
	 * catch中忽略了回調中發生的異常。當多個對調方法執行時,不會因為某一個回調方法異常而影響其他的回調方法
	 * 
	 * @param successCallback 成功回調
	 * @param failureCallback 異常回調
	 */
	@Override
	public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
		this.callbacks.addSuccessCallback(successCallback);
		this.callbacks.addFailureCallback(failureCallback);
	}

	/**
	 * 返回CompletableFuture
	 * 
	 * @return
	 */
	@Override
	public CompletableFuture<T> completable() {
		CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
		this.callbacks.addSuccessCallback(completable::complete);
		this.callbacks.addFailureCallback(completable::completeExceptionally);
		return completable;
	}

	/**
	 * 重寫FutureTask中的done方法
	 */
	@Override
	protected void done() {
		Throwable cause;
		try {
			// 獲得執行結果
			T result = get();
			// 執行成功,執行成功回調方法
			this.callbacks.success(result);
			return;
		} catch (InterruptedException ex) { // 中斷異常,並返回
			Thread.currentThread().interrupt();
			return;
		} catch (ExecutionException ex) { // 獲得具體的異常,並設置到cause中
			cause = ex.getCause();
			if (cause == null) {
				cause = ex;
			}
		} catch (Throwable ex) { // 設置到cause中
			cause = ex;
		}
		// 執行異常,執行異常回調方法
		this.callbacks.failure(cause);
	}

}

7. 測試異步回調

  • 編寫業務代碼
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 測試回調
 * 
 * @author CL
 *
 */
@Service
@Slf4j
public class TestCallbackService {

	@SneakyThrows
	public String get() {
		log.info("調用TestCallbackService.get()!");
		Thread.sleep(2000);
		return "get";
	}

	@Async
	public ListenableFuture<String> asyncCallback() {
		try {
			return AsyncResult.forValue(this.get());
		} catch (Throwable ex) {
			return AsyncResult.forExecutionException(ex);
		}
	}
}
  • 測試異步回調
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 測試異步回調
 * 
 * @author CL
 *
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class TestCallback {

	@Autowired
	private TestCallbackService callbackService;

	/**
	 * 測試異步回調
	 */
	@Test
	@SneakyThrows
	public void testCallback() {
		LocalDateTime startTime = LocalDateTime.now();

		ListenableFuture<String> listenableFuture = callbackService.asyncCallback();
		log.info("返回類型為:" + listenableFuture.getClass().getSimpleName());

		// 分別增加成功和異常的回調
		listenableFuture.addCallback(new SuccessCallback<String>() {

			@Override
			public void onSuccess(String result) {
				log.info("執行成功!");
			}

		}, new FailureCallback() {

			@Override
			public void onFailure(Throwable ex) {
				log.error("執行異常:" + ex);
			}

		});
		// 增加統一的成功和異常回調
		listenableFuture.addCallback(new ListenableFutureCallback<String>() {

			@Override
			public void onSuccess(String result) {
				log.info("執行成功!");
			}

			@Override
			public void onFailure(Throwable ex) {
				log.error("執行異常:" + ex);
			}

		});
		// 阻塞主線程
		listenableFuture.get();

		LocalDateTime endTime = LocalDateTime.now();

		log.info("異步調用,總耗時:" + Duration.between(startTime, endTime).toMillis() + " ms");
	}

}

控制台打印:

2020-05-29 11:56:24.815  INFO 17920 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-05-29 11:56:24.822  INFO 17920 --- [           main] com.c3stones.test.TestCallback           : 返回類型為:ListenableFutureTask
2020-05-29 11:56:24.835  INFO 17920 --- [         task-1] c.c3stones.callback.TestCallbackService  : 調用TestCallbackService.get()!
2020-05-29 11:56:26.838  INFO 17920 --- [         task-1] com.c3stones.test.TestCallback           : 執行成功!
2020-05-29 11:56:26.838  INFO 17920 --- [         task-1] com.c3stones.test.TestCallback           : 執行成功!
2020-05-29 11:56:26.843  INFO 17920 --- [           main] com.c3stones.test.TestCallback           : 異步調用,總耗時:2055 ms
2020-05-29 11:56:26.874  INFO 17920 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'

8. 全局異步異常處理器

  實現AsyncUncaughtExceptionHandler接口即可實現異步調用的統一異常處理。
  注意:AsyncUncaughtExceptionHandler返回非Future類型 的異步調用方法。因此返回Future類型的異步方法只能使用回調方法處理。
  從AsyncExecutionAspectSupporthandleError方法可以看出上述結論。

protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
        // 返回類型為Future,直接拋出異常
		if (Future.class.isAssignableFrom(method.getReturnType())) {
			ReflectionUtils.rethrowException(ex);
		}
		// 否則,交給AsyncUncaughtExceptionHandler來處理
		else {
			// Could not transmit the exception to the caller with default executor
			try {
				this.exceptionHandler.obtain().handleUncaughtException(ex, method, params);
			}
			catch (Throwable ex2) {
				logger.warn("Exception handler for async method '" + method.toGenericString() +
						"' threw unexpected exception itself", ex2);
			}
		}
	}
  • 編寫全局異步異常處理器
import java.lang.reflect.Method;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * 全局異步異常處理器
 * 
 * @author CL
 *
 */
@Component
@Slf4j
public class GlobalAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {

	/**
	 * 處理未捕獲的異步調用異常
	 */
	@Override
	public void handleUncaughtException(Throwable ex, Method method, Object... params) {
		log.error("方法:{}, 參數:{},調用異常:{}", method, params, ex.getMessage());
	}

}
  • 編寫異步配置類
import java.util.concurrent.Executor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;

import com.c3stones.handle.GlobalAsyncUncaughtExceptionHandler;

@Configuration
@EnableAsync // 開啟異步支持(和Application啟動類中保留一個即可,建議保留在此處)
public class AsyncConfig implements AsyncConfigurer {

	@Autowired
	private GlobalAsyncUncaughtExceptionHandler asyncUncaughtExceptionHandler;

	/**
	 * 返回 Spring Task異步任務的默認執行器。<br/>
	 * 返回了null,則使用TaskExecutionAutoConfiguration自動化配置類創建的ThreadPoolTaskExecutor任務執行器作為默認執行器
	 */
	@Override
	public Executor getAsyncExecutor() {
		return null;
	}

	/**
	 * 返回自定義的全局異步異常處理器
	 */
	@Override
	public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
		return asyncUncaughtExceptionHandler;
	}

}
  • 測試全局異步異常處理
import java.time.Duration;
import java.time.LocalDateTime;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.c3stones.Application;
import com.c3stones.exceptionHandle.TestAsyncExceptionService;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 測試異步異常處理
 * 
 * @author CL
 *
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class TestAsyncException {

	@Autowired
	private TestAsyncExceptionService asyncExceptionService;

	/**
	 * 測試異步異常
	 */
	@Test
	@SneakyThrows
	public void testAsyncException() {
		LocalDateTime startTime = LocalDateTime.now();

		asyncExceptionService.asyncException();

		// 異步調用成功
		Thread.sleep(5000);

		LocalDateTime endTime = LocalDateTime.now();

		log.info("異步異常調用,總耗時:" + Duration.between(startTime, endTime).toMillis() + " ms");
	}

}

控制台打印:

2020-05-29 11:59:12.804  INFO 12724 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-05-29 11:59:12.821 ERROR 12724 --- [         task-1] .c.h.GlobalAsyncUncaughtExceptionHandler : 方法:public java.lang.String com.c3stones.exceptionHandle.TestAsyncExceptionService.asyncException(), 參數:[],調用異常:TestAsyncExceptionService.asyncException拋出異常!
2020-05-29 11:59:17.817  INFO 12724 --- [           main] com.c3stones.test.TestAsyncException     : 異步異常調用,總耗時:5048 ms
2020-05-29 11:59:17.856  INFO 12724 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'

9. 自定義任務執行器

  使用Spring Boot TaskExecutionAutoConfiguration 自動化配置類,實現自動配置ThreadPoolTaskExecutor任務執行器。

  • 創建Maven工程
  • 修改pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.c3stones</groupId>
	<artifactId>spring-boot-async-demo2</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>spring-boot-async-demo2</name>
	<description>Spring Boot Async Demo2</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.6.RELEASE</version>
		<relativePath />
	</parent>

	<properties>
		<java.version>1.8</java.version>
		<maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
	
</project>
  • 編寫application.yml
spring:
   task: # Spring執行器配置,對應TaskExecutionProperties配置類。對於Spring異步任務,會使用該執行器。
      execution-one:
         thread-name-prefix: task-one- # 線程池的線程名的前綴。默認為 task- ,根據自己應用來設置。
         pool: # 線程池相關
            core-size: 8 # 核心線程數,線程池創建時初始化的線程數。默認為 8。
            max-size: 20 # 最大線程數,線程池最大的線程數,只有在緩沖隊列滿了之后,才會申請超過核心線程數的線程。默認為 Integer.MAX_VALUE。
            keep-alive: 60s # 允許線程的空閑時間,當超過了核心線程之外的線程,在空閑時間到達之后會被銷毀。默認為 60 秒。
            queue-capacity: 200 # 緩沖隊列大小,用來緩沖執行任務的隊列的大小。默認為 Integer.MAX_VALUE。
            allow-core-thread-timeout: true # 是否允許核心線程超時,即開啟線程池的動態增長和縮小。默認為 true。
         shutdown:
            await-termination: true # 應用關閉時,是否等待定時任務執行完成。默認為 false ,建議設置為 true。
            await-termination-period: 60 # 等待任務完成的最大時長,單位為秒。默認為 0。
      execution-two:
         thread-name-prefix: task-two- # 線程池的線程名的前綴。默認為 task- ,根據自己應用來設置。
         pool: # 線程池相關
            core-size: 8 # 核心線程數,線程池創建時初始化的線程數。默認為 8。
            max-size: 20 # 最大線程數,線程池最大的線程數,只有在緩沖隊列滿了之后,才會申請超過核心線程數的線程。默認為 Integer.MAX_VALUE。
            keep-alive: 60s # 允許線程的空閑時間,當超過了核心線程之外的線程,在空閑時間到達之后會被銷毀。默認為 60 秒。
            queue-capacity: 200 # 緩沖隊列大小,用來緩沖執行任務的隊列的大小。默認為 Integer.MAX_VALUE。
            allow-core-thread-timeout: true # 是否允許核心線程超時,即開啟線程池的動態增長和縮小。默認為 true。
         shutdown:
            await-termination: true # 應用關閉時,是否等待定時任務執行完成。默認為 false ,建議設置為 true。
            await-termination-period: 60 # 等待任務完成的最大時長,單位為秒。默認為 0。
  • 配置執行器
import org.springframework.boot.autoconfigure.task.TaskExecutionProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.task.TaskExecutorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * 異步配置
 * 
 * @author CL
 *
 */
@Configuration
@EnableAsync // 開啟異步的支持
public class AsyncConfig {

	/**
	 * 執行器1
	 */
	public static final String EXECUTOR_ONE = "executor-one";
	/**
	 * 執行器2
	 */
	public static final String EXECUTOR_TWO = "executor-two";

	/**
	 * 配置執行器1
	 * 
	 * @author CL
	 *
	 */
	@Configuration
	public static class ExecutorOneConfiguration {

		@Bean(name = EXECUTOR_ONE + "-properties")
		@ConfigurationProperties(prefix = "spring.task.execution-one")
		@Primary
		public TaskExecutionProperties taskExecutionProperties() {
			return new TaskExecutionProperties();
		}

		@Bean(name = EXECUTOR_ONE)
		public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
			// 通過TaskExecutorBuilder對象創建ThreadPoolTaskExecutor
			TaskExecutorBuilder builder = createTaskExecutorBuilder(this.taskExecutionProperties());
			return builder.build();
		}

	}

	/**
	 * 配置執行器2
	 * 
	 * @author CL
	 *
	 */
	@Configuration
	public static class ExecutorTwoConfiguration {

		@Bean(name = EXECUTOR_TWO + "-properties")
		@ConfigurationProperties(prefix = "spring.task.execution-two")
		public TaskExecutionProperties taskExecutionProperties() {
			return new TaskExecutionProperties();
		}

		@Bean(name = EXECUTOR_TWO)
		public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
			// 通過TaskExecutorBuilder對象創建ThreadPoolTaskExecutor
			TaskExecutorBuilder builder = createTaskExecutorBuilder(this.taskExecutionProperties());
			return builder.build();
		}

	}

	/**
	 * 創建TaskExecutorBuilder
	 * 
	 * @param properties 配置,從配置文件讀取
	 * @return
	 */
	private static TaskExecutorBuilder createTaskExecutorBuilder(TaskExecutionProperties properties) {
		TaskExecutionProperties.Pool pool = properties.getPool();
		TaskExecutorBuilder builder = new TaskExecutorBuilder();
		// 配置屬性,與配置文件對應
		// 其它基本屬性
		builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
		// Pool 屬性
		builder = builder.corePoolSize(pool.getCoreSize());
		builder = builder.maxPoolSize(pool.getMaxSize());
		builder = builder.keepAlive(pool.getKeepAlive());
		builder = builder.queueCapacity(pool.getQueueCapacity());
		builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
		// Shutdown 屬性
		TaskExecutionProperties.Shutdown shutdown = properties.getShutdown();
		builder = builder.awaitTermination(shutdown.isAwaitTermination());
		builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
		return builder;
	}

}
  • 編寫業務邏輯,指定執行器
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import com.c3stones.config.AsyncConfig;

import lombok.extern.slf4j.Slf4j;

/**
 * 自定義執行器
 * 
 * @author CL
 *
 */
@Service
@Slf4j
public class TestExecutorService {

	/**
	 * 指定執行器1
	 * 
	 * @return
	 */
	@Async(AsyncConfig.EXECUTOR_ONE)
	public String get() {
		log.info("調用TestExecutorService.get()!");
		return "get";
	}

	/**
	 * 指定執行器2
	 * 
	 * @return
	 */
	@Async(AsyncConfig.EXECUTOR_TWO)
	public String get2() {
		log.info("調用TestExecutorService.get2()!");
		return "get2";
	}

}
  • 編寫啟動類
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 啟動類
 * 
 * @author CL
 *
 */
@SpringBootApplication
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

}
  • 測試自定義執行器
import java.time.Duration;
import java.time.LocalDateTime;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.c3stones.Application;
import com.c3stones.service.TestExecutorService;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 測試自定義執行器
 * 
 * @author CL
 *
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class TestExecutor {

	@Autowired
	private TestExecutorService executorService;

	/**
	 * 自定義執行器
	 */
	@Test
	@SneakyThrows
	public void testExecute() {
		LocalDateTime startTime = LocalDateTime.now();

		executorService.get();
		executorService.get2();

		Thread.sleep(2000);

		LocalDateTime endTime = LocalDateTime.now();

		log.info("總耗時:" + Duration.between(startTime, endTime).toMillis() + " ms");
	}

}

控制台打印:

2020-05-29 13:13:32.070  INFO 15564 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'executor-one'
2020-05-29 13:13:32.114  INFO 15564 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'executor-two'
2020-05-29 13:13:32.201  INFO 15564 --- [           main] com.c3stones.test.TestExecutor           : Started TestExecutor in 1.427 seconds (JVM running for 2.584)
2020-05-29 13:13:32.576  INFO 15564 --- [     task-two-1] c.c3stones.service.TestExecutorService   : 調用TestExecutorService.get2()!
2020-05-29 13:13:32.577  INFO 15564 --- [     task-one-1] c.c3stones.service.TestExecutorService   : 調用TestExecutorService.get()!
2020-05-29 13:13:34.554  INFO 15564 --- [           main] com.c3stones.test.TestExecutor           : 總耗時:2006 ms
2020-05-29 13:13:34.581  INFO 15564 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'executor-two'
2020-05-29 13:13:34.581  INFO 15564 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'executor-one'

10. 項目地址

  spring-boot-async


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM