一、概述
本節將分析RxJava2的線程切換模型。通過對線程切換源代碼的分析到達對RxJava2線程切換徹底理解的目的。通過對本節的學習你會發現,RxJava2線程切換是如此的簡單,僅僅是通過兩個操作符就能完成從子線程到主線程,或者主線程到子線程,再或者從子線程到子線程的切換。對應的操作符為:observerOn:指定觀察者運行的線程。subscribeOn:執行被觀察者運行的線程。
二、簡單例子入手
private void threadSwitchTest() {
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("《深入Java虛擬機》");
MyLog.log("Thread:" + Thread.currentThread().getName());
}
});
observable
.observeOn(AndroidSchedulers.mainThread())//觀察者執行線程
.subscribeOn(Schedulers.io())//被觀察者執行線程
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
MyLog.log("Thread:" + Thread.currentThread().getName());
}
});
}
以上例子中我們使用observeOn(AndroidSchedulers.mainThread())來指定觀察者運行在主線程,使用subscribeOn(Schedulers.io())來指定被觀察運行在子線程
三、源碼分析
本節針對RxJava2的源代碼我們需要弄明白三件事情:
1.子線程如何切換到主線程原理分析
2.主線程如何切換到子線程原理分析
3.子線程如何切換到子線程原理分析
通過上一節的分析我們知道RxJava2通過創建一個被觀察者(ObservableCreate)和一個觀察者(LambdaObserver),並實現觀察者和被觀察者的綁定。通過ObservableEmitter.onNext發送消息,Consumer.accept中接收消息。而操作符map僅僅是對被觀察者ObservableCreate做了一層包裝(裝飾模式),變成了ObservableMap。而觀察者裝飾后則變成了MapObserver。
很顯然,observeOn和subscribeOn都屬於操作符(他們都是用來做線程切換的操作符而已),所以這兩個操作符也符合上面Map操作符的包裝規則。
subscribeOn源碼分析:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
從上述源碼可以看出subscribeOn確實如上面所說,會被包裝成為一個ObservableSubscribeOn。其構造方法會傳入兩個參數,一個是this:代表當前被觀察者,也就是操作符上面修飾的那個被觀察者,本例中指的是ObservableObserveOn,ObservableObserverOn又裝飾了ObservableCreate。scheduler指的是Schedulers.io(), 指被觀察者運行在io線程,也就是子線程中。
下面看下Schedulers類是個什么東西。
public final class Schedulers {
@NonNull
static final Scheduler SINGLE;
@NonNull
static final Scheduler COMPUTATION;
@NonNull
static final Scheduler IO;
@NonNull
static final Scheduler TRAMPOLINE;
@NonNull
static final Scheduler NEW_THREAD;
static final class SingleHolder {
static final Scheduler DEFAULT = new SingleScheduler();
}
static final class ComputationHolder {
static final Scheduler DEFAULT = new ComputationScheduler();
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
Schedulers內部封裝了各種Scheduler。每一個Scheduler中都封裝的有線程池,用於執行后台任務。
到此處ObservableSubscribeOn對象也就創建完成了。
下面看下ObserverOn操作符都干了什么事情:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
ObserveOn方法內部包裝了一個ObservableObserveOn對象,其有兩個參數,this:代表當前Observable對象,此處指的是ObservableCreate這個對象,scheduler代表的是AndroidSchedulers.mainThread()。
我們看一下AndroidSchedulers的源代碼,看它都干了寫什么事
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
return from(looper, false);
}
AndroidSchedulers的內部類MainHolder的作用是在主線程中創建一個Handler。由new Handler(Looper.getMainLooper())來完成。因為Looper所在的線程為Handler所在的線程,又因為Looper.getMainLooper()獲取到的是主線程的looper,所以當前Handler運行在主線程,順帶着這塊的邏輯也是在主線程中完成的。字段MAIN_THREAD僅僅是把HandlerScheduler返回而已,而HandlerScheduler持有主線程handler。那么manThread()方法就好理解了 ,就是返回了一個持有主線程Handler的Scheduler而已。
所以ObservableObserverOn包裝了ObservableCreate並持有了主線程Handler。到此被觀察者就已經創建完成了。
下面說下觀察者Consumer.accept方法在這個鏈式調用中是如何被執行的:
1.經過上面的分析被觀察者已經變為:ObservableObserverOn,ObservableObserverOn持有ObservableSubscribeOn對象的引用,ObservableSubscribeOn又持有ObservableCreate的引用。所以Observable對象經過三層包裝最終成為了ObservableObserverOn。
2.Observable.subscribe(Consumer consumer)方法執行訂閱,會把原始的觀察者對象LambdaObserver對象包裝成為ObserverOnObserver對象,ObserverOnObserver又會被包裝成SubscribeOnObserver對象。用以在ObservableSubscribeOn對象執行subscribeActual方法的時候正式執行綁定操作。至此,觀察者和被觀察者建立了綁定關系。
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
從上面的代碼中我們基本無法判斷是在哪里綁定的。從上面的分析我們知道scheduler要是一個HandlerScheduler.那么我們可以斷定的是scheduler.scheduleDirect一定是用來執行任務的,那么SubscribeTask肯定是一個任務沒錯。事實也如我們所料一樣,確實是這樣的。
看下HandlerScheduler的scheduleDirect都干了什么
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, unit.toMillis(delay));
return scheduled;
}
非常的簡單,構建一個ScheduleRunnable,並把handler和runnable傳入進去,然后執行handler.postDeayed向handler發送消息就行了。postDeayed方法最終會調用Runnable的run方法。
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed; // Tracked solely for isDisposed().
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}
ScheduleRunnable在run方法中又會調用SubscribeTask的run方法。
SubscribeTask.java
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
在subscribeTask的run方法中最終完成了綁定,source指ObservableOnSubscribe
3.被觀察者在執行ObservableOnSubscribe實例的subscribe方法的ObservableEmitter參數的onNext方法的時候,會首先調用SubscribeOnObserver的onNext方法,又由於SubscribeOnObserver持有ObserverOnObserver的引用,因此在SubscribeOnObserver的onNext方法中又會調用ObserveOnObserver對象的onNext方法,在此Next方法中又會調用CreateObserver的onNext方法,在其內部又會調用LambdaObserver.onNext,然后在LambdaObserver的onNext方法中又會調用Consumer.accept方法。最后完成數據的從發送到接收的流轉。
了解了以上操作符的整體流轉流程后,我們接下來回過頭來看開頭我們提出的三個問題:
1.主線程切換到子線程
我們先來看ObservableSubscribeOn這個類,在上面的小例子中,直接將被觀察者運行在IO線程中了。我們直接看ObservableSubscribeOn的subscribeActual方法的源代碼
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
在subscribeActual方法內部先創建一個SubscribeOnObserver對象,並執行setDisposable執行任務。這里的scheduler指的是HandlerScheduler。SubscribeTask是一個實現了Runnable的對象在其內部完成了綁定操作。
先來看下HandlerScheduler的scheduleDirect方法
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, unit.toMillis(delay));
return scheduled;
}
scheduleDirect方法邏輯上很簡單,1.把subscribeTask和handler封裝成ScheduleRunnable。然后利用Handler.postDelayed執行這個Runnable對象。postDelayed執行的最后會調用msg.callback.run()其實就是調用ScheduleRunnable的run方法。在在run方法內又會調用SubscribeTask的run方法。
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed; // Tracked solely for isDisposed().
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
//SubscribeTask的run方法
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}
我們看下SubscribeTask的run方法都干了啥事。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
在SubscribeTask類的run方法中完成最終的綁定。此處的source指的是ObservableOnSubscribe
在主線程中執行其實也就這么多最終會把方法放到Handler中執行
2.在子線程中執行任務
直接看ObservableObserveOn類的subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) {
//首先判斷一下調度線程是否是在當前線程中執行,如果是就直接綁定,如果不是就開啟工作線程
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
首先根據TrampolineScheduler判斷任務是否是在當前線程執行,如果是就直接綁定。如果不是就創建一個ObserverOnObserver對象,並把Observer和Worker對象傳遞進去。即可完成綁定。
我們接下來主要看下其是如何在子線程中執行的
ObserverOnObserver繼承了Runnable對象。在執行onNext方法的時候會調用worker的schedule(this)方法。
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
其實到這我們大致可以判斷出來worker.schedule(this)必定會運行run方法。不着急,我們先看IoSchedule類中的worker以及worker.schedule干了什么
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
創建一個Worker對象
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
執行threadWorker.scheduleActual(action),這里的action指的就是ObserverObserveOn對象,因其繼承了Runnable對象。
看看ThreadWorker.scheduleActual干了啥
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
邏輯很清晰,把傳入的Runnable(ObservableObserveOn)封裝成為一個ScheduleRunnable對象。並把這個對象放入線程池中去執行。
executor都代表線程池。執行的時候會運行ScheduleRunnable的run方法。在其run方法內部又會調用ObserverObserveOn的run方法。
下面回過頭來再看看ObserverObserveOn的run方法
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
....
a.onNext(v);
....
}
}
其會調用a.onNext方法,讓onNext方法運行在線程池中。a值的就是一個CreateObserver或者其包裝類。通過一層層的調用Consume.accept方法最終會運行到子線程中。
2.主線程如何切換到主線程
回過頭看ObservableObserveOn的subscribeActual方法

這里的scheduler指的是HandlerScheduler。HandlerScheduler內部維護了一個運行在主線程的Handler和一個內部類HandlerWorker。其調用source.subscribe執行觀察者和被觀察者的訂閱。當ObservableEmitter.onNext方法執行后,會調用ObserveOnObserver內部的onNext方法。

schedule方法又會調用worker.scheduler方法

此處的worker為HandlerScheduler中的Worker,源碼如下

通過Handler把ScheduleRunnable發送到主線程中執行。因為HandlerScheduler是主線程handler所以在Handler中執行的邏輯也會被切換到主線程中去執行。其實這里的run方法最終運行的是ObserveOnObserver中的run方法。在其run方法中會調用其上級包裝類SubscribeOnObserver的onNext方法。之后又會調用LambdaObserver的onNext方法。在其onNext方法中會調用Consumer.accept方法,最終讓其運行在主線程中。
3.子線程如何切換到子線程
這里分析下把Consumer.accept方法運行在子線程的流程
同樣只需要設置observeOn(Schedulers.io())就OK了。同樣會創建一個ObserveOnObserver,其接受兩個重要的參數this:當前Observer,scheduler:ioScheduler。
其綁定過程會執行ObservableObserveOn的subscribeActual方法

只是此處的scheduler不再是HandlerScheduler,而是IoScheduler。當ObservableEmitter.onNext方法被執行的時候,會調用ObserveOnObserver的onNext方法。而在onNext方法中又會調用IoScheduler中worker.schedule。最終會執行NewThreadWorker的scheduleActual方法

當上述方法被執行后就會調用ObserveOnObserver中的run方法。其run方法又會逐個解包裝調用其OnNext方法。知道LambdaObserver的onNext被調用。onNext又會調用Consumer.accept。經過以上步驟就完成了最終的調用。因為run是在線程池中執行的,所以跟着把業務邏輯代碼也切換到了線程池中執行,即子線程中執行。
總結:
經過上面的分析,RxJava切換線程已經分析完了,相信大家了解后對RxJava的線程切換會有一定的感悟。在這里再用白花總結一下。
1.子線程切換主線程:給主線程所在的Handler發消息,然后就把邏輯切換過去了。
2.主線程切換子線程:把任務放到線程池中執行就能把執行邏輯切換到子線程
3.子線程切換子線程:把任務分別扔進兩個線程就行了。
