1. 定義
RxJava
在GitHub
的介紹:
RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM // 翻譯:RxJava 是一個在 Java VM 上使用可觀測的序列來組成異步的、基於事件的程序的庫
- 總結:
RxJava
是一個 基於事件流、實現異步操作的庫。
2. 原理
-
RxJava:
基於 一種擴展的觀察者模式 RxJava
的擴展觀察者模式中有4個角色:
角色 | 作用 |
---|---|
被觀察者(Observable) | 產生事件 |
觀察者(Observer) | 接收事件,並給出響應動作 |
訂閱(Subscribe) | 連接 被觀察者 & 觀察者, 相當於注冊監聽 |
事件(Event) | 被觀察者 & 觀察者 溝通的載體 |
- 這里用兩根水管代替觀察者和被觀察者, 用通俗易懂的話把它們的關系解釋清楚, 在這里我將從事件流這個角度來說明RxJava的基本工作原理。
先假設有兩根水管:上面一根水管為事件產生的水管,叫它上游
吧,下面一根水管為事件接收的水管叫它下游
吧。
兩根水管通過一定的方式連接起來,使得上游每產生一個事件,下游就能收到該事件。
這里的上游
和下游
就分別對應着RxJava中的Observable
和Observer
,它們之間的連接就對應着subscribe()
,因此這個關系用RxJava來表示就是:
ChapterOne: public static void case1() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
打印結果如下:
08-05 17:00:05.072 24086 24086 D ChapterOne: onSubscribe
08-05 17:00:05.073 24086 24086 D ChapterOne: onNext: value = 1
08-05 17:00:05.073 24086 24086 D ChapterOne: onNext: value = 2
08-05 17:00:05.073 24086 24086 D ChapterOne: onNext: value = 3
08-05 17:00:05.073 24086 24086 D ChapterOne: onComplete
注意: 只有當上游和下游建立連接之后, 上游才會開始發送事件. 也就是調用了subscribe()
方法之后才開始發送事件.
- 接下來解釋一下其中兩個陌生的玩意:
ObservableEmitter
和Disposable
.
onNext(T value)
、
onComplete()
和
onError(Throwable error)
就可以分別發出next事件、complete事件和error事件。
但是,請注意,並不意味着你可以隨意亂七八糟發射事件,需要滿足一定的規則:
(1) 上游可以發送無限個onNext, 下游也可以接收無限個onNext.
(2) 當上游發送了一個onComplete后, 上游onComplete之后的事件將會繼續
發送, 而下游收到onComplete事件之后將不再繼續
接收事件.
(3) 當上游發送了一個onError后, 上游onError之后的事件將繼續
發送, 而下游收到onError事件之后將不再繼續
接收事件.
(4) 上游可以不發送onComplete或onError.
(5) 最為關鍵的是onComplete和onError必須唯一並且互斥, 即不能發多個onComplete, 也不能發多個onError, 也不能先發一個onComplete, 然后再發一個onError, 反之亦然
注: 關於onComplete和onError唯一並且互斥這一點, 是需要自行在代碼中進行控制, 如果你的代碼邏輯中違背了這個規則, **並不一定會導致程序崩潰.
** 比如發送多個onComplete是可以正常運行的, 依然是收到第一個onComplete就不再接收了, 但若是發送多個onError, 則收到第二個onError事件會導致程序會崩潰.
只發送onNext事件 |
![]() |
發送onComplete事件 |
![]() |
發送onError事件 |
![]() |
-
介紹了ObservableEmitter, 接下來介紹Disposable, 這個單詞的字面意思是一次性用品,用完即可丟棄的. 那么在RxJava中怎么去理解它呢, 對應於上面的水管的例子, 我們可以把它理解成兩根管道之間的一個機關, 當調用它的
dispose()
方法時, 它就會將兩根管道切斷, 從而導致下游收不到事件. 相當於我們平時注冊監聽后,用完了,要反注冊監聽。
注意: 調用dispose()並不會導致上游不再繼續發送事件, 上游會繼續發送剩余的事件.
ChapterOne: public static void case2() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "emit 1"); emitter.onNext(1); Log.d(TAG, "emit 2"); emitter.onNext(2); Log.d(TAG, "emit 3"); emitter.onNext(3); Log.d(TAG, "emit onComplete"); emitter.onComplete(); Log.d(TAG, "emit 4"); emitter.onNext(4); } }).subscribe(new Observer<Integer>() { private Disposable mDisposable; private int mCount = 0; @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); mDisposable = d; } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: value = " + value); mCount++; if (mCount == 2) { Log.d(TAG, "dispose"); mDisposable.dispose(); Log.d(TAG, "isDisposed : " + mDisposable.isDisposed()); } } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
打印結果如下:
08-05 17:06:53.809 24399 24399 D ChapterOne: onSubscribe 08-05 17:06:53.809 24399 24399 D ChapterOne: emit 1 08-05 17:06:53.809 24399 24399 D ChapterOne: onNext: value = 1 08-05 17:06:53.809 24399 24399 D ChapterOne: emit 2 08-05 17:06:53.809 24399 24399 D ChapterOne: onNext: value = 2 08-05 17:06:53.809 24399 24399 D ChapterOne: dispose 08-05 17:06:53.809 24399 24399 D ChapterOne: isDisposed : true 08-05 17:06:53.809 24399 24399 D ChapterOne: emit 3 08-05 17:06:53.809 24399 24399 D ChapterOne: emit onComplete 08-05 17:06:53.809 24399 24399 D ChapterOne: emit 4
- 另外,
subscribe()
有多個重載的方法:
public final Disposable subscribe() {} public final Disposable subscribe(Consumer<? super T> onNext) {} public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {} public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {} public final void subscribe(Observer<? super T> observer) {}
3. 線程調度
- 正常情況下, 上游和下游是工作在同一個線程中的, 也就是說上游在哪個線程發事件, 下游就在哪個線程接收事件.
ChapterOne: public static void case3() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "subscribe: thread = " + Thread.currentThread()); emitter.onNext(1); emitter.onComplete(); } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "accept: thread = " + Thread.currentThread()); } }); }
打印結果如下:
08-05 19:27:09.926 29640 29640 D ChapterOne: subscribe: thread = Thread[main,5,main]
08-05 19:27:09.926 29640 29640 D ChapterOne: accept: thread = Thread[main,5,main]
- 對於一般的需求場景,需要在子線程中實現耗時的操作;然后回到主線程實現
UI
操作。怎么辦?
解決方案:采用 RxJava
內置的線程調度器( Scheduler
),即通過 功能性操作符subscribeOn()
& observeOn()
實現。
作用:線程控制,即指定 被觀察者 (Observable)
/ 觀察者(Observer)
的工作線程類型
線程類型:
類型 | 含義 | 應用場景 |
---|---|---|
Schedulers.immediate() | 當前線程 = 不指定線程 | 默認 |
AndroidSchedulers.mainThread() | Android主線程 | 操作UI |
Schedulers.newThread() | 常規新線程 | 耗時等操作 |
Schedulers.io() | io操作線程 | 網絡請求、讀寫文件等io密集型操作 |
Schedulers.computation() | CPU計算操作線程 | 大量計算操作 |
- 具體使用
public static void case4() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "subscribe: thread = " + Thread.currentThread()); emitter.onNext(1); emitter.onComplete(); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "accept: thread = " + Thread.currentThread()); } }); }
特殊注意:(1) 若Observable.subscribeOn()
多次指定被觀察者 生產事件的線程,則只有第一次指定有效,其余的指定線程無效。
(2) 若Observable.observeOn()多次指定觀察者 接收 & 響應事件的線程,則每次指定均有效,即每指定一次,就會進行一次線程的切換。
ChapterOne: public static void case5() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "subscribe: thread = " + Thread.currentThread()); emitter.onNext(1); emitter.onComplete(); } }) .subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.newThread()) .doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "doOnNext accept: thread = " + Thread.currentThread()); } }) .observeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "onNext: accept: thread = " + Thread.currentThread()); } }); }
- 實例:讀寫數據庫
public Observable<List<Record>> readAllRecords() { return Observable.create(new ObservableOnSubscribe<List<Record>>() { @Override public void subscribe(ObservableEmitter<List<Record>> emitter) throws Exception { Cursor cursor = null; try { cursor = getReadableDatabase().rawQuery("select * from " + TABLE_NAME, new String[]{}); List<Record> result = new ArrayList<>(); while (cursor.moveToNext()) { result.add(Db.Record.read(cursor)); } emitter.onNext(result); emitter.onComplete(); } finally { if (cursor != null) { cursor.close(); } } } }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); } readAllRecords().subscribe(new Consumer<List<Record>>() { @Override public void accept(List<Record> recordList) throws Exception { } })
4. RxJava相關的庫
本系列文章參考:
1、RxJava操作符(一)Creating Observables