Jianshu link: http://www.jianshu.com/p/2badfbb3a33b
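Item | RxJava 1.X | RxJava 2.X |
---|---|---|
Package name | `rx.xxx` | `io.reactivex.xxx` |
Reactive Streams specification | 1.X predates the Reactive Streams specification and supports it only partially | Fully supported |
Backpressure | Incomplete support | `Observable` is designed without backpressure; the new `Flowable` supports it |
`null` values | Supported | No longer supported; passing `null` throws a `NullPointerException` |
Schedulers | `Schedulers.immediate()`, `Schedulers.trampoline()`, `Schedulers.computation()`, `Schedulers.newThread()`, `Schedulers.io()`, `Schedulers.from(executor)`, `AndroidSchedulers.mainThread()` | `Schedulers.immediate()` removed, `Schedulers.single()` added, the rest unchanged |
`Single` | Behaves like `Observable` but emits only one `onSuccess` or `onError` | Redesigned along the lines of the Reactive Streams specification; follows the protocol `onSubscribe (onSuccess/onError)` |
`Completable` | Behaves like `Observable`; it either completes successfully or fails | Redesigned along the lines of the Reactive Streams specification; follows the protocol `onSubscribe (onComplete/onError)` |
`Maybe` | None | New in 2.X. Behaves like `Observable` and may produce one item, one error, or nothing at all. Think of it as a method that returns a nullable value: unless it throws, it always returns, but the result may or may not be empty. Designed along the lines of the Reactive Streams specification; follows the protocol `onSubscribe (onSuccess/onError/onComplete)` |
`Flowable` | None | New in 2.X. Behaves like `Observable`, designed according to the Reactive Streams specification, and supports backpressure |
`Subject` | `AsyncSubject`, `BehaviorSubject`, `PublishSubject`, `ReplaySubject`, `UnicastSubject` | 2.X keeps these `Subject` types with their existing behavior and adds backpressure-aware counterparts: `AsyncProcessor`, `BehaviorProcessor`, `PublishProcessor`, `ReplayProcessor`, `UnicastProcessor` |
`Subscription` | `Subscription` | Renamed to `Disposable`, because the name `Subscription` conflicts with the Reactive Streams interface of the same name |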
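
The backpressure rows in the table come down to one rule from Reactive Streams: a publisher may emit only as many items as the subscriber has requested. The sketch below illustrates that rule with the JDK's `java.util.concurrent.Flow` interfaces (Java 9+), which mirror the Reactive Streams interfaces. It is not RxJava's `Flowable`; the class names `BackpressureSketch`, `RangePublisher`, and `BatchingSubscriber` are hypothetical, and everything runs synchronously on one thread to keep the event order easy to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class BackpressureSketch {

    /** Hypothetical publisher: emits 1..count, but never more than was requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("request must be positive"));
                        return;
                    }
                    demand += n; // overflow is ignored in this sketch
                    if (emitting) {
                        return; // re-entrant call from onNext: the outer loop sees the new demand
                    }
                    emitting = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Hypothetical subscriber: asks for batchSize items at a time and logs every event. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<String> log = new ArrayList<>();
        private final int batchSize;
        private Flow.Subscription subscription;
        private int receivedInBatch = 0;

        BatchingSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            log.add("request(" + batchSize + ")");
            s.request(batchSize);
        }

        @Override
        public void onNext(Integer item) {
            log.add("onNext(" + item + ")");
            if (++receivedInBatch == batchSize) {
                receivedInBatch = 0;
                log.add("request(" + batchSize + ")");
                subscription.request(batchSize);
            }
        }

        @Override
        public void onError(Throwable t) {
            log.add("onError");
        }

        @Override
        public void onComplete() {
            log.add("onComplete");
        }
    }

    /** Runs the publisher against the subscriber and returns the recorded events. */
    static List<String> run(int count, int batchSize) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batchSize);
        new RangePublisher(count).subscribe(subscriber);
        return subscriber.log;
    }

    public static void main(String[] args) {
        run(5, 2).forEach(System.out::println);
    }
}
```

With five items and a batch size of two, the log shows items arriving only after a matching `request(2)`. In RxJava 2.X, `Flowable` follows the same request-driven protocol, while `Observable` has no `request` step at all.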
For a simple RxJava 2.X + Retrofit + OkHttp example, see the link in the original post.
library dependency changes

    // 1.X
    compile 'io.reactivex:rxjava:1.2.1'
    compile 'io.reactivex:rxandroid:1.2.1'

    // 2.X
    compile 'io.reactivex.rxjava2:rxjava:2.0.0'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.0'
package changes

The main change is `rx.xxx` --> `io.reactivex.xxx`.

    // 1.X
    import rx.Observable;
    import rx.Subscription;
    import rx.android.schedulers.AndroidSchedulers;
    import rx.schedulers.Schedulers;
    import rx.functions.Action1;
    import rx.functions.Func1;

    // 2.X
    import io.reactivex.Observable;
    import io.reactivex.ObservableSource;
    import io.reactivex.ObservableTransformer;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.android.schedulers.AndroidSchedulers;
    import io.reactivex.schedulers.Schedulers;
    import io.reactivex.functions.Consumer;
    import io.reactivex.functions.Function;
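null values

RxJava 2.X no longer supports `null` values. Passing `null` results in a `NullPointerException`.

    // Both calls throw NullPointerException immediately, at the call site.
    Observable.just(null);
    Single.just(null);

    // Returning null from the callable is reported to onError as a NullPointerException.
    Observable.fromCallable(() -> null)
            .subscribe(System.out::println, Throwable::printStackTrace);

    // Returning null from map is likewise reported to onError as a NullPointerException.
    Observable.just(1).map(v -> null)
            .subscribe(System.out::println, Throwable::printStackTrace);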
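
Because 2.X rejects `null`, a value that may legitimately be absent needs a non-null carrier. One common option is `java.util.Optional` (Java 8+). The sketch below is plain JDK code and shows only the wrapping step; `NullSafeValues`, `findNickname`, and `nicknameOf` are hypothetical names, and the lookup table is made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class NullSafeValues {

    // Made-up data for the illustration.
    private static final Map<String, String> NICKNAMES = new HashMap<>();

    static {
        NICKNAMES.put("alice", "Al");
    }

    /** Hypothetical legacy lookup that returns null when nothing is stored. */
    static String findNickname(String user) {
        return NICKNAMES.get(user);
    }

    /** Wraps the nullable result so that callers never receive a raw null. */
    static Optional<String> nicknameOf(String user) {
        return Optional.ofNullable(findNickname(user));
    }

    public static void main(String[] args) {
        System.out.println(nicknameOf("alice").orElse("<none>"));
        System.out.println(nicknameOf("bob").orElse("<none>"));
    }
}
```

In a 2.X pipeline, returning `nicknameOf("bob")` from `fromCallable` or `map` emits an empty `Optional` instead of failing with a `NullPointerException`. `Maybe`, listed in the table, is another way to model "one value or nothing".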
Example 1

    // 1.X
    public static final Observable.Transformer IO_TRANSFORMER = new Observable.Transformer() {
        @Override
        public Object call(Object observable) {
            return ((Observable) observable).subscribeOn(Schedulers.io())
                    .unsubscribeOn(Schedulers.io())
                    .observeOn(Schedulers.io());
        }
    };

    public static final <T> Observable.Transformer<T, T> applySchedulers(Observable.Transformer transformer) {
        return (Observable.Transformer<T, T>) transformer;
    }

    // A non-null consumer; subscribe(...) rejects null.
    Action1<Integer> onNext = new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            System.out.println(value);
        }
    };
    // Numeric strings, so that Integer.valueOf succeeds.
    String[] items = { "1", "2", "3" };
    Subscription subscription = Observable.from(items)
            .compose(RxUtil.<String>applySchedulers(IO_TRANSFORMER))
            .map(new Func1<String, Integer>() {
                @Override
                public Integer call(String s) {
                    return Integer.valueOf(s);
                }
            })
            .subscribe(onNext);
    // TODO
    subscription.unsubscribe();

    // 2.X
    public static final ObservableTransformer IO_TRANSFORMER = new ObservableTransformer() {
        @Override
        public ObservableSource apply(Observable upstream) {
            return upstream.subscribeOn(Schedulers.io())
                    .unsubscribeOn(Schedulers.io())
                    .observeOn(Schedulers.io());
        }
    };

    public static final <T> ObservableTransformer<T, T> applySchedulers(ObservableTransformer transformer) {
        return (ObservableTransformer<T, T>) transformer;
    }

    // A non-null consumer; subscribe(...) rejects null.
    Consumer<Integer> onNext = new Consumer<Integer>() {
        @Override
        public void accept(Integer value) {
            System.out.println(value);
        }
    };
    // Numeric strings, so that Integer.valueOf succeeds.
    String[] items = { "1", "2", "3" };
    Disposable disposable = Observable.fromArray(items)
            .compose(RxUtil.<String>applySchedulers(IO_TRANSFORMER))
            .map(new Function<String, Integer>() {
                @Override
                public Integer apply(String s) throws Exception {
                    return Integer.valueOf(s);
                }
            })
            .subscribe(onNext);
    // TODO
    disposable.dispose();
- Return value of `.subscribe(...)`: `Subscription` in 1.X, `Disposable` in 2.X.
- `Transformer`: in 1.X it is the `Transformer` interface nested inside `rx.Observable`, which extends `Func1<Observable<T>, Observable<R>>`; in 2.X it is `io.reactivex.ObservableTransformer<Upstream, Downstream>`, a standalone interface.
- `AndroidSchedulers`: `rx.android.schedulers.AndroidSchedulers` in 1.X, `io.reactivex.android.schedulers.AndroidSchedulers` in 2.X.
- `Func1`: `rx.functions.Func1` in 1.X, `io.reactivex.functions.Function` in 2.X.
- For the other overloaded methods, see the screenshot below.
    // 1.X
    public class AppBaseActivity extends AppCompatActivity {
        // ...
        private CompositeSubscription mCompositeSubscription;

        protected void addSubscription(Subscription subscription) {
            if (null == mCompositeSubscription) {
                mCompositeSubscription = new CompositeSubscription();
            }
            mCompositeSubscription.add(subscription);
        }

        @Override
        protected void onDestroy() {
            if (null != mCompositeSubscription) {
                mCompositeSubscription.unsubscribe();
            }
            super.onDestroy();
        }
        // ...
    }

    // 2.X
    public class AppBaseActivity extends AppCompatActivity {
        // ...
        private CompositeDisposable mCompositeDisposable;

        protected void addDisposable(Disposable disposable) {
            if (null == mCompositeDisposable) {
                mCompositeDisposable = new CompositeDisposable();
            }
            mCompositeDisposable.add(disposable);
        }

        @Override
        protected void onDestroy() {
            if (null != mCompositeDisposable) {
                // clear() disposes everything currently held and leaves the container reusable.
                mCompositeDisposable.clear();
            }
            super.onDestroy();
        }
        // ...
    }