RxJava Map操作詳解


2016-06-06

  RxJava是最近兩年火起來的一個框架,核心是異步,但是對於我來說印象最深的是響應式編程的思想。最近剛好想把自己的項目改成用RxJava實現,所以就研究了下。拋物線和大頭鬼兩位大牛也講解的很詳細和形象,其實RxJava里除了這種響應式的編程思想不太好理解外,操作符也是比較難理解的一部分。響應式編程思想不是三言兩語就能講清楚,想學習的人也不是通過看幾遍blog就能學會的。我這里主要是講操符,通過分解的方式一步一步帶領大家看着到底是怎么回事,就以常用的map為例。

首先看一段偽代碼: 

Observable.create(new Observable.Onsubscrible(){  //------------ Observable one
  @override
  public object call(…subscriber){  //----------------------call one
     ……
     subscriber.onNext();  //-------------- onNext one
     …….
}
}).map(new Func1(){ // ----------------------map 操作后返回的為Observable two
  @override
  Public object call(…){   //-----------------------------call two
     ……. 
}

}).subscrible(new Subscriber(){
   @override
   public void onCompleted(){}
   @override
   Public void onError(){}
 
   @override
   Public void onNext(){  //-------------------------onNext two
     …….
    }
});

我將剖析上面的一段RxJava代碼。為了一步一步的把問題描述清楚,我們先把上面的代碼簡化如下,定義observableTemp變量:

observableTemp.subscrible(new Subscriber (){…..});

 其中ObservableTemp為:

observableTemp = 
observable.create(new Observable.Onsubscrible(){  //------------ Observable one
  @override
  public object call(…subscriber){  //----------------------call one
     ……
     subscriber.onNext();  //-------------- onNext one
     …….
}
}).map(new Func1(){ // ----------------------map 操作后返回的為Observable two
  @override
  Public object call(…){   //-----------------------------call two
     ……. 
}

});

   

第一步,我們先解析subscrible到底做了什么

 

  hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);

這是整個subscrible方法的核心。onSubscribeStart返回的是observable.onSubscribe,而observable.onSubscribe是傳入參數,為this對象。所以可以把subcrible方法的主要功能簡化為:

 ObservableTemp. onSubscribe.call(new Subscriber (){….});

舉例說明下:如果

ObservableTemp =

Observable.create(new Observable.Onsubscrible(){

  @override

  public object call(…subscriber){  //---------------1-1

     subscriber.onNext();

}

})

那么ObservableTemp. onSubscribe.call(new Subscriber (){….});分解如下:

ObservableTemp. onSubscrib = Observable.Onsubscrible()

這里的1-1傳入值按照我們剛才簡化后形式,應該是new Subscriber ()。那么整個就成了:

new Observable.Onsubscrible(){

  @override

  public object call(…subscriber){ 

……

     subscriber.onNext();

     …….. 

}

}.call(new Subscriber (){….});

這里的call就是調用的@override的call方法,所以這里進一步分解:

public object call(){ 

……

     new Subscriber (){….}.onNext();

     …….. 

}

看明白了嗎?其實整個一大段代碼只是執行了Subscriber里的onNext方法。

相信看到這里已經明白了subscribe方法的作用了。但是如果把ObservableTemp展開呢?也就是還原到開頭的那段代碼,又干了什么事情呢?請看第二步的分解

  

第二步,map到底做了什么

 

  我們已經知道ObservableTemp. onSubscribe.call(new Subscriber (){….});其實是通過ObservableTemp里的Onsubscrible.call方法直接調用Subscriber的onNext方法。那么加入map后,到底是怎么一種調用關系呢?Map字面的意思應該是個映射操作,那到底是不是字面上的映射?如果是映射,到底誰和誰之間的映射呢?

 為了方便說明,我們還是把開頭第一段代碼做個簡化,簡化為:

ObservableTemp.map(new Func1(){…}).subcrible(new Subscriber(){…});

其中ObservableTemp 就是一個簡單的對象創建和賦值過程了:

Observable.create(new Observable.Onsubscrible(){ 
  @override
  public object call(…subscriber){ 
     ……
     subscriber.onNext();
     …….
}
}); // 沒有什么理解上的難度

map的操作代碼是:

lift(new OperatorMap<T, R>(func));

lift是RxJava里核心,幾乎大部分Observable操作都是需要用到lift方法。A.lift(Operator op)返回的是一個新的Observable,新Observable中的OnSubscribe為OnSubscribeLift。

ObservableTemp.map(new Func1(){…}).subcrible(new Subscriber(){…});

就變成了:

new Observable(new OnSubscribeLift(){
@override
  Public object call(…){  
     ……. 
}
}).subcrible(new Subscriber(){…}); // ------------------------- 2-2

是不是很眼熟,Yes, 就是第一步里簡化的步驟。那么這里關鍵的部分來了,OnSubscribeLift里的call做了什么,如果我們把這個搞清楚了,那么map就自然而然的就清楚了。

OnSubscribeLift里的call主要有兩行代碼:

Subscriber<? super T> st = hook.onLift(operator).call(o);
parent.call(st);

這里的operator,o ,parent以及 st到底是什么呢?

 因為這里的o是OnSubscribeLift里call方法傳入的參數,由2-2可以知道,o其實代表的是new Subscriber()。

再看operator,由代碼可以看出:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }

這里的operator就是指經過OperatorMap包裝過的Operator對象:new OperatorMap<T, R>(new Func1());而hook.onLift(operator)返回的就是operator,那么上面兩句代碼可以看成:

Subscriber<? super T> st = (new OperatorMap<T, R>(new Func1())).call(o);
parent.call(st);

OperatorMap里的call做了三件事情:

MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer); 
o.add(parent);
return parent;

把 new subscriber()和new Func1()(就是transformer對象)通過MapSubscriber封裝起來。然后新的Subcriber(MapSubscriber對象)放入訂閱列表中,以便最后一起把訂閱對象釋放。同時返回新的Subcriber。

根據上述的,

Subscriber<? super T> st = (new OperatorMap<T, R>(new Func1())).call(o);
parent.call(st);

可以看成:

Subscriber<? super T> st  = new MapSubscriber<T, R>(o, transformer);
parent.call(st);

而parent是傳入的參數,指ObservableTemp.onSubscribe,也就是開頭代碼中的Observable one。

進一步上述兩行代碼可以看成:

Subscriber<? super T> st  = new MapSubscriber<T, R>(o, transformer);
(new Observable.Onsubscrible(){  //------------ Observable one
  @override
  public object call(…subscriber){  //----------------------call one
     ……
     subscriber.onNext();  //-------------- onNext one
     …….
}
}).call(st);

這里的override里的call參數為new MapSubscriber<T, R>(o, transformer); new MapSubscriber<T, R>(o, transformer).onNext主要代碼為:

@Override
 public void onNext(T t) {
    result = mapper.call(t);
    actual.onNext(result);
 }

Mapper就是transformer對象,也就是new Func1(),而actual就是new subscriber()。也就是說:把Observable one里的類型經過new Func1().call的方法轉換成另外一個Subscriber,最后調用new Subscriber的onNext方法。

所以map整個過程就清楚了:
A.map(B).subscribe(C) 就是:

先通過map方法,把A中想要轉換的數據通過調用B里的call方法進行轉換,最后把轉換過的數據用C里的onNext方法進行處理。

我們拋開onCompleted和onError方法,

A.map(B).subscribe(C)  <==>  C.onNext (B.call(A.onNext()))


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM