數據合並函數
由於業務需求有的時候需要將多個數據源進行合並,Reactor提供了concat方法和merge方法:
- concat
public static <T> Flux<T> concat(Iterable<? extends Publisher<? extends T>> sources)
連接兩個Flux, 連接由源下游發射的迭代轉發元素提供的所有源。
通過順序訂閱第一個源,然后在訂閱下一個源之前等待它完成,等等,直到最后一個源完成,從而實現連接。任何錯誤立即中斷序列並被轉發到下游。
- concatWithValues
@SafeVarargs
public final Flux<T> concatWithValues(T... values)
將值連接到Flux的末尾。
- concatDelayError
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources)
將從父Publisher發出的ONNEXT信號連接到所有源,轉發由下游源發出的元素。
通過順序訂閱第一個源,然后在訂閱下一個源之前等待它完成,等等,直到最后一個源完成,從而實現連接。錯誤不會中斷主序列,但是在其余的源有機會被連接之后被傳播。
此操作符在取消時丟棄內部排隊的元素以產生背壓。
- merge
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source)
將由Publisher發出的Publisher序列的數據合並為交織合並序列。與concat不同的是,內部source踴躍競爭。
返回一個合並的Flux
- mergeOrdered
@SafeVarargs
public static <I extends Comparable<? super I>> Flux<I> mergeOrdered(Publisher<? extends I>... sources)
通過從每個源(由它們的自然順序定義)中選擇最小值,將來自提供的Publisher序列的數據合並成有序的合並序列。這不是一個SORT(),因為它不考慮整個序列。
相反,該操作符只考慮來自每個源的一個值,並選擇所有這些值中最小的值,然后為選擇的源補充槽。
返回 一個合並Flux,但保持原始排序的合並Flux
從圖中可以很清楚的看出這兩種合並方法的不同:
-
concat是合並的flux,按照順序分別運行,flux1運行完成以后再運行flux2
-
merge是同時運行,根據時間先后運行
下面對concat和merge相關的方法進行測試,先准備測試數據:
private Flux<Integer> flux1() {
return Flux.range(1,4);
}
private Flux<Integer> flux2() {
return Flux.range(5,8);
}
private Flux<String> hotFlux1() {
return flux1().map(i-> "[1]"+i).delayElements(Duration.ofMillis(10));
}
private Flux<String> hotFlux2() {
return flux2().map(i-> "[2]"+i).delayElements(Duration.ofMillis(4));
}
concat相關方法
concat代碼演示
@Test
public void concatTest() throws InterruptedException {
Flux.concat(hotFlux1(), hotFlux2())
.subscribe(i -> System.out.print("->"+i));
Thread.sleep(200);
}
運行結果
從結果可以看出先運行完flux1之后再運行flux2
concatWith方法
用法和concat基本相同,寫法略有不同:
@Test
public void concatWithTest () {
flux1().concatWith(flux2())
.log()
.subscribe();
}
merge相關方法
merge代碼演示
@Test
public void mergeTest() throws InterruptedException {
Flux.merge(hotFlux1(), hotFlux2())
.subscribe(i -> System.out.print("->"+i));
Thread.sleep(200);
}
運行結果
很明顯順序和concat的區別,是按照時間先后執行
mergeWith用法
用法和merge相同,寫法不同而已
@Test
public void mergeWithTest() throws InterruptedException {
hotFlux1().mergeWith(hotFlux2())
.subscribe(i -> System.out.print("->"+i));
Thread.sleep(200);
}
mergeSequential用法
- 跟concat有些相似,得到的結果類似
- 跟concat不同在於,訂閱的源是hot型,接收數據后根據訂閱順序重新排序
@Test
public void mergeSequentialTest() throws InterruptedException {
Flux.mergeSequential(hotFlux1(), hotFlux2())
.subscribe(i -> System.out.print("->"+i));
Thread.sleep(200);
}
結果和concat的一樣都是
mergeOrdered用法
合並接收之后再排序
@Test
public void mergeOrderedTest() throws InterruptedException {
Flux.mergeOrdered(Comparator.reverseOrder(), hotFlux1(), hotFlux2())
.subscribe(i -> System.out.print("->"+i));
Thread.sleep(200);
}
combineLatest用法
- 跟concat和merge不同該方法是將多個源的最后得到元素通過函數進行融合的到新的值
代碼示例
@Test
public void combineLatestTest() throws InterruptedException {
Flux.combineLatest(hotFlux1(), hotFlux2(), (v1, v2) -> v1 + ":" + v2)
.subscribe(i -> System.out.print("->"+i));
Thread.sleep(200);
}
運行結果
- 結果都是flux1和flux2中元素進行融合之后的元素
轉換操作函數
- first
@SafeVarargs
public static <I> Flux<I> first(Publisher<? extends I>... sources)
選擇第一個Publisher發出任何信號(onNext/onError/onComplete)並重放來自該Publisher的所有信號,有效地表現得像這些競爭源中最快的一個。
返回一個新的Flux,其性能最快。
- switchOnNext
public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers)
創建一個Flux,該鏡像反映最近發布的Publisher,轉發其數據直到源代碼中的新Publisher進入。
一旦源中沒有新的Publisher(源已完成),並且最后一個鏡像Publisher也已完成,則生成的Flux將完成。
- zip
public static <T1,T2,O> Flux<O> zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
BiFunction<? super T1,? super T2,? extends O> combinator)
將兩個源壓縮在一起,也就是說,等待所有源發出一個元素,並將這些元素組合成一個輸出值(由提供的組合器構造)。操作將繼續這樣做,直到任何來源完成。錯誤將立即被轉發。這種“Step-Merge”處理在分散聚集場景中特別有用。
- all
public final Mono<Boolean> all(Predicate<? super T> predicate)
如果這個序列的所有值與謂詞匹配,則發出一個布爾布爾值true 。
該實現使用短路邏輯,如果謂詞與值不匹配,則用FALSE完成。
- any
public final Mono<Boolean> any(Predicate<? super T> predicate)
如果這個Flux序列的任何值與謂詞匹配,則發出一個布爾布爾值true。
該實現使用短路邏輯,如果任何值與謂詞不匹配,則完成FALSE。
- as
public final <P> P as(Function<? super Flux<T>,P> transformer)
將此Flux轉換為目標類型。
- buffer
public final Flux<List<T>> buffer()
將所有傳入的值收集到一個列表緩沖器中,一旦該Flux完成,將由返回的Flux發出。
該操作在數據信號觸發的取消或錯誤時丟棄緩沖器。
- cache
public final Flux<T> cache()
將此Flux量轉換為熱源,並為進一步的用戶緩存最后發射的信號。將保留一個無限量的OnNeXT信號。完成和錯誤也將被重放。
返回一個緩存的 Flux
- cast
public final <E> Flux<E> cast(Class<E> clazz)
將產生的Flux類型轉換為目標產生類型。
- collect
public final <E> Mono<E> collect(Supplier<E> containerSupplier,
BiConsumer<E,? super T> collector)
通過應用收集器BiConsumer獲取容器和每個元素,將此Flux發出的所有元素收集到用戶定義的容器中。收集的結果將在這個序列完成時發出。
- collectList
public final Mono<List<T>> collectList()
將此Flux所發射的所有元素收集到序列完成時由結果Mono發出的列表。
- defaultIfEmpty
public final Flux<T> defaultIfEmpty(T defaultV)
如果沒有任何數據完成此序列,則提供默認的唯一值
- distinct
public final Flux<T> distinct()
對於每一個Subscriber,跟蹤已經從這個Flux 跟蹤元素和過濾出重復。
值本身被記錄到一個用於檢測的哈希集中。如果希望使用distinct(Object::hashcode)更輕量級的方法,該方法不保留所有對象,但是更容易由於hashcode沖突而錯誤地認為兩個元素是不同的。
- doOnCancel
public final Flux<T> doOnCancel(Runnable onCancel)
當Flux被取消時觸發附加行為(side-effect)。
- doOnComplete
public final Flux<T> doOnComplete(Runnable onComplete)
當Flux完成時觸發附加行為(side-effect)。
- doOnNext
public final Flux<T> doOnNext(Consumer<? super T> onNext)
當Flux發射一個項目時附加行為(side-effect)。
- filter
public final Flux<T> filter(Predicate<? super T> p)
根據給定謂詞評估每個源值。如果謂詞測試成功,則發出該值。如果謂詞測試失敗,則忽略該值,並在上游生成請求1。
- log
public final Flux<T> log()
觀察所有 Reactive Streams信號並使用記錄器支持跟蹤它們。默認值將使用Level.INFO和 java.util.logging日志記錄。如果SLF4J是可用的,它將被用來代替。
- map
public final <V> Flux<V> map(Function<? super T,? extends V> mapper)
通過對每個項目應用同步功能來轉換由該Flux發出的項目。
- then
public final <V> Mono<V> then(Mono<V> other)
讓這個Flux完成,然后播放信號提供的Mono。
換句話說,忽略這個Flux和轉換完成信號的發射和提供Mono完成信號。在得到的Mono中重放錯誤信號。