一、什么是同步調用

瀏覽器發起請求,Web服務器開一個線程處理,處理完把處理結果返回瀏覽器。好像沒什么好說的了,絕大多數Web服務器都如此般處理。現在想想如果處理的過程中需要調用后端的一個業務邏輯服務器

請求處理線程會在Call了之后等待Return,自身處於阻塞狀態。這也是絕大多數Web服務器的做法。一般此種做法主要適用於,后端處理響應比較快,並且並發數比較低的情況。
主要弊端,在高並發請求下,請求處理線程的短缺!因為請求處理線程的總數是有限的,如果類似的請求多了,所有的處理線程處於阻塞的狀態,那新的請求也就無法處理了,也就所謂影響了服務器的吞吐能力。要更加好地發揮服務器的全部性能,就要使用異步。
二、什么是異步

最大的不同在於請求處理線程對后台處理的調用使用了“invoke”的方式,就是說調了之后直接返回,而不等待,這樣請求處理線程就“自由”了,它可以接着去處理別的請求,當后端處理完成后,會鈎起一個回調處理線程來處理調用的結果,這個回調處理線程跟請求處理線程也許都是線程池中的某個線程,相互間可以完全沒有關系,由這個回調處理線程向瀏覽器返回內容。這就是異步的過程。
帶來的改進是顯而易見的,請求處理線程不需要阻塞了,它的能力得到了更充分的使用,帶來了服務器吞吐能力的提升。
三、使用Spring MVC 和Servlet3異步線程
3.1、前提
要使用Spring MVC的異步功能,你得先確保你用的是Servlet 3.0或以上的版本,Maven中如此配置:
<dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <version>3.1.0</version> <scope>provided</scope> </dependency>
Spring MVC 3.2以后版本開始引入了基於Servlet3的異步請求處理
3.2、概述
相比以前,控制器方法已經不一定需要一個值,而是可以直接返回一個Callable對象,並通過Spring MVC所管理的線程來產生返回值,與此同時,Servlet容器的主線程則可以退出並釋放其資源,同時也允許容器去處理其它請求。通過一個TaskExecutor,Spring MVC可以在另外的線程中調用Callable。當Callable返回時,請求在攜帶Callable返回的值,再次被分配到Servlet容器中恢復處理流程。
官方文檔中說DeferredResult和Callable都是為了異步生成返回值提供基本的支持。簡單來說就是一個請求進來,如果你使用了DeferredResult或者Callable,在沒有得到返回數據之前,DispatcherServlet和所有Filter就會退出Servlet容器線程,但響應保持打開狀態,一旦返回數據有了,這個DispatcherServlet就會被再次調用並且處理,以異步產生的方式,向請求端返回值。
這么做的好處就是請求不會長時間占用服務連接池,提高服務器的吞吐量。
1、Servlet 3.0異步請求運作機制的部分原理
a.Servlet請求ServletRequest可以通過調用request.startAsync()方法而進入異步模式,這樣做的主要結果就是該Servlet以及所有的過濾器都可以結束但其相應(Response)會留待異步處理結束后在返回調用。
b.request.startAsync()方法會返回一個AsyncContext對象,可以用他對異步處理進行進一步的控制和操作,比如說他也提供了一個與反轉(forward)很相似的dispatch方法,只不過他允許應用恢復Servlet容器的請求處理進程。
c.ServletRequest提供了獲取當前DispatherType的方式,后者可以用來區別當前處理的是原始請求,異步分發請求,轉向或者是其它類型的請求分發類型。
2、Callable的異步請求被處理時所發生的事件
官方介紹
Controller returns a Callable. Spring MVC calls request.startAsync() and submits the Callable to a TaskExecutor for processing in a separate thread. Meanwhile the DispatcherServlet and all Filter’s exit the Servlet container thread but the response remains open. Eventually the Callable produces a result and Spring MVC dispatches the request back to the Servlet container to complete processing. The DispatcherServlet is invoked again and processing resumes with the asynchronously produced return value from the Callable.
1》Controller返回Callable
2》Spring MVC調用request.startAsync()並將Callable提交給TaskExecutor,以便在單獨的線程中進行處理。
3》同時DispatcherServlet和所有Filter都退出Servlet容器線程,但響應仍保持打開狀態。
4》最終,Callable生成一個結果,Spring MVC將請求調度回Servlet容器以完成處理。
5》再次調用DispatcherServlet,並使用來自Callable的異步生成的返回值繼續處理。
流程上大體與DeferredResult類似,只不過Callable是由TaskExecutor來處理的,而TaskExecutor繼承自java.util.concurrent.Executor。我們來看一下它的源代碼,它也是在WebAysncManager中處理的:
/** * Use the given {@link WebAsyncTask} to configure the task executor as well as * the timeout value of the {@code AsyncWebRequest} before delegating to * {@link #startCallableProcessing(Callable, Object...)}. * @param webAsyncTask a WebAsyncTask containing the target {@code Callable} * @param processingContext additional context to save that can be accessed * via {@link #getConcurrentResultContext()} * @throws Exception if concurrent processing failed to start */ public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext) throws Exception { Assert.notNull(webAsyncTask, "WebAsyncTask must not be null"); Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null"); Long timeout = webAsyncTask.getTimeout(); if (timeout != null) { this.asyncWebRequest.setTimeout(timeout); } AsyncTaskExecutor executor = webAsyncTask.getExecutor(); if (executor != null) { this.taskExecutor = executor; } List<CallableProcessingInterceptor> interceptors = new ArrayList<CallableProcessingInterceptor>(); interceptors.add(webAsyncTask.getInterceptor()); interceptors.addAll(this.callableInterceptors.values()); interceptors.add(timeoutCallableInterceptor); final Callable<?> callable = webAsyncTask.getCallable(); final CallableInterceptorChain interceptorChain = new CallableInterceptorChain(interceptors); this.asyncWebRequest.addTimeoutHandler(new Runnable() { @Override public void run() { logger.debug("Processing timeout"); Object result = interceptorChain.triggerAfterTimeout(asyncWebRequest, callable); if (result != CallableProcessingInterceptor.RESULT_NONE) { setConcurrentResultAndDispatch(result); } } }); this.asyncWebRequest.addCompletionHandler(new Runnable() { @Override public void run() { interceptorChain.triggerAfterCompletion(asyncWebRequest, callable); } }); interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, callable); startAsyncProcessing(processingContext); //啟動線程池的異步處理 try { this.taskExecutor.submit(new Runnable() { @Override public void run() { Object result = null; try { interceptorChain.applyPreProcess(asyncWebRequest, callable); result = callable.call(); } catch (Throwable ex) { result = ex; } finally { result = interceptorChain.applyPostProcess(asyncWebRequest, callable, result); } //設置當前的結果並轉發 setConcurrentResultAndDispatch(result); } }); } catch (RejectedExecutionException ex) { Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex); setConcurrentResultAndDispatch(result); throw ex; } }
對比DeferredResult,在這里剛開始也是添加攔截器,只不過攔截器的名稱是CallableProcessingInterceptor ,同時也需要設置WebAsyncRequest的超時處理,完成時處理的響應操作。這其中最大的區別就是使用TaskExecutor來對Callable進行異步處理
3、DeferredResult對象請求的處理順序也非常類似,區別在於應用可以通過任何線程來計算返回一個結果
官網描述
DeferredResult processing: Controller returns a DeferredResult and saves it in some in-memory queue or list where it can be accessed. Spring MVC calls request.startAsync(). Meanwhile the DispatcherServlet and all configured Filter’s exit the request processing thread but the response remains open. The application sets the DeferredResult from some thread and Spring MVC dispatches the request back to the Servlet container. The DispatcherServlet is invoked again and processing resumes with the asynchronously produced return value.
1》將Controller返回的DeferredResult值保存到內存隊列或集合當中以便存取
2》SpringMVC調用HttpServletRequest的startAsync()方法,異步處理
3》同時,DispatcherServlet和所有已配置的Filter都退出請求處理線程,但響應仍保持打開狀態,此時方法的響應對象仍未返回。
4》應用程序從某個線程設置DeferredResult,Spring MVC將請求調度回Servlet容器,恢復處理
5》再次調用DispatcherServlet,並使用異步生成的返回值繼續處理
源碼分析:
當一個請求被DispatcherServlet處理時,會試着獲取一個WebAsyncManager對象
protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception { HttpServletRequest processedRequest = request; HandlerExecutionChain mappedHandler = null; boolean multipartRequestParsed = false; // 獲取WebAsyncManager WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request); try { // ......省略部分代碼 // 執行子控制器的方法 mv = ha.handle(processedRequest, response, mappedHandler.getHandler()); //如果當前的請求需要異步處理,則終止當前請求,但是響應是打開的 if (asyncManager.isConcurrentHandlingStarted()) { return; } //....省略部分代碼 } //....省略部分代碼 }
對於每一個子控制器的方法返回值,都是HandlerMethodReturnValueHandler接口處理的,其中有一個實現類是DeferredResultMethodReturnValueHandler,關鍵代碼如下:
package org.springframework.web.servlet.mvc.method.annotation; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletionStage; import java.util.function.BiFunction; import org.springframework.core.MethodParameter; import org.springframework.lang.UsesJava8; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.context.request.NativeWebRequest; import org.springframework.web.context.request.async.DeferredResult; import org.springframework.web.context.request.async.WebAsyncUtils; import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler; import org.springframework.web.method.support.ModelAndViewContainer; /** * Handler for return values of type {@link DeferredResult}, {@link ListenableFuture}, * {@link CompletionStage} and any other async type with a {@link #getAdapterMap() * registered adapter}. * * @author Rossen Stoyanchev * @since 3.2 */ @SuppressWarnings("deprecation") public class DeferredResultMethodReturnValueHandler implements AsyncHandlerMethodReturnValueHandler { //存放DeferredResult的適配集合 private final Map<Class<?>, DeferredResultAdapter> adapterMap; public DeferredResultMethodReturnValueHandler() { this.adapterMap = new HashMap<Class<?>, DeferredResultAdapter>(5); this.adapterMap.put(DeferredResult.class, new SimpleDeferredResultAdapter()); this.adapterMap.put(ListenableFuture.class, new ListenableFutureAdapter()); if (ClassUtils.isPresent("java.util.concurrent.CompletionStage", getClass().getClassLoader())) { this.adapterMap.put(CompletionStage.class, new CompletionStageAdapter()); } } /** * Return the map with {@code DeferredResult} adapters. * <p>By default the map contains adapters for {@code DeferredResult}, which * simply downcasts, {@link ListenableFuture}, and {@link CompletionStage}. * @return the map of adapters * @deprecated in 4.3.8, see comments on {@link DeferredResultAdapter} */ @Deprecated public Map<Class<?>, DeferredResultAdapter> getAdapterMap() { return this.adapterMap; } private DeferredResultAdapter getAdapterFor(Class<?> type) { for (Class<?> adapteeType : getAdapterMap().keySet()) { if (adapteeType.isAssignableFrom(type)) { return getAdapterMap().get(adapteeType); } } return null; } @Override public boolean supportsReturnType(MethodParameter returnType) { return (getAdapterFor(returnType.getParameterType()) != null); } @Override public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) { return (returnValue != null && (getAdapterFor(returnValue.getClass()) != null)); } @Override public void handleReturnValue(Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception { if (returnValue == null) { mavContainer.setRequestHandled(true); return; } //根據返回值的類型獲取對應的DeferredResult適配器 DeferredResultAdapter adapter = getAdapterFor(returnValue.getClass()); if (adapter == null) { throw new IllegalStateException( "Could not find DeferredResultAdapter for return value type: " + returnValue.getClass()); } DeferredResult<?> result = adapter.adaptToDeferredResult(returnValue); //開啟異步請求 WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer); } }
在這里我們重點關注handleReturnValue的方法,在經過適配包裝后獲取DeferredResult開啟了異步之旅
緊接着查看handleReturnValue方法中調用的WebAsyncManager的startDeferredResultProcessing方法
public void startDeferredResultProcessing( final DeferredResult<?> deferredResult, Object... processingContext) throws Exception { Assert.notNull(deferredResult, "DeferredResult must not be null"); Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null"); //設置超時時間 Long timeout = deferredResult.getTimeoutValue(); if (timeout != null) { this.asyncWebRequest.setTimeout(timeout); } //獲取所有的延遲結果攔截器 List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<DeferredResultProcessingInterceptor>(); interceptors.add(deferredResult.getInterceptor()); interceptors.addAll(this.deferredResultInterceptors.values()); interceptors.add(timeoutDeferredResultInterceptor); final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors); this.asyncWebRequest.addTimeoutHandler(new Runnable() { @Override public void run() { try { interceptorChain.triggerAfterTimeout(asyncWebRequest, deferredResult); } catch (Throwable ex) { setConcurrentResultAndDispatch(ex); } } }); this.asyncWebRequest.addCompletionHandler(new Runnable() { @Override public void run() { interceptorChain.triggerAfterCompletion(asyncWebRequest, deferredResult); } }); interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult); //開始異步處理 startAsyncProcessing(processingContext); try { interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult); deferredResult.setResultHandler(new DeferredResultHandler() { @Override public void handleResult(Object result) { result = interceptorChain.applyPostProcess(asyncWebRequest, deferredResult, result); //設置結果並轉發 setConcurrentResultAndDispatch(result); } }); } catch (Throwable ex) { setConcurrentResultAndDispatch(ex); } } private void startAsyncProcessing(Object[] processingContext) { clearConcurrentResult(); this.concurrentResultContext = processingContext; //實際上是執行的是HttpServletRequest對應方法 this.asyncWebRequest.startAsync(); if (logger.isDebugEnabled()) { HttpServletRequest request = this.asyncWebRequest.getNativeRequest(HttpServletRequest.class); String requestUri = urlPathHelper.getRequestUri(request); logger.debug("Concurrent handling starting for " + request.getMethod() + " [" + requestUri + "]"); } }
在這里首先收集所有配置好的DeferredResultProcessingInterceptor ,然后設置asyncRequest的超時處理,完成時的處理等,同時會分階段執行攔截器中的各個方法。最后我們關注一下如下代碼:
deferredResult.setResultHandler(result -> { result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result); //設置結果並轉發 setConcurrentResultAndDispatch(result); });
查看setConcurrentResultAndDispatch內實現:其最終還是要調用AsyncWebRequest接口中的dispatch方法進行轉發,讓DispatcherServlet重新處理異步結果:this.asyncWebRequest.dispatch();
其實在這里都是封裝自HttpServletRequest的異步操作,我們可以看一下StandardServletAsyncWebRequest的類結構圖:

可以在其父類ServletRequestAttributes里找到對應的實現:
private final HttpServletRequest request; /** * Exposes the native {@link HttpServletRequest} that we're wrapping. */ public final HttpServletRequest getRequest() { return this.request; }
StandardServletAsyncWebRequest代碼,方便理解整個異步是怎么執行:
//java.servlet.AsnycContext private AsyncContext asyncContext; @Override public void startAsync() { Assert.state(getRequest().isAsyncSupported(), "Async support must be enabled on a servlet and for all filters involved " + "in async request processing. This is done in Java code using the Servlet API " + "or by adding \"<async-supported>true</async-supported>\" to servlet and " + "filter declarations in web.xml."); Assert.state(!isAsyncComplete(), "Async processing has already completed"); if (isAsyncStarted()) { return; } this.asyncContext = getRequest().startAsync(getRequest(), getResponse()); this.asyncContext.addListener(this); if (this.timeout != null) { this.asyncContext.setTimeout(this.timeout); } } @Override public void dispatch() { Assert.notNull(this.asyncContext, "Cannot dispatch without an AsyncContext"); this.asyncContext.dispatch(); }
4、異步結果的異常處理:
如果Callable在執行過程中拋出異常 與一般的控制器異常一樣,會被正常的異常處理流程捕獲處理
如果返回方法是一個DeferredResult對象,可以選擇
deferredResult.setErrorResult()
5、攔截異步請求
處理連接器HandlerInterceptor可以實現AsyncHandlerInterceptor接口攔截異步請求,因為在異步請求的開始時,被調用的回調方法是該接口的afterConcurrentHandlingStarted方法,而不是一般的postHandle 和 afterCompletion方法。如果需要與異步請求處理的生命流程有更深入的集成,比如需要處理timeout的事件等,則HandlerInterceptor需要注冊CallableProcessingInterceptor或DeferredResultProcessingInterceptor攔截器,更多細節需要參考AsyncHandlerInterceptor類的Java文檔。
DeferredResult類還提供了onTimeout(Runnable)和onCompletion(Runnable)等方法可以參考DeferredResult的java文檔
Callable需要請求過期(timeout)和完成后的攔截時,可以把他包裝在一個WebAsyncTask實例中,后者提供了相關技術支持。
3.3、異步攔截器
1)、原生API的AsyncListener
2)、SpringMVC:實現AsyncHandlerInterceptor;
四、使用
4.1、Callable使用
@RestController public class WebCallableAsyncController { Logger log=LoggerFactory.getLogger(WebCallableAsyncController.class); @GetMapping("/callable") public Callable<String> testCallable() throws Exception { log.info("主線程開始!"); Callable<String> result = new Callable<String>() { @Override public String call() throws Exception { log.info("副線程開始1!"); Thread.sleep(3000); log.info("副線程結束1!"); return "SUCCESS1"; } }; log.info("主線程結束!"); return result; } }
請求地址查看
2019-02-27 14:25:00.197 INFO 13815 --- [nio-8080-exec-3] c.g.b.g.d.w.WebCallableAsyncController : 主線程開始! 2019-02-27 14:25:00.197 INFO 13815 --- [nio-8080-exec-3] c.g.b.g.d.w.WebCallableAsyncController : 主線程結束! 2019-02-27 14:25:00.197 INFO 13815 --- [ MvcAsync2] c.g.b.g.d.w.WebCallableAsyncController : 副線程開始1! 2019-02-27 14:25:03.200 INFO 13815 --- [ MvcAsync2] c.g.b.g.d.w.WebCallableAsyncController : 副線程結束1!
返回Callable意味着Spring MVC將調用在不同的線程中執行定義的任務。Spring將使用TaskExecutor來管理線程。在等待完成的長期任務之前,servlet線程將被釋放。
在長時間運行的任務執行完畢之前就已經從servlet返回了。這並不意味着客戶端收到了一個響應。與客戶端的通信仍然是開放的等待結果,但接收到的請求的線程已被釋放,並可以服務於另一個客戶的請求。
4.2、DeferredResult使用
一旦在Servlet容器中啟用了異步請求處理功能,控制器方法就可以使用DeferredResult包裝任何支持的控制器方法返回值,
DeferredResult這個類代表延遲結果,我們先看一看spring的API文檔給我們的解釋:
{@code DeferredResult} provides an alternative to using a {@link Callable} for asynchronous request processing.
While a {@code Callable} is executed concurrently on behalf of the application,
with a {@code DeferredResult} the application can produce the result from a thread of its choice.
根據文檔說明DeferredResult可以替代Callable來進行異步的請求處理。只不過這個類可以從其他線程里拿到對應的結果。當使用DeferredResult,我們可以將DefferedResult的類型並將其保存到可以獲取到該對象的地方,比如說隊列或者集合當中,這樣方便其它線程能夠取到並設置DefferedResult的值。
@RestController public class WebDeferredResultAsyncController { Logger log = LoggerFactory.getLogger(WebDeferredResultAsyncController.class); //接收隊列 private BlockingQueue<DeferredResult<String>> blockingQueue = new ArrayBlockingQueue(1024); //接收隊列 或者ConcurrentLinkedQueue private static Queue<DeferredResult<String>> queue = new ConcurrentLinkedQueue<DeferredResult<String>>(); /** * 返回值是DeferredResult類型,如果沒有結果請求阻塞 * * @return */ @GetMapping("/quotes") public DeferredResult<String> quotes() { //指定超時時間,及出錯時返回的值 DeferredResult<String> result = new DeferredResult(3000L, "error"); blockingQueue.add(result); // queue.add(result); return result; } /** * 另外一個請求(新的線程)設置值 * * @throws InterruptedException */ @GetMapping("take") public void take() throws InterruptedException { DeferredResult<String> result = blockingQueue.take(); result.setResult("route"); // DeferredResult<String> poll = queue.poll(); // poll.setResult("OK"); } }
控制器可以從不同的線程異步生成返回值,例如響應外部事件(JMS消息)、計划任務等,那么在這里我先使用另外一個請求來模擬這個過程
此時我們啟動tomcat,先訪問地址http://localhost:8080/quotes ,此時我們會看到發送的請求由於等待響應遭到了阻塞:
當在規定時間內訪問http://localhost:8080/take 時,則能成功顯示結果:
如果有另一個線程給DeferredResult賦值后,DeferredResult在感知到自己的對象被賦值后就返回頁面成功;
一個獨立的示例
/** * 一個獨立的示例 * @return */ @RequestMapping(value = "/deferred", method = RequestMethod.GET) public DeferredResult<String> executeSlowTask() { log.info("Request received"); DeferredResult<String> deferredResult = new DeferredResult<>(); CompletableFuture.supplyAsync(()->{ try { Thread.sleep(5000); log.info("Slow task executed"); return "Task finished"; } catch (InterruptedException e) { e.printStackTrace(); return "Task exception"; } }).whenCompleteAsync((result, throwable) -> deferredResult.setResult(result)); log.info("Servlet thread released"); return deferredResult; }
返回DeferredResult和返回Callable有什么區別?不同的是返回DeferredResult的線程是由我們管理。創建一個線程並將結果set到DeferredResult是由我們自己來做的。
用completablefuture創建一個異步任務。這將創建一個新的線程,在那里我們的長時間運行的任務將被執行。也就是在這個線程中,我們將set結果到DeferredResult並返回。
是在哪個線程池中我們取回這個新的線程?默認情況下,在completablefuture的supplyasync方法將在forkjoin池運行任務。如果你想使用一個不同的線程池,你可以通過傳一個executor到supplyasync方法:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
注意:Callable和Deferredresult做的是同樣的事情——釋放容器線程,在另一個線程上異步運行長時間的任務。不同的是誰管理執行任務的線程。
4.3、 WebAsyncTask
1》常規調用
查看WebAsyncTask,有說明:Holder for a {@link Callable}, a timeout value, and a task executor.其實是一個Callable。
示例
@RequestMapping(value="/longtimetask", method = RequestMethod.GET) public WebAsyncTask longTimeTask(){ System.out.println("/longtimetask被調用 thread id is : " + Thread.currentThread().getId()); Callable<String> callable = new Callable<String>() { public String call() throws Exception { Thread.sleep(3000); //假設是一些長時間任務 System.out.println("執行成功 thread id is : " + Thread.currentThread().getId()); return "ok"; } }; return new WebAsyncTask(callable); }
事實上,直接返回Callable<String>都是可以的,但這里包裝了一層,以便做后面提到的“超時處理”。和前一個方案的差別在於這個Callable的call方法並不是我們直接調用的,而是在longTimeTask返回后,由Spring MVC用一個工作線程來調用,執行,打印出來的結果:
/longtimetask被調用 thread id is : 24
執行成功 thread id is : 38
2》超時處理
如果“長時間處理任務”一直沒返回,那也不應該讓客戶端無限等下去,需要服務端終結,即“超時”處理。如圖:

“超時處理線程”和“回調處理線程”可能都是線程池中的某個線程,我為了清晰點把它們分開畫而已。
@RequestMapping(value="/longtimetaskTimeout", method = RequestMethod.GET) public WebAsyncTask longtimetaskTimeout(){ System.out.println("/longtimetask被調用 thread id is : " + Thread.currentThread().getId()); Callable<String> callable = new Callable<String>() { public String call() throws Exception { Thread.sleep(3000); //假設是一些長時間任務 System.out.println("執行成功 thread id is : " + Thread.currentThread().getId()); return "ok"; } }; WebAsyncTask webAsyncTask = new WebAsyncTask(2000,callable); webAsyncTask.onTimeout(()->{ System.out.println("執行超時 thread id is :" + Thread.currentThread().getId()); return "執行超時"; }); return webAsyncTask; }
這就是前面提到的為什么Callable還要外包一層的緣故,給WebAsyncTask設置一個超時回調,即可實現超時處理,在這個例子中,正常處理需要3秒鍾,而超時設置為2秒,所以肯定會出現超時
返回值
/longtimetask被調用 thread id is : 23
執行超時 thread id is :24
3》綜合示例
自定義線程池
@Configuration public class TaskConfiguration { @Bean("taskExecutor") public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(5); taskExecutor.setMaxPoolSize(10); taskExecutor.setQueueCapacity(10); taskExecutor.setThreadNamePrefix("asyncTask"); return taskExecutor; } }
使用
@GetMapping("/threadPool")
public WebAsyncTask<String> asyncTaskThreadPool() {
return new WebAsyncTask<>(10 * 1000L, executor,
() -> {
out.println(format("異步工作線程:%s", currentThread().getName()));
return asyncService.generateUUID();
});
}
超時、異常綜合示例
@RestController public class WebAsyncTaskController { private final WebAsyncService asyncService; private final static String ERROR_MESSAGE = "Task error"; private final static String TIME_MESSAGE = "Task timeout"; @Autowired @Qualifier("taskExecutor") private ThreadPoolTaskExecutor executor; @Autowired public WebAsyncTaskController(WebAsyncService asyncService) { this.asyncService = asyncService; } @GetMapping("/completion") public WebAsyncTask<String> asyncTaskCompletion() { // 打印處理線程名 out.println(format("請求處理線程:%s", currentThread().getName())); // 模擬開啟一個異步任務,超時時間為10s WebAsyncTask<String> asyncTask = new WebAsyncTask<>(10 * 1000L, () -> { out.println(format("異步工作線程:%s", currentThread().getName())); // 任務處理時間5s,不超時 sleep(5 * 1000L); return asyncService.generateUUID(); }); // 任務執行完成時調用該方法 asyncTask.onCompletion(() -> out.println("任務執行完成")); out.println("繼續處理其他事情"); return asyncTask; } @GetMapping("/exception") public WebAsyncTask<String> asyncTaskException() { // 打印處理線程名 out.println(format("請求處理線程:%s", currentThread().getName())); // 模擬開啟一個異步任務,超時時間為10s WebAsyncTask<String> asyncTask = new WebAsyncTask<>(10 * 1000L, () -> { out.println(format("異步工作線程:%s", currentThread().getName())); // 任務處理時間5s,不超時 sleep(5 * 1000L); throw new Exception(ERROR_MESSAGE); }); // 任務執行完成時調用該方法 asyncTask.onCompletion(() -> out.println("任務執行完成")); asyncTask.onError(() -> { out.println("任務執行異常"); return ERROR_MESSAGE; }); out.println("繼續處理其他事情"); return asyncTask; } @GetMapping("/timeout") public WebAsyncTask<String> asyncTaskTimeout() { // 打印處理線程名 out.println(format("請求處理線程:%s", currentThread().getName())); // 模擬開啟一個異步任務,超時時間為10s WebAsyncTask<String> asyncTask = new WebAsyncTask<>(10 * 1000L, () -> { out.println(format("異步工作線程:%s", currentThread().getName())); // 任務處理時間5s,不超時 sleep(15 * 1000L); return TIME_MESSAGE; }); // 任務執行完成時調用該方法 asyncTask.onCompletion(() -> out.println("任務執行完成")); asyncTask.onTimeout(() -> { out.println("任務執行超時"); return TIME_MESSAGE; }); out.println("繼續處理其他事情"); return asyncTask; } }
4.4、@Async
參看:https://www.cnblogs.com/bjlhx/p/10364385.html
五、Callable、DeferredResult、WebAsyncTask、Async對比、
| Callable | WebAsyncTask | DeferredResult | Async | |
| 針對問題點 | 異步請求處理 | 異步請求處理 | 異步請求處理 | 異步方法 |
| 目標 | 釋放容器線程 | 釋放容器線程 | 釋放容器線程 | 服務線程內多線程執行 |
| 攔截器 | CallableProcessingInterceptor | DeferredResultProcessingInterceptor | ||
| 超時攔截器 | TimeoutCallableProcessingInterceptor | TimeoutDeferredResultProcessingInterceptor |
常用類:
NoSupportAsyncWebRequest.java
不支持異步處理模式的web請求
DeferredResultProcessingInterceptor.java
DeferredResult處理過程攔截器
在start async前,超時后/異步處理完成后/網絡超時后觸發攔截
DeferredResultProcessingInterceptorAdapter.java
抽象類實現DeferredResultProcessingInterceptor,做空實現
DeferredResultInterceptorChain.java
調用DeferredResultProcessingInterceptor的輔助類
DeferredResult.java
遞延結果,在兩個線程中傳遞的對象結果
實現Comparable接口以保證加入PriorityQueue隊列的正確順序
CallableProcessingInterceptor.java
Callable攔截器
CallableProcessingInterceptorAdapter.java
抽象類實現CallableProcessingInterceptor接口,空實現
CallableInterceptorChain.java
調用CallableProcessingInterceptor的輔助類
TimeoutCallableProcessingInterceptor.java
繼承CallableProcessingInterceptorAdapter
實現超時處理方法
TimeoutDeferredResultProcessingInterceptor.java
繼承DeferredResultProcessingInterceptorAdapter
實現超時處理方法
WebAsyncTask.java
web異步任務
包含一個Callable類,一個超時時間,一個任務執行着或名字
WebAsyncUtils.java
實現getAsyncManager
實現createAsyncWebRequest
WebAsyncManager.java
對Callables和DeferredResults啟動的管理,包括攔截器的注入,Excutor的注入等
異步處理的入口類
