Spring WebFlux是隨Spring 5推出的響應式Web框架。

一、服務端技術棧
1、從上圖縱向可看出,spring-webflux支持兩種開發模式:
(1)類似於Spring WebMVC的基於注解(@Controller、@RequestMapping)的開發模式;
(2)Java 8 lambda風格的函數式開發模式。
2、WebFlux是基於響應式流的,可以用來建立異步、非阻塞、事件驅動的服務。默認采用Reactor作為響應式流的實現庫,也提供對RxJava的支持。
3、由於響應式編程的特性,webFlux底層也需要支持異步的運行環境:
(1)Netty、Undertow;
(2)支持異步I/O的Servlet 3.1的容器,如Tomcat(8.0.23及以上)和Jetty(9.0.4及以上)。
4、WebFlux也支持響應式的WebSocket服務端開發
5、SpringBoot2.0以上版本整合了webFlux,直接通過Spring Initializ就可以很方便的創建WebFlux應用
二、響應式Http客戶端
Spring WebFlux提供了一個響應式的Http客戶端API WebClient。它可以用函數式的方式異步非阻塞地發起Http請求並處理響應,底層也是由Netty提供的異步
支持。WebClient與RestTemplate作對比,前者的優勢:
(1)是非阻塞的,可以基於少量線程處理更高並發;
(2)可以使用Java 8 lambda表達式;
(3)支持異步的同時也可以支持同步的使用方式;
(4)可以通過數據流的方式與服務端進行雙向通信。
為訪問WebSocket,Spring WebFlux也提供了響應式的WebSocket客戶端API WebSocketClient。
三、響應式Spring Data
開發基於響應式流的應用,就像是搭建數據流流動的管道,從而異步的數據能夠順暢流過每個環節。大多數系統免不了與數據庫交互,所以我們也需要響應
式的持久層API和支持異步的數據庫驅動。一條管道,如果任何一個環節發生阻塞,可能造成整體吞吐量的下降。
各個數據庫都開始陸續推出異步驅動,目前支持的可以進行響應式數據訪問的數據庫有MongoDB、Redis、Apache Cassandra和CouchDB。
四、WebFlux性能測試(和SpringMVC對比)
測試內容:分別基於WebMVC和WebFlux創建兩個項目:mvc-with-latency和webFlux-with-latency,來對比觀察異步非阻塞能帶來多大的性能提升,我們通
過sleep和delayElement來模擬一個簡單的帶有延遲的場景,然后啟動服務使用gatling進行測試並且分析。
1、首先測試mvc-with-latency,測試數據如下(Tomcat最大線程數200,延遲100ms):


由以上數據可知:
(1)用戶量在接近3000的時候,線程數達到默認的最大值200;
(2)線程數達到200之前,95%的請求響應時長是正常的(比100ms多一點點),之后呈直線上升的態勢;
(3)線程數達到200后,吞吐量增幅逐漸放緩。
2.最高200的線程數是Tomcat的默認設置,我們將其設置為400再次測試,測試結果如下:


由於工作線程數擴大一倍,因此請求排隊的情況緩解一半。但是增加線程是有成本的,更多的線程意味着更多的內存、線程上下文切換成本更高。
3.然后測試webFlux-with-latency,測試結果如下:

(1)這里沒有統計線程數量,因為對於運行在異步I/O的netty上的webFlux應用來說,其工作線程數量始終維持在一個固定的數量,通常這個固定的數量
等於CPU核數(reactor-http-nio-x和parallel-x的線程,四核八線程的i7的x為1-8),因為異步非阻塞條件下,程序邏輯是由事件驅動的,並不需要多
線程並發。
(2)隨着用戶數的增多,吞吐量基本呈線性增多的趨勢;
(3)95%的響應都在100ms+的可控范圍內返回了,並未出現延時的情況。
可見,非阻塞的處理方式規避了線程排隊等待的情況,從而可以用少量而固定的線程處理應對大量請求的處理。
此外,還直接測試了20000用戶的情況:
(1)對mvc-with-latency的測試由於出現了許多的請求fail而以失敗告終;
(2)webFlux-with-latency應對20000用戶,吞吐量達到7228 req/sec,95%響應時長僅117ms。
mvc-with-latency(200線程)與webFlux-with-latency對比圖:


五、WebClient和RestTemplate性能對比:
測試內容:創建兩個服務A的項目:restTemplate-as-caller和webClient-as-caller。提供同樣的url,通過http請求這個url,返回的數據作為自己的響應。區別在
於:restTemplate-as-caller使用一個基於http連接池構造的RestTemplate作為Http客戶端,webClient-as-caller使用WebClient作為Http客戶端。
1、首先用RestTemplate直接測試一下6000用戶,測試結果:吞吐量為1651 req/sec,95%響應時長為1622ms,和上面測試的mvc-with-latency差不多,可見
RestTemplate是會阻塞的。
2、利用elastic的調度器將阻塞的調用轉化為異步非阻塞的,再次測試RestTemplate,發現結果有了明顯改善,測試結果:吞吐量2169 req/sec,95%響應時
長為121ms。但是,使用schedulers.elastic()其實就相當於將每一次阻塞的RestTemplate調用調度到不同線程里去執行,因為不僅有處理請求的200個線程
,還有elastic給分配的工作線程,所以總線程數量達到了1000多個。不過在生產環境中,我們通常不會直接使用彈性線程池,而是使用線程數量可控的線
程池,RestTemplate用完所有的線程后,依然會造成排隊的情況。
3、接下來我們創建一個有最大400個線程的線程池,然后調度到這個線程池上,測試結果:吞吐量2169 req/sec,與彈性線程池的相同,95%響應時長為
236ms,雖然達不到彈性線程池的效果,但是比完全同步阻塞的方式(1中的方式)要好多了。
4、最后測試一下非阻塞的WebClient,跑一下6000用戶的測試,測試結果:吞吐量2195 req/sec,95響應時長109ms。值得注意的是,WebClient不需要大量
並發的線程就可以輕松的完成了。
總結:WebClient能夠以少量而固定的線程數處理高並發的http請求,在基於http的服務間通信方面,可以取代RestTemplate以及AsyncRestTemplate。
六、基於WebFlux的高性能REST API網關設計
API網關可以對外部系統提供統一的服務訪問入口,對請求進行鑒權、限流等訪問控制處理,通過后將請求路由轉發給后端服務。在高並發和潛在的高延遲場
景下,API網關要實現高性能高吞吐量的一個基本要求是全鏈路異步,不要阻塞線程。支持異步非阻塞的WebFlux就滿足上述條件。
基於WebFlux的網關主要組件是WebFilter過濾器(WebFlux中的WebFilter和Servlet Filter是兩回事,雖然概念類似,但其底層的運行是基於netty,不是
tomcat,雖然也可適配tomcat或jetty這些傳統容器。WebFlux不用Servlet那一套編程模型,如HttpServletRequest、HttpServletResponse,而是用的如
ServerWebExchange、ServerHttpRequest等),多個過濾器分別實現認證授權、限流、請求路由轉發、正常響應回寫(把后端服務的響應回寫給調用網關的
前端系統)等。
參考架構

服務路由
后端服務的url前綴是固定的,可以在application.properties文件中來配置,例如:backend.service.url.prefix=http://127.0.0.1:8080;
同時在配置文件當中可以配置服務超時時間,如backend.service.timeout.inmillis=10000;網關url前綴,如gateway.url.prefix=/api.gateway.demo
然后寫個控制器獲取上面自定義參數做一些判斷處理,forward到相應的url,然后將正常響應或者異常響應回寫給調用方等。
這樣的話假設前端應用(服務消費者)用如下url調用網關(網關端口為9988)
http://api.gateway.demo:9988/orders/1234
如果請求過濾通過,網關最后會將請求路由轉發到http://127.0.0.1:8080/orders/1234。
application.properties參數設置:
#網關服務端口
server.port=9988
#網關url前綴
gateway.url.prefix=/api.gateway.demo
#調用后端服務的超時時間,單位毫秒。實際項目應該是根據具體的服務取對應的超時配置
backend.service.timeout.inmillis=10000
#后端服務endpoint,實際項目通常應該從注冊中心獲取服務和它對應的endpoint關系
backend.service.url.prefix=http://127.0.0.1:8080
鑒權、限流
寫兩個過濾器RequestAuthFilter和RateLimitFilter都繼承WebFilter,在過濾器中做一些業務處理判斷,分別用於鑒權和限流。
七、WebClient的基本使用
創建WebClient對象
1.使用create()創建WebClient的實例:
WebClient webClient = WebClient.create("https://api.github.com"); |
2.使用WebClient構建器創建WebClient
//使用WebClient構建器,可以自定義選項:包括過濾器、默認標題、cookie、客戶端連接器等
WebClient webClient = WebClient.builder()
.baseUrl("https://api.github.com")
.defaultHeader(HttpHeaders.CONTENT_TYPE, "application/vnd.github.v3+json")
.defaultHeader(HttpHeaders.USER_AGENT, "Spring 5 WebClient")
.build() |
WebClient發GET請求並用retriveve()檢索響應
//retrieve():異步獲取response信息;bodyToFlux():將response body解析為字符串
webClient.get()
.uri("/user/repos")
.header("Authorization", "Basic " + Base64Utils.
encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
.retrieve()
.bodyToFlux(GithubRepo.class); |
使用exchange()方法檢索響應
//retrieve方法是獲取response信息的最簡單方法,如果想對response擁有更多控制權,可使用exchange訪問整個response標題和正文
//使用flatMap來將ClientResponse映射為Flux;
webClient.get()
.uri("/user/repos")
.header("Authorization", "Basic " + Base64Utils.
encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
.exchange()
.flatMapMany(clientResponse -> clientResponse.bodyToFlux(GithubRepo.class)); |
在請求URI中使用參數
//uri()中使用參數,參數都被花括號包圍,分別傳遞值。在提出請求之前,這些參數將被WebClient自動替換
webClient.get()
.uri("/user/repos?sort={sortField}&direction={sortDirection}","updated","desc")
.header("Authorization", "Basic " + Base64Utils.
encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
.retrieve()
.bodyToFlux(GithubRepo.class); |
使用URIBuilder構造請求URI
webClient.get()
.uri(uriBuilder -> uriBuilder.path("/user/repos")
.queryParam("sort","updated")
.queryParam("direction","desc")
.build())
.header("Authorization", "Basic " + Base64Utils.
encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
.retrieve()
.bodyToFlux(GithubRepo.class); |
在WebClient請求中傳遞Request Body
1.如果有一個Mono或Flux請求體,那么可以直接傳遞給body()中,否則需要創建響應式類型傳遞
webClient.post()
.uri("/user/repos")
.body(Mono.just(createRepoRequest),RepoRequest.class)
.header("Authorization", "Basic " + Base64Utils.
encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
.retrieve()
.bodyToMono(GithubRepo.class); |
2.如果具有實際值而不是Publisher(Flux/Mono),則可以使用syncBody()快捷方式傳遞請求正文
webClient.post()
.uri("/user/repos")
.syncBody(createRepoRequest)
.header("Authorization", "Basic " + Base64Utils.
encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
.retrieve()
.bodyToMono(GithubRepo.class); |
3.也可以使用BodyInserters類提供的各種工廠方法來構造一個BodyInserter對象並將其傳遞給body()方法
webClient.post()
.uri("/user/repos")
.body(BodyInserters.fromObject(createRepoRequest))
.header("Authorization", "Basic " + Base64Utils.
encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
.retrieve()
.bodyToMono(GithubRepo.class); |
處理WebClient錯誤
只要接收到狀態碼為4xx或5xx的響應retrieve(),WebClient中的方法WebClientResponseException就會拋出一個。可以使用onStatus方法自定義
public Flux<GithubRepo> listGithubRepositories(){
return webClient.get()
.uri("/user/repos?sort={sortField}&direction={sortDirection}","updated","desc")
.retrieve()
.onStatus(HttpStatus::is4xxClientError,clientResponse ->
Mono.error(new MyCustomClientException()))
.onStatus(HttpStatus::is5xxClientError,clientResponse ->
Mono.error(new MyCustomClientException()))
.bodyToFlux(GithubRepo.class);
} |
此外也可以用@ExceptionHandler在控制權內部使用這種方式來處理WebClientResponseException並返回適當的響應給客戶端
@ExceptionHandler(WebClientResponseException.class)
public ResponseEntity<String> handleException(WebClientResponseException ex){
return ResponseEntity.status(ex.getRawStatusCode()).body(ex.getResponseBodyAsString());
} |
