1、package更改
rx1包名由原來的rx.xxx更改為io.reactivex.xxx,並且在同一個module之下,rx1和rx2是不兼容的。
2、背壓支持
RxJava在1.0只有一個個觀察者模式,只能部分支持背壓:
- Observable(被觀察者)/Observer(觀察者)
- Observable(被觀察者)/Subscriber(觀察者)
RxJava在2.0出現了兩個觀察者模式,新增Flowable支持背壓,而Observable不支持背壓:
- Observable(被觀察者)/Observer(觀察者)
- Flowable(被觀察者)/Subscriber(觀察者)
注:背壓是指在異步場景中,被觀察者發送事件速度遠快於觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發送速度的策略,簡而言之,背壓是流速控制的一種策略。
3、NULL值支持
rx1.x是可以支持NULL值的
Observable.just(null);
Single.from(null);
rx2.x是不支持NULL值的
Observable.just(null)
.subscribe(System.out::println, Throwable::printStackTrace);
Single.just(null)
.subscribe(System.out::println, Throwable::printStackTrace);
4、ActionN 和 FuncN 改名
ActionN 和 FuncN 遵循Java 8的命名規則。
其中,Action0 改名成Action,Action1改名成Consumer,而Action2改成BiConsumer,而Action3 - Action9都不再使了,ActionN改成Consumer<Object[]>,新的Action都增加拋出異常處理
同樣,Func改名成Function,取消了Func0的方法,Func2改名成BiFunction,Func3 - Func9 改成 Function3 - Function9,FuncN 改為 Function<Object[], R>,新的Function都增加拋出異常處理
5、Subscription改為Disposable
rx1.x的Subscription
public final Subscription subscribe(final Action1<? super T> onNext)
rx2.x返回值的是Disposable,由於已經存在了 org.reactivestreams.subscription 這個類,為了避免名字沖突將原先的 rx.Subscription 改名為 io.reactivex.disposables.Disposable
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
注意:Subscription 不再有訂閱subcribe和unSubcribe的概念。
6、Observer和Subscriber更改
rx2.x中Observable不在支持Subscriber方法,改由Flowable支持Subscriber,而Observable只支持Observer方法。
新增onSubscribe方法,Observer提供了Disposable參數,而Subscriber提供了Subscription參數,向觀察者提供以同步和異步方式取消與Observable的連接的方法。
rx1.x的Observer
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
rx2.x的Observer
public interface Observer<T> {
void onSubscribe(Disposable d);//
void onNext(T value);
void onError(Throwable e);
void onComplete();
}
public interface Subscriber<T> {
void onSubscribe(Subscription s);//相當於onStart方法
void onNext(T value);
void onError(Throwable e);
void onComplete();
}
7、Observable.OnSubscribe改為ObservableOnSubscribe
rx1.x寫法
Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
}
});
rx2.x寫法
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> e) throws Exception {
}
});
8、ObservableOnSubscribe 中使用 ObservableEmitter 發送數據給 Observer
ObservableEmitter可以理解為發射器,這個就是用來發出事件的,它可以發出三種類型的事件,通過調用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分別發出next事件、complete事件和error事件。emitter的onComplete()調用后,Consumer不再接收任何next事件。
9、Observable.Transformer 變成 ObservableTransformer
rx1.x的寫法
public synchronized <T> Observable.Transformer<T, T> bindToLifeCycle() {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> tObservable) {
if (lifecycleSubject == null) return upstream;
return upstream.takeUntil(lifecycleSubject);
}
};
}
rx2.x的寫法
public synchronized <T> ObservableTransformer<T, T> bindToLifeCycle() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
if (lifecycleSubject == null) return upstream;
return upstream.takeUntil(lifecycleSubject);
}
};
}
同時新增Flowable的寫法
public synchronized <T> FlowableTransformer<T, T> bindToLifeCycle() {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> upstream) {
if (lifecycleSubject == null) return upstream;
return upstream.takeUntil(lifecycleSubject);
}
};
}
10、from變更
rx1.x中統一使用from方法名,但rx2.x不在使用from,而是已from+參數類型名命名
rx2.x新增了Publisher參數的from方法。
rx1.0
public static <T> Observable<T> from(T[] array)
public static <T> Observable<T> fromCallable(Callable<? extends T> func)
public static <T> Observable<T> from(Iterable<? extends T> iterable)
public static <T> Observable<T> from(Future<? extends T> future)
rx2.0
public static <T> Observable<T> fromArray(T... items)
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)
public static <T> Observable<T> fromFuture(Future<? extends T> future)
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
11、blocking變更
rx1.x
.toBlocking().firstOrDefault(defaultValue)
rx2.x
.blockingFirst(defaultValue);
在項目升級RxJava中暫時遇到這些變更,具體RxJava2.x的使用請去github查看https://github.com/ReactiveX/RxJava