Java Reactive Programming


Java Reactive Programming

響應式編程

在 Spring Boot 中,支持了響應式編程,帶來了性能和內存使用方面的優化。

詳見:

困難

但是不同於 async/await 模式,響應式編程也給編碼帶來了一些困難,主要如下:

  • 一個代碼塊只能最多調用一個響應式 API。
  • null 處理不友好,甚至是災難性的。

響應式編程的規則

  • 控制層,返回響應式對象,大多數情況下使用 Mono<T>

  • 服務層,使用@Transactional 的 API 必須返回響應式對象。

  • 數據訪問層(R2DBC)返回響應式對象: Mono<T>, Flux<T>

  • 使用響應式方法的 API 盡量返回響應式對象。

  • 不要使用任何 block(), blockFirst(), share().block() 等 API,會引起嚴重的性能問題。

  • 在重載傳統接口的情況下使用 subscribe()

  • 對於計數的 API,使用 Mono<Long> 作為返回對象。
    這是因為 Flux.count() 返回的是一個Mono<Long>。因此在其它的計數 API 中使用 Mono<Long> 作為返回對象,讓我們可以保持一致。

  • Mono<Void>null

    • 響應式 API 不能返回 null 或者 Mono.just(null) 或者其等價方式。
      會引起下面的錯誤:

      Caused by: java.lang.NullPointerException: The mapper returned a null value.

    • Mono.empty() 不能調用隨后的映射方法 map()flatMap()transform()等。
    • Mono.empty() 在 doOnSuccess() 等函數中獲取值是 null
    • Flux 中使用元素 Mono<Void> 可以調用隨后的映射方法 colllectList()等方法。
      Mono<Void> 不會被記入 count()colllectList()
    • 建議: 避免定義返回 Mono<Void>的方法。
      這種返回不能調用隨后的映射方法 map()flatMap()transform()等方法。
      一個例外是,這個方法在控制層的最后被調用。

響應式編程模式

響應式編程是一種流編程,我把編程模式分為: 啟動模式、映射模式、返回模式、異常模式。

空模式

  • 響應式 API 不能返回 null 或者 Mono.just(null) 或者其等價方式。
    會引起下面的錯誤:

    Caused by: java.lang.NullPointerException: The mapper returned a null value.

	public static Mono<Void> monoNullTest() {
		return Mono.just(null);
	}

	monoNullTest().log().subscribe();

/*
Exception in thread "main" java.lang.NullPointerException: value
        at java.base/java.util.Objects.requireNonNull(Objects.java:246)
        at reactor.core.publisher.MonoJust.<init>(MonoJust.java:35)
        at reactor.core.publisher.Mono.just(Mono.java:719)
        at demo.ReactiveVoidTest.monoNullTest(ReactiveVoidTest.java:22)
        at demo.ReactiveVoidTest.main(ReactiveVoidTest.java:16)
*/
  • Mono.empty() 不能調用隨后的映射方法 map()flatMap()transform()等。
	public static Mono<Integer> monoVoidTest() {
		logger.info("case: mono void test");
		return Mono.empty().map(o -> {
			logger.info(MessageFormat.format("map: {0}", o));
			return o;
		}).doOnSuccess(o -> {
			logger.info(MessageFormat.format("doOnSuccess: {0}", o));
		}).thenReturn(1);
	}
	monoVoidTest().log().subscribe();

/*
10:52:06.993 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
10:52:07.023 [main] INFO reactor.Mono.IgnoreThen.1 - onSubscribe(MonoIgnoreThen.ThenIgnoreMain)
10:52:07.030 [main] INFO reactor.Mono.IgnoreThen.1 - request(unbounded)
10:52:07.036 [main] INFO demo.ReactiveVoidTest - doOnSuccess: null
10:52:07.038 [main] INFO reactor.Mono.IgnoreThen.1 - onNext(1)
10:52:07.042 [main] INFO reactor.Mono.IgnoreThen.1 - onComplete()
*/
  • Flux 中使用元素 Mono<Void> 可以調用隨后的映射方法 colllectList()等方法。
    Mono<Void> 不會被記入 count()colllectList()
	public static Mono<String> fluxVoidTest() {
		logger.info("case: flux void test");
		return Flux.fromIterable(Arrays.asList(0, 1, 2)).flatMap(o -> {
			logger.info(MessageFormat.format("emit an empty: {0}", o));
			return Mono.empty();
		}).map(o -> {
			logger.info(MessageFormat.format("map: {0}", o));
			return o;
		}).count().doOnSuccess(o -> {
			logger.info(MessageFormat.format("doOnSuccess: count is {0}", o));
		}).map(o -> {
			return "abc";
		});
	}

	fluxVoidTest().log().subscribe();

/*
11:20:35.788 [main] INFO demo.ReactiveVoidTest - case: flux void test
11:20:35.986 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
11:20:36.121 [main] INFO reactor.Mono.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
11:20:36.127 [main] INFO reactor.Mono.MapFuseable.1 - | request(unbounded)
11:20:36.129 [main] INFO demo.ReactiveVoidTest - emit an empty: 0
11:20:36.130 [main] INFO demo.ReactiveVoidTest - emit an empty: 1
11:20:36.131 [main] INFO demo.ReactiveVoidTest - emit an empty: 2
11:20:36.133 [main] INFO demo.ReactiveVoidTest - doOnSuccess: count is 0
11:20:36.134 [main] INFO reactor.Mono.MapFuseable.1 - | onNext(abc)
11:20:36.138 [main] INFO reactor.Mono.MapFuseable.1 - | onComplete()
*/

異常模式

響應式編程對於異常處理,建議使用下面的方法:

  • 拋出 RuntimeException
  • 要小心使用 Mono.error(t) 方法。
  • 在 Mono API 中返回 Mono.error(t)
    會被當成一個 MonoError 值被處理,
    可以在map, doOnNext, doOnSuccess處理。
    不會被 doOnError處理。
	public static Mono<Integer> monoErrorTest() {
		logger.info("case: mono error test");
		return Mono.just(0).onErrorStop().map(o -> {
			if (o < 2) {
				return Mono.error(new RuntimeException("test"));
			}
			return o;
		}).doOnError(e -> {
			logger.info(MessageFormat.format("doOnError: {0}", e.getMessage()));
			throw new RuntimeException(e);
		}).doOnNext(o -> {
			logger.info(MessageFormat.format("doOnNext: {0}", o));
		}).doOnSuccess(o -> {
			logger.info(MessageFormat.format("doOnSuccess: {0}", o));
		}).thenReturn(1);
	}

	monoErrorTest().log().subscribe();

/*
00:08:22.338 [main] INFO demo.ReactiveErrorDemo - case: mono error test
00:08:22.460 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
00:08:22.484 [main] INFO reactor.Mono.IgnoreThen.1 - onSubscribe(MonoIgnoreThen.ThenIgnoreMain)
00:08:22.488 [main] INFO reactor.Mono.IgnoreThen.1 - request(unbounded)
00:08:22.495 [main] INFO demo.ReactiveErrorDemo - doOnNext: MonoError
00:08:22.496 [main] INFO demo.ReactiveErrorDemo - doOnSuccess: MonoError
00:08:22.497 [main] INFO reactor.Mono.IgnoreThen.1 - onNext(1)
00:08:22.499 [main] INFO reactor.Mono.IgnoreThen.1 - onComplete()
*/
  • 在 Mono API 中拋出異常,會被doOnError截獲,並且跳過 map, doOnSuccess
	public static Mono<Integer> monoExceptionTest() {
		logger.info("case: mono error test");
		return Mono.just(0).map(o -> {
			if (o < 2) {
				throw new RuntimeException("test");
			}
			return o;
		}).map(o -> {
			return 2;
		}).doOnSuccess(o -> {
			logger.info(MessageFormat.format("doOnSuccess: {0}", o));
		}).doOnError(e -> {
			logger.info(MessageFormat.format("doOnError: {0}", e.getMessage()));
		});
	}
	monoExceptionTest().log().subscribe();
/*
00:08:22.499 [main] INFO demo.ReactiveErrorDemo - case: mono exception test
00:08:22.502 [main] INFO reactor.Mono.PeekTerminal.2 - | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
00:08:22.502 [main] INFO reactor.Mono.PeekTerminal.2 - | request(unbounded)
00:08:22.508 [main] INFO demo.ReactiveErrorDemo - doOnError: test
00:08:22.510 [main] ERROR reactor.Mono.PeekTerminal.2 - | onError(java.lang.RuntimeException: test)
00:08:22.515 [main] ERROR reactor.Mono.PeekTerminal.2 -
java.lang.RuntimeException: test
        at demo.ReactiveErrorDemo.lambda$0(ReactiveErrorDemo.java:26)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:281)
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:354)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:354)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
        at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:121)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:263)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:263)
        at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
        at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4232)
        at demo.ReactiveErrorDemo.main(ReactiveErrorDemo.java:18)
00:08:22.520 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: test
Caused by: java.lang.RuntimeException: test
        at demo.ReactiveErrorDemo.lambda$0(ReactiveErrorDemo.java:26)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:281)
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:354)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:354)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
        at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:121)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:263)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:263)
        at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
        at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4232)
        at demo.ReactiveErrorDemo.main(ReactiveErrorDemo.java:18)
*/
  • 在 Flux API 中返回 Mono.error(t)
    會被當成一個異常被處理,
    不會在map, doOnNext, doOnSuccess處理。
    會被 doOnError處理。
	public static Mono<String> fluxErrorTest() {
		logger.info("case: flux error test");
		return Flux.fromIterable(Arrays.asList(0, 1, 2)).flatMap(o -> {
			logger.info(MessageFormat.format("flatMap: {0}", o));
			if (o == 1) {
				return Mono.error(new RuntimeException("test"));
			}
			return Mono.just(o);
		}).map(o -> {
			logger.info(MessageFormat.format("map: {0}", o));
			return o;
		}).count().doOnSuccess(o -> {
			logger.info(MessageFormat.format("doOnSuccess: count is {0}", o));
		}).doOnError(e -> {
			logger.info(MessageFormat.format("doOnError: {0}", e.getMessage()));
		}).map(o -> {
			return "abc";
		});
	}

fluxErrorTest().log().subscribe();
/*
00:18:12.204 [main] INFO demo.ReactiveErrorDemo - case: flux error test
00:18:12.367 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
00:18:12.472 [main] INFO reactor.Mono.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
00:18:12.476 [main] INFO reactor.Mono.MapFuseable.1 - | request(unbounded)
00:18:12.478 [main] INFO demo.ReactiveErrorDemo - flatMap: 0
00:18:12.478 [main] INFO demo.ReactiveErrorDemo - map: 0
00:18:12.478 [main] INFO demo.ReactiveErrorDemo - flatMap: 1
00:18:12.484 [main] INFO demo.ReactiveErrorDemo - doOnError: test
00:18:12.486 [main] ERROR reactor.Mono.MapFuseable.1 - | onError(java.lang.RuntimeException: test)
00:18:12.491 [main] ERROR reactor.Mono.MapFuseable.1 -
java.lang.RuntimeException: test
        at demo.ReactiveErrorDemo.lambda$13(ReactiveErrorDemo.java:82)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386)
        at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:272)
        at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:230)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
        at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4232)
        at demo.ReactiveErrorDemo.main(ReactiveErrorDemo.java:19)
00:18:12.495 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: test
Caused by: java.lang.RuntimeException: test
        at demo.ReactiveErrorDemo.lambda$13(ReactiveErrorDemo.java:82)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386)
        at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:272)
        at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:230)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
        at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4232)
        at demo.ReactiveErrorDemo.main(ReactiveErrorDemo.java:19)
*/
  • 在 Flux API 中拋出異常,和返回 Mono.error() 一樣
    會被當成一個異常被處理,
    不會在map, doOnNext, doOnSuccess處理。
    會被 doOnError處理。
	public static Mono<String> fluxExceptionTest() {
		logger.info("case: flux error test");
		return Flux.fromIterable(Arrays.asList(0, 1, 2)).flatMap(o -> {
			logger.info(MessageFormat.format("flatMap: {0}", o));
			if (o == 1) {
				throw new RuntimeException("test");
			}
			return Mono.just(o);
		}).map(o -> {
			logger.info(MessageFormat.format("map: {0}", o));
			return o;
		}).count(

		).doOnSuccess(o -> {
			logger.info(MessageFormat.format("doOnSuccess: count is {0}", o));
		}).doOnError(e -> {
			logger.info(MessageFormat.format("doOnError: {0}", e.getMessage()));
		}).map(o -> {
			return "abc";
		});
	}

	fluxExceptionTest().log().subscribe();

/*
00:20:38.104 [main] INFO demo.ReactiveErrorDemo - case: flux error test
00:20:38.265 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
00:20:38.358 [main] INFO reactor.Mono.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
00:20:38.364 [main] INFO reactor.Mono.MapFuseable.1 - | request(unbounded)
00:20:38.365 [main] INFO demo.ReactiveErrorDemo - flatMap: 0
00:20:38.366 [main] INFO demo.ReactiveErrorDemo - map: 0
00:20:38.366 [main] INFO demo.ReactiveErrorDemo - flatMap: 1
00:20:38.373 [main] INFO demo.ReactiveErrorDemo - doOnError: test
00:20:38.376 [main] ERROR reactor.Mono.MapFuseable.1 - | onError(java.lang.RuntimeException: test)
00:20:38.381 [main] ERROR reactor.Mono.MapFuseable.1 -
java.lang.RuntimeException: test
        at demo.ReactiveErrorDemo.lambda$8(ReactiveErrorDemo.java:61)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386)
        at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:272)
        at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:230)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
        at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4232)
        at demo.ReactiveErrorDemo.main(ReactiveErrorDemo.java:20)
00:20:38.385 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: test
Caused by: java.lang.RuntimeException: test
        at demo.ReactiveErrorDemo.lambda$8(ReactiveErrorDemo.java:61)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386)
        at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:272)
        at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:230)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
        at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4232)
        at demo.ReactiveErrorDemo.main(ReactiveErrorDemo.java:20)
*/

啟動模式

  • 常用的啟動模式
Mono.just(data);
Mono.fromXXX(xxx);
Flux.from(data);

  • 冷響應式
Mono.defer(() -> supplier);

冷響應式是指,在啟動時,不會立即執行,而是在被訂閱時才執行。
下面 IllegalArgumentException 會在 subscribe 后才會被調用。

// Sample code
	private Mono<Integer> monoAdd(Integer a, Integer b) {

		return Mono.defer(() -> {
			if (a == null) {
				throw new IllegalArgumentException("a is null");
			}

			if (b == null) {
				throw new IllegalArgumentException("b is null");
			}

			return Mono.just(a + b);
		});
	}

映射模式

這里討論的映射模式,大都是關於多個響應式 API 之間的協作。

平行模式(flat pattern)

主要是用 flatMap() 方法。代碼成 flatMap().flatMap().flatMap() 形狀。
用於后面的 API 只使用前面 API 輸出結果的情況。

	public static Mono<Integer> monoFlat(Integer a) {

		return Mono.defer(() -> {
			if (a == null) {
				throw new IllegalArgumentException("a is null");
			}
			return Mono.just(a);
		}).flatMap(data -> Mono.just(data * 2))
		.flatMap(data -> Mono.just(data + 100));
	}

	monoFlat(1).log().subscribe();
/*
00:15:17.005 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
00:15:17.164 [main] INFO reactor.Mono.FlatMap.1 - | onSubscribe([Fuseable] MonoFlatMap.FlatMapMain)
00:15:17.168 [main] INFO reactor.Mono.FlatMap.1 - | request(unbounded)
00:15:17.171 [main] INFO reactor.Mono.FlatMap.1 - | onNext(102)
00:15:17.173 [main] INFO reactor.Mono.FlatMap.1 - | onComplete()
*/

嵌套模式(nested pattern)

對於后面的 API 需要使用多個前面 API 輸出結果的情況,可以使用嵌套模式。
在嵌套模式中,后面的 API 可以直接使用前面 API 的結果。

	public static Mono<Integer> monoNested(Integer a, Integer b) {
		// return a * 100 + b * 100
		return Mono.defer(() -> {
			if (a == null) {
				throw new IllegalArgumentException("a is null");
			}
			return Mono.just(a * 100).flatMap(o1 -> {
				if (b == null) {
					throw new IllegalArgumentException("a is null");
				}

				return Mono.just(b * 100).map(
						// 在這里可以同時使用 o1 和 o2
						o2 -> o1 + o2);
			});
		});
	}

monoNested(1, 2).log().subscribe();

/*
00:22:43.816 [main] INFO reactor.Mono.Defer.2 - onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
00:22:43.817 [main] INFO reactor.Mono.Defer.2 - request(unbounded)
00:22:43.817 [main] INFO reactor.Mono.Defer.2 - onNext(300)
00:22:43.818 [main] INFO reactor.Mono.Defer.2 - onComplete()
*/

拉鏈模式(zip pattern)

對於后面的 API 需要使用多個前面 API 輸出結果的情況,可以使用拉鏈模式。
在拉鏈模式中,后面的 API 可以通過參數獲取前面 API 的結果。

	public static Mono<Integer> monoZip(Integer a, Integer b) {
		// return a * 100 + b * 100
		return Mono.zip(
				Mono.defer(() -> {
					if (a == null) {
						throw new IllegalArgumentException("a is null");
					}
					return Mono.just(a * 100);
				}),
				Mono.defer(() -> {
					if (b == null) {
						throw new IllegalArgumentException("b is null");
					}
					return Mono.just(b * 100);
				}), (o1, o2) -> o1 + o2);
	}

	monoZip(1, 2).log().subscribe();

/*
00:32:22.326 [main] INFO reactor.Mono.Zip.3 - onSubscribe([Fuseable] MonoZip.ZipCoordinator)
00:32:22.326 [main] INFO reactor.Mono.Zip.3 - request(unbounded)
00:32:22.327 [main] INFO reactor.Mono.Zip.3 - onNext(300)
00:32:22.328 [main] INFO reactor.Mono.Zip.3 - onComplete()
*/

原子模式(atomic pattern)

拉鏈模式和嵌套模式都不能處理 null 值,原子模式可以。
注意下面示例中的 return Mono.just(0) 可以確保不會忽略 null 值的情況。

	public static Mono<Integer> monoAtomic(Integer a, Integer b) {
		AtomicReference<Integer> a100Ref = new AtomicReference<>(0);
		AtomicReference<Integer> b100Ref = new AtomicReference<>(0);

		// return a * 100 + b * 100
		return Mono.defer(() -> {
			if (a == null) {
				a100Ref.set(null);
			} else {
				a100Ref.set(a * 100);
			}

			return Mono.just(0);
		}).flatMap(o -> {
			if (b == null) {
				b100Ref.set(null);
			} else {
				b100Ref.set(b * 100);
			}
			return Mono.just(0);
		}).map(o -> {
			if (a100Ref.get() == null || b100Ref.get() == null) {
				return 0;
			}
			return a100Ref.get() + b100Ref.get();
		});
	}

monoAtomic(1, 2).log().subscribe();
/*
11:03:46.162 [main] INFO reactor.Mono.Map.4 - onSubscribe(FluxMap.MapSubscriber)
11:03:46.163 [main] INFO reactor.Mono.Map.4 - request(unbounded)
11:03:46.163 [main] INFO reactor.Mono.Map.4 - onNext(0)
11:03:46.164 [main] INFO reactor.Mono.Map.4 - onComplete()
*/

null 對象模式(null pattern)

我們還可以使用默認值來處理 null 值的情況。
在處理 null 值時,一個常見的需求是:

在一個 lambda 閉包中:

  • 可以知道這個值是 null 還是非 null。
  • 可以獲取這個值。
  • 可以調用並返回一個新的響應式對象(發布者)

一個技巧是使用 .defaultIfEmpty() 方法來處理 null 值。
這個技巧對於數值或者 String 類型的值可能有效的,但是對於類實例就不好用了。
在這種情況下,可以考慮定義一個接口。

public interface Nullable {
	boolean isNone();
}

package demo.reactive;

public class Employee implements Nullable {

    private static final Employee none = new Employee(true);

    public static Employee none() {
        return none;
    }

    private Employee(boolean isNone) {
        this.isNone = isNone;
    }

    private Employee() {
    }

    private int id;
    private String name;
    private boolean isNone;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public boolean isNone() {
        return isNone;
    }

    public void setNull(boolean isNone) {
        this.isNone = isNone;
    }

}

	public static Mono<Employee> monoNullable() {

		// return nullable object
		return Mono.defer(() -> {
			return Mono.<Employee>empty();
		}).defaultIfEmpty(Employee.none());

	}

monoNullable().map(o -> {
			logger.info(MessageFormat.format("map.isNone: {0}", o.isNone()));
			return o;
		}).doOnSuccess(o -> {
			logger.info(MessageFormat.format("doOnSuccess: {0}", o));
		}).log().subscribe();

/*
18:28:06.789 [main] INFO reactor.Mono.PeekTerminal.1 - | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
18:28:06.794 [main] INFO reactor.Mono.PeekTerminal.1 - | request(unbounded)
18:28:06.796 [main] INFO demo.ReactiveDemo - map.isNone: true
18:28:06.796 [main] INFO demo.ReactiveDemo - doOnSuccess: demo.reactive.Employee@120d6fe6
18:28:06.797 [main] INFO reactor.Mono.PeekTerminal.1 - | onNext(demo.reactive.Employee@120d6fe6)
18:28:06.799 [main] INFO reactor.Mono.PeekTerminal.1 - | onComplete()
*/

返回模式

下面是常見的返回模式。

Mono.empty();
Mono.then();
Mono.then(mono);
Mono.thenReturn(data);

參照


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM