RxJava簡介 - 原理及使用


 

     1.    定義 

  • RxJava 在 GitHub 的介紹:
RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM
// 翻譯:RxJava 是一個在 Java VM 上使用可觀測的序列來組成異步的、基於事件的程序的庫
  • 總結:RxJava 是一個 基於事件流、實現異步操作的庫。

     2.    原理

  • RxJava: 基於 一種擴展的觀察者模式

  • RxJava的擴展觀察者模式中有4個角色:
角色 作用
被觀察者(Observable) 產生事件
觀察者(Observer) 接收事件,並給出響應動作
訂閱(Subscribe) 連接 被觀察者 & 觀察者, 相當於注冊監聽
事件(Event) 被觀察者 & 觀察者 溝通的載體
  • 這里用兩根水管代替觀察者和被觀察者, 用通俗易懂的話把它們的關系解釋清楚, 在這里我將從事件流這個角度來說明RxJava的基本工作原理。

          先假設有兩根水管:上面一根水管為事件產生的水管,叫它上游吧,下面一根水管為事件接收的水管叫它下游吧。

                     

          兩根水管通過一定的方式連接起來,使得上游每產生一個事件,下游就能收到該事件。

          這里的上游下游就分別對應着RxJava中的ObservableObserver,它們之間的連接就對應着subscribe(),因此這個關系用RxJava來表示就是:

ChapterOne:

    public static void case1() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: value = " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

   打印結果如下:

08-05 17:00:05.072 24086 24086 D ChapterOne: onSubscribe
08-05 17:00:05.073 24086 24086 D ChapterOne: onNext: value = 1
08-05 17:00:05.073 24086 24086 D ChapterOne: onNext: value = 2
08-05 17:00:05.073 24086 24086 D ChapterOne: onNext: value = 3
08-05 17:00:05.073 24086 24086 D ChapterOne: onComplete

           注意: 只有當上游和下游建立連接之后, 上游才會開始發送事件. 也就是調用了subscribe() 方法之后才開始發送事件.

 

  • 接下來解釋一下其中兩個陌生的玩意:ObservableEmitterDisposable.
          ObservableEmitter: Emitter是發射器的意思,那就很好猜了,這個就是用來發出事件的,它可以發出三種類型的事件,通過調用emitter的 onNext(T value)onComplete()onError(Throwable error)就可以分別發出next事件、complete事件和error事件。

           但是,請注意,並不意味着你可以隨意亂七八糟發射事件,需要滿足一定的規則

              (1)  上游可以發送無限個onNext, 下游也可以接收無限個onNext.

              (2)  當上游發送了一個onComplete后, 上游onComplete之后的事件將會繼續發送, 而下游收到onComplete事件之后將不再繼續接收事件.

              (3)  當上游發送了一個onError后, 上游onError之后的事件將繼續發送, 而下游收到onError事件之后將不再繼續接收事件.

              (4)  上游可以不發送onComplete或onError.

              (5)  最為關鍵的是onComplete和onError必須唯一並且互斥, 即不能發多個onComplete, 也不能發多個onError, 也不能先發一個onComplete, 然后再發一個onError, 反之亦然

   注: 關於onComplete和onError唯一並且互斥這一點,  是需要自行在代碼中進行控制, 如果你的代碼邏輯中違背了這個規則, **並不一定會導致程序崩潰. 
** 比如發送多個onComplete是可以正常運行的, 依然是收到第一個onComplete就不再接收了, 但若是發送多個onError, 則收到第二個onError事件會導致程序會崩潰.

 

只發送onNext事件
 
 
發送onComplete事件
  
              
 
發送onError事件
 
 

 

  • 介紹了ObservableEmitter, 接下來介紹Disposable, 這個單詞的字面意思是一次性用品,用完即可丟棄的. 那么在RxJava中怎么去理解它呢, 對應於上面的水管的例子, 我們可以把它理解成兩根管道之間的一個機關, 當調用它的 dispose()方法時, 它就會將兩根管道切斷, 從而導致下游收不到事件. 相當於我們平時注冊監聽后,用完了,要反注冊監聽。

          注意: 調用dispose()並不會導致上游不再繼續發送事件, 上游會繼續發送剩余的事件.

ChapterOne:

    public static void case2() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit onComplete");
                emitter.onComplete();
                Log.d(TAG, "emit 4");
                emitter.onNext(4);
            }
        }).subscribe(new Observer<Integer>() {
            private Disposable mDisposable;
            private int mCount = 0;

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
                mDisposable = d;
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: value = " + value);
                mCount++;
                if (mCount == 2) {
                    Log.d(TAG, "dispose");
                    mDisposable.dispose();
                    Log.d(TAG, "isDisposed : " + mDisposable.isDisposed());
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

          打印結果如下:

08-05 17:06:53.809 24399 24399 D ChapterOne: onSubscribe
08-05 17:06:53.809 24399 24399 D ChapterOne: emit 1
08-05 17:06:53.809 24399 24399 D ChapterOne: onNext: value = 1
08-05 17:06:53.809 24399 24399 D ChapterOne: emit 2
08-05 17:06:53.809 24399 24399 D ChapterOne: onNext: value = 2
08-05 17:06:53.809 24399 24399 D ChapterOne: dispose
08-05 17:06:53.809 24399 24399 D ChapterOne: isDisposed : true
08-05 17:06:53.809 24399 24399 D ChapterOne: emit 3
08-05 17:06:53.809 24399 24399 D ChapterOne: emit onComplete
08-05 17:06:53.809 24399 24399 D ChapterOne: emit 4

 

  • 另外, subscribe()有多個重載的方法:
    public final Disposable subscribe() {}
    public final Disposable subscribe(Consumer<? super T> onNext) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
    public final void subscribe(Observer<? super T> observer) {}

 

     3.    線程調度

  •  正常情況下, 上游和下游是工作在同一個線程中的, 也就是說上游在哪個線程發事件, 下游就在哪個線程接收事件. 
ChapterOne:

    public static void case3() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "subscribe: thread = " + Thread.currentThread());
                emitter.onNext(1);
                emitter.onComplete();
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: thread = " + Thread.currentThread());
            }
        });
    }

          打印結果如下:

08-05 19:27:09.926 29640 29640 D ChapterOne: subscribe: thread = Thread[main,5,main]
08-05 19:27:09.926 29640 29640 D ChapterOne: accept: thread = Thread[main,5,main]
  • 對於一般的需求場景,需要在子線程中實現耗時的操作;然后回到主線程實現 UI操作。怎么辦?

           解決方案:采用 RxJava內置的線程調度器( Scheduler ),即通過 功能性操作符subscribeOn() & observeOn()實現。

           作用:線程控制,即指定 被觀察者 (Observable) / 觀察者(Observer) 的工作線程類型

           線程類型:

類型 含義 應用場景
Schedulers.immediate() 當前線程 = 不指定線程 默認
AndroidSchedulers.mainThread() Android主線程 操作UI
Schedulers.newThread() 常規新線程 耗時等操作
Schedulers.io() io操作線程 網絡請求、讀寫文件等io密集型操作
Schedulers.computation() CPU計算操作線程 大量計算操作


  • 具體使用
    public static void case4() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "subscribe: thread = " + Thread.currentThread());
                emitter.onNext(1);
                emitter.onComplete();
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "accept: thread = " + Thread.currentThread());
                    }
                });
    }

           特殊注意:(1) 若Observable.subscribeOn()多次指定被觀察者 生產事件的線程,則只有第一次指定有效,其余的指定線程無效。

                             (2) 若Observable.observeOn()多次指定觀察者 接收 & 響應事件的線程,則每次指定均有效,即每指定一次,就會進行一次線程的切換。

ChapterOne:

    public static void case5() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "subscribe: thread = " + Thread.currentThread());
                emitter.onNext(1);
                emitter.onComplete();
            }
        })
                .subscribeOn(Schedulers.io())
                .subscribeOn(Schedulers.newThread())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "doOnNext accept: thread = " + Thread.currentThread());
                    }
                })
                .observeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "onNext: accept: thread = " + Thread.currentThread());
                    }
                });
    }
  • 實例:讀寫數據庫
        public Observable<List<Record>> readAllRecords() {
            return Observable.create(new ObservableOnSubscribe<List<Record>>() {
                @Override
                public void subscribe(ObservableEmitter<List<Record>> emitter) throws Exception {
                    Cursor cursor = null;
                    try {
                        cursor = getReadableDatabase().rawQuery("select * from " + TABLE_NAME, new String[]{});
                        List<Record> result = new ArrayList<>();
                        while (cursor.moveToNext()) {
                            result.add(Db.Record.read(cursor));
                        }
                        emitter.onNext(result);
                        emitter.onComplete();
                    } finally {
                        if (cursor != null) {
                            cursor.close();
                        }
                    }
                }
            }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
        }


            readAllRecords().subscribe(new Consumer<List<Record>>() {
                @Override
                public void accept(List<Record> recordList) throws Exception {

                }
            })

 

     4.    RxJava相關的庫

 

本系列文章參考:

1、RxJava操作符(一)Creating Observables

2、RxJava文檔中文版

3、Android RxJava:這是一份全面 & 詳細 的RxJava操作符 使用攻略

4、收集了RxJava常見的使用場景,例子簡潔、經典、易懂...


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM