| Markdown版本筆記 | 我的GitHub首頁 | 我的博客 | 我的微信 | 我的郵箱 |
|---|---|---|---|---|
| MyAndroidBlogs | baiqiantao | baiqiantao | bqt20094 | baiqiantao@sina.com |
RxJava【變換】操作符 map flatMap concatMap buffer MD
demo地址
參考
目錄
變換操作符
map cast
flatMap concatMap switchMap flatMapIterable
使用 flatMap 化解循環嵌套
concatMap
switchMap
使用 flatMap 化解接口嵌套
flatMapIterable
buffer
buffer(count)
buffer(count, skip)
buffer(timespan, unit)
buffer(timespan, unit, count)
scan
groupBy
window
window(count)
window(count, skip)
window(timespan, unit)
window(timespan, unit, count)
變換操作符
常用的變換操作符
- map、cast:【數據類型轉換】將Observable發送的事件轉換為另一種類型的事件
- flatMap、concatMap、switchMap、flatMapIterable:扁平化
- flatMap:【化解循環嵌套和接口嵌套】將Observable發送的事件序列進行拆分 & 轉換 后合並成一個新的事件序列,最后再進行發送
- concatMap:【有序】與 flatMap 的 區別在於,拆分 & 重新合並生成的事件序列 的順序與被觀察者舊序列生產的順序一致
- switchMap:當原始Observable發射一個新的數據時,它將取消訂閱並停止監視產生執之前那個數據的Observable,只監視當前這一個
- flatMapIterable:相當於對 flatMap 的數據進行了二次扁平化
- buffer:【打包】定期從Observable發送的事件中獲取一定數量的事件並放到緩存區中,然后把這些數據集合打包發射
- scan:【連續】對Observable發射的每一項數據應用一個函數,然后按順序依次發射每一個值
- groupBy:【分組】將一個Observable分拆為一些Observables集合,它們中的每一個發射原始Observable的一個子序列
- window:定期將來自Observable的數據分拆成一些Observable窗口,然后發射這些窗口,而不是每次發射一項
map cast
Map操作符對Observable發射的每一項數據應用一個函數,執行變換操作
Map操作符對原始Observable發射的每一項數據應用一個你選擇的函數,然后返回一個發射這些結果的Observable。
Observable.just(new Date()) // Date 類型
.map(Date::getTime) // long 類型
.map(time -> time + 1000 * 60 * 60)// 改變 long 類型時間的值
.map(time -> new SimpleDateFormat("HH:mm:ss", Locale.getDefault()).format(new Date(time))) //String 類型
.subscribe(this::log);
cast操作符將原始Observable發射的每一項數據都強制轉換為一個指定的類型,然后再發射數據,它是map的一個特殊版本。
Observable.just(28)
.cast(Number.class)
.subscribe(number -> log(number.getClass().getSimpleName())); //Integer
flatMap concatMap switchMap flatMapIterable
flatMap 將一個發射數據的Observable變換為多個Observables,然后將它們發射的數據合並后放進一個單獨的Observable
flatMap 操作符使用一個指定的函數對原始Observable發射的每一項數據執行變換操作,這個函數返回一個本身也發射數據的Observable,然后FlatMap合並這些Observables發射的數據,最后將合並后的結果當做它自己的數據序列發射。
注意:flatMap 對這些Observables發射的數據做的是合並(merge)操作,因此它們可能是交錯的。
flatMap 有很多個重載方法。
flatMap(Function mapper)
flatMap(Function mapper, boolean delayErrors)
flatMap(Function mapper, boolean delayErrors, int maxConcurrency)
flatMap(Function mapper, boolean delayErrors, int maxConcurrency, int bufferSize)
flatMap(Function onNextMapper, Function onErrorMapper, Callable onCompleteSupplier)
flatMap(Function onNextMapper, Function onErrorMapper, Callable onCompleteSupplier, int maxConcurrency)
flatMap(Function mapper, int maxConcurrency)
flatMap(Function mapper, BiFunction resultSelector)
flatMap(Function mapper, BiFunction combiner, boolean delayErrors)
flatMap(Function mapper, BiFunction combiner, boolean delayErrors, int maxConcurrency)
flatMap(Function mapper, BiFunction combiner, boolean delayErrors, int maxConcurrency, int bufferSize)
flatMap(Function mapper, BiFunction combiner, int maxConcurrency)
使用 flatMap 化解循環嵌套
Observable.just(new Person(Arrays.asList("籃球", "足球", "排球")), new Person(Arrays.asList("畫畫", "跳舞")))
.map(person -> person.loves)
.flatMap(Observable::fromIterable) //fromIterable:逐個發送集合中的元素
.subscribe(this::log);
籃球,22:56:43 009,true
足球,22:56:43 010,true
排球,22:56:43 010,true
畫畫,22:56:43 011,true
跳舞,22:56:43 012,true
注意:如果任何一個通過這個 flatMap 操作產生的單獨的 Observable 調用 onError 異常終止了,這個 Observable 自身會立即調用 onError 並終止。例如:
Observable.just(new Person(Arrays.asList("籃球", null, "排球")), new Person(Arrays.asList("畫畫", "跳舞"))) ...
籃球,00:20:14 762,true
onError:The iterator returned a null value,00:20:14 767,true
concatMap
concatMap 操作符的功能和 flatMap 非常相似,只不過經過 flatMap 操作變換后,最后輸出的序列有可能是交錯的(flatMap最后合並結果采用的是 merge 操作符),而 concatMap 最終輸出的數據序列和原數據序列是一致的。
flatMap:
long start = System.currentTimeMillis();
Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5))
.flatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//flatMap是無序的
.subscribe((i -> log("f:" + i)), e -> log("f"), () -> log("f耗時" + (System.currentTimeMillis() - start))); //3秒
f:4,23:21:07 944,false //flatMap后,訂閱者首先接收到的事件是【4】而不是【1】
f:5,23:21:07 945,false
f:1,23:21:08 942,false
f:2,23:21:08 943,false
f:3,23:21:08 943,false
f耗時3025,23:21:08 945,false //flatMap耗時3秒
concatMap:
Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5))
.concatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//concatMap是有序的
.subscribe(i -> log("c:" + i), e -> log("c"), () -> log("c耗時" + (System.currentTimeMillis() - start))); //5秒
c:1,14:27:27 450,false //concatMap后,訂閱者首先接收到的事件是【1】
c:2,14:27:27 451,false
c:3,14:27:27 451,false
c:4,14:27:29 454,false
c:5,14:27:29 455,false
c耗時5021,14:27:29 455,false //concatMap耗時5秒
switchMap
switchMap 和 flatMap 很像,除了一點:當源Observable發射一個新的數據項時,如果舊數據項訂閱還未完成,就取消舊訂閱數據和停止監視那個數據項產生的Observable,開始監視新的數據項。
It behaves much like flatMap, except that whenever a new item is emitted by the source Observable, it will unsubscribe to and stop mirroring鏡像 the Observable that was generated from the previously-emitted item, and begin only mirroring the current one.
如果是在同一線程產生數據,因為當第二個數據項來臨時,第一個已經完成了,此時其和 concatMap 是完全一致的。
如果是並發產生數據項,當第二個數據項來臨時,如果前一個任務尚未執行結束,就會被后一個任務給取消。
Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5))
.switchMap(Observable::fromIterable)
.subscribeOn(Schedulers.newThread()) //與這里的線程無關
.subscribe(i -> log("s:" + i)); //1, 2, 3,4, 5
Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5))
.switchMap(list -> Observable.fromIterable(list).subscribeOn(Schedulers.newThread())) //只與這里的線程有關
.subscribeOn(AndroidSchedulers.mainThread()) //與這里的線程無關
.observeOn(AndroidSchedulers.mainThread()) //與這里的線程無關
.subscribe(i -> log("s:" + i)); //4, 5
Observable.range(1, 8)
.switchMap(i -> Observable.just(i).subscribeOn(Schedulers.newThread())) //只與這里的線程有關
.subscribe(i -> log("s:" + i)); //8
s:1,15:05:23 537,false
s:2,15:05:23 537,false
s:3,15:05:23 538,false
s:4,15:05:23 538,false
s:5,15:05:23 539,false
s:4,15:05:21 013,true
s:5,15:05:21 014,true
s:8,15:05:22 669,false
使用 flatMap 化解接口嵌套
可以利用 flatMap 操作符實現網絡請求依次依賴,即:第一個接口的返回值包含第二個接口請求需要用到的數據。
首先是兩個請求網絡的操作:
private Observable<String> firstRequest(String parameter) {
return Observable.create(emitter -> {
SystemClock.sleep(2000);//模擬網絡請求
emitter.onNext(parameter + ",第一次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
emitter.onComplete();
});
}
private Observable<String> secondRequest(String parameter) {
return Observable.create(emitter -> {
SystemClock.sleep(3000);//模擬網絡請求
emitter.onNext(parameter + ",第二次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
emitter.onComplete();
});
}
然后可以通過 flatMap 將兩者串聯起來:
firstRequest("原始值:" + FORMAT.format(new Date(System.currentTimeMillis())))
.subscribeOn(Schedulers.io()) // 在io線程進行網絡請求
.observeOn(AndroidSchedulers.mainThread()) // 在主線程處理請求結果
.doOnNext(response -> log("【第一個網絡請求結束,響應為】" + response))//true
.observeOn(Schedulers.io()) // 回到 io 線程去處理下一個網絡請求
.flatMap(this::secondRequest)//實現多個網絡請求依次依賴
.observeOn(AndroidSchedulers.mainThread()) // 在主線程處理請求結果
.subscribe(string -> log("【第二個網絡請求結束,響應為】" + string));//true,5 秒
打印結果為:
【第一個網絡請求結束,響應為】原始值:23:58:11 220,第一次修改:23:58:13 245,true
【第二個網絡請求結束,響應為】原始值:23:58:11 220,第一次修改:23:58:13 245,第二次修改:23:58:16 256,true
簡化形式的Demo代碼為:
Observable.just("包青天").delay(1000, TimeUnit.MILLISECONDS) //第一個網絡請求,返回姓名
.flatMap(s -> Observable.just(s + ",男").delay(1000, TimeUnit.MILLISECONDS)) //第二個網絡請求,返回性別
.flatMap(s -> Observable.just(s + ",28歲").delay(1000, TimeUnit.MILLISECONDS)) //第三個網絡請求,返回年齡
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::log); //包青天,男,28歲,耗時:3058毫秒,true
當然這種情況下使用 concatMap 的效果也是完全一樣的,然而因為 concatMap 的核心是用來保證在合並時"有序"的,而這兩種情況根本就沒涉及到合並,所以這些情況下使用 concatMap是沒有任何意義的。
flatMapIterable
flatMapIterable這個變體成對的打包數據,然后生成Iterable而不是原始數據和生成的Observables,但是處理方式是相同的。
flatMapIterable 返回一個Observable,它將源ObservableSource發出的每個項目與 the values in an Iterable corresponding to that item that is generated by a selector 合並[merges]。
public final <U> Observable<U> flatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper)
參數 mapper:一個在源 ObservableSource 發出指定項時,返回一個 Iterable 值序列的函數。
案例1:
Observable.just(Arrays.asList("籃球1", "足球1"))
.flatMap(Observable::fromIterable) //返回一個 Observable
.subscribe(string -> log("" + string));
Observable.just(Arrays.asList("籃球2", "足球2"))
.flatMapIterable(list -> list) //返回一個 Iterable 而不是另一個 Observable
.subscribe(string -> log("" + string));
Observable.fromIterable(Arrays.asList("籃球3", "足球3")) //和上面兩種方式的結果一樣
.subscribe(string -> log("" + string));
籃球1,01:00:39 493,true
足球1,01:00:39 494,true
籃球2,01:00:39 496,true
足球2,01:00:39 496,true
籃球3,01:00:39 499,true
足球3,01:00:39 499,true
案例2:
Observable.just(new Person(Arrays.asList("包青天", "哈哈")), new Person(Arrays.asList("白乾濤", "你好")))
.map(person -> person.loves)
.flatMap(Observable::fromIterable) //返回一個 Observable
.flatMap(string -> Observable.fromArray(string.toCharArray())) //返回一個 Observable
.subscribe(array -> log(Arrays.toString(array)));
Observable.just(new Person(Arrays.asList("廣州", "上海")), new Person(Arrays.asList("武漢", "長沙")))
.map(person -> person.loves)
.flatMap(Observable::fromIterable) //返回一個 Observable
.flatMapIterable(string -> Arrays.asList(string.toCharArray())) //返回一個 Iterable 而不是另一個 Observable
.subscribe(array -> log(Arrays.toString(array)));
Observable.just(new Person(Arrays.asList("你妹", "泥煤")), new Person(Arrays.asList("你美", "你沒")))
.map(person -> person.loves)
.flatMapIterable(list -> {
List<char[]> charList = new ArrayList<>();
for (String string : list) {
charList.add(string.toCharArray());
}
return charList; //返回一個 Iterable 而不是另一個 Observable
}
).subscribe(array -> log(Arrays.toString(array)));
[包, 青, 天],21:48:37 917,true
[哈, 哈],21:48:37 919,true
[白, 乾, 濤],21:48:37 921,true
[你, 好],21:48:37 921,true
[廣, 州],21:48:37 925,true
[上, 海],21:48:37 926,true
[武, 漢],21:48:37 926,true
[長, 沙],21:48:37 927,true
[你, 妹],21:48:37 929,true
[泥, 煤],21:48:37 929,true
[你, 美],21:48:37 930,true
[你, 沒],21:48:37 931,true
buffer
定期收集Observable的數據放進一個數據包裹,然后發射這些數據包裹,而不是一次發射一個值。
Buffer操作符將一個Observable變換為另一個,原來的Observable正常發射數據,變換產生的Observable發射這些數據的緩存集合。
注意:如果原來的Observable發射了一個onError通知,Buffer會立即傳遞這個通知,而不是首先發射緩存的數據,即使在這之前緩存中包含了原始Observable發射的數據[without first emitting the buffer it is in the process of assembling]。
Window操作符與Buffer類似,但是它在發射之前把收集到的數據放進單獨的Observable,而不是放進一個數據結構。
在RxJava中有許多Buffer的變體:
buffer(ObservableSource openingIndicator, Function closingIndicator)
buffer(ObservableSource openingIndicator, Function closingIndicator, Callable bufferSupplier)
buffer(count)
buffer(count)以列表(List)的形式發射非重疊的緩存,每一個緩存至多包含來自原始Observable的count項數據(最后發射的列表數據可能少於count項)。
每接收到 count 個數據包裹,將這 count 個包裹打包,發送給訂閱者
一次訂閱2個:
Observable.range(1, 5)
.buffer(2) //緩存區大小,步長==緩存區大小,等價於buffer(count, count)
.subscribe(list -> log(list.toString()), t -> log(""), () -> log("完成")); //[1, 2],[3, 4],[5],完成
一次全部訂閱(將所有元素組裝到集合中的效果):
Observable.range(1, 10)
.buffer(10)
.subscribe(list -> log(list.toString())); //[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
buffer(count, skip)
buffer(count, skip)從原始Observable的第一項數據開始創建新的緩存,此后每當收到skip項數據,用count項數據填充緩存:開頭的一項和后續的count-1項,它以列表(List)的形式發射緩存,取決於count和skip的值,這些緩存可能會有重疊部分(比如skip < count時),也可能會有間隙(比如skip > count時)。
生成的ObservableSource每隔 skip 項就會 emits buffers,每個 buffers 都包含 count 個 items。
隊列效果(先進先出):
Observable.range(1, 5)
.buffer(3, 1) // 緩存區大小,步長(每次獲取新事件的數量)
.subscribe(list -> log(list.toString()));//[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5]
每次剔除一個效果:
Observable.range(1, 5).buffer(5, 1)
.subscribe(list -> log(list.toString()));//[1, 2, 3, 4, 5],[2, 3, 4, 5],[3, 4, 5],[4, 5],[5]
只取奇數個效果:
Observable.range(1, 5)
.buffer(1, 2)
.subscribe(list -> log(list.toString()));//[1],[3],[5]
buffer(timespan, unit)
buffer(timespan, unit)定期以List的形式發射新的數據,每個時間段,收集來自原始Observable的數據(從前面一個數據包裹之后,或者如果是第一個數據包裹,從有觀察者訂閱原來的Observale之后開始)。
持續收集直到指定的每隔時間后,然后發射一次並清空緩存區。
周期性訂閱多個結果:
Observable.intervalRange(0, 8, 100, 100, TimeUnit.MILLISECONDS) //從0開始發射8個
.buffer(250, TimeUnit.MICROSECONDS) //等價於 count = Integer.MAX_VALUE
.subscribe(list -> log("緩存區中事件:" + list.toString())); //[0, 1],[2, 3],[4, 5, 6],[7]
buffer(timespan, unit, count)
每當收到來自原始Observable的count項數據,或者每過了一段指定的時間后,buffer(timespan, unit, count)就以List的形式發射這期間的數據,即使數據項少於count項。
當達到指定時間【或】緩沖區中達到指定數量時發射
Observable.intervalRange(0, 8, 100, 100, TimeUnit.MILLISECONDS) //從0開始發射8個
.buffer(250, TimeUnit.MICROSECONDS, 2) //可以指定工作所在的線程
.subscribe(list -> log("緩存區中事件:" + list.toString())); //[0, 1],[],[2, 3],[],[4, 5],[6],[7]
scan
連續地對數據序列的每一項應用一個函數,然后連續發射結果

Scan操作符對原始Observable發射的第一項數據應用一個函數,然后將那個函數的結果作為自己的第一項數據發射。它將函數的結果同第二項數據一起填充給這個函數來產生它自己的第二項數據。它持續進行這個過程來產生剩余的數據序列。這個操作符在某些情況下被叫做accumulator。
Observable.range(1, 10)
.scan((i1, i2) -> i1 + i2)
.subscribe(sum -> log("" + sum)); //1,3,6,10,15,21...
Observable.just("包青天", "你好", "我是泥煤")
.scan((s1, s2) -> s1 + "," + s2)
.subscribe(s -> log("值為:" + s)); //包青天,你好,我是泥煤
groupBy
GroupBy操作符將原始Observable分拆為一些Observables集合,它們中的每一個發射原始Observable數據序列的一個子序列。
哪個數據項由哪一個Observable發射是由一個函數判定的,這個函數給每一項指定一個Key,Key相同的數據會被同一個Observable發射。
注意:groupBy將原始Observable分解為一個發射多個GroupedObservable的Observable,一旦有訂閱,每個GroupedObservable就開始緩存數據。因此,如果你忽略這些GroupedObservable中的任何一個,這個緩存可能形成一個潛在的內存泄露。因此,如果你不想觀察,也不要忽略GroupedObservable。你應該使用像take(0)這樣會丟棄自己的緩存的操作符。
如果你取消訂閱一個GroupedObservable,那個Observable將會終止。如果之后原始的Observable又發射了一個與這個Observable的Key匹配的數據,groupBy將會為這個Key創建一個新的GroupedObservable。
有一個版本的groupBy允許你傳遞一個變換函數,這樣它可以在發射結果GroupedObservable之前改變數據項。
groupBy(Function keySelector)
groupBy(Function keySelector, boolean delayError)
groupBy(Function keySelector, Function valueSelector)
groupBy(Function keySelector, Function valueSelector, boolean delayError)
groupBy(Function keySelector, Function valueSelector, boolean delayError, int bufferSize)
案例:
Observable.range(1, 5)
.groupBy(i -> "包青天" + i % 2) //返回值決定組名
.subscribe(groupedObservable ->
groupedObservable.subscribe(i -> log("組名為:" + groupedObservable.getKey() + ",值為:" + i)));
組名為:包青天1,值為:1,16:34:47 561,true
組名為:包青天0,值為:2,16:34:47 562,true
組名為:包青天1,值為:3,16:34:47 564,true
組名為:包青天0,值為:4,16:34:47 565,true
組名為:包青天1,值為:5,16:34:47 566,true
window
定期將來自原始Observable的數據分解為一個Observable窗口,發射這些窗口,而不是每次發射一項數據
Window和Buffer類似,但不是發射來自原始Observable的數據包,它發射的是Observables,這些Observables中的每一個都發射原始Observable數據的一個子集,最后發射一個onCompleted通知。
和Buffer一樣,Window有很多變體,每一種都以自己的方式將原始Observable分解為多個作為結果的Observable,每一個都包含一個映射原始數據的window。用Window操作符的術語描述就是,當一個窗口打開(when a window "opens")意味着一個新的Observable已經發射了,而且這個Observable開始發射來自原始Observable的數據;當一個窗口關閉(when a window "closes")意味着發射的Observable停止發射原始Observable的數據,並且發射終止通知onCompleted給它的觀察者們。
如果從原始Observable收到了onError或onCompleted通知它也會關閉當前窗口。
和Buffer一樣,Window有很多變體:

window(ObservableSource openingIndicator, Function closingIndicator)
window(ObservableSource openingIndicator, Function closingIndicator, int bufferSize)
以下案例中均用到以下代碼:
private Consumer<Observable<Integer>> consumer = observable -> {
SystemClock.sleep(100);
String name = new SimpleDateFormat("SSS", Locale.getDefault()).format(new Date());
log("打開了一個新的窗口 " + name); //每當當前窗口發射了count項數據,它就關閉當前窗口並打開一個新窗口
observable.subscribe(i -> {
SystemClock.sleep(100);
log("窗口 " + name + " 發射了數據:" + i);
SystemClock.sleep(100);
}, e -> log("窗口 " + name + " 異常了"), () -> log("窗口 " + name + " 關閉了"));
};
window(count)
這個window的變體立即打開它的第一個窗口。每當當前窗口發射了count項數據,它就關閉當前窗口並打開一個新窗口。
這種window變體發射一系列不重疊的窗口,這些窗口的數據集合與原始Observable發射的數據是一一對應的。
Observable.range(1, 5)
.window(2)
.subscribe(consumer);
打開了一個新的窗口 418
窗口 418 發射了數據:1
窗口 418 發射了數據:2
窗口 418 關閉了
打開了一個新的窗口 926
窗口 926 發射了數據:3
窗口 926 發射了數據:4
窗口 926 關閉了
打開了一個新的窗口 435
窗口 435 發射了數據:5
窗口 435 關閉了
window(count, skip)
這個window的變體立即打開它的第一個窗口。原始Observable每發射skip項數據它就打開一個新窗口(例如,如果skip等於3,每到第三項數據,它會打開一個新窗口)。每當當前窗口發射了count項數據,它就關閉當前窗口並打開一個新窗口。
如果skip > count,在兩個窗口之間會有skip - count項數據被丟棄。
Observable.range(20, 5)
.window(1, 2)
.subscribe(consumer); //每發射2項就打開一個新窗口,每當當前窗口發射了1項就關閉當前窗口並打開一個新窗口
打開了一個新的窗口 714
窗口 714 發射了數據:1
窗口 714 關閉了
打開了一個新的窗口 020
窗口 020 發射了數據:3
窗口 020 關閉了
打開了一個新的窗口 326
窗口 326 發射了數據:5
窗口 326 關閉了
如果skip=count,它的行為與window(count)相同
Observable.range(20, 5)
.window(2, 2)
.subscribe(consumer); //每發射2項就打開一個新窗口,每當當前窗口發射了2項就關閉當前窗口並打開一個新窗口 `
打開了一個新的窗口 867
窗口 867 發射了數據:10
窗口 867 發射了數據:11
窗口 867 關閉了
打開了一個新的窗口 376
窗口 376 發射了數據:12
窗口 376 發射了數據:13
窗口 376 關閉了
打開了一個新的窗口 885
窗口 885 發射了數據:14
窗口 885 關閉了
如果skip < count,窗口可會有count - skip 個重疊的數據;
Observable.range(20, 5)
.window(2, 1)
.subscribe(consumer); //每發射1項就打開一個新窗口,每當當前窗口發射了2項就關閉當前窗口並打開一個新窗口 `
打開了一個新的窗口 005
窗口 005 發射了數據:20
打開了一個新的窗口 310
窗口 005 發射了數據:21
窗口 310 發射了數據:21
窗口 005 關閉了
打開了一個新的窗口 818
窗口 310 發射了數據:22
窗口 818 發射了數據:22
窗口 310 關閉了
打開了一個新的窗口 327
窗口 818 發射了數據:23
窗口 327 發射了數據:23
窗口 818 關閉了
打開了一個新的窗口 836
窗口 327 發射了數據:24
窗口 836 發射了數據:24
窗口 327 關閉了
窗口 836 關閉了
window(timespan, unit)
這個window的變體立即打開它的第一個窗口。每當過了timespan這么長的時間它就關閉當前窗口並打開一個新窗口。
這種window變體發射一系列不重疊的窗口,這些窗口的數據集合與原始Observable發射的數據也是一一對應的。
Observable.range(1, 6)
.window(500, TimeUnit.MILLISECONDS)
.subscribe(consumer); `
打開了一個新的窗口 195
窗口 195 發射了數據:1
窗口 195 發射了數據:2
窗口 195 發射了數據:3
窗口 195 關閉了
打開了一個新的窗口 908
窗口 908 發射了數據:4
窗口 908 發射了數據:5
窗口 908 關閉了
打開了一個新的窗口 416
窗口 416 發射了數據:6
窗口 416 關閉了
window(timespan, unit, count)
這個window的變體立即打開它的第一個窗口。這個變體是window(count)和window(timespan, unit)的結合,每當過了timespan的時長或者當前窗口收到了count項數據,它就關閉當前窗口並打開另一個。
這種window變體發射一系列不重疊的窗口,這些窗口的數據集合與原始Observable發射的數據也是一一對應的。
Observable.range(10, 4)
.window(1000, TimeUnit.MILLISECONDS, 2)
.subscribe(consumer);
打開了一個新的窗口 725
窗口 725 發射了數據:10
窗口 725 發射了數據:11
窗口 725 關閉了
打開了一個新的窗口 233
窗口 233 發射了數據:12
窗口 233 發射了數據:13
窗口 233 關閉了
打開了一個新的窗口 250
窗口 250 關閉了
案例2:
Observable.range(10, 3)
.window(200, TimeUnit.MILLISECONDS, 2)
.subscribe(consumer); `
打開了一個新的窗口 696
窗口 696 發射了數據:10
窗口 696 關閉了
打開了一個新的窗口 001
窗口 001 發射了數據:11
窗口 001 關閉了
打開了一個新的窗口 306
窗口 306 關閉了
打開了一個新的窗口 408
窗口 408 發射了數據:12
窗口 408 關閉了
打開了一個新的窗口 712
窗口 712 關閉了
打開了一個新的窗口 815
窗口 815 關閉了
2018-9-18
