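Preface
webmvc and webflux are two important modules of the Spring Framework, and they represent two I/O models: blocking and non-blocking.
webmvc is the Servlet-based blocking model (often called OIO). When a request reaches the server, a dedicated thread is assigned to handle it. If the request involves an I/O operation, that thread stays blocked until the I/O finishes, so the time it spends waiting is wasted.
webflux is the Reactor-based non-blocking model (often called NIO). A thread is likewise assigned when a request arrives, but if the request involves an I/O operation, the thread does not block while waiting; it moves on to other work, and once the I/O completes, the thread is notified (thanks to the operating system's mechanisms) and continues processing the request.
This way the thread makes effective use of the time the I/O operation takes.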
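The difference between the two styles can be sketched with plain JDK classes. This is only an analogy, not WebFlux itself: `slowIo` is a hypothetical stand-in for a database or network call, and `CompletableFuture` plays the role that a reactive publisher plays in WebFlux. Note that the pool thread here still blocks inside `slowIo`; the sketch only shows how the caller is freed, while real non-blocking I/O relies on readiness notifications from the operating system.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BlockingVsNonBlocking {
    // Hypothetical slow I/O call (stands in for a database or remote request).
    static String slowIo(String name) {
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "hello " + name;
    }

    // Blocking style: the calling thread waits until the I/O has finished.
    static String handleBlocking(String name) {
        return slowIo(name);
    }

    // Callback style: the caller gets a future back immediately; the continuation
    // (toUpperCase) runs later, once the I/O result is available.
    static CompletableFuture<String> handleAsync(String name, ExecutorService ioPool) {
        return CompletableFuture.supplyAsync(() -> slowIo(name), ioPool)
                .thenApply(String::toUpperCase);
    }

    public static void main(String[] args) {
        ExecutorService ioPool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(handleBlocking("mvc"));
            CompletableFuture<String> pending = handleAsync("flux", ioPool);
            // The caller is free to do other work here before reading the result.
            System.out.println(pending.join());
        } finally {
            ioPool.shutdown();
        }
    }
}
```

In the blocking variant the request thread is unavailable for the whole 50 ms; in the callback variant it only registers what should happen next.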
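WebFlux CRUD: a complete hands-on demo
DAO layer (also called the repository layer)
Entity (also called the PO object)
Create the UserEntity class, which extends the User DTO, as follows: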
package com.crazymaker.springcloud.reactive.user.info.entity;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
@Entity
@Table(name = "t_user")
public final class UserEntity extends User
{
@Id
@Column(name = "id")
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Override
public long getUserId()
{
return super.getUserId();
}
@Column(name = "name")
public String getName()
{
return super.getName();
}
}
DAO implementation class
@Repository marks a data access component, that is, a DAO component. JpaUserRepositoryImpl wraps the persistence operations on the PO through a JPA EntityManager and provides the insert, query, and delete operations.
package com.crazymaker.springcloud.reactive.user.info.dao.impl;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import org.springframework.stereotype.Repository;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.Query;
import javax.transaction.Transactional;
import java.util.List;
@Repository
@Transactional
public class JpaUserRepositoryImpl
{
@PersistenceContext
private EntityManager entityManager;
public Long insert(final User user)
{
entityManager.persist(user);
return user.getUserId();
}
public void delete(final Long userId)
{
Query query = entityManager.createQuery("DELETE FROM UserEntity o WHERE o.userId = ?1");
query.setParameter(1, userId);
query.executeUpdate();
}
@SuppressWarnings("unchecked")
public List<User> selectAll()
{
return (List<User>) entityManager.createQuery("SELECT o FROM UserEntity o").getResultList();
}
@SuppressWarnings("unchecked")
public User selectOne(final Long userId)
{
Query query = entityManager.createQuery("SELECT o FROM UserEntity o WHERE o.userId = ?1");
query.setParameter(1, userId);
return (User) query.getSingleResult();
}
}
Service layer
package com.crazymaker.springcloud.reactive.user.info.service.impl;
import com.crazymaker.springcloud.common.util.BeanUtil;
import com.crazymaker.springcloud.reactive.user.info.dao.impl.JpaUserRepositoryImpl;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.entity.UserEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.List;
@Slf4j
@Service
@Transactional
public class JpaEntityServiceImpl
{
@Resource
private JpaUserRepositoryImpl userRepository;
@Transactional
//Add a user
public User addUser(User dto)
{
User userEntity = new UserEntity();
userEntity.setUserId(dto.getUserId());
userEntity.setName(dto.getName());
userRepository.insert(userEntity);
BeanUtil.copyProperties(userEntity,dto);
return dto;
}
@Transactional
//Delete a user
public User delUser(User dto)
{
userRepository.delete(dto.getUserId());
return dto;
}
//Query all users
public List<User> selectAllUser()
{
log.info("方法 selectAllUser 被調用了");
return userRepository.selectAll();
}
//Query a single user
public User selectOne(final Long userId)
{
log.info("方法 selectOne 被調用了");
return userRepository.selectOne(userId);
}
}
Controller layer
Spring Boot WebFlux also supports the annotation model for developing API endpoints.
package com.crazymaker.springcloud.reactive.user.info.controller;
import com.crazymaker.springcloud.common.result.RestOut;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.service.impl.JpaEntityServiceImpl;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
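/**
* Mono and Flux cover two cases:
* Mono: a publisher that emits 0 or 1 element, i.e. a single object.
* Flux: a publisher that emits N elements, i.e. a list of objects.
* One might ask why the methods do not return the object directly, e.g. City/Long/List.
* The reason is that returning Flux or Mono is the non-blocking style, comparable to callbacks.
* The functional operators replace explicit callbacks, so no callback interfaces are visible. This is exactly the benefit of WebFlux: non-blocking plus asynchronous.
*/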
@Slf4j
@Api(value = "用戶信息、基礎學習DEMO", tags = {"用戶信息DEMO"})
@RestController
@RequestMapping("/api/user")
public class UserReactiveController
{
@ApiOperation(value = "回顯測試", notes = "提示接口使用者注意事項", httpMethod = "GET")
@RequestMapping(value = "/hello")
@ApiImplicitParams({
@ApiImplicitParam(paramType = "query", dataType="string",dataTypeClass = String.class, name = "name",value = "名稱", required = true)})
public Mono<RestOut<String>> hello(@RequestParam(name = "name") String name)
{
log.info("方法 hello 被調用了");
return Mono.just(RestOut.succeed("hello " + name));
}
@Resource
JpaEntityServiceImpl jpaEntityService;
@PostMapping("/add/v1")
@ApiOperation(value = "插入用戶" )
@ApiImplicitParams({
// @ApiImplicitParam(paramType = "body", dataType="java.lang.Long", name = "userId", required = false),
// @ApiImplicitParam(paramType = "body", dataType="用戶", name = "dto", required = true)
@ApiImplicitParam(paramType = "body",dataTypeClass = User.class, dataType="User", name = "dto", required = true),
})
// @ApiImplicitParam(paramType = "body", dataType="com.crazymaker.springcloud.reactive.user.info.dto.User", required = true)
public Mono<User> userAdd(@RequestBody User dto)
{
//Imperative style
// jpaEntityService.addUser(dto);
//Reactive style
return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto)));
}
@PostMapping("/del/v1")
@ApiOperation(value = "響應式的刪除")
@ApiImplicitParams({
@ApiImplicitParam(paramType = "body", dataType="User",dataTypeClass = User.class,name = "dto", required = true),
})
public Mono<User> userDel(@RequestBody User dto)
{
//Imperative style
// jpaEntityService.delUser(dto);
//Reactive style
return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.delUser(dto)));
}
@PostMapping("/list/v1")
@ApiOperation(value = "查詢用戶")
public Flux<User> listAllUser()
{
log.info("方法 listAllUser 被調用了");
//Imperative style, converted to reactive below: the following statement has to run inside the stream
// List<User> list = jpaEntityService.selectAllUser();
//Reactive style
Flux<User> userFlux = Flux.fromIterable(jpaEntityService.selectAllUser());
return userFlux;
}
@PostMapping("/detail/v1")
@ApiOperation(value = "響應式的查看")
@ApiImplicitParams({
@ApiImplicitParam(paramType = "body", dataTypeClass = User.class,dataType="User", name = "dto", required = true),
})
public Mono<User> getUser(@RequestBody User dto)
{
log.info("方法 getUser 被調用了");
//Build the stream
Mono<User> userMono = Mono.justOrEmpty(jpaEntityService.selectOne(dto.getUserId()));
return userMono;
}
@PostMapping("/detail/v2")
@ApiOperation(value = "命令式的查看")
@ApiImplicitParams({
@ApiImplicitParam(paramType = "body", dataType="User",dataTypeClass = User.class, name = "dto", required = true),
})
public RestOut<User> getUserV2(@RequestBody User dto)
{
log.info("方法 getUserV2 被調用了");
User user = jpaEntityService.selectOne(dto.getUserId());
return RestOut.success(user);
}
}
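As the return types show, Mono and Flux cover two cases:
- Mono: a publisher that emits 0 or 1 element, i.e. a single object
- Flux: a publisher that emits N elements, i.e. a list of objects
One might ask why the methods do not return the object directly, for example City/Long/List. The reason is that returning Flux or Mono is the non-blocking style, comparable to callbacks: the functional operators replace explicit callbacks, which is why no callback interfaces are visible. This is exactly the benefit of WebFlux, which combines non-blocking and asynchronous processing. Note that wrapping a blocking call such as a JPA query in a Mono does not make the call itself non-blocking; it still occupies the thread on which the Mono is subscribed.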
Mono
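What is Mono? The official description is: A Reactive Streams Publisher with basic rx operators that completes successfully by emitting an element, or with an error.
In other words, Mono is a Reactive Streams Publisher with basic rx operators that either emits an element successfully or terminates with an error.
Commonly used Mono methods:
- Mono.create(): creates a Mono through a MonoSink
- Mono.justOrEmpty(): creates a Mono from an Optional or from a value that may be null
- Mono.error(): creates a Mono that terminates with the given error
- Mono.never(): creates a Mono that never emits any signal
- Mono.delay(): creates a Mono that emits the number 0 as its only value after the given delay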
Flux
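What is Flux? The official description is: A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).
In other words, Flux is a Reactive Streams Publisher with rx operators that emits 0 to N elements and then completes, either successfully or with an error. Flux complements Mono.
So keep in mind: if the Publisher is known to emit 0 or 1 element, use Mono.
The Flux method most worth mentioning is fromIterable. fromIterable(Iterable<? extends T> it) publishes the elements of an Iterable. Flux also provides the basic operators map, merge, concat, flatMap, and take, which are not covered here.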
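To make the idea behind fromIterable more concrete, here is a minimal sketch built on the JDK's `java.util.concurrent.Flow` interfaces instead of Reactor. It is not Reactor's implementation: `IterablePublisherSketch`, `fromIterable`, and `collect` are hypothetical names, and the sketch assumes a single-threaded subscriber that calls `request` only once. It shows the two things a publisher over an Iterable has to do: emit no more than the requested number of elements, and signal completion once the source is exhausted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

class IterablePublisherSketch {
    // A tiny publisher that emits the items of an Iterable, honoring request(n).
    static <T> Flow.Publisher<T> fromIterable(Iterable<T> source) {
        return subscriber -> {
            Iterator<T> it = source.iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                boolean done;

                @Override
                public void request(long n) {
                    long emitted = 0;
                    // Emit at most n items per request (simplified backpressure).
                    while (!done && emitted < n && it.hasNext()) {
                        subscriber.onNext(it.next());
                        emitted++;
                    }
                    // Complete as soon as the source is exhausted.
                    if (!done && !it.hasNext()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        };
    }

    // Subscribes with a fixed demand and collects whatever is delivered.
    static <T> List<T> collect(Flow.Publisher<T> publisher, long demand) {
        List<T> out = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
            @Override public void onNext(T item) { out.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collect(fromIterable(List.of("a", "b", "c")), Long.MAX_VALUE));
        // Requesting only two items behaves roughly like take(2).
        System.out.println(collect(fromIterable(List.of(1, 2, 3)), 2));
    }
}
```

The subscriber, not the publisher, decides how many elements flow; that is the backpressure contract the real Flux operators follow as well.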
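Developing WebFlux endpoints with the configuration model
1 Write a handler class (Handler) in place of the Controller; the Service and DAO layers stay unchanged.
2 Configure the routes for the requests.
Handler class
The handler has to parse the parameters from the request and wrap the response itself. The code is as follows: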
package com.crazymaker.springcloud.reactive.user.info.config.handler;
import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.service.impl.JpaEntityServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
@Slf4j
@Component
public class UserReactiveHandler
{
@Resource
private JpaEntityServiceImpl jpaEntityService;
/**
* Get all users
*
* @param request
* @return
*/
public Mono<ServerResponse> getAllUser(ServerRequest request)
{
log.info("方法 getAllUser 被調用了");
return ok().contentType(APPLICATION_JSON_UTF8)
.body(Flux.fromIterable(jpaEntityService.selectAllUser()), User.class);
}
/**
* Create a user
*
* @param request
* @return
*/
public Mono<ServerResponse> createUser(ServerRequest request)
{
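// This pattern works in 2.0.0, but throws an exception in 2.0.1
Mono<User> user = request.bodyToMono(User.class);
/* In the reactive style a Mono is a stream, a publisher. Never call the publisher's
subscribe method here, that is, never consume it yourself; leave the final consumption
to Spring Boot. In particular, do not call
user.subscribe();
and do not call block().
Handle exceptions in one central place.
*/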
return user.flatMap(dto ->
{
// Validation code belongs here
if (StringUtils.isBlank(dto.getName()))
{
throw new BusinessException("用戶名不能為空");
}
return ok().contentType(APPLICATION_JSON_UTF8)
.body(Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto))), User.class);
});
}
/**
* Delete a user by id
*
* @param request
* @return
*/
public Mono<ServerResponse> deleteUserById(ServerRequest request)
{
String id = request.pathVariable("id");
// Validation code belongs here
if (StringUtils.isBlank(id))
{
throw new BusinessException("id不能為空");
}
User dto = new User();
dto.setUserId(Long.parseLong(id));
return ok().contentType(APPLICATION_JSON_UTF8)
.body(Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.delUser(dto))), User.class);
}
}
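The rule stated in the handler comments (validate inside the chain, never subscribe or block yourself, handle errors in one place) can be illustrated with JDK classes only. The sketch below is an analogy, not WebFlux code: `CompletableFuture` stands in for Mono, `thenCompose` for flatMap, and `UserDto`, `save`, and `render` are hypothetical names. A failed validation travels down the chain as an error signal, much like an exception thrown inside flatMap, and is turned into a response at a single point.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ChainValidationSketch {
    // Hypothetical stand-in for the User DTO.
    static final class UserDto {
        final String name;
        UserDto(String name) { this.name = name; }
    }

    // Hypothetical stand-in for jpaEntityService.addUser(dto).
    static String save(UserDto dto) {
        return "saved:" + dto.name;
    }

    // The handler only composes steps; it never waits for the body itself.
    static CompletableFuture<String> createUser(CompletableFuture<UserDto> body) {
        return body.thenCompose(dto -> {
            if (dto.name == null || dto.name.trim().isEmpty()) {
                // Validation failure becomes an error signal of the returned future.
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(
                        new IllegalArgumentException("name must not be blank"));
                return failed;
            }
            return CompletableFuture.completedFuture(save(dto));
        });
    }

    // Central error handling: the only place where errors become responses.
    static String render(CompletableFuture<String> result) {
        return result.exceptionally(ex -> {
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause() : ex;
            return "error:" + cause.getMessage();
        }).join(); // join() stands in for the framework consuming the result
    }

    public static void main(String[] args) {
        System.out.println(render(createUser(
                CompletableFuture.completedFuture(new UserDto("tom")))));
        System.out.println(render(createUser(
                CompletableFuture.completedFuture(new UserDto(" ")))));
    }
}
```

Only `render`, which plays the part of the framework, ever waits for the result; `createUser` just describes what should happen.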
Route configuration
package com.crazymaker.springcloud.reactive.user.info.config;
import com.crazymaker.springcloud.reactive.user.info.config.handler.UserReactiveHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.WebFilter;
import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
@Configuration
public class RoutersConfig
{
@Bean
RouterFunction<ServerResponse> routes(UserReactiveHandler handler)
{
// The routes below play the role of @RequestMapping on a class
// Get all users
return RouterFunctions.route(GET("/user"), handler::getAllUser)
// Create a user
.andRoute(POST("/user").and(accept(MediaType.APPLICATION_JSON_UTF8)), handler::createUser)
// Delete a user
.andRoute(DELETE("/user/{id}"), handler::deleteUserById);
}
@Value("${server.servlet.context-path}")
private String contextPath;
//Handles the context path; if there is no context path, this bean can be ignored
@Bean
public WebFilter contextPathWebFilter()
{
return (exchange, chain) ->
{
ServerHttpRequest request = exchange.getRequest();
String requestPath = request.getURI().getPath();
if (requestPath.startsWith(contextPath))
{
return chain.filter(
exchange.mutate()
.request(request.mutate().contextPath(contextPath).build())
.build());
}
return chain.filter(exchange);
};
}
}
Integrating Swagger
This section shows how to use a Swagger version that supports WebFlux.
Maven dependencies
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>${swagger.version}</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-spring-webflux</artifactId>
<version>${swagger.version}</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>${swagger.version}</version>
</dependency>
- swagger.version is currently 3.0.0. Spring 5 introduced WebFlux, but SpringFox Swagger2 2.9.2 does not support WebFlux, so 3.0.0 must be used.
Swagger configuration
package com.crazymaker.springcloud.reactive.user.info.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.web.util.UriComponentsBuilder;
import springfox.documentation.PathProvider;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.paths.DefaultPathProvider;
import springfox.documentation.spring.web.paths.Paths;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2WebFlux;
@Configuration
@EnableSwagger2WebFlux
public class SwaggerConfig
{
@Bean
public Docket createRestApi()
{
// return new Docket(DocumentationType.OAS_30)
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.pathMapping(servletContextPath) //Note: WebFlux has no context-path setting; without this line the paths used for endpoint testing have no prefix
.select()
.apis(RequestHandlerSelectors.basePackage("com.crazymaker.springcloud.reactive.user.info.controller"))
.paths(PathSelectors.any())
.build();
}
@Value("${server.servlet.context-path}")
private String servletContextPath;
//Builds the detailed information of the API documentation
private ApiInfo apiInfo()
{
return new ApiInfoBuilder()
//Page title
.title("瘋狂創客圈 springcloud + Nginx 高並發核心編程")
//Description
.description("Zuul+Swagger2 構建 RESTful APIs")
//Terms of service URL
.termsOfServiceUrl("https://www.cnblogs.com/crazymakercircle/")
.contact(new Contact("瘋狂創客圈", "https://www.cnblogs.com/crazymakercircle/", ""))
.version("1.0")
.build();
}
/**
* Override PathProvider to fix the duplicated context-path
* @return
*/
@Order(Ordered.HIGHEST_PRECEDENCE)
@Bean
public PathProvider pathProvider() {
return new DefaultPathProvider() {
@Override
public String getOperationPath(String operationPath) {
operationPath = operationPath.replaceFirst(servletContextPath, "/");
UriComponentsBuilder uriComponentsBuilder = UriComponentsBuilder.fromPath("/");
return Paths.removeAdjacentForwardSlashes(uriComponentsBuilder.path(operationPath).build().toString());
}
@Override
public String getResourceListingPath(String groupName, String apiDeclaration) {
apiDeclaration = super.getResourceListingPath(groupName, apiDeclaration);
return apiDeclaration;
}
};
}
}
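Both the WebFilter and the PathProvider above handle the context-path prefix with plain string operations, which has two pitfalls: `String.replaceFirst` treats its first argument as a regular expression and replaces the first match wherever it occurs in the path, and `startsWith` also matches a longer segment such as /uaa-react-provider2. The helper below is a hypothetical, JDK-only sketch (`stripContextPath` is not part of Springfox or Spring) that removes the prefix only when it is a complete leading path segment.

```java
class ContextPathSketch {
    // Hypothetical helper: strips contextPath only if it is a whole leading segment.
    static String stripContextPath(String path, String contextPath) {
        if (contextPath == null || contextPath.isEmpty() || contextPath.equals("/")) {
            return path; // nothing to strip
        }
        if (path.equals(contextPath)) {
            return "/";
        }
        if (path.startsWith(contextPath + "/")) {
            return path.substring(contextPath.length());
        }
        return path; // prefix absent, or only a partial segment match
    }

    public static void main(String[] args) {
        String ctx = "/uaa-react-provider";
        System.out.println(stripContextPath("/uaa-react-provider/api/user/hello", ctx));
        System.out.println(stripContextPath("/uaa-react-provider2/user", ctx));
        // replaceFirst also rewrites a match in the middle of the path:
        System.out.println("/api/uaa-react-provider/x".replaceFirst(ctx, "/"));
        System.out.println(stripContextPath("/api/uaa-react-provider/x", ctx));
    }
}
```

Whether the extra strictness matters depends on the paths the service actually exposes; for a single fixed context path the original code may be sufficient.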
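Testing
Testing the configuration-model WebFlux REST endpoints
The configuration-model endpoints do not appear in the Swagger UI configured above (the Docket only scans the annotated controller package), so they are tested with a client such as Postman.
Note that the context path must not be included:
http://192.168.68.1:7705/uaa-react-provider/user
Testing the annotation-model WebFlux REST endpoints
The annotation-model endpoints can be tested from the Swagger UI, for example from the page for adding a user.
The pages for the other CRUD operations are omitted.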
Configuration reference
Static resource configuration
@Configuration
@EnableWebFlux //Use the @EnableWebFlux annotation
public class WebFluxConfig implements WebFluxConfigurer { //Implement WebFluxConfigurer
//Configure static resources
@Override
public void addResourceHandlers(ResourceHandlerRegistry registry) {
registry.addResourceHandler("/static/**")
.addResourceLocations("classpath:/static/");
registry.addResourceHandler("/file/**")
.addResourceLocations("file:" + System.getProperty("user.dir") + File.separator + "file" + File.separator);
registry.addResourceHandler("/swagger-ui.html**")
.addResourceLocations("classpath:/META-INF/resources/");
registry.addResourceHandler("/webjars/**")
.addResourceLocations("classpath:/META-INF/resources/webjars/");
}
//Configure interceptors
//Configure codecs
...
}
WebFlux Security configuration
For comparison, the Spring Web MVC security configuration comes first:
@Configuration
@EnableWebSecurity
public class WebMvcSecurityConfig extends WebSecurityConfigurerAdapter implements
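AuthenticationEntryPoint, //Callback when unauthenticated
AuthenticationSuccessHandler, //Callback on authentication success
AuthenticationFailureHandler, //Callback on authentication failure
LogoutSuccessHandler { //Callback on logout success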
@Override
public void commence(HttpServletRequest request, HttpServletResponse response, AuthenticationException authException) throws IOException, ServletException {
sendJson(response, new Response<>(HttpStatus.UNAUTHORIZED.value(), "Unauthorized"));
}
@Override
public void onAuthenticationFailure(HttpServletRequest request, HttpServletResponse response, AuthenticationException exception) throws IOException, ServletException {
sendJson(response, new Response<>(1, "Incorrect"));
}
@Override
public void onAuthenticationSuccess(HttpServletRequest request, HttpServletResponse response, Authentication authentication) throws IOException, ServletException {
sendJson(response, new Response<>(0, authentication.getClass().getSimpleName()));
}
@Override
public void onLogoutSuccess(HttpServletRequest request, HttpServletResponse response, Authentication authentication) throws IOException, ServletException {
sendJson(response, new Response<>(0, "Success"));
}
@Override
protected void configure(HttpSecurity http) throws Exception {
http
.csrf()
.disable()
.authorizeRequests()
.antMatchers("/swagger*/**", "/webjars/**", "/v2/api-docs")
.permitAll()
.and()
.authorizeRequests()
.antMatchers("/static/**", "/file/**")
.permitAll()
.and()
.authorizeRequests()
.anyRequest()
.authenticated()
.and()
.logout()
.logoutUrl("/user/logout") //Virtual path, not a path defined by a controller
.logoutSuccessHandler(this)
.permitAll()
.and()
.exceptionHandling()
.authenticationEntryPoint(this)
.and()
.formLogin()
.usernameParameter("username")
.passwordParameter("password")
.loginProcessingUrl("/user/login") //Virtual path, not a path defined by a controller
.successForwardUrl("/user/login") //A path defined by a controller
.failureHandler(this)
.and()
.httpBasic()
.authenticationEntryPoint(this);
}
@Override
protected void configure(AuthenticationManagerBuilder auth) throws Exception {
auth.userDetailsService(userDetailService);
}
}
The WebFlux version: authentication depends on a user data service, so a bean implementing ReactiveUserDetailsService must be defined.
@Configuration
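@EnableWebFluxSecurity //Use the @EnableWebFluxSecurity annotation
public class WebFluxSecurityConfig implements
WebFilter, //Filter
ServerLogoutSuccessHandler, //Callback on logout success
ServerAuthenticationEntryPoint, //Authentication entry point
ServerAuthenticationFailureHandler, //Callback on authentication failure
ServerAuthenticationSuccessHandler { //Callback on authentication success
//Implementations of the interface methods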
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
//Configure the context-path for WebFlux
ServerHttpRequest request = exchange.getRequest();
if (request.getURI().getPath().startsWith(contextPath)) {
exchange = exchange.mutate().request(request.mutate().contextPath(contextPath).build()).build();
}
//Move the query parameters into the form data; otherwise the authentication converter (ServerFormLoginAuthenticationConverter) does not receive them
if (exchange.getRequest().getMethod() == HttpMethod.POST && exchange.getRequest().getQueryParams().size() > 0) {
ServerWebExchange finalExchange = exchange;
ServerWebExchange realExchange = new ServerWebExchangeDecorator(exchange) {
@Override
public Mono<MultiValueMap<String, String>> getFormData() {
return super.getFormData().map(new Function<MultiValueMap<String, String>, MultiValueMap<String, String>>() {
@Override
public MultiValueMap<String, String> apply(MultiValueMap<String, String> stringStringMultiValueMap) {
if (stringStringMultiValueMap.size() == 0) {
return finalExchange.getRequest().getQueryParams();
} else {
return stringStringMultiValueMap;
}
}
});
}
};
return chain.filter(realExchange);
}
return chain.filter(exchange);
}
@Override
public Mono<Void> onLogoutSuccess(WebFilterExchange webFilterExchange, Authentication authentication) {
return sendJson(webFilterExchange.getExchange(), new Response<>("登出成功"));
}
@Override
public Mono<Void> commence(ServerWebExchange exchange, AuthenticationException e) {
return sendJson(exchange, new Response<>(HttpStatus.UNAUTHORIZED.value(), "未驗證"));
}
@Override
public Mono<Void> onAuthenticationFailure(WebFilterExchange webFilterExchange, AuthenticationException exception) {
return sendJson(webFilterExchange.getExchange(), new Response<>(1, "驗證失敗"));
}
@Override
public Mono<Void> onAuthenticationSuccess(WebFilterExchange webFilterExchange, Authentication authentication) {
return webFilterExchange.getChain().filter(
webFilterExchange.getExchange().mutate()
.request(t -> t.method(HttpMethod.POST).path("/user/login")) //Forward to the custom controller
.build()
);
}
@Bean
public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
http.addFilterAfter(this, SecurityWebFiltersOrder.FIRST)
.csrf().disable()
.authorizeExchange()
.pathMatchers("/swagger*/**", "/webjars/**", "/v2/api-docs") //Swagger
.permitAll()
.and()
.authorizeExchange()
.pathMatchers("/static/**", "/file/**") //Static resources
.permitAll()
.and()
.authorizeExchange()
.anyExchange()
.authenticated()
.and()
.logout() //Logout
.logoutUrl("/user/logout")
.logoutSuccessHandler(this)
.and()
.exceptionHandling() //Callback when unauthenticated
.authenticationEntryPoint(this)
.and()
.formLogin()
.loginPage("/user/login")
.authenticationFailureHandler(this) //Callback on authentication failure
.authenticationSuccessHandler(this) //Callback on authentication success
.and()
.httpBasic()
.authenticationEntryPoint(this); //Basic authentication, typically used by mobile clients
return http.build();
}
}
WebSession configuration
@Configuration
@EnableRedisWebSession(maxInactiveIntervalInSeconds = 60) //Use the @EnableRedisWebSession annotation; maxInactiveIntervalInSeconds sets the session expiry, spring.session.timeout has no effect here
public class RedisWebSessionConfig { //In a distributed system, sessions are usually stored in Redis
@Bean
public LettuceConnectionFactory lettuceConnectionFactory() {
return new LettuceConnectionFactory();
}
}
//Single sign-on: use ReactiveRedisSessionRepository.getSessionRedisOperations().scan to find the sessions with the same user name, then delete the other sessions
public Mono<Map<String, String>> findByPrincipalName(String name) {
return reactiveSessionRepository.getSessionRedisOperations().scan(ScanOptions.scanOptions().match(ReactiveRedisSessionRepository.DEFAULT_NAMESPACE + ":sessions:*").build())
.flatMap(new Function<String, Publisher<Tuple2<String, Map.Entry<Object, Object>>>>() {
@Override
public Publisher<Tuple2<String, Map.Entry<Object, Object>>> apply(String s) {
return reactiveSessionRepository.getSessionRedisOperations().opsForHash().entries(s)
.map(new Function<Map.Entry<Object, Object>, Tuple2<String, Map.Entry<Object, Object>>>() {
@Override
public Tuple2<String, Map.Entry<Object, Object>> apply(Map.Entry<Object, Object> objectObjectEntry) {
return Tuples.of(s, objectObjectEntry);
}
});
}
})
.filter(new Predicate<Tuple2<String, Map.Entry<Object, Object>>>() {
@Override
public boolean test(Tuple2<String, Map.Entry<Object, Object>> rule) {
Map.Entry<Object, Object> t = rule.getT2();
String key = "sessionAttr:" + HttpSessionSecurityContextRepository.SPRING_SECURITY_CONTEXT_KEY;
if (key.equals(t.getKey())) {
User sci = (User) ((SecurityContextImpl) t.getValue()).getAuthentication().getPrincipal();
return sci.getUsername().equals(name);
}
return false;
}
})
.collectMap(new Function<Tuple2<String, Map.Entry<Object, Object>>, String>() {
@Override
public String apply(Tuple2<String, Map.Entry<Object, Object>> rule) {
return name;
}
}, new Function<Tuple2<String, Map.Entry<Object, Object>>, String>() {
@Override
public String apply(Tuple2<String, Map.Entry<Object, Object>> rule) {
return rule.getT1().replace(ReactiveRedisSessionRepository.DEFAULT_NAMESPACE + ":sessions:", "");
}
});
}
The corresponding Spring Web MVC configuration
@Configuration
@EnableRedisHttpSession //Use the @EnableRedisHttpSession annotation
public class RedisHttpSessionConfig { //In a distributed system, sessions are usually stored in Redis
@Bean
public LettuceConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory();
}
}
//Single sign-on: use FindByIndexNameSessionRepository to look up sessions by user name, then delete the other sessions
Map<String, Session> map = findByIndexNameSessionRepository.findByPrincipalName(name);
File upload configuration
//Uploading parameters
//Define the parameter bean
@Setter
@Getter
@ToString
@ApiModel
public class QueryBean{
@ApiModelProperty(value = "普通參數", required = false, example = "")
private String query;
@ApiModelProperty(value = "文件參數", required = false, example = "")
private FilePart image; //Important: WebFlux uses FilePart as the type for receiving files
}
//Define the endpoint
@ApiOperation("一個接口")
@PostMapping("/path")
//The file parameter must be declared explicitly with @ApiImplicitParam, otherwise the Swagger UI does not show the file upload button
@ApiImplicitParams({
@ApiImplicitParam(
paramType = "form", //Form parameter
dataType = "__file", //Recent versions use __file for files; older versions used file
name = "image", //Same name as the file field image in QueryBean
value = "文件") //Description
})
public Mono<Response> bannerAddOrUpdate(QueryBean q) {
}
WebFlux execution flow
The userAdd method is as follows:
public Mono<User> userAdd(@RequestBody User dto)
{
//Imperative style
// jpaEntityService.addUser(dto);
//Reactive style
return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto)));
}
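Because only one item is returned, Mono is used as the return type. The Mono object is created with the static create method of the Mono class, whose code is as follows: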
public abstract class Mono<T> implements Publisher<T> {
static final BiPredicate EQUALS_BIPREDICATE = Object::equals;
public Mono() {
}
public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
return onAssembly(new MonoCreate(callback));
}
...
}
As you can see, create takes a single parameter, a Consumer object. The parameter name callback indicates that this is a callback. Here is the definition of the Consumer interface:
@FunctionalInterface
public interface Consumer<T> {
/**
* Performs this operation on the given argument.
*
* @param t the input argument
*/
void accept(T t);
/**
* Returns a composed {@code Consumer} that performs, in sequence, this
* operation followed by the {@code after} operation. If performing either
* operation throws an exception, it is relayed to the caller of the
* composed operation. If performing this operation throws an exception,
* the {@code after} operation will not be performed.
*
* @param after the operation to perform after this operation
* @return a composed {@code Consumer} that performs in sequence this
* operation followed by the {@code after} operation
* @throws NullPointerException if {@code after} is null
*/
default Consumer<T> andThen(Consumer<? super T> after) {
Objects.requireNonNull(after);
return (T t) -> { accept(t); after.accept(t); };
}
}
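As the code above shows, the interface has two methods: the default method andThen and the abstract method accept.
Mono.create() needs an object that implements the Consumer interface as its argument; in other words, the instance passed to Mono.create must implement accept.
In the example, the following lambda expression is the implementation of accept; the argument has the type Consumer<MonoSink<User>>: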
cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto))
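As a small, JDK-only illustration of the point above, the sketch below shows that a lambda and an anonymous class are two ways of implementing `Consumer.accept`, and how `andThen` chains them. `ConsumerLambdaSketch` and `run` are hypothetical names, and a plain `List<String>` stands in for the MonoSink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ConsumerLambdaSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // The lambda body is the implementation of accept(T).
        Consumer<List<String>> first = sink -> sink.add("first");

        // The equivalent anonymous class makes the accept method explicit.
        Consumer<List<String>> second = new Consumer<List<String>>() {
            @Override
            public void accept(List<String> sink) {
                sink.add("second");
            }
        };

        // andThen runs both consumers in order on the same argument.
        first.andThen(second).accept(log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Neither consumer does anything when it is defined; the work happens only when accept is finally invoked.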
Let's look again at the implementation of create:
public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
return onAssembly(new MonoCreate(callback));
}
Inside, the method calls onAssembly with a MonoCreate object as the argument. The MonoCreate class looks like this:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package reactor.core.publisher;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Scannable.Attr;
import reactor.core.publisher.FluxCreate.SinkDisposable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
final class MonoCreate<T> extends Mono<T> {
final Consumer<MonoSink<T>> callback;
MonoCreate(Consumer<MonoSink<T>> callback) {
this.callback = callback;
}
public void subscribe(CoreSubscriber<? super T> actual) {
MonoCreate.DefaultMonoSink<T> emitter = new MonoCreate.DefaultMonoSink(actual);
actual.onSubscribe(emitter);
try {
this.callback.accept(emitter);
} catch (Throwable var4) {
emitter.error(Operators.onOperatorError(var4, actual.currentContext()));
}
}
static final class DefaultMonoSink<T> extends AtomicBoolean implements MonoSink<T>, InnerProducer<T> {
final CoreSubscriber<? super T> actual;
volatile Disposable disposable;
static final AtomicReferenceFieldUpdater<MonoCreate.DefaultMonoSink, Disposable> DISPOSABLE = AtomicReferenceFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, Disposable.class, "disposable");
volatile int state;
static final AtomicIntegerFieldUpdater<MonoCreate.DefaultMonoSink> STATE = AtomicIntegerFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, "state");
volatile LongConsumer requestConsumer;
static final AtomicReferenceFieldUpdater<MonoCreate.DefaultMonoSink, LongConsumer> REQUEST_CONSUMER = AtomicReferenceFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, LongConsumer.class, "requestConsumer");
T value;
static final int NO_REQUEST_HAS_VALUE = 1;
static final int HAS_REQUEST_NO_VALUE = 2;
static final int HAS_REQUEST_HAS_VALUE = 3;
DefaultMonoSink(CoreSubscriber<? super T> actual) {
this.actual = actual;
}
public Context currentContext() {
return this.actual.currentContext();
}
@Nullable
public Object scanUnsafe(Attr key) {
if (key != Attr.TERMINATED) {
return key == Attr.CANCELLED ? OperatorDisposables.isDisposed(this.disposable) : super.scanUnsafe(key);
} else {
return this.state == 3 || this.state == 1;
}
}
public void success() {
if (STATE.getAndSet(this, 3) != 3) {
try {
this.actual.onComplete();
} finally {
this.disposeResource(false);
}
}
}
public void success(@Nullable T value) {
if (value == null) {
this.success();
} else {
int s;
do {
s = this.state;
if (s == 3 || s == 1) {
Operators.onNextDropped(value, this.actual.currentContext());
return;
}
if (s == 2) {
if (STATE.compareAndSet(this, s, 3)) {
try {
this.actual.onNext(value);
this.actual.onComplete();
} finally {
this.disposeResource(false);
}
}
return;
}
this.value = value;
} while(!STATE.compareAndSet(this, s, 1));
}
}
public void error(Throwable e) {
if (STATE.getAndSet(this, 3) != 3) {
try {
this.actual.onError(e);
} finally {
this.disposeResource(false);
}
} else {
Operators.onOperatorError(e, this.actual.currentContext());
}
}
public MonoSink<T> onRequest(LongConsumer consumer) {
Objects.requireNonNull(consumer, "onRequest");
if (!REQUEST_CONSUMER.compareAndSet(this, (Object)null, consumer)) {
throw new IllegalStateException("A consumer has already been assigned to consume requests");
} else {
return this;
}
}
public CoreSubscriber<? super T> actual() {
return this.actual;
}
public MonoSink<T> onCancel(Disposable d) {
Objects.requireNonNull(d, "onCancel");
SinkDisposable sd = new SinkDisposable((Disposable)null, d);
if (!DISPOSABLE.compareAndSet(this, (Object)null, sd)) {
Disposable c = this.disposable;
if (c instanceof SinkDisposable) {
SinkDisposable current = (SinkDisposable)c;
if (current.onCancel == null) {
current.onCancel = d;
} else {
d.dispose();
}
}
}
return this;
}
public MonoSink<T> onDispose(Disposable d) {
Objects.requireNonNull(d, "onDispose");
SinkDisposable sd = new SinkDisposable(d, (Disposable)null);
if (!DISPOSABLE.compareAndSet(this, (Object)null, sd)) {
Disposable c = this.disposable;
if (c instanceof SinkDisposable) {
SinkDisposable current = (SinkDisposable)c;
if (current.disposable == null) {
current.disposable = d;
} else {
d.dispose();
}
}
}
return this;
}
public void request(long n) {
if (Operators.validate(n)) {
LongConsumer consumer = this.requestConsumer;
if (consumer != null) {
consumer.accept(n);
}
int s;
do {
s = this.state;
if (s == 2 || s == 3) {
return;
}
if (s == 1) {
if (STATE.compareAndSet(this, s, 3)) {
try {
this.actual.onNext(this.value);
this.actual.onComplete();
} finally {
this.disposeResource(false);
}
}
return;
}
} while(!STATE.compareAndSet(this, s, 2));
}
}
public void cancel() {
if (STATE.getAndSet(this, 3) != 3) {
this.value = null;
this.disposeResource(true);
}
}
void disposeResource(boolean isCancel) {
Disposable d = this.disposable;
if (d != OperatorDisposables.DISPOSED) {
d = (Disposable)DISPOSABLE.getAndSet(this, OperatorDisposables.DISPOSED);
if (d != null && d != OperatorDisposables.DISPOSED) {
if (isCancel && d instanceof SinkDisposable) {
((SinkDisposable)d).cancel();
}
d.dispose();
}
}
}
}
}
That is a lot of code; we mainly care about the following two members, the constructor and subscribe:
MonoCreate(Consumer<MonoSink<T>> callback) {
this.callback = callback;
}
public void subscribe(CoreSubscriber<? super T> actual) {
MonoCreate.DefaultMonoSink<T> emitter = new MonoCreate.DefaultMonoSink(actual);
actual.onSubscribe(emitter);
try {
this.callback.accept(emitter);
} catch (Throwable var4) {
emitter.error(Operators.onOperatorError(var4, actual.currentContext()));
}
}
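As the code shows, the first is the constructor: it takes a Consumer and simply stores it. The subscribe method then contains the statement this.callback.accept(emitter), which is where the callback happens: it invokes the accept method of the Consumer that was supplied when Mono.create() was called. Looking at subscribe more closely, it first calls actual.onSubscribe(emitter), which hands the sink to the subscriber as its Subscription. Nothing runs until something subscribes, and in MonoCreate the callback then executes synchronously on the subscribing thread; this deferred, callback-driven execution is the basis of the event-driven, asynchronous Reactor model that WebFlux builds on.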
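The store-then-call-back flow described above can be reduced to a few lines. The following is a deliberately simplified model and not Reactor's code: `MiniMono` and `Sink` are hypothetical stand-ins for MonoCreate and MonoSink, and request handling, errors, and cancellation are left out. It only demonstrates that create stores the callback and that the callback runs when subscribe is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class MiniMonoSketch {
    // Simplified stand-in for MonoSink: only the success signal is modeled.
    interface Sink<T> {
        void success(T value);
    }

    // Simplified stand-in for MonoCreate: stores the callback, runs it on subscribe.
    static final class MiniMono<T> {
        private final Consumer<Sink<T>> callback;

        private MiniMono(Consumer<Sink<T>> callback) {
            this.callback = callback;
        }

        static <T> MiniMono<T> create(Consumer<Sink<T>> callback) {
            return new MiniMono<>(callback); // nothing is executed yet
        }

        void subscribe(Consumer<T> onNext) {
            // The stored callback runs here, synchronously, on the subscribing thread.
            callback.accept(onNext::accept);
        }
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        MiniMono<String> mono = MiniMono.create(sink -> {
            events.add("callback ran");
            sink.success("user-1");
        });
        events.add("created");
        mono.subscribe(value -> events.add("received " + value));
        events.add("subscribed");
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The event order makes the laziness visible: "created" is recorded before "callback ran", because the work is described at creation time and performed at subscription time.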
The other uses of Mono and Flux can be traced through the source in the same way and are not covered in detail here.
