Spring異步請求處理


Servlet容器配置

在web.xml中對DispatcherServlet和所有filter添加

對於配置了web.xml的應用程序,請確保更新至版本3.0:

<web-app xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">

</web-app

必須通過web.xml中的 true </ async-supported>子元素在DispatcherServlet上啟用異步支持。 另外,必須將參與異步請求處理的所有Filter配置為支持ASYNC調度程序類型。 為Spring框架提供的所有過濾器啟用ASYNC調度程序類型應該是安全的,因為它們通常擴展了OncePerRequestFilter,並且可以在運行時檢查是否需要將過濾器包含在異步調度中。

以下是一些示例web.xml配置:

<web-app xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">
    
<filter>
   <filter-name>Spring OpenEntityManagerInViewFilter</filter-name>
   <filter-class>org.springframework.orm.jpa.support.OpenEntityManagerInViewFilter</filter-class>
   <async-supported>true</async-supported>
</filter>
<filter-mapping>
     <filter-name>Spring OpenEntityManagerInViewFilter</filter-name>
     <url-pattern>/*</url-pattern>
     <dispatcher>REQUEST</dispatcher>
     <dispatcher>ASYNC</dispatcher>
</filter-mapping>
<servlet>
     <servlet-name>dispatcher</servlet-name>
     <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value/>
        </init-param>
        <load-on-startup>2</load-on-startup>
        <async-supported>true</async-supported>
    </servlet>
     <servlet-mapping>
        <servlet-name>dispatcher</servlet-name>
        <url-pattern>/*</url-pattern>
     </servlet-mapping>
</web-app>

注意:如果你的Filter是基於注解配置的需要增加如下,@WebFilter(asyncSupported = true,dispatcherTypes = {DispatcherType.ASYNC,DispatcherType.REQUEST})

如果使用Servlet 3(例如通過WebApplicationInitializer的基於Java的配置),則還需要設置“ asyncSupported”標志以及ASYNC調度程序類型,就像使用web.xml一樣。為了簡化所有配置,請考慮擴展AbstractDispatcherServletInitializer,或更好的AbstractAnnotationConfigDispatcherServletInitializer,它會自動設置這些選項並使注冊Filter實例非常容易。

Spring MVC 配置

MVC Java配置和MVC名稱空間提供用於配置異步請求處理的選項。 WebMvcConfigurer具有方法configureAsyncSupport,而<mvc:annotation-driven>有一個 子元素

這些允許配置用於異步請求的默認超時值,如果未設置,則取決於底層的Servlet容器(例如,在Tomcat上為10秒)。 您還可以配置AsyncTaskExecutor來執行從控制器方法返回的Callable實例。 強烈建議配置此屬性,因為默認情況下,Spring MVC使用SimpleAsyncTaskExecutor。 它不會重復使用線程,因此不建議用於生產環境。MVC Java配置和MVC命名空間還允許您注冊CallableProcessingInterceptor和DeferredResultProcessingInterceptor實例。

   <bean id="threadPoolTaskExecutor"
          class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
       <!--最小線程數 -->
        <property name="corePoolSize" value="5" />
       <!--最大線程數 -->
        <property name="maxPoolSize" value="10" />
       <!--緩沖隊列大小 -->
        <property name="queueCapacity" value="50" />
       <!--線程池中產生的線程名字前綴 -->
        <property name="threadNamePrefix" value="Async-Task-" />
        <!--線程池中空閑線程的存活時間單位秒 -->
        <property name="keepAliveSeconds" value="30" />
    </bean>
    <aop:aspectj-autoproxy/>
    <mvc:annotation-driven >
        <mvc:async-support default-timeout="10000" task-executor="threadPoolTaskExecutor"/>
    </mvc:annotation-driven>

default-timeout:指定異步請求處理超時之前的時間(以毫秒為單位)。在Servlet 3中,超時從主要請求處理線程退出后開始,到請求結束時結束再次分派以進一步處理同時產生的結果。 如果未設置此值,使用底層實現的默認超時時間,例如 使用Servlet 3在Tomcat上運行10秒。

Java Config配置

/**
 * 異步配置類
 */
@Configuration
public class AsynWebConfig implements WebMvcConfigurer {

    //配置自定義TaskExecutor
    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        configurer.setDefaultTimeout(60 * 1000L);
        configurer.registerCallableInterceptors(timeoutInterceptor());
        configurer.setTaskExecutor(threadPoolTaskExecutor());
    }

    //異步處理攔截
    @Bean
    public TimeoutCallableProcessingInterceptor timeoutInterceptor() {
        return new TimeoutCallableProcessingInterceptor();
    }
    //異步線程池
    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor t = new ThreadPoolTaskExecutor();
        t.setCorePoolSize(5);
        t.setMaxPoolSize(10);
        t.setThreadNamePrefix("NEAL");
        return t;
    }

}

配置異步請求處理

Spring MVC 3.2引入了基於Servlet 3的異步請求處理。 現在,控制器方法無需像往常一樣返回值,而是可以返回java.util.concurrent.Callable並從Spring MVC托管線程產生返回值。 同時,退出並釋放主要的Servlet容器線程,並允許其處理其他請求。 Spring MVC借助TaskExecutor在一個單獨的線程中調用Callable,當Callable返回時,該請求被分派回Servlet容器,以使用Callable返回的值恢復處理。 這是這種控制器方法的示例:

    @PostMapping(value = "v1/files.do")
    public Callable<String> processUpload(final MultipartFile file) {
        return new Callable<String>() {
            @Override
            public String call() throws Exception {

                return "someView";
            }
        };
    }

另一個選項是控制器方法返回DeferredResult的一個實例。在這種情況下,返回值也會從任何線程中產生,即一個不是由Spring MVC管理的線程。例如,可能會在響應某些外部事件(如JMS消息、調度任務等)時生成結果。下面是這樣一個控制器方法的例子:

使用阻塞隊列異步處理用戶請求,超過阻塞隊列容量提示限流

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;

import java.util.UUID;

/**
 * @author Created by niugang on 2020/4/2/20:00
 */
@RestController
@Slf4j
public class DeferredResultUserInfoSaveController {

    private final SimilarQueueHolder similarQueueHolder;

    @Autowired
    public DeferredResultUserInfoSaveController(SimilarQueueHolder similarQueueHolder) {
        this.similarQueueHolder = similarQueueHolder;
    }

    @PostMapping("/deferred/result")
    public DeferredResult<Object> deferredResultHelloWolrd(@RequestBody UserInfo userInfo ) {

        printlnThread("主線程--deferredResultHelloWolrd開始執行");
        //聲明異步DeferredResult
        DeferredResult<Object> deferredResult = new DeferredResult<>();
        userInfo.setId(UUID.randomUUID().toString());
        deferredResult.setResult(userInfo);
        //模擬放入消息隊列
        boolean offer = similarQueueHolder.getBlockingDeque().offer(deferredResult);
        if(!offer){
            log.info("添加任務到隊列:{}",offer);
            DeferredResult<Object> deferredResult1 = new DeferredResult<>();
            deferredResult1.setResult("限流了稍后重試");
            return deferredResult1;
        }
        log.info("添加任務到隊列:{}",offer);
        printlnThread("主線程--deferredResultHelloWolrd結束執行");
        return deferredResult;
    }



    /**
     * 打印當前線程
     * @param object object
     */
    private void printlnThread(Object object) {
        String threadName = Thread.currentThread().getName();
        log.info("HelloWorldAsyncController[{}]:{} ",threadName,object) ;
    }



}
import lombok.Data;

/**
 * @author Created by niugang on 2020/4/2/20:04
 */
@Data
public class UserInfo {

    private  String name;


    private int  age;

    private String id;
}

import org.springframework.stereotype.Component;
import org.springframework.web.context.request.async.DeferredResult;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * 模擬消息隊列
 *
 * @author Created by niugang on 2020/4/2/19:59
 */
@Component
public class SimilarQueueHolder {

    /**
     * 創建容量為5的阻塞隊列
     */
    private static BlockingQueue<DeferredResult<Object>> blockingDeque = new ArrayBlockingQueue<>(5);

    public BlockingQueue<DeferredResult<Object>> getBlockingDeque() {
        return blockingDeque;
    }

}

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.web.context.request.async.DeferredResult;

import java.util.concurrent.TimeUnit;

/**
 * 使用監聽器來模擬消息隊列處理
 * @author Created by niugang on 2020/4/2/20:00
 */
@Configuration
@Slf4j
public class QueueListener implements ApplicationListener<ContextRefreshedEvent> {

    private final SimilarQueueHolder similarQueueHolder;

    @Autowired
    public QueueListener(SimilarQueueHolder similarQueueHolder) {
        this.similarQueueHolder = similarQueueHolder;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        new Thread(()->{
            while(true) {
                try {
                    //從隊列中取出DeferredResult
                    DeferredResult<Object> deferredResult = similarQueueHolder.getBlockingDeque().take();
                    log.info("開始DeferredResult異步處理");
                    //模擬處理時間
                    TimeUnit.SECONDS.sleep(3);
                    log.info("用戶信息:{}",deferredResult.getResult());
                    log.info("結束DeferredResult異步處理");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }


}

在這里插入圖片描述
在這里插入圖片描述

如果不了解Servlet 3.0異步請求處理特性,就很難理解這一點。多了解這方面的情況肯定會有幫助。以下是關於潛在機制的一些基本事實:

  • 可以通過調用request.startAsync()將ServletRequest置於異步模式。 這樣做的主要效果是Servlet以及所有過濾器都可以退出,但響應將保持打開狀態,以便以后可以完成處理

  • 調用request.startAsync()返回AsyncContext,該AsyncContext可用於進一步控制異步處理。 例如,它提供了方法分派,類似於Servlet API的轉發,但它允許應用程序恢復Servlet容器線程上的請求處理。

  • ServletRequest提供對當前DispatcherType的訪問,該訪問可用於區分處理初始請求,異步分派,轉發和其他分派器類型。

考慮到上述內容,以下是使用Callable進行異步請求處理的事件序列:

  • 控制器返回Callable。
  • Spring MVC開始異步處理,並將Callable提交給TaskExecutor在單獨的線程中進行處理。
  • DispatcherServlet和所有Filter退出Servlet容器線程,但響應保持打開狀態
  • Callable產生結果,Spring MVC將請求分派回Servlet容器以恢復處理。
  • 再次調用DispatcherServlet,並使用Callable異步生成的結果恢復處理。

DeferredResult請求的事件序列

  • 控制器返回DeferredResult並將其保存在一些內存隊列或列表中,可以在其中訪問
  • Spring MVC開始異步處理
  • DispatcherServlet和所有已配置的Filter退出請求處理線程,但響應保持打開狀態。
  • 應用程序從某個線程設置DeferredResult,Spring MVC將請求分派回Servlet容器。
  • 再次調用DispatcherServlet,並以異步產生的結果恢復處理。

在這里插入圖片描述


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM