RxJava中觀察者模式的實現


一、RxJava簡介

  • RxJava官方的解釋如下:

要想了解RxJava首先需要了解Rx,Rx的全稱是reactive extension,翻譯一下就是響應式擴展,Rx是基於觀察者模式的一種編程模型,目標是提供一致的編程接口,幫助開發者更方便地處理異步數據流,Rx滲透到了各種語言中,RxJava就是針對Java語言的一個異步的響應式編程庫,它是對觀察者模式的擴展。很多Android開發者都非常喜歡這個簡潔的編程庫。

 

  • RxJava源碼:

    https://github.com/ReactiveX/RxJava/tree/2.x/src

 

二、觀察者模式

  • 觀察者模式概念

      觀察者模式是將觀察者與被觀察者分離開,實現了對象間一種一對多的組合關系,當被觀察者的狀態發生變化時,所有依賴於它的觀察者就會檢測到變化並且刷新自己的狀態。

 

  • 觀察者模式中的四個重要角色

抽象主題:定義添加和刪除觀察者的功能;

抽象觀察者:定義觀察者收到主題通知后要做什么事情;

具體主題:抽象主題的具體實現;

具體觀察者:抽象觀察者的具體實現。

       大概的步驟就是1、創建被觀察者;2、創建觀察者;3、為被觀察者添加觀察者;4、被觀察者通知所有的觀察者發生變化。了解了觀察者模式的大致實現步驟幫助我們更好的理解RxJava中的觀察者模式的實現。

 

三、從RxJava源碼中看觀察者模式

 依然從上文中提到的四個角色結合實現的四個步驟來看RxJava源碼中的實現:

  • 抽象主題:
public abstract class Observable<T> implements ObservableSource<T> {
……
}

  這是一個Observable類,實現ObservableSource的接口;

public interface ObservableSource<T> {

    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull Observer<? super T> observer);
}

  ObservableSource接口中subscribe用來實現訂閱觀察者角色的功能,這里的ObservableSource就是抽象主題的角色。

  • 抽象觀察者
public interface Observer<T> {

    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);

    void onError(@NonNull Throwable e);

    void onComplete();

}

  Observable就是觀察者的角色,上面就是Observer接口,定義了觀察者收到被觀察者的通知后要做的事情。

  • 具體主題:

有了抽象主題,下一步就是實現觀察者模式的第一步,創建具體主題也就是被觀察者;

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

  RxJava提供了很多的創建的方法,這是其中一種create;

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

  類似於以上的方式,創建了Observable並且定義了一些事件觸發的規則,create時,傳入了一個OnSubscribe參數,它相當於一個計划表,當該Observable被訂閱時,它就會調用call()方法,觸發定義好的事件。

  • 具體觀察者
Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

  RxJava中的observer接口的實現方式如上。

  • 訂閱

在觀察者和被觀察者都被創建之后,最重要的就是將兩者結合起來,也就是實現訂閱,訂閱部分的代碼非常簡單,而且在我初次看到它的實現方式的時候覺得十分奇怪,從上文的創建Observable類中也可以看到,subscribe()是被定義在Observable中的,這有點不符合我們平常的思考方式,據說這樣更加符合流式API的設計。

observable.subscribe(observer);

  還有一種寫法是DefaultObserver,這里的DefaultObserver和observer的使用是相同的,subscriber是observer的抽象類。

  下面從DefaultObserver的部分源碼中看訂閱的實現機制:

public abstract class DefaultSubscriber<T> implements FlowableSubscriber<T> {

    Subscription upstream;

    @Override
    public final void onSubscribe(Subscription s) {
        if (EndConsumerHelper.validate(this.upstream, s, getClass())) {
            this.upstream = s;
            onStart();
        }
    }

    /**
     * Requests from the upstream Subscription.
     * @param n the request amount, positive
     */
    protected final void request(long n) {
        Subscription s = this.upstream;
        if (s != null) {
            s.request(n);
        }
    }

    /**
     * Cancels the upstream's Subscription.
     */
    protected final void cancel() {
        Subscription s = this.upstream;
        this.upstream = SubscriptionHelper.CANCELLED;
        s.cancel();
    }
    /**
     * Called once the subscription has been set on this observer; override this
     * to perform initialization or issue an initial request.
     * <p>
     * The default implementation requests {@link Long#MAX_VALUE}.
     */
    protected void onStart() {
        request(Long.MAX_VALUE);
    }

}

  

 

  可以看出DefaultObserver有四個方法:1、onStart(),這就是一個准備的方法;2、onSubscribe.call(subscriber),這里事件發送的邏輯開始實現,因此可以看到Observable發送事件不是在創建的時候開始的,而是在建立了訂閱這個連接的時候實現的;3、cancle(),取消一個訂閱連接;4、request(),來自上游訂閱的請求。(RxJava的源碼注釋中用上游表示主題,用下游表示觀察者)

四、觀察者模式的優點

     觀察者模式最主要的優點是它是低耦合的,也就是觀察者和被觀察者都只關注他們之間的接口,而不需要關心對方具體是哪個具體類的實現,當增加觀察者和被觀察者的時候,都對對方的具體實現沒有影響。

     觀察者模式實現的是廣播,被觀察者可以向所有訂閱了它的觀察者發送事件。

     一開始就說RxJava是對觀察者模式的擴展,它對普通的觀察者模式做出了一些調整,主要有:1、觀察者通過onSubscribe()獲取了發送事件的Disposable對象,這樣它就可以獲取訂閱連接中兩者的狀態,甚至可以主動的中斷事件的發送,在普通的觀察者模式中只有被觀察者擁有訂閱的集合並且控制它的訂閱者;2、抽象主題並沒有直接控制onNext(),onError()這些事件,而是關注Emitter實例的發送,而具體的事件發送是在ObservableOnsubscribe接口監聽到ObservableEmitter對象並且接受它之后才實現的。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM