springboot 使用webflux響應式開發教程(一)


什么是webFlux

左側是傳統的基於Servlet的Spring Web MVC框架,右側是5.0版本新引入的基於Reactive Streams的Spring WebFlux框架,從上到下依次是Router Functions,WebFlux,Reactive Streams三個新組件。

Router Functions: 對標@Controller,@RequestMapping等標准的Spring MVC注解,提供一套函數式風格的API,用於創建Router,Handler和Filter。
WebFlux: 核心組件,協調上下游各個組件提供響應式編程支持。
Reactive Streams: 一種支持背壓(Backpressure)的異步數據流處理標准,主流實現有RxJava和Reactor,Spring WebFlux默認集成的是Reactor。
在Web容器的選擇上,Spring WebFlux既支持像Tomcat,Jetty這樣的的傳統容器(前提是支持Servlet 3.1 Non-Blocking IO API),又支持像Netty,Undertow那樣的異步容器。不管是何種容器,Spring WebFlux都會將其輸入輸出流適配成Flux<DataBuffer>格式,以便進行統一處理。

值得一提的是,除了新的Router Functions接口,Spring WebFlux同時支持使用老的Spring MVC注解聲明Reactive Controller。和傳統的MVC Controller不同,Reactive Controller操作的是非阻塞的ServerHttpRequest和ServerHttpResponse,而不再是Spring MVC里的HttpServletRequest和HttpServletResponse。

    @GetMapping("/reactive/restaurants")
    public Flux<Restaurant> findAll() {
        return restaurantRepository.findAll();
    }

可以看到主要變化就是在 返回的類型上Flux<Restaurant>

Flux和Mono 是 Reactor 中的流數據類型,其中Flux會發送多次,Mono會發送0次或一次

使用webflux需要具備的基礎是Reactive programming 的理解。 
Reactor 的基礎 和 熟練的java8 lambda使用

創建springboot應用
下面通過創建股票報價的demo來演示。

通過 https://start.spring.io 或idea自帶功能創建springboot項目,groupId為io.spring.workshop,artifactId為 stock-quotes。

 

勾選 ReactiveWeb

 

修改 application.properties 配置文件,指定接口 8081

server.port=8081

啟動應用,成功后控制台輸出日志

 

日志顯示使用Netty而不是tomcat,后續會使用Tomcat

股票報價生成
定義實體

@Data
public class Quote {

    private static final MathContext MATH_CONTEXT = new MathContext(2);

    private String ticker;

    private BigDecimal price;

    private Instant instant;

    public Quote() {
    }

    public Quote(String ticker, BigDecimal price) {
        this.ticker = ticker;
        this.price = price;
    }

    public Quote(String ticker, Double price) {
        this(ticker, new BigDecimal(price, MATH_CONTEXT));
    }

    @Override
    public String toString() {
        return "Quote{" +
                "ticker='" + ticker + '\'' +
                ", price=" + price +
                ", instant=" + instant +
                '}';
    }
}

定義生成器

@Component
public class QuoteGenerator {

    private final MathContext mathContext = new MathContext(2);

    private final Random random = new Random();

    private final List<Quote> prices = new ArrayList<>();

    /**
     * 生成行情數據
     */
    public QuoteGenerator() {
        this.prices.add(new Quote("CTXS", 82.26));
        this.prices.add(new Quote("DELL", 63.74));
        this.prices.add(new Quote("GOOG", 847.24));
        this.prices.add(new Quote("MSFT", 65.11));
        this.prices.add(new Quote("ORCL", 45.71));
        this.prices.add(new Quote("RHT", 84.29));
        this.prices.add(new Quote("VMW", 92.21));
    }


    public Flux<Quote> fetchQuoteStream(Duration period) {

        // 需要周期生成值並返回,使用 Flux.interval
        return Flux.interval(period)
                // In case of back-pressure, drop events
                .onBackpressureDrop()
                // For each tick, generate a list of quotes
                .map(this::generateQuotes)
                // "flatten" that List<Quote> into a Flux<Quote>
                .flatMapIterable(quotes -> quotes)
                .log("io.spring.workshop.stockquotes");
    }

    /**
     * Create quotes for all tickers at a single instant.
     */
    private List<Quote> generateQuotes(long interval) {
        final Instant instant = Instant.now();
        return prices.stream()
                .map(baseQuote -> {
                    BigDecimal priceChange = baseQuote.getPrice()
                            .multiply(new BigDecimal(0.05 * this.random.nextDouble()), this.mathContext);
                    Quote result = new Quote(baseQuote.getTicker(), baseQuote.getPrice().add(priceChange));
                    result.setInstant(instant);
                    return result;
                })
                .collect(Collectors.toList());
    }
}

使用webflux創建web應用

webflux的使用有兩種方式,基於注解和函數式編程。這里使用函數式編程,先貼代碼:

創建QuoteHandler

@Component
public class QuoteHandler {

    private final Flux<Quote> quoteStream;

    public QuoteHandler(QuoteGenerator quoteGenerator) {
        this.quoteStream = quoteGenerator.fetchQuoteStream(ofMillis(1000)).share();
    }

    public Mono<ServerResponse> hello(ServerRequest request) {
        return ok().contentType(TEXT_PLAIN)
                .body(BodyInserters.fromObject("Hello Spring!"));
    }

    public Mono<ServerResponse> echo(ServerRequest request) {
        return ok().contentType(TEXT_PLAIN)
                .body(request.bodyToMono(String.class), String.class);
    }

    public Mono<ServerResponse> streamQuotes(ServerRequest request) {
        return ok()
                .contentType(APPLICATION_STREAM_JSON)
                .body(this.quoteStream, Quote.class);
    }

    public Mono<ServerResponse> fetchQuotes(ServerRequest request) {
        int size = Integer.parseInt(request.queryParam("size").orElse("10"));
        return ok()
                .contentType(APPLICATION_JSON)
                .body(this.quoteStream.take(size), Quote.class);
    }
}

創建Router

@Configuration
public class QuoteRouter {

   @Bean
   public RouterFunction<ServerResponse> route(QuoteHandler quoteHandler) {
      return RouterFunctions
            .route(GET("/hello").and(accept(TEXT_PLAIN)), quoteHandler::hello)
            .andRoute(POST("/echo").and(accept(TEXT_PLAIN).and(contentType(TEXT_PLAIN))), quoteHandler::echo)
            .andRoute(GET("/quotes").and(accept(APPLICATION_JSON)), quoteHandler::fetchQuotes)
            .andRoute(GET("/quotes").and(accept(APPLICATION_STREAM_JSON)), quoteHandler::streamQuotes);
   }
}

需要注意的是在springboot中Handler和Router都需要打上@Configuration。

HTTP請求交由Router轉發給對應的Handler,Handler處理請求,並返回Mono<ServerResponse>,這里的Router類似@RequestMapping,Handler類似Controller。這么理解非常容易。

運行項目,瀏覽器輸入 http://localhost:8081/hello 或者 使用curl,即可收到 "Hello Spring!"的文本信息。

到目前為止,一個簡單的webflux示例已經完成,但是還沒有體現出它與傳統模式有何不同。

下面我們來做一下測試:

$ curl http://localhost:8081/echo -i -d "WebFlux workshop" -H "Content-Type: text/plain"
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: text/plain
 
WebFlux workshop

還是沒有區別T.T,看下一步。

$ curl http://localhost:8081/quotes -i -H "Accept: application/stream+json"
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/stream+json
 
{"ticker":"CTXS","price":82.77,"instant":"2018-05-15T06:45:51.261Z"}
{"ticker":"DELL","price":64.83,"instant":"2018-05-15T06:45:51.261Z"}
{"ticker":"GOOG","price":881,"instant":"2018-05-15T06:45:51.261Z"}
{"ticker":"MSFT","price":67.3,"instant":"2018-05-15T06:45:51.261Z"}
{"ticker":"ORCL","price":48.1,"instant":"2018-05-15T06:45:51.261Z"}
{"ticker":"RHT","price":85.1,"instant":"2018-05-15T06:45:51.261Z"}
{"ticker":"VMW","price":92.24,"instant":"2018-05-15T06:45:51.261Z"}
-------------------------------無敵分割線-------------------------------------
{"ticker":"CTXS","price":85.7,"instant":"2018-05-15T06:45:52.260Z"}
{"ticker":"DELL","price":64.12,"instant":"2018-05-15T06:45:52.260Z"}
{"ticker":"GOOG","price":879,"instant":"2018-05-15T06:45:52.260Z"}
{"ticker":"MSFT","price":67.9,"instant":"2018-05-15T06:45:52.260Z"}
{"ticker":"ORCL","price":46.43,"instant":"2018-05-15T06:45:52.260Z"}
{"ticker":"RHT","price":86.8,"instant":"2018-05-15T06:45:52.260Z"}
...

上面的分割線是為了易於分辨人為加上去的,我們看到返回結果每隔一秒刷新一次,不終止的話會一直返回數據,傳統的Request/Response是一次請求,一次返回。

注意是設置了Header Accept: application/stream+json ,

如果將Header設置為 Accept: application/json ,只會得到一次Response。

寫測試
springboot的test模塊包含WebTestClient,可以用來對webflux服務端進行測試。

@RunWith(SpringRunner.class)
//  We create a `@SpringBootTest`, starting an actual server on a `RANDOM_PORT`
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class StockQuotesApplicationTests {

   // Spring Boot will create a `WebTestClient` for you,
   // already configure and ready to issue requests against "localhost:RANDOM_PORT"
   @Autowired
   private WebTestClient webTestClient;

   @Test
   public void fetchQuotes() {
      webTestClient
            // We then create a GET request to test an endpoint
            .get().uri("/quotes?size=20")
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            // and use the dedicated DSL to test assertions against the response
            .expectStatus().isOk()
            .expectHeader().contentType(MediaType.APPLICATION_JSON)
            .expectBodyList(Quote.class)
            .hasSize(20)
            // Here we check that all Quotes have a positive price value
            .consumeWith(allQuotes ->
                  assertThat(allQuotes.getResponseBody())
                        .allSatisfy(quote -> assertThat(quote.getPrice()).isPositive()));
   }

   @Test
   public void fetchQuotesAsStream() {
      List<Quote> result = webTestClient
            // We then create a GET request to test an endpoint
            .get().uri("/quotes")
            // this time, accepting "application/stream+json"
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .exchange()
            // and use the dedicated DSL to test assertions against the response
            .expectStatus().isOk()
            .expectHeader().contentType(MediaType.APPLICATION_STREAM_JSON)
            .returnResult(Quote.class)
            .getResponseBody()
            .take(30)
            .collectList()
            .block();

      assertThat(result).allSatisfy(quote -> assertThat(quote.getPrice()).isPositive());
   }
}

參考文章:

https://docs.spring.io/spring-framework/docs/5.0.3.RELEASE/spring-framework-reference/web.html#web-reactive-server-functional
http://projectreactor.io/docs
https://www.ibm.com/developerworks/cn/java/spring5-webflux-reactive/index.html
https://blog.csdn.net/qq_34438958/article/details/78539234


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM