Rxjava2 Observable的數據變換詳解及實例(一)


簡要:

需求了解:

對於 Observable 發射的數據有的時候可能不滿足我們的要求,或者需要轉化為其他類型的數據,比如:緩存,數據類型轉化,數據攔截等。此時可以使用 Rx 中的一些對於數據操作的操作進行數據的變換,方便我們的開發。

執行變換的操作方法:

  • Buffer:它定期從Observable收集數據到一個集合,然后把這些數據集合打包發射,而不是一次發射一個
  • Map:對序列的每一項都應用一個函數來變換Observable發射的數據序列
  • FlatMap,FlatMapIterable,ConcatMap:將Observable發射的數據集合變換為Observables集合,然后將這些Observable發射的數據平鋪化的放進一個單獨的 Observable
  • SwitchMap:將Observable發射的數據集合變換為Observables集合,然后只發射這些Observables最近發射的數據
  • Window:定期將來自Observable的數據分拆成一些Observable窗口,然后發射這些窗口,而不是每次發射一項
  • GroupBy:將Observable分拆為Observable集合,將原始Observable發射的數據按 Key
    分組,每一個Observable發射一組不同的數據
  • Scan:對Observable發射的每一項數據應用一個函數,然后按順序依次發射每一個值
  • Cast:在發射之前強制將Observable發射的所有數據轉換為指定類型

1. Buffer

定期收集Observable的數據放進一個數據包裹(緩存),然后發射這些數據包裹,而不是一次發射一個值。

img-buffer

Buffer 操作符將一個Observable變換為另一個,原來的Observable正常發射數據,變換產生 的Observable發射這些數據的緩存集合。 Buffer 操作符在很多語言特定的實現中有很多種變 體,它們在如何緩存這個問題上存在區別。

Window 操作符與 Buffer 類似,但是它在發射之前把收集到的數據放進單獨的Observable, 而不是放進一個數據結構。

注意: 如果原來的Observable發射了一個 onError 通知, Buffer 會立即傳遞這個通知,而不是首先發射緩存的數據,即使在這之前緩存中包含了原始Observable發射的數據。

在RxJava中的一些 Buffer 的操作如下:

1.1 buffer(count)

img-buffer(count)

以列表(List)的形式發射非重疊的緩存,每一個緩存至多包含來自原始 Observable 的 count 項數據(最后發射的列表數據可能少於count項)。

實例代碼:

	// 1. buffer(count) 	
	// 以列表(List)的形式發射非重疊的緩存,
	// 每一個緩存至多包含來自原始 Observable的count項數據(最后發射的列表數據可能少於count項)
	Observable.range(1, 10)
		.buffer(3)
		.subscribe(new Consumer<List<Integer>>() {

			@Override
			public void accept(List<Integer> t) throws Exception {
				System.out.println("--> bufferr(1) accept: " + t);
			}
		});

輸出:

--> bufferr(1) accept: [1, 2, 3]
--> bufferr(1) accept: [4, 5, 6]
--> bufferr(1) accept: [7, 8, 9]
--> bufferr(1) accept: [10]

Javadoc: buffer(count)

1.2 buffer(boundary)

開始創建一個List 收集原始 Observable 數據,監視一個名叫 boundary 的Observable,每當這個Observable發射了一個值,它就創建一個新的 List 開始收集來自原始Observable的數據並發射原來已經收集數據的 List, 當 boundary Observable 發送了完成通知,會將此時還未發送的 List 發送。

注意: 所有發送的 List 可能沒有收集到數據,此時數據的收集可能並不會完整收集所有原始 Observable 數據。

img-buffer(boundary)

實例代碼:

	// 2. buffer(boundary) 監視一個名叫boundary的Observable,
	// 開始創建一個List收集原始 Observable 數據,監視一個名叫boundary的Observable,
	// 每當這個Observable發射了一個值,它就創建一個新的List開始收集來自原始Observable的數據並發射原來已經收集數據的List,
	// 當boundary發送了完成通知,會將此時還未發送的 List 發送。 
	// 所有發送的 List 可能沒有收集到數據,此時數據的收集可能並不會完整收集所有原始Observable數據。
	Observable.range(1, 10000)
		.buffer(Observable.timer(1, TimeUnit.MILLISECONDS)) 		// 1毫秒后開始接受原始數據
		.subscribe(new Consumer<List<Integer>>() {

			@Override
			public void accept(List<Integer> t) throws Exception {
				System.out.println("--> accept(2): " + t.size());	// 每次收集的數據序列個數
			}
		});

輸出:

--> accept(2): 2858
--> accept(2): 5471

Javadoc: buffer(boundary)

1.3 buffer(count, skip)

從原始Observable的第一項數據開始創建新的緩存,此后每當收 到 skip 項數據,用 count 項數據填充緩存:開頭的一項和后續的 count-1 項,它以列表 (List)的形式發射緩存,取決於 count 和 skip 的值,這些緩存可能會有重疊部分(比如skip < count時),也可能會有間隙(比如skip > count時)。

img-buffer(count, skip)

解析: 在指定的數據序列中移動指針來獲取緩存數據:指針每次移動 skip 個數據長度,每次緩存指針位置及后面count個數據,指針初始位置在原始數據的第一個(存在的情況下)。

實例代碼:

	// 3. buffer(int count, int skip)
	// 在指定的數據中移動指針來獲取緩存數據:指針每次移動1個數據長度,每次緩存3個數據
	Observable.range(1, 5)
		.buffer(3, 1)
		.subscribe(new Consumer<List<Integer>>() {

			@Override
			public void accept(List<Integer> t) throws Exception {
				System.out.println("--> bufferr(3) accept: " + t);
			}
		});

輸出:

--> bufferr(3) accept: [1, 2, 3]
--> bufferr(3) accept: [2, 3, 4]
--> bufferr(3) accept: [3, 4, 5]
--> bufferr(3) accept: [4, 5]
--> bufferr(3) accept: [5]

Javadoc: buffer(count, skip)

1.4 buffer(timespan, TimeUnit)

定期以 List 的形式發射新的數據,在每個時間段,收集來自原始 Observable 的數據(從前面一個數據包裹之后,或者如果是第一個數據包裹,從有觀察者訂閱原來的 Observale 之后開始)。還有另一個版本的 buffer 接受一個 Scheduler 參數。

img-buffer(timespan,TimeUnit)
解析: 每隔 timespan 時間段以 List 的形式收集原始Observable的數據

實例代碼:

	// 4. buffer(long timespan, TimeUnit unit)
	// 每隔timespan時間段以list的形式收集數據
	Observable.range(1, 50000)
		.buffer(1, TimeUnit.MILLISECONDS)									// 每隔1毫秒收集一次原始序列數據
		.subscribe(new Consumer<List<Integer>>() {

			@Override
			public void accept(List<Integer> t) throws Exception {
				System.out.println("--> bufferr(4) accept: " + t.size());	// 每次收集的數據序列個數
			}
		});

輸出:

--> bufferr(4) accept: 2571
--> bufferr(4) accept: 5457
--> bufferr(4) accept: 13248
--> bufferr(4) accept: 12755
--> bufferr(4) accept: 9543
--> bufferr(4) accept: 6426

注意: buffer(timespan,TimeUnit) 默認情況下會使用 computation 調度器
Javadoc: buffer(timespan,TimeUnit)
Javadoc: buffer(timespan,TimeUnit,Scheduler)

1.5 buffer(timespan, TimeUnit, count)

每當收到來自原始 Observablecount 項數據,或者每過了一段指定 timespan 時間后, 就以 List 的形式發射這期間的數據,即使數據項少於 count 項。還有另一個版本的 buffer 接受一個 Scheduler 參數。

img-buffer(timespan, TimeUnit, count)

實例代碼:

	// 5. buffer(long timespan, TimeUnit unit, int count)
	// 每隔1毫秒緩存50個數據
	Observable.range(1, 1000)
		.buffer(1, TimeUnit.MILLISECONDS, 50)								// 每隔1毫秒收集50個數據序列
		.subscribe(new Consumer<List<Integer>>() {

			@Override
			public void accept(List<Integer> t) throws Exception {
				System.out.println("--> bufferr(5) accept: " + t.size());	// 每次收集的數據序列個數
			}
		});

輸出:

--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 20
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 4
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 26

注意: buffer(timespan, TimeUnit, count) 默認情況下會使用 computation 調度器
Javadoc: buffer(timespan, TimeUnit, count)
Javadoc: buffer(timespan, TimeUnit, scheduler, count)

1.6 buffer(timespan, timeskip, TimeUnit)

在每一個 timeskip 時期內都創建一個新的 List ,然后用原始 Observable 發射的每一項數據填充這個列表(在把這個 List 當做自己的數據發射前,從創建時開始,直到過了 timespan 這么長的時間)。如果 timespan 長於 timeskip ,它發射的數據包將會重疊,因此可能包含重復的數據項。

img-buffer(imespan, timeskip, TimeUnit)

解析: 在每隔 timeskip 時間段都創建一個新的 List ,每個 List 都獨立收集 timespan 時間段原始Observable發射的數據。 因此在 timespan 長於 timeskip 時,它發射的數據包將會重疊,因此不同 List 中可能包含重復的數據項。 還有另一個版本的 buffer 接受一個 Scheduler 參數。

實例代碼:

	// 6. buffer(long timespan, long timeskip, TimeUnit unit)
	// 在每一個timeskip時期內都創建一個新的 List,
	// 每個List都獨立收集timespan時間段原始Observable發射的數據,
	// 如果 timespan 長於 timeskip,它發射的數據包將會重疊,因此不同List中可能包含重復的數據項
	Observable.range(1, 50000)
			.buffer(1, 1, TimeUnit.MILLISECONDS, Schedulers.newThread())
			.subscribe(new Consumer<List<Integer>>() {

				@Override
				public void accept(List<Integer> t) throws Exception {
					System.out.println("--> accept(6): " + t.size());	// 每次收集的數據序列個數
				}
			});

輸出:

--> accept(6): 1412
--> accept(6): 733
--> accept(6): 10431
--> accept(6): 694
--> accept(6): 18944
--> accept(6): 10710
--> accept(6): 944
--> accept(6): 6132

注意:buffer(imespan, timeskip, TimeUnit) 默認情況下會使用 computation 調度器。
Javadoc: buffer(imespan, timeskip, TimeUnit)
Javadoc: buffer(imespan, timeskip, TimeUnit, schedule)

1.7 buffer(bufferClosingSelector)

當它訂閱原來的Observable時,開始將數據收集到一個List,然后它調用 bufferClosingSelector 生成第二個Observable,當第二個Observable 發射一個TClosing 時,buffer 發射當前的 List ,然后重復這個過程:開始組裝一個新的List,然后調用bufferClosingSelector創建一個新的Observable並監視它。

注意: 它會一直這樣做直到原來的Observable執行完成,可以收集完整的原始 Observable 的數據

img-buffer(bufferClosingSelector)

實例代碼:

	// 7. buffer(Callable<ObservableSource<T>> boundarySupplier)
	// 當它訂閱原來的Observable時,開始將數據收集到一個List,然后它調用 bufferClosingSelector 生成第二個Observable,
	// 當第二個Observable 發射一個 TClosing 時,buffer 發射當前的 List ,
	// 然后重復這個過程:開始組裝一個新的List,然后調用bufferClosingSelector創建一個新的Observable並監視它。
	// 它會一直這樣做直到原來的Observable執行完成。會收集完整的原始 Observable 的數據
	Observable.range(1, 50000)
		.buffer(new Callable<Observable<Long>>() {

			@Override
			public Observable<Long> call() throws Exception {
				return Observable.timer(1, TimeUnit.MILLISECONDS);
			}
		}).subscribe(new Consumer<List<Integer>>() {

			@Override
			public void accept(List<Integer> t) throws Exception {
				System.out.println("--> accept(7): " + t.size());	// 每次收集的數據序列個數
			}
		});

輸出:

--> accept(7): 14650
--> accept(7): 9708
--> accept(7): 25642

Javadoc: buffer(bufferClosingSelector)

2. Map

對Observable發射的每一項數據應用一個函數,執行變換操作。

實例代碼:

	// map(Function<T,R))
	// 接受原始Observable的數據,發送處理后的數據
	Observable.range(1, 5)
		.map(new Function<Integer, Integer>() {

			@Override
			public Integer apply(Integer t) throws Exception {
				System.out.println("--> apply: " + t);
				return t*t;	// 計算原始數據的平方
			}
		}).subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept Map: " + t);
			}
		});

輸出:

--> apply: 1
--> accept Map: 1
--> apply: 2
--> accept Map: 4
--> apply: 3
--> accept Map: 9
--> apply: 4
--> accept Map: 16
--> apply: 5
--> accept Map: 25

Javadoc: map(mapper)

3. FlatMap

主要對原始數據進行轉換操作后發送至訂閱者。

RxJava2 中的一些 FlatMap 操作方法如下:

3.1 flatMap(mapper)

FlatMap 將一個發射數據的 Observable 變換為 多個 Observables,然后將它們發射的數據合並后放進一個單獨的 Observable。

img-flatMap(mapper)

FlatMap 操作符使用一個指定的函數對原始Observable發射的每一項數據執行變換操作,這個函數返回一個本身也發射數據的Observable,然后 FlatMap 合並這些Observables發射的數據,最后將合並后的結果當做它自己的數據序列發射。

這個方法是很有用的,例如,當你有一個這樣的Observable:它發射一個數據序列,這些數據本身包含Observable成員或者可以變換為Observable,因此你可以創建一個新的 Observable發射這些次級Observable發射的數據的完整集合。

注意: FlatMap 對這些Observables發射的數據做的是合並(merge)操作,因此它們可能是交錯的。
在許多語言特定的實現中,還有一個操作符不會讓變換后的Observables發射的數據交錯,它按照嚴格的順序發射這些數據,這個操作符通常被叫作ConcatMap或者類似的名字。

實例代碼:

	// 	1. flatMap(Function)
	// 	對原始Observable發射的每一項數據執行變換操作,這個函數返回一個本身也發射數據的Observable,
	// 	然后FlatMap合並這些Observables發射的數據,最后將合並后的結果當做它自己的數據序列發射
	Observable.range(1, 5)
		.flatMap(new Function<Integer, ObservableSource<? extends Integer>>() {

			@Override
			public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
				System.out.println("--> apply(1): " + t);							// 原始數據
				return Observable.range(1, t).subscribeOn(Schedulers.newThread());	// 處理后數據
			}
		}).subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept flatMap(1): " + t);					// 接受的所有數據
			}
		});

輸出:

--> apply(1): 1
--> apply(1): 2
--> apply(1): 3
--> apply(1): 4
--> accept flatMap(1): 1
--> accept flatMap(1): 2
--> apply(1): 5
--> accept flatMap(1): 1
--> accept flatMap(1): 1
--> accept flatMap(1): 2
--> accept flatMap(1): 3
--> accept flatMap(1): 4
--> accept flatMap(1): 1
--> accept flatMap(1): 2
--> accept flatMap(1): 3
--> accept flatMap(1): 4
--> accept flatMap(1): 5
--> accept flatMap(1): 1
--> accept flatMap(1): 2
--> accept flatMap(1): 3

Javadoc: flatMap(mapper)

3.2 flatMap(mapper, maxConcurrency)

maxConcurrency 這個參數設置 flatMap 從原來的Observable映射Observables的最大同時訂閱數。當達到這個限制時,它會等待其中一個終止然后再訂閱另一個。

img-flatMap(mapper, maxConcurrency)

實例代碼:

	// 2. flatMap(Function, maxConcurrency)
	// maxConcurrency 這個參數設置 flatMap 從原來的Observable映射Observables的最大同時訂閱數。
	// 當達到這個限制時,它會等待其中一個終止然后再訂閱另一個
	Observable.range(1, 5)
	.flatMap(new Function<Integer, ObservableSource<? extends Integer>>() {

		@Override
		public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
			System.out.println("--> apply(2): " + t);
			return Observable.range(1, t).subscribeOn(Schedulers.newThread());
		}
	// 指定最大訂閱數為1,此時等待上一個訂閱的Observable結束,在進行下一個Observable訂閱
	}, 1).subscribe(new Consumer<Integer>() {

		@Override
		public void accept(Integer t) throws Exception {
			System.out.println("--> accept flatMap(2): "+ t);
		}
	});

輸出:

--> apply(2): 1
--> apply(2): 2
--> apply(2): 3
--> apply(2): 4
--> apply(2): 5
--> accept flatMap(2): 1
--> accept flatMap(2): 1
--> accept flatMap(2): 2
--> accept flatMap(2): 1
--> accept flatMap(2): 2
--> accept flatMap(2): 3
--> accept flatMap(2): 1
--> accept flatMap(2): 2
--> accept flatMap(2): 3
--> accept flatMap(2): 4
--> accept flatMap(2): 1
--> accept flatMap(2): 2
--> accept flatMap(2): 3
--> accept flatMap(2): 4
--> accept flatMap(2): 5

Javadoc: flatMap(mapper, maxConcurrency)

3.3 flatMap(mapper, delayErrors)

delayError 這個參數指定是否延遲發生 Error 的Observable通知。還有一個可以指定最大訂閱數參數 maxConcurrency 的變體。

img-flatMap(mapper, delayErrors)
解析: 當值為 true 時延遲發生Error的這個訂閱的Observable通知,不中斷當前的訂閱操作,繼續下一個Observable的訂閱,在所有訂閱的Observable全部結束后發送 Error 這個Observable的通知,當值為 false 時則中斷所有訂閱的操作,並發送 Error 的通知。

實例代碼:

	// 3. flatMap(Function, delayErrors)
	// delayErrors 這個參數指定是否延遲發生Error的Observable通知
	// 當true 時延遲發生Error的這個訂閱的Observable通知,不中斷當前的訂閱操作,
	// 繼續下一個Observable的訂閱,在所有訂閱的Observable全部結束后發送Error這個Observable的通知
	Observable.range(1, 5)
		.flatMap(new Function<Integer, ObservableSource<? extends Integer>>() {

			@Override
			public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
				System.out.println("--> apply(3): " + t);
				
				return Observable.create(new ObservableOnSubscribe<Integer>() {

					@Override
					public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
						if( t == 3) {
							throw new NullPointerException("delayErrors test!");	// 測試 Error
						}
						for (int i = 1; i <= t; i++) {
							emitter.onNext(i);
						}
						emitter.onComplete();
					}
				});
			}
		// 設置延遲 Error 通知到最后
		}, true).subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept flatMap(3): "+ t);
			}
		},new Consumer<Throwable>() {

			@Override
			public void accept(Throwable t) throws Exception {
				System.out.println("--> acceot Error(3): " + t);
			}
		});

輸出:

--> apply(3): 1
--> accept flatMap(3): 1
--> apply(3): 2
--> accept flatMap(3): 1
--> accept flatMap(3): 2
--> apply(3): 3
--> apply(3): 4
--> accept flatMap(3): 1
--> accept flatMap(3): 2
--> accept flatMap(3): 3
--> accept flatMap(3): 4
--> apply(3): 5
--> accept flatMap(3): 1
--> accept flatMap(3): 2
--> accept flatMap(3): 3
--> accept flatMap(3): 4
--> accept flatMap(3): 5
--> acceot Error(3): java.lang.NullPointerException: delayErrors test!

Javadoc: flatMap(Function, delayErrors)
Javadoc: flatMap(Function, delayErrors, maxConcurrency)

3.4 flatMapIterable(mapper)

flatMapIterable 這個變體成對的打包數據,然后生成 Iterable 而不是原始數據和生成的 Observables,但是處理方式是相同的。

img-flatMapIterable(Func)

解析: 對數據進行處理轉換成 Iterable 來發射數據。

實例代碼:

	//	4. flatMapIterable(Function(T,R))
	// 	對數據進行處理轉換成Iterable來發射數據
	Observable.range(1, 5)
		.flatMapIterable(new Function<Integer, Iterable<? extends Integer>>() {

			@Override
			public Iterable<? extends Integer> apply(Integer t) throws Exception {
				System.out.println("--> apply: " + t);
				ArrayList<Integer> list = new ArrayList<Integer>();
				list.add(888);
				list.add(999);
				return list; // 將原始數據轉換為兩個數字發送
			}
		}).subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept flatMapIterable(4): " + t);
			}
		});

輸出:

--> apply: 1
--> accept flatMapIterable(4): 888
--> accept flatMapIterable(4): 999
--> apply: 2
--> accept flatMapIterable(4): 888
--> accept flatMapIterable(4): 999
--> apply: 3
--> accept flatMapIterable(4): 888
--> accept flatMapIterable(4): 999
--> apply: 4
--> accept flatMapIterable(4): 888
--> accept flatMapIterable(4): 999
--> apply: 5
--> accept flatMapIterable(4): 888
--> accept flatMapIterable(4): 999

Javadoc: flatMapIterable(mapper)

3.5 flatMapIterable(mapper, resultSelector)

參數 mapper 接收原始數據,resultSelector 同時接收原始數據和 mapper 處理的數據,進行二次數據轉換。

img-flatMapIterable(mapper, resultSelector)

實例代碼:

	//	5. flatMapIterable(Function(T,R),Function(T,T,R))
	// 	第一個func接受原始數據,轉換數據,第二個func同時接受原始和處理的數據,進行二次轉換處理
	Observable.range(1, 3)
			.flatMapIterable(new Function<Integer, Iterable<? extends Integer>>() {

				@Override
				public Iterable<? extends Integer> apply(Integer t) throws Exception {
					ArrayList<Integer> list = new ArrayList<Integer>();
					list.add(888);
					list.add(999);
					return list; // 將原始數據轉換為兩個數字發送
				}
			}, new BiFunction<Integer, Integer, Integer>() {

				@Override
				public Integer apply(Integer t1, Integer t2) throws Exception {
					System.out.println("--> apply(5): t1 = " + t1 + ", t2 = " + t2);
					return t1 + t2;	// 將原始數據和處理過的數據組合進行二次處理發送
				}
			}).subscribe(new Consumer<Integer>() {

				@Override
				public void accept(Integer t) throws Exception {
					System.out.println("--> accept flatMapIterable(5): " + t);
				}
			});

輸出:

--> apply(5): t1 = 1, t2 = 888
--> accept flatMapIterable(5): 889
--> apply(5): t1 = 1, t2 = 999
--> accept flatMapIterable(5): 1000
--> apply(5): t1 = 2, t2 = 888
--> accept flatMapIterable(5): 890
--> apply(5): t1 = 2, t2 = 999
--> accept flatMapIterable(5): 1001
--> apply(5): t1 = 3, t2 = 888
--> accept flatMapIterable(5): 891
--> apply(5): t1 = 3, t2 = 999
--> accept flatMapIterable(5): 1002

Javadoc: flatMapIterable(mapper, resultSelector)

4. ConcatMap

concatMap 操作符的功能和 flatMap 是非常相似的,只是有一點,concatMap 最終輸出的數據序列和原數據序列是一致,它是按順序鏈接Observables,而不是合並(flatMap用的是合並)。

通過 mapper 處理原數據后,轉換成 Observables ,按照順序進行連接 Observables 發送數據。

img-concatMap(mapper)
解析: concatMapflatMap的功能是一樣的, 將一個發射數據的Observable變換為多個Observables,然后將它們發射的數據放進一個單獨的Observable。只不過最后合並ObservablesflatMap采用的merge,而concatMap采用的是連接(concat)。區別:concatMap是有序的,flatMap是無序的,concatMap最終輸出的順序與原序列保持一致,而flatMap則不一定,有可能出現交錯。

實例代碼:

	// 1. concatMap(Function(T,R))
	// 按照順序依次處理原始數據和處理的數據
	Observable.range(1, 3)
		.concatMap(new Function<Integer, ObservableSource<? extends Integer>>() {

			@Override
			public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
				System.out.println("--> apply(1): " + t);
				return Observable.range(1, t).doOnSubscribe(new Consumer<Disposable>() {

					@Override
					public void accept(Disposable t) throws Exception {
						System.out.println("--> accept(1): Observable on Subscribe");	// 當前的Observable被訂閱
					}
				});
			}
		}).subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept concatMap(1): " + t);
			}
		});
	
	System.out.println("--------------------------------------------");
	// 2. concatMap(mapper, prefetch)
	// prefetch 參數是在處理后的Observables發射的數據流中預讀數據個數,不影響原數據的發射和接收順序
	Observable.range(1, 3)
		.concatMap(new Function<Integer, ObservableSource<? extends Integer>>() {
	
			@Override
			public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
				System.out.println("--> apply(2): " + t);
				return Observable.range(1, 3).doOnSubscribe(new Consumer<Disposable>() {
	
					@Override
					public void accept(Disposable t) throws Exception {
						System.out.println("--> accept(2): Observable on Subscribe");	// 當前的Observable被訂閱
					}
				});
			}
		}, 2).subscribe(new Consumer<Integer>() {
	
			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept concatMap(2): " + t);
			}
		});

輸出:

