Rxjava2 Observable的數據過濾詳解及實例(一)


簡要:

需求了解:

對於數據的觀察以及處理過程中往往有需要過濾一些不需要的數據的需求,比如防抖(防止快速操作),獲取第一項、指定序列項或者最后一項的需要,獲取指定時間內的有效數據等。Rx中提供了豐富的數據過濾處理的操作方法。

可用於過濾和選擇Observable發射的數據序列的方法:

  • Debounce:過濾發射速率較快的數據項,防抖操作。
  • Throttle: 對數據序列進行限流操作,可以指定獲取周期內的指定數據項,也可以用於防抖。
  • Sample: 允許通過將序列划分為時間片段收集數據,並從每片中取出一個值來稀疏序列。
  • Distinct: 過濾掉重復數據。
  • Skip: 跳過指定的N項數據。
  • Filter: 通過函數指定過濾的數據。
  • First: 只發射第一項或者滿足某個條件的第一項數據。
  • Single: 與 first 類似,但是如果原始Observable在完成之前不是正好發射一次數據,它會拋出一個NoSuchElementException 的異常通知。
  • ElementAt: 獲取原始Observable發射的數據序列指定索引位置的數據項,然后當做自己的唯一數據發射。
  • ignoreElements: 不發射任何數據,只發射Observable的終止通知。
  • Last: 只發射最后一項(或者滿足某個條件的最后一項)數據。
  • Take: 只返回Observable發送數據項序列前面的N項數據,忽略剩余的數據。
  • TakeLast: 只發射Observable發送數據項序列的后N項數據,忽略其他數據。
  • ofType: 過濾一個Observable只返回指定類型的數據。

1. Debounce

僅在過了一段指定的時間還沒發射數據時才發射一個數據。Debounce 操作符會過濾掉發射速率過快的數據項。

提示: 操作默認在 computation 調度器上執行,但是你可以指定其它的調度器。

1.1 debounce(timeout, unit)

指定每個數據發射后在 timeout 時間內,原始數據序列中沒有下一個數據發射時,發射此項數據,否則丟棄這項數據。此操作與 throttleWithTimeout 方法相同。

注意: 這個操作符會在原始數據的 onCompleted 時候直接發射發射數據,不會因為限流而丟棄數據。

img-debounce(timeout, unit)

實例代碼:

	 // 1. debounce(long timeout, TimeUnit unit)
	 // 發送一個數據,如果在包含timeout時間內,沒有第二個數據發射,那么就會發射此數據,否則丟棄此數據
	Observable.create(new ObservableOnSubscribe<Integer>() {

		@Override
		public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
			emitter.onNext(1);	// 下一個數據到此數據發射,	30 < timeout	--> skip			
			Thread.sleep(30);
			emitter.onNext(2); 	// 下一個數據到此數據發射,	100 > timeout	--> deliver
			Thread.sleep(100);
			emitter.onNext(3); 	// 下一個數據到此數據發射,	50 = timeout	--> skip: 			
			Thread.sleep(50);
			emitter.onNext(4); 	// 下一個數據到此數據發射,	onCompleted		--> deliver
			emitter.onComplete();

		}
	}).debounce(50, TimeUnit.MILLISECONDS)	// 指定防抖丟棄時間段為50毫秒
	//  .debounce(50, TimeUnit.MILLISECONDS, Schedulers.trampoline())	// 指定調度為當前線程排隊
		.subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept debounce(1-1): " + t);
			}
		});

輸出:

--> accept debounce(1-1): 2
--> accept debounce(1-1): 4

Javadoc: debounce(timeout, unit)
Javadoc: debounce(timeout, unit, scheduler)

1.2 debounce(debounceSelector)

原始數據發射每一個序列都通過綁定監聽debounceSelector的數據通知,在debounceSelector數據發送前,如果有下一個數據,則丟棄當前項數據,繼續監視下一個數據。

注意: 這個操作符會在原始數據的 onCompleted 時候直接發射發射數據,不會因為限流而丟棄數據。

img-debounce(debounceSelector)

實例代碼:

	// 2. debounce(debounceSelector)
	// 原始數據發射每一個序列的通過監聽debounceSelector的數據通知,
	// 在debounceSelector數據發送前,如果有下一個數據,則丟棄當前項數據,繼續監視下一個數據
	Observable.create(new ObservableOnSubscribe<Integer>() {

		@Override
		public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
			emitter.onNext(1);		// skip		--> debounceSelector is no emitter(<2s)
			Thread.sleep(1000);	
			emitter.onNext(2);		// skip		--> debounceSelector is no emitter(<2s)
			Thread.sleep(200);
			emitter.onNext(3);		// deliver	--> debounceSelector is emitter(>2s)
			Thread.sleep(2500);
			emitter.onNext(4);		// skip		--> debounceSelector is no emitter(=2s)
			Thread.sleep(2000);
			emitter.onNext(5);		// deliver	--> onComplete
			Thread.sleep(500);
			emitter.onComplete();
		}
	}).debounce(new Function<Integer, ObservableSource<Long>>() {

			@Override
			public ObservableSource<Long> apply(Integer t) throws Exception {
				System.out.println("--> apply(1-2): " + t);
				// 設置過濾延遲時間為2秒,此時返回的Observable從訂閱到發送數據時間段即為timeout
				return Observable.timer(2, TimeUnit.SECONDS)
						.doOnSubscribe(new Consumer<Disposable>() {

							@Override
							public void accept(Disposable t) throws Exception {
								// 開始訂閱,監聽數據的發送來過濾數據
								System.out.println("--> debounceSelector(1-2) is onSubscribe!");
							}
						}).doOnDispose(new Action() {
		
							@Override
							public void run() throws Exception {
								// 發射數據后,丟棄當前的數據,解除當前綁定
								System.out.println("--> debounceSelector(1-2) is unSubscribe!");
							}
						});
			}
		}).subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("----------> accept(1-2): " + t);
			}
		});

輸出:

--> apply(1-2): 1
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
--> apply(1-2): 2
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
--> apply(1-2): 3
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
----------> accept(1-2): 3
--> apply(1-2): 4
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
--> apply(1-2): 5
--> debounceSelector(1-2) is onSubscribe!
----------> accept(1-2): 5
--> debounceSelector(1-2) is unSubscribe!

Javadoc: debounce(debounceSelector)

2. Throttle

主要應用於數據序列的節流操作,在指定的采樣周期內獲取指定的數據。Throttling 也用於稀疏序列。當生產者發出的值超出我們想要的值時,我們不需要每個序列值,我們可以通過限制它來稀釋序列。

注意: 時間的划分不一定是統一的。例如,發射數據的時間間隔與划分數據的時間間隔一致時,在原始數據發送的一個時間點(此時數據還沒有實際發送),此時可能由於划分時間已到,划分的數據片直接關閉了,所以有的時間片數據會有時間間隙差異。

提示: 操作默認在 computation 調度器上執行,但是你可以指定其它的調度器。

2.1 throttleFirst(windowDuration, unit)

獲取每個 windowDuration 時間段內的原始數據序列中的第一項數據,直到原始數據全部發送完畢。

img-throttleFirst(windowDuration, unit)

解析: 實際在每個采樣周期內,先發送第一項接收到的數據,然后丟棄后續周期內的數據項。

實例代碼:

	// 1. throttleFirst(long windowDuration, TimeUnit unit)
	// 指定每個指定時間內取第一項數據, 直到原始數據序列全部發送結束
	Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)
		.doOnNext(new Consumer<Long>() {

			@Override
			public void accept(Long t) throws Exception {
				System.out.println("--> DataSource doOnNext : " + t);
			}
		}).throttleFirst(2, TimeUnit.SECONDS)							// 獲取每隔2秒之內收集的第一項數據
	 //   .throttleFirst(2, TimeUnit.SECONDS, Schedulers.newThread())	// 指定調度線程為newThread()
		  .subscribe(new Observer<Long>() {

				@Override
				public void onSubscribe(Disposable d) {
					System.out.println("--> throttleFirst onSubscribe");
				}

				@Override
				public void onNext(Long t) {
					System.out.println("-------------> throttleFirst onNext: " + t);
				}

				@Override
				public void onError(Throwable e) {
					System.out.println("--> throttleFirst onError: " + e);
				}

				@Override
				public void onComplete() {
					System.out.println("--> throttleFirst onComplete");
				}
			});

輸出:

--> throttleFirst onSubscribe
--> DataSource doOnNext : 1
-------------> throttleFirst onNext: 1
--> DataSource doOnNext : 2
--> DataSource doOnNext : 3
--> DataSource doOnNext : 4
-------------> throttleFirst onNext: 4
--> DataSource doOnNext : 5
--> DataSource doOnNext : 6
--> DataSource doOnNext : 7
-------------> throttleFirst onNext: 7
--> DataSource doOnNext : 8
--> DataSource doOnNext : 9
-------------> throttleFirst onNext: 9
--> DataSource doOnNext : 10
--> throttleFirst onComplete

Javadoc: throttleFirst(windowDuration, unit)
Javadoc: throttleFirst(windowDuration, unit, scheduler)

2.2 throttleLast(intervalDuration, unit)

獲取每個 windowDuration 時間段內的原始數據序列中的最近的一項數據,直到原始數據全部發送完畢。throttleLast 運算符以固定間隔而不是相對於最后一項來划分時間。它會在每個窗口中發出最后一個值,而不是它后面的第一個值。

img-throttleLast(intervalDuration, unit)

解析: 實際在每個采樣周期內,先緩存收集的數據,等周期結束發送最后一項數據,丟棄最后數據項前面的數據。

實例代碼:

	// 2. throttleLast(long intervalDuration, TimeUnit unit)
	// 指定間隔時間內取最后一項數據,直到原始數據序列全部發送結束
	Observable.intervalRange(1, 10, 0, 1050, TimeUnit.MILLISECONDS)
		.doOnNext(new Consumer<Long>() {

			@Override
			public void accept(Long t) throws Exception {
				System.out.println("--> DataSource doOnNext : " + t);
			}
		}).throttleLast(2, TimeUnit.SECONDS)							// 獲取每隔2秒之內收集的最后一項數據
     //   .throttleLast(2, TimeUnit.SECONDS, Schedulers.newThread())	// 指定調度線程為newThread()
		  .subscribe(new Observer<Long>() {

				@Override
				public void onSubscribe(Disposable d) {
					System.out.println("--> throttleLast onSubscribe");
				}

				@Override
				public void onNext(Long t) {
					System.out.println("-------------> throttleLast onNext: " + t);
				}

				@Override
				public void onError(Throwable e) {
					System.out.println("--> throttleLast onError: " + e);
				}

				@Override
				public void onComplete() {
					System.out.println("--> throttleLast onComplete");
				}
			});

輸出:

--> throttleLast onSubscribe
--> DataSource doOnNext : 1
--> DataSource doOnNext : 2
-------------> throttleLast onNext: 2
--> DataSource doOnNext : 3
--> DataSource doOnNext : 4
-------------> throttleLast onNext: 4
--> DataSource doOnNext : 5
--> DataSource doOnNext : 6
-------------> throttleLast onNext: 6
--> DataSource doOnNext : 7
--> DataSource doOnNext : 8
-------------> throttleLast onNext: 8
--> DataSource doOnNext : 9
--> DataSource doOnNext : 10
--> throttleLast onComplete

Javadoc: throttleLast(intervalDuration, unit)
Javadoc: throttleLast(intervalDuration, unit, scheduler)

2.3 throttleWithTimeout(timeout, unit)

指定每個數據發射后在 timeout 時間內,原始數據序列中沒有下一個數據發射時,發射此項數據,否則丟棄這項數據。此操作與 debounce 方法相同。

注意: 這個操作符會在原始數據的 onCompleted 時候直接發射發射數據,不會因為限流而丟棄數據。

img-throttleWithTimeout(timeout,  unit)

實例代碼:

	// 3. throttleWithTimeout(long timeout, TimeUnit unit)
	// 發送一個數據,如果在包含timeout時間內,沒有第二個數據發射,那么就會發射此數據,否則丟棄此數據
	Observable.create(new ObservableOnSubscribe<Integer>() {

		@Override
		public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
			emitter.onNext(1);	// 下一個數據到此數據發射,	--> skip: 		30 < timeout
			Thread.sleep(30);
			emitter.onNext(2);	// 下一個數據到此數據發射,	--> skip: 		50 = timeout
			Thread.sleep(50);
			emitter.onNext(3);	// 下一個數據到此數據發射,	--> deliver: 	60 > timeout
			Thread.sleep(60);
			emitter.onNext(4);	// onComplete			--> deliver:	onComplete
			emitter.onComplete();
		}
	}).throttleWithTimeout(50, TimeUnit.MILLISECONDS) // 指定防抖丟棄時間段為50毫秒
 //   .throttleWithTimeout(50, TimeUnit.MILLISECONDS, Schedulers.newThread()) // 指定調度線程為newThread()
	  .subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				// TODO Auto-generated method stub
				System.out.println("--> accept throttleWithTimeout(3): " + t);
			}
		});

輸出:

--> accept throttleWithTimeout(3): 3
--> accept throttleWithTimeout(3): 4

Javadoc: throttleWithTimeout(timeout, unit)
Javadoc: throttleWithTimeout(timeout, unit, scheduler)

3. Sample

sample 允許您通過將序列划分為時間片段,並從每片中取出一個值來稀疏序列。當每片結束時,將發出其中的最后一個值(如果有的話)。

注意: 時間的划分不一定是統一的。例如,發射數據的時間間隔與划分數據的時間間隔一致時,在原始數據發送的一個時間點(此時數據還沒有實際發送),此時可能由於划分時間已到,划分的數據片直接關閉了,所以有的時間片數據會有時間間隙差異。

3.1 sample(period, unit)

獲取每個 period 時間片段內手機收據序列的最后一項,忽略此時間片內收集的其他數據項。

實例代碼:

	// 1. sample(long period, TimeUnit unit)/sample(long period, TimeUnit unit)
	// 將序列分為 period 的時間片段,從每片重取出最近的一個數據
	// 等同於throttleLast
	Observable.intervalRange(1, 5, 0, 1100, TimeUnit.MILLISECONDS)
		.doOnNext(new Consumer<Long>() {

			@Override
			public void accept(Long t) throws Exception {
				System.out.println("--> DataSource onNext: " + t);
			}
		}).sample(2, TimeUnit.SECONDS) 								// 每3秒時間段數據中取最近一個值
	//    .sample(2, TimeUnit.SECONDS, true) 						// 參數emitLast,設置是否忽略未采樣的最后一個數據
	//	  .sample(2, TimeUnit.SECONDS, Schedulers.newThread())		// 指定調度器為newThread()
		  .subscribe(new Consumer<Long>() {
	
				@Override
				public void accept(Long t) throws Exception {
					System.out.println("--> accept(1): " + t);
				}
		  });

輸出:

--> DataSource onNext: 1
--> DataSource onNext: 2
--> accept(1): 2
--> DataSource onNext: 3
--> DataSource onNext: 4
--> accept(1): 4
--> DataSource onNext: 5

Javadoc: sample(long period, TimeUnit unit)
Javadoc: sample(long period, TimeUnit unit, emitLast)
Javadoc: sample(long period, TimeUnit unit, scheduler)
Javadoc: sample(long period, TimeUnit unit, scheduler, emitLast)

3.2 sample(sampler)

sample 的這個方法每當第二個 sampler 發射一個數據(或者當它終止)時就對原始 Observable 進行采樣。第二個Observable通過參數傳遞給 sample

img-sample(sampler)

實例代碼:

	// 2. sample(ObservableSource sampler)
	// 每當第二個 sampler 發射一個數據(或者當它終止)時就對原始 Observable進行采樣
	Observable.intervalRange(1, 5, 0, 1020, TimeUnit.MILLISECONDS)
		.doOnNext(new Consumer<Long>() {

				@Override
				public void accept(Long t) throws Exception {
					System.out.println("--> DataSource onNext: " + t);
				}
		}).sample(Observable.interval(2, TimeUnit.SECONDS))	// 每隔2秒進行一次采樣
		  .subscribe(new Consumer<Long>() {

				@Override
				public void accept(Long t) throws Exception {
					System.out.println("--> accept(2): " + t);
				}
		  });

輸出:

--> DataSource onNext: 1
--> DataSource onNext: 2
--> accept(2): 2
--> DataSource onNext: 3
--> DataSource onNext: 4
--> accept(2): 4
--> DataSource onNext: 5

Javadoc: sample(sampler)
Javadoc: sample(sampler, emitLast)

4. Distinct

抑制(過濾掉)重復的數據項。Distinct 的過濾規則是:只允許還沒有發射過的數據項通過。

在某些實現中,有一些方法中允許你調整判定兩個數據不同( distinct )的標准。還有一些實現只比較一項數據和它的直接前驅,因此只會從序列中過濾掉連續重復的數據。

4.1 distinct()

只允許還沒有發射過的數據項通過,過濾數據序列中的所有重復的數據項,保證處理后的數據序列沒有重復。

img-distinct

示例代碼:

	// 1. distinct()
	// 去除全部數據中重復的數據
	Observable.just(1, 2, 3, 3, 3, 4, 4, 5, 6, 6)
			.distinct()
			.subscribe(new Consumer<Integer>() {
			
				@Override
				public void accept(Integer t) throws Exception {
					System.out.println("--> accept distinct(1): " + t);
				}
			});

輸出:

--> accept distinct(1): 1
--> accept distinct(1): 2
--> accept distinct(1): 3
--> accept distinct(1): 4
--> accept distinct(1): 5
--> accept distinct(1): 6

Javadoc: distinct()

4.2 distinct(keySelector)

這個操作符接受一個函數。這個函數根據原始Observable發射的數據項產生一個 Key,然后,比較這些Key而不是數據本身,來判定兩個數據是否是不同的

img-distinct(keySelector)

實例代碼:

	// 數根據原始Observable發射的數據項產生一個 Key,然后比較這些Key而不是數據本身,來判定兩個數據是否是不同的(去除全部數據中重復的數據)
	Observable.just(1, 2, 3, 3, 4, 5, 6, 6)
			.distinct(new Function<Integer, String>() {

				@Override
				public String apply(Integer t) throws Exception {
					// 根據奇數或偶數來判斷數據序列的重復的key
					return t % 2 == 0 ? "even" : "odd";
				}
			}).subscribe(new Consumer<Integer>() {

				@Override
				public void accept(Integer t) throws Exception {
					System.out.println("--> accept distinct(2): " + t);
				}
			});

輸出:

--> accept distinct(2): 1
--> accept distinct(2): 2

Javadoc: distinct(keySelector)

4.3 distinctUntilChanged()

distinctUntilChanged 操作符,去除數據序列中的連續重復項。它只判定一個數據和它的直接前驅是否是不同的。

img-distinctUntilChanged

實例代碼:

	// 3. distinctUntilChanged()
	// 去除連續重復的數據
	Observable.just(1, 2, 3, 3, 4, 5, 6, 6, 3, 2)
		.distinctUntilChanged()
		.subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept distinctUntilChanged(3): " + t);
			}
		});

輸出:

--> accept distinctUntilChanged(3): 1
--> accept distinctUntilChanged(3): 2
--> accept distinctUntilChanged(3): 3
--> accept distinctUntilChanged(3): 4
--> accept distinctUntilChanged(3): 5
--> accept distinctUntilChanged(3): 6
--> accept distinctUntilChanged(3): 3
--> accept distinctUntilChanged(3): 2

Javadoc: distinctUntilChanged()

4.4 distinctUntilChanged(keySelector)

distinctUntilChanged(keySelector) 操作符,根據一個函數產生的 Key 判定兩個相鄰的數據項是不是相同的,去除連續重復的數據。

實例代碼:

	// 4. distinctUntilChanged(Function<T,K>)
	// 數根據原始Observable發射的數據項產生的 Key,去除連續重復的數據
	Observable.just(8, 2, 3, 5, 9, 5, 6, 6)
			.distinctUntilChanged(new Function<Integer, String>() {

				@Override
				public String apply(Integer t) throws Exception {
					// 根據原始數據處理后添加key,依據這個key來判斷是否重復(去除連續重復的數據)
					return t % 2 == 0 ? "even" : "odd";
				}
			}).subscribe(new Consumer<Integer>() {

				@Override
				public void accept(Integer t) throws Exception {
					System.out.println("--> accept distinctUntilChanged(4): " + t);
				}
			});

輸出:

--> accept distinctUntilChanged(4): 8
--> accept distinctUntilChanged(4): 3
--> accept distinctUntilChanged(4): 6

Javadoc: distinctUntilChanged(keySelector)

5. Skip

主要用於忽略Observable發射的指定的 N 項數據,如跳過數據序列的前面或后面 N 項數據,指定時間段內的數據項。

Skip 操作符的還有一些變體的操作方法如下:

5.1 skip(count)

忽略 Observable 發射的前 N 項數據,只保留之后的數據。

img-skip(count)

實例代碼:

	// 1. skip(long count)
	// 跳過前count項數據,保留后面的數據
	Observable.range(1, 10)
		.skip(5) // 過濾數據序列前5項數據
		.subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept skip(1): " + t);
			}
		});

輸出:

--> accept skip(1): 6
--> accept skip(1): 7
--> accept skip(1): 8
--> accept skip(1): 9
--> accept skip(1): 10

Javadoc: skip(count)

5.2 skip(time, unit)

skip 的這個變體接受一個時長參數,它會丟棄原始Observable開始的那段時間段發射的數據,時長和時間單位通過參數指定。

img-skip(time, unit)

實例代碼:

	// 2. skip(long time, TimeUnit unit)
	// 跳過開始的time時間段內的數據,保留后面的數據
	Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
		.skip(2, TimeUnit.SECONDS)	// 跳過前2秒的數據
		.subscribe(new Consumer<Long>() {

			@Override
			public void accept(Long t) throws Exception {
				System.out.println("--> accept skip(2): " + t);
			}
		});

輸出:

--> accept skip(2): 4
--> accept skip(2): 5

Javadoc: skip(time, unit)
Javadoc: skip(time, unit, scheduler)

5.3 skipLast(count)

使用 SkipLast 操作符修改原始Observable,你可以忽略Observable發射的后 N 項數據,只保留前面的數據。

img-skipLast(count)

實例代碼:

	// 3. skipLast(int count)
	// 跳過數據后面的count個數據
	Observable.range(1, 10)
		.skipLast(5) // 跳過數據序列的后5項數據
		.subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept skipLast(3): " + t);
			}
		});

輸出:

--> accept skipLast(3): 1
--> accept skipLast(3): 2
--> accept skipLast(3): 3
--> accept skipLast(3): 4
--> accept skipLast(3): 5

Javadoc: skipLast(count)

5.4 skipLast(time, unit)

還有一個 skipLast 變體接受一個時間段參數,它會丟棄在原始 Observable 的生命周期內最后一段時間內發射的數據。時長和時間單位通過參數指定。

注意: 這個機制是這樣實現的:延遲原始 Observable 發射的任何數據項,直到自原始數據發射之后過了給定的時長之后,才開始發送數據。

img-skipLast(time, unit)

實例代碼:

	// 4. skipLast(long time, TimeUnit unit, [boolean delayError])
	// 丟棄在原始Observable的生命周 期內最后time時間內發射的數據
	// 可選參數delayError:延遲異常通知
	Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)
		.doOnNext(new Consumer<Long>() {

			@Override
			public void accept(Long t) throws Exception {
				System.out.println("--> DataSource: " + t);
			}
		}).skipLast(2, TimeUnit.SECONDS)
	//	  .skipLast(2, TimeUnit.SECONDS, Schedulers.trampoline()) // 通過scheduler指定工作線程
	//	  .skipLast(2, TimeUnit.SECONDS, true)					// 延遲Error的通知,多用於組合Observable的場景
		  .subscribe(new Consumer<Long>() {

			  @Override
			  public void accept(Long t) throws Exception {
			  	  System.out.println("--> accept skipLast(4): " + t);
			  }
		  });

輸出:

--> DataSource: 1
--> DataSource: 2
--> DataSource: 3
--> accept skipLast(4): 1
--> DataSource: 4
--> accept skipLast(4): 2
--> DataSource: 5
--> accept skipLast(4): 3
--> DataSource: 6
--> accept skipLast(4): 4
--> DataSource: 7
--> accept skipLast(4): 5
--> DataSource: 8
--> accept skipLast(4): 6
--> DataSource: 9
--> accept skipLast(4): 7
--> DataSource: 10
--> accept skipLast(4): 8

注意: skipLast 的這個操作默認在 computation 調度器上執行,但是你可以使用Scheduler參數指定其 它的調度器。
Javadoc: skipLast(time, unit)
Javadoc: skipLast(time, unit, delayError)
Javadoc: skipLast(time, unit, scheduler)
Javadoc: skipLast(time, unit, scheduler, delayError)
Javadoc: skipLast(time, unit, scheduler, delayError, bufferSize)

接續:

后續的Rx相關數據過濾部分請參考: Rxjava2 Observable的數據過濾詳解及實例(二)

Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM