15.SpringMVC之異步請求


SpringMVC中異步請求相關組件

SpringMVC在此基礎上對異步請求進行了封裝。提供了AsyncWebRequest類型的request,並提供了處理異步請求的管理器WebAsyncManager和工具WebAsyncUtils.

SpringMVC將異步請求返回值細分為了:Callable,WebAsyncTask,,DeferredResult 和 ListenableFuture. 后續會針對這四種不同的類型一一分析。

AsyncWebRequest

AsyncWebRequest,它是專門處理異步請求的request,定義如下:

//org.springframework.web.context.request.async.AsyncWebRequest
public interface AsyncWebRequest extends NativeWebRequest {
    void setTimeout(Long timeout);
        
    //相當於在AsyncListener中的`onTimeout和onComplete`
    void addTimeoutHandler(Runnable runnable);
    void addCompletionHandler(Runnable runnable);
    
    void startAsync();
    
    //判斷異步請求是否開啟和結束
    boolean isAsyncStarted();
    boolean isAsyncComplete();
    
    void dispatch();
}

AsyncWebRequest 有兩個實現類,

  • NoSupportAsyncWebRequest: 不支持異步請求
  • StandardServletAsyncWebRequest: 支持異步請求。

StandardServletAsyncWebRequest 除了實現了AsyncWebRequest接口外,還實現了AsyncListener,另外它還繼承了ServletWebRequest.

public class StandardServletAsyncWebRequest extends ServletWebRequest implements AsyncWebRequest, AsyncListener {
    private Long timeout;
    
    //封裝 AsyncContext 屬性
    private AsyncContext asyncContext;
    private AtomicBoolean asyncCompleted = new AtomicBoolean(false);
    
    //AsyncListener onTimeout,onCompletion方法 調用如下handlers..
    private final List<Runnable> timeoutHandlers = new ArrayList<Runnable>();
    private final List<Runnable> completionHandlers = new ArrayList<Runnable>();
    
    @Override
    public boolean isAsyncStarted() {
        return ((this.asyncContext != null) && getRequest().isAsyncStarted());
    }
    
    @Override
    public void startAsync() {
        if (isAsyncStarted()) {
            return;
        }
        this.asyncContext = getRequest().startAsync(getRequest(), getResponse());
        this.asyncContext.addListener(this);
        if (this.timeout != null) {
            this.asyncContext.setTimeout(this.timeout);
        }
    }
    
    // ---  實現 AsyncListener 方法----
    @Override
    public void onTimeout(AsyncEvent event) throws IOException {
        for (Runnable handler : this.timeoutHandlers) {
            handler.run();
        }
    }

    @Override
    public void onComplete(AsyncEvent event) throws IOException {
        for (Runnable handler : this.completionHandlers) {
            handler.run();
        }
        //執行完完成時,清空asyncContext
        this.asyncContext = null;
        this.asyncCompleted.set(true);
    }
}

WebAsyncUtils

//org.springframework.web.context.request.async.WebAsyncUtils
public abstract class WebAsyncUtils {
    //第一次獲取時,直接創建WebAsyncManager,並設置到setAttribute中。 以后獲取,直接從request屬性中獲取。
    public static WebAsyncManager getAsyncManager(ServletRequest servletRequest) {
        WebAsyncManager asyncManager = (WebAsyncManager) servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE);
        if (asyncManager == null) {
            asyncManager = new WebAsyncManager();
            servletRequest.setAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, asyncManager);
        }
        return asyncManager;
    }
    
    public static WebAsyncManager getAsyncManager(WebRequest webRequest) {
        //邏輯類似  getAsyncManager(ServletRequest servletRequest) 略...
    }
    
    //判斷ServletRequest是否有方法"startAsync"。 只有servlet環境3.0以上版本才有此方法
    public static AsyncWebRequest createAsyncWebRequest(HttpServletRequest request, HttpServletResponse response) {
        return ClassUtils.hasMethod(ServletRequest.class, "startAsync") ?
                createStandardServletAsyncWebRequest(request, response) : new NoSupportAsyncWebRequest(request, response);
    }
    
    private static AsyncWebRequest createStandardServletAsyncWebRequest(HttpServletRequest request, HttpServletResponse response) {
        if (standardAsyncRequestConstructor == null) {
            String className = "org.springframework.web.context.request.async.StandardServletAsyncWebRequest";
            Class<?> clazz = ClassUtils.forName(className, WebAsyncUtils.class.getClassLoader());
            standardAsyncRequestConstructor = clazz.getConstructor(HttpServletRequest.class, HttpServletResponse.class);
        }
        return (AsyncWebRequest) BeanUtils.instantiateClass(standardAsyncRequestConstructor, request, response);
    }
}

WebAsyncManager

WebAsyncManager是SpringMVC處理異步請求過程中最核心的類,它管理着整個異步處理的過程。

//org.springframework.web.context.request.async
public final class WebAsyncManager {
    //兩種類型的 超時 Interceptors
    private static final CallableProcessingInterceptor timeoutCallableInterceptor = new TimeoutCallableProcessingInterceptor();
    private static final DeferredResultProcessingInterceptor timeoutDeferredResultInterceptor = new TimeoutDeferredResultProcessingInterceptor();
    
    //持有 asyncWebRequest 對象
    private AsyncWebRequest asyncWebRequest;
    private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(this.getClass().getSimpleName());
    
    //兩種類型的 處理請求Interceptors
    private final Map<Object, CallableProcessingInterceptor> callableInterceptors = new LinkedHashMap<Object, CallableProcessingInterceptor>();
    private final Map<Object, DeferredResultProcessingInterceptor> deferredResultInterceptors = new LinkedHashMap<Object, DeferredResultProcessingInterceptor>();    
    
    //用來處理Callable 和 WebAsyncTask 類型的異步請求
    public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext) throws Exception {   }
    
    //用來處理 DeferredResult 和 ListenableFuture 類型的請求 
    public void startDeferredResultProcessing(final DeferredResult<?> deferredResult, Object... processingContext) throws Exception 
}

它最重要的兩個方法是:startCallableProcessingstartDeferredResultProcessing,這兩個方法是啟動異步處理的入口方法,它們一共做三件事:

  1. 給Request設置屬性(timeout,timeoutHandler,completionHandler…)
  2. 在相應位置,執行interceptors邏輯
  3. 啟動異步處理

這里重點分析下startCallableProcessing

public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext) throws Exception {
    //設置asyncWebRequest 屬性...
    Long timeout = webAsyncTask.getTimeout();
    if (timeout != null) {
        this.asyncWebRequest.setTimeout(timeout);
    }

    AsyncTaskExecutor executor = webAsyncTask.getExecutor();
    if (executor != null) {
        this.taskExecutor = executor;
    }
    
    //初始化 interceptors
   
   //在asyncWebRequest 執行前后,執行完成,超時 等關鍵時間節點 執行 interceptors 邏輯...
   
   //啟動異步處理
    startAsyncProcessing(processingContext);

    // 線程池 執行callable方法....
    this.taskExecutor.submit(new Runnable() {
        @Override
        public void run() {
            // interceptors 略....
            Object result = callable.call();
            
            //設置處理結果,並發送請求
            setConcurrentResultAndDispatch(result);
        }
    });
}

    
//調用asyncWebRequest.startAsync()啟動異步處理 
private void startAsyncProcessing(Object[] processingContext) {
    clearConcurrentResult();
    this.concurrentResultContext = processingContext;
    this.asyncWebRequest.startAsync();
}    

//判斷是否已經有異步處理結果
public boolean hasConcurrentResult() {
    //concurrentResult 初始化時 = RESULT_NONE
    return (this.concurrentResult != RESULT_NONE);
}

 //設置處理結果,並發送請求
private void setConcurrentResultAndDispatch(Object result) {
    synchronized (WebAsyncManager.this) {
        //判斷是否已經有異步處理結果
        if (hasConcurrentResult()) {
            return;
        }
        
        //將result設置為當前處理結果
        this.concurrentResult = result;
    }
    
    //如果異步請求在這里已經被設置為異步處理完成狀態,則記錄錯誤日志。(網絡異常會造成此種問題)
    if (this.asyncWebRequest.isAsyncComplete()) {
        logger.error("Could not complete async processing due to timeout or network error");
        return;
    }
    //再次發送請求:SpringMVC請求處理完成之后再次發送一個相同的請求。在HandlerAdapter做特殊處理
    this.asyncWebRequest.dispatch();
}

SpringMVC 對異步的支持

SpringMVC想要支持異步處理,首先DispatchServlet要配置:<async-supported>true</async-supported>,其次請求方法的返回值為:Callable,WebAsyncTask,,DeferredResult 和 ListenableFuture

@Controller
@RequestMapping("/async")
public class AsyncController {

    @RequestMapping(value = "/callable",produces = "text/plain;charset=UTF-8")
    @ResponseBody
    public Callable<String> callable(){
        System.out.println("Callable進入主線程...");
        Callable<String> result =  new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(5 * 1000);
                System.out.println("Callable子線程執行ing...");
                return "Callable:"+"久等了";
            }
        };
        System.out.println("Callable主線程退出...");
        return result;
    }

    @RequestMapping(value = "/web",produces = "text/plain;charset=UTF-8")
    @ResponseBody
    public WebAsyncTask<String> web(){
        System.out.println("WebAsyncTask 進入主線程...");
        WebAsyncTask task = new WebAsyncTask(new Callable() {
            @Override
            public Object call() throws Exception {
                Thread.sleep(5 * 1000);
                System.out.println("WebAsyncTask 子線程執行ing...");
                return "WebAsyncTask:"+"久等了";
            }
        });
        System.out.println("WebAsyncTask 主線程退出...");
        return task;
    }

    @RequestMapping(value = "/deferred",produces = "text/plain;charset=UTF-8")
    @ResponseBody
    public DeferredResult<String> deferred(){
        //這里的 7 * 1000 L ,是指主線程結束之后的超時時間。
        DeferredResult<String> result = new DeferredResult<String>(7 * 1000L , "超時了");
        approve(result);
        try {
            Thread.sleep(10 * 1000); //在主線程執行這段代碼,並不會拋出"超時了"
        } catch (InterruptedException e) {
        }
        return result;
    }

    private void approve(final DeferredResult<String> result) {
        new Thread(() -> {
            try {
                Thread.sleep(5 * 1000);
                result.setResult("同意:" + LocalDateTime.now());
            } catch (InterruptedException e) {
            }
        }).start();
    }

    @RequestMapping(value = "/future",produces = "text/plain;charset=UTF-8")
    public ListenableFuture<ResponseEntity<String>> future(){
        ListenableFuture<ResponseEntity<String>> future = new AsyncRestTemplate().getForEntity("http://www.baidu.com", String.class);
        return future;
    }
}

源碼跟蹤

springMVC異步處理請求的過程是總體上可以拆分為2次:

  • 第一次,啟動異步請求,並設置timeout,completion等事件的監聽,直接返回 null;
  • 第二次,當監聽到completion 時,直接在發送一次相同的請求,並將執行結果返回。

SpringMVC執行請求方法的過程都是在HandlerAdater中進行的。

在之前解析RequestMappingHandlerAdapter#invokeHandleMethod()處理請求時,將異步請求部分給剔除了,現在回看此方法:

//org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter
private ModelAndView invokeHandleMethod(HttpServletRequest request,
            HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {

        ServletWebRequest webRequest = new ServletWebRequest(request, response);

        WebDataBinderFactory binderFactory = getDataBinderFactory(handlerMethod);
        ModelFactory modelFactory = getModelFactory(handlerMethod, binderFactory);
        ServletInvocableHandlerMethod requestMappingMethod = createRequestMappingMethod(handlerMethod, binderFactory);

        //mavContainer相關略......

        AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
        asyncWebRequest.setTimeout(this.asyncRequestTimeout);

        final WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
        asyncManager.setTaskExecutor(this.taskExecutor);
        asyncManager.setAsyncWebRequest(asyncWebRequest);
        asyncManager.registerCallableInterceptors(this.callableInterceptors);
        asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);
        
        //異步請求是否已經完成
        if (asyncManager.hasConcurrentResult()) {
            //如果異步請求已經處理完成,則獲取執行結果  --- 1
            Object result = asyncManager.getConcurrentResult();
            mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
            
            //清空執行結果
            asyncManager.clearConcurrentResult();
            
            //覆蓋原有的requestMappingMethod方法;  --- 2
            requestMappingMethod = requestMappingMethod.wrapConcurrentResult(result);
        }
        
        //執行方法 -- 3
        requestMappingMethod.invokeAndHandle(webRequest, mavContainer);
        
        //asyncManager是否已經啟動
        if (asyncManager.isConcurrentHandlingStarted()) {
            //-- 4
            return null;
        }
        // --- 5
        return getModelAndView(mavContainer, modelFactory, webRequest);
    }
  • 第一次執行時: 會執行上述代碼中的 3,4
  • 第二次執行時: 執行上述代碼中的1,2,3,5 。 注意步驟2,會將原有的requestMappingMethod重寫.接下來會分析。

ServletInvocableHandlerMethod.invokeAndHandle(webRequest, mavContainer)

springMVC在使用RequestMappingHandlerAdapter#invokeHandleMethod()處理請求時,會調用ServletInvocableHandlerMethod#invokeAndHandle()方法,該方法在處理完畢之后,會調用

