1. Introduction
1.1 Overview
In WebFlux.fn, an HTTP request is handled with a HandlerFunction: a function that takes a ServerRequest and returns a delayed ServerResponse (i.e. Mono<ServerResponse>). Both the request and the response object have immutable contracts that offer JDK 8-friendly access to the HTTP request and response. HandlerFunction is the equivalent of the body of a @RequestMapping method in the annotation-based programming model.
Incoming requests are routed to a handler function with a RouterFunction: a function that takes a ServerRequest and returns a delayed HandlerFunction (i.e. Mono<HandlerFunction>). When the router function matches, a handler function is returned; otherwise an empty Mono. RouterFunction is the equivalent of a @RequestMapping annotation, but with the major difference that router functions provide not just data, but also behavior.
RouterFunctions.route() provides a router builder that facilitates the creation of routers.
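The relationship between the two function types can be sketched in plain Java. This is a simplified model only, not Spring's API: Request, Handler, Router, get, or, and dispatch are hypothetical names, Optional stands in for Mono, and a String stands in for ServerResponse.

```java
import java.util.Optional;
import java.util.function.Function;

// Simplified stand-ins for illustration only; these are NOT Spring's types.
final class RoutingModel {

    // Hypothetical request holder (Spring uses ServerRequest).
    static final class Request {
        final String method;
        final String path;

        Request(String method, String path) {
            this.method = method;
            this.path = path;
        }
    }

    // Handler: request -> response (Spring returns Mono<ServerResponse>).
    interface Handler extends Function<Request, String> {}

    // Router: request -> optional handler (Spring returns Mono<HandlerFunction>).
    interface Router {
        Optional<Handler> route(Request request);

        // Try this router first and fall back to the other one when it does not match.
        default Router or(Router other) {
            return request -> {
                Optional<Handler> match = route(request);
                return match.isPresent() ? match : other.route(request);
            };
        }
    }

    // Builds a router that matches GET requests on an exact path.
    static Router get(String path, Handler handler) {
        return request -> {
            if ("GET".equals(request.method) && path.equals(request.path)) {
                return Optional.of(handler);
            }
            return Optional.empty();
        };
    }

    // Runs the matched handler, or answers 404 when the router yields nothing.
    static String dispatch(Router router, Request request) {
        return router.route(request).map(h -> h.apply(request)).orElse("404 Not Found");
    }
}
```

The point of the model is that a router carries behavior (the handler it returns), not just mapping metadata, and that an empty result means "no match" so the next router can be tried.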
1.2 Features
Router functions are used to route the requests to the corresponding HandlerFunction. Typically, you do not write router functions yourself, but rather use a method on the RouterFunctions utility class to create one. RouterFunctions.route() (no parameters) provides you with a fluent builder for creating a router function, whereas RouterFunctions.route(RequestPredicate, HandlerFunction) offers a direct way to create a router.
Generally, it is recommended to use the route() builder, as it provides convenient short-cuts for typical mapping scenarios without requiring hard-to-discover static imports. For instance, the router function builder offers the method GET(String, HandlerFunction) to create a mapping for GET requests; and POST(String, HandlerFunction) for POSTs.
Besides HTTP method-based mapping, the route builder offers a way to introduce additional predicates when mapping to requests. For each HTTP method there is an overloaded variant that takes a RequestPredicate as a parameter, through which additional constraints can be expressed.
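How a method shortcut combines with an additional predicate can be illustrated with java.util.function.Predicate. This is a plain-Java sketch under stated assumptions, not Spring's RequestPredicate: the Request class and the method, path, contentType, and POST helpers are hypothetical, and paths are compared literally instead of using path patterns such as /user/find/{id}.

```java
import java.util.function.Predicate;

// Simplified model of request predicates; NOT Spring's RequestPredicates class.
final class PredicateModel {

    // Hypothetical request holder with only the fields the predicates inspect.
    static final class Request {
        final String method;
        final String path;
        final String contentType;

        Request(String method, String path, String contentType) {
            this.method = method;
            this.path = path;
            this.contentType = contentType;
        }
    }

    static Predicate<Request> method(String expected) {
        return request -> expected.equals(request.method);
    }

    static Predicate<Request> path(String expected) {
        return request -> expected.equals(request.path);
    }

    static Predicate<Request> contentType(String expected) {
        return request -> expected.equals(request.contentType);
    }

    // Shortcut in the spirit of the builder's POST(String, ...): method and path in one call.
    static Predicate<Request> POST(String expectedPath) {
        return method("POST").and(path(expectedPath));
    }
}
```

With these helpers, PredicateModel.POST("/user/add").and(PredicateModel.contentType("application/json")) only accepts JSON POSTs to /user/add, which mirrors the idea of passing an extra predicate to the overloaded builder methods.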
2. Demo environment
- JDK 1.8.0_201
- Spring Boot 2.2.0.RELEASE
- Build tool (Apache Maven 3.6.3)
- IDE (IntelliJ IDEA)
3. Demo code
3.1 Code description
Use WebFlux in the functional style, through RouterFunction.
3.2 Code structure
3.3 Maven dependencies
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
</dependencies>
3.4 Configuration file
No configuration is required.
3.5 Java code
UserModel.java
public class UserModel {
private Long id;
private String name;
private Integer age;
private String birthday;
private String address;
private String phone;
public UserModel() {}
public UserModel(Long id, String name, Integer age, String birthday, String address, String phone) {
this.id = id;
this.name = name;
this.age = age;
this.birthday = birthday;
this.address = address;
this.phone = phone;
}
// getters, setters, and toString omitted
}
UserRepository.java
@Repository
public class UserRepository {
// Two records are preloaded, so the counter starts at 2
private static final AtomicLong ID_GENERATOR = new AtomicLong(2);
// In-memory map that simulates database operations
private static final Map<Long, UserModel> USER_MAP = new HashMap<>();
@PostConstruct
public void init() {
UserModel user1 = new UserModel(1L, "zhangsan", 20, "2000-01-02", "beijing", "13666666666");
UserModel user2 = new UserModel(2L, "lisi", 30, "1990-03-23", "shanghai", "13888888888");
USER_MAP.put(user1.getId(), user1);
USER_MAP.put(user2.getId(), user2);
}
public List<UserModel> findAll() {
return new ArrayList<>(USER_MAP.values());
}
public UserModel findById(Long id) {
return USER_MAP.get(id);
}
public UserModel add(UserModel userModel) {
long id = ID_GENERATOR.incrementAndGet();
userModel.setId(id);
USER_MAP.put(id, userModel);
return userModel;
}
public UserModel update(UserModel userModel) {
USER_MAP.put(userModel.getId(), userModel);
return USER_MAP.get(userModel.getId());
}
public UserModel deleteById(Long id) {
UserModel userModel = USER_MAP.get(id);
USER_MAP.remove(id);
return userModel;
}
}
UserHandler.java
@Component
public class UserHandler {
@Autowired
private UserRepository userRepository;
public Mono<ServerResponse> list(ServerRequest request) {
// ServerResponse.ok().body(Flux.fromIterable(userRepository.findAll()), UserModel.class);
return ServerResponse.ok().body(Flux.fromStream(userRepository.findAll().stream()), UserModel.class);
}
public Mono<ServerResponse> findById(ServerRequest request) {
return Mono.justOrEmpty(userRepository.findById(Long.valueOf(request.pathVariable("id"))))
.flatMap(user -> ServerResponse.ok().body(Mono.just(user), UserModel.class))
.switchIfEmpty(ServerResponse.notFound().build()); // responds with 404 Not Found
}
public Mono<ServerResponse> add(ServerRequest request) {
return ServerResponse.ok().body(
request.bodyToMono(UserModel.class).map(userRepository::add),
UserModel.class);
}
public Mono<ServerResponse> update(ServerRequest request) {
/*request.bodyToMono(UserModel.class)
.flatMap(user -> Mono.justOrEmpty(userRepository.findById(user.getId()))
.then(ServerResponse.ok().body(Mono.just(userRepository.update(user)), UserModel.class))
// .switchIfEmpty(Mono.error(new NotFoundException(String.valueOf(user.getId())))))
.switchIfEmpty(ServerResponse.notFound().build()));*/
return request.bodyToMono(UserModel.class)
.flatMap(body -> Mono.justOrEmpty(userRepository.findById(body.getId())).flatMap(foundUser -> {
foundUser.setAge(body.getAge());
foundUser.setName(body.getName());
foundUser.setBirthday(body.getBirthday());
foundUser.setAddress(body.getAddress());
foundUser.setPhone(body.getPhone());
return Mono.just(foundUser);
}).flatMap(u -> ServerResponse.ok().bodyValue(u)).switchIfEmpty(ServerResponse.notFound().build()));
}
public Mono<ServerResponse> deleteById(ServerRequest request) {
Long id = Long.valueOf(request.pathVariable("id"));
return Mono.justOrEmpty(userRepository.findById(id))
.switchIfEmpty(Mono.error(new RuntimeException(id + " not found!"))) // an unknown id surfaces as an exception in the console, e.g. RuntimeException: 30 not found!
.then(ServerResponse.ok().body(Mono.justOrEmpty(userRepository.deleteById(id)), UserModel.class));
}
public Mono<ServerResponse> deleteId(ServerRequest request) {
Long id = Long.valueOf(request.pathVariable("id"));
return Mono.justOrEmpty(userRepository.findById(id))
.flatMap(user -> ServerResponse.ok().body(Mono.just(userRepository.deleteById(id)), UserModel.class))
.switchIfEmpty(ServerResponse.notFound().build());
}
}
UserRouter.java
import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.PUT;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RequestPredicates.contentType;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
@Configuration
public class UserRouter {
@Autowired
private UserHandler userHandler;
@Bean
public RouterFunction<ServerResponse> routerFunction() {
return route(GET("/user/list").and(accept(MediaType.APPLICATION_JSON)), userHandler::list)
.andRoute(GET("/user/find/{id}").and(accept(MediaType.APPLICATION_JSON)), userHandler::findById)
.andRoute(POST("/user/add").and(contentType(MediaType.APPLICATION_JSON)), userHandler::add)
.andRoute(PUT("/user/update").and(contentType(MediaType.APPLICATION_JSON)), userHandler::update)
.andRoute(DELETE("/user/delete/{id}"), userHandler::deleteId);
}
}
3.6 Git repository
spring-boot/spring-boot-07-webflux/spring-boot-webflux-router
4. Results
Run the SpringBootWebfluxRouterApplication.main method, send the following requests from spring-boot-webflux-router.http, and check that the output matches expectations.
Query the user list
### GET /user/list
GET http://localhost:8080/user/list
Accept: application/json
Query a user by id
### GET /user/find/{id}
GET http://localhost:8080/user/find/1
Accept: application/json
Add a user
### POST /user/add
POST http://localhost:8080/user/add
Accept: application/json
Content-Type: application/json
{
"name": "wangwu",
"age": 25,
"birthday": "1995-06-23",
"address": "guangzhou",
"phone": "13777777777"
}
Update a user (success)
### PUT /user/update success
PUT http://localhost:8080/user/update
Accept: application/json
Content-Type: application/json
{
"id": 2,
"name": "lisi2",
"age": 32,
"birthday": "1988-03-23",
"address": "shanghai2",
"phone": "13888888882"
}
Update a user (failure)
### PUT /user/update fail // 404 Not Found (id)
PUT http://localhost:8080/user/update
Accept: application/json
Content-Type: application/json
{
"id": 222,
"name": "lisi2",
"age": 32,
"birthday": "1988-03-23",
"address": "shanghai2",
"phone": "13888888882"
}
Delete a user
### DELETE /user/delete/{id}
DELETE http://localhost:8080/user/delete/3
Accept: application/json
5. Source code analysis
5.1 How does the WebFlux server start?
When WebFlux is used, the default ApplicationContext is AnnotationConfigReactiveWebServerApplicationContext, as the following method shows:
protected ConfigurableApplicationContext createApplicationContext() {
Class<?> contextClass = this.applicationContextClass;
if (contextClass == null) {
try {
switch (this.webApplicationType) {
case SERVLET:
contextClass = Class.forName(DEFAULT_SERVLET_WEB_CONTEXT_CLASS);
break;
case REACTIVE:
contextClass = Class.forName(DEFAULT_REACTIVE_WEB_CONTEXT_CLASS);
break;
default:
contextClass = Class.forName(DEFAULT_CONTEXT_CLASS);
}
}
catch (ClassNotFoundException ex) {
throw new IllegalStateException(
"Unable create a default ApplicationContext, please specify an ApplicationContextClass", ex);
}
}
return (ConfigurableApplicationContext) BeanUtils.instantiateClass(contextClass);
}
Initialization and startup therefore take place in AnnotationConfigReactiveWebServerApplicationContext.
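The method above switches on webApplicationType, but how that value is chosen is not shown. A simplified sketch of the idea, assuming the type is deduced from indicator classes on the classpath: the WebTypeSketch class, its method names, and the reduced rule set are hypothetical, and the indicator class names are assumptions recalled from Spring Boot rather than a copy of its code.

```java
import java.util.function.Predicate;

// Hypothetical sketch of classpath-based deduction; not Spring Boot's actual implementation.
final class WebTypeSketch {

    enum WebType { NONE, SERVLET, REACTIVE }

    // Decides the application type from a "is this class present?" check.
    static WebType deduce(Predicate<String> isPresent) {
        // Assumed indicator classes for WebFlux and Spring MVC.
        boolean webFlux = isPresent.test("org.springframework.web.reactive.DispatcherHandler");
        boolean webMvc = isPresent.test("org.springframework.web.servlet.DispatcherServlet");
        if (webFlux && !webMvc) {
            return WebType.REACTIVE;
        }
        // Assumed indicator class for a servlet environment.
        if (isPresent.test("javax.servlet.Servlet")) {
            return WebType.SERVLET;
        }
        return WebType.NONE;
    }

    // Real classpath probe: true when the class can be located without initializing it.
    static boolean onClasspath(String className) {
        try {
            Class.forName(className, false, WebTypeSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError ex) {
            return false;
        }
    }
}
```

Calling WebTypeSketch.deduce(WebTypeSketch::onClasspath) in a project that only has WebFlux on the classpath would yield REACTIVE under these assumptions, which is the branch that leads to the reactive context class.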
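5.2 Why is Netty the default WebFlux server?
First, look at the dependency structure of spring-boot-starter-webflux.
The spring.factories file of spring-boot-autoconfigure registers an auto-configuration class, ReactiveWebServerFactoryAutoConfiguration.
Its content is as follows: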
@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE)
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(ReactiveHttpInputMessage.class)
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
@EnableConfigurationProperties(ServerProperties.class)
@Import({ ReactiveWebServerFactoryAutoConfiguration.BeanPostProcessorsRegistrar.class,
ReactiveWebServerFactoryConfiguration.EmbeddedTomcat.class,
ReactiveWebServerFactoryConfiguration.EmbeddedJetty.class,
ReactiveWebServerFactoryConfiguration.EmbeddedUndertow.class,
ReactiveWebServerFactoryConfiguration.EmbeddedNetty.class })
public class ReactiveWebServerFactoryAutoConfiguration {
@Bean
public ReactiveWebServerFactoryCustomizer reactiveWebServerFactoryCustomizer(ServerProperties serverProperties) {
return new ReactiveWebServerFactoryCustomizer(serverProperties);
}
@Bean
@ConditionalOnClass(name = "org.apache.catalina.startup.Tomcat")
public TomcatReactiveWebServerFactoryCustomizer tomcatReactiveWebServerFactoryCustomizer(
ServerProperties serverProperties) {
return new TomcatReactiveWebServerFactoryCustomizer(serverProperties);
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "server.forward-headers-strategy", havingValue = "framework")
public ForwardedHeaderTransformer forwardedHeaderTransformer() {
return new ForwardedHeaderTransformer();
}
/**
* Registers a {@link WebServerFactoryCustomizerBeanPostProcessor}. Registered via
* {@link ImportBeanDefinitionRegistrar} for early registration.
*/
public static class BeanPostProcessorsRegistrar implements ImportBeanDefinitionRegistrar, BeanFactoryAware {
private ConfigurableListableBeanFactory beanFactory;
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
if (beanFactory instanceof ConfigurableListableBeanFactory) {
this.beanFactory = (ConfigurableListableBeanFactory) beanFactory;
}
}
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
if (this.beanFactory == null) {
return;
}
registerSyntheticBeanIfMissing(registry, "webServerFactoryCustomizerBeanPostProcessor",
WebServerFactoryCustomizerBeanPostProcessor.class);
}
private void registerSyntheticBeanIfMissing(BeanDefinitionRegistry registry, String name, Class<?> beanClass) {
if (ObjectUtils.isEmpty(this.beanFactory.getBeanNamesForType(beanClass, true, false))) {
RootBeanDefinition beanDefinition = new RootBeanDefinition(beanClass);
beanDefinition.setSynthetic(true);
registry.registerBeanDefinition(name, beanDefinition);
}
}
}
}
Through @Import it pulls in EmbeddedTomcat, EmbeddedJetty, EmbeddedUndertow, and EmbeddedNetty, all of which are nested classes of ReactiveWebServerFactoryConfiguration:
abstract class ReactiveWebServerFactoryConfiguration {
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(ReactiveWebServerFactory.class)
@ConditionalOnClass({ HttpServer.class })
static class EmbeddedNetty {
@Bean
@ConditionalOnMissingBean
ReactorResourceFactory reactorServerResourceFactory() {
return new ReactorResourceFactory();
}
@Bean
NettyReactiveWebServerFactory nettyReactiveWebServerFactory(ReactorResourceFactory resourceFactory,
ObjectProvider<NettyRouteProvider> routes, ObjectProvider<NettyServerCustomizer> serverCustomizers) {
NettyReactiveWebServerFactory serverFactory = new NettyReactiveWebServerFactory();
serverFactory.setResourceFactory(resourceFactory);
routes.orderedStream().forEach(serverFactory::addRouteProviders);
serverFactory.getServerCustomizers().addAll(serverCustomizers.orderedStream().collect(Collectors.toList()));
return serverFactory;
}
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(ReactiveWebServerFactory.class)
@ConditionalOnClass({ org.apache.catalina.startup.Tomcat.class })
static class EmbeddedTomcat {
@Bean
TomcatReactiveWebServerFactory tomcatReactiveWebServerFactory(
ObjectProvider<TomcatConnectorCustomizer> connectorCustomizers,
ObjectProvider<TomcatContextCustomizer> contextCustomizers,
ObjectProvider<TomcatProtocolHandlerCustomizer<?>> protocolHandlerCustomizers) {
TomcatReactiveWebServerFactory factory = new TomcatReactiveWebServerFactory();
factory.getTomcatConnectorCustomizers()
.addAll(connectorCustomizers.orderedStream().collect(Collectors.toList()));
factory.getTomcatContextCustomizers()
.addAll(contextCustomizers.orderedStream().collect(Collectors.toList()));
factory.getTomcatProtocolHandlerCustomizers()
.addAll(protocolHandlerCustomizers.orderedStream().collect(Collectors.toList()));
return factory;
}
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(ReactiveWebServerFactory.class)
@ConditionalOnClass({ org.eclipse.jetty.server.Server.class })
static class EmbeddedJetty {
@Bean
@ConditionalOnMissingBean
JettyResourceFactory jettyServerResourceFactory() {
return new JettyResourceFactory();
}
@Bean
JettyReactiveWebServerFactory jettyReactiveWebServerFactory(JettyResourceFactory resourceFactory,
ObjectProvider<JettyServerCustomizer> serverCustomizers) {
JettyReactiveWebServerFactory serverFactory = new JettyReactiveWebServerFactory();
serverFactory.getServerCustomizers().addAll(serverCustomizers.orderedStream().collect(Collectors.toList()));
serverFactory.setResourceFactory(resourceFactory);
return serverFactory;
}
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(ReactiveWebServerFactory.class)
@ConditionalOnClass({ Undertow.class })
static class EmbeddedUndertow {
@Bean
UndertowReactiveWebServerFactory undertowReactiveWebServerFactory(
ObjectProvider<UndertowBuilderCustomizer> builderCustomizers) {
UndertowReactiveWebServerFactory factory = new UndertowReactiveWebServerFactory();
factory.getBuilderCustomizers().addAll(builderCustomizers.orderedStream().collect(Collectors.toList()));
return factory;
}
}
}
Each of them takes effect under the following condition:
- EmbeddedTomcat: @ConditionalOnClass -- Tomcat.class (Apache Tomcat)
- EmbeddedJetty: @ConditionalOnClass -- Server.class (Jetty)
- EmbeddedUndertow: @ConditionalOnClass -- Undertow.class
- EmbeddedNetty: @ConditionalOnClass -- HttpServer.class
Combined with the dependency analysis above: spring-boot-starter-webflux depends on reactor-netty.jar, and HttpServer.class lives in reactor-netty.jar, so the Netty configuration takes effect. The service is ultimately started with EmbeddedNetty.
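The selection logic can be sketched in plain Java. This is a simplified model, assuming the four configurations are evaluated in the order of the @Import list and that @ConditionalOnMissingBean(ReactiveWebServerFactory.class) lets only the first match register a factory. ServerSelectionSketch and select are hypothetical names; the fully qualified names for Undertow and HttpServer are filled in as assumptions, since the listing only shows their simple names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model of "first configuration whose marker class is present wins".
final class ServerSelectionSketch {

    // Server name -> marker class from @ConditionalOnClass, in @Import order.
    private static final Map<String, String> CANDIDATES = new LinkedHashMap<>();

    static {
        CANDIDATES.put("Tomcat", "org.apache.catalina.startup.Tomcat");
        CANDIDATES.put("Jetty", "org.eclipse.jetty.server.Server");
        CANDIDATES.put("Undertow", "io.undertow.Undertow");                 // assumed package
        CANDIDATES.put("Netty", "reactor.netty.http.server.HttpServer");    // assumed package
    }

    // Returns the first server whose marker class is reported present, or "none".
    static String select(Predicate<String> isPresent) {
        for (Map.Entry<String, String> candidate : CANDIDATES.entrySet()) {
            if (isPresent.test(candidate.getValue())) {
                return candidate.getKey();
            }
        }
        return "none";
    }
}
```

With only reactor-netty on the classpath the sketch picks Netty. Under the same assumptions, adding Tomcat to the classpath would make Tomcat win, because its configuration is checked before Netty's.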