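Async request components in Spring MVC
On top of this, Spring MVC wraps asynchronous request handling. It provides AsyncWebRequest, a request type for asynchronous processing, together with WebAsyncManager, which manages async request handling, and the WebAsyncUtils helper class.
Spring MVC distinguishes four return types for asynchronous requests: Callable, WebAsyncTask, DeferredResult and ListenableFuture. Each of the four is analyzed below.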
AsyncWebRequest
AsyncWebRequest is the request type dedicated to asynchronous processing. It is defined as follows:
// org.springframework.web.context.request.async.AsyncWebRequest
public interface AsyncWebRequest extends NativeWebRequest {

    void setTimeout(Long timeout);

    // Counterparts of onTimeout and onComplete in AsyncListener
    void addTimeoutHandler(Runnable runnable);
    void addCompletionHandler(Runnable runnable);

    void startAsync();

    // Whether async processing has started / has completed
    boolean isAsyncStarted();
    boolean isAsyncComplete();

    void dispatch();
}
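AsyncWebRequest has two implementations:
- NoSupportAsyncWebRequest: does not support asynchronous requests
- StandardServletAsyncWebRequest: supports asynchronous requests
Besides implementing the AsyncWebRequest interface, StandardServletAsyncWebRequest also implements AsyncListener and extends ServletWebRequest.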
public class StandardServletAsyncWebRequest extends ServletWebRequest
        implements AsyncWebRequest, AsyncListener {

    private Long timeout;

    // Wraps the Servlet AsyncContext
    private AsyncContext asyncContext;

    private AtomicBoolean asyncCompleted = new AtomicBoolean(false);

    // Invoked from the AsyncListener callbacks onTimeout and onComplete
    private final List<Runnable> timeoutHandlers = new ArrayList<Runnable>();
    private final List<Runnable> completionHandlers = new ArrayList<Runnable>();

    @Override
    public boolean isAsyncStarted() {
        return ((this.asyncContext != null) && getRequest().isAsyncStarted());
    }

    @Override
    public void startAsync() {
        if (isAsyncStarted()) {
            return;
        }
        this.asyncContext = getRequest().startAsync(getRequest(), getResponse());
        this.asyncContext.addListener(this);
        if (this.timeout != null) {
            this.asyncContext.setTimeout(this.timeout);
        }
    }

    // --- AsyncListener methods ---

    @Override
    public void onTimeout(AsyncEvent event) throws IOException {
        for (Runnable handler : this.timeoutHandlers) {
            handler.run();
        }
    }

    @Override
    public void onComplete(AsyncEvent event) throws IOException {
        for (Runnable handler : this.completionHandlers) {
            handler.run();
        }
        // Once processing has completed, clear asyncContext
        this.asyncContext = null;
        this.asyncCompleted.set(true);
    }
}
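The handler bookkeeping above can be tried out without a servlet container. The following is a minimal JDK-only sketch, not Spring's implementation: the class `AsyncLifecycleSketch` and its `fireTimeout`/`fireComplete` methods are hypothetical stand-ins for the container calling `AsyncListener.onTimeout` and `onComplete`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the handler bookkeeping; no servlet API involved.
final class AsyncLifecycleSketch {
    private final List<Runnable> timeoutHandlers = new ArrayList<>();
    private final List<Runnable> completionHandlers = new ArrayList<>();
    private final AtomicBoolean asyncCompleted = new AtomicBoolean(false);

    void addTimeoutHandler(Runnable handler) { timeoutHandlers.add(handler); }
    void addCompletionHandler(Runnable handler) { completionHandlers.add(handler); }
    boolean isAsyncComplete() { return asyncCompleted.get(); }

    // Stand-in for the container calling AsyncListener.onTimeout.
    void fireTimeout() { timeoutHandlers.forEach(Runnable::run); }

    // Stand-in for the container calling AsyncListener.onComplete.
    void fireComplete() {
        completionHandlers.forEach(Runnable::run);
        asyncCompleted.set(true);
    }

    // Fires a timeout followed by completion and returns the handler call order.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        AsyncLifecycleSketch request = new AsyncLifecycleSketch();
        request.addTimeoutHandler(() -> events.add("timeout"));
        request.addCompletionHandler(() -> events.add("complete"));
        request.fireTimeout();
        request.fireComplete();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the design is that callers only register plain `Runnable` callbacks; the listener side decides when they run.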
WebAsyncUtils
// org.springframework.web.context.request.async.WebAsyncUtils
public abstract class WebAsyncUtils {

    // On first access, create the WebAsyncManager and store it as a request attribute.
    // Later calls read it back from the request attribute.
    public static WebAsyncManager getAsyncManager(ServletRequest servletRequest) {
        WebAsyncManager asyncManager =
                (WebAsyncManager) servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE);
        if (asyncManager == null) {
            asyncManager = new WebAsyncManager();
            servletRequest.setAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, asyncManager);
        }
        return asyncManager;
    }

    public static WebAsyncManager getAsyncManager(WebRequest webRequest) {
        // Same logic as getAsyncManager(ServletRequest servletRequest), omitted...
    }

    // Checks whether ServletRequest has a "startAsync" method.
    // The method exists only in Servlet 3.0 and later.
    public static AsyncWebRequest createAsyncWebRequest(HttpServletRequest request,
            HttpServletResponse response) {
        return ClassUtils.hasMethod(ServletRequest.class, "startAsync")
                ? createStandardServletAsyncWebRequest(request, response)
                : new NoSupportAsyncWebRequest(request, response);
    }

    private static AsyncWebRequest createStandardServletAsyncWebRequest(HttpServletRequest request,
            HttpServletResponse response) {
        if (standardAsyncRequestConstructor == null) {
            String className =
                    "org.springframework.web.context.request.async.StandardServletAsyncWebRequest";
            Class<?> clazz = ClassUtils.forName(className, WebAsyncUtils.class.getClassLoader());
            standardAsyncRequestConstructor =
                    clazz.getConstructor(HttpServletRequest.class, HttpServletResponse.class);
        }
        return (AsyncWebRequest) BeanUtils.instantiateClass(
                standardAsyncRequestConstructor, request, response);
    }
}
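The get-or-create lookup in getAsyncManager is what lets a later dispatch of the same request find the manager that already holds the async result. A minimal sketch of that pattern, assuming a plain `Map` in place of the request attributes; `AttributeCacheSketch`, `Manager` and the attribute key are hypothetical names used only for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: a Map plays the role of the request's attribute store.
final class AttributeCacheSketch {
    static final String MANAGER_ATTRIBUTE = "sketch.ASYNC_MANAGER"; // illustrative key

    // Placeholder for the per-request manager object.
    static final class Manager { }

    // Returns the manager stored in the attributes, creating it on first access.
    static Manager getManager(Map<String, Object> requestAttributes) {
        Manager manager = (Manager) requestAttributes.get(MANAGER_ATTRIBUTE);
        if (manager == null) {
            manager = new Manager();
            requestAttributes.put(MANAGER_ATTRIBUTE, manager);
        }
        return manager;
    }

    public static void main(String[] args) {
        Map<String, Object> attributes = new HashMap<>();
        Manager first = getManager(attributes);
        Manager second = getManager(attributes);
        // Both lookups against the same attribute map yield the same instance.
        System.out.println(first == second);
    }
}
```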
WebAsyncManager
WebAsyncManager is the central class in Spring MVC's async request handling; it manages the whole asynchronous processing flow.
// org.springframework.web.context.request.async
public final class WebAsyncManager {

    // Timeout interceptors, one per kind of async result
    private static final CallableProcessingInterceptor timeoutCallableInterceptor =
            new TimeoutCallableProcessingInterceptor();
    private static final DeferredResultProcessingInterceptor timeoutDeferredResultInterceptor =
            new TimeoutDeferredResultProcessingInterceptor();

    // Holds the asyncWebRequest
    private AsyncWebRequest asyncWebRequest;

    private AsyncTaskExecutor taskExecutor =
            new SimpleAsyncTaskExecutor(this.getClass().getSimpleName());

    // Request-processing interceptors, one map per kind of async result
    private final Map<Object, CallableProcessingInterceptor> callableInterceptors =
            new LinkedHashMap<Object, CallableProcessingInterceptor>();
    private final Map<Object, DeferredResultProcessingInterceptor> deferredResultInterceptors =
            new LinkedHashMap<Object, DeferredResultProcessingInterceptor>();

    // Handles async requests of type Callable and WebAsyncTask
    public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask,
            Object... processingContext) throws Exception {
    }

    // Handles async requests of type DeferredResult and ListenableFuture
    public void startDeferredResultProcessing(final DeferredResult<?> deferredResult,
            Object... processingContext) throws Exception {
    }
}
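Its two most important methods are startCallableProcessing and startDeferredResultProcessing. They are the entry points that start asynchronous processing, and each does three things:
- Set properties on the request (timeout, timeoutHandler, completionHandler…)
- Run the interceptors' logic at the appropriate points
- Start asynchronous processing
The focus here is on startCallableProcessing: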
public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext)
        throws Exception {
    // Apply the task's settings to asyncWebRequest and to this manager
    Long timeout = webAsyncTask.getTimeout();
    if (timeout != null) {
        this.asyncWebRequest.setTimeout(timeout);
    }
    AsyncTaskExecutor executor = webAsyncTask.getExecutor();
    if (executor != null) {
        this.taskExecutor = executor;
    }

    // Initialize the interceptors and run their logic at the key points of the
    // async request: before and after execution, on completion, on timeout...

    final Callable<?> callable = webAsyncTask.getCallable();

    // Start async processing
    startAsyncProcessing(processingContext);

    // Run the callable on the task executor
    this.taskExecutor.submit(new Runnable() {
        @Override
        public void run() {
            // interceptors omitted...
            Object result;
            try {
                result = callable.call();
            }
            catch (Throwable ex) {
                // a failure becomes the result and is rethrown on the second pass
                result = ex;
            }
            // Store the result and dispatch the request
            setConcurrentResultAndDispatch(result);
        }
    });
}

// Calls asyncWebRequest.startAsync() to start async processing
private void startAsyncProcessing(Object[] processingContext) {
    clearConcurrentResult();
    this.concurrentResultContext = processingContext;
    this.asyncWebRequest.startAsync();
}

// Whether an async result is already available
public boolean hasConcurrentResult() {
    // concurrentResult is initialized to RESULT_NONE
    return (this.concurrentResult != RESULT_NONE);
}

// Stores the result and dispatches the request
private void setConcurrentResultAndDispatch(Object result) {
    synchronized (WebAsyncManager.this) {
        // Ignore this result if one has already been stored
        if (hasConcurrentResult()) {
            return;
        }
        // Make result the current processing result
        this.concurrentResult = result;
    }
    // If the request is already marked as async-complete here, log an error
    // (a network failure can cause this)
    if (this.asyncWebRequest.isAsyncComplete()) {
        logger.error("Could not complete async processing due to timeout or network error");
        return;
    }
    // Dispatch again: the same request is sent through Spring MVC once more,
    // and the HandlerAdapter treats it specially
    this.asyncWebRequest.dispatch();
}
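The guard in setConcurrentResultAndDispatch means that only the first result is kept and only one dispatch happens, even if several threads try to deliver a result. Below is a minimal JDK-only sketch of that guard. `ConcurrentResultSketch` is a hypothetical class, the dispatch is reduced to a counter, and the idea that a second producer (for example a timeout path) competes with the callable is an assumption made for the demo.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the "first result wins" guard; not Spring's class.
final class ConcurrentResultSketch {
    private static final Object RESULT_NONE = new Object(); // sentinel: no result yet

    private Object concurrentResult = RESULT_NONE;
    private final AtomicInteger dispatchCount = new AtomicInteger();

    synchronized boolean hasConcurrentResult() {
        return concurrentResult != RESULT_NONE;
    }

    synchronized Object getConcurrentResult() {
        return concurrentResult;
    }

    int getDispatchCount() {
        return dispatchCount.get();
    }

    // Stores the result and "dispatches" only if no result was stored before.
    void setConcurrentResultAndDispatch(Object result) {
        synchronized (this) {
            if (concurrentResult != RESULT_NONE) {
                return; // a competing result already won
            }
            concurrentResult = result;
        }
        dispatchCount.incrementAndGet(); // stand-in for asyncWebRequest.dispatch()
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentResultSketch manager = new ConcurrentResultSketch();
        Thread worker = new Thread(() -> manager.setConcurrentResultAndDispatch("callable result"));
        Thread timeout = new Thread(() -> manager.setConcurrentResultAndDispatch("timeout result"));
        worker.start();
        timeout.start();
        worker.join();
        timeout.join();
        // Which result wins depends on scheduling, but only one dispatch happens.
        System.out.println(manager.getConcurrentResult() + ", dispatches=" + manager.getDispatchCount());
    }
}
```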
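Spring MVC's support for async processing
To enable async processing in Spring MVC, first the DispatcherServlet must be configured with <async-supported>true</async-supported>, and second the handler method must return one of Callable, WebAsyncTask, DeferredResult or ListenableFuture: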
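import java.time.LocalDateTime;
import java.util.concurrent.Callable;

import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.client.AsyncRestTemplate;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.context.request.async.WebAsyncTask;

@Controller
@RequestMapping("/async")
public class AsyncController {

    @RequestMapping(value = "/callable", produces = "text/plain;charset=UTF-8")
    @ResponseBody
    public Callable<String> callable() {
        System.out.println("Callable: entering request thread...");
        Callable<String> result = new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(5 * 1000);
                System.out.println("Callable: worker thread running...");
                return "Callable: sorry for the wait";
            }
        };
        System.out.println("Callable: request thread exiting...");
        return result;
    }

    @RequestMapping(value = "/web", produces = "text/plain;charset=UTF-8")
    @ResponseBody
    public WebAsyncTask<String> web() {
        System.out.println("WebAsyncTask: entering request thread...");
        WebAsyncTask<String> task = new WebAsyncTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(5 * 1000);
                System.out.println("WebAsyncTask: worker thread running...");
                return "WebAsyncTask: sorry for the wait";
            }
        });
        System.out.println("WebAsyncTask: request thread exiting...");
        return task;
    }

    @RequestMapping(value = "/deferred", produces = "text/plain;charset=UTF-8")
    @ResponseBody
    public DeferredResult<String> deferred() {
        // The 7 * 1000L timeout only starts counting after the request thread has returned.
        DeferredResult<String> result = new DeferredResult<String>(7 * 1000L, "timed out");
        approve(result);
        try {
            // Sleeping here on the request thread does not produce the "timed out" result
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    private void approve(final DeferredResult<String> result) {
        new Thread(() -> {
            try {
                Thread.sleep(5 * 1000);
                result.setResult("approved: " + LocalDateTime.now());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }

    @RequestMapping(value = "/future", produces = "text/plain;charset=UTF-8")
    public ListenableFuture<ResponseEntity<String>> future() {
        ListenableFuture<ResponseEntity<String>> future =
                new AsyncRestTemplate().getForEntity("http://www.baidu.com", String.class);
        return future;
    }
}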
Source code walkthrough
Overall, Spring MVC handles an asynchronous request in two passes:
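- First pass: start async processing, register listeners for events such as timeout and completion, and return null right away;
- Second pass: once the async result is available, dispatch the same request again and return the result.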
Spring MVC invokes handler methods inside the HandlerAdapter.
The earlier analysis of RequestMappingHandlerAdapter#invokeHandleMethod() left out the async-related part. Looking at the method again:
// org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter
private ModelAndView invokeHandleMethod(HttpServletRequest request,
        HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {

    ServletWebRequest webRequest = new ServletWebRequest(request, response);
    WebDataBinderFactory binderFactory = getDataBinderFactory(handlerMethod);
    ModelFactory modelFactory = getModelFactory(handlerMethod, binderFactory);
    ServletInvocableHandlerMethod requestMappingMethod =
            createRequestMappingMethod(handlerMethod, binderFactory);

    // mavContainer setup omitted...

    AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
    asyncWebRequest.setTimeout(this.asyncRequestTimeout);

    final WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
    asyncManager.setTaskExecutor(this.taskExecutor);
    asyncManager.setAsyncWebRequest(asyncWebRequest);
    asyncManager.registerCallableInterceptors(this.callableInterceptors);
    asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);

    // Has the async request already produced a result?
    if (asyncManager.hasConcurrentResult()) {
        // If so, fetch the result --- 1
        Object result = asyncManager.getConcurrentResult();
        mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
        // Clear the stored result
        asyncManager.clearConcurrentResult();
        // Replace the original requestMappingMethod --- 2
        requestMappingMethod = requestMappingMethod.wrapConcurrentResult(result);
    }

    // Invoke the method --- 3
    requestMappingMethod.invokeAndHandle(webRequest, mavContainer);

    // Has asyncManager started concurrent handling?
    if (asyncManager.isConcurrentHandlingStarted()) {
        // --- 4
        return null;
    }

    // --- 5
    return getModelAndView(mavContainer, modelFactory, webRequest);
}
- First pass: steps 3 and 4 in the code above run.
- Second pass: steps 1, 2, 3 and 5 run. Note step 2, which replaces the original requestMappingMethod; this is analyzed below.
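The two passes can be illustrated with a small JDK-only model. This is a sketch under simplifying assumptions, not Spring's code: `TwoPassSketch` is a hypothetical class, the handler method is a `Supplier`, a returned `Callable` is run inline instead of on an executor, and the re-dispatch is simulated by calling the method a second time.

```java
import java.util.concurrent.Callable;
import java.util.function.Supplier;

// Hypothetical model of the two passes through the handler adapter.
final class TwoPassSketch {
    private static final Object RESULT_NONE = new Object();

    private Object concurrentResult = RESULT_NONE; // plays the role of the manager's state

    // One pass through the adapter. Returns null when async processing was started.
    String invokeHandlerMethod(Supplier<Object> handlerMethod) {
        Supplier<Object> method = handlerMethod;
        if (concurrentResult != RESULT_NONE) {          // steps 1 and 2
            final Object result = concurrentResult;
            concurrentResult = RESULT_NONE;
            method = () -> result;                      // replaces the real handler method
        }
        Object returnValue = method.get();              // step 3
        if (returnValue instanceof Callable) {          // a return value handler starts async work
            try {
                // run inline to keep the sketch deterministic
                concurrentResult = ((Callable<?>) returnValue).call();
            } catch (Exception ex) {
                concurrentResult = ex;                  // a failure is stored as the result
            }
            return null;                                // step 4
        }
        return "response: " + returnValue;              // step 5
    }

    // Runs both passes against the same adapter and joins the two outcomes.
    static String demo() {
        TwoPassSketch adapter = new TwoPassSketch();
        Supplier<Object> controller = () -> (Callable<String>) () -> "hello";
        String first = adapter.invokeHandlerMethod(controller);  // first pass
        String second = adapter.invokeHandlerMethod(controller); // simulated re-dispatch
        return first + " / " + second;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

On the second pass the real controller method is never invoked again; only the stored result flows through the return value handling.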
ServletInvocableHandlerMethod.invokeAndHandle(webRequest, mavContainer)
When Spring MVC handles a request in RequestMappingHandlerAdapter#invokeHandleMethod(), it calls ServletInvocableHandlerMethod#invokeAndHandle(). After invoking the handler method, that method calls
this.returnValueHandlers.handleReturnValue(returnValue, getReturnValueType(returnValue), mavContainer, webRequest);
to process the return value. Each of the four types above is matched to its own XXReturnValueHandler.
Callable: CallableMethodReturnValueHandler
// org.springframework.web.servlet.mvc.method.annotation.CallableMethodReturnValueHandler
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
        ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
    // if-check omitted...
    Callable<?> callable = (Callable<?>) returnValue;
    WebAsyncUtils.getAsyncManager(webRequest).startCallableProcessing(callable, mavContainer);
}
WebAsyncTask: AsyncTaskMethodReturnValueHandler
// org.springframework.web.servlet.mvc.method.annotation.AsyncTaskMethodReturnValueHandler
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
        ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
    // if-check omitted...
    WebAsyncTask<?> webAsyncTask = (WebAsyncTask<?>) returnValue;
    webAsyncTask.setBeanFactory(this.beanFactory);
    WebAsyncUtils.getAsyncManager(webRequest).startCallableProcessing(webAsyncTask, mavContainer);
}
This shows that Callable and WebAsyncTask both go through startCallableProcessing.
DeferredResult: DeferredResultMethodReturnValueHandler
// org.springframework.web.servlet.mvc.method.annotation.DeferredResultMethodReturnValueHandler
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
        ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
    // if-check omitted...
    DeferredResult<?> deferredResult = (DeferredResult<?>) returnValue;
    WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);
}
ListenableFuture: ListenableFutureReturnValueHandler
// org.springframework.web.servlet.mvc.method.annotation.ListenableFutureReturnValueHandler
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
        ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
    // if-check omitted...
    final DeferredResult<Object> deferredResult = new DeferredResult<Object>();
    WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);

    ListenableFuture<?> future = (ListenableFuture<?>) returnValue;
    future.addCallback(new ListenableFutureCallback<Object>() {
        @Override
        public void onSuccess(Object result) {
            deferredResult.setResult(result);
        }
        @Override
        public void onFailure(Throwable ex) {
            deferredResult.setErrorResult(ex);
        }
    });
}
This shows that DeferredResult and ListenableFuture both go through startDeferredResultProcessing.
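The ListenableFuture handler works by adapting the future: it creates a fresh deferred result and forwards the future's success or failure into it. The same bridging idea can be sketched with JDK types only. Here `CompletableFuture` stands in for `ListenableFuture`, and `DeferredSketch` is a hypothetical, much simplified stand-in for `DeferredResult`; neither choice reflects how Spring implements it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a deferred result: holds a value or an error once set.
final class DeferredSketch<T> {
    private final AtomicReference<Object> outcome = new AtomicReference<>();

    // Each setter only succeeds if nothing has been set before.
    boolean setResult(T value) { return outcome.compareAndSet(null, value); }
    boolean setErrorResult(Throwable error) { return outcome.compareAndSet(null, error); }
    Object getOutcome() { return outcome.get(); }

    // Adapts a future: its completion, successful or not, is forwarded to a deferred result.
    static <T> DeferredSketch<T> adapt(CompletableFuture<T> future) {
        DeferredSketch<T> deferred = new DeferredSketch<>();
        future.whenComplete((value, error) -> {
            if (error != null) {
                deferred.setErrorResult(error);
            } else {
                deferred.setResult(value);
            }
        });
        return deferred;
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        DeferredSketch<String> deferred = adapt(future);
        System.out.println(deferred.getOutcome()); // nothing has been set yet
        future.complete("done");
        System.out.println(deferred.getOutcome());
    }
}
```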
ServletInvocableHandlerMethod.wrapConcurrentResult(result)
On the second pass, the line to focus on is requestMappingMethod.wrapConcurrentResult(result). At this point result is already the final outcome of the asynchronous execution, not the DeferredResult itself.
// org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod
private static final Method CALLABLE_METHOD = ClassUtils.getMethod(Callable.class, "call");

public ServletInvocableHandlerMethod(Object handler, Method method) {
    super(handler, method);
    initResponseStatus();
}

ServletInvocableHandlerMethod wrapConcurrentResult(Object result) {
    return new ConcurrentResultHandlerMethod(result, new ConcurrentResultMethodParameter(result));
}

// org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod$ConcurrentResultMethodParameter
private class ConcurrentResultMethodParameter extends HandlerMethodParameter {

    private final Object returnValue;
    private final ResolvableType returnType;

    // Takes the return value directly; the return type is the first generic
    // argument of the declared return type
    public ConcurrentResultMethodParameter(Object returnValue) {
        super(-1);
        this.returnValue = returnValue;
        this.returnType = ResolvableType.forType(super.getGenericParameterType()).getGeneric(0);
    }
}

// org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod$ConcurrentResultHandlerMethod
private class ConcurrentResultHandlerMethod extends ServletInvocableHandlerMethod {

    public ConcurrentResultHandlerMethod(final Object result, ConcurrentResultMethodParameter returnType) {
        // Calls the parent constructor (handler, method); the invocation ends up in method.invoke()
        super(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                if (result instanceof Exception) {
                    throw (Exception) result;
                }
                else if (result instanceof Throwable) {
                    throw new NestedServletException("Async processing failed", (Throwable) result);
                }
                // result is the final outcome of the async processing
                return result;
            }
        }, CALLABLE_METHOD);
        setHandlerMethodReturnValueHandlers(ServletInvocableHandlerMethod.this.returnValueHandlers);
        this.returnType = returnType;
    }
}
On the second pass, requestMappingMethod.invokeAndHandle(webRequest, mavContainer) runs against the substituted requestMappingMethod: its return type has been replaced by ConcurrentResultMethodParameter, and the method actually invoked is the Callable.call() defined in the ConcurrentResultHandlerMethod constructor.
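The effect of wrapping the result in a Callable is that a stored failure resurfaces as a thrown exception on the second pass, so the normal exception handling applies. A minimal JDK-only sketch of that idea follows. `ResultWrapperSketch` and its methods are hypothetical, and `IllegalStateException` is used here in place of `NestedServletException` only to avoid a servlet dependency.

```java
import java.util.concurrent.Callable;

// Hypothetical helper mirroring the idea of wrapConcurrentResult; not Spring's API.
final class ResultWrapperSketch {

    // Wraps a finished async result so it can be invoked like a handler method.
    static Callable<Object> wrap(Object result) {
        return () -> {
            if (result instanceof Exception) {
                throw (Exception) result;          // rethrow stored exceptions as they are
            } else if (result instanceof Throwable) {
                throw new IllegalStateException("Async processing failed", (Throwable) result);
            }
            return result;                         // a normal value is simply returned
        };
    }

    // Invokes the wrapper and describes the outcome without throwing.
    static String describe(Object result) {
        try {
            return "returned " + wrap(result).call();
        } catch (Exception ex) {
            return "threw " + ex.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("final value"));
        System.out.println(describe(new IllegalArgumentException("bad input")));
        System.out.println(describe(new Error("simulated")));
    }
}
```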
<task:executor />
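Configuration parameters:
- id: when several executors are configured, this selects the one used by @Async("id"); it is also used as the thread name prefix.
- pool-size:
  - core size: minimum number of threads, default: 1
  - max size: maximum number of threads, default: Integer.MAX_VALUE
- queue-capacity: once all core threads are busy, new tasks are placed in the queue. When the queue is also full, the pool creates new threads for incoming tasks until the total reaches max size; after that the task is rejected with a TaskRejectedException (this is the default behavior; rejection-policy controls how this case is handled). Default: Integer.MAX_VALUE. Note that with an unbounded queue the pool never grows beyond core size, because the queue never fills up.
- keep-alive: threads beyond core size are terminated once they have been idle for this many seconds
- rejection-policy: how new tasks are handled once the pool has reached max size and the queue is full
  - ABORT (default): throw TaskRejectedException and do not run the task
  - DISCARD: do not run the task and throw no exception
  - DISCARD_OLDEST: discard the oldest task in the queue
  - CALLER_RUNS: run the task on the caller's thread instead of a new thread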
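The interplay of core size, queue capacity and max size described above follows the behavior of the JDK's `ThreadPoolExecutor`, so it can be observed without Spring. The sketch below uses deliberately tiny, illustrative values (core 1, max 2, queue capacity 1) and the JDK's `AbortPolicy`, which throws `RejectedExecutionException`; the class name `PoolSizingSketch` is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolSizingSketch {

    // Submits four blocking tasks to a pool with core=1, max=2, queue capacity=1
    // and reports the pool size, queue length and whether a task was rejected.
    static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blockingTask = () -> {
            try {
                release.await(); // keep the thread busy until released
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        };
        boolean rejected = false;
        try {
            pool.execute(blockingTask); // runs on the core thread
            pool.execute(blockingTask); // core thread busy: waits in the queue
            pool.execute(blockingTask); // queue full: a second thread is created
            pool.execute(blockingTask); // queue full and max size reached: rejected
        } catch (RejectedExecutionException ex) {
            rejected = true;
        }
        String report = "poolSize=" + pool.getPoolSize()
                + ", queued=" + pool.getQueue().size()
                + ", rejected=" + rejected;
        release.countDown();
        pool.shutdown();
        return report;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The order matters: the queue is filled before any thread beyond core size is created, which is why a large queue capacity keeps the pool at core size.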
Java-based configuration:
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class SpringConfig {

    /** Set the ThreadPoolExecutor's core pool size. */
    private int corePoolSize = 10;

    /** Set the ThreadPoolExecutor's maximum pool size. */
    private int maxPoolSize = 200;

    /** Set the capacity for the ThreadPoolExecutor's BlockingQueue. */
    private int queueCapacity = 10;

    private String threadNamePrefix = "MyLogExecutor-";

    @Bean
    public Executor logExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix(threadNamePrefix);
        // rejection-policy: how new tasks are handled once the pool has reached max size
        // CALLER_RUNS: run the task on the caller's thread instead of a new thread
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}
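To see what CallerRunsPolicy does when the pool is saturated, the following JDK-only sketch uses an intentionally tiny pool (one thread, queue capacity 1; these values are illustrative and much smaller than the configuration above). The class name `CallerRunsSketch` is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CallerRunsSketch {

    // Saturates a one-thread pool and returns the name of the thread
    // that ended up running the overflow task.
    static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blockingTask = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> overflowThread = new AtomicReference<>();
        pool.execute(blockingTask); // occupies the only worker thread
        pool.execute(blockingTask); // fills the queue
        // Pool and queue are full: the policy runs this task on the submitting thread.
        pool.execute(() -> overflowThread.set(Thread.currentThread().getName()));
        release.countDown();
        pool.shutdown();
        return overflowThread.get();
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + demo());
    }
}
```

Because the submitting thread is busy running the task, it cannot submit more work in the meantime, which acts as a simple form of back-pressure.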