this.returnValueHandlers.handleReturnValue(returnValue, getReturnValueType(returnValue), mavContainer, webRequest);`

處理返回值,針對上述四種類型的結果,匹配不同的XXReturnValueHandler.

Callable : CallableMethodReturnValueHandler

//org.springframework.web.servlet.mvc.method.annotation.CallableMethodReturnValueHandler
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
        ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
    //if 略...
    Callable<?> callable = (Callable<?>) returnValue;
    WebAsyncUtils.getAsyncManager(webRequest).startCallableProcessing(callable, mavContainer);
}

WebAsyncTask : AsyncTaskMethodReturnValueHandler

//org.springframework.web.servlet.mvc.method.annotation.AsyncTaskMethodReturnValueHandler
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
        ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
    //if 略...
    WebAsyncTask<?> webAsyncTask = (WebAsyncTask<?>) returnValue;
    webAsyncTask.setBeanFactory(this.beanFactory);
    WebAsyncUtils.getAsyncManager(webRequest).startCallableProcessing(webAsyncTask, mavContainer);
}

這里可以看出 Callable和webAsyncTask都是用了startCallableProcessing方法。

DeferredResultDeferredResultMethodReturnValueHandler

//org.springframework.web.servlet.mvc.method.annotation.DeferredResultMethodReturnValueHandler
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
        ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
    //if 略...
    DeferredResult<?> deferredResult = (DeferredResult<?>) returnValue;
    WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);
}

DeferredResultDeferredResultMethodReturnValueHandler

//org.springframework.web.servlet.mvc.method.annotation.ListenableFutureReturnValueHandler
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
        ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
    //if 略...
    final DeferredResult<Object> deferredResult = new DeferredResult<Object>();
    WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);

    ListenableFuture<?> future = (ListenableFuture<?>) returnValue;
    future.addCallback(new ListenableFutureCallback<Object>() {
        @Override
        public void onSuccess(Object result) {
            deferredResult.setResult(result);
        }
        @Override
        public void onFailure(Throwable ex) {
            deferredResult.setErrorResult(ex);
        }
    });
}

自此可以說明看 DeferredResult 和 ListenableFuture都是用了startDeferredResultProcessing方法。

ServletInvocableHandlerMethod.wrapConcurrentResult(result)

第二次請求時,要重點關注此行:requestMappingMethod.wrapConcurrentResult(result),此時的result已經是異步執行后的最終結果,不是DeferredResult.

//org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod
private static final Method CALLABLE_METHOD = ClassUtils.getMethod(Callable.class, "call");

public ServletInvocableHandlerMethod(Object handler, Method method) {
    super(handler, method);
    initResponseStatus();
}
ServletInvocableHandlerMethod wrapConcurrentResult(Object result) {
    return new ConcurrentResultHandlerMethod(result, new ConcurrentResultMethodParameter(result));
}

//org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod $ ConcurrentResultMethodParameter
private class ConcurrentResultMethodParameter extends HandlerMethodParameter {
    private final Object returnValue;
    private final ResolvableType returnType;
    
    //直接傳入返回值returnValue, 返回值的類型為 returnValue的類型
    public ConcurrentResultMethodParameter(Object returnValue) {
        super(-1);
        this.returnValue = returnValue;
        this.returnType = ResolvableType.forType(super.getGenericParameterType()).getGeneric(0);
    }
}

//org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod $ ConcurrentResultHandlerMethod
private class ConcurrentResultHandlerMethod extends ServletInvocableHandlerMethod {
    public ConcurrentResultHandlerMethod(final Object result, ConcurrentResultMethodParameter returnType) {
        //調用父類的構造方法(handler,method),最終調用 method.invoke();
        super(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                if (result instanceof Exception) {
                    throw (Exception) result;
                }
                else if (result instanceof Throwable) {
                    throw new NestedServletException("Async processing failed", (Throwable) result);
                }
                //此時的result即為最終異步處理的結果.
                return result;
            }
        }, CALLABLE_METHOD);
        setHandlerMethodReturnValueHandlers(ServletInvocableHandlerMethod.this.returnValueHandlers);
        this.returnType = returnType;
    }
}

第二次執行 requestMappingMethod.invokeAndHandle(webRequest, mavContainer);,此時的requestMappingMethod已經是偽造后的結果,該方法的返回值也被偽造ConcurrentResultMethodParameter,最終調用的為ConcurrentResultHandlerMethod在構造函數中定義的Callable.call();

SpringMVC想要支持異步處理,首先DispatchServlet要配置:<async-supported>true</async-supported>,其次請求方法的返回值為:Callable,WebAsyncTask,,DeferredResult 和 ListenableFuture

<task:executor />配置參數:

  • id:當配置多個executor時,被@Async(“id”)指定使用;也被作為線程名的前綴。
  • pool-size:
    • core size:最小的線程數,缺省:1
    • max size:最大的線程數,缺省:Integer.MAX_VALUE
  • queue-capacity:當最小的線程數已經被占用滿后,新的任務會被放進queue里面,當這個queue的capacity也被占滿之后,pool里面會創建新線程處理這個任務,直到總線程數達到了max size,這時系統會拒絕這個任務並拋出TaskRejectedException異常(缺省配置的情況下,可以通過rejection-policy來決定如何處理這種情況)。缺省值為:Integer.MAX_VALUE
  • keep-alive:超過core size的那些線程,任務完成后,再經過這個時長(秒)會被結束掉
  • rejection-policy:當pool已經達到max size的時候,如何處理新任務
    • ABORT(缺省):拋出TaskRejectedException異常,然后不執行
    • DISCARD:不執行,也不拋出異常
    • DISCARD_OLDEST:丟棄queue中最舊的那個任務
    • CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行

Java編程方式的配置方法:

@Configuration  
@EnableAsync  
public class SpringConfig {  

    /** Set the ThreadPoolExecutor's core pool size. */  
    private int corePoolSize = 10;  
    /** Set the ThreadPoolExecutor's maximum pool size. */  
    private int maxPoolSize = 200;  
    /** Set the capacity for the ThreadPoolExecutor's BlockingQueue. */  
    private int queueCapacity = 10;  

    private String ThreadNamePrefix = "MyLogExecutor-";  

    @Bean  
    public Executor logExecutor() {  
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
        executor.setCorePoolSize(corePoolSize);  
        executor.setMaxPoolSize(maxPoolSize);  
        executor.setQueueCapacity(queueCapacity);  
        executor.setThreadNamePrefix(ThreadNamePrefix);  

        // rejection-policy:當pool已經達到max size的時候,如何處理新任務  
        // CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行  
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  
        executor.initialize();  
        return executor;  
    }  

}  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM