1.簡單介紹
1.1.發布/訂閱事件主要用於網絡請求的回調。
事件總線可以使Android各組件之間的通信變得簡單,而且可以解耦。
其實RxJava實現事件總線和EventBus比較類似,他們都依據與觀察者模式。
個人比較習慣用RxJava來實現,因為非常簡單而清晰。
1.2.當然EventBus實現總線的方式也有很多人用。
這里給個傳送門==>EventBus的github地址:https://github.com/greenrobot/EventBus
然后Otto實現總線也不錯==>Otto的github地址:https://github.com/square/otto
1.3.使用RxJava的好處以及注意點
最明顯的好處就是:項目體積縮小了。
注意:使用RxLifecycle來解決RxJava內存泄漏的問題。
==>參考我的另一篇博客:RxLifecycle第三方庫的使用。
1.4.理解一下觀察者模式。
這是一種行為模式。
當你的類或者主對象(稱為被觀察者)的狀態發生改變就會通知所有對此感興趣的類或對象(稱為觀察者)。
1.5.理解一下發布/訂閱
發布/訂閱 模式的功能和觀察者模式是一樣的,都是完成特定事件發生后的消息通知。
觀察者模式和發布/訂閱模式之間還是存在了一些差別,在發布/訂閱模式中重點是發布消息,然后由調度中心
統一調度,不需要知道具體有哪些訂閱者。(這樣就可以匿名)
為什么要匿名?
在計算機程序設計中有一個非常棒的思想叫“解耦”。你通常希望在你的設計中保持盡可能低的耦合度。
通常情況下,你希望消息發布商能夠直接了解所有需要接收消息的訂閱者,
這樣,一旦“事件”或消息准備好就可以及時通知每一個訂閱者。
但是使用事件總線,發布者可以免除這種職責並實現獨立性,
因為消息發布者和消息訂閱者可以相互不知道對方,只關心對應的消息,從而接觸兩者之間的依賴關系
怎么實現匿名?
提到匿名,自然而然你就會問:你是如何真正實現發布者和訂閱者之間的匿名?
很簡單,只要找到一個中間人,讓這個中間人負責兩方的消息調度。事件總線就是一個這樣的中間人。
綜上所述,事件總線就是這么簡單。
1.6.使用RxJava實現事件總線的簡單案例
github參考案例地址:https://github.com/kaushikgopal/RxJava-Android-Samples
如下面的例子:
我們從頂部片段(綠色部分)發布事件,並從底部片段(藍色部分)監聽點擊事件(通過事件總線)。
怎么實現這個功能呢?
第一步自定義一個事件總線
public class RxBus { private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create()); public void send(Object o) { _bus.onNext(o); } public Observable<Object> toObserverable() { return _bus; } }
第二步將事件發布到總線中。
@OnClick(R.id.btn_demo_rxbus_tap) public void onTapButtonClicked() { _rxBus.send(new TapEvent()); }
第三步監聽來自其他組件或服務事件
_rxBus.toObserverable() .subscribe(new Action1<Object>() { @Override public void call(Object event) { if(event instanceof TapEvent) { _showTapText(); }else if(event instanceof SomeOtherEvent) { _doSomethingElse(); } } });
1.7.本篇文章的參考文獻
2.封裝好的總線類
2.1.RxJava1.x的總線實現方式
/** * desc : 利用 PublishSubject的特性:與普通的Subject不同,在訂閱時並不立即觸發訂閱事件, * 而是允許我們在任意時刻手動調用onNext(),onError(),onCompleted來觸發事件。 */ public class RxBus { private ConcurrentHashMap<Object, List<Subject>> subjectMapper = new ConcurrentHashMap<>(); private RxBus() { } private static class Holder { private static RxBus instance = new RxBus(); } public static RxBus getInstance() { return Holder.instance; } public <T> Observable<T> register(@NonNull Class<T> clz) { return register(clz.getName()); } public <T> Observable<T> register(@NonNull Object tag) { List<Subject> subjectList = subjectMapper.get(tag); if (null == subjectList) { subjectList = new ArrayList<>(); subjectMapper.put(tag, subjectList); } Subject<T, T> subject = PublishSubject.create(); subjectList.add(subject); //System.out.println("注冊到rxbus"); return subject; } public <T> void unregister(@NonNull Class<T> clz, @NonNull Observable observable) { unregister(clz.getName(), observable); } public void unregister(@NonNull Object tag, @NonNull Observable observable) { List<Subject> subjects = subjectMapper.get(tag); if (null != subjects) { subjects.remove(observable); if (subjects.isEmpty()) { subjectMapper.remove(tag); //System.out.println("從rxbus取消注冊"); } } } public void post(@NonNull Object content) { post(content.getClass().getName(), content); } public void post(@NonNull Object tag, @NonNull Object content) { List<Subject> subjects = subjectMapper.get(tag); if (!subjects.isEmpty()) { for (Subject subject: subjects) { subject.onNext(content); } } } }
幾個關鍵方法: register —— 由tag,生成一個subject List,同時利用PublishSubject創建一個Subject並返回,
它同時也是Observable的子類。
unregister —— 移除tag對應subject List 中的Observable。若subject List為空,也將被移除。
post —— 遍歷tag對應subject List 中的Subject,執行onNext()。
這里實際執行的是觀察者Observer的onNext(),
Subject的定義:public abstract class Subject<T, R> extends Observable<R> implements Observer<T>。
測試代碼:
/* rxbus */ Observable<String> observable = RxBus.getInstance().register(String.class); observable.map(s -> { try { int v = Integer.valueOf(s); System.out.println("map變換成功, source = " + s); return v; } catch (Exception e) { System.out.println("map變換失敗, source = " + s); return s; } }).subscribe(value -> { System.out.println("訂閱 " + value); }); RxBus.getInstance().post("888"); RxBus.getInstance().post("發發發"); RxBus.getInstance().unregister(String.class, observable);
//這里比較有意思的是,使用了lambda表達式。
//在map變換時,如果將字符串轉成Integer,沒有問題就返回整型;
//若報異常,就返回String型。
//同樣的,在最終訂閱時,value參數的類型也是由map變換來決定的。
2.2.RxJava2.0總線實現類
因為在RxJava2.0之后,io.reactivex.Observable中沒有進行背壓處理了。
如果有大量消息堆積在總線中來不及處理會產生OutOfMemoryError。
有新類io.reactivex.Flowable專門針對背壓問題。
無背壓處理的Observable實現,跟RxJava1.0x中一樣,使用PublishSubject來實現。
要實現有背壓的2.0x版,使用FlowableProcessor的子類PublishProcessor來產生Flowable。
源代碼如下:
public class RxBus { private final FlowableProcessor<Object> mBus; private RxBus() { mBus = PublishProcessor.create().toSerialized(); } private static class Holder { private static RxBus instance = new RxBus(); } public static RxBus getInstance() { return Holder.instance; } public void post(@NonNull Object obj) { mBus.onNext(obj); } public <T> Flowable<T> register(Class<T> clz) { return mBus.ofType(clz); } public void unregisterAll() { //會將所有由mBus 生成的 Flowable 都置 completed 狀態 后續的 所有消息 都收不到了 mBus.onComplete(); } public boolean hasSubscribers() { return mBus.hasSubscribers(); } }
測試代碼:
Flowable<Integer> f1 = RxBus.getInstance().register(Integer.class); f1.subscribe(value -> System.out.println("訂閱f1消息 .. " + value)); RxBus.getInstance().post(999);
3.實際項目調用方式
3.1.首先自定義一個RxBus。
這個類感覺有點像工具類。和其他函數沒有任何耦合關系。
這個類見在上面2中封裝好的RxBus類。
3.2.在BaseListFragment實現了LazyLoadFragment中的抽象函數。
這里解釋一下:
BaseListFragment是一個可以刷新可以加載更多的一個碎片。
LazyLoadFragment是一個懶加載的被BaseListFragmetn繼承的一個基類。
LazyLoadFragment通過判斷是否可見的函數setUserVisibleHint執行了一個抽象函數fetchData()。
adapter是頁面內容的一個適配器。
然后在BaseListFragment中重寫這個抽象函數。
@Override public void fetchData() { observable = RxBus.getInstance().register(BaseListFragment.TAG); observable.subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { adapter.notifyDataSetChanged(); } }); }
observable.subscribe(new Consumer<Integer>)返回的是一個Disposable類型。
如下面Disposable的簡單使用方式。
Disposable disposable = observable.subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { //這里接收數據項 } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { //這里接收onError } }, new Action() { @Override public void run() throws Exception { //這里接收onComplete。 } });
3.3.小貼士
RxBus的注冊與反注冊一定要對應出現。
一般在活動或者Fragment中的onStart中register這個活動或者片段的TAG(也就是一個唯一標識字符串)。
一般在活動或者Fragment中的onDestroy中ungister這個活動或者片段的TAG。
post用於傳遞消息,看情況調用唄。