淺聊接口性能優化——異步處理


HTTP作為一種無狀態的協議采用的是請求-應答的模式,每當客戶端發起的請求到達服務器,Servlet 容器通常會為每個請求使用一個線程來處理。為了避免線程創建和銷毀的資源消耗,一般會采用線程池,而線程池中的線程數量是有限的,當線程池中的線程被全部使用,客戶端只能等待有空閑線程處理請求。

在这里插入图片描述

實際場景中,部分線程可能因為等待數據庫查詢結果或遠程 Web 資源被阻塞,如果阻塞時間過長,線程池中的線程很快就被耗盡,從而導致無法處理其他請求。

Servlet 異步處理

為了提高系統的吞吐量,我們需要盡量使處理請求的線程處於非空閑狀態。如果能夠將那些長時間阻塞的線程利用起來處理新請求,由其他線程等資源滿足時再繼續處理前面的請求,這樣對吞吐量的提升就會有很大的幫助。

Java EE 自 Servlet 3.0 開始對 Servlet 和 Filter 提供了異步支持,如果 Servlet 和 Filter 在處理請求時可能會發生阻塞,可以將阻塞請求線程的操作分配到異步線程,然后將處理請求的線程歸還到 Servlet 容器中的線程池,而不產生響應,當異步線程中的操作完成,異步線程可以直接產生響應或將請求重新分派到容器中的 Servlet 處理。

在这里插入图片描述

Servlet 異步處理實戰

先通過一個案例了解如何使用 Servlet 中的異步處理。

默認情況下 Servlet 和 Filter 都不支持異步,需要在部署描述符或注解中開啟異步支持。

部署描述符開啟異步支持示例如下:

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
          http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
         version="4.0">
    
    <servlet>
        <servlet-name>asyncA</servlet-name>
        <servlet-class>com.zzuhkp.mvc.AsyncServlet</servlet-class>
        <!--支持異步處理-->
        <async-supported>true</async-supported>
    </servlet>
    <servlet-mapping>
        <servlet-name>asyncA</servlet-name>
        <url-pattern>/async/a</url-pattern>
    </servlet-mapping>

    <filter>
        <filter-name>asyncFilter</filter-name>
        <filter-class>com.zzuhkp.mvc.AsyncFilter</filter-class>
        <!--支持異步處理-->
        <async-supported>true</async-supported>
    </filter>
    <filter-mapping>
        <filter-name>asyncFilter</filter-name>
        <servlet-name>asyncA</servlet-name>
    </filter-mapping>
</web-app>

部署描述符開啟異步支持的重點是設置 servlet 或 filter 標簽下的 async-supported 值為 true。

注解開啟異步支持的示例如下:

@WebFilter(value = "/async/a", asyncSupported = true)
public class AsyncFilter implements Filter {
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        chain.doFilter(request, response);
    }
}

@WebServlet(urlPatterns = "/async/a", asyncSupported = true)
public class AsyncServlet extends HttpServlet {

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        // 1. 開啟異步處理
        AsyncContext asyncContext = req.startAsync(req, resp);
        Executors.newSingleThreadExecutor().submit(new Runnable() {
            @Override
            public void run() {
                try {
                    // 2. 使用新線程執行耗時操作
                    Thread.sleep(10000L);
                    // 3. 耗時操作完成后進行響應
                    asyncContext.getResponse().getWriter().write("this is a async servlet");
                    // 4. 通知容器異步操作完成
                    asyncContext.complete();
                } catch (IOException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

通過注解開啟異步支持的重點是設置 @WebFilter 或 @WebServlet 中的 asyncSupported 為 true。

注意上述 Servlet 還列出了進行異步操作的常用步驟:

  1. 先使用 ServletRequest#startAsync(ServletRequest, ServletResponse) 開啟異步。
  2. 開啟異步后使用新線程進行異步處理,執行耗時操作。
  3. 新線程耗時操作完成后可以使用取到的資源信息發起響應。
  4. 最后調用第一步開啟異步支持返回的異步上下文 AsyncContext#complete 方法通知容器異步處理已經結束。

Servlet 異步處理詳解

開啟異步支持

開啟異步支持有兩個方法,分別如下:

  • ServletRequest#startAsync(ServletRequest,ServletResponse)
  • ServletRequest#startAsync()

這兩個參數都將返回一個異步處理的上下文 AsyncContext,不同的是如果使用了無參的 #startAsync 方法,AsyncContext 內部持有的 request、response 將是原始的,無論 Filter 是否對 request、response 進行了包裝。

結束異步處理

異步處理完成后有兩種結束的方式:一種如上面的示例通知容器返回響應到客戶端,另一種是通知容器使用其他 Servlet 繼續處理請求。

關聯的方法有4個:

  • AsyncContext#complete
  • AsyncContext#dispatch()
  • AsyncContext#dispatch(String)
  • AsyncContext#dispatch(ServletContext, String)

AsyncContext 中的 #complete 用於在異步線程中通知容器向客戶端發出響應,此后異步線程不可再產生響應。

AsyncContext 中的 #dispatch 用於通知容器重新派發請求。無參數的重載方法重新派發請求到當前請求路徑,有參數的重載方法可以指定派發請求的路徑。

派發類型判斷

由於異步處理后可以重新派發請求到當前 URL,因此需要判斷派發類型,知道當前請求是從哪里產生的,從而使用不同處理邏輯,這可以通過 ServletRequest#getDispatcherType 方法來實現,這個方法返回的是一個 DispatcherType 枚舉類型,每個枚舉值的含義如下:

public enum DispatcherType {
    // request.getRequestDispatcher("/path").forward(request,response) 產生的請求
    FORWARD,
    // request.getRequestDispatcher("/path").include(request,response) 產生的請求
    INCLUDE,
    // 客戶端正常發起請求
    REQUEST,
    // 異步處理 AsyncContext#dispatch 分派的請求
    ASYNC,
    // Servlet 產生錯誤,轉發請求到錯誤頁面
    ERROR
}

異步處理監聽

異步處理開始和結束之間,容器還會產生一些事件,可以通過 AsyncContext#addListener(AsyncListener) 方法添加對異步事件的監聽,具體可以監聽的事件如下:

public interface AsyncListener extends EventListener {
    // 異步處理完成
    public void onComplete(AsyncEvent event) throws IOException;
    // 異步處理超時
    public void onTimeout(AsyncEvent event) throws IOException;
    // 異步處理發生異常
    public void onError(AsyncEvent event) throws IOException;
    // ServletRequest#startAsync 重新開啟異步
    public void onStartAsync(AsyncEvent event) throws IOException;     
}

異步處理默認的超時時間是 30 秒,可以通過 AsyncContext#setTimeout 設置超時時間,以設置時間重新計算。

Spring MVC 異步處理

Spring MVC 結合自身特性,對 Servlet 中的異步處理進行了封裝,使異步處理更為簡便。

快速體驗 Spring MVC 異步處理

Spring MVC 手動配置 DispatcherServlet 需要指定 async-supported 為 true,Spring Boot 環境下已經默認開啟了異步處理的支持。

在 Spring MVC 中使用異步處理最簡單的方式是在 controller 方法中直接返回 Callable 類型,示例代碼如下:

@RestController
public class AsyncController {

    @GetMapping("/test")
    public Callable<String> test() {
        Callable<String> callable = new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "this is a test";
            }
        };
        return callable;
    }
}

controller 方法返回 Callable 類型之后,Spring 會自動使用異步線程池調用 Callable#call 方法,然后對 #call 方法返回值重新解析,解析方式和普通的 controller 方法一致,上述示例代碼將向瀏覽器輸出一段文字。

Spring MVC 異步處理常用的兩種方式

Callable

Callable 作為 controller 方法返回值是最常用的一種方式,這種方式會使用 Spring 默認的線程池進行異步處理。具體可以參見上面的示例。

DeferredResult

如果需要指定異步處理的線程池,將 DeferredResult 作為 controller 方法的返回值是更好的選擇,DeferredResult 不僅可以手動指定線程池,還可以配置異步處理的回調,如超時、完成、錯誤。示例代碼如下:

@RestController
public class AsyncController {

    @GetMapping("/test")
    public DeferredResult<String> test() {
        DeferredResult<String> deferredResult = new DeferredResult<>();
        Executors.newSingleThreadExecutor().submit(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                // 模擬耗時的操作
                Thread.sleep(5000L);
                // 設置異步處理結果
                deferredResult.setResult("this is a test");
            }
        });
        // 設置異步處理回調
        deferredResult.onTimeout(() -> System.out.println("異步處理超時"));
        deferredResult.onCompletion(() -> System.out.println("異步處理完成"));
        deferredResult.onError((throwable) -> System.out.println("異步處理錯誤:" + throwable.getMessage()));

        return deferredResult;
    }
}

上述代碼將 DeferredResult 作為 controller 返回值,然后在線程池中手動設置了返回的結果,相對來說更為靈活。

Spring MVC 異步處理的其他方式

除了上述 Callable 和 DeferredResult 兩種類型作為 controller 方法返回值,還有其他幾種使用相對沒那么頻繁的類型可以作為 controller 方法的返回值類型,這幾種類型與 Callable 或 DeferredResult 相互適配。

StreamingResponseBody、ResponseEntity<StreamingResponseBody>

StreamingResponseBody 可以使用原始的方式輸出響應,Spring 內部將這個類適配為 Callable,在異步處理的時候回調這個接口然后輸出響應。

ResponseEntity<StreamingResponseBody> 與 StreamingResponseBody 在 Spring 內部處理處理方式相似,Spring 會先根據 ResponseEntity 設置 HTTP 響應碼、響應頭,然后解析出 StreamingResponseBody 處理。

StreamingResponseBody 示例代碼如下:

@RestController
public class AsyncController {

    @GetMapping("/test")
    public StreamingResponseBody test() {
        StreamingResponseBody body = new StreamingResponseBody() {
            @Override
            public void writeTo(OutputStream outputStream) throws IOException {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream));
                writer.write("this is a test");
            }
        };
        return body;
    }
}

WebAsyncTask

WebAsyncTask 是 Callable 最底層的實現,Callable 最終將適配為 WebAsyncTask,這個類和 DeferredResult 功能類似,可以指定異步執行線程池、異步執行回調,由於底層使用了 Callable ,因此不能手動指定何時產生響應。示例代碼如下:

@RestController
public class AsyncController {

    @GetMapping("/test")
    public WebAsyncTask<String> test() {
        // 設置超時時間、線程池、異步任務
        WebAsyncTask<String> task = new WebAsyncTask<>(5000L, new SimpleAsyncTaskExecutor(), new Callable<String>() {
            @Override
            public String call() throws Exception {
                // 模擬耗時的操作
                Thread.sleep(5000L);
                // 返回異步處理結果
                return "this ia a test";
            }
        });

        // 設置異步處理回調
        task.onTimeout(() -> "異步處理超時");
        task.onCompletion(() -> System.out.println("異步處理完成"));
        task.onError(() -> "異步處理錯誤");

        return task;
    }
}

ListenableFuture

ListenableFuture 是 Spring 對 Future 擴展提出的接口,可以在任務執行成功或者失敗時回調給定的接口方法。在異步處理中,如果 controller 方法返回這個類型,Spring 會將其適配為 DeferredResult,異步任務執行成功后設置異步處理的結果。從功能上來說弱於 DeferredResult,不能設置超時時間及超時回調。 示例代碼如下:

@RestController
public class AsyncController {

    @GetMapping("/test")
    public ListenableFuture<String> test() {
        ListenableFutureTask<String> task = new ListenableFutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                // 模擬耗時的操作
                Thread.sleep(5000L);
                // 返回異步處理結果
                return "this is a test";
            }
        });
        task.addCallback(new ListenableFutureCallback<String>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("異步任務異常:" + ex.getMessage());
            }

            @Override
            public void onSuccess(String result) {
                System.out.println("異步任務執行完成");
            }
        });
        // 提交異步任務
        Executors.newSingleThreadExecutor().submit(task);

        return task;
    }
}

CompletionStage

CompletionStage 是 JDK 1.8 提供的表示異步執行的其中一個階段,可以在當前階段完成后進入下一個階段,典型的實現是 CompletableFuture。

使用 CompletableFuture 作為 controller 作為返回值,Spring 會將其適配為 DeferredResult,在當前階段完成后設置異步處理的結果,從功能上來說強於 Callable,可以設置線程池,但不能設置回調和設置超時時間。示例代碼如下:

@RestController
public class AsyncController {

    @GetMapping("/test")
    public CompletionStage<String> test() {
        CompletionStage<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                return "this is a test";
            }
        }, Executors.newSingleThreadExecutor());

        return future;
    }
}

ResponseBodyEmitter、ResponseEntity<ResponseBodyEmitter>

ResponseBodyEmitter 類型的作用類似於 Servlet 異步處理原生的 API,支持用戶多次發出響應,這個類型作為 controller 方法返回類型后,Spring 同樣會將這個類型適配為 DeferredResult。這個類型支持異步處理回調、設置超時時間,指定線程池等。

ResponseEntity<ResponseBodyEmitter> 相比 ResponseBodyEmitter 多了設置響應碼,響應頭的能力。

ResponseBodyEmitter 示例代碼如下:

@RestController
public class AsyncController {

    @GetMapping("/test")
    public ResponseBodyEmitter test() {

        ResponseBodyEmitter emitter = new ResponseBodyEmitter(5000L);

        // 異步線程池中執行耗時任務
        Executors.newSingleThreadExecutor().submit(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                // 設置異步處理回調
                emitter.onCompletion(() -> System.out.println("異步處理完成"));
                emitter.onTimeout(() -> System.out.println("異步處理"));
                emitter.onError((throwable) -> System.out.println("異步處理異常:" + throwable.getMessage()));

                // 模擬耗時操作
                Thread.sleep(3000L);

                // 發送響應
                emitter.send("this is ");
                emitter.send("a test");


                // 通知容器異步處理完成
                emitter.complete();
            }
        });

        return emitter;
    }
}

需要注意的是由於 Spring 需要等待 controller 方法返回后才能真正設置回調,因此如果異步任務如果在 controller 方法返回前就已經執行結束,回調將無法生效。

Spring MVC 異步處理方式總結

這里總結幾種 controller 方法返回類型的異同,上述中的幾種類型的適配關系可以如下圖所示:

在这里插入图片描述

圖中下面的類型可以適配到上面的類型,最終由 WebAsyncManager 使用來開啟異步處理。

各類型功能異同如下表,可根據需求選擇合適的類型進行異步處理。

類型 是否支持設置線程池 是否需要手動開啟異步線程 是否支持超時設置 是否支持異步回調 是否支持多次輸出響應
Callable
DeferredResult
StreamingResponseBody
WebAsyncTask
ListenableFuture 僅支持成功失敗回調
CompletionStage
ResponseBodyEmitter

Spring 異步處理流程

首先 Spring 將按照正常的流程執行 controller 方法,方法返回后 Spring 處理和異步有關的幾個類型值,然后開始異步處理。以 Callable 類型為例,處理這個返回值類型的代碼如下:

public class CallableMethodReturnValueHandler implements HandlerMethodReturnValueHandler {

    @Override
    public boolean supportsReturnType(MethodParameter returnType) {
    	return Callable.class.isAssignableFrom(returnType.getParameterType());
    }

    @Override
    public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
    							  ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

    	if (returnValue == null) {
    		mavContainer.setRequestHandled(true);
    		return;
    	}

    	Callable<?> callable = (Callable<?>) returnValue;
    	// 開啟異步處理
    	WebAsyncUtils.getAsyncManager(webRequest).startCallableProcessing(callable, mavContainer);
    }

}

Spring 先調用 WebAsyncUtils.getAsyncManager 方法獲取異步管理器 WebAsyncManager,WebAsyncManager 是異步處理的核心類,WebAsyncManager 獲取之后會將實例存儲到 request 的屬性中。代碼如下:

public abstract class WebAsyncUtils {

	public static WebAsyncManager getAsyncManager(WebRequest webRequest) {
		int scope = RequestAttributes.SCOPE_REQUEST;
		WebAsyncManager asyncManager = null;
		Object asyncManagerAttr = webRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, scope);
		if (asyncManagerAttr instanceof WebAsyncManager) {
			asyncManager = (WebAsyncManager) asyncManagerAttr;
		}
		if (asyncManager == null) {
			asyncManager = new WebAsyncManager();
			// 將實例存儲至 request 屬性
			webRequest.setAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, asyncManager, scope);
		}
		return asyncManager;
	}

}

然后 Spring 調用 WebAsyncManager#startCallableProcessing(Callable<?>, Object...) 開始異步處理,包括設置回調、開啟異步處理、執行異步任務等等,這里將用到 Servlet 原生的 API,由於代碼較多,不再展示。執行異步任務后 Spring 會調用 AsyncContext#dispatch() 將請求重新派發到當前 controller。

當請求轉發到當前 controller 時,RequestMappingHandlerAdapter 會再次執行 controller 方法,此時從 request 屬性中取出 WebAsyncManager,發現已經產生異步處理的結果,然后對表示 controller 方法的 ServletInvocableHandlerMethod 加以包裝,使其直接返回異步處理結果,后面和正常流程一樣,最終將結果輸出到客戶端。這塊代碼可參考 RequestMappingHandlerAdapter#invokeHandlerMethod,不再具體展示。

 

參考:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM