響應式編程系列(一):什么是響應式編程?reactor入門


響應式編程 系列文章目錄

(一)什么是響應式編程?reactor入門

(二)Flux入門學習:流的概念,特性和基本操作

(三)Flux深入學習:流的高級特性和進階用法

(四)reactor-core響應式api如何測試和調試?

(五)Spring reactive: Spring WebFlux的使用

(六)Spring reactive: webClient的使用

引言

  Spring framework 5 的一大新特性:響應式編程(Reactive Programming)。那么什么是響應式?他能給我們帶來什么?如何優雅地使用?本系列會從最基礎的概念和簡單的api講起,再慢慢深入探討響應式的一些高級特性,最后講解實戰內容,例如WebFlux和WebClient等在Spring boot中的使用,如何測試和調試。

  想要了解原理的話,美團點評的這篇博客 Java NIO淺析 非常適合入門。

簡單地說:

  當我們調用socket.read()、socket.write()這類阻塞函數的時候,這類函數不能立即返回,也無法中斷,需要等待socket可讀或者可寫,才會返回,因此一個線程只能處理一個請求。在這等待的過程中,cpu並不干活,(即阻塞住了),那么cpu的資源就沒有很好地利用起來。因此對於這種情況,我們使用多線程來提高cpu資源的利用率:在等待的這段時間,就可以切換到別的線程去處理事件,直到socket可讀或可寫了,通過中斷信號通知cpu,再切換回來繼續處理數據。例如線程A正在等待socket可讀,而線程B已經就緒了,那么就可以先切換到線程B去處理。雖然上下文切換也會花一些時間,但是遠比阻塞在線程A這里空等要好。當然計算機內部實際的情況比這復雜得多。

  而NIO的讀寫函數可以立刻返回,這就給了我們不開線程利用CPU的最好機會:如果一個連接不能讀寫(socket.read()返回0或者socket.write()返回0),我們可以把這件事記下來。因此只需要一個線程不斷地輪詢這些事件,一旦有就緒的時間,處理即可。不需要多線程。

 

阻塞型IO

  • 需要多線程,即需要很大的線程池。
  • 每個請求都要有一個單獨的線程去處理。

 

非阻塞型IO

  • 只需要數量非常少的線程。
  • 固定的幾個工作線程去處理事件。

 

使用NIO我們能得到什么?

  • 事件驅動模型
  • 避免多線程
  • 單線程處理多任務
  • 非阻塞I/O,I/O讀寫不再阻塞,而是返回0
  • 基於block的傳輸,通常比基於流的傳輸更高效
  • 更高級的IO函數,zero-copy
  • IO多路復用大大提高了Java網絡應用的可伸縮性和實用性

響應式編程入門

  響應式編程就是基於reactor的思想,當你做一個帶有一定延遲的才能夠返回的io操作時,不會阻塞,而是立刻返回一個流,並且訂閱這個流,當這個流上產生了返回數據,可以立刻得到通知並調用回調函數處理數據。

 

基本模型

我們首先需要理解響應式編程的基本模型:

 

Flux

  Reactor中的發布者(Publisher)由FluxMono兩個類定義,它們都提供了豐富的操作符(operator)。一個Flux對象代表一個包含0..N個元素的響應式序列,元素可以是普通對象、數據庫查詢的結果、http響應體,甚至是異常。而一個Mono對象代表一個包含零/一個(0..1)元素的結果。上圖就是一個Flux類型的數據流,Flux往流上發送了3個元素,Subscriber通過訂閱這個流來接收通知。

如何創建一個流?最簡單的方式有以下幾種:

//創建一個流,並直接往流上發布一個值為value數據
Flux.just(value);

//通過list創建一個流,往流上依次發布list中的數據
Flux.fromIterable(list);

//創建一個流,並向流上從i開始連續發布n個數據,數據類型為Integer
Flux.range(i, n);

//創建一個流,並定時向流上發布一個數據,數據從0開始遞增,數據類型為Long
Flux.interval(Duration.ofSeconds(n));

既然是“數據流”的發布者,Flux和Mono都可以發出三種“數據信號”:元素值、錯誤信號、完成信號,錯誤信號和完成信號都是終止信號,完成信號用於告知下游訂閱者該數據流正常結束,錯誤信號終止數據流的同時將錯誤傳遞給下游訂閱者。

Subscriber 

subscriber是一個訂閱者,他只有非常簡單的4個接口:

public interface Subscriber<T> {
    void onSubscribe(Subscription var1);

    //收到下一個元素值信號時的行為
    void onNext(T var1);

    //收到錯誤信號時的行為
    void onError(Throwable var1);

    //收到終止信號時的行為
    void onComplete();
}

Subscriber必須要訂閱一個Flux才能夠接收通知:

flux.subscribe(
    value -> handleData(value),
    error -> handleError(error),
    () -> handleComplete()
);

上面這個例子通過lambda表達式,定義了Subscriber分別在收到消息,收到錯誤,和消息流結束時的行為,當Subscriber接收到一個新數據,就會異步地執行handleData方法處理數據。

 

簡單例子:

接下來我們創建幾個最簡單的流來試一下:

首先我們新建一個maven項目,引入reactor的類庫:

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <version>3.2.3.RELEASE</version>
        <scope>test</scope>
    </dependency>
</dependencies>

 

編寫代碼如下:

public class ReactorTests {

    @After
    public void after() {
        sleep(30_000);
    }

    @Test
    public void testJust() {
        Flux.just("hello", "world")
            .subscribe(System.out::println);
    }

    @Test
    public void testList() {
        List<String> words = Arrays.asList(
            "hello",
            "reactive",
            "world"
        );

        Flux.fromIterable(words)
            .subscribe(System.out::println);
    }

    @Test
    public void testRange() {
        Flux.range(1, 10)
            .subscribe(System.out::println);
    }

    @Test
    public void testInterval() {
        Flux.interval(Duration.ofSeconds(1))
            .subscribe(System.out::println);
    }
}

訂閱這些流,收到數據之后只是簡單地把它打印出來,運行這些Test,就能夠看到訂閱者在接收到流上的數據時,異步地去處理這些數據。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM