2016-06-06
RxJava是最近兩年火起來的一個框架,核心是異步,但是對於我來說印象最深的是響應式編程的思想。最近剛好想把自己的項目改成用RxJava實現,所以就研究了下。拋物線和大頭鬼兩位大牛也講解的很詳細和形象,其實RxJava里除了這種響應式的編程思想不太好理解外,操作符也是比較難理解的一部分。響應式編程思想不是三言兩語就能講清楚,想學習的人也不是通過看幾遍blog就能學會的。我這里主要是講操符,通過分解的方式一步一步帶領大家看着到底是怎么回事,就以常用的map為例。
首先看一段偽代碼:
Observable.create(new Observable.Onsubscrible(){ //------------ Observable one @override public object call(…subscriber){ //----------------------call one …… subscriber.onNext(); //-------------- onNext one ……. } }).map(new Func1(){ // ----------------------map 操作后返回的為Observable two @override Public object call(…){ //-----------------------------call two ……. } }).subscrible(new Subscriber(){ @override public void onCompleted(){} @override Public void onError(){} @override Public void onNext(){ //-------------------------onNext two ……. } });
我將剖析上面的一段RxJava代碼。為了一步一步的把問題描述清楚,我們先把上面的代碼簡化如下,定義observableTemp變量:
observableTemp.subscrible(new Subscriber (){…..});
其中ObservableTemp為:
observableTemp = observable.create(new Observable.Onsubscrible(){ //------------ Observable one @override public object call(…subscriber){ //----------------------call one …… subscriber.onNext(); //-------------- onNext one ……. } }).map(new Func1(){ // ----------------------map 操作后返回的為Observable two @override Public object call(…){ //-----------------------------call two ……. } });
第一步,我們先解析subscrible到底做了什么
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
這是整個subscrible方法的核心。onSubscribeStart返回的是observable.onSubscribe,而observable.onSubscribe是傳入參數,為this對象。所以可以把subcrible方法的主要功能簡化為:
ObservableTemp. onSubscribe.call(new Subscriber (){….});
舉例說明下:如果
ObservableTemp =
Observable.create(new Observable.Onsubscrible(){
@override
public object call(…subscriber){ //---------------1-1
subscriber.onNext();
}
})
那么ObservableTemp. onSubscribe.call(new Subscriber (){….});分解如下:
ObservableTemp. onSubscrib = Observable.Onsubscrible()
這里的1-1傳入值按照我們剛才簡化后形式,應該是new Subscriber ()。那么整個就成了:
new Observable.Onsubscrible(){
@override
public object call(…subscriber){
……
subscriber.onNext();
……..
}
}.call(new Subscriber (){….});
這里的call就是調用的@override的call方法,所以這里進一步分解:
public object call(){
……
new Subscriber (){….}.onNext();
……..
}
看明白了嗎?其實整個一大段代碼只是執行了Subscriber里的onNext方法。
相信看到這里已經明白了subscribe方法的作用了。但是如果把ObservableTemp展開呢?也就是還原到開頭的那段代碼,又干了什么事情呢?請看第二步的分解
第二步,map到底做了什么
我們已經知道ObservableTemp. onSubscribe.call(new Subscriber (){….});其實是通過ObservableTemp里的Onsubscrible.call方法直接調用Subscriber的onNext方法。那么加入map后,到底是怎么一種調用關系呢?Map字面的意思應該是個映射操作,那到底是不是字面上的映射?如果是映射,到底誰和誰之間的映射呢?
為了方便說明,我們還是把開頭第一段代碼做個簡化,簡化為:
ObservableTemp.map(new Func1(){…}).subcrible(new Subscriber(){…});
其中ObservableTemp 就是一個簡單的對象創建和賦值過程了:
Observable.create(new Observable.Onsubscrible(){ @override public object call(…subscriber){ …… subscriber.onNext(); ……. } }); // 沒有什么理解上的難度
map的操作代碼是:
lift(new OperatorMap<T, R>(func));
lift是RxJava里核心,幾乎大部分Observable操作都是需要用到lift方法。A.lift(Operator op)返回的是一個新的Observable,新Observable中的OnSubscribe為OnSubscribeLift。
ObservableTemp.map(new Func1(){…}).subcrible(new Subscriber(){…});
就變成了:
new Observable(new OnSubscribeLift(){ @override Public object call(…){ ……. } }).subcrible(new Subscriber(){…}); // ------------------------- 2-2
是不是很眼熟,Yes, 就是第一步里簡化的步驟。那么這里關鍵的部分來了,OnSubscribeLift里的call做了什么,如果我們把這個搞清楚了,那么map就自然而然的就清楚了。
OnSubscribeLift里的call主要有兩行代碼:
Subscriber<? super T> st = hook.onLift(operator).call(o); parent.call(st);
這里的operator,o ,parent以及 st到底是什么呢?
因為這里的o是OnSubscribeLift里call方法傳入的參數,由2-2可以知道,o其實代表的是new Subscriber()。
再看operator,由代碼可以看出:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { return lift(new OperatorMap<T, R>(func)); }
這里的operator就是指經過OperatorMap包裝過的Operator對象:new OperatorMap<T, R>(new Func1());而hook.onLift(operator)返回的就是operator,那么上面兩句代碼可以看成:
Subscriber<? super T> st = (new OperatorMap<T, R>(new Func1())).call(o); parent.call(st);
OperatorMap里的call做了三件事情:
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer); o.add(parent); return parent;
把 new subscriber()和new Func1()(就是transformer對象)通過MapSubscriber封裝起來。然后新的Subcriber(MapSubscriber對象)放入訂閱列表中,以便最后一起把訂閱對象釋放。同時返回新的Subcriber。
根據上述的,
Subscriber<? super T> st = (new OperatorMap<T, R>(new Func1())).call(o); parent.call(st);
可以看成:
Subscriber<? super T> st = new MapSubscriber<T, R>(o, transformer); parent.call(st);
而parent是傳入的參數,指ObservableTemp.onSubscribe,也就是開頭代碼中的Observable one。
進一步上述兩行代碼可以看成:
Subscriber<? super T> st = new MapSubscriber<T, R>(o, transformer); (new Observable.Onsubscrible(){ //------------ Observable one @override public object call(…subscriber){ //----------------------call one …… subscriber.onNext(); //-------------- onNext one ……. } }).call(st);
這里的override里的call參數為new MapSubscriber<T, R>(o, transformer); new MapSubscriber<T, R>(o, transformer).onNext主要代碼為:
@Override public void onNext(T t) { result = mapper.call(t); actual.onNext(result); }
Mapper就是transformer對象,也就是new Func1(),而actual就是new subscriber()。也就是說:把Observable one里的類型經過new Func1().call的方法轉換成另外一個Subscriber,最后調用new Subscriber的onNext方法。
所以map整個過程就清楚了:
A.map(B).subscribe(C) 就是:
先通過map方法,把A中想要轉換的數據通過調用B里的call方法進行轉換,最后把轉換過的數據用C里的onNext方法進行處理。
我們拋開onCompleted和onError方法,
A.map(B).subscribe(C) <==> C.onNext (B.call(A.onNext()))