再學webflux


響應式編程(reactive programming)是一種基於數據流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程范式。

 

keywords:

異步非阻塞

數據流

 

流水線

算子 (計算邏輯)

聲明式 :聲明式比較適合基於流的處理方式

 

 

服務端技術棧

 

客戶端技術

Spring WebFlux也提供了一個響應式的Http客戶端API WebClient。它可以用函數式的方式異步非阻塞地發起Http請求並處理響應。其底層也是由Netty提供的異步支持。

我們可以把WebClient看做是響應式的RestTemplate,與后者相比,前者:

是非阻塞的,可以基於少量的線程處理更高的並發;
可以使用Java 8 lambda表達式;
支持異步的同時也可以支持同步的使用方式;
可以通過數據流的方式與服務端進行雙向通信。

整個技術棧從命令式的、同步阻塞的【spring-webmvc + servlet + Tomcat】變成了響應式的、異步非阻塞的【spring-webflux + Reactor + Netty】

 

 

WebFlux的函數式開發模式

HandlerFunction和RouterFunction

HandlerFunction相當於Controller中的具體處理方法,輸入為請求,輸出為裝在Mono中的響應:
Mono<T extends ServerResponse> handle(ServerRequest request);

RouterFunction,顧名思義,路由,相當於@RequestMapping,用來判斷什么樣的url映射到那個具體的HandlerFunction,輸入為請求,輸出為裝在Mono里邊的Handlerfunction:
Mono<HandlerFunction<T>> route(ServerRequest request);

 

 

服務器推送
SSE:服務端推送(Server Send Event),在客戶端發起一次請求后會保持該連接,服務器端基於該連接持續向客戶端發送數據,從HTML5開始加入。

public Mono<ServerResponse> sendTimePerSec(ServerRequest serverRequest) {
return ok().contentType(MediaType.TEXT_EVENT_STREAM).body( // 1
Flux.interval(Duration.ofSeconds(1)). // 2
map(l -> new SimpleDateFormat("HH:mm:ss").format(new Date())),
String.class);
}

 

 

響應式Spring Data
一切都是異步非阻塞的

 

 

概念 

 

響應式編程通常作為面向對象編程中的“觀察者模式”(Observer design pattern)的一種擴展。 響應式流(reactive streams)與“迭代子模式”(Iterator design pattern)也有相通之處, 因為其中也有 Iterable-Iterator 這樣的對應關系。主要的區別在於,Iterator 是基於 “拉取”(pull)方式的,而響應式流是基於“推送”(push)方式的。

使用 iterator 是一種“命令式”(imperative)編程范式,因為什么時候獲取下一個元素取決於開發者。在響應式流中,相對應的角色是“發布者 - 訂閱者”(Publisher-Subscriber),當有新的值到來的時候,反過來由發布者(Publisher) 通知訂閱者(Subscriber),這種“推送”模式是響應式的關鍵。此外,對推送來的數據的操作 是通過一種聲明式(declaratively)而不是命令式(imperatively)的方式表達的:開發者通過 描述“處理流程”來定義對數據流的處理邏輯。



和lamada

  • 函數響應式編程的重點在於“函數式”的語言特性,這個概念在二十年前就蓋棺定論了。
  • 響應式編程的重點在於“基於事件流”的異步編程范式,由不斷產生的數據/時間來推動邏輯的執行。

 (type1 arg1, type2 arg2...) -> { body }

函數式”編程范式的核心特點之一:函數是”一等公民”。 
所謂”一等公民”(first class),指的是函數與其他數據類型一樣,處於平等地位,可以賦值給其他變量,也可以作為參數,傳入另一個函數,或者作為別的函數的返回值。

 

@FunctionalInterface

函數式接口

 

Function,接受一個輸入參數,返回一個結果。參數與返回值的類型可以不同,我們之前的map方法內的lambda就是表示這個函數式接口的;
Consumer,接受一個輸入參數並且無返回的操作。比如我們針對數據流的每一個元素進行打印,就可以用基於Consumer的lambda;
Supplier,無需輸入參數,只返回結果。看接口名就知道是發揮了對象工廠的作用;
Predicate,接受一個輸入參數,返回一個布爾值結果。比如我們在對數據流中的元素進行篩選的時候,就可以用基於Predicate的lambda;


 

Flux與Mono

Publisher

Flux和Mono都可以發出三種“數據信號”:元素值、錯誤信號、完成信號

  • 首先,錯誤信號和完成信號都是終止信號,二者不可能同時共存;
  • 如果沒有發出任何一個元素值,而是直接發出完成/錯誤信號,表示這是一個空數據流;
  • 如果沒有錯誤信號和完成信號,那么就是一個無限數據流。

 

 

Subscriber

// 訂閱並定義對正常數據元素、錯誤信號和完成信號的處理,以及訂閱發生時的處理邏輯
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);

Flux.just(1, 2, 3, 4, 5, 6)僅僅聲明了這個數據流,此時數據元素並未發出,只有subscribe()方法調用的時候才會觸發數據流。所以,訂閱前什么都不會發生。

 

操作符 Operator

 

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html

 

調度器與線程模型
Scheduler

public void testSyncToAsync() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Mono.fromCallable(() -> getStringSync()) // 1
.subscribeOn(Schedulers.elastic()) // 2
.subscribe(System.out::println, null, countDownLatch::countDown);
countDownLatch.await(10, TimeUnit.SECONDS);
}

 

回壓
subscribe(Subscriber subscriber)

BaseSubscriber

 

 

ref

https://blog.csdn.net/get_set/article/details/79466657 重點推薦

https://blog.csdn.net/get_set/article/details/79480233


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM