一張圖解釋RxJava中的線程控制


如果調用鏈中包含多個subscribeOn和observeOn,會是什么情況?

這實際上是一個至關重要的問題,因為在任何情況下,我們都應該弄清楚我們寫的每一行代碼到底是運行在哪個線程上。這個問題絕對不能含糊。

假設有下面這段偽代碼:

[代碼]java代碼:

?
01
02
03
04
05
06
07
08
09
10
11
12
13
Observable.create(...)
     .lift1(...)
     .subscribeOn(scheduler1)
     .lift2(...)
     .observeOn(scheduler2)
     .lift3(...)
     .subscribeOn(scheduler3)
     .lift4(...)
     .observeOn(scheduler4)
     .doOnSubscribe(...)
     .subscribeOn(scheduler5)
     .observeOn(scheduler6)
     .subscribe(...);

 

其中,lift1, lift2, lift3, lift4指的是基於lift實現的變換操作,比如filter, map, reduce等。

那么在這段代碼中:

  • lift1, lift2, lift3, lift4指定的代碼分別在哪個線程執行?
  • doOnSubscribe指定的代碼在哪個線程執行?
  • 產生事件的代碼(create指定的代碼)在哪個線程執行?
  • 消費事件的代碼(subscribe指定的代碼)在哪個線程執行?

相信很多同學會覺得這段代碼多少有些令人暈眩,在實際中是不太可能出現的。確實,實際中的代碼大都沒有這么復雜,但弄清它有助於我們理解整個RxJava的實現流程。

RxJava流程圖

上面這幅圖表達了一個典型的RxJava調用鏈中控制流的傳遞過程。它可以分成兩個階段:

  1. 驅動階段。整個異步事件流的觸發由subscribe開始。它發起了一個反向驅動過程(從下游到上游),跨過每一個中間的Observable和OnSubscribe,到達第一個Observable(產生事件的源頭)。對應圖中的(1)和(2)。這個階段一般就是從下游到上游調用一次就結束了。
  2. 事件發射階段。第一個Observable開始產生事件,然后事件流就開始正向傳遞,經過每一個中間的Observable,最終到達Subscriber(事件的消費者)。對應圖中的(3)。與前一階段不同,事件從上游往下游傳遞,不是一次就完了,而是多個事件組成的事件流。

我們分析一下這整個流程,其中有幾點需要特別說明一下(注:這里的分析過程涉及RxJava的一些實現細節,如不關心細節可以跳過這一段,直接看后面的結論):

  • 圖中的(1)對應的是調用前一級Observable的OnSubscribe.call,是個無返回值的方法,因此可以切換線程,從而變為異步的。所以用虛線表示。
  • 圖中的(2)對應的是lift操作指定的Operator.call,是個有返回值的方法(輸入一個Subscriber,返回一個新的Subscriber)。因此,它只能同步調用,不能切換線程。所以用實線表示。
  • 圖中的(3)對應的是調用后一級Observable對應的Subscriber(onNext, onCompleted, onError),也都是無返回值的方法,因此可以切換線程,從而變為異步的。所以也用虛線表示。
  • observeOn是基於lift實現的,且切換線程的動作發生在Subscriber(onNext, onCompleted, onError),因此它影響(3)流程上在它下游的所有lift變換。
  • subscribeOn不是基於lift實現的,它直接在調用前一級Observable的OnSubscribe時切換線程。因此,它影響(1)流程上在它上游的所有OnSubscribe調用,直到產生事件的源頭;然后,(3)流程上的所有lift操作也會在新切換到的線程上,直到碰到一個observeOn操作。
  • doOnSubscribe稍微特殊一點。它雖然是基於lift實現的,但它所指定的代碼發生在Operator.call中,不像其它的lift操作,它們指定的代碼發生在Subscriber。因此它的執行線程受它下游的subscribeOn的影響。

結合上面的分析,我們沿着前面流程圖中箭頭所指的方向一路走過去:

  • 首先從調用subscribe方法開始,沿着前面流程圖中的(1)->(2)->(1)->(2)…->(1)路徑(即驅動階段),從下游向上游回溯,每經過一個subscribeOn,線程就切換一次;每次切換的線程環境影響這一路徑上后面(即上游)的doOnSubscribe指定的代碼和產生事件的代碼(create指定的代碼)。
  • 經過事件的源頭(create指定的代碼),轉而進入事件發射階段。
  • 然后,再沿着(3)路徑(即事件發射階段),從上游到下游,每經過一個observeOn,線程就切換一次;每次切換的線程環境影響這一路徑上后面(即下游)的所有lift操作,直至消費事件的代碼(subscribe指定的代碼)。

現在,把前面的描述換一種說法,就很容易得到下面的結論了:

  • doOnSubscribe指定的代碼和產生事件的代碼(create指定的代碼),在它們下游最近的一個subscribeOn指定的Scheduler上執行;如果它們下游沒有subscribeOn了,那么它們就在調用subscribe方法的那一個線程上執行(注意:是調用subscribe方法的那一個線程,不是subscribe指定的代碼執行的那個線程,這是兩回事)。
  • 普通的lift操作(比如filter, map, reduce等)和消費事件的代碼(subscribe指定的代碼),在它們上游最近的一個observeOn指定的Scheduler上執行;如果它們上游沒有observeOn了,那么它們就在位於整個調用鏈最上游的第一個subscribeOn指定的Scheduler上執行;如果沒找到subscribeOn調用,那么它們就在調用subscribe方法的那一個線程上執行。

把這些結論應用在本文開始的那段代碼上,我們很快能得到:

  • 產生事件的代碼(create指定的代碼)在scheduler1上執行;
  • lift1和lift2指定的代碼在scheduler1上執行;
  • lift3和lift4指定的代碼在scheduler2上執行;
  • doOnSubscribe指定的代碼在scheduler5上執行;
  • 消費事件的代碼(subscribe指定的代碼)在scheduler6上執行。
推薦:

RxJava源碼初探


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM