RxBus
通過RxJava實現Rxbus。
相信大家已經非常熟悉EventBus了。最近正在學習Rxjava,如果在項目中已經使用了Rxjava,使用RxBus來代替EventBus應該是不錯的選擇。
RxJava最核心的兩個東西是Observables(被觀察者,事件源)和Subscribers(觀察者)。Observables發出一系列事件,Subscribers處理這些事件。
直接看代碼
Note that it is important to subscribe to the exact same rxBus instance that was used to post the events
采用單例模式來保證rxBus對象一致
public class RxBus { private static RxBus rxBus; private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create()); private RxBus() { } public static RxBus getInstance() { if (rxBus == null) { synchronized (RxBus.class) { if (rxBus == null) { rxBus = new RxBus(); } } } return rxBus; } public void send(Object o) { _bus.onNext(o); } public Observable<Object> toObserverable() { return _bus; } }
Activity中發送事件
public void sendTap(View view){ RxBus.getInstance().send(new TapEvent()); } public void sendOther(View view){ RxBus.getInstance().send(new OtherEvent()); }
Fragment中接收事件
RxBus.getInstance().toObserverable() .subscribe(new Action1<Object>() { @Override public void call(Object o) { if (o instanceof TapEvent) { textView.setText("tap"); } else if (o instanceof OtherEvent) { textView.setText("other"); } } });
效果

以上就是使用Rxjava簡單實現RxBus的功能,當然這還遠遠不夠
RxBus升級
在具體使用過程中總會碰到各種各樣的問題
場景1
我在上一個項目中實現了無限輪播的功能,並且希望輪播圖在用戶滑動、不可見、以及程序在后台休眠時都停止滾動,這時候就希望EventBus及時的傳遞這3種狀態,為此我需要寫slideEvent、visibleEvent、aliveEvent3個類,雖然他們都需要傳遞一個簡單的Boolen值。
解決方案
創建一個Event“管家”
類似key-value的方式,每個事件都有自己的唯一的Code,接收事件時根據Code返回對應的content
public class Events<T> { //所有事件的CODE public static final int TAP = 1; //點擊事件 public static final int OTHER = 21; //其它事件 //枚舉 @IntDef({TAP, OTHER}) @Retention(RetentionPolicy.SOURCE) public @interface EventCode {} public @Events.EventCode int code; public T content; public static <O> Events<O> setContent(O t) { Events<O> events = new Events<>(); events.content = t; return events; } public <T> T getContent() { return (T) content; } }
場景2
怎么又內存泄漏了?
每個人在開發過程中,或多或少都會碰到內存泄漏的的問題,我一直有一個天真的想法,RxJava那么牛逼,是不是能無聲無息地就能解決內存泄漏的問題了,答案是否定的。
我看了不少有關RxJava的文章,都會提到
一定要記得在生命周期結束的地方取消訂閱事件,防止RxJava可能會引起的內存泄漏問題。
你可以
@Overrideprotected void onDestroy() { super.onDestroy(); if(!rxSubscription.isUnsubscribed()) { rxSubscription.unsubscribe(); } }
又或者
使用CompositeSubscription把 Subscription 收集到一起,方便 Activity(基類) 銷毀時取消訂閱,防止內存泄漏。
前者可以在任一生命周期階段取消訂閱,缺點是每個acivity/fragment都要重寫方法。
后者可以寫在BaseActivity(大家都不會陌生),每個activity都能用,缺點是不夠靈活。
以上兩種方法似乎都欠缺點意思,所幸Rx家族”人丁興旺“,早已想好了解決方案
RxLifecycle
一、bindToLifecycle()方法
在子類使用Observable中的compose操作符,調用,完成Observable發布的事件和當前的組件綁定,實現生命周期同步。從而實現當前組件生命周期結束時,自動取消對Observable訂閱。
Observable.interval(1, TimeUnit.SECONDS) .compose(this.bindToLifecycle()) .subscribe(new Action1<Long>() { @Override public void call(Long num) { Log.i(TAG, " " +num); } });
二、bindUntilEvent() 方法
使用ActivityEvent類,其中的CREATE、START、 RESUME、PAUSE、STOP、 DESTROY分別對應生命周期內的方法。使用bindUntilEvent指定在哪個生命周期方法調用時取消訂閱。
public enum ActivityEvent { CREATE, START, RESUME, PAUSE, STOP, DESTROY }
public enum FragmentEvent { ATTACH, CREATE, CREATE_VIEW, START, RESUME, PAUSE, STOP, DESTROY_VIEW, DESTROY, DETACH }
組裝零件
public class RxBus { private static RxBus rxBus; private final Subject<Events<?>, Events<?>> _bus = new SerializedSubject<>(PublishSubject.<Events<?>>create()); private RxBus(){} public static RxBus getInstance(){ if (rxBus == null){ synchronized (RxBus.class){ if (rxBus == null){ rxBus = new RxBus(); } } } return rxBus; } public void send(Events<?> o) { _bus.onNext(o); } public void send(@Events.EventCode int code, Object content){ Events<Object> event = new Events<>(); event.code = code; event.content = content; send(event); } public Observable<Events<?>> toObservable() { return _bus; } public static SubscriberBuilder with(FragmentLifecycleProvider provider){ return new SubscriberBuilder(provider); } public static SubscriberBuilder with(ActivityLifecycleProvider provider){ return new SubscriberBuilder(provider); } public static class SubscriberBuilder{ private FragmentLifecycleProvider mFragLifecycleProvider; private ActivityLifecycleProvider mActLifecycleProvider; private FragmentEvent mFragmentEndEvent; private ActivityEvent mActivityEndEvent; private int event; private Action1<? super Events<?>> onNext; private Action1<Throwable> onError; public SubscriberBuilder(FragmentLifecycleProvider provider) { this.mFragLifecycleProvider = provider; } public SubscriberBuilder(ActivityLifecycleProvider provider){ this.mActLifecycleProvider = provider; } public SubscriberBuilder setEvent(@Events.EventCode int event){ this.event = event; return this; } public SubscriberBuilder setEndEvent(FragmentEvent event){ this.mFragmentEndEvent = event; return this; } public SubscriberBuilder setEndEvent(ActivityEvent event){ this.mActivityEndEvent = event; return this; } public SubscriberBuilder onNext(Action1<? super Events<?>> action){ this.onNext = action; return this; } public SubscriberBuilder onError(Action1<Throwable> action){ this.onError = action; return this; } public void create(){ _create(); } public Subscription _create(){ if (mFragLifecycleProvider!=null){ return RxBus.getInstance().toObservable() .compose(mFragmentEndEvent == null ? mFragLifecycleProvider.bindToLifecycle() :mFragLifecycleProvider.<Events<?>>bindUntilEvent(mFragmentEndEvent)) // 綁定生命周期 .filter(new Func1<Events<?>, Boolean>() { @Override public Boolean call(Events<?> events) { return events.code == event; } }) //過濾 根據code判斷返回事件 .subscribe(onNext, onError == null ? new Action1<Throwable>() { @Override public void call(Throwable throwable) { throwable.printStackTrace(); } } : onError); } if (mActLifecycleProvider!=null){ return RxBus.getInstance().toObservable() .compose(mActivityEndEvent == null ? mActLifecycleProvider.bindToLifecycle() :mActLifecycleProvider.<Events<?>>bindUntilEvent(mActivityEndEvent)) .filter(new Func1<Events<?>, Boolean>() { @Override public Boolean call(Events<?> events) { return events.code == event; } }) .subscribe(onNext, onError == null ? (Action1<Throwable>) new Action1<Throwable>() { @Override public void call(Throwable throwable) { throwable.printStackTrace(); } } : onError); } return null; } } }
新BUS上路
依然使用前面的例子
Activity中發送事件
public void sendTap(View view){ RxBus.getInstance().send(Events.TAP, "Tap傳了一個String"); } public void sendOther(View view){ RxBus.getInstance().send(Events.OTHER, null); // RxBus.getInstance().send(Events.OTHER, new OtherEvent("Cloud", 25)); }
Fragment中接收事件
fragment需要繼承RxLifecycle對應組件
public class BlankFragment extends RxFragment {}
RxBus.with(this) .setEvent(Events.TAP) // .setEndEvent(FragmentEvent.DESTROY_VIEW) //不設置默認與fragment生命周期同步 .onNext(new Action1<Events<?>>() { @Override public void call(Events<?> events) { String content = events.getContent(); textView.setText(content); } }) .create(); RxBus.with(this) .setEvent(Events.OTHER) .setEndEvent(FragmentEvent.DESTROY_VIEW) //不設置默認與fragment生命周期同步 .onNext(new Action1<Events<?>>() { @Override public void call(Events<?> events) { OtherEvent event = events.getContent(); textView.setText("Name: " + event.getName() + ",Age: "+ event.getAge()); } }) .onError(new Action1<Throwable>() { @Override public void call(Throwable throwable) { textView.setText(throwable.toString()); } }) // 異常處理,默認捕獲異常,不做處理,程序不會crash。 .create();
效果


完整代碼,請移步
參考資料
- FlowGeek:開源中國Android客戶端MVP架構Material Design設計風格版
- Implementing an Event Bus With RxJava – RxBus:用RxJava實現EventBus