spring webFlux的認識


Spring  WebFlux是隨Spring 5推出的響應式Web框架。

一、服務端技術棧

1、從上圖縱向可看出,spring-webflux支持兩種開發模式:

        (1)類似於Spring WebMVC的基於注解(@Controller、@RequestMapping)的開發模式;

        (2)Java 8 lambda風格的函數式開發模式。

2、WebFlux是基於響應式流的,可以用來建立異步、非阻塞、事件驅動的服務。默認采用Reactor作為響應式流的實現庫,也提供對RxJava的支持。

3、由於響應式編程的特性,webFlux底層也需要支持異步的運行環境:

        (1)Netty、Undertow;

        (2)支持異步I/O的Servlet 3.1的容器,如Tomcat(8.0.23及以上)和Jetty(9.0.4及以上)。

4、WebFlux也支持響應式的WebSocket服務端開發

5、SpringBoot2.0以上版本整合了webFlux,直接通過Spring Initializ就可以很方便的創建WebFlux應用

二、響應式Http客戶端

Spring WebFlux提供了一個響應式的Http客戶端API  WebClient。它可以用函數式的方式異步非阻塞地發起Http請求並處理響應,底層也是由Netty提供的異步

支持。WebClient與RestTemplate作對比,前者的優勢:

        (1)是非阻塞的,可以基於少量線程處理更高並發;

        (2)可以使用Java 8 lambda表達式;

        (3)支持異步的同時也可以支持同步的使用方式;

        (4)可以通過數據流的方式與服務端進行雙向通信。

為訪問WebSocket,Spring WebFlux也提供了響應式的WebSocket客戶端API  WebSocketClient。

三、響應式Spring Data

開發基於響應式流的應用,就像是搭建數據流流動的管道,從而異步的數據能夠順暢流過每個環節。大多數系統免不了與數據庫交互,所以我們也需要響應

式的持久層API和支持異步的數據庫驅動。一條管道,如果任何一個環節發生阻塞,可能造成整體吞吐量的下降。

各個數據庫都開始陸續推出異步驅動,目前支持的可以進行響應式數據訪問的數據庫有MongoDB、Redis、Apache Cassandra和CouchDB。

四、WebFlux性能測試(和SpringMVC對比)

測試內容:分別基於WebMVC和WebFlux創建兩個項目:mvc-with-latency和webFlux-with-latency,來對比觀察異步非阻塞能帶來多大的性能提升,我們通

過sleep和delayElement來模擬一個簡單的帶有延遲的場景,然后啟動服務使用gatling進行測試並且分析。

 

1、首先測試mvc-with-latency,測試數據如下(Tomcat最大線程數200,延遲100ms):

由以上數據可知:

        (1)用戶量在接近3000的時候,線程數達到默認的最大值200;

        (2)線程數達到200之前,95%的請求響應時長是正常的(比100ms多一點點),之后呈直線上升的態勢;

        (3)線程數達到200后,吞吐量增幅逐漸放緩。

 

2.最高200的線程數是Tomcat的默認設置,我們將其設置為400再次測試,測試結果如下:

由於工作線程數擴大一倍,因此請求排隊的情況緩解一半。但是增加線程是有成本的,更多的線程意味着更多的內存、線程上下文切換成本更高。

 

3.然后測試webFlux-with-latency,測試結果如下:

(1)這里沒有統計線程數量,因為對於運行在異步I/O的netty上的webFlux應用來說,其工作線程數量始終維持在一個固定的數量,通常這個固定的數量

         等於CPU核數(reactor-http-nio-x和parallel-x的線程,四核八線程的i7的x為1-8),因為異步非阻塞條件下,程序邏輯是由事件驅動的,並不需要多

         線程並發。

(2)隨着用戶數的增多,吞吐量基本呈線性增多的趨勢;

(3)95%的響應都在100ms+的可控范圍內返回了,並未出現延時的情況。

可見,非阻塞的處理方式規避了線程排隊等待的情況,從而可以用少量而固定的線程處理應對大量請求的處理。

 

此外,還直接測試了20000用戶的情況:

(1)對mvc-with-latency的測試由於出現了許多的請求fail而以失敗告終;

(2)webFlux-with-latency應對20000用戶,吞吐量達到7228 req/sec,95%響應時長僅117ms。

 

mvc-with-latency(200線程)與webFlux-with-latency對比圖:

五、WebClient和RestTemplate性能對比:

測試內容:創建兩個服務A的項目:restTemplate-as-caller和webClient-as-caller。提供同樣的url,通過http請求這個url,返回的數據作為自己的響應。區別在

於:restTemplate-as-caller使用一個基於http連接池構造的RestTemplate作為Http客戶端,webClient-as-caller使用WebClient作為Http客戶端。

1、首先用RestTemplate直接測試一下6000用戶,測試結果:吞吐量為1651 req/sec,95%響應時長為1622ms,和上面測試的mvc-with-latency差不多,可見

      RestTemplate是會阻塞的。

2、利用elastic的調度器將阻塞的調用轉化為異步非阻塞的,再次測試RestTemplate,發現結果有了明顯改善,測試結果:吞吐量2169 req/sec,95%響應時

     長為121ms。但是,使用schedulers.elastic()其實就相當於將每一次阻塞的RestTemplate調用調度到不同線程里去執行,因為不僅有處理請求的200個線程

     ,還有elastic給分配的工作線程,所以總線程數量達到了1000多個。不過在生產環境中,我們通常不會直接使用彈性線程池,而是使用線程數量可控的線

     程池,RestTemplate用完所有的線程后,依然會造成排隊的情況。

3、接下來我們創建一個有最大400個線程的線程池,然后調度到這個線程池上,測試結果:吞吐量2169 req/sec,與彈性線程池的相同,95%響應時長為

     236ms,雖然達不到彈性線程池的效果,但是比完全同步阻塞的方式(1中的方式)要好多了。

4、最后測試一下非阻塞的WebClient,跑一下6000用戶的測試,測試結果:吞吐量2195 req/sec,95響應時長109ms。值得注意的是,WebClient不需要大量

     並發的線程就可以輕松的完成了。

總結:WebClient能夠以少量而固定的線程數處理高並發的http請求,在基於http的服務間通信方面,可以取代RestTemplate以及AsyncRestTemplate。

六、基於WebFlux的高性能REST API網關設計

API網關可以對外部系統提供統一的服務訪問入口,對請求進行鑒權、限流等訪問控制處理,通過后將請求路由轉發給后端服務。在高並發和潛在的高延遲場

景下,API網關要實現高性能高吞吐量的一個基本要求是全鏈路異步,不要阻塞線程。支持異步非阻塞的WebFlux就滿足上述條件。

基於WebFlux的網關主要組件是WebFilter過濾器(WebFlux中的WebFilter和Servlet Filter是兩回事,雖然概念類似,但其底層的運行是基於netty,不是

tomcat,雖然也可適配tomcat或jetty這些傳統容器。WebFlux不用Servlet那一套編程模型,如HttpServletRequest、HttpServletResponse,而是用的如

ServerWebExchange、ServerHttpRequest等),多個過濾器分別實現認證授權、限流、請求路由轉發、正常響應回寫(把后端服務的響應回寫給調用網關的

前端系統)等。

參考架構

服務路由

后端服務的url前綴是固定的,可以在application.properties文件中來配置,例如:backend.service.url.prefix=http://127.0.0.1:8080;

同時在配置文件當中可以配置服務超時時間,如backend.service.timeout.inmillis=10000;網關url前綴,如gateway.url.prefix=/api.gateway.demo

然后寫個控制器獲取上面自定義參數做一些判斷處理,forward到相應的url,然后將正常響應或者異常響應回寫給調用方等。

這樣的話假設前端應用(服務消費者)用如下url調用網關(網關端口為9988)

http://api.gateway.demo:9988/orders/1234

如果請求過濾通過,網關最后會將請求路由轉發到http://127.0.0.1:8080/orders/1234。

application.properties參數設置:

#網關服務端口

server.port=9988

#網關url前綴

gateway.url.prefix=/api.gateway.demo

#調用后端服務的超時時間,單位毫秒。實際項目應該是根據具體的服務取對應的超時配置

backend.service.timeout.inmillis=10000

#后端服務endpoint,實際項目通常應該從注冊中心獲取服務和它對應的endpoint關系

backend.service.url.prefix=http://127.0.0.1:8080

鑒權、限流

寫兩個過濾器RequestAuthFilter和RateLimitFilter都繼承WebFilter,在過濾器中做一些業務處理判斷,分別用於鑒權和限流。

七、WebClient的基本使用

創建WebClient對象

1.使用create()創建WebClient的實例:

WebClient webClient = WebClient.create("https://api.github.com");

2.使用WebClient構建器創建WebClient

//使用WebClient構建器,可以自定義選項:包括過濾器、默認標題、cookie、客戶端連接器等
WebClient webClient = WebClient.builder()
        .baseUrl("https://api.github.com")
        .defaultHeader(HttpHeaders.CONTENT_TYPE, "application/vnd.github.v3+json")
        .defaultHeader(HttpHeaders.USER_AGENT, "Spring 5 WebClient")
        .build()

WebClient發GET請求並用retriveve()檢索響應

//retrieve():異步獲取response信息;bodyToFlux():將response body解析為字符串
webClient.get()
        .uri("/user/repos")
        .header("Authorization", "Basic " + Base64Utils.
                encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
        .retrieve()
        .bodyToFlux(GithubRepo.class);

使用exchange()方法檢索響應

//retrieve方法是獲取response信息的最簡單方法,如果想對response擁有更多控制權,可使用exchange訪問整個response標題和正文
//使用flatMap來將ClientResponse映射為Flux;
webClient.get()
        .uri("/user/repos")
        .header("Authorization", "Basic " + Base64Utils.
                encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
        .exchange()
		.flatMapMany(clientResponse -> clientResponse.bodyToFlux(GithubRepo.class));

在請求URI中使用參數

//uri()中使用參數,參數都被花括號包圍,分別傳遞值。在提出請求之前,這些參數將被WebClient自動替換
webClient.get()
        .uri("/user/repos?sort={sortField}&direction={sortDirection}","updated","desc")
        .header("Authorization", "Basic " + Base64Utils.
                encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
        .retrieve()
        .bodyToFlux(GithubRepo.class);

使用URIBuilder構造請求URI

webClient.get()
        .uri(uriBuilder -> uriBuilder.path("/user/repos")
								.queryParam("sort","updated")
								.queryParam("direction","desc")
								.build())
        .header("Authorization", "Basic " + Base64Utils.
                encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
        .retrieve()
        .bodyToFlux(GithubRepo.class);

在WebClient請求中傳遞Request Body

1.如果有一個Mono或Flux請求體,那么可以直接傳遞給body()中,否則需要創建響應式類型傳遞

webClient.post()
        .uri("/user/repos")
		.body(Mono.just(createRepoRequest),RepoRequest.class)
 		.header("Authorization", "Basic " + Base64Utils.
                encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
        .retrieve()
        .bodyToMono(GithubRepo.class);

2.如果具有實際值而不是Publisher(Flux/Mono),則可以使用syncBody()快捷方式傳遞請求正文

webClient.post()
        .uri("/user/repos")
		.syncBody(createRepoRequest)
 		.header("Authorization", "Basic " + Base64Utils.
                encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
        .retrieve()
        .bodyToMono(GithubRepo.class);

3.也可以使用BodyInserters類提供的各種工廠方法來構造一個BodyInserter對象並將其傳遞給body()方法

webClient.post()
        .uri("/user/repos")
		.body(BodyInserters.fromObject(createRepoRequest))
 		.header("Authorization", "Basic " + Base64Utils.
                encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
        .retrieve()
        .bodyToMono(GithubRepo.class);

處理WebClient錯誤

只要接收到狀態碼為4xx或5xx的響應retrieve(),WebClient中的方法WebClientResponseException就會拋出一個。可以使用onStatus方法自定義

public Flux<GithubRepo> listGithubRepositories(){
	return webClient.get()
        	.uri("/user/repos?sort={sortField}&direction={sortDirection}","updated","desc")
        	.retrieve()
			.onStatus(HttpStatus::is4xxClientError,clientResponse -> 
				Mono.error(new MyCustomClientException()))
			.onStatus(HttpStatus::is5xxClientError,clientResponse -> 
				Mono.error(new MyCustomClientException()))
 			.bodyToFlux(GithubRepo.class);
}

此外也可以用@ExceptionHandler在控制權內部使用這種方式來處理WebClientResponseException並返回適當的響應給客戶端

@ExceptionHandler(WebClientResponseException.class)
public ResponseEntity<String> handleException(WebClientResponseException ex){
	return ResponseEntity.status(ex.getRawStatusCode()).body(ex.getResponseBodyAsString());
}

 

 

           


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM