如果調用鏈中包含多個subscribeOn和observeOn,會是什么情況?
這實際上是一個至關重要的問題,因為在任何情況下,我們都應該弄清楚我們寫的每一行代碼到底是運行在哪個線程上。這個問題絕對不能含糊。
假設有下面這段偽代碼:
[代碼]java代碼:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
|
Observable.create(...)
.lift1(...)
.subscribeOn(scheduler1)
.lift2(...)
.observeOn(scheduler2)
.lift3(...)
.subscribeOn(scheduler3)
.lift4(...)
.observeOn(scheduler4)
.doOnSubscribe(...)
.subscribeOn(scheduler5)
.observeOn(scheduler6)
.subscribe(...);
|
其中,lift1, lift2, lift3, lift4指的是基於lift實現的變換操作,比如filter, map, reduce等。
那么在這段代碼中:
- lift1, lift2, lift3, lift4指定的代碼分別在哪個線程執行?
- doOnSubscribe指定的代碼在哪個線程執行?
- 產生事件的代碼(create指定的代碼)在哪個線程執行?
- 消費事件的代碼(subscribe指定的代碼)在哪個線程執行?
相信很多同學會覺得這段代碼多少有些令人暈眩,在實際中是不太可能出現的。確實,實際中的代碼大都沒有這么復雜,但弄清它有助於我們理解整個RxJava的實現流程。
上面這幅圖表達了一個典型的RxJava調用鏈中控制流的傳遞過程。它可以分成兩個階段:
- 驅動階段。整個異步事件流的觸發由subscribe開始。它發起了一個反向驅動過程(從下游到上游),跨過每一個中間的Observable和OnSubscribe,到達第一個Observable(產生事件的源頭)。對應圖中的(1)和(2)。這個階段一般就是從下游到上游調用一次就結束了。
- 事件發射階段。第一個Observable開始產生事件,然后事件流就開始正向傳遞,經過每一個中間的Observable,最終到達Subscriber(事件的消費者)。對應圖中的(3)。與前一階段不同,事件從上游往下游傳遞,不是一次就完了,而是多個事件組成的事件流。
我們分析一下這整個流程,其中有幾點需要特別說明一下(注:這里的分析過程涉及RxJava的一些實現細節,如不關心細節可以跳過這一段,直接看后面的結論):
- 圖中的(1)對應的是調用前一級Observable的OnSubscribe.call,是個無返回值的方法,因此可以切換線程,從而變為異步的。所以用虛線表示。
- 圖中的(2)對應的是lift操作指定的Operator.call,是個有返回值的方法(輸入一個Subscriber,返回一個新的Subscriber)。因此,它只能同步調用,不能切換線程。所以用實線表示。
- 圖中的(3)對應的是調用后一級Observable對應的Subscriber(onNext, onCompleted, onError),也都是無返回值的方法,因此可以切換線程,從而變為異步的。所以也用虛線表示。
- observeOn是基於lift實現的,且切換線程的動作發生在Subscriber(onNext, onCompleted, onError),因此它影響(3)流程上在它下游的所有lift變換。
- subscribeOn不是基於lift實現的,它直接在調用前一級Observable的OnSubscribe時切換線程。因此,它影響(1)流程上在它上游的所有OnSubscribe調用,直到產生事件的源頭;然后,(3)流程上的所有lift操作也會在新切換到的線程上,直到碰到一個observeOn操作。
- doOnSubscribe稍微特殊一點。它雖然是基於lift實現的,但它所指定的代碼發生在Operator.call中,不像其它的lift操作,它們指定的代碼發生在Subscriber。因此它的執行線程受它下游的subscribeOn的影響。
結合上面的分析,我們沿着前面流程圖中箭頭所指的方向一路走過去:
- 首先從調用subscribe方法開始,沿着前面流程圖中的(1)->(2)->(1)->(2)…->(1)路徑(即驅動階段),從下游向上游回溯,每經過一個subscribeOn,線程就切換一次;每次切換的線程環境影響這一路徑上后面(即上游)的doOnSubscribe指定的代碼和產生事件的代碼(create指定的代碼)。
- 經過事件的源頭(create指定的代碼),轉而進入事件發射階段。
- 然后,再沿着(3)路徑(即事件發射階段),從上游到下游,每經過一個observeOn,線程就切換一次;每次切換的線程環境影響這一路徑上后面(即下游)的所有lift操作,直至消費事件的代碼(subscribe指定的代碼)。
現在,把前面的描述換一種說法,就很容易得到下面的結論了:
- doOnSubscribe指定的代碼和產生事件的代碼(create指定的代碼),在它們下游最近的一個subscribeOn指定的Scheduler上執行;如果它們下游沒有subscribeOn了,那么它們就在調用subscribe方法的那一個線程上執行(注意:是調用subscribe方法的那一個線程,不是subscribe指定的代碼執行的那個線程,這是兩回事)。
- 普通的lift操作(比如filter, map, reduce等)和消費事件的代碼(subscribe指定的代碼),在它們上游最近的一個observeOn指定的Scheduler上執行;如果它們上游沒有observeOn了,那么它們就在位於整個調用鏈最上游的第一個subscribeOn指定的Scheduler上執行;如果沒找到subscribeOn調用,那么它們就在調用subscribe方法的那一個線程上執行。
把這些結論應用在本文開始的那段代碼上,我們很快能得到:
- 產生事件的代碼(create指定的代碼)在scheduler1上執行;
- lift1和lift2指定的代碼在scheduler1上執行;
- lift3和lift4指定的代碼在scheduler2上執行;
- doOnSubscribe指定的代碼在scheduler5上執行;
- 消費事件的代碼(subscribe指定的代碼)在scheduler6上執行。
推薦:

