RxJava2線程切換原理分析


一、概述

  本節將分析RxJava2的線程切換模型。通過對線程切換源代碼的分析到達對RxJava2線程切換徹底理解的目的。通過對本節的學習你會發現,RxJava2線程切換是如此的簡單,僅僅是通過兩個操作符就能完成從子線程到主線程,或者主線程到子線程,再或者從子線程到子線程的切換。對應的操作符為:observerOn:指定觀察者運行的線程。subscribeOn:執行被觀察者運行的線程。

二、簡單例子入手

 private void threadSwitchTest() {
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("《深入Java虛擬機》");
                MyLog.log("Thread:" + Thread.currentThread().getName());
            }
        });
        observable
                .observeOn(AndroidSchedulers.mainThread())//觀察者執行線程
                .subscribeOn(Schedulers.io())//被觀察者執行線程
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        MyLog.log("Thread:" + Thread.currentThread().getName());
                    }
                });
    }

  以上例子中我們使用observeOn(AndroidSchedulers.mainThread())來指定觀察者運行在主線程,使用subscribeOn(Schedulers.io())來指定被觀察運行在子線程

三、源碼分析

  本節針對RxJava2的源代碼我們需要弄明白三件事情:

  1.子線程如何切換到主線程原理分析

  2.主線程如何切換到子線程原理分析

  3.子線程如何切換到子線程原理分析

  通過上一節的分析我們知道RxJava2通過創建一個被觀察者(ObservableCreate)和一個觀察者(LambdaObserver),並實現觀察者和被觀察者的綁定。通過ObservableEmitter.onNext發送消息,Consumer.accept中接收消息。而操作符map僅僅是對被觀察者ObservableCreate做了一層包裝(裝飾模式),變成了ObservableMap。而觀察者裝飾后則變成了MapObserver。

  很顯然,observeOn和subscribeOn都屬於操作符(他們都是用來做線程切換的操作符而已),所以這兩個操作符也符合上面Map操作符的包裝規則。

  subscribeOn源碼分析:

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    } 

 從上述源碼可以看出subscribeOn確實如上面所說,會被包裝成為一個ObservableSubscribeOn。其構造方法會傳入兩個參數,一個是this:代表當前被觀察者,也就是操作符上面修飾的那個被觀察者,本例中指的是ObservableObserveOn,ObservableObserverOn又裝飾了ObservableCreate。scheduler指的是Schedulers.io(), 指被觀察者運行在io線程,也就是子線程中。

下面看下Schedulers類是個什么東西。

public final class Schedulers {
    @NonNull
    static final Scheduler SINGLE;

    @NonNull
    static final Scheduler COMPUTATION;

    @NonNull
    static final Scheduler IO;

    @NonNull
    static final Scheduler TRAMPOLINE;

    @NonNull
    static final Scheduler NEW_THREAD;

    static final class SingleHolder {
        static final Scheduler DEFAULT = new SingleScheduler();
    }

    static final class ComputationHolder {
        static final Scheduler DEFAULT = new ComputationScheduler();
    }

    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

    static final class NewThreadHolder {
        static final Scheduler DEFAULT = new NewThreadScheduler();
    }

    static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

        IO = RxJavaPlugins.initIoScheduler(new IOTask());

        TRAMPOLINE = TrampolineScheduler.instance();

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }

Schedulers內部封裝了各種Scheduler。每一個Scheduler中都封裝的有線程池,用於執行后台任務。

到此處ObservableSubscribeOn對象也就創建完成了。

下面看下ObserverOn操作符都干了什么事情: 

  @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    } 

 ObserveOn方法內部包裝了一個ObservableObserveOn對象,其有兩個參數,this:代表當前Observable對象,此處指的是ObservableCreate這個對象,scheduler代表的是AndroidSchedulers.mainThread()。

我們看一下AndroidSchedulers的源代碼,看它都干了寫什么事

public final class AndroidSchedulers {

    private static final class MainHolder {
        static final Scheduler DEFAULT
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    /** A {@link Scheduler} which executes actions on {@code looper}. */
    public static Scheduler from(Looper looper) {
        return from(looper, false);
    }

 AndroidSchedulers的內部類MainHolder的作用是在主線程中創建一個Handler。由new Handler(Looper.getMainLooper())來完成。因為Looper所在的線程為Handler所在的線程,又因為Looper.getMainLooper()獲取到的是主線程的looper,所以當前Handler運行在主線程,順帶着這塊的邏輯也是在主線程中完成的。字段MAIN_THREAD僅僅是把HandlerScheduler返回而已,而HandlerScheduler持有主線程handler。那么manThread()方法就好理解了 ,就是返回了一個持有主線程Handler的Scheduler而已。

所以ObservableObserverOn包裝了ObservableCreate並持有了主線程Handler。到此被觀察者就已經創建完成了。

下面說下觀察者Consumer.accept方法在這個鏈式調用中是如何被執行的:

  1.經過上面的分析被觀察者已經變為:ObservableObserverOn,ObservableObserverOn持有ObservableSubscribeOn對象的引用,ObservableSubscribeOn又持有ObservableCreate的引用。所以Observable對象經過三層包裝最終成為了ObservableObserverOn。

  2.Observable.subscribe(Consumer consumer)方法執行訂閱,會把原始的觀察者對象LambdaObserver對象包裝成為ObserverOnObserver對象,ObserverOnObserver又會被包裝成SubscribeOnObserver對象。用以在ObservableSubscribeOn對象執行subscribeActual方法的時候正式執行綁定操作。至此,觀察者和被觀察者建立了綁定關系。

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

  從上面的代碼中我們基本無法判斷是在哪里綁定的。從上面的分析我們知道scheduler要是一個HandlerScheduler.那么我們可以斷定的是scheduler.scheduleDirect一定是用來執行任務的,那么SubscribeTask肯定是一個任務沒錯。事實也如我們所料一樣,確實是這樣的。

   看下HandlerScheduler的scheduleDirect都干了什么

@Override
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, unit.toMillis(delay));
        return scheduled;
    }

  非常的簡單,構建一個ScheduleRunnable,並把handler和runnable傳入進去,然后執行handler.postDeayed向handler發送消息就行了。postDeayed方法最終會調用Runnable的run方法。

private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed; // Tracked solely for isDisposed().

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }

  ScheduleRunnable在run方法中又會調用SubscribeTask的run方法。

  SubscribeTask.java  

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

  在subscribeTask的run方法中最終完成了綁定,source指ObservableOnSubscribe

   3.被觀察者在執行ObservableOnSubscribe實例的subscribe方法的ObservableEmitter參數的onNext方法的時候,會首先調用SubscribeOnObserver的onNext方法,又由於SubscribeOnObserver持有ObserverOnObserver的引用,因此在SubscribeOnObserver的onNext方法中又會調用ObserveOnObserver對象的onNext方法,在此Next方法中又會調用CreateObserver的onNext方法,在其內部又會調用LambdaObserver.onNext,然后在LambdaObserver的onNext方法中又會調用Consumer.accept方法。最后完成數據的從發送到接收的流轉。

  了解了以上操作符的整體流轉流程后,我們接下來回過頭來看開頭我們提出的三個問題:

  1.主線程切換到子線程

  我們先來看ObservableSubscribeOn這個類,在上面的小例子中,直接將被觀察者運行在IO線程中了。我們直接看ObservableSubscribeOn的subscribeActual方法的源代碼 

 @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

   在subscribeActual方法內部先創建一個SubscribeOnObserver對象,並執行setDisposable執行任務。這里的scheduler指的是HandlerScheduler。SubscribeTask是一個實現了Runnable的對象在其內部完成了綁定操作。

  先來看下HandlerScheduler的scheduleDirect方法

  @Override
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, unit.toMillis(delay));
        return scheduled;
    }

  scheduleDirect方法邏輯上很簡單,1.把subscribeTask和handler封裝成ScheduleRunnable。然后利用Handler.postDelayed執行這個Runnable對象。postDelayed執行的最后會調用msg.callback.run()其實就是調用ScheduleRunnable的run方法。在在run方法內又會調用SubscribeTask的run方法。

private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed; // Tracked solely for isDisposed().

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
          //SubscribeTask的run方法 delegate.run(); } catch (Throwable t) { RxJavaPlugins.onError(t); } }

  我們看下SubscribeTask的run方法都干了啥事。

 final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

  在SubscribeTask類的run方法中完成最終的綁定。此處的source指的是ObservableOnSubscribe

  在主線程中執行其實也就這么多最終會把方法放到Handler中執行

 

  2.在子線程中執行任務

  直接看ObservableObserveOn類的subscribeActual 

 @Override
    protected void subscribeActual(Observer<? super T> observer) {
    //首先判斷一下調度線程是否是在當前線程中執行,如果是就直接綁定,如果不是就開啟工作線程 if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }

 首先根據TrampolineScheduler判斷任務是否是在當前線程執行,如果是就直接綁定。如果不是就創建一個ObserverOnObserver對象,並把Observer和Worker對象傳遞進去。即可完成綁定。

 我們接下來主要看下其是如何在子線程中執行的

 ObserverOnObserver繼承了Runnable對象。在執行onNext方法的時候會調用worker的schedule(this)方法。

 @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        } 
 void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

  其實到這我們大致可以判斷出來worker.schedule(this)必定會運行run方法。不着急,我們先看IoSchedule類中的worker以及worker.schedule干了什么

  @NonNull
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }

  創建一個Worker對象 

@NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }

  執行threadWorker.scheduleActual(action),這里的action指的就是ObserverObserveOn對象,因其繼承了Runnable對象。

看看ThreadWorker.scheduleActual干了啥

 @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

  邏輯很清晰,把傳入的Runnable(ObservableObserveOn)封裝成為一個ScheduleRunnable對象。並把這個對象放入線程池中去執行。

executor都代表線程池。執行的時候會運行ScheduleRunnable的run方法。在其run方法內部又會調用ObserverObserveOn的run方法。

  下面回過頭來再看看ObserverObserveOn的run方法

  @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        } 
void drainNormal() {
          ....
                    a.onNext(v);
              ....
            }
        }  

  其會調用a.onNext方法,讓onNext方法運行在線程池中。a值的就是一個CreateObserver或者其包裝類。通過一層層的調用Consume.accept方法最終會運行到子線程中。

  

 

  2.主線程如何切換到主線程

  回過頭看ObservableObserveOn的subscribeActual方法

 

 這里的scheduler指的是HandlerScheduler。HandlerScheduler內部維護了一個運行在主線程的Handler和一個內部類HandlerWorker。其調用source.subscribe執行觀察者和被觀察者的訂閱。當ObservableEmitter.onNext方法執行后,會調用ObserveOnObserver內部的onNext方法。

 

 schedule方法又會調用worker.scheduler方法

 

 此處的worker為HandlerScheduler中的Worker,源碼如下

 

 通過Handler把ScheduleRunnable發送到主線程中執行。因為HandlerScheduler是主線程handler所以在Handler中執行的邏輯也會被切換到主線程中去執行。其實這里的run方法最終運行的是ObserveOnObserver中的run方法。在其run方法中會調用其上級包裝類SubscribeOnObserver的onNext方法。之后又會調用LambdaObserver的onNext方法。在其onNext方法中會調用Consumer.accept方法,最終讓其運行在主線程中。

  3.子線程如何切換到子線程

  這里分析下把Consumer.accept方法運行在子線程的流程

  同樣只需要設置observeOn(Schedulers.io())就OK了。同樣會創建一個ObserveOnObserver,其接受兩個重要的參數this:當前Observer,scheduler:ioScheduler。

  其綁定過程會執行ObservableObserveOn的subscribeActual方法

只是此處的scheduler不再是HandlerScheduler,而是IoScheduler。當ObservableEmitter.onNext方法被執行的時候,會調用ObserveOnObserver的onNext方法。而在onNext方法中又會調用IoScheduler中worker.schedule。最終會執行NewThreadWorker的scheduleActual方法

 

 當上述方法被執行后就會調用ObserveOnObserver中的run方法。其run方法又會逐個解包裝調用其OnNext方法。知道LambdaObserver的onNext被調用。onNext又會調用Consumer.accept。經過以上步驟就完成了最終的調用。因為run是在線程池中執行的,所以跟着把業務邏輯代碼也切換到了線程池中執行,即子線程中執行。

 

總結:

  經過上面的分析,RxJava切換線程已經分析完了,相信大家了解后對RxJava的線程切換會有一定的感悟。在這里再用白花總結一下。

  1.子線程切換主線程:給主線程所在的Handler發消息,然后就把邏輯切換過去了。

  2.主線程切換子線程:把任務放到線程池中執行就能把執行邏輯切換到子線程

  3.子線程切換子線程:把任務分別扔進兩個線程就行了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM