Reactor 3 與之前學習的RxJava是同一類(反應式編程)框架,基本概念大致差不多,簡單記錄一下:
Reactor 3 利用了java 8中的CompletableFuture、Stream、Duration,在此基礎上發展出了二個更通用的模型:Flux及Mono.
一、Flux
Flux 簡單點講,就是一個可以發射1到N個元素的異步"發射器",官方給的示例圖如下:

1,2,3...這些顏色各異的小圓,代表正常發射出來的數據;(對應onNext方法)
上右黑色的豎線表示發送完成;(對應onComplete方法)
如果發射過程中出現異常,豎線用大紅叉叉表示;(對應onError方法)
二、Mono
相對Flux而言,Mono最多只能發射1個元素,示例圖如下:

三、Gradle依賴
apply plugin: 'java'
apply plugin: 'idea'
repositories {
maven {
url "http://maven.aliyun.com/nexus/content/groups/public/"
}
mavenCentral()
jcenter()
}
dependencies {
compile 'io.projectreactor:reactor-core:3.1.6.RELEASE'
testCompile('io.projectreactor:reactor-test:3.1.6.RELEASE')
testCompile('junit:junit:4.8.2')
}
四、常用方法
4.1 、just/fromArray/range
import org.junit.Test;
import reactor.core.publisher.Flux;
public class FluxTest {
@Test
public void fluxJustTest() {
Flux.just("1", "A", 3).subscribe(System.out::println);
}
}
點擊just可以看到源碼:
/**
* Create a {@link Flux} that emits the provided elements and then completes.
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/justn.png" alt="">
* <p>
* @param data the elements to emit, as a vararg
* @param <T> the emitted data type
*
* @return a new {@link Flux}
*/
@SafeVarargs
public static <T> Flux<T> just(T... data) {
return fromArray(data);
}
注意:注釋中給了一個圖片地址 https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/justn.png
從圖上看,就是發射一串數據。最終的輸出也是如此:
1 A 3
其它類似的方法還有:fromArray、fromIterable、range 大家可以自行嘗試。
4.2、interval
@Test
public void fluxIntervalTest() throws InterruptedException {
Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).subscribe(System.out::println);
//防止程序過早退出,放一個CountDownLatch攔住
CountDownLatch latch = new CountDownLatch(1);
latch.await();
}
顧名思義就是每隔一定時間,發射一個數據(從0開始),上面的示例表示每隔500毫秒,從0開始遞增,發射1個數字,輸出如下:
0 1 2 3 ...
4.3、empty/never/error
@Test
public void fluxEmptyTest() {
Flux.empty().subscribe(System.out::println);
}
empty方法幾乎啥都不干,就發一個結束消息完事,示意圖如下:

empty源碼如下:
public static <T> Flux<T> empty() {
return FluxEmpty.instance();
}
與之接近的,還有never方法
public static <T> Flux<T> never() {
return FluxNever.instance();
}
二者區別在於:empty里面至少還有一個結束消息,而never則是真的啥都沒有。
還有一個比較特別的方法:error,只包含一個錯誤消息

示例代碼如下:
Flux.error(new Exception("a wo,something is wrong!")).subscribe(System.out::println);
4.4、 generate/create
前面的幾個方法,開發者不用顯式的調用complete,而generate則需要調用,否則序列就不會終止。
@Test
public void fluxGenerateTest() {
Flux.generate(i -> {
i.next("AAAAA");
//i.next("BBBBB");//注意generate中next只能調用1次
i.complete();
}).subscribe(System.out::println);
final Random rnd = new Random();
Flux.generate(ArrayList::new, (list, item) -> {
Integer value = rnd.nextInt(100);
list.add(value);
item.next(value);
if (list.size() >= 10) {
item.complete();
}
return list;
}).subscribe(System.out::println);
}
輸出如下:
AAAAA 85 80 32 19 90 72 0 37 46 33
注:generate中next只能調1次,否則會報錯 reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: More than one call to onNext

create方法則沒有next的調用次數限制,見下面的代碼:
@Test
public void fluxCreateTest() {
Flux.create(i -> {
i.next("A");
i.next("B");
i.complete();
}).subscribe(System.out::println);
final Random rnd = new Random();
Flux.create(item -> {
for (int i = 0; i < 10; i++) {
item.next(i);
}
}).subscribe(System.out::println);
}
4.5、buffer/bufferTimeout/window/windowTimeout
@Test
public void fluxBufferTest() throws InterruptedException {
Flux.range(0, 10).buffer(3).subscribe(System.out::println);
System.out.println("--------------");
Flux.interval(Duration.of(1, ChronoUnit.SECONDS))
.bufferTimeout(2, Duration.of(2, ChronoUnit.SECONDS))
.subscribe(System.out::println);
//防止程序過早退出,放一個CountDownLatch攔住
CountDownLatch latch = new CountDownLatch(1);
latch.await();
}
字面意思理解,buffer指flux產生的數據,先緩沖起來,等緩沖區滿了以后,才真正發射,所以上面的代碼,第1段的意思是,0-9這10個數字,每次緩存3個,等3個數攢齊后,才輸出。
而另一個版本bufferTimeout則不是根據元素的個數來緩沖,而是根據時間,第2段代碼的意思是:flux每隔1秒,產生1個遞增數字,而緩沖區每2秒才算充滿,相當於每湊足2個數字后,才輸出。
[0, 1, 2] [3, 4, 5] [6, 7, 8] [9] -------------- [0, 1] [2, 3] [4, 5]
buffer示例圖如下:
bufferTimeout示例圖如下:

另外還有二個接近的方法window/windowTimeout,只是window/windowTimeout調用后的結果是Flux<Flux<T>>,處理過程中產生的流為UnicastProcessor對象。
window示意圖:

windowTimeout示意圖:

4.6、filter
@Test
public void fluxFilterTest() {
Flux.range(0, 10).filter(c -> c % 2 == 0).subscribe(System.out::println);
}
輸出:
0 2 4 6 8
示意圖:

4.7 zipWith
@Test
public void fluxZipTest() {
Flux.just("A", "B").zipWith(Flux.just("1", "2", "3")).subscribe(System.out::println);
}

就是把各組元素,按位組合(就算用拉鏈袋封起來一樣,因此得名),注意:這里有一個木桶原則,即 元素最少的"組",決定了最后輸出的"組"個數。
上面代碼的輸出為:
[A,1] [B,2]
4.8 take/takeLast/takeWhile/takeUntil
@Test
public void fluxTakeTest() {
Flux.range(1, 10).take(3).subscribe(System.out::println);
System.out.println("--------------");
Flux.range(1, 10).takeLast(3).subscribe(System.out::println);
System.out.println("--------------");
Flux.range(1, 10).takeWhile(c -> c > 1 && c < 5).subscribe(System.out::println);
System.out.println("--------------");
Flux.range(1, 10).takeUntil(c -> c > 1 && c < 5).subscribe(System.out::println);
System.out.println("--------------");
Flux.range(1, 4).takeUntilOther(Flux.never()).subscribe(System.out::println);
}
take與takeLast很好理解,就是前n個或后n個。 takeWhile與takeUntil 需要記憶一下:
takeWhile 是先判斷條件是否成立,然后再決定是否取元素(換言之,如果一開始條件不成立,就直接終止了);
takeUntil 是先取元素,直到遇到條件成立,才停下
takeUntilOther 則是先取元素,直到別一個Flux序列產生元素
所以上面的輸出為:
1 2 3 -------------- 8 9 10 -------------- -------------- 1 2 -------------- 1 2 3 4
注意:takeWhile無輸出,因為判斷條件一開始就不成立,直接cancel了;而takeUntilOther由於另一個flux使用了never()相當於沒有任何元素,所以把前1個序列的元素取完,自然結束。
takeWhile的示意圖如下:

takeUntil的示意圖如下:

takeUntilOther的示意圖如下:

reactor的東西比較多,剩下的寫到下篇吧
參考文章:
