Github 相關代碼: Github地址
一直感覺 RxJava2 的取消訂閱有點混亂, 這樣也能取消, 那樣也能取消, 沒能系統起來的感覺就像掉進了盤絲洞, 迷亂…
下面說說這幾種情況
幾種取消的情況
-
subscribe 時返回了 disposable:

-
subscribe 不返回 disposable, 從 observer 的 onSubscribe 中獲取:

-
之前從網上看的, 使用繼承 DisposableObserver 的 observer, 這個 observer 可以直接 dispose

源碼分析
啰嗦啥啊, 這么簡單的東西還需要貼源碼?
大哥, 下面有總結….
從第一種情況開始看, 我們進入到 .subscribe((s) -> {}) 中看, 發現它是返回了一個四參數的重載方法
public final Disposable subscribe(Consumer<? super T> onNext) { return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()); }
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { ... LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls; }
可以看到, 這個方法里創建了一個 LambdaObserver, 這個 Observer 實現了Disposable 接口, 所以可以直接作為 Disposable 返回到最上級, 這就是為什么第一種情況中的 subscribe 能返回 disposable 的原因.
public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable, LambdaConsumerIntrospection { ... }
而第二種情況的 subscribe 其實就是上面方法里的 subscribe(LambdaObserver)
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
這個方法內執行了 subscribeActual(observer) 抽象方法, 交給了 Observale 的子類重寫.
第三種情況 中出現的 DisposableObserver 與 LambdaObserver 差不多, 甚至更簡單
public abstract class DisposableObserver<T> implements Observer<T>, Disposable { final AtomicReference<Disposable> s = new AtomicReference<Disposable>(); @Override public final void onSubscribe(@NonNull Disposable s) { if (EndConsumerHelper.setOnce(this.s, s, getClass())) { onStart(); } } protected void onStart() { } @Override public final boolean isDisposed() { return s.get() == DisposableHelper.DISPOSED; } @Override public final void dispose() { DisposableHelper.dispose(s); } }
簡單總結
稍微一看我們就明白了, 當傳入 Observer 接口的四個方法時, subscribe 在內部構建了一個 LambdaObserver , 而這個 LambdaObserver 和第三種情況的 DisposableObserver 都實現了 Disposable 接口, 所以可以作為 Disposable 返回, 就是這么簡單.
另外第三種情況里出現的 CompositeDisposable, 簡單說就是一個 Disposable 集合( 由 RxJava 內部提供的OpenHashSet 維護, 線程安全 ), CompositeDisposable.dispose() 時會遍歷內部的所有 Disposable 執行 dispose 操作.
/** * Dispose the contents of the OpenHashSet by suppressing non-fatal * Throwables till the end. * @param set the OpenHashSet to dispose elements of */ void dispose(OpenHashSet<Disposable> set) { ... for (Object o : array) { if (o instanceof Disposable) { try { ((Disposable) o).dispose(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); if (errors == null) { errors = new ArrayList<Throwable>(); } errors.add(ex); } } } ... }
幾種方式的適用情況
-
如果是零星使用的話, 第一種最方便,
observer的四個方法可以按需使用, 相同邏輯的方法有多種可供選擇
-
如果使用的
Observer有很多共同邏輯, 則可以寫一個BaseObserver繼承DisposableObserver或者LambdaObserver, 直接使用xxObserver.dispose()-
open class BaseObserver<T>() : DisposableObserver<T>()
-
- 如果有很多的
diposable需要取消的話, 使用CompositeDisposable會更簡單一些
如何在MVP中自動取消訂閱避免內存泄漏, 在我的github中有封裝示例 Github地址