--> apply(1): 1
--> accept(1): Observable on Subscribe
--> accept concatMap(1): 1
--> apply(1): 2
--> accept(1): Observable on Subscribe
--> accept concatMap(1): 1
--> accept concatMap(1): 2
--> apply(1): 3
--> accept(1): Observable on Subscribe
--> accept concatMap(1): 1
--> accept concatMap(1): 2
--> accept concatMap(1): 3
--------------------------------------------
--> apply(2): 1
--> accept(2): Observable on Subscribe
--> accept concatMap(2): 1
--> accept concatMap(2): 2
--> accept concatMap(2): 3
--> apply(2): 2
--> accept(2): Observable on Subscribe
--> accept concatMap(2): 1
--> accept concatMap(2): 2
--> accept concatMap(2): 3
--> apply(2): 3
--> accept(2): Observable on Subscribe
--> accept concatMap(2): 1
--> accept concatMap(2): 2
--> accept concatMap(2): 3

Javadoc: concatMap(mapper)
Javadoc: concatMap(mapper, refetch)

5. SwitchMap

有選擇的訂閱 Observable,當原始 Observable 發射一個數據,通過 witchMap 返回一個 Observable,
當原始Observable發射一個新的數據時,它將取消訂閱並停止監視產生執之前的Observable,開始監視當前新的Observable。

img-SwitchMap

解析: 如果上一個任務尚未完成時,就開始下一個任務的話,上一個任務就會被取消掉。如果所有任務都是在同一個線程里執行的話,此時這個操作符與 ContactMap 一致,都是依次順序執行。只有在不同的線程里執行的時候,即線程方案為newThread的時候,才會出現這種情況,常用於網絡請求中。

實例代碼:

	// 1. witchMap(Function(T,R))
	// 同一個線程執行
	Observable.range(1, 3)
	.switchMap(new Function<Integer, ObservableSource<? extends Integer>>() {

		@Override
		public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
			System.out.println("--> apply(1): " + t);
			return Observable.range(1, 3);	// 每個任務指定在同一個線程執行
		}
	}).subscribe(new Consumer<Integer>() {

		@Override
		public void accept(Integer t) throws Exception {
			System.out.println("--> accept switchMap(1): " + t);
		}
	});
	
	System.out.println("---------------------------------------");
	// 2. witchMap(Function(T,R))
	// 不同線程執行
	Observable.range(1, 3)
		.switchMap(new Function<Integer, ObservableSource<? extends Integer>>() {

			@Override
			public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
				System.out.println("--> apply(2): " + t);
				return Observable.range(1, 3)
								 .subscribeOn(Schedulers.newThread());	// 每個任務指定在子線程執行
			}
		}).subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept switchMap(2): " + t);
			}
		});
	

	System.out.println("---------------------------------------");
	// 3. switchMap(mapper, bufferSize)
	// bufferSize 參數是從當前活動的Observable中預讀數據的大小
	Observable.range(1, 3)
	.switchMap(new Function<Integer, ObservableSource<? extends Integer>>() {

		@Override
		public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
			System.out.println("--> apply(3): " + t);
			return Observable.range(1, 5).subscribeOn(Schedulers.newThread());
		}
	}, 3).subscribe(new Consumer<Integer>() {	// 指定緩存大小為3

		@Override
		public void accept(Integer t) throws Exception {
			System.out.println("--> accept switchMap(3): " + t);
		}
	});

輸出:

--> apply(1): 1
--> accept switchMap(1): 1
--> accept switchMap(1): 2
--> accept switchMap(1): 3
--> apply(1): 2
--> accept switchMap(1): 1
--> accept switchMap(1): 2
--> accept switchMap(1): 3
--> apply(1): 3
--> accept switchMap(1): 1
--> accept switchMap(1): 2
--> accept switchMap(1): 3
---------------------------------------
--> apply(2): 1
--> apply(2): 2
--> apply(2): 3
--> accept switchMap(2): 1
--> accept switchMap(2): 2
--> accept switchMap(2): 3
---------------------------------------
--> apply(3): 1
--> apply(3): 2
--> apply(3): 3
--> accept switchMap(3): 1
--> accept switchMap(3): 2
--> accept switchMap(3): 3
--> accept switchMap(3): 4
--> accept switchMap(3): 5

Javadoc: switchMap(mapper)
Javadoc: switchMap(mapper, bufferSize)

接續:

后續的Rx相關數據變換部分請參考: Rxjava2 Observable的數據變換詳解及實例(二)

Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM