簡要:
幾種主要的需求
- 直接創建一個Observable(創建操作)
- 組合多個Observable(組合操作)
- 對Observable發射的數據執行變換操作(變換操作)
- 從Observable發射的數據中取特定的值(過濾操作)
- 轉發Observable的部分值(條件/布爾/過濾操作)
- 對Observable發射的數據序列求值(算術/聚合操作)
創建Observable的各種方式
- create():使用一個函數從頭創建一個Observable
- defer():只有當訂閱者訂閱才創建Observable;為每個訂閱創建一個新的 Observable
- empty() :創建一個什么都不做直接通知完成的Observable
- never():創建一個不發射任何數據的Observable
- error():—創建一個什么都不做直接通知錯誤的Observable
- just():將一個或多個對象轉換成發射這個或這些對象的一個Observable
- from():將一個Iterable, 一個Future, 或者一個數組轉換成一個Observable
- repeat():創建一個重復發射指定數據或數據序列的Observable
- repeatWhen() :創建一個重復發射指定數據或數據序列的Observable,它依賴於另一 個Observable發射的數據
- repeatUntil():根據條件(函數BooleanSupplier)判斷是否需要繼續訂閱
- range():創建一個發射指定范圍的整數序列的Observable
- interval():創建一個按照給定的時間間隔發射整數序列的Observable
- timer():—創建一個在給定的延時之后發射單個數據的Observable
1. Create
使用 Create
操作符從頭開始創建一個Observable,給這個操作符傳遞一個接受觀察者作為參數的函數,編寫這個函數可以調用觀察者的 onNext
,onError
和 onCompleted
方法,當發生訂閱的時候會自動調用觀察者的 onSubscribe
方法。
通過 Subscribe 進行Observable 與 Observer 的訂閱,其中 subscribe 方法可以接收一個完整通知參數的 Observer 對象,也可以接收部分通知參數的 Consumer
(接收數據) 或者 Action
(僅接收通知) 對象。
實例代碼:
// 創建Observable(被觀察者)
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello");
emitter.onNext("World");
emitter.onComplete();
}
});
// 創建Observer(觀察者), 可以接受所有通知
Observer<String> observer = new Observer<String>() {
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe");
}
public void onNext(String t) {
System.out.println("--> onNext = " + t);
}
public void onError(Throwable e) {
System.out.println("--> onError");
}
public void onComplete() {
System.out.println("--> onComplete");
}
};
// 創建只接受 onNext(item) 通知的Consumer(觀察者)
Consumer<String> nextConsumer = new Consumer<String>() {
@Override
public void accept(String t) throws Exception {
System.out.println("--> accept nextConsumer: " + t);
}
};
// 創建只接受 onError(Throwable) 通知的Consumer(觀察者)
Consumer<Throwable> errorConsumer = new Consumer<Throwable>() {
@Override
public void accept(Throwable t) throws Exception {
System.out.println("-- accept errorConsumer: " + t);
}
};
// 創建只接受 onComplete() 通知的Action(觀察者)
Action completedAction = new Action() {
@Override
public void run() throws Exception {
System.out.println("--> run completedAction");
}
};
// 創建只接受 onSubscribe 通知的Consumer(觀察者)
Consumer<Disposable> onSubscribeComsumer = new Consumer<Disposable>() {
@Override
public void accept(Disposable t) throws Exception {
System.out.println("--> accept onSubscribeComsumer ");
}
};
// 1. 進行訂閱,subscribe(Observer)
observable.subscribe(observer);
System.out.println("---------------------------------------------");
// 2. 進行訂閱,subscribe(Consumer onNext)
observable.subscribe(nextConsumer);
System.out.println("---------------------------------------------");
// 3. 進行訂閱,subscribe(Consumer onNext, Consumer onError)
observable.subscribe(nextConsumer, errorConsumer);
System.out.println("---------------------------------------------");
// 4. 進行訂閱,subscribe(Consumer onNext, Consumer onError, Action onCompleted)
observable.subscribe(nextConsumer, errorConsumer, completedAction);
System.out.println("---------------------------------------------");
// 5. 進行訂閱,subscribe(Consumer onNext, Consumer onError, Action onCompleted, Consumer onSubscribe)
observable.subscribe(nextConsumer, errorConsumer, completedAction, onSubscribeComsumer);
輸出:
--> onSubscribe
--> onNext = Hello
--> onNext = World
--> onComplete
---------------------------------------------
--> accept nextConsumer: Hello
--> accept nextConsumer: World
---------------------------------------------
--> accept nextConsumer: Hello
--> accept nextConsumer: World
---------------------------------------------
--> accept nextConsumer: Hello
--> accept nextConsumer: World
--> run completedAction
---------------------------------------------
--> accept onSubscribeComsumer
--> accept nextConsumer: Hello
--> accept nextConsumer: World
--> run completedAction
注意:create 方法默認不在任何特定的調度器上執行。
onSubscribe(Disposable): 在發生訂閱時接收。
onNext(item): 在被觀察者發射數據接收。
onError(Throwable): 在被觀察者發射Error時接收。
onComplete(): 在被觀察者完成數據發送時接收。
Javadoc: create(OnSubscribe)
Javadoc: subscribe()
Javadoc: subscribe(observer)
Javadoc: subscribe(onNext)
Javadoc: subscribe(onNext, onError)
Javadoc: subscribe(onNext, onError, onComplete)
Javadoc: subscribe(onNext, onError, onComplete, onSubscribe)
2. Defer
直到有觀察者訂閱時才創建 Observable,並且為每個觀察者創建一個新的 Observable.
Defer
操作符會一直等待直到有觀察者訂閱它,然后它使用Observable工廠方法生成一個 Observable。它對每個觀察者都這樣做,因此盡管每個訂閱者都以為自己訂閱的是同一個 Observable,事實上每個訂閱者獲取的是它們自己的單獨的數據序列。
實例代碼:
// 創建一個Defer類型的Observable
Observable<Integer> deferObservable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
public ObservableSource<? extends Integer> call() throws Exception {
// 創建每個觀察者訂閱所返回的 Observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onNext(5);
emitter.onComplete();
}
});
return observable;
}
});
// 創建第一個觀察者並訂閱defer Observable
deferObservable.subscribe(new Consumer<Integer>() {
public void accept(Integer t) throws Exception {
System.out.println("No.1 --> accept = " + t);
}
});
// 創建第二個觀察者並訂閱defer Observable
deferObservable.subscribe(new Consumer<Integer>() {
public void accept(Integer t) throws Exception {
System.out.println("No.2 --> accept = " + t);
}
});
// 創建第三個觀察者並訂閱defer Observable
deferObservable.subscribe(new Consumer<Integer>() {
public void accept(Integer t) throws Exception {
System.out.println("No.3 --> accept = " + t);
}
});
輸出:
No.1 --> accept = 1
No.1 --> accept = 2
No.1 --> accept = 3
No.1 --> accept = 4
No.1 --> accept = 5
No.2 --> accept = 1
No.2 --> accept = 2
No.2 --> accept = 3
No.2 --> accept = 4
No.2 --> accept = 5
No.3 --> accept = 1
No.3 --> accept = 2
No.3 --> accept = 3
No.3 --> accept = 4
No.3 --> accept = 5
注意:defer 方法默認不在任何特定的調度器上執行。
Javadoc: defer(Func0)
3. Empty/Never/Error
Empty
:創建一個不發射任何數據但是正常終止的Observable
Never
:創建一個不發射數據也不終止的Observable
Error
:創建一個不發射數據以一個錯誤終止的Observable
這三個操作符生成的 Observable 行為非常特殊和受限,多用於一些特殊的場景(某些操作狀態異常后返回一個error、empty、never 的 Observable)。測試的時候很有用,有時候也用於結合其它的 Observables,或者作為其它需要 Observable 的操作符的參數。
實例代碼:
System.out.println("--> 1 -----------------------------------");
// 1. 創建一個不發射任何數據但是正常終止的Observable
Observable.empty()
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Object t) {
System.out.println("onNext: " + t);
}
@Override
public void onError(Throwable e) {
System.out.println("onError: " + e);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
System.out.println("--> 2 -----------------------------------");
// 2. 創建一個不輸出數據,並且不會終止的Observable
Observable.never()
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Object t) {
System.out.println("onNext: " + t);
}
@Override
public void onError(Throwable e) {
System.out.println("onError: " + e);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
System.out.println("--> 3 -----------------------------------");
// 3. 創建一個不發射數據以一個錯誤終止的Observable
Observable.error(new NullPointerException("error test"))
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Object t) {
System.out.println("onNext: " + t);
}
@Override
public void onError(Throwable e) {
System.out.println("onError: " + e);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
輸出:
--> 1 -----------------------------------
onSubscribe
onComplete
--> 2 -----------------------------------
onSubscribe
--> 3 -----------------------------------
onSubscribe
onError: java.lang.NullPointerException: error test
注意:
- RxJava將這些操作符實現為 empty,never和 error。
- error 操作符需要一 個 Throwable參數,你的Observable會以此終止。
- 這些操作符默認不在任何特定的調度器上執行,但是 empty 和 error 有一個可選參數是Scheduler,如果你傳遞了Scheduler參數,它 們會在這個調度器上發送通知.
Javadoc: empty()
Javadoc: never()
Javadoc: error(java.lang.Throwable)
4. Just
創建一個發射指定值的Observable。
Just
將單個數據轉換為發射那個數據的Observable。類似於From
,但是From會將數組或Iterable的數據取出然后逐個發射,而Just只是簡單的原樣發射,將數組或Iterable當做單個數據。
注意: 如果你傳遞 null
給 Just
,它會返回一個發射 null 值的 Observable。不要誤認為它會返回一個空Observable(完全不發射任何數據的Observable),如果需要空Observable你應該使用Empty
操作符。
實例代碼:
// 單個對象發送
Observable.just(1)
.subscribe(new Consumer<Integer>() {
public void accept(Integer t) throws Exception {
System.out.println("--> singe accept: " + t);
}
});
System.out.println("---------------------------------");
// 多個對象發送,內部實際使用from實現 (接受一至九個參數,返回一個按參數列表順序發射這些數據的Observable)
Observable.just(1, 2, 3, 4, 5)
.subscribe(new Consumer<Integer>() {
public void accept(Integer t) throws Exception {
System.out.println("--> mutil accept: " + t);
}
});
輸出:
--> singe accept: 1
---------------------------------
--> mutil accept: 1
--> mutil accept: 2
--> mutil accept: 3
--> mutil accept: 4
--> mutil accept: 5
Javadoc: just(item ...)
5. From
將其它種類的對象和數據類型轉換為Observable,發射來自對應數據源數據類型的數據,在RxJava中,from
操作符可以轉換 Future
、Iterable
和數組
。對於Iterable和數組,產生的Observable會發射Iterable或數組的每一項數據。
實例代碼:
// 初始化數據
Integer[] array = { 1, 2, 3, 4, 5, 6 };
List<String> iterable = new ArrayList<String>();
iterable.add("A");
iterable.add("B");
iterable.add("C");
iterable.add("D");
iterable.add("E");
// 1. fromArray
Observable.fromArray(array).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept(1):fromArray: " + t);
}
});
System.out.println("---------------------------------------");
// 2. fromIterable
Observable.fromIterable(iterable)
.subscribe(new Consumer<String>() {
@Override
public void accept(String t) throws Exception {
System.out.println("--> accept(2) fromIterable: " + t);
}
});
System.out.println("---------------------------------------");
// 3. fromCallable
Observable.fromCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 1;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept(3): fromCallable: " + t);
}
});
System.out.println("---------------------------------------");
// 4. fromFuture
Observable.fromFuture(new Future<String>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public String get() throws InterruptedException, ExecutionException {
System.out.println("--> fromFutrue: get()");
return "hello";
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return false;
}
@Override
public String get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String t) throws Exception {
System.out.println("--> accept(4): fromFuture: " + t);
}
});
輸出:
--> accept(1):fromArray: 1
--> accept(1):fromArray: 2
--> accept(1):fromArray: 3
--> accept(1):fromArray: 4
--> accept(1):fromArray: 5
--> accept(1):fromArray: 6
---------------------------------------
--> accept(2) fromIterable: A
--> accept(2) fromIterable: B
--> accept(2) fromIterable: C
--> accept(2) fromIterable: D
--> accept(2) fromIterable: E
---------------------------------------
--> accept(3): fromCallable: 1
---------------------------------------
--> fromFutrue: get()
--> accept(4): fromFuture: hello
注意:from默認不在任何特定的調度器上執行。然而你可以將Scheduler作為可選的第二個參數傳遞給Observable,它會在那個調度器上管理這個Future。
Javadoc: from(array)
Javadoc: from(Iterable)
Javadoc: from(Callable)
Javadoc: from(Future)
Javadoc: from(Future,Scheduler)
Javadoc: from(Future,timeout,timeUnit)
6. Repeat
創建一個發射特定數據重復多次的Observable,它不是創建一個Observable,而是重復發射原始 Observable的數據序列,這個序列或者是無限的,或者通過 repeat(n)
指定重復次數。
實例代碼:
// 1. repeat(): 一直重復發射原始 Observable的數據序列
Observable.range(1, 5)
.repeat()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept(1): " + t);
}
});
System.out.println("----------------------------------------");
// 2. repeat(n): 重復執行5次
Observable.range(1, 2)
.repeat(3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept(2): " + t);
}
});
輸出:
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
--> accept(1): 5
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
--> accept(1): 5
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
--> accept(1): 5
......
----------------------------------------
--> accept(2): 1
--> accept(2): 2
--> accept(2): 1
--> accept(2): 2
--> accept(2): 1
--> accept(2): 2
注意: repeat 操作符默認在 trampoline 調度器上執行。有一個變體可以通過可選參數指定 Scheduler。
Javadoc: repeat()
Javadoc: repeat(long)
Javadoc: repeat(Scheduler)
Javadoc: repeat(long,Scheduler)
7. RepeatWhen
repeatWhen
的操作符,它不是緩存和重放原始 Observable 的數據序列,接收到原始 Observable 終止通知后,有條件的決定是否重新訂閱原來的 Observable 。
將原始 Observable 的終止通知(完成或錯誤)當做一個 void 數據傳遞給一個通知處理器,它以此來決定是否要重新訂閱和發射原來的 Observable。這個通知處理器就像一個 Observable 操作符,接受一個發射 void通知的 Observable為輸入,返回一個發射 void 數據(意思是,重新訂閱和發射原始 Observable)或者直接終止(意思是,使用 repeatWhen 終止發射數據)的 Observable。
實例代碼:
// repeatWhen(Func1()):接收到終止通知后,在函數中決定是否重新訂閱原來的Observable
// 需要注意的是repeatWhen的objectObservable處理(也可以單獨自定義Observable返回),這里使用flathMap進行處理,
// 讓它延時發出onNext,這里onNext發出什么數據都不重要,它只是僅僅用來處理重訂閱的通知,如果發出的是onComplete/onError,則不會觸發重訂閱
Observable.range(1, 2)
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
System.out.println("-----------> 完成一次訂閱");
}
}).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
private int n = 0;
@Override
public ObservableSource<?> apply(Observable<Object> t) throws Exception {
// 接收到原始Observable的終止通知,決定是否重新訂閱
System.out.println("--> apply repeat ");
return t.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Object t) throws Exception {
if(n < 3) { // 重新訂閱3次
n ++;
return Observable.just(0);
} else {
return Observable.empty();
}
}
});
// return Observable.timer(1, TimeUnit.SECONDS); // 間隔一秒后重新訂閱一次
// return Observable.interval(1, TimeUnit.SECONDS); // 每間隔一秒重新訂閱一次
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept: " + t);
}
});
輸出:
--> apply repeat
--> accept: 1
--> accept: 2
-----------> 完成一次訂閱
--> accept: 1
--> accept: 2
-----------> 完成一次訂閱
--> accept: 1
--> accept: 2
-----------> 完成一次訂閱
--> accept: 1
--> accept: 2
-----------> 完成一次訂閱
注意:repeatWhen操作符默認在 trampoline 調度器上執行。
Javadoc: repeatWhen(Func1)
8. RepeatUntil
根據條件(函數BooleanSupplier
)判斷是否需要繼續訂閱: false:繼續訂閱; true:取消訂閱
實例代碼:
// repeatUntil 根據條件(BooleanSupplier)判斷是否需要繼續訂閱
Observable.range(1, 2)
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
System.out.println("-----------> 完成一次訂閱");
}
}).repeatUntil(new BooleanSupplier() {
private int n = 0;
@Override
public boolean getAsBoolean() throws Exception {
System.out.println("getAsBoolean = " + (n < 3? false:true) );
// 是否需要終止
if (n < 3) {
n++;
return false; // 繼續重新訂閱
}
return true; // 終止重新訂閱
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept: " + t);
}
});
輸出:
--> accept: 1
--> accept: 2
-----------> 完成一次訂閱
getAsBoolean = false
--> accept: 1
--> accept: 2
-----------> 完成一次訂閱
getAsBoolean = false
--> accept: 1
--> accept: 2
-----------> 完成一次訂閱
getAsBoolean = false
--> accept: 1
--> accept: 2
-----------> 完成一次訂閱
getAsBoolean = true
Javadoc: repeatWhen(Func1)
9. Range
創建一個發射特定整數序列的Observable。
Range
操作符發射一個范圍內的有序整數序列,你可以指定范圍的起始和長度。
RxJava將這個操作符實現為 range 函數,它接受兩個參數,一個是范圍的起始值,一個是范圍的數據的數目。如果你將第二個參數設為0,將導致Observable不發射任何數據(如果設置 為負數,會拋異常)。
實例代碼:
// 1. range(n,m) 發射從n開始的m個整數序列,序列區間[n,n+m-1)
Observable.range(0, 5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("-- accept(range): " + t);
}
});
System.out.println("------------------------------");
// 2. rangeLong(n,m) 發射從n開始的m個長整型序列,序列區間[n,n+m-1)
Observable.rangeLong(1, 5)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("-- accept(rangeLong): " + t);
}
});
輸出:
-- accept(range): 0
-- accept(range): 1
-- accept(range): 2
-- accept(range): 3
-- accept(range): 4
------------------------------
-- accept(rangeLong): 1
-- accept(rangeLong): 2
-- accept(rangeLong): 3
-- accept(rangeLong): 4
-- accept(rangeLong): 5
Javadoc: range(int start,int count)
Javadoc: rangeLong(long start, long count)
10. interval
創建一個按固定時間間隔發射整數序列的Observable,它按固定的時間間隔發射一個無限遞增的整數序列。
RxJava將這個操作符實現為 interval 方法。它接受一個表示時間間隔的參數和一個表示時間單位的參數。
實例代碼:
// [1] interval(long period, TimeUnit unit)
// 每間隔period時間單位,發射一次整數序列
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
public void accept(Long l) throws Exception {
System.out.println("--> accept(1): " + l);
}
});
System.out.println("------------------------------------");
// [2] interval(long initialDelay, long period, TimeUnit unit)
// 在延遲initialDelay秒后每隔period時間單位發射一個整數序列
Observable.interval(0, 1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
public void accept(Long t) throws Exception {
System.out.println("--> accept(2): " + t);
}
});
System.out.println("------------------------------------");
// [3] intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
// 延遲initialDelay秒后從起始數據start開始,每隔period秒發送一個數字序列,一共發送count個數據
Observable.intervalRange(1, 5, 3, 2, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
public void accept(Long t) throws Exception {
System.out.println("--> accept(3): " + t);
}
});
注意:interval 默認在 computation 調度器上執行, 有一個變體可以通過可選參數指定 Scheduler。
Javadoc: interval(long period, TimeUnit unit)
Javadoc: interval(long period, TimeUnit unit, Scheduler scheduler)
Javadoc: interval(long initialDelay, long period, TimeUnit unit)
Javadoc: interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
Javadoc: intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
Javadoc: intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
輸出:
--> accept(1): 0
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
--> accept(1): 5
...
------------------------------------
--> accept(2): 0
--> accept(2): 1
--> accept(2): 2
--> accept(2): 3
--> accept(2): 4
--> accept(2): 5
...
------------------------------------
--> accept(3): 1
--> accept(3): 2
--> accept(3): 3
--> accept(3): 4
--> accept(3): 5
11. Timer
創建一個給定的延遲后發射一個特殊的值的Observable。
RxJava將這個操作符實現為 timer 函數。timer 返回一個Observable,它在延遲一段給定的時間后發射一個簡單的數字0
實例代碼:
// timer(long delay, TimeUnit unit, Scheduler scheduler)
// 定時delay時間 單位后發送數字0,指定可選參數Schedule調度器為trampoline(當前線程排隊執行)
Observable.timer(5, TimeUnit.SECONDS, Schedulers.trampoline())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> accept: " + t);
}
});
輸出:
--> accept: 0
注意:timer 操作符默認在 computation 調度器上執行。有一個變體可以通過可選參數指定 Scheduler。
Javadoc: timer(long delay, TimeUnit unit)
Javadoc: timer(long delay, TimeUnit unit, Scheduler scheduler)
小結
根據實際情況,使用不同的方式創建不同種類的Observable,這個在開發中非常有用,可以減少很多重復、復雜、冗余的操作,可以快速的創建一個符合要求的Observable,一定程度上提高了開發的效率。
提示:以上使用的Rxjava2版本: 2.2.12
Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例
實例代碼: