在許多軟件編程任務中,你或多或少期待你的指令將會按照你已經寫好的順序,依次增量執行和完成。但在ReactiveX,很多指令可以通過“觀察者”並行執行,其結果將以任意順序被捕獲。你定義了一種“可觀察的形式“的檢索和轉換數據機制而不是調用方法,然后訂閱觀察者給它,每當之前定義好的機制已經准備好了,這些機制就會觸發常設的哨兵去捕獲並反饋結果。
這種方法的優點是,當你有一大堆的任務是不相互依賴,你就可以同時執行他們,而不是等待每一個來啟動下一個前完成,這樣你的整個任務包只需要花最長的任務時間。
有很多屬於來描述異步編程和設計模型。本文將使用下列術語:一個觀察者(observer)訂閱可觀察到的(Observable)。可觀察到的(Observable)通過調用觀察者的方法來發射項目或通知給它的所有觀察者(observer)。
觀察者有些時候也被稱作是訂閱者,觀看者,響應者。因此這樣的模式通常就叫做響應模式。
在很多存在UI操作的地方,UI上的操作不應該等待耗時執行程序的完成而阻塞。在一般編程模式下,都會采用異步線程+回調的方式完成這樣的交互操作。不過當回調層次越來越多的時候,那代碼可維護性將變得很麻煩。因此ReactiveX最出色的地方就是將多個操作過程按照自定義順序組合完成最終結果,在每次一的操作中只需要關心業務邏輯本身的執行即可。
這些話描述起來比較生硬,一些簡單的使用介紹可以見如下站點:
本文主要針對實現過程做梳理和剖析,因此基礎部分不在做過多闡述。
OK,接下來講講ReactiveX中幾個比較重要的概念
在ReactiveX中,一個觀察者observer訂閱到一個可觀察的對象Observable。無論是某一個還是多個Observable執行,這些觀察者都會做出響應。這樣的模式有利於並發操作,因為這樣不需要去等待Observable去廣播,但是它創建了一個觀察者形式的哨兵,此哨兵在今后的任何時間里隨時准備做適當的響應,Observable也會做出這樣的響應。
下面這這張圖很好的說明了什么是Observables和observers,以及他們之間的轉換關系
關於Observers的創建
下面是采用了偽代碼來展示Observers的實現過程:
- 同步方式:
- 調用一個方法
- 用一個變量存儲方法返回值
- 使用這個變量作為一個新的值做其他事情
例如:
1 |
// 寫一個回調方法,並且指定到 `returnVal` |
- 異步方式:
- 定義一個方法,此方法是做一些事情並帶有來之於異步調用的返回值;這個方法也是observer的一部分
- 定義異步調用自身作為一個Observable
- 通過訂閱的方式連接observer到Observable(這個過程也是初始化Observable的actions)
- 執行你的業務;每當調用返回,observer的方法將會操作它自身返回值,這里的返回值是通過Observable廣播
例如:
1 |
// 定義但是不執行, 訂閱者的onNext方法 |
onNext, onCompleted, and onError
訂閱方法就是展示了observer如何連接到Observable。oberver實現了下列方法的一些子集:
- onNext
每當Observable廣播數據時將會調用該方法。這個方法將會被作為Observable的一個廣播項目參數被發送 - onError
Observable調用此方法表示它內部已經發生異常數據或者發生一些其他錯誤。這樣停止觀察,並且也不會做將來的調用onNext或者onCompleted。該onError方法作為它的參數來指示了錯誤的原因。 - onCompleted
Observable在已經調用了onNext方法作為最后的時間,如果沒有遇到任何錯誤,那么該方法將會被調用
通過Observable的定義,它可能調用onNext零次或者很多次,並且接下來的調用可能是onCompleted或者onError方法,但是不是同時調用,這都是最終才會被調用。在調用過程中,onNext通常稱作任務的執行,而onCompleted或者onError被稱作任務的結果通知
下面是一個subscribe調用例子:
1 |
def myOnNext = { item -> /* 任務執行 */ }; |
更多相關信息也可以參考
Unsubscribing取消訂閱
在ReactiveX實現中,有一個特殊的observer接口是Subscriber,這個接口中有一個unsubscribe方法。當你調用此方法,表示訂閱者不在對當前任何被訂閱的Observables。如果沒有其他observer,那么當前的Observables就會選擇停止對新數據的廣播。
退訂結果將會通過應用於哪些之前觀察者訂閱了的Observable的操作連來聯級返回。這個操作將會導致整個連接鏈上的每一個環節都停止發送動作。這個過程雖然不能保證立即發生,但是,在沒有觀察者仍然觀察這些回調數據的時候,Observable是有可能試圖去發送或者廣播數據的。
Observables的“冷”與“熱”
Observable具體在什么時候發送他的數據隊列?這依賴於Observable。一個“熱”的Observable可能隨着它的創建就會立即發送回調數據,哪些之后訂閱到Observable的任何observer也可以立即發起對觀察隊列的監聽。另一方面,一個“冷”Observable就會等待,直到一個觀察者observer訂閱它之前開始發送動作,所以這樣就能保證觀察者從一開始就能看到整個序列。
Composition
Observables和observers只是ReactiveX的一個開始。通過對標准的觀察者模式的稍微擴展,更好的去處理了事件序列,而不是單個回調。
真正的核心就是“無擴展”,操作符允許你去轉換,合並,操作以及同發送序列被Observables一起發送。也就是說操作符和操作結果都是可發送,可傳遞的。
ReactiveX的操作符允許你以聲明的方式一起構成異步序列,同時還保持着回調函數的高效率, 但沒有嵌套的回調處理程序通常與異步系統相關的問題。
這里羅列一下Observable中定義的一些主要功能點:
- Observable的創建
Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, and Timer - Observable發送項目的轉換
Buffer, FlatMap, GroupBy, Map, Scan, and Window - Observable過濾
Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, and TakeLast - Observable合並
And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, and Zip - 錯誤處理操作符
Catch and Retry - 實用工具操作符
Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, and Using - 條件和布爾運算符
All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, and TakeWhile - 數學和聚集操作符
Average, Concat, Count, Max, Min, Reduce, and Sum - 轉換操作符
To - 可連接到Observable的操作符
Connect, Publish, RefCount, and Replay
下面來詳細說一下ReactiveX的操作符知識
ReactiveX有一些列的操作集合,但是在不同的語言上表現都是大相徑庭的。在一些特殊的語言可能還會有特定的定義操作符。
鏈式操作符
大多數操作符操作一個Observable並且返回一個Observable。這樣允許開發人員以一個鏈式的方式一個接一個的執行操作符。在可修改的鏈式中每一個操作結果Observable都是來之於上一個操作,這里的操作也就是定義的operator。
這里有一些類似於構造器Builder模式,該模式描述了一個含有一系方法的特定類通過操作方法來操作具有相同功能的類的每一項。這個模式也允許你以類似的方式去鏈式操作方法。在Builder模式中,操作方法出現的順序在鏈式中可能不是那么重要,但是在Observable中的操作符順序就很重要。
Observable操作符鏈不會依賴於原來的Observable去操作原始的鏈,但他們會反過來操作,每一個在Observable上的正在操作的operator都是上一個操作立即產生的。
我們也可以自己選擇自定義操作符,具體如何實現可以參考Implementing Your Own Operators。
上面在Observable中羅列過簡單的功能點,下面羅列一下按照類別划分的操作符以及各自的功能:
創建Observables
創建新的Observables的操作符
- Create——通過調用observer方法編程從頭創建一個Observable
- Defer——不立即創建Observable,直到observer觸發訂閱動作。此方法為每一個observer創建一個新的Observable
- Empty/Never/Throw——為非常精確和有限的行為創建Observables
- From——將其他對象或數據結構轉換成一個Observable
- Interval——創建一個具有發出一個整數序列間隔為一個特定的時間間隔的Observable
- Just——把一個對象或一組對象轉換成一個Observable,同時該Observable發送這樣的對象
- Range——創建一個Observable,發送一系列連續的整數
- Repeat——創建一個Observable,發送一個特定的項目或項目重復序列
- Start——創建一個Observable,發送一個函數的返回值
- Timer——創建一個Observable,在一個給定的一段時間延遲后發送一個對象或者項目
轉換Observables
轉換被一個Observable發送的項目的操作符
- Buffer——定期收集從Observable中發出的數據到集合中,並且發送這些集合而不是發送一次
- FlatMap——將一個Observable發送的數據或者項目轉換到Observables中,然后把這些數據壓縮成一個單個的Observable
- GroupBy——拆分一個Observable成多個Observable組,並且每個組發送的數據會租床成一個不同的發送數據組,當然這些發送數據時來至於原始的Observable。這些分組都是通過划分key來實現
- Map——轉換一個Observable發送的每個數據或者項目映射到一個函數上
- Scan——應用一個函數給一個Observable發送出來的每一想數據,並且是按照順序發送每個連續值
- Window——定期細分條目從一個Observable到Observable的windows,並且發送結果是這些windows而不是一次發送原始的數據或者項目
過濾Observables
過濾被Observable發送的項目的操作符
- Debounce——如果Observable在一個特定時間間隔過去后沒有發送其他數據或者項目,那么它只發送一個數據或者項目
- Distinct——該Observable不可以發送重復的數據
- ElementAt——只發送被Observable發送的某一個元素
- Filter——一個Observable只發送通過來特定測試描述語的匹配項
- First——只發出第一項,或第一項符合條件的項
- IgnoreElements——不發送任何數據,但是必須反饋它的中斷通知
- Last——只發送最后一項
- Sample——發出Observables周期時間間隔內最新的項
- Skip——跳過發送前幾項
- SkipLast——跳過發送后幾項
- Take——僅僅發送前幾項
- TakeLast——僅僅發送后幾項
合並Observables
將多個Observables合並成單個的Observable的操作符
- And/Then/When——通過Pattern和Plan媒介將兩個或者多個Observables發送的數據或項目合並成集合
- CombineLatest——當某一項數據由兩個Observables發送時,通過一個特殊的函數來合並每一個Observable發送的項,並且最終發送數據是該函數的結果
- Join——合並兩個Observables發送的結果數據。其中兩個Observable的結果遵循如下規則:每當一個Observable在定義的數據窗口中發送一個數據都是依據另外一個Observable發送的數據。
- Merge——通過合並多個Observables發送的結果數據將多個Observables合並成一個
- StartWith——在Observable源開始發送數據項目之前發送一個指定的項目序列
- Switch——轉換一個Observable,並且發送Observables到一個單個Observable,這個單個的Observable發送的項目就是轉換之前的Observables最近發送的項目
- Zip——通過特定的函數合並多個Observable的結果,並且對於每個組合都發出單獨的項目數據,這些數據就是之前定義的合並函數
錯誤處理操作符
錯誤處理操作符主要用於幫助來之於一個Observable里的錯誤通知的恢復功能
- Catch——從OnError方法通知中恢復持續的沒有錯誤的序列
- Retry——如果一個源Observable發送一個onError通知,重新訂閱給它,希望它將沒有錯誤的執行完成
實用工具操作符
一個實用的操作符工具箱
- Delay——按照一個特定量及時的將Observable發送的結果數據向前推移
- Do——注冊一個事件去監聽Observable生命周期
- Materialize/Dematerialize——代表發送出來的項目數據或者通知,或相反過程
- ObserveOn——指定一個observer將會觀察這個Observable的調度
- Serialize——強制一個Observable去做序列化調用
- Subscribe——操作可觀測的排放和通知
- SubscribeOn——指定一個Observable在被訂閱的時候應該使用的調度
- TimeInterval——轉換一個Observable的發送項目到另一個項目,在這些發送項之間,此項目具有指示這些發送的時間開銷功能
- Timeout——鏡像源Observable,但如果某段時間過后沒有任何通知發出將會發出一個錯誤通知
- Timestamp——給一個Observable發送的每一個項目附加一個時間戳
- Using——創建一個一次性的資源,這個資源就像Observable一樣有相同的壽命
條件和布爾運算操作符
評估一個或者多個Observables或者被Observables發送的項目的操作符
- All——確定發出的所有項目滿足某些標准
- Amb——給定兩個或兩個以上的Observable來源,從只有第一個可見發出一個項目發送所有的項目數據
- Contains——決定是否Observable發出一個特定的項
- DefaultIfEmpty——發送項從Observable源,或者如果Observable源沒有任何發送內容,那么將會發送一個默認的項
- SequenceEqual——確定兩個Observables發出相同的序列條目
- SkipUntil——丟棄Observable發出的項,直到第二個Observable發出一項
- SkipWhile——丟棄Observable發出的項,直到指定的條件變成了false
- TakeUntil——在第二個Observable發送一項或者終止之后,丟棄Observable發出的項
- TakeWhile——在指定的條件變成了false之后,丟棄Observable發出的項
數學和聚集操作符
- 操作一個被Observable發送出來的一整個項目序列操作符
- Average——計算一個Observable發送所有結果的平均值,並且發送這個值
- Concat——發送兩個或兩個以上Observables沒有交叉的值
- Count——計算Observable源發出的項目數據數量,只發出這個值
- Max——確定,發送最大值項
- Min——確定,發送最小值項
- Reduce——應用一個函數給一個Observable發送的項,並且發送該函數的結果
- Sum——計算Observable發送的所有數據的求和,並且發送這個求和結果
轉換操作符
- To——將一個Observable轉換到另一個對象或數據結構
可連接到Observable的操作符
指定Observables有更多精確控制訂閱動態的操作符
- Connect——定義一個可連接的Observable發送項目數據給它的訂閱者
- Publish——把一個普通的Observable轉化為一個可連接的Observable(向下轉換)
- RefCount——把一個可連接的Observable轉化成一個看起來就行一個普通的Observable(向上轉換)
- Replay——確保所有的Observables能看到所有發送的相同的項目數據序列,及時是在Observable已經開始發送后才訂閱的
由於Single是Observable的一個衍生變體,因此這里就不再做介紹。有興趣的同學可以查看ReactiveX–Single文檔
一個Subject是一種橋梁或者也可以叫做代理,一個Subject在ReactiveX的實現中既是一個observer也是一個Observable。因為它本身是一個observer,它能訂閱到一個或者多個Observables中,同時它也是一個Observable,他通過重新發送項目數據,能遍歷它所有的observers,同時,它也能發送新的項目數據。
因為一個Subject訂閱到一個Observable時,這將會觸發Observable開始發送他的項目數據(當然這里的操作必須是定義Observable為“冷的”)。
這里還有一些其他介紹可以參考:
- To Use or Not to Use Subject from Dave Sexton’s blog
- Introduction to Rx: Subject
- 101 Rx Samples: ISubject and ISubject<t1,t2>
- Advanced RxJava: Subject by Dávid Karnok
- Using Subjects by Dennis Stoyanov
各種不同的Subject類型
這里有四種不同類型的Subject來滿足於特定的使用場景。 注意:下面的示例圖中每一條帶有向右箭頭橫線都是一個單向過程,藍色的subscribe()方法表示每一次訂閱觸發執行函數。每一次的訂閱觸發即圖中藍色箭頭。
AsyncSubject
只有在源Observable完成之后,一個AsyncSubject將會發送由源Observable發送的最后一個值。(如果源Observable並沒有發送任何值,那么AsyncSubject在完成的時候也不會發送任何值)
AsyncSubject將會發送相同的最終值給接下來的observers。但是,如果源Observable因為錯誤而中斷,AsyncSubject並不會發送任何值,但是會傳遞來之於源Observable的錯誤通知。
通俗的來講,異步的Subject在每次觸發subscribe()方法發送項目的時候,只有在源Observable結束后才會發送源發送的結果。
更詳細介紹參考
BehaviorSubject
當一個observer訂閱到一個BehaviorSubject上時,通過發送當源Observable發送的最近項目數據,這個observer將會被觸發執行。並且它會繼續發送源Observable后續發送的項。
但是,如果源Observable發生錯誤而中斷,BehaviorSubject將不會發送任何數據給隨后的observers。不過,來之於源Observable的錯誤通知任然會傳遞。
更詳細介紹參考
PublishSubject
PublishSubject的主要職責就是將源Observable發送的所有數據發送給隨后一個已經訂閱了的observer。
值得注意的就是,一個PublishSubject可能在創建的時候立即發送項目數據,不過在Subject的創建和observer訂閱到這個Subject的這段時間中,一個或者多個發送項目數據可能存在丟失的風險。如果你要確保傳送所有的源Observable發送項,你可以使用Observable的Create方式來構建,以便你能手動重新構建“冷的”Observable行為。或者你也可以使用ReplaySubject。
如果源Observable發生錯誤而中斷,PublishSubject將不會發送任何項給接下來的observer。不過,來之於源Observable的錯誤通知任然會傳遞。
ReplaySubject
ReplaySubject發送過去源Observable發送的所有項目數據給任意的observer,不管observer在什么時候訂閱。
一旦replay緩沖項逐漸增長超過了一個固定值后,ReplaySubject將會丟棄舊的項。或者給已經發送的數據指定一個有效時間,在失效過后就會扔掉。
如果你使用ReplaySubject作為一個observer,必須確保不會再多線程中調用onNext方法,因為這可能導致亂序調用,這是違反了Observable定義規則的。並且會創建一個有歧義的Subject去replay。
更詳細介紹參考
如果你想在多線程中使用Observable的聯級操作鏈,你可以在特殊的Schedulers上去制定這些操作鏈去操作。
在ReactiveX中Observable操作符將Scheduler作為一個變量,這些操作在一個特定的Scheduler上做一些操作或者所有的工作。
默認情況下,你應用操作鏈到Observable上做一些事情的時候,這將會通知它的observers,在同一個線程中它的Subscribe方法將會被調起。根據Observable應有的操作,定義一個不同的Scheduler,SubscribeOn操作就會改變這些行為。ObserveOn操作指定一個不同的Scheduler,這個Scheduler主要用於Observable去發送通知給它自身的observers上。
如下圖所示,SubscribeOn操作指派哪一個Observable線程將會開始操作,操作鏈中的所有操作都可以被調起。另一方面,在操作出現的地方,ObserveOn會影響下面的Observable將使用的線程。對於這樣的原因,在Observable操作鏈中,你可以在多個點多次調用ObserveOn方法,這樣來確保多線程上的這些操作的執行。
結束語
由於這篇文章大部分來至ReactiveX原始文檔,加上一些的個人理解形成。有一些翻譯或者個人理解會有一定的偏差,再后續會繼續修正。如果閱讀到這篇文章的同學發現有不妥當的地方,還請回復指出,謝謝。
另外,后面有時間打算針對Rxjava做一下源碼上的分析。