前言
很多項目使用流行的Rxjava2 + Retrofit搭建網絡框架,Rxjava現在已經發展到Rxjava2,之前一直都只是再用Rxjava,但從來沒有了解下Rxjava的內部實現,接下來一步步來分析Rxjava2的源碼,Rxjava2分Observable和Flowable兩種(無被壓和有被壓),我們今天先從簡單的無背壓的observable來分析。源碼基於rxjava:2.1.1。
一、Rxjava如何創建事件源、發射事件、何時發射事件、如何將觀察者和被觀察者關聯起來
簡單的例子
先來段最簡單的代碼,直觀的了解下整個Rxjava運行的完整流程。
1 private void doSomeWork() { 2 Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { 3 @Override 4 public void subscribe(ObservableEmitter<String> e) throws Exception { 5 e.onNext("a"); 6 e.onComplete(); 7 } 8 }); 9 Observer observer = new Observer<String>() { 10 11 @Override 12 public void onSubscribe(Disposable d) { 13 Log.i("lx", " onSubscribe : " + d.isDisposed()); 14 } 15 16 @Override 17 public void onNext(String str) { 18 Log.i("lx", " onNext : " + str); 19 } 20 21 @Override 22 public void onError(Throwable e) { 23 Log.i("lx", " onError : " + e.getMessage()); 24 } 25 26 @Override 27 public void onComplete() { 28 Log.i("lx", " onComplete"); 29 } 30 }; 31 observable.subscribe(observer); 32 }
上面代碼之所以將observable和observer單獨聲明,最后再調用observable.subscribe(observer);
是為了分步來分析:
- 被觀察者 Observable 如何生產事件的
- 被觀察者 Observable 何時生產事件的
- 觀察者Observer是何時接收到上游事件的
- Observable 與Observer是如何關聯在一起的
Observable
Observable是數據的上游,即事件生產者
首先來分析事件是如何生成的,直接看代碼 Observable.create()
方法。
1 @SchedulerSupport(SchedulerSupport.NONE) 2 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { // ObservableOnSubscribe 是個接口,只包含subscribe方法,是事件生產的源頭。 3 ObjectHelper.requireNonNull(source, "source is null"); // 判空 4 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); 5 }
最重要的是RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));這句代碼。繼續跟蹤進去
1 /** 2 * Calls the associated hook function. 3 * @param <T> the value type 4 * @param source the hook's input value 5 * @return the value returned by the hook 6 */ 7 @SuppressWarnings({ "rawtypes", "unchecked" }) 8 @NonNull 9 public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { 10 Function<? super Observable, ? extends Observable> f = onObservableAssembly; 11 if (f != null) { 12 return apply(f, source); 13 } 14 return source; 15 }
看注釋,原來這個方法是個hook function。 通過調試得知靜態對象onObservableAssembly默認為null, 所以此方法直接返回傳入的參數source。
onObservableAssembly可以通過靜態方法RxJavaPlugins. setOnObservableAssembly ()設置全局的Hook函數, 有興趣的同學可以自己去試試。 這里暫且不談,我們繼續返回代碼。
現在我們明白了:
1 Observable<String> observable=Observable.create(new ObservableOnSubscribe<String>() { 2 ... 3 ... 4 })
相當於:
1 Observable<String> observable=new ObservableCreate(new ObservableOnSubscribe<String>() { 2 ... 3 ... 4 }))
好了,至此我們明白了,事件的源就是new ObservableCreate()
對象,將ObservableOnSubscribe
作為參數傳遞給ObservableCreate
的構造函數。
事件是由接口ObservableOnSubscribe
的subscribe方法上產的,至於何時生產事件,稍后再分析。
Observer
Observer 是數據的下游,即事件消費者
Observer是個interface,包含 :
1 void onSubscribe(@NonNull Disposable d); 2 void onNext(@NonNull T t); 3 void onError(@NonNull Throwable e); 4 void onComplete();
上游發送的事件就是再這幾個方法中被消費的。上游何時發送事件、如何發送,稍后再表。
subscribe
重點來了,接下來最重要的方法來了:observable.subscribe(observer);
從這個方法的名字就知道,subscribe是訂閱,是將觀察者(observer)與被觀察者(observable)連接起來的方法。只有subscribe方法執行后,上游產生的事件才能被下游接收並處理。其實自然的方式應該是observer訂閱(subscribe) observable, 但這樣會打斷rxjava的鏈式結構。所以采用相反的方式。
接下來看源碼,只列出關鍵代碼
1 public final void subscribe(Observer<? super T> observer) { 2 ObjectHelper.requireNonNull(observer, "observer is null"); 3 ...... 4 observer = RxJavaPlugins.onSubscribe(this, observer); // hook ,默認直接返回observer 5 ...... 6 subscribeActual(observer); // 這個才是真正實現訂閱的方法。 7 ...... 8 } 9 10 // subscribeActual 是抽象方法,所以需要到實現類中去看具體實現,也就是說實現是在上文中提到的ObservableCreate中 11 protected abstract void subscribeActual(Observer<? super T> observer);
接下來我們來看ObservableCreate.java:
1 public ObservableCreate(ObservableOnSubscribe<T> source) { 2 this.source = source; // 事件源,生產事件的接口,由我們自己實現 3 } 4 5 @Override 6 protected void subscribeActual(Observer<? super T> observer) { 7 CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 發射器 8 observer.onSubscribe(parent); //直接回調了觀察者的onSubscribe 9 10 try { 11 // 調用了事件源subscribe方法生產事件,同時將發射器傳給事件源。 12 // 現在我們明白了,數據源生產事件的subscribe方法只有在observable.subscribe(observer)被執行 13 后才執行的。 換言之,事件流是在訂閱后才產生的。 14 //而observable被創建出來時並不生產事件,同時也不發射事件。 15 source.subscribe(parent); 16 } catch (Throwable ex) { 17 Exceptions.throwIfFatal(ex); 18 parent.onError(ex); 19 } 20 }
現在我們明白了,數據源生產事件的subscribe方法只有在observable.subscribe(observer)被執行后才執行的。 換言之,事件流是在訂閱后才產生的。而observable被創建出來時並不生產事件,同時也不發射事件。
接下來我們再來看看事件是如何被發射出去,同時observer是如何接收到發射的事件的
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
CreateEmitter 實現了ObservableEmitter接口,同時ObservableEmitter接口又繼承了Emitter接口。
CreateEmitter 還實現了Disposable接口,這個disposable接口是用來判斷是否中斷事件發射的。
從名稱上就能看出,這個是發射器,故名思議是用來發射事件的,正是它將上游產生的事件發射到下游的。
Emitter是事件源與下游的橋梁。
CreateEmitter 主要包括方法:
1 void onNext(@NonNull T value); 2 void onError(@NonNull Throwable error); 3 void onComplete(); 4 public void dispose() ; 5 public boolean isDisposed();
是不是跟observer的方法很像?
我們來看看CreateEmitter中這幾個方法的具體實現:
只列出關鍵代碼
1 public void onNext(T t) { 2 if (!isDisposed()) { // 判斷事件是否需要被丟棄 3 observer.onNext(t); // 調用Emitter的onNext,它會直接調用observer的onNext 4 } 5 } 6 public void onError(Throwable t) { 7 if (!isDisposed()) { 8 try { 9 observer.onError(t); // 調用Emitter的onError,它會直接調用observer的onError 10 } finally { 11 dispose(); // 當onError被觸發時,執行dispose(), 后續onNext,onError, onComplete就不會繼 12 續發射事件了 13 } 14 } 15 } 16 17 @Override 18 public void onComplete() { 19 if (!isDisposed()) { 20 try { 21 observer.onComplete(); // 調用Emitter的onComplete,它會直接調用observer的onComplete 22 } finally { 23 dispose(); // 當onComplete被觸發時,也會執行dispose(), 后續onNext,onError, onComplete 24 同樣不會繼續發射事件了 25 } 26 } 27 }
CreateEmitter 的onError和onComplete方法任何一個執行完都會執行dispose()中斷事件發射,所以observer中的onError和onComplete也只能有一個被執行。
現在終於明白了,事件是如何被發射給下游的。當訂閱成功后,數據源ObservableOnSubscribe開始生產事件,調用Emitter的onNext,onComplete向下游發射事件,
Emitter包含了observer的引用,又調用了observer onNext,onComplete,這樣下游observer就接收到了上游發射的數據。
總結
Rxjava的流程大概是:
- Observable.create 創建事件源,但並不生產也不發射事件。
- 實現observer接口,但此時沒有也無法接受到任何發射來的事件。
- 訂閱 observable.subscribe(observer), 此時會調用具體Observable的實現類中的subscribeActual方法,
此時會才會真正觸發事件源生產事件,事件源生產出來的事件通過Emitter的onNext,onError,onComplete發射給observer對應的方法由下游observer消費掉。從而完成整個事件流的處理。
observer中的onSubscribe在訂閱時即被調用,並傳回了Disposable, observer中可以利用Disposable來隨時中斷事件流的發射。
今天所列舉的例子是最簡單的一個事件處理流程,沒有使用線程調度,Rxjava最強大的就是異步時對線程的調度和隨時切換觀察者線程,未完待續。
上面分析了Rxjava是如何創建事件源,如何發射事件,何時發射事件,也清楚了上游和下游是如何關聯起來的。
下面着重來分析下Rxjava強大的線程調度是如何實現的。
二、RxJava的線程調度機制
簡單的例子
1 private void doSomeWork() { 2 Observable.create(new ObservableOnSubscribe<String>() { 3 @Override 4 public void subscribe(ObservableEmitter<String> e) throws Exception { 5 Log.i("lx", " subscribe: " + Thread.currentThread().getName()); 6 Thread.sleep(2000); 7 e.onNext("a"); 8 e.onComplete(); 9 } 10 }).subscribe(new Observer<String>() { 11 @Override 12 public void onSubscribe(Disposable d) { 13 Log.i("lx", " onSubscribe: " + Thread.currentThread().getName()); 14 } 15 @Override 16 public void onNext(String str) { 17 Log.i("lx", " onNext: " + Thread.currentThread().getName()); 18 } 19 @Override 20 public void onError(Throwable e) { 21 Log.i("lx", " onError: " + Thread.currentThread().getName()); 22 } 23 @Override 24 public void onComplete() { 25 Log.i("lx", " onComplete: " + Thread.currentThread().getName()); 26 } 27 }); 28 }
運行結果:
1 com.rxjava2.android.samples I/lx: onSubscribe: main 2 com.rxjava2.android.samples I/lx: subscribe: main 3 com.rxjava2.android.samples I/lx: onNext: main 4 com.rxjava2.android.samples I/lx: onComplete: main
因為此方法筆者是在main線程中調用的,所以沒有進行線程調度的情況下,所有方法都運行在main線程中。但我們知道Android的UI線程是不能做網絡操作,也不能做耗時操作,所以一般我們把網絡或耗時操作都放在非UI線程中執行。接下來我們就來感受下Rxjava強大的線程調度能力。
1 private void doSomeWork() { 2 Observable.create(new ObservableOnSubscribe<String>() { 3 @Override 4 public void subscribe(ObservableEmitter<String> e) throws Exception { 5 Log.i("lx", " subscribe: " + Thread.currentThread().getName()); 6 Thread.sleep(2000); 7 e.onNext("a"); 8 e.onComplete(); 9 } 10 }).subscribeOn(Schedulers.io()) //增加了這一句 11 .subscribe(new Observer<String>() { 12 @Override 13 public void onSubscribe(Disposable d) { 14 Log.i("lx", " onSubscribe: " + Thread.currentThread().getName()); 15 } 16 @Override 17 public void onNext(String str) { 18 Log.i("lx", " onNext: " + Thread.currentThread().getName()); 19 } 20 @Override 21 public void onError(Throwable e) { 22 Log.i("lx", " onError: " + Thread.currentThread().getName()); 23 } 24 @Override 25 public void onComplete() { 26 Log.i("lx", " onComplete: " + Thread.currentThread().getName()); 27 } 28 }); 29 }
運行結果:
1 com.rxjava2.android.samples I/lx: onSubscribe: main 2 com.rxjava2.android.samples I/lx: subscribe: RxCachedThreadScheduler-1 3 com.rxjava2.android.samples I/lx: onNext: RxCachedThreadScheduler-1 4 com.rxjava2.android.samples I/lx: onComplete: RxCachedThreadScheduler-1
只增加了subscribeOn
這一句代碼, 就發生如此神奇的現象,除了onSubscribe方法還運行在main線程(訂閱發生的線程)其它方法全部都運行在一個名為RxCachedThreadScheduler-1的線程中。我們來看看rxjava是怎么完成這個線程調度的。
線程調度subscribeOn
首先我們先分析下Schedulers.io()
這個東東。
1 @NonNull 2 public static Scheduler io() { 3 return RxJavaPlugins.onIoScheduler(IO); // hook function 4 // 等價於 5 return IO; 6 }
再看看IO是什么, IO是個static變量,初始化的地方是
1 IO = RxJavaPlugins.initIoScheduler(new IOTask()); // 又是hook function 2 // 等價於 3 IO = callRequireNonNull(new IOTask()); 4 // 等價於 5 IO = new IOTask().call();
繼續看看IOTask
1 static final class IOTask implements Callable<Scheduler> { 2 @Override 3 public Scheduler call() throws Exception { 4 return IoHolder.DEFAULT; 5 // 等價於 6 return new IoScheduler(); 7 } 8 }
代碼層次很深,為了便於記憶,我們再回顧一下:
1 Schedulers.io()等價於 new IoScheduler() 2 3 // Schedulers.io()等價於 4 @NonNull 5 public static Scheduler io() { 6 return new IoScheduler(); 7 }
好了,排除了其他干擾代碼,接下來看看IoScheduler()是什么東東了
IoScheduler看名稱就知道是個IO線程調度器,根據代碼注釋得知,它就是一個用來創建和緩存線程的線程池。看到這個豁然開朗了,原來Rxjava就是通過這個調度器來調度線程的,至於具體怎么實現我們接着往下看
1 public IoScheduler() { 2 this(WORKER_THREAD_FACTORY); 3 } 4 5 public IoScheduler(ThreadFactory threadFactory) { 6 this.threadFactory = threadFactory; 7 this.pool = new AtomicReference<CachedWorkerPool>(NONE); 8 start(); 9 } 10 11 @Override 12 public void start() { 13 CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory); 14 if (!pool.compareAndSet(NONE, update)) { 15 update.shutdown(); 16 } 17 } 18 19 CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { 20 this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L; 21 this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>(); 22 this.allWorkers = new CompositeDisposable(); 23 this.threadFactory = threadFactory; 24 25 ScheduledExecutorService evictor = null; 26 Future<?> task = null; 27 if (unit != null) { 28 evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY); 29 task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS); 30 } 31 evictorService = evictor; 32 evictorTask = task; 33 }
從上面的代碼可以看出,new IoScheduler()
后Rxjava會創建CachedWorkerPool
的線程池,同時也創建並運行了一個名為RxCachedWorkerPoolEvictor
的清除線程,主要作用是清除不再使用的一些線程。
但目前只創建了線程池並沒有實際的thread,所以Schedulers.io()
相當於只做了線程調度的前期准備。
OK,終於可以開始分析Rxjava是如何實現線程調度的。回到Demo來看subscribeOn()
方法的內部實現:
1 public final Observable<T> subscribeOn(Scheduler scheduler) { 2 ObjectHelper.requireNonNull(scheduler, "scheduler is null"); 3 return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); 4 }
很熟悉的代碼RxJavaPlugins.onAssembly
,上一篇已經分析過這個方法,就是個hook function, 等價於直接return new ObservableSubscribeOn<T>(this, scheduler);
, 現在知道了這里的scheduler其實就是IoScheduler。
跟蹤代碼進入ObservableSubscribeOn
,
可以看到這個ObservableSubscribeOn 繼承自Observable,並且擴展了一些屬性,增加了scheduler。 各位看官,這不就是典型的裝飾模式嘛,Rxjava中大量用到了裝飾模式,后面還會經常看到這種wrap類。
上篇文章我們已經知道了Observable.subscribe()
方法最終都是調用了對應的實現類的subscribeActual
方法。我們重點分析下subscribeActual
:
1 @Override 2 public void subscribeActual(final Observer<? super T> s) { 3 final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); 4 5 // 沒有任何線程調度,直接調用的,所以下游的onSubscribe方法沒有切換線程, 6 //本文demo中下游就是觀察者,所以我們明白了為什么只有onSubscribe還運行在main線程 7 s.onSubscribe(parent); 8 9 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); 10 }
SubscribeOnObserver
也是裝飾模式的體現, 是對下游observer
的一個wrap
,只是添加了Disposable
的管理。
接下來分析最重要的scheduler.scheduleDirect(new SubscribeTask(parent))
1 // 這個類很簡單,就是一個Runnable,最終運行上游的subscribe方法 2 final class SubscribeTask implements Runnable { 3 private final SubscribeOnObserver<T> parent; 4 5 SubscribeTask(SubscribeOnObserver<T> parent) { 6 this.parent = parent; 7 } 8 9 @Override 10 public void run() { 11 source.subscribe(parent); 12 } 13 } 14 @NonNull 15 public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { 16 // IoSchedular 中的createWorker() 17 final Worker w = createWorker(); 18 // hook decoratedRun=run; 19 final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); 20 // decoratedRun的wrap,增加了Dispose的管理 21 DisposeTask task = new DisposeTask(decoratedRun, w); 22 // 線程調度 23 w.schedule(task, delay, unit); 24 25 return task; 26 }
回到IoSchedular
1 public Worker createWorker() { 2 // 工作線程是在此時創建的 3 return new EventLoopWorker(pool.get()); 4 } 5 6 public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { 7 if (tasks.isDisposed()) { 8 // don't schedule, we are unsubscribed 9 return EmptyDisposable.INSTANCE; 10 } 11 // action 中就包含上游subscribe的runnable 12 return threadWorker.scheduleActual(action, delayTime, unit, tasks); 13 }
最終線程是在這個方法內調度並執行的。
1 public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { 2 // decoratedRun = run, 包含上游subscribe方法的runnable 3 Runnable decoratedRun = RxJavaPlugins.onSchedule(run); 4 5 // decoratedRun的wrap,增加了dispose的管理 6 ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); 7 8 if (parent != null) { 9 if (!parent.add(sr)) { 10 return sr; 11 } 12 } 13 14 // 最終decoratedRun被調度到之前創建或從線程池中取出的線程, 15 // 也就是說在RxCachedThreadScheduler-x運行 16 Future<?> f; 17 try { 18 if (delayTime <= 0) { 19 f = executor.submit((Callable<Object>)sr); 20 } else { 21 f = executor.schedule((Callable<Object>)sr, delayTime, unit); 22 } 23 sr.setFuture(f); 24 } catch (RejectedExecutionException ex) { 25 if (parent != null) { 26 parent.remove(sr); 27 } 28 RxJavaPlugins.onError(ex); 29 } 30 31 return sr; 32 }
至此我們終於明白了Rxjava是如何調度線程並執行的,通過subscribeOn方法將上游生產事件的方法運行在指定的調度線程中。
1 com.rxjava2.android.samples I/lx: onSubscribe: main 2 com.rxjava2.android.samples I/lx: subscribe: RxCachedThreadScheduler-1 3 com.rxjava2.android.samples I/lx: onNext: RxCachedThreadScheduler-1 4 com.rxjava2.android.samples I/lx: onComplete: RxCachedThreadScheduler-1
從上面的運行結果來看,因為上游生產者已被調度到RxCachedThreadScheduler-1
線程中,同時發射事件並沒有切換線程,所以發射后消費事件的onNext onErro onComplete
也在RxCachedThreadScheduler-1
線程中。
總結
Schedulers.io()
等價於new IoScheduler()。
new IoScheduler()
Rxjava
創建了線程池,為后續創建線程做准備,同時創建並運行了一個清理線程RxCachedWorkerPoolEvictor
,定期執行清理任務。subscribeOn()
返回一個ObservableSubscribeOn
對象,它是Observable
的一個裝飾類,增加了scheduler
。- 調用
subscribe()
方法,在這個方法調用后,subscribeActual()
被調用,才真正執行了IoSchduler
中的createWorker()
創建線程並運行,最終將上游Observable
的subscribe()
方法調度到新創建的線程中運行。
現在了解了被觀察者執行線程是如何被調度到指定線程中執行的,但很多情況下,我們希望觀察者(事件下游)處理事件最好在UI線程執行,比如更新UI操作等。下面分析下游何時調度,如何調度由於篇幅問題。
簡單的例子
1 private void doSomeWork() { 2 Observable.create(new ObservableOnSubscribe<String>() { 3 @Override 4 public void subscribe(ObservableEmitter<String> e) throws Exception { 5 Log.i("lx", " subscribe: " + Thread.currentThread().getName()); 6 e.onNext("a"); 7 e.onComplete(); 8 } 9 }).subscribeOn(Schedulers.io()) 10 .observeOn(AndroidSchedulers.mainThread()) 11 .subscribe(new Observer<String>() { 12 @Override 13 public void onSubscribe(Disposable d) { 14 Log.i("lx", " onSubscribe: " + Thread.currentThread().getName()); 15 } 16 @Override 17 public void onNext(String str) { 18 Log.i("lx", " onNext: " + Thread.currentThread().getName()); 19 } 20 @Override 21 public void onError(Throwable e) { 22 Log.i("lx", " onError: " + Thread.currentThread().getName()); 23 } 24 @Override 25 public void onComplete() { 26 Log.i("lx", " onComplete: " + Thread.currentThread().getName()); 27 } 28 }); 29 }
看看運行結果:
1 com.rxjava2.android.samples I/lx: onSubscribe: main 2 com.rxjava2.android.samples I/lx: subscribe: RxCachedThreadScheduler-1 3 com.rxjava2.android.samples I/lx: onNext: main 4 com.rxjava2.android.samples I/lx: onComplete: main
從結果可以看出,事件的生產線程運行在RxCachedThreadScheduler-1中,而事件的消費線程則被調度到了main線程中。關鍵代碼是因為這句.observeOn(AndroidSchedulers.mainThread())
。 下面我們着重分析下這句代碼都做了哪些事情。
AndroidSchedulers.mainThread()
先來看看AndroidSchedulers.mainThread()
是什么?貼代碼
1 /** A {@link Scheduler} which executes actions on the Android main thread. */ 2 public static Scheduler mainThread() { 3 return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD); 4 }
注釋已經說的很明白了,是一個在主線程執行任務的scheduler,接着看
1 private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler( 2 new Callable<Scheduler>() { 3 @Override public Scheduler call() throws Exception { 4 return MainHolder.DEFAULT; 5 } 6 }); 7 8 public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) { 9 if (scheduler == null) { 10 throw new NullPointerException("scheduler == null"); 11 } 12 Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler; 13 if (f == null) { 14 return callRequireNonNull(scheduler); 15 } 16 return applyRequireNonNull(f, scheduler); 17 }
代碼很簡單,這個AndroidSchedulers.mainThread()
想當於new HandlerScheduler(new Handler(Looper.getMainLooper()))
,原來是利用Android
的Handler
來調度到main
線程的。
我們再看看HandlerScheduler
,它與我們上節分析的IOScheduler
類似,都是繼承自Scheduler
,所以AndroidSchedulers.mainThread()
其實就是是創建了一個運行在main thread
上的scheduler。
好了,我們再回過頭來看observeOn
方法。
observeOn
1 public final Observable<T> observeOn(Scheduler scheduler) { 2 return observeOn(scheduler, false, bufferSize()); 3 } 4 5 public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { 6 ObjectHelper.requireNonNull(scheduler, "scheduler is null"); 7 ObjectHelper.verifyPositive(bufferSize, "bufferSize"); 8 return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); 9 } 10
重點是這個new ObservableObserveOn
,看名字是不是有種似成相識的感覺,還記得上篇的ObservableSubscribeOn
嗎? 它倆就是親兄弟,是繼承自同一個父類。
重點還是這個方法,我們前文已經提到了,Observable的subscribe方法最終都是調用subscribeActual
方法。下面看看這個方法的實現:
1 @Override 2 protected void subscribeActual(Observer<? super T> observer) { 3 // scheduler 就是前面提到的 HandlerScheduler,所以進入else分支 4 if (scheduler instanceof TrampolineScheduler) { 5 source.subscribe(observer); 6 } else { 7 // 創建 HandlerWorker 8 Scheduler.Worker w = scheduler.createWorker(); 9 // 調用上游Observable的subscribe,將訂閱向上傳遞 10 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); 11 } 12 }
從上面代碼可以看到使用了ObserveOnObserver
類對observer
進行裝飾,好了,我們再來看看ObserveOnObserver
。
我們已經知道了,事件源發射的事件,是通過observer的onNext
,onError
,onComplete
發射到下游的。所以看看ObserveOnObserver
的這三個方法是如何實現的。
由於篇幅問題,我們只分析onNext
方法,onError
和onComplete
方法有興趣的同學可以自己分析下。
1 @Override 2 public void onNext(T t) { 3 if (done) { 4 return; 5 } 6 7 // 如果是非異步方式,將上游發射的時間加入到隊列 8 if (sourceMode != QueueDisposable.ASYNC) { 9 queue.offer(t); 10 } 11 schedule(); 12 } 13 14 void schedule() { 15 // 保證只有唯一任務在運行 16 if (getAndIncrement() == 0) { 17 // 調用的就是HandlerWorker的schedule方法 18 worker.schedule(this); 19 } 20 } 21 22 @Override 23 public Disposable schedule(Runnable run, long delay, TimeUnit unit) { 24 if (run == null) throw new NullPointerException("run == null"); 25 if (unit == null) throw new NullPointerException("unit == null"); 26 27 if (disposed) { 28 return Disposables.disposed(); 29 } 30 31 run = RxJavaPlugins.onSchedule(run); 32 33 ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); 34 35 Message message = Message.obtain(handler, scheduled); 36 message.obj = this; // Used as token for batch disposal of this worker's runnables. 37 38 handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay))); 39 40 // Re-check disposed state for removing in case we were racing a call to dispose(). 41 if (disposed) { 42 handler.removeCallbacks(scheduled); 43 return Disposables.disposed(); 44 } 45 46 return scheduled; 47 }
schedule
方法將傳入的run
調度到對應的handle
所在的線程來執行,這個例子里就是有main
線程來完成。 再回去看看前面傳入的run
吧。
回到ObserveOnObserver
中的run
方法:
1 @Override 2 public void run() { 3 // 此例子中代碼不會進入這個分支,至於這個drainFused是什么,后面章節再討論。 4 if (outputFused) { 5 drainFused(); 6 } else { 7 drainNormal(); 8 } 9 } 10 11 void drainNormal() { 12 int missed = 1; 13 14 final SimpleQueue<T> q = queue; 15 final Observer<? super T> a = actual; 16 17 for (;;) { 18 if (checkTerminated(done, q.isEmpty(), a)) { 19 return; 20 } 21 22 for (;;) { 23 boolean d = done; 24 T v; 25 26 try { 27 // 從隊列中queue中取出事件 28 v = q.poll(); 29 } catch (Throwable ex) { 30 Exceptions.throwIfFatal(ex); 31 s.dispose(); 32 q.clear(); 33 a.onError(ex); 34 worker.dispose(); 35 return; 36 } 37 boolean empty = v == null; 38 39 if (checkTerminated(d, empty, a)) { 40 return; 41 } 42 43 if (empty) { 44 break; 45 } 46 //調用下游observer的onNext將事件v發射出去 47 a.onNext(v); 48 } 49 50 missed = addAndGet(-missed); 51 if (missed == 0) { 52 break; 53 } 54 } 55 }
至此我們明白了RXjava是如何調度消費者線程了。
消費者線程調度流程概括
Rxjava調度消費者現在的流程,以observeOn(AndroidSchedulers.mainThread())
為例。
AndroidSchedulers.mainThread()
先創建一個包含handler
的Scheduler
, 這個handler
是主線程的handler
。observeOn
方法創建ObservableObserveOn
,它是上游Observable
的一個裝飾類,其中包含前面創建的Scheduler
和bufferSize
等.- 當訂閱方法
subscribe
被調用后,ObservableObserveOn
的subscribeActual
方法創建Scheduler.Worker
並調用上游的subscribe
方法,同時將自身接收的參數'observer'用裝飾類ObserveOnObserver
裝飾后傳遞給上游。 - 當上游調用被
ObserveOnObserver
的onNext
、onError
和onComplete
方法時,ObserveOnObserver
將上游發送的事件通通加入到隊列queue
中,然后再調用scheduler
將處理事件的方法調度到對應的線程中(本例會調度到main thread)。 處理事件的方法將queue
中保存的事件取出來,調用下游原始的observer再發射出去。 - 經過以上流程,下游處理事件的消費者線程就運行在了
observeOn
調度后的thread中。
總結
經過前面兩節的分析,我們已經明白了Rxjava是如何對線程進行調度的。
- Rxjava的
subscribe
方法是由下游一步步向上游進行傳遞的。會調用上游的subscribe
,直到調用到事件源。
如: source.subscribe(xxx);
而上游的source
往往是經過裝飾后的Observable
, Rxjava就是利用ObservableSubscribeOn
將subscribe
方法調度到了指定線程運行,生產者線程最終會運行在被調度后的線程中。但多次調用subscribeOn
方法會怎么樣呢? 我們知道因為subscribe
方法是由下而上傳遞的,所以事件源的生產者線程最終都只會運行在第一次執行subscribeOn
所調度的線程中,換句話就是多次調用subscribeOn
方法,只有第一次有效。
- Rxjava發射事件是由上而下發射的,上游的
onNext
、onError
、onComplete
方法會調用下游傳入的observer的對應方法。往往下游傳遞的observer對象也是經過裝飾后的observer對象。Rxjava就是利用ObserveOnObserver
將執行線程調度后,再調用下游對應的onNext
、onError
、onComplete
方法,這樣下游消費者就運行再了指定的線程內。 那么多次調用observeOn
調度不同的線程會怎么樣呢? 因為事件是由上而下發射的,所以每次用observeOn
切換完線程后,對下游的事件消費都有效,比如下游的map操作符。最終的事件消費線程運行在最后一個observeOn
切換后線程中。 - 另外通過源碼可以看到
onSubscribe
運行在subscribe
的調用線程中,這個就不具體分析了。