Android 使用RxJava實現一個發布/訂閱事件總線


1.簡單介紹

1.1.發布/訂閱事件主要用於網絡請求的回調。

  事件總線可以使Android各組件之間的通信變得簡單,而且可以解耦。

  其實RxJava實現事件總線和EventBus比較類似,他們都依據與觀察者模式。

  個人比較習慣用RxJava來實現,因為非常簡單而清晰。

  

 

1.2.當然EventBus實現總線的方式也有很多人用。

  這里給個傳送門==>EventBus的github地址:https://github.com/greenrobot/EventBus

  然后Otto實現總線也不錯==>Otto的github地址:https://github.com/square/otto

 

 

1.3.使用RxJava的好處以及注意點

  最明顯的好處就是:項目體積縮小了。

  注意:使用RxLifecycle來解決RxJava內存泄漏的問題。

  ==>參考我的另一篇博客:RxLifecycle第三方庫的使用。

  

 

1.4.理解一下觀察者模式。

  這是一種行為模式。

  當你的類或者主對象(稱為被觀察者)的狀態發生改變就會通知所有對此感興趣的類或對象(稱為觀察者)。

  詳情了解請參考這篇文章:觀察者模式--千軍萬馬穿雲箭。

 

 

1.5.理解一下發布/訂閱

  發布/訂閱 模式的功能和觀察者模式是一樣的,都是完成特定事件發生后的消息通知。

  觀察者模式和發布/訂閱模式之間還是存在了一些差別,在發布/訂閱模式中重點是發布消息,然后由調度中心

  統一調度,不需要知道具體有哪些訂閱者。(這樣就可以匿名)

為什么要匿名?
在計算機程序設計中有一個非常棒的思想叫“解耦”。你通常希望在你的設計中保持盡可能低的耦合度。
通常情況下,你希望消息發布商能夠直接了解所有需要接收消息的訂閱者,
這樣,一旦“事件”或消息准備好就可以及時通知每一個訂閱者。
但是使用事件總線,發布者可以免除這種職責並實現獨立性,
因為消息發布者和消息訂閱者可以相互不知道對方,只關心對應的消息,從而接觸兩者之間的依賴關系
怎么實現匿名?
提到匿名,自然而然你就會問:你是如何真正實現發布者和訂閱者之間的匿名? 
很簡單,只要找到一個中間人,讓這個中間人負責兩方的消息調度。事件總線就是一個這樣的中間人。 綜上所述,事件總線就是這么簡單。

 

 

1.6.使用RxJava實現事件總線的簡單案例

  案例來源:用RxJava實現事件總線-RxBus。

  github參考案例地址:https://github.com/kaushikgopal/RxJava-Android-Samples

  如下面的例子:

  我們從頂部片段(綠色部分)發布事件,並從底部片段(藍色部分)監聽點擊事件(通過事件總線)。

  

  怎么實現這個功能呢?

  第一步自定義一個事件總線 

public class RxBus {
 
  private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());
 
  public void send(Object o) {
    _bus.onNext(o);
  }
 
  public Observable<Object> toObserverable() {
    return _bus;
  }
}

  第二步將事件發布到總線中。

@OnClick(R.id.btn_demo_rxbus_tap)
public void onTapButtonClicked() {
 
    _rxBus.send(new TapEvent());
}

  第三步監聽來自其他組件或服務事件

_rxBus.toObserverable()
    .subscribe(new Action1<Object>() {
      @Override
      public void call(Object event) {
 
        if(event instanceof TapEvent) {
          _showTapText();
 
        }else if(event instanceof SomeOtherEvent) {
          _doSomethingElse();
        }
      }
    });

 

 

1.7.本篇文章的參考文獻

  Android RxJava實現RxBus。

  Android基於RxJava、RxAndroid的EventBus實現。

  用RxJava實現事件總線-RxBus。


2.封裝好的總線類

2.1.RxJava1.x的總線實現方式

/**
 * desc   : 利用 PublishSubject的特性:與普通的Subject不同,在訂閱時並不立即觸發訂閱事件,
 * 而是允許我們在任意時刻手動調用onNext(),onError(),onCompleted來觸發事件。
 */
public class RxBus {

    private ConcurrentHashMap<Object, List<Subject>> subjectMapper = new ConcurrentHashMap<>();

    private RxBus() {

    }

    private static class Holder {
        private static RxBus instance = new RxBus();
    }

    public static RxBus getInstance() {
        return Holder.instance;
    }

    public <T> Observable<T> register(@NonNull Class<T> clz) {
        return register(clz.getName());
    }

    public <T> Observable<T> register(@NonNull Object tag) {
        List<Subject> subjectList = subjectMapper.get(tag);
        if (null == subjectList) {
            subjectList = new ArrayList<>();
            subjectMapper.put(tag, subjectList);
        }

        Subject<T, T> subject = PublishSubject.create();
        subjectList.add(subject);

        //System.out.println("注冊到rxbus");
        return subject;
    }

    public <T> void unregister(@NonNull Class<T> clz, @NonNull Observable observable) {
        unregister(clz.getName(), observable);
    }

    public void unregister(@NonNull Object tag, @NonNull Observable observable) {
        List<Subject> subjects = subjectMapper.get(tag);
        if (null != subjects) {
            subjects.remove(observable);
            if (subjects.isEmpty()) {
                subjectMapper.remove(tag);
                //System.out.println("從rxbus取消注冊");
            }
        }
    }

    public void post(@NonNull Object content) {
        post(content.getClass().getName(), content);
    }

    public void post(@NonNull Object tag, @NonNull Object content) {
        List<Subject> subjects = subjectMapper.get(tag);
        if (!subjects.isEmpty()) {
            for (Subject subject: subjects) {
                subject.onNext(content);
            }
        }
    }
}
幾個關鍵方法: 
register —— 由tag,生成一個subject List,同時利用PublishSubject創建一個Subject並返回,
它同時也是Observable的子類。
unregister —— 移除tag對應subject List 中的Observable。若subject List為空,也將被移除。
post —— 遍歷tag對應subject List 中的Subject,執行onNext()。
這里實際執行的是觀察者Observer的onNext(),
Subject的定義:
public abstract class Subject<T, R> extends Observable<R> implements Observer<T>。

  測試代碼:

/*
rxbus
 */
Observable<String> observable = RxBus.getInstance().register(String.class);
observable.map(s -> {
    try {
        int v = Integer.valueOf(s);
        System.out.println("map變換成功, source = " + s);
        return v;
    } catch (Exception e) {
        System.out.println("map變換失敗, source = " + s);
        return s;
    }
}).subscribe(value -> {
    System.out.println("訂閱 " + value);
});

RxBus.getInstance().post("888");
RxBus.getInstance().post("發發發");
RxBus.getInstance().unregister(String.class, observable);
//這里比較有意思的是,使用了lambda表達式。
//在map變換時,如果將字符串轉成Integer,沒有問題就返回整型;
//若報異常,就返回String型。
//同樣的,在最終訂閱時,value參數的類型也是由map變換來決定的。

 

 

2.2.RxJava2.0總線實現類

  因為在RxJava2.0之后,io.reactivex.Observable中沒有進行背壓處理了。

  如果有大量消息堆積在總線中來不及處理會產生OutOfMemoryError。

  有新類io.reactivex.Flowable專門針對背壓問題。

 

  無背壓處理的Observable實現,跟RxJava1.0x中一樣,使用PublishSubject來實現。

  要實現有背壓的2.0x版,使用FlowableProcessor的子類PublishProcessor來產生Flowable。

  

  源代碼如下:

public class RxBus {

    private final FlowableProcessor<Object> mBus;

    private RxBus() {
        mBus = PublishProcessor.create().toSerialized();
    }

    private static class Holder {
        private static RxBus instance = new RxBus();
    }

    public static RxBus getInstance() {
        return Holder.instance;
    }

    public void post(@NonNull Object obj) {
        mBus.onNext(obj);
    }

    public <T> Flowable<T> register(Class<T> clz) {
        return mBus.ofType(clz);
    }

    public void unregisterAll() {
        //會將所有由mBus 生成的 Flowable 都置  completed 狀態  后續的 所有消息  都收不到了
        mBus.onComplete();
    }

    public boolean hasSubscribers() {
        return mBus.hasSubscribers();
    }

}

  測試代碼:

Flowable<Integer> f1 = RxBus.getInstance().register(Integer.class);
f1.subscribe(value -> System.out.println("訂閱f1消息 .. " + value));
RxBus.getInstance().post(999);

 


3.實際項目調用方式

3.1.首先自定義一個RxBus。

  這個類感覺有點像工具類。和其他函數沒有任何耦合關系。

  這個類見在上面2中封裝好的RxBus類。

 

 

3.2.在BaseListFragment實現了LazyLoadFragment中的抽象函數。

  這里解釋一下:

  BaseListFragment是一個可以刷新可以加載更多的一個碎片。

  LazyLoadFragment是一個懶加載的被BaseListFragmetn繼承的一個基類。

  LazyLoadFragment通過判斷是否可見的函數setUserVisibleHint執行了一個抽象函數fetchData()。

  adapter是頁面內容的一個適配器。

  然后在BaseListFragment中重寫這個抽象函數。

 @Override
    public void fetchData() {
        observable = RxBus.getInstance().register(BaseListFragment.TAG);
        observable.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                adapter.notifyDataSetChanged();
            }
        });
    }

  observable.subscribe(new Consumer<Integer>)返回的是一個Disposable類型。

  如下面Disposable的簡單使用方式。

 Disposable disposable = observable.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                  //這里接收數據項
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
              //這里接收onError
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
              //這里接收onComplete。
            }
        });

 

 

3.3.小貼士

  RxBus的注冊與反注冊一定要對應出現。

  一般在活動或者Fragment中的onStart中register這個活動或者片段的TAG(也就是一個唯一標識字符串)。

  一般在活動或者Fragment中的onDestroy中ungister這個活動或者片段的TAG。

  post用於傳遞消息,看情況調用唄。

  




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM