本文基於 Spring Boot 2.6.0
基於之前提到的 Reactor
的出現,使得編寫響應式程序成為可能。為此,Spring 的開發團隊決定添加有關 Reactor
模型的網絡層。這樣做的話將會對 Spring MVC 作出許多重大的修改,因此 Spring 的研發團隊決定開發一個單獨的響應式處理框架,隨之,Spring WeFlux 就這么誕生了。
Spring WebFlux 與 Spring MVC 的關系如下:
Spring WebFlux 的大部分內容都借鑒了 Spring MVC,許多在 Spring MVC 上使用的注解在 Spring WebFlux 上依舊是可用的,但是 Spring WebFlux 會為此作出特定於 Reactor
的實現
基本使用
注解
依舊可以使用在 Spring MVC 中存在的那些注解
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import reactor.core.publisher.Mono;
@Controller
@RequestMapping(path = "/hello")
public class HelloController {
@GetMapping(path = "") // MVC 相關
public String hello(Model model) {
model.addAttribute("name", "xhliu2");
return "hello";
}
@GetMapping(path = "/test") // Rest。。。。
public @ResponseBody
Mono<String> test() {
return Mono.just("xhliu2"); // 每個請求都將創建一個 Mono 流,在對請求作出響應時將會將這些流組合到一起(flatMap),因此整個請求整體來講將會是非阻塞的
}
}
RouteFunction
通過 RouteFunction
來定義請求處理:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
@Configuration
public class RouterController {
/*
通過 RouterFunction 來定義處理邏輯。。。。
*/
@Bean
public RouterFunction<ServerResponse> route1() {
return RouterFunctions.route(
RequestPredicates.GET("/route1"), // 定義請求方法和路徑
// 使用函數式的方式來處理請求,這是為了結合 Reactor 的最佳處理方式(非阻塞)
request -> ServerResponse.ok().body(Mono.just("This is a Mono Sample"), String.class)
);
}
}
如果需要定義多個請求路徑,可以額外定義一個 RouterFunction
的 Bean,也可以在一個 RouterFunction
Bean 中定義額外的處理路徑和處理邏輯
@Bean
public RouterFunction<ServerResponse> route2() {
return RouterFunctions.route( // 第一個處理邏輯。。。。
RequestPredicates.GET("/route2"),
request -> ServerResponse.ok().body(Mono.just("This is route2"), String.class)
).andRoute( // 定義的第二個處理。。。
RequestPredicates.GET("/route3"),
request -> ServerResponse.ok().body(Mono.just("This is route3"), String.class)
);
}
也可以通過預先定義的 Bean 的相關的方法,使用函數式編程的方式來處理對應的邏輯:
首先定義一個 Bean,用於定義一些邏輯處理:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
@Component(value = "handleA")
public class HandlerA {
private final static Logger log = LoggerFactory.getLogger(HandlerA.class);
public Mono<ServerResponse> echo(ServerRequest request) {
log.info("Ready to echo......");
return ServerResponse.ok().body(Mono.just(request.queryParams().toString()), String.class);
}
}
再定義對應的路徑的處理邏輯:
@Bean
public RouterFunction<ServerResponse> route3(@Autowired HandlerA handlerA) {
return RouterFunctions.route(
RequestPredicates.GET("/route4"),
handlerA::echo
);
}
源碼解析
WebFlux 的初始化
-
根據
classpath
來判斷當前的 web 應用所屬的類型// org.springframework.boot.SpringApplication。。。 public SpringApplication(ResourceLoader resourceLoader, Class<?>... primarySources) { this.webApplicationType = WebApplicationType.deduceFromClasspath(); // 省略一部分不太相關的代碼。。。。 }
deduceFromClasspath()
方法對應的源代碼:// 該方法位於 org.springframework.boot.WebApplicationType 中 private static final String WEBMVC_INDICATOR_CLASS = "org.springframework.web.servlet.DispatcherServlet"; private static final String WEBFLUX_INDICATOR_CLASS = "org.springframework.web.reactive.DispatcherHandler"; private static final String JERSEY_INDICATOR_CLASS = "org.glassfish.jersey.servlet.ServletContainer"; static WebApplicationType deduceFromClasspath() { /* 如果當前加載的 Class 中,WEBFLUX_INDICATOR_CLASS 已經被加載並且 WEBMVC_INDICATOR_CLASS 和 JERSEY_INDICATOR_CLASS 都沒有被加載的情況下,才會認為當前的 Web 應用的類型是 Reactive 的 */ if (ClassUtils.isPresent(WEBFLUX_INDICATOR_CLASS, null) && !ClassUtils.isPresent(WEBMVC_INDICATOR_CLASS, null) && !ClassUtils.isPresent(JERSEY_INDICATOR_CLASS, null)) { return WebApplicationType.REACTIVE; } for (String className : SERVLET_INDICATOR_CLASSES) { if (!ClassUtils.isPresent(className, null)) { return WebApplicationType.NONE; } } return WebApplicationType.SERVLET; }
-
創建
Reactive
應用上下文創建應用上下文對應的源代碼:
// 該源代碼位於 org.springframework.boot.ApplicationContextFactory 中。。。 // 注意這里的 Lamada 表達式。。。 ApplicationContextFactory DEFAULT = (webApplicationType) -> { try { switch (webApplicationType) { case SERVLET: return new AnnotationConfigServletWebServerApplicationContext(); case REACTIVE: // 根據上一步推斷出的 Web 應用類型為 Reactive,因此會走這 return new AnnotationConfigReactiveWebServerApplicationContext(); default: return new AnnotationConfigApplicationContext(); } } catch (Exception ex) { throw new IllegalStateException("Unable create a default ApplicationContext instance, " + "you may need a custom ApplicationContextFactory", ex); } };
實例化
AnnotationConfigReactiveWebServerApplicationContext
對應的源代碼:// 一些基本的 Spring IOC 的內容。。。。具體細節可以查看有關 Spring IOC 部分的內容 public AnnotationConfigReactiveWebServerApplicationContext() { this.reader = new AnnotatedBeanDefinitionReader(this); this.scanner = new ClassPathBeanDefinitionScanner(this); }
-
之后就是一般的 Spring IOC 容器的創建和
Bean
的初始化了,與Reactor
相關的比較重要的部分為onRefresh()
方法調用的階段,這個方法使用到了模板方法
模式,在org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext
類中得到了具體的實現onRefresh()
在WebFlux
中的實現的源代碼如下:@Override protected void onRefresh() { // AbstractApplicationContext 中定義的 “模板方法”,就目前 Spring 5.3.13 的版本來講,是一個空的方法 super.onRefresh(); createWebServer(); // 由 WebFlux 具體定義 // 省略有一部分異常捕獲代碼 }
createWebServer()
方法對應的源代碼如下:// 該方法定義依舊位於 org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext 中 private void createWebServer() { WebServerManager serverManager = this.serverManager; // 默認為 null if (serverManager == null) { // 獲取 BeanFactory。。。。。 StartupStep createWebServer = this.getApplicationStartup().start("spring.boot.webserver.create"); String webServerFactoryBeanName = getWebServerFactoryBeanName(); /* 默認情況下,WebFlux 會選擇 Netty 作為服務器,這是因為 Netty 的處理模型十分適合 Reactor 編程,因此能夠很好地契合 WebFlux 在這里的 webServerFactory 為 org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory */ ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName); createWebServer.tag("factory", webServerFactory.getClass().toString()); // 獲取 BeanFactory 結束。。。。 boolean lazyInit = getBeanFactory().getBeanDefinition(webServerFactoryBeanName).isLazyInit(); // 默認為 false /* 比較關鍵的部分,這里會創建一個 WebServerManager */ this.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit); // 剩下的部分就是完成一些其它 Bean 的注冊了。。。 getBeanFactory().registerSingleton("webServerGracefulShutdown", new WebServerGracefulShutdownLifecycle(this.serverManager.getWebServer())); getBeanFactory().registerSingleton("webServerStartStop", new WebServerStartStopLifecycle(this.serverManager)); createWebServer.end(); } // 最后再初始化相關的屬性資源,在當前的類中,這也是一個模板方法 initPropertySources(); }
-
剩下的就是一般的 IOC 初始化流程,在此不做贅述
WebServerFactory 的實例化
具體對應上文描述的 createWebServer
() 方法中
ReactiveWebServerFactory webServerFactory=getWebServerFactory(webServerFactoryBeanName);
的部分,其中 getWebServerFactory
對應的源代碼如下:
protected ReactiveWebServerFactory getWebServerFactory(String factoryBeanName) {
/*
當前環境下的 factoryBeanName 為 "nettyReactiveWebServerFactory",按照 Spring Bean 默認的命名方式,將會加載 NettyReactiveWebServerFactory 作為 ReactiveWebServerFactory 的實現
*/
return getBeanFactory().getBean(factoryBeanName, ReactiveWebServerFactory.class);
}
WebServerManager
的實例化對應的源代碼如下:
WebServerManager(
ReactiveWebServerApplicationContext applicationContext,
ReactiveWebServerFactory factory,
Supplier<HttpHandler> handlerSupplier, boolean lazyInit
) {
this.applicationContext = applicationContext;
Assert.notNull(factory, "Factory must not be null");
/*
比較重要的部分就是有關 HttpHandler 的處理,在這里定義了 HttpHandler Bean 的初始化方式
結合上文中默認傳入的參數,在當前的上下文環境中不是以 lazy-init 的方式進行加載的
*/
this.handler = new DelayedInitializationHttpHandler(handlerSupplier, lazyInit);
this.webServer = factory.getWebServer(this.handler);
}
具體 NettyReactiveWebServerFactory
中對 getWebServer(handler)
方法的實現如下:
// 該方法定義於 org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory
@Override
public WebServer getWebServer(HttpHandler httpHandler) {
HttpServer httpServer = createHttpServer();
/*
這里是重點部分!HttpHandler 的作用相當於 Spring MVC 中的 DispatcherServlet,用於處理請求的分發,以及尋找 Handler 對請求進行處理。。。。
這里使用到了 "適配器模式", handlerAdapter 將 HttpHandler 適配到 Netty 的 Channel,使得原本不相干的兩個對象能夠協同工作
*/
ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
// 創建 Netty 服務端。。。。。。。。
NettyWebServer webServer = createNettyWebServer(
httpServer, handlerAdapter, this.lifecycleTimeout, getShutdown()
);
webServer.setRouteProviders(this.routeProviders);
return webServer;
}
HttpHandler 的實例化
在 Reactive
中,對於 handlerSupplier
的定義如下:
// 該方法定義於 org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext 中
protected HttpHandler getHttpHandler() {
// Use bean names so that we don't consider the hierarchy
String[] beanNames = getBeanFactory().getBeanNamesForType(HttpHandler.class);
// 省略一部分參數檢測代碼。。。。
// 一般的 BeanFactory 獲取 Bean 的步驟
return getBeanFactory().getBean(beanNames[0], HttpHandler.class);
}
由於 Spring Boot 自動配置的存在,在創建應用時會把能夠自動配置的類自動配置到 IOC 中,具體包括 spring.factories
文件中定義的 Bean、以及使用 @Configuration
注解修飾的配置類。
在 WebFlux
中 HttpHandler
的配置類的定義如下:
// 此靜態類位於 org.springframework.boot.autoconfigure.web.reactive.HttpHandlerAutoConfiguration 類中,這個類在 spring.factories 文件中定義為是可以自動配置的
@Configuration(proxyBeanMethods = false)
public static class AnnotationConfig {
private final ApplicationContext applicationContext;
public AnnotationConfig(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Bean
public HttpHandler httpHandler(ObjectProvider<WebFluxProperties> propsProvider) {
/*
這里使用到了構建者模式的方式來創建對象。。。。。
*/
HttpHandler httpHandler = WebHttpHandlerBuilder.applicationContext(this.applicationContext).build();
// 省略一部分不太重要的代碼。。。。。
return httpHandler;
}
}
applicationContext(applicationContext)
對應的源代碼如下:
public static WebHttpHandlerBuilder applicationContext(ApplicationContext context) {
/*
獲取 WebHandler Bean,由於 Spring Boot 的自動配置的存在,在將 org.springframework.boot.autoconfigure.web.reactive.HttpHandlerAutoConfiguration 配置類加載到 IOC 容器中時,將會自動引入 DispatcherHandler 的 WebHandler Bean
HttpHandlerAutoConfiguration 在 spring.factories 中定義為是可以自動配置的
*/
WebHttpHandlerBuilder builder = new WebHttpHandlerBuilder(
context.getBean(WEB_HANDLER_BEAN_NAME, WebHandler.class), context);
// 添加 WebFliter。。。。
List<WebFilter> webFilters = context
.getBeanProvider(WebFilter.class)
.orderedStream()
.collect(Collectors.toList());
builder.filters(filters -> filters.addAll(webFilters));
// 異常處理 Handler。。。
List<WebExceptionHandler> exceptionHandlers = context
.getBeanProvider(WebExceptionHandler.class)
.orderedStream()
.collect(Collectors.toList());
builder.exceptionHandlers(handlers -> handlers.addAll(exceptionHandlers));
// 省略一部分不太重要的代碼。。。。
return builder;
}
bulid()
方法的定義如下:
// 該方法定義於 org.springframework.web.server.adapter.WebHttpHandlerBuilder 中
public HttpHandler build() {
/*
這里是 WebFlux 用於處理請求的關鍵的地方,通過 “裝飾者” 模式,將 FilterWebHandler 通過 ExceptionHandlingWebHandler 進行 “裝飾”,使得在處理請求時先執行 ExceptionHandlingWebHandler 的 handle 的處理邏輯,從而增強了底層 FilterWebHandler 的功能
在設計時值得考慮使用這樣的方式來優化自己的設計,從而盡可能地復用已有的對象和類
*/
WebHandler decorated = new FilteringWebHandler(this.webHandler, this.filters);
decorated = new ExceptionHandlingWebHandler(decorated, this.exceptionHandlers);
// 因此最終生成的 HttpHandler 的具體實例化類為 HttpWebHandlerAdapter
HttpWebHandlerAdapter adapted = new HttpWebHandlerAdapter(decorated);
// 省略一部分設置屬性相關的代碼。。。
return (this.httpHandlerDecorator != null ? this.httpHandlerDecorator.apply(adapted) : adapted);
}
組件的定義
這里又涉及到 Spring Boot 的自動配置,spring.factories
文件中定義了對於 WebFlux 的自動配置類 WebFluxAutoConfiguration
:
// 該類定義於org.springframework.boot.autoconfigure.web.reactive.WebFluxAutoConfiguration
@Configuration(proxyBeanMethods = false)
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
@ConditionalOnClass(WebFluxConfigurer.class) // 有關 WebFlux 的相關配置。。。。
// 這里引入的類是和 WebFlux 相關的主要類
@ConditionalOnMissingBean({ WebFluxConfigurationSupport.class })
@AutoConfigureAfter({ ReactiveWebServerFactoryAutoConfiguration.class, CodecsAutoConfiguration.class,
ReactiveMultipartAutoConfiguration.class, ValidationAutoConfiguration.class,
WebSessionIdResolverAutoConfiguration.class })
@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)
public class WebFluxAutoConfiguration {
// 省略類中的內容
}
主要需要關注的是 WebFluxConfigurationSupport
的引入,在這個類中定義了有關請求分發和處理的邏輯的類。
WebFluxConfigurationSupport
中定義的 Bean 如下:
// 此類定義於 org.springframework.web.reactive.config.WebFluxConfigurationSupport
public class WebFluxConfigurationSupport implements ApplicationContextAware {
/*
這個 Bean 類似於 Spring MVC 中的 DiaptcherServlet,用於處理請求的分發以及查找對應的 handler 去處理對應的請求(“外觀模式” 的使用)
*/
@Bean
public DispatcherHandler webHandler() {
return new DispatcherHandler();
}
/*
由於篇幅問題,在此省略了一些其它必需的 Bean 的定義
*/
}
簡單起見,在此僅僅只是描述一下 WebFluxConfigurationSupport
中定義的必需的 Bean:
-
DispatcherHandler
用於處理請求的分發、為當前請求尋找對應的處理 Handler,類似於 Spring MVC 中的 DispatcherServlet
-
RouterFunctionMapping
和RequestMappingHandlerMapping
定義了請求和處理方法之間的對應關系,Spring WebFlux 支持使用傳統的 Spring MVC 的注解方式來定義 Handler,也支持使用
RouterFunction
通過函數式的方式來定義對應的請求的 Handler -
RequestMappingHandlerAdapter
和HandlerFunctionAdapter
同樣地,兩者是都是為了處理實際的請求而做的適配,和 Spring MVC 中對 Handler 的適配是一樣的。由於 Spring WebFlux 支持使用
@RequestMapping
的方式來定義請求,因此也必須對這種類型的方式定義對應的適配器。 -
WebSocketHandlerAdapter
對於
WebSocket
的支持。。。。 -
ResponseEntityResultHandler
、ResponseBodyResultHandler
、ViewResolutionResultHandler
以及ServerResponseResultHandler
ResponseEntityResultHandler
、ResponseBodyResultHandler
和ServerResponseResultHandler
都是針對 Rest 的響應結果(Http
);ViewResolutionResultHandler
則是相當於返回的是一個View
(MVC 中的View
)即 “視圖”
整合 Handler
有了這些組件之后,值得關心的地方就是 .*Adapter
和對應的 Handler 之間的連接,即 Adapter 是如何調用 Handler的
首先,經過一系列的 debug 操作,得到 WebFlux 對於一個一般的 Http 請求的處理鏈如下:

有關 WebHandler
的類層次結構如下:

對請求的具體分析:
-
服務的啟動
以
NettyWebServer
為例,查看服務啟動的源代碼// 該方法定義於 org.springframework.boot.web.embedded.netty.NettyWebServer DisposableServer startHttpServer() { HttpServer server = this.httpServer; // HttpServerBind 為當上下文的具體實現 if (this.routeProviders.isEmpty()) { // 在這里定義了對於請求的處理邏輯。。。。 server = server.handle(this.handler); } else { server = server.route(this::applyRouteProviders); } if (this.lifecycleTimeout != null) { return server.bindNow(this.lifecycleTimeout); } return server.bindNow(); }
handle(handler)
對應的源代碼如下:// 該方法定義於 reactor.netty.http.server.HttpServer /* 該方法的主要目的是捕獲來自客戶端的請求,附加一個 IO 處理程序以對連接的客戶端作出響應 */ public final HttpServer handle( BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) { Objects.requireNonNull(handler, "handler"); // 重點在於具體的 handler,它定義了處理請求的邏輯 return childObserve(new HttpServerHandle(handler)); }
繼續查看
HttpServerHandle
的源代碼,具體如下:static final class HttpServerHandle implements ConnectionObserver { final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler; HttpServerHandle(BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) { this.handler = handler; } /* 重點部分在這里,這里設置一個觀察者來監聽請求,當請求狀態發生改變時,則作出對應的響應 */ @Override @SuppressWarnings("FutureReturnValueIgnored") public void onStateChange(Connection connection, State newState) { if (newState == HttpServerState.REQUEST_RECEIVED) { try { if (log.isDebugEnabled()) { log.debug(format(connection.channel(), "Handler is being applied: {}"), handler); } HttpServerOperations ops = (HttpServerOperations) connection; /* 重點部分就在這里了,在當前的上下問環境中當前的 handler 為 ReactorHttpHandlerAdapter 具體細節可以查看 org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext 在 refresh() 階段創建 WebManager 的源代碼,在那里定義了當前 webServer 的具體實現過程 */ Mono<Void> mono = Mono.fromDirect(handler.apply(ops, ops)); if (ops.mapHandle != null) { mono = ops.mapHandle.apply(mono, connection); } mono.subscribe(ops.disposeSubscriber()); } catch (Throwable t) { log.error(format(connection.channel(), ""), t); //"FutureReturnValueIgnored" this is deliberate connection.channel() .close(); } } } }
-
ReactorHttpHandlerAdapter
具體處理請求的源代碼如下:
// 此方法定義於 org.springframework.http.server.reactive.ReactorHttpHandlerAdapter @Override public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) { // 用於創建對應的 ByteBuf,具體可以查看有關 Netty 的 ByteBuf NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc()); /* 為 request 和 response 創建對應的 ByteBuf。。。。。 但是就目前步驟來講並不會真正創建 ByteBuf,而是只是設置創建 ByteBuf 的工廠對象 */ ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory); ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory); // Rest 風格的請求,“Head”,不常用。。。 if (request.getMethod() == HttpMethod.HEAD) { response = new HttpHeadResponseDecorator(response); } /* 這里是重點部分!在這里定義了實際的處理邏輯 (handle 方法) */ return this.httpHandler.handle(request, response) .doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage())) .doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed")); // 省略部分異常檢測代碼。。。。 }
-
HttpWebHandlerAdapter
經過上文的相關分析,在當前的運行條件下唯一存在於 Spring IOC 的
HttpHandler
的實際 Bean 為HttpWebHandlerAdapter
,即具體執行的handle()
方法為HttpWebHandlerAdapter
的具體實現對應的源代碼如下:
// 該方法定義於 org.springframework.web.server.adapter.HttpWebHandlerAdapter @Override public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) { if (this.forwardedHeaderTransformer != null) { request = this.forwardedHeaderTransformer.apply(request); // 省略一部分異常檢測代碼。。。。 } ServerWebExchange exchange = createExchange(request, response); // 省略一部分日志打印的代碼。。。。 /* 通過對 HttpHandler 實例化的分析,現在調用的是最外層的 ExceptionHandlingWebHandler 的 handle() 方法 */ return getDelegate().handle(exchange) .doOnSuccess(aVoid -> logResponse(exchange)) .onErrorResume(ex -> handleUnresolvedError(exchange, ex)) .then(Mono.defer(response::setComplete)); }
-
ExceptionHandlingWebHandler
其中具體的源代碼如下:
// 該方法定義於 org.springframework.web.server.handler.ExceptionHandlingWebHandler。。。 @Override public Mono<Void> handle(ServerWebExchange exchange) { Mono<Void> completion; try { // 首先會執行父類的 handle 方法 completion = super.handle(exchange); } catch (Throwable ex) { // 處理過程中存在異常則返回一個 error 到上層 completion = Mono.error(ex); } /* 遍歷當前上下文中存在的 WebExceptionHandler,如果處理結果存在對應的 Handler 預定義的異常,那么將會處理對應的異常。。。 */ for (WebExceptionHandler handler : this.exceptionHandlers) { completion = completion.onErrorResume(ex -> handler.handle(exchange, ex)); } return completion; }
根據上文中的
WebHandler
的類結構圖,ExceptionHandlingWebHandler
繼承自WebHandlerDecorator
,其中中的handle()
方法的定義如下:// 該具體實現定義於 org.springframework.web.server.handler.WebHandlerDecorator @Override public Mono<Void> handle(ServerWebExchange exchange) { /* 結合之前使用 “構建者模式” 來構造 HttpHandler 的過程,在構建時通過 “裝飾器模式” 來增強 FilterWebHandler 功能的邏輯,對於當前的執行上下文 ExceptionHandlingWebHandler, 其中的 delegate 為 FilterWebHandler */ return this.delegate.handle(exchange); }
也就是說,WebFlux 在接收到一個請求時,首先將請求發送到
ExceptionHandlingWebHandler
進行進一步的處理,而在ExceptionHandlingWebHandler
在調用handle()
方法進行處理時,首先會再講請求發送到下一層的handle()
方法,最后通過處理結果再執行當前上下文對應的邏輯在
ExceptionHandlingWebHandler
中,當處理結果出現異常時將會進行捕獲,並返回一個帶有 error 的Mono
-
FilteringWebHandler
具體的源代碼如下:
// 該方法定義於 org.springframework.web.server.handler.FilteringWebHandler 中 /* 在使用 “構建者模式” 構建 HttpHandler Bean 時,會創建一個 FilteringWebHandler 的實例 傳入的參數為 DispatcherHandler, filters */ public FilteringWebHandler(WebHandler handler, List<WebFilter> filters) { super(handler); // 設置當前的 delegate,將請求發送到下一層 this.chain = new DefaultWebFilterChain(handler, filters); } @Override public Mono<Void> handle(ServerWebExchange exchange) { // 在這里定義了對應的過濾處理 return this.chain.filter(exchange); }
結合構造
FilteringWebHandler
對象時的構造函數,chain
的具體實例對象為DefaultWebFilterChain
,具體的filter(exchange)
方法的定義如下:// org.springframework.web.server.handler.DefaultWebFilterChain @Override public Mono<Void> filter(ServerWebExchange exchange) { return Mono.defer(() -> this.currentFilter != null && this.chain != null ? // 如果沒有定義 Filter,那么就會將 request 發送到下一層的 WebHandler,在當前環境下下一層的 WebHandler 為 DispatcherHandler invokeFilter(this.currentFilter, this.chain, exchange) : this.handler.handle(exchange) ); }
-
DispatcherHandler
對應的源代碼如下:
// org.springframework.web.reactive.DispatcherHandler @Override public Mono<Void> handle(ServerWebExchange exchange) { // 省略一部分不太重要的代碼。。。 return Flux.fromIterable(this.handlerMappings) /* 遍歷對應的處理的 Mapping,將它們組合為一個 Flux,這么做的目的是為了使得 @RequestMapping 注解能夠和 RouterFunction 能夠協同工作 經過這一輪處理之后將會得到所有的 request 對應的 handler */ .concatMap(mapping -> mapping.getHandler(exchange)) .next() // 為每個處理創建一個單獨的 Mono,以達到完全異步的效果 .switchIfEmpty(createNotFoundError()) // 調用對應的 handler 方法處理請求 .flatMap(handler -> invokeHandler(exchange, handler)) // 將處理后的結果進行一定的封裝之后再組合成一個 Flux .flatMap(result -> handleResult(exchange, result)); }
invokeHandler(exchange, handler)
對應的源代碼如下:// org.springframework.web.reactive.DispatcherHandler private Mono<HandlerResult> invokeHandler( ServerWebExchange exchange, Object handler ) { // 省略一部分 CORS 檢測。。。 if (this.handlerAdapters != null) { // 通過合適的 HandlerAdapter 適配所有的 handler,使得 handler 能夠正常地處理請求 for (HandlerAdapter handlerAdapter : this.handlerAdapters) { if (handlerAdapter.supports(handler)) { // “適配器模式” 的使用 :) return handlerAdapter.handle(exchange, handler); } } } return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler)); }
具體的
HandlerAdapter
的類結構圖如下: -
HandlerFunctionAdapter
最后一步的
handle(exchange)
方法,對應的源代碼如下:// 在這里以 HandlerFunctionAdapter 的 handle() 方法為例 @Override public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) { HandlerFunction<?> handlerFunction = (HandlerFunction<?>) handler; ServerRequest request = exchange.getRequiredAttribute(RouterFunctions.REQUEST_ATTRIBUTE); // 調用對應的 handle(request) 函數進行對應的處理 return handlerFunction.handle(request) .map(response -> new HandlerResult(handlerFunction, response, HANDLER_FUNCTION_RETURN_TYPE)); }
最后對相應結果進行封裝:
// org.springframework.web.reactive.DispatcherHandler .... private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) { // 就當前上下文來講,對應的 HandlerResultHandler 為 ServerResponseResultHandler return getResultHandler(result).handleResult(exchange, result) .checkpoint("Handler " + result.getHandler() + " [DispatcherHandler]") .onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exResult -> { String text = "Exception handler " + exResult.getHandler() + ", error=\"" + ex.getMessage() + "\" [DispatcherHandler]"; return getResultHandler(exResult).handleResult(exchange, exResult).checkpoint(text); })); }
ServerResponseResultHandler
處理結果對應的源代碼如下:/* org.springframework.web.reactive.function.server.support.ServerResponseResultHandler */ @Override public Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) { ServerResponse response = (ServerResponse) result.getReturnValue(); Assert.state(response != null, "No ServerResponse"); // 將響應結果寫入到 response 中。。。 return response.writeTo(exchange, new ServerResponse.Context() { @Override public List<HttpMessageWriter<?>> messageWriters() { return messageWriters; } @Override public List<ViewResolver> viewResolvers() { return viewResolvers; } }); }