javaStream與響應式流


將Java Stream用於響應式編程中,是有局限性的。比如如下兩個需要面對的問題:

  1. Web 應用具有I/O密集的特點,I/O阻塞會帶來比較大的性能損失或資源浪費,我們需要一種異步非阻塞的響應式的庫,而Java Stream是一種同步API
  2. 假設我們要搭建從數據層到前端的一個變化傳遞管道,可能會遇到數據層每秒上千次的數據更新,而顯然不需要向前端傳遞每一次更新,這時候就需要一種流量控制能力,就像我們家里的水龍頭,可以控制開關流速,而Java Stream不具備完善的對數據流的流量控制的能力。

具備“異步非阻塞”特性和“流量控制”能力的數據流,我們稱之為響應式流(Reactive Stream)。

目前有幾個實現了響應式流規范的Java庫,這里簡單介紹兩個:RxJavaReactor

要介紹RxJava,就不得不提ReactiveX(Reactive Extensions,Rx),它最初是LINQ的一個擴展,由微軟的架構師Erik Meijer領導的團隊開發,在2012年11月開源,Rx是一個編程模型,目標是提供一致的編程接口,幫助開發者更方便的處理異步數據流,Rx庫支持.NET、JavaScript和C++,Rx近幾年越來越流行了,現在已經支持幾乎全部的流行編程語言了,包括RxJS、RxJava等。

后來,Java社區的一些大牛湊到一起制定了一個響應式流規范。RxJava團隊隨后對1版本進行了重構,形成了兼容該響應流規范的RxJava 2。

Reactor是Pivotal旗下的項目,與大名鼎鼎的Spring是兄弟關系,因此是Spring近期推出的響應式模塊WebFlux的“御用”響應式流。Reactor支持響應式流規范,與RxJava相比,它沒有任何歷史包袱,專注於Server端的響應式開發,而RxJava更多傾向於Android端的響應式開發。

在Java 9版本中,響應式流的規范被納入到了JDK中,相應的API接口是java.util.concurrent.Flow

Spring WebFlux也是本系列文章后邊的重點內容。由於WebFlux首選Reactor作為其響應式技術棧的一部分,我們下邊也主要以Reactor為主,目前的版本是Reactor3。

我們繼續回到主線,討論“異步非阻塞”和“流量控制”。注意,本節請不必關注Reactor的代碼細節,僅體會使用響應式流的“感覺”就好。

一、阻塞

對於阻塞造成的性能損失,我們通常有兩種思路來解決:

  1. 並行化:使用更多的線程和硬件資源;
  2. 異步化:基於現有的資源來提高執行效率。

解決方案之一:多線程

解決方案之二:非阻塞

1)非阻塞的回調

2)異步的CompletableFuture

Flux<String> ids = ifhrIds(); // <1>

Flux<String> combinations =
    ids.flatMap(id -> { // <2>
        Mono<String> nameTask = ifhrName(id); // <3>
        Mono<Integer> statTask = ifhrStat(id); // <4>

        return nameTask.zipWith(statTask, // <5>
                (name, stat) -> "Name " + name + " has stats " + stat);
    });

Mono<List<String>> result = combinations.collectList(); // <6>

List<String> results = result.block(); // <7>
assertThat(results).containsExactly( // <8>
    "Name NameJoe has stats 103",
    "Name NameBart has stats 104",
    "Name NameHenry has stats 105",
    "Name NameNicole has stats 106",
    "Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);

 

說明:

    1. 這一次,我們從一個異步方式提供的 ids 序列(Flux<String>)開始。
    2. 對於序列中的每一個元素,我們異步地處理它(flatMap 方法內)兩次。
    3. 獲取相應的 name。
    4. 獲取相應的 statistic.
    5. 異步地組合兩個值。
    6. 隨着序列中的元素值“到位”,它們收集一個 List 中。
    7. 在生成流的環節,我們可以繼續異步地操作 Flux 流,對其進行組合和訂閱(subscribe)。
      最終我們很可能得到一個 Mono 。由於是測試,我們阻塞住(block()),等待流處理過程結束,
      然后直接返回集合。
    8. Assert 結果。

這種非阻塞數據流的感覺,讓我想起來了《讓×××飛》里邊最經典的一段:姜文飾演的張麻子朝新來縣長那“馬拉的火車啪啪啪連續打了N槍,旁邊兄弟問“打中沒有”,張麻子說“讓×××飛一會兒~”,稍后就見拉火車的馬韁繩全都被×××打斷,馬匹四散,非常6+1!如果張麻子每打一槍都看看前一槍有沒有射中的話,還怎么裝X呢?

通過上邊的例子可見,回調或 CompletableFuture在處理復雜邏輯時會遇到的相似的窘境,反觀Reactor3提供的API,卻可以顯著減少代碼量,提高代碼可閱讀性,尤其是還可以提供一些不錯的功能。

二、流量控制——回壓

在響應式流中,數據流的發出者叫做Publisher,監聽者叫做Subscriber。我們后續就統一直譯叫做“發布者”和“訂閱者”吧。

 

問題來了,假如發布者發出數據的速度和訂閱者處理數據的速度不同的時候,怎么辦呢?訂閱者處理速度快的話,那還好說,但是如果處理速度跟不上數據發出的速度,就像這樣:

回壓

如果沒有流量控制,那么訂閱者會被發布者快速產生的數據流淹沒。就像在一個流水線上,如果某個工位處理比較慢,而上游下料比較快的話,這個工位的工人師傅就吃不消了,這個時候他需要一種途徑來告訴上游下料慢一些。

同樣的,訂閱者也需要有一種能夠向上游反饋流量需求的機制:

title

這種能夠向上游反饋流量請求的機制就叫做回壓(backpressure,也有翻譯為“背壓”的)。

在具體的使用過程中,回壓的處理會涉及不同的策略。舉兩個例子以便於理解:

舉例:緩存的策略

緩存

如圖,訂閱者處理完一個元素的時候通過request(1)跟發布者再請求一個元素。由於發布者的數據不能很快被訂閱者處理掉,那么發布者會將未處理的數據元素緩存起來。

這種處理方式與消息隊列有些相似之處,發布者需要維護一個隊列用來緩存還沒有被處理的元素。通常用於對數據准確性要求比較高的場景,比如發布者這兒是突然到來的數據高峰,都是要保存到數據庫的,作為訂閱者的數據持久層沒有那么快的處理速度,那么發布者就需要將數據暫時緩存起來。

舉例:丟棄的策略

丟棄

如圖,發布者不需要緩存來不及處理的數據,而是直接丟棄,當訂閱者請求數據的時候,會拿到發布者那里最近的一個數據元素。比如我們在做一個監控系統,后台的監控數據以每秒10個的速度產生,而前端界面只需要每秒鍾更新一下監控數據即可,那作為發布者的后台就不用緩存數據了,因為這種時效性強的場景,用不到的數據直接丟掉即可。

在后續的實戰階段,我們還會再深入了解回壓的作用原理。

1.2.3 總結

以上就是響應式流的兩個核心特點:異步非阻塞,以及基於“回壓”機制的流量控制。

這樣我們有了基於響應式流的“升級版”的響應式編程:

響應式編程

Reactor3和RxJava2都是具有以上特點的響應式流的具體實現庫。

響應式編程通常作為面向對象編程中的“觀察者模式”(Observer design pattern)的一種擴展。 響應式流(reactive streams)與“迭代子模式”(Iterator design pattern)也有相通之處, 因為其中也有 Iterable-Iterator 這樣的對應關系。主要的區別在於,Iterator 是基於 “拉取”(pull)方式的,而響應式流是基於“推送”(push)方式的。

使用 iterator 是一種“命令式”(imperative)編程范式,因為什么時候獲取下一個元素取決於開發者。在響應式流中,相對應的角色是“發布者 - 訂閱者”(Publisher-Subscriber),當有新的值到來的時候,反過來由發布者(Publisher) 通知訂閱者(Subscriber),這種“推送”模式是響應式的關鍵。此外,對推送來的數據的操作 是通過一種聲明式(declaratively)而不是命令式(imperatively)的方式表達的:開發者通過 描述“處理流程”來定義對數據流的處理邏輯。

 轉自:https://blog.51cto.com/liukang/2090183


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM