Android 異步框架 RxJava2


觀察者模式的概念

RxJava是android的異步框架,官方介紹是可觀測的序列,組成異步基於事件程序的庫。特點是觀察者模式,基於事件流的鏈式調用,隨着異步操作調度過程復雜的情況下,程序邏輯也變得越來越復雜,但RxJava依然能夠保持簡潔。

簡單的說觀察者A與被觀察者B建立訂閱關系,當被觀察者B發生某種改變時,立即通知觀察者A

添加依賴

compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

基本模式

Observable被觀察者

注意各地方添加泛型避免大片警告,onNext()是事件的回調,onComplete()是事件的結尾。onComplete()與onError互斥需要保持唯一性,並只能調用一次。

Observable<String> observable= Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("消息1");
                e.onNext("消息2");
                e.onNext("消息3");
                e.onComplete();
    }
});

Observer觀察者

創建觀察者時回調的onSubscribe可以獲取Disposable對象,在合適的時候判斷條件,調用dispose()即可接觸訂閱關系

Observer<String> observer=new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        //通過判斷解除訂閱關系
         d.dispose();
    }

    @Override
    public void onNext(String o) {
        //對應observable的onNext方法
    }

    @Override
    public void onError(Throwable e) {
        //對應observable的onError方法
    }

    @Override
    public void onComplete() {
        //對應observable的onComplete方法
    }
};

建立訂閱關系

observable.subscribeOn(Schedulers.io()) //指定事件生產在子線程
          .observeOn(AndroidSchedulers.mainThread()) //指定事件消費在UI線程
          .subscribe(observer);

Observable被觀察者的其他模式

//just模式,將自動發送onNext()事件
Observable<String> observable = Observable.just("發送消息");

//fromIterable模式,遍歷集合,並自動發送onNext()事件
Observable<String> observable = Observable.fromIterable((Iterable<String>) mList);

//interval模式,定時自動發送整數序列,從0開始每隔2秒計數,
Observable<Long> observable = Observable.interval(0,2, TimeUnit.SECONDS)

//range模式,自動發送特定的整數序列,0表示不發送,負數會拋異常,從1開始發送到20
Observable<Integer> observable = Observable.range(1,20);

//timer模式,定時執行觀察者的onNext()方法
Observable<Integer> observable = Observable.timer(2, TimeUnit.SECONDS);

Observable被觀察者的更多創建方式以及操作符

如創建操作,數據過濾操作,條件操作,轉載以下博客,很詳細:

RxJava操作符大全

Scheduler調度器

四種常見模式

Schedulers.immediate() 默認模式,在當前線程運行

Schedulers.newThread() 創建新的子線程運行

Schedulers.io() 創建新的子線程運行,內部使用的是無上限的線程池,可重用空閑的線程,效率高

 AndroidSchedulers.mainThread() 在UI主線程運行

訂閱事件時的生產與消費線程

subscribeOn() 指定Observable(被觀察者)所在的線程,或者叫做事件產生的線程

observeOn() 指定 Observer(觀察者)所運行在的線程,或者叫做事件消費的線程

新的觀察者模式

Flowable被觀察者

Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
    @Override
    public void subscribe(FlowableEmitter<String> e) throws Exception {
                e.onNext("hello RxJava!");
                e.onComplete();
    }
},BackpressureStrategy.BUFFER);//增加背壓模式

Subscriber觀察者

onSubscribe()會返回Subscription對象,調用cancel()即可取消訂閱關系,request()即可指定消費事件的數量 

Subscriber<String> subscriber=new Subscriber<String>() {
    @Override
    public void onSubscribe(Subscription s) {
         s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(String s) {
        Log.i("RxJava", "onNext: "+s);
    }

    @Override
    public void onError(Throwable t) {
        Log.i("RxJava", "onError");
    }

    @Override
    public void onComplete() {
        Log.i("RxJava", "onComplete");
    }
};
flowable.subscribe(subscriber);//建立訂閱關系

Backpressure背壓模式

如果生產者和消費者不在同一線程的情況下,如果生產者的速度大於消費者的速度,就會產生Backpressure問題。即異步情況下,Backpressure問題才會存在。

BUFFER

所謂BUFFER就是把RxJava中默認的只能存128個事件的緩存池換成一個大的緩存池,支持存很多很多的數據。
這樣,消費者通過request()即使傳入一個很大的數字,生產者也會生產事件,並將處理不了的事件緩存。
但是這種方式任然比較消耗內存,除非是我們比較了解消費者的消費能力,能夠把握具體情況,不會產生OOM。

DROP

當消費者處理不了事件,就丟棄。
消費者通過request()傳入其需求n,然后生產者把n個事件傳遞給消費者供其消費。其他消費不掉的事件就丟掉

LATEST

LATEST與DROP功能基本一致,唯一的區別就是LATEST總能使消費者能夠接收到生產者產生的最后一個事件

ERROR

這種方式會在產生Backpressure問題的時候直接拋出一個異常,這個異常就是著名的MissingBackpressureException

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM