Reactive Spring實戰 -- WebFlux使用教程


WebFlux是Spring 5提供的響應式Web應用框架。
它是完全非阻塞的,可以在Netty,Undertow和Servlet 3.1+等非阻塞服務器上運行。
本文主要介紹WebFlux的使用。

FluxWeb vs noFluxWeb

WebFlux是完全非阻塞的。
在FluxWeb前,我們可以使用DeferredResult和AsyncRestTemplate等方式實現非阻塞的Web通信。
我們先來比較一下這兩者。

注意:關於同步阻塞與異步非阻塞的性能差異,本文不再闡述。
阻塞即浪費。我們通過異步實現非阻塞。只有存在阻塞時,異步才能提高性能。如果不存在阻塞,使用異步反而可能由於線程調度等開銷導致性能下降。

下面例子模擬一種業務場景。
訂單服務提供接口查找訂單信息,同時,該接口實現還需要調用倉庫服務查詢倉庫信息,商品服務查詢商品信息,並過濾,取前5個商品數據。

OrderService提供如下方法

public void getOrderByRest(DeferredResult<Order> rs, long orderId) {
    // [1]
    Order order = mockOrder(orderId);
    // [2]
    ListenableFuture<ResponseEntity<User>> userLister = asyncRestTemplate.getForEntity("http://user-service/user/mock/" + 1, User.class);
    ListenableFuture<ResponseEntity<List<Goods>>> goodsLister =
                    asyncRestTemplate.exchange("http://goods-service/goods/mock/list?ids=" + StringUtils.join(order.getGoodsIds(), ","),
                            HttpMethod.GET,  null, new ParameterizedTypeReference<List<Goods>>(){});
    // [3]
    CompletableFuture<ResponseEntity<User>> userFuture = userLister.completable().exceptionally(err -> {
        logger.warn("get user err", err);
        return new ResponseEntity(new User(), HttpStatus.OK);
    });
    CompletableFuture<ResponseEntity<List<Goods>>> goodsFuture = goodsLister.completable().exceptionally(err -> {
        logger.warn("get goods err", err);
        return new ResponseEntity(new ArrayList<>(), HttpStatus.OK);
    });
    // [4]
    warehouseFuture.thenCombineAsync(goodsFuture, (warehouseRes, goodsRes)-> {
            order.setWarehouse(warehouseRes.getBody());
            List<Goods> goods = goodsRes.getBody().stream()
                    .filter(g -> g.getPrice() > 10).limit(5)
                    .collect(Collectors.toList());
            order.setGoods(goods);
        return order;
    }).whenCompleteAsync((o, err)-> {
        // [5]
        if(err != null) {
            logger.warn("err happen:", err);
        }
        rs.setResult(o);
    });
}
  1. 加載訂單數據,這里mack了一個數據。
  2. 通過asyncRestTemplate獲取倉庫,產品信息,得到ListenableFuture。
  3. 設置ListenableFuture異常處理,避免因為某個請求報錯導致接口失敗。
  4. 合並倉庫,產品請求結果,組裝訂單數據
  5. 通過DeferredResult設置接口返回數據。

可以看到,代碼較繁瑣,通過DeferredResult返回數據的方式也與我們同步接口通過方法返回值返回數據的方式大相徑庭。

這里實際存在兩處非阻塞

  1. 使用AsyncRestTemplate實現發送異步Http請求,也就是說通過其他線程調用倉庫服務和產品服務,並返回CompletableFuture,所以不阻塞getOrderByRest方法線程。
  2. DeferredResult負責異步返回Http響應。
    getOrderByRest方法中並不阻塞等待AsyncRestTemplate返回,而是直接返回,等到AsyncRestTemplate返回后通過回調函數設置DeferredResult的值將數據返回給Http,可對比以下阻塞等待的代碼
ResponseEntity<Warehouse> warehouseRes = warehouseFuture.get();
ResponseEntity<List<Goods>> goodsRes = goodsFuture.get();
order.setWarehouse(warehouseRes.getBody());
order.setGoods(goodsRes.getBody());
return order;

下面我們使用WebFlux實現。
pom引入依賴

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

服務啟動類OrderServiceReactive

@EnableDiscoveryClient
@SpringBootApplication
public class OrderServiceReactive
{
    public static void main( String[] args )
    {
        new SpringApplicationBuilder(
                OrderServiceReactive.class)
                .web(WebApplicationType.REACTIVE).run(args);
    }
}

WebApplicationType.REACTIVE啟動WebFlux。

OrderController實現如下

@GetMapping("/{id}")
public Mono<Order> getById(@PathVariable long id) {
    return service.getOrder(id);
}

注意返回一個Mono數據,Mono與Flux是Spring Reactor提供的異步數據流。
WebFlux中通常使用Mono,Flux作為數據輸入,輸出值。
當接口返回Mono,Flux,Spring知道這是一個異步請求結果。
關於Spring Reactor,可參考《理解Reactor的設計與實現

OrderService實現如下

public Mono<Order> getOrder(long orderId) {
    // [1]
    Mono<Order> orderMono = mockOrder(orderId);
    // [2]
    return orderMono.flatMap(o -> {
        // [3]
        Mono<User> userMono =  getMono("http://user-service/user/mock/" + o.getUserId(), User.class).onErrorReturn(new User());
        Flux<Goods> goodsFlux = getFlux("http://goods-service/goods/mock/list?ids=" +
                StringUtils.join(o.getGoodsIds(), ","), Goods.class)
                .filter(g -> g.getPrice() > 10)
                .take(5)
                .onErrorReturn(new Goods());
        // [4]
        return userMono.zipWith(goodsFlux.collectList(), (u, gs) -> {
            o.setUser(u);
            o.setGoods(gs);
            return o;
        });
    });
}

private <T> Mono<T> getMono(String url, Class<T> resType) {
    return webClient.get().uri(url).retrieve().bodyToMono(resType);
}

// getFlux
  1. 加載訂單數據,這里mock了一個Mono數據
  2. flatMap方法可以將Mono中的數據轉化類型,這里轉化后的結果還是Order。
  3. 獲取倉庫,產品數據。這里可以看到,對產品過濾,取前5個的操作可以直接添加到Flux 上。
  4. zipWith方法可以組合兩個Mono,並返回新的Mono類型,這里組合倉庫、產品數據,最后返回Mono
    可以看到,代碼整潔不少,並且接口返回Mono ,與我們在同步接口中直接數據的做法類似,不需要借助DeferredResult這樣的工具類。

我們通過WebClient發起異步請求,WebClient返回Mono結果,雖然它並不是真正的數據(它是一個數據發布者,等請求數據返回后,它才把數據送過來),但我們可以通過操作符方法對他添加邏輯,如過濾,排序,組合,就好像同步操作時已經拿到數據那樣。
而在AsyncRestTemplate,則所有的邏輯都要寫到回調函數中。

WebFlux是完全非阻塞的。
Mono、Flux的組合函數非常有用。
上面方法中先獲取訂單數據,再同時獲取倉庫,產品數據,
如果接口參數同時傳入了訂單id,倉庫id,產品id,我們也可以同時獲取這三個數據,再組裝起來

public Mono<Order> getOrder(long orderId, long warehouseId, List<Long> goodsIds) {
    Mono<Order> orderMono = mockOrderMono(orderId);

    return orderMono.zipWith(getMono("http://warehouse-service/warehouse/mock/" + warehouseId, Warehouse.class), (o,w) -> {
        o.setWarehouse(w);
        return o;
    }).zipWith(getFlux("http://goods-service/goods/mock/list?ids=" +
            StringUtils.join(goodsIds, ","), Goods.class)
            .filter(g -> g.getPrice() > 10).take(5).collectList(), (o, gs) -> {
        o.setGoods(gs);
        return o;
    });
}

如果我們需要串行獲取訂單,倉庫,商品這三個數據,實現如下

public Mono<Order> getOrderInLabel(long orderId) {
    Mono<Order> orderMono = mockOrderMono(orderId);

    return orderMono.zipWhen(o -> getMono("http://warehouse-service/warehouse/mock/" + o.getWarehouseId(), Warehouse.class), (o, w) -> {
        o.setWarehouse(w);
        return o;
    }).zipWhen(o -> getFlux("http://goods-service/goods/mock/list?ids=" +
                    StringUtils.join(o.getGoodsIds(), ",") + "&label=" + o.getWarehouse().getLabel() , Goods.class)
            .filter(g -> g.getPrice() > 10).take(5).collectList(), (o, gs) -> {
        o.setGoods(gs);
        return o;
    });
}

zipWith方法會同時請求待合並的兩個Mono數據,而zipWhen方法則會阻塞等待第一個Mono數據到達在請求第二個Mono數據。
orderMono.zipWhen(...).zipWhen(...),第一個zipWhen方法會阻塞等待orderMono數據返回再使用order數據構造新的Mono數據,第二個zipWhen方法也會等待前面zipWhen構建的Mono數據返回再構建新Mono,
所以在第二個zipWhen方法中,可以調用o.getWarehouse().getLabel(),因為第一個zipWhen已經獲取到倉庫信息。

下面說一個WebFlux的使用。
分為兩部分,WebFlux服務端與WebClient。

WebFlux服務端

底層容器切換

WebFlux默認使用Netty實現服務端異步通信,可以通過更換依賴包切換底層容器

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <exclusions>
    <exclusion>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-netty</artifactId>
    </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

注解

WebFlux支持SpringMvc大部分的注解,如
映射:@Controller,@GetMapping,@PostMapping,@PutMapping,@DeleteMapping
參數綁定:@PatchMapping,@RequestParam,@RequestBody,@RequestHeader,@PathVariable,@RequestAttribute,@SessionAttribute
結果解析:@ResponseBody,@ModelAttribute
這些注解的使用方式與springMvc相同

命令式映射

WebFlux支持使用命令式編程指定映射關系

@Bean
public RouterFunction<ServerResponse> monoRouterFunction(InvoiceHandler invoiceHandler) {
    return route()
            .GET("/invoice/{orderId}",  accept(APPLICATION_JSON), invoiceHandler::get)
            .build();
}

調用"/invoice/{orderId}",請求會轉發到invoiceHandler#get方法

invoiceHandler#get方法實現如下

public Mono<ServerResponse> get(ServerRequest request) {
    Invoice invoice = new Invoice();
    invoice.setId(999L);
    invoice.setOrderId(Long.parseLong(request.pathVariable("orderId")));
    return ok().contentType(APPLICATION_JSON).body(Mono.just(invoice), Warehouse.class);
}

Filter

可以通過實現WebFilter接口添加過濾器

@Component
public class TokenCheckFilter implements WebFilter {
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        if(!exchange.getRequest().getHeaders().containsKey("token")) {
            ServerHttpResponse response =  exchange.getResponse();
            response.setStatusCode(HttpStatus.FORBIDDEN);
            response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
            return response.writeWith(Mono.just(response.bufferFactory().wrap("{\"msg\":\"no token\"}".getBytes())));
        } else {
            exchange.getAttributes().put("auth", "true");
            return chain.filter(exchange);
        }
    }
}

上面實現的是前置過濾器,在調用邏輯方法前的檢查請求token

實現后置過濾器代碼如下

@Component
public class LogFilter  implements WebFilter {
    private static final Logger logger = LoggerFactory.getLogger(LogFilter.class);
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        // [1]
        logger.info("request before, url:{}, statusCode:{}", exchange.getRequest().getURI(), exchange.getResponse().getStatusCode());
        return chain.filter(exchange)
            .doFinally(s -> {
                // [2]
                logger.info("request after, url:{}, statusCode:{}", exchange.getRequest().getURI(), exchange.getResponse().getStatusCode());
            });
    }
}

注意,[1]處exchange.getResponse()返回的是初始化狀態的response,並不是請求處理后返回的response。

異常處理

通過@ExceptionHandler注解定義一個全局的異常處理器

@ControllerAdvice
public class ErrorController {
    private static final Logger logger = LoggerFactory.getLogger(ErrorController.class);

    @ResponseBody
    @ExceptionHandler({NullPointerException.class})
    @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
    public String nullException(NullPointerException e) {
        logger.error("global err handler", e);
        return "{\"msg\":\"There is a problem\"}";
    }
}

WebFluxConfigurer

WebFlux中可以通過WebFluxConfigurer做自定義配置,如配置自定義的結果解析

@Configuration
@EnableWebFlux
public class WebConfig implements WebFluxConfigurer {
    public void configureArgumentResolvers(ArgumentResolverConfigurer configurer) {
        configurer.addCustomResolver(new HandlerMethodArgumentResolver() {
            ...
        });
    }

    public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
        configurer.customCodecs().register(new HttpMessageWriter() {
            ...
        });
    }
}

configureArgumentResolvers方法配置參數綁定處理器
configureHttpMessageCodecs方法配置Http請求報文,響應報文解析器

@EnableWebFlux要求Spring從WebFluxConfigurationSupport引入Spring WebFlux 配置。如果你的依賴中引入了spring-boot-starter-webflux,Spring WebFlux 將自動配置,不需要添加該注解。
但如果你只使用Spring WebFlux而沒有使用Spring Boot,這是需要添加@EnableWebFlux啟動Spring WebFlux自動化配置。

Spring Flux支持CORS,Spring Security,HTTP/2,更多內容不再列出,請參考官方文檔。

WebClient

WebClient可以發送異步Web請求,並支持響應式編程。
下面說一個WebClient的使用。

底層框架

WebClient底層使用的Netty實現異步Http請求,我們可以切換底層庫,如Jetty

@Bean
public JettyResourceFactory resourceFactory() {
    return new JettyResourceFactory();
}

@Bean
public WebClient webClient() {
    HttpClient httpClient = HttpClient.create();
    ClientHttpConnector connector =
            new JettyClientHttpConnector(httpClient, resourceFactory());
    return WebClient.builder().clientConnector(connector).build();
}

連接池

WebClient默認是每個請求創建一個連接。
我們可以配置連接池復用連接,以提高性能。

ConnectionProvider provider = ConnectionProvider.builder("order")
    .maxConnections(100)
    .maxIdleTime(Duration.ofSeconds(30))
    .pendingAcquireTimeout(Duration.ofMillis(100))  
    .build();
return WebClient
    .builder().clientConnector(new ReactorClientHttpConnector(HttpClient.create(provider)));

maxConnections:允許的最大連接數
pendingAcquireTimeout:沒有連接可用時,請求等待的最長時間
maxIdleTime:連接最大閑置時間

超時

底層使用Netty時,可以如下配置超時時間

import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;

HttpClient httpClient = HttpClient.create()
        .doOnConnected(conn -> conn
                .addHandlerLast(new ReadTimeoutHandler(10))
                .addHandlerLast(new WriteTimeoutHandler(10)));

或者直接使用responseTimeout

HttpClient httpClient = HttpClient.create()
        .responseTimeout(Duration.ofSeconds(2));
Post Json

WebClient可以發送json,form,文件等請求報文,
看一個最常用的Post Json請求

webClient.post().uri("http://localhost:9004/order/")
    .contentType(MediaType.APPLICATION_JSON)
    .body(Mono.just(order), Order.class)
    .retrieve().bodyToMono(String.class)

異常處理

可以在ResponseSpec中指定異常處理

private <T> Mono<T> getMono(String url, Class<T> resType) {
return webClient
    .get().uri(url).retrieve()
    .onStatus(HttpStatus::is5xxServerError, clientResponse -> {
        return Mono.error(...);
    })
    .onStatus(HttpStatus::is4xxClientError, clientResponse -> {
        return Mono.error(...);
    })
    .onStatus(HttpStatus::isError, clientResponse -> {
        return Mono.error(...);
    })
    .bodyToMono(resType)
}

也可以在HttpClient上配置

HttpClient httpClient = HttpClient.create()
        .doOnError((req, err) -> {
            log.error("err on request:{}", req.uri(), err);
        }, (res, err) -> {
            log.error("err on response:{}", res.uri(), err);
        })

同步返回結果

使用block方法可以阻塞線程,等待請求返回

private <T> T syncGetMono(String url, Class<T> resType) {
    return webClient
            .get().uri(url).retrieve()
            .bodyToMono(resType).block();
}

獲取響應信息

exchangeToMono可以獲取到響應的header,statusCode等信息

private <T> Mono<T> getMonoWithInfo(String url, Class<T> resType) {
    return webClient
            .get()
            .uri(url)
            .exchangeToMono(response -> {
                logger.info("request url:{},statusCode:{},headers:{}", url, response.statusCode(), response.headers());
                return response.bodyToMono(resType);
            });
}

注冊中心與Ribbon

經驗證,WebClient支持Eureka注冊中心與Ribbon轉發,使用方式與restTemplate相同。
不過@LoadBalanced需要添加在WebClient.Builder上

@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
    return WebClient.builder();
}

官方文檔:https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html
文章完整代碼:https://gitee.com/binecy/bin-springreactive/tree/master/order-service

實際項目中,線程阻塞場景往往不只有Http請求阻塞,還有Mysql請求,Redis請求,Kafka請求等等導致的阻塞。從這些數據源中獲取數據時,大多數都是阻塞直到數據源返回數據。
而Reactive Spring強大在於,它也支持這些數據源的非阻塞響應式編程。
下一篇文章,我們來看一個如何實現Redis的非阻塞響應式編程。

如果您覺得本文不錯,歡迎關注我的微信公眾號,系列文章持續更新中。您的關注是我堅持的動力。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM