響應式編程 系列文章目錄
(二)Flux入門學習:流的概念,特性和基本操作
(三)Flux深入學習:流的高級特性和進階用法
(四)reactor-core響應式api如何測試和調試?
(五)Spring reactive: Spring WebFlux的使用
(六)Spring reactive: webClient的使用
引言
Spring framework 5 的一大新特性:響應式編程(Reactive Programming)。那么什么是響應式?他能給我們帶來什么?如何優雅地使用?本系列會從最基礎的概念和簡單的api講起,再慢慢深入探討響應式的一些高級特性,最后講解實戰內容,例如WebFlux和WebClient等在Spring boot中的使用,如何測試和調試。
想要了解原理的話,美團點評的這篇博客 Java NIO淺析 非常適合入門。
簡單地說:
當我們調用socket.read()、socket.write()這類阻塞函數的時候,這類函數不能立即返回,也無法中斷,需要等待socket可讀或者可寫,才會返回,因此一個線程只能處理一個請求。在這等待的過程中,cpu並不干活,(即阻塞住了),那么cpu的資源就沒有很好地利用起來。因此對於這種情況,我們使用多線程來提高cpu資源的利用率:在等待的這段時間,就可以切換到別的線程去處理事件,直到socket可讀或可寫了,通過中斷信號通知cpu,再切換回來繼續處理數據。例如線程A正在等待socket可讀,而線程B已經就緒了,那么就可以先切換到線程B去處理。雖然上下文切換也會花一些時間,但是遠比阻塞在線程A這里空等要好。當然計算機內部實際的情況比這復雜得多。
而NIO的讀寫函數可以立刻返回,這就給了我們不開線程利用CPU的最好機會:如果一個連接不能讀寫(socket.read()返回0或者socket.write()返回0),我們可以把這件事記下來。因此只需要一個線程不斷地輪詢這些事件,一旦有就緒的時間,處理即可。不需要多線程。
阻塞型IO
- 需要多線程,即需要很大的線程池。
- 每個請求都要有一個單獨的線程去處理。
非阻塞型IO
- 只需要數量非常少的線程。
- 固定的幾個工作線程去處理事件。
使用NIO我們能得到什么?
- 事件驅動模型
- 避免多線程
- 單線程處理多任務
- 非阻塞I/O,I/O讀寫不再阻塞,而是返回0
- 基於block的傳輸,通常比基於流的傳輸更高效
- 更高級的IO函數,zero-copy
- IO多路復用大大提高了Java網絡應用的可伸縮性和實用性
響應式編程入門
響應式編程就是基於reactor的思想,當你做一個帶有一定延遲的才能夠返回的io操作時,不會阻塞,而是立刻返回一個流,並且訂閱這個流,當這個流上產生了返回數據,可以立刻得到通知並調用回調函數處理數據。
基本模型
我們首先需要理解響應式編程的基本模型:
Flux
Reactor中的發布者(Publisher)由Flux和Mono兩個類定義,它們都提供了豐富的操作符(operator)。一個Flux對象代表一個包含0..N個元素的響應式序列,元素可以是普通對象、數據庫查詢的結果、http響應體,甚至是異常。而一個Mono對象代表一個包含零/一個(0..1)元素的結果。上圖就是一個Flux類型的數據流,Flux往流上發送了3個元素,Subscriber通過訂閱這個流來接收通知。
如何創建一個流?最簡單的方式有以下幾種:
//創建一個流,並直接往流上發布一個值為value數據 Flux.just(value); //通過list創建一個流,往流上依次發布list中的數據 Flux.fromIterable(list); //創建一個流,並向流上從i開始連續發布n個數據,數據類型為Integer Flux.range(i, n); //創建一個流,並定時向流上發布一個數據,數據從0開始遞增,數據類型為Long Flux.interval(Duration.ofSeconds(n));
既然是“數據流”的發布者,Flux和Mono都可以發出三種“數據信號”:元素值、錯誤信號、完成信號,錯誤信號和完成信號都是終止信號,完成信號用於告知下游訂閱者該數據流正常結束,錯誤信號終止數據流的同時將錯誤傳遞給下游訂閱者。
Subscriber
subscriber是一個訂閱者,他只有非常簡單的4個接口:
public interface Subscriber<T> { void onSubscribe(Subscription var1); //收到下一個元素值信號時的行為 void onNext(T var1); //收到錯誤信號時的行為 void onError(Throwable var1); //收到終止信號時的行為 void onComplete(); }
Subscriber必須要訂閱一個Flux才能夠接收通知:
flux.subscribe( value -> handleData(value), error -> handleError(error), () -> handleComplete() );
上面這個例子通過lambda表達式,定義了Subscriber分別在收到消息,收到錯誤,和消息流結束時的行為,當Subscriber接收到一個新數據,就會異步地執行handleData方法處理數據。
簡單例子:
接下來我們創建幾個最簡單的流來試一下:
首先我們新建一個maven項目,引入reactor的類庫:
<dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.2.3.RELEASE</version> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <version>3.2.3.RELEASE</version> <scope>test</scope> </dependency> </dependencies>
編寫代碼如下:
public class ReactorTests { @After public void after() { sleep(30_000); } @Test public void testJust() { Flux.just("hello", "world") .subscribe(System.out::println); } @Test public void testList() { List<String> words = Arrays.asList( "hello", "reactive", "world" ); Flux.fromIterable(words) .subscribe(System.out::println); } @Test public void testRange() { Flux.range(1, 10) .subscribe(System.out::println); } @Test public void testInterval() { Flux.interval(Duration.ofSeconds(1)) .subscribe(System.out::println); } }
訂閱這些流,收到數據之后只是簡單地把它打印出來,運行這些Test,就能夠看到訂閱者在接收到流上的數據時,異步地去處理這些數據。