# 響應式編程筆記二:寫點代碼
## 新建一個項目
我們使用Reactor庫萊演示。
https://start.spring.io 新建一個空項目,然后添加Reactor Core依賴。
```xml
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.0.0.RC2</version>
</dependency>
```
## What Makes it Functional?
Reactive的基本building block是`a sequence of events`,和兩個主角:一個是events的publisher,一個是events的subscriber。
也可以將`a sequence`叫做`stream`,因為就是。
Reactor將publisher叫做Flux(實現了Reactive Streams的Publisher接口)。RxJava庫非常類似,有很多平行的特性,所以我們會講一下`Observable`。
另外,監獄Reactor 2.0 叫它Stream,會與Java 8中的Streams混淆,所以這里直接使用Reactor 3.0。
### Generators | 生成器
Flux是什么?
是一序列的特定POJO類型的事件的publisher,所以,是泛型的。
Flux<T>,就是T類型事件的publisher。
Flux有一些靜態方法,可以方便地根據不同的sources創建其實例。
```java
//如從一個數組創建:
Flux<String> flux = Flux.just("red", "white", "blue");
```
上面生成了一個Flux,現在可以開工了。
實際上你只能做兩件事:操作它(transform it, or combine it with other sequences)、訂閱它(別忘了是一個publisher)。
### Single Valued Sequences | 單值序列
你會經常遇到僅含有一個或零個元素的sequence,例如按照主鍵查找的repository方法。
Reactor提供了Mono類型,代表了a single valued or empty Flux。
Mono的API非常類似Flux,但更專注,因為不是所有操作符都適合單值序列。
RxJava也有一個,叫Single,還有一個Completable用於empty序列。而Reactor中的empty序列是Mono<Void>。
### Operators | 操作符
Flux有很多方法,幾乎所有方法都是操作符。
我們不會在這里過多的關注它們,因為你可以從Javadocs等地方獲取想要的知識。
我們僅需要知道操作符是什么,以及它能做什么。
例如,如果想要讓Flux中的事件被記錄到標准輸出,你可以調用`.log()`。或者,你可以將其transform - 使用`.map()`:
```java
Flux<String> flux = Flux.just("red", "white", "blue");
Flux<String> upper = flux
.log()
.map(String::toUpperCase);
```
到目前為止,唯一有趣的東西,就是,實際上還沒有任何數據被處理!
對一個Flux調用操作符,就像是在做一個后續執行的計划。
完全是聲明式的,這就是為什么人們叫它`函數式的`。
除非數據開始流動,否則操作符中的邏輯不會被執行 - 而除非有人訂閱了(subscribe)這個Flux(Publisher),否則數據不會開始流動!
Reactive庫中到處都是同樣的聲明式、函數式的數據序列處理,Java 8的Streams也一樣。因此,使用Stream的理念也是一樣的:
```java
Stream<String> stream = Streams.of("red", "white", "blue");
Stream<String> upper = stream.map(value -> {
System.out.println(value);
return value.toUpperCase();
});
```
我們對Flux的觀察,同樣適用於這里:沒有任何數據被處理,僅僅是一個執行的計划。
然而,Flux和Stream之間有一些非常重要的區別,從而使得Stream不適合作為Reactive用例的API。
- Flux擁有更多操作符,但其中多數僅僅是為了方便,真正的區別在於你想什么時候消費數據!
>有一個非常有用的blog,是關於[Reactive Types](https://spring.io/blog/2016/04/19/understanding-reactive-types)的,
在里面描述了不同流式和響應式API的區別 - 通過查看類型的定義,以及如何使用它們。Flux和Stream的區別更是高亮的,更詳細。
### Subscribers | 訂閱者
想要讓數據流動,你必須訂閱Flux,使用某個`subscribe()`方法。只有這些方法才會讓數據流動!
它們會回溯你聲明在序列上的操作符,並請求publisher開始創建數據。
在上面的小例子中,意味着底層的字符串集合會開始迭代。而在更復雜的用例中,可能會觸發 讀取文件、從數據庫中讀取數據或調用一個HTTP服務。
```java
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribe(); //注意,這里沒有指定任何消費行為,所以,你懂的!
```
輸出是:
```text
09:17:59.665 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@3ffc5af1)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog - request(unbounded)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog - onComplete()
```
注意,這里的`subscribe()`沒有任何參數,僅僅是請求publisher發送**所有數據** -- 所以僅有一個`request()`被記錄,是`unbounded`。
>這里有個小知識點,request,相當於TCP的ACK,其參數值就是ACK的sliding window 大小!
我們可以看到每個數據被發布時的回調方法`onNext()`,序列結束時的回調方法`onComplete()`,以及最開始的訂閱時的回調方法`onSuscribe()`。
__如果你需要,你可以自己監聽這些事件,使用Flux中的`doOn*()`方法,這些方法都是操作符,不是訂閱者,所以不會導致數據流動__。
`subscribe()`方法有重載,其變體有不同的選項以控制要發生的事情。
一個重要且方便的形式是帶有回調方法的變體。
第一個參數是一個Consumer,用於每個item的回調,你還可以選擇添加一個Consumer - 用於error,以及一個Runnable - 當序列完成時執行。
```java
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribe(System.out::println);
```
其輸出:
```text
09:56:12.680 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@59f99ea)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - request(unbounded)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
RED
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
WHITE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
BLUE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onComplete()
```
我們可以控制數據流,讓其`bounded` - 有幾種方式可以達成。
原生API是你從`Subscriber`得到的`Subscription`。相對於上面`subscribe()`的長格式是:
```java
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String t) {
System.out.println(t);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
```
如果想要控制同一時刻最多消費2個item,可以這樣使用`Subscription`:
```java
.subscribe(new Subscriber<String>() {
private long count = 0;
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(2);
}
@Override
public void onNext(String t) {
count++;
if (count>=2) {
count = 0;
subscription.request(2);
}
}
//...
}
```
該`Subscriber` 會一次批發2個items! 這可是一個非常常見的用例,也許你會想着將其提取出來,這樣可讀性就更佳。
不過,先來看看輸出吧:
```text
09:47:13.562 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@61832929)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog - request(2)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - request(2)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - onComplete()
```
事實上,批量訂閱是非常常見的用力,所以`Flux`提供了幾個便利的方法。上面的例子可以這樣寫:
```java
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribe(null, 2); //FIXME 新版本沒有這個方法了!
```
輸出是:
```text
10:25:43.739 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@4667ae56)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog - request(2)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - request(2)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - onComplete()
```
務必注意,新版本已經沒有這個方法了!!!!!
>注意,諸如Spring Reactive Web這種會為你處理sequences的庫,會handle the subscription。
>將這些關注點推到棧底是件很好的事,因為會避免讓你陷入零亂的非阻塞邏輯代碼中,從而變得更有可讀性,便於測試和維護。
>因此,如果能夠避免subscribe to a sequence,或者起碼也可以將那些代碼放入一個處理層,而非業務層,是非常好的。
### Threads, Schedulers and Background Processing | 線程、定時器、背壓處理
上面的所有log,有一個有意思的特點,就是它們都位於main thread,就是調用`subscribe()`的線程。
這揭示了一個重要的特點:Reactor對於線程的使用是非常節儉的,因為這會給你最佳的性能體驗!
如果你一直在使用線程、線程池、異步執行 等方式來榨干服務的每一份性能,也許你會覺得奇怪。
但這是真的:如果沒有必要的線程切換,即便JVM高度優化了線程切換,其速度也不如單線程計算快!
Reactor掌握了這些關鍵點,並用來控制異步處理,並假定你知道你在做什么!
Flux,提供了少數配置方法,用於控制線程邊界。例如,你可以配置使用一個背景線程:Flux.subscribeOn():
```java
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribeOn(Schedulers.parallel())
.subscribe(null, 2); //FIXME 舊的方法,不再支持!
```
輸出:
```text
13:43:41.279 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@58663fc3)
13:43:41.280 [parallel-1-1] INFO reactor.core.publisher.FluxLog - request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(red)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(white)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(blue)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onComplete()
```
>注意,當你執行這段代碼的時候,記得不要在處理結束之前退出JVM!
注意,所有的處理都發生在后台的一個線程上。如果處理是CPU密集型,這樣做也行(但無意義,因為不會更快)。
你也可能想要能夠執行IO密集型的處理,甚至可能是阻塞的。這種情況下,你希望盡快的完成,而不會阻塞調用者。那么線程池是你的選擇,這也是你從`Schedulers.parallel()`得到的。
如果想將每個獨立item的處理切換到不同的線程中(上限是線程池的數量),那我們需要將它們打散到不同的publisher中,每個publisher都在一個背景線程中請求結果。
- 一種方式是使用操作符`flatMap()`,會將所有的items映射進一個Publisher(一般是不同類型的),然后返回新類型的sequence:
```java
Flux.just("red", "white", "blue")
.log()
.flatMap(value ->
Mono.just(value.toUpperCase())
.subscribeOn(Schedulers.parallel()),
2)
.subscribe(value -> {
log.info("Consumed: " + value);
})
```
注意,這里使用`flatMap()`將items推到child publisher中,然后我們就可以控制每個item線程的訂閱了。
Reactor的默認行為,是盡可能地使用單線程,因此,如果我們需要使用背景線程處理,那就需要顯式的寫出來。
實際上,這是一些共知的強制並行處理的技巧之一(更多詳見[Reactive Gems issue](https://github.com/reactor/reactive-streams-commons/issues/21))。
輸出:
```text
15:24:36.596 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@6f1fba17)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog - request(2)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
15:24:36.613 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
15:24:36.613 [parallel-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog - request(1)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(blue)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onComplete()
15:24:36.614 [parallel-3-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE
15:24:36.617 [parallel-2-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE
```
注意,現在有多個線程在消費items,而`flatMap()`中的第二個參數`concurrency hint`,則確保任何時刻都只有2 items被處理(意思是總共處理2 items),當然前提是可用。
我們看到很多`request(1)`,是因為系統試圖保留2 items在管道線中,一般來說,此時沒有完成處理。
Reactor 力圖聰明地處理,它會從上游Publisher中pre-fetch items,以消除等待時間。
>3 items 顯得太少了,最好多一些數據。
Flux還有一個`publishOn()`方法,是用於監聽器(如onNext()或者consumer callbacks),而非subscriber:
```java
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribeOn(Schedulers.newParallel("sub"))
.publishOn(Schedulers.newParallel("pub"), 2)
.subscribe(value -> {
log.info("Consumed: " + value);
});
```
輸出:
```text
15:12:09.750 [sub-1-1] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@172ed57)
15:12:09.758 [sub-1-1] INFO reactor.core.publisher.FluxLog - request(2)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog - onNext(red)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog - onNext(white)
15:12:09.770 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:12:09.771 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog - request(2)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog - onNext(blue)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog - onComplete()
15:12:09.783 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE
```
注意,consumer callbacks(記錄"Consumed: ..."),是在publisher thread上!
> Notice that the consumer callbacks (logging "Consumed: …") are on the publisher thread pub-1-1. If you take out the subscribeOn() call,
you might see all of the 2nd chunk of data processed on the pub-1-1 thread as well. This, again, is Reactor being frugal with threads
— if there’s no explicit request to switch threads it stays on the same one for the next call, whatever that is.
> We changed the code in this sample from subscribe(null, 2) to adding a prefetch=2 to the publishOn().
In this case the fetch size hint in subscribe() would have been ignored.
### Extractors: The Subscribers from the Dark Side
還有另一種方式訂閱一個sequence,就是調用 `Mono.block()`或者 `Mono.toFuture()` 或者 `Mono.toStream()`(這些都是提取器方法,它們會跳出Reactive類型,而進入一個阻塞式抽象)。
Flux也有一些轉換器`collectList()`和`collectMap()`,會將Flux轉成Mono!
They don’t actually subscribe to the sequence, but they do throw away any control you might have had over the suscription at the level of the individual items.
>警告,一個好的rule是,永遠不要調用一個提取器!當然也有例外(否則方法就不會存在)。一個明顯的例外就是在測試中,因為阻塞以收集結果很有用。
這些方法,就是作為轉換通道,架起Reactive到阻塞式的橋梁。
當你調用`Mono.block()`時,你會享受不到任何Reactive Streams帶來的好處。
這一點,就是Reactive Streams 和 Java 8 Streams的關鍵區別:原生Java Streams只有"all or nothing" 訂閱模式,等價於`Mono.block()`。
當然,`subscribe()`也可以阻塞調用線程,所以跟轉換器方法一樣微信,但你有更多控制 - 你可以使用`subscribeOn()`來阻止阻塞,還可以使用背壓來臨時決定是否繼續。
## Conclusion | 總結
在本文中,我們涉及了Reactive Streams和Reactor APIs的基本內容。
如果你需要了解更多,有很多書籍,但都不能替代編碼,所以,盡量使用[GitHub中的代碼](https://github.com/dsyer/reactive-notes)吧。
[本系列的下一篇文章](https://spring.io/blog/2016/07/20/notes-on-reactive-programming-part-iii-a-simple-http-server-application)會挖掘一些更深的知識:Reactive模型的阻塞式、分發、異步。