ReactiveX序列——RxSwift
Swift是蘋果公司新推出的一門現代化的編程語言,並且將其開源出來了,Swift具有很多的優點,這也使得這門語言推出的短時間引起了很大反應的原因,在最近的2016年3月的編程語言排行榜處於第14位,甚至超過了OC(15位)。可見Swift的在開發者心中的地位。
RxSwift的觀察者對象(Observable)
在RxSwift中,可以有多種創建Observable對象的方法,主要有以下幾種:
- asObservable
- create
- deferred
- empty
- error
- toObservable/from
- interval
- never
- just
- of
- range
- repeatElement
- timer
要弄明白Observable就要先弄清楚Observable是繼承了哪些類和協議,從源碼開始分析:
首先第一個是ObservableConvertibleType:
/** Type that can be converted to observable sequence (`Observer<E>`). */ public protocol ObservableConvertibleType { /** Type of elements in sequence. */ typealias E /** Converts `self` to `Observable` sequence. - returns: Observable sequence that represents `self`. */ func asObservable() -> Observable<E> }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
從ObservableConvertibleType協議源碼可以看出,它定義了一個typealias類型別名和asObservable方法,類型別名是用來定義將要處理的類型(例如String,Int等等),而asObervable這個我們在后面會具體敘述。其次是ObservableType,它繼承了ObservableConvertibleType,ObservableType主要干了兩個事情,第一個是創建出subscribe方法,它是用來執行訂閱事件的(onNext、onError/onComplete),第二個就是簡易實現asObservable方法(通過extension ObservableType 實現),asObservable主要是通過Observable.create(subscrible())實現的。再上來就是Observable,它是一個類,繼承了ObservableType協議接口。
下面我們分別對以上幾種創建Observable對象做詳細的介紹。
- asObservable方法:
asObservable其實是相當於clone方法,其內部實現如下:
public func asObservable() -> Observable<E> { return self }
- 1
- 2
- 3
從這里看,它return self也就是自己,這就意味着,你必須先有Observable對象才能調用asObservable方法。例如:
var obs = Observable<String>.create { (observer) -> Disposable in observer.on(.Next("hahah")) observer.on(.Next("deasd")) observer.on(.Completed) return NopDisposable.instance } let observable = obs.asObservable() observable.subscribeOn(MainScheduler.instance) .subscribe{ event in print(event.debugDescription) }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
第二個是subscribe方法,這個方法具體實現調用了一個“抽象”方法,這個“抽象”方法就是打印出來一個錯誤日志並且停止運行。
public func subscribe<O: ObserverType where O.E == E>(observer: O) -> Disposable { abstractMethod() }
- 1
- 2
- 3
- 4
當然,這個Observable類中方法,但是extension Observable其實是有很多用法的。這也是我們上面提到創建Observable的各種方法。
2、create方法
public static func create(subscribe: (AnyObserver<E>) -> Disposable) -> Observable<E> { return AnonymousObservable(subscribe) }
- 1
- 2
- 3
- 4
這是一個“靜態方法”(在class中用static關鍵字標注,在struct和enum中使用class關鍵字標注),這個方法的參數是一個函數(通常我們會用閉包的方式),函數的參數是AnyObserver,返回的是Disposable。AnyObserver其實就是訂閱者,Disposable是一個協議接口,里面只有一個dispose方法,用來釋放一些資源。整個create方法返回的是一個AnonymousObservable(匿名Observable),AnonymousObservable繼承自Producer,Producer實現了線程調度功能,可以安排某個線程來執行run方法。因此create方法返回的AnonymousObservable是可以運行在指定線程中Observable。完整的create例子:
var obs = Observable<String> .create ({ (observer) -> Disposable in observer.on(.Next("hahah")) observer.on(.Next("deasd")) observer.on(.Completed) return NopDisposable.instance }) .observeOn(MainScheduler.instance) .subscribe({event in if let str = event.element { print(str) } }) //.dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
最后obs變量是一個Disposable類型變量,可以繼續調用dispose方法釋放資源。整個代碼輸出結果:
hahah
deasd
- 1
- 2
3、empty方法
public static func empty() -> Observable<E> { return Empty<E>() }
- 1
- 2
- 3
empty方法是一個空方法,里面沒有onNext事件處理,只會處理onComplete方法。empty創建Observable對象比較簡單。代碼例子:
let obs1 = Observable<String>.empty() obs1.subscribe( onNext: {str in print(str)}, onError: { (errorType) -> Void in print(errorType) }, onCompleted: { () -> Void in print("complete") }) { () -> Void in print("dispose") }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
輸出結果:
complete
dispose
- 1
- 2
- 3
這個例子中有四個閉包,其中最后一個是尾隨閉包,而且這些閉包都是可選類型。當然你也可以如下寫法:
let obs1 = Observable<String>.empty() obs1.subscribe( onNext: {str in print(str) }, onError: { (errorType) -> Void in print(errorType) }, onCompleted: { () -> Void in print("complete") }, onDisposed: {() -> Void in print("dispose") })
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
4、never方法
public static func never() -> Observable<E> { return Never() }
- 1
- 2
- 3
官方解釋是返回一個無終止的觀察者事件序列,可以用來表示無限持續時間。盡管我們給安排了next事件,但實際上,他是不會執行的。不會輸出onNext
Observable<String>
.never()
.subscribeNext( { (str) -> Void in print("onNext") }) //.dispose()
- 1
- 2
- 3
- 4
- 5
- 6
5、just方法
public static func just(element: E, scheduler: ImmediateSchedulerType) -> Observable<E> { return JustScheduled(element: element, scheduler: scheduler) }
- 1
- 2
- 3
just方法只能處理單個事件,簡單來說,我們使用just方法不能將一組數據一起處理,只能一個一個處理。例如:
Observable<String>
.just("just test") .subscribeOn(MainScheduler.instance) .subscribeNext({ (str) -> Void in print(str) }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
輸出結果:
just test
- 1
just方法是一個多態方法,允許在傳入參數時候指定線程,例如:
它指定當前線程完成subscribe相關事件。
Observable<String>
.just("just with Scheduler", scheduler: CurrentThreadScheduler.instance) .subscribeNext({ (str) -> Void in print(str) }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
6、error方法
public static func error(error: ErrorType) -> Observable<E> { return Error(error: error) }
- 1
- 2
- 3
error方法是返回一個只能調用onError方法的Observable序列。其中的onNext和OnComleted方法是不會執行的。例如:
public static func error(error: ErrorType) -> Observable<E> { return Error(error: error) } Observable<String> .error(RxError.Timeout) .subscribe( onNext: { (str) -> Void in print(str) print("onNext") }, onError: { (error)-> Void in print(error) }, onCompleted: { () -> Void in print("onCompleted") }, onDisposed: { () -> Void in print("dispose") }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
最后的輸出結果是:
Sequence timeout dispose
- 1
- 2
7、of方法
可以說of方法是just方法的升級版,它可以將一序列的事情組合起來一起處理。極大方便了開發者對數組(Array)、字典(Dictionary)進行分布處理。
public static func of(elements: E ..., scheduler: ImmediateSchedulerType? = nil) -> Observable<E> { return Sequence(elements: elements, scheduler: scheduler) } Observable<String> .of("d1","d2", "d3", "d4") .subscribe( { (event) -> Void in if let els = event.element { print(els) } }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
這里解釋一下subscribe(on: Event->Void)方法,例子中event.element在調用get屬性的時候其實會執行一個onNext方法,它返回的是一個可選類型,因此要用if let解析處理。當然如果代碼改成如下,那么是不會輸出結果的,因為event.error執行的是錯誤監聽(也就是執行的onError方法,因此不會輸出結果)。of和just一樣,存在一個多態方法,可以帶入線程控制。
Observable<String>
.of("d1","d2", "d3", "d4") .subscribe( { (event) -> Void in if let els = event.error{ print(els) } }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
8、deferred方法
deferred方法是延時創建Observable對象,當subscribe的時候才去創建,它為每一個bserver創建一個新的Observable; deferred采用一個Factory函數型作為參數,Factory函數返回的是Observable類型。這也是其延時創建Observable的主要實現。
public static func deferred(observableFactory: () throws -> Observable<E>) -> Observable<E> { return Deferred(observableFactory: observableFactory) }
- 1
- 2
- 3
- 4
整個deferred方法的原理如上圖,從圖中可以看出,deferred不是第一步創建Observable,而是在subscriber的時候創建的。(圖中紅色的是error,綠色的是next事件)
9、generate方法
public static func generate(initialState initialState: E, condition: E throws -> Bool, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance, iterate: E throws -> E) -> Observable<E> { return Generate(initialState: initialState, condition: condition, iterate: iterate, resultSelector: { $0 }, scheduler: scheduler) }
- 1
- 2
- 3
generate方法是一個迭代器,它一直循環onNext事件,直到condition不滿足要求退出。generate有四個參數,第一個是最開始的循環變量,第二個是條件,第三個是迭代器,這個迭代器每次運行都會返回一個E類型,作為下一次是否執行onNext事件源,而是否正的要執行則看是否滿足condition條件。其實我們可以理解generate就是一個循環體(其內部實現也正是一個循環,代碼在:GenerateSink的run方法中)。例子:
Observable<String>
.generate(
initialState: "ah", condition: ({ (str) -> Bool in return str.hasPrefix("ah") }), iterate: ({ (str1) -> String in return "h" + str1 })) //.subscribeOn(MainScheduler.instance) .subscribe ({ (event) -> Void in if let res = event.element { print(res) } }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
輸出結果:
ah
- 1
上面這個例子說的是,初始的變量是“ah”,第一個條件滿足,執行onNext事件,同時生成一個hah,不滿足條件,不執行onNext事件。generate是一個具有高度可變的of方法,它同時兼備了后面要介紹的過濾(filter)特性。當然generate還有一個多態方法,允許傳入執行線程。這個線程是為循環體而生的,並不是為subscrible而生的。
10、repeatElement方法
public static func repeatElement(element: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> { return RepeatElement(element: element, scheduler: scheduler) }
- 1
- 2
- 3
repeatElement方法是一個無限循環的,它會一直循環onNext方法。當然這種循環是可以指定線程的。例子:
Observable<String>
.repeatElement("daa") .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
其中subscribeNext是一個尾隨閉包。
11、using方法
public static func using<R: Disposable>(resourceFactory: () throws -> R, observableFactory: R throws -> Observable<E>) -> Observable<E> { return Using(resourceFactory: resourceFactory, observableFactory: observableFactory) }
- 1
- 2
- 3
using方法是通過Factory方法生成一個對象(resourceFactory)再轉換成Observable,中間我們要使用Factory方法,上面已經介紹過一次Factory方法。using方法相對其他的方法比較復雜和特殊,原因是using方法是由我們構建出資源和構建清除資源的,中間通過一個轉換生成Observable對象。
Observable<String>
.using( { () -> Student<String> in return Student(source: Observable<String>.just("jarlene"), disposeAction: { () -> () in print("hah") }) }, observableFactory: { (r) -> Observable<String> in return r.asObservable() }) .subscribeNext( { (ss) -> Void in print(ss) }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
其中Student類繼承了兩個協議:ObservableConvertibleType和Disposable;ObservableConvertibleType是為了生成Observable對象(通過調用asObservable方法),Disposable是為了清除資源。源碼如下:
class Student<E>: ObservableConvertibleType, Disposable{ private let _source: Observable<E> private let _dispose: AnonymousDisposable init(source: Observable<E>, disposeAction: () -> ()) { _source = source _dispose = AnonymousDisposable(disposeAction) } func dispose() { _dispose.dispose() } func asObservable() -> Observable<E> { return _source } var name :String{ get { return self.name } set { self.name = newValue } } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
在上面例子中,我們采用Observable.just方法生成了一個Observable對象傳遞給Student對象,同時也定義了一個釋放資源的方法。等到調用dispose()方法,就會執行我們定義的釋放資源的方法。例子結果為:
jarlene
hah
- 1
- 2
12、range方法
public static func range(start start: E, count: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> { return RangeProducer<E>(start: start, count: count, scheduler: scheduler) }
- 1
- 2
- 3
range方法其實方便版of方法,其功能和of差不多,我們只要輸出start和count然后就能生成一組數據,讓他們執行onNext。值得注意的是,range方法只生成Observable型。在調用bindNext的時候可以將其對應成其他相應的類型。
例如:
let arr: [String] = ["ad", "cd", "ef", "gh"] Observable<Int> .range(start: 1, count: 3) .subscribeNext { (n) -> Void in print(arr[n]) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
結果
cd ef gh
- 1
- 2
- 3
13、toObservable(from)
public func toObservable(scheduler: ImmediateSchedulerType? = nil) -> Observable<Generator.Element> { return Sequence(elements: self, scheduler: scheduler) }
- 1
- 2
- 3
- 4
toObservable方法是擴展自Array,是將一個一個array轉換成Observable,其內部實調用了一個序列Sequence,其用法很簡單。
let arr: [String] = ["ab", "cd", "ef", "gh"] arr.toObservable() .subscribeNext { (s) -> Void in print(s) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
運行結果:
ab
cd ef gh
- 1
- 2
- 3
- 4
14、interval/timer
public static func interval(period: RxTimeInterval, scheduler: SchedulerType) -> Observable<E> { return Timer(dueTime: period, period: period, scheduler: scheduler ) } public static func timer(dueTime: RxTimeInterval, period: RxTimeInterval? = nil, scheduler: SchedulerType) -> Observable<E> { return Timer( dueTime: dueTime, period: period, scheduler: scheduler ) }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
interval方法是定時產生一個序列,interval第一個參數就是時間間隔,第二個參數是指定線程。 可以看出interval是range和repeateElement的結合。timer方法和interval方法類似。差別在於timer可以設置間隔時間和持續時間,而interval的間隔時間和持續時間是一樣的。
至此,我們將Observable對象基本的產生方法都講述完了,下一節開始我們詳細講述Observer的創建以及制作器Producer,其次將詳細敘述Producer和事件方法onNext、onError、onComplete之間的聯系,以及Producer是怎么調度線程來完成線程控制的。
RxSwift的觀察者對象的變換(Transform Observable)和過濾(Filter Observable)
對觀察着對象進行變換使得一個對象變換成另一個對象,這個是RxSwift核心之一,因此對於熟悉RxSwift特別重要。RxSwift存在以下變換方法:
- buffer
- flatMap
- flatMapFirst
- flatMapLatest
- map
- scan
- window
過濾方法
- debounce / throttle
- distinctUntilChanged
- elementAt
- filter
- sample
- skip
- take
- takeLast
- single
下面我們分別對以上幾種對Observable對象變換做詳細的介紹(不全部闡述)。
1、 buffer方法:
buffer方法是extension ObservableType中的一個方法,它的作用是緩沖組合,第一個參數是緩沖時間,第二個參數是緩沖個數,第三個參數是線程。總體來說就是經過一定時間,將一定個數的事件組合成一個數組,一起處理,在組合的過程中,你可以選擇線程。
public func buffer(timeSpan timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType) -> Observable<[E]> { return BufferTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler) }
- 1
- 2
- 3
- 4
例子:
Observable<String>
.of("ab", "cd", "ef", "gh") .buffer(timeSpan: 1, count: 2, scheduler: MainScheduler.instance) .subscribeNext({ (n) -> Void in if !n.isEmpty { print(n) } }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
輸出結果
["ab", "cd"] ["ef", "gh"]
- 1
- 2
2、flatMap
flatMap也是擴展自ObservableType,它的作用是將一種類型變換成另一種類型。flatMap的參數是一個方法,這個方法的輸入參數與Observable的E是同一種類型,輸出依然是Observable類型。
public func flatMap<O: ObservableConvertibleType>(selector: (E) throws -> O) -> Observable<O.E> { return FlatMap(source: asObservable(), selector: selector) }
- 1
- 2
- 3
- 4
我們看一個例子,例子中首先是一組Observable,通過flatMap后還是一組Observable,但是flatMap作用是,如果元素中遇到“a”字母開頭的,那么它就重新組裝一個數組,這個數組是只有元素和“a”;如果元素不是“a”字母開頭的就與“b”字母組裝成另一個數組。這兩種情況都通過調用toObservable返回Observable。flatMapFirst、flatMapLast、flatMapWithIndex都是類似的作用,這里就不重復。
Observable<String>
.of("ab", "cd", "aef", "gh") .flatMap({ (element: String) -> Observable<String> in if element.hasPrefix("a") { let sd : [String] = [element, "a"] return sd.toObservable() } else { let sd : [String] = [element, "b"] return sd.toObservable() } }) .subscribeNext({ (n) -> Void in if !n.isEmpty { print(n) } }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
結果
ab
a cd b aef a gh b
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
3、map
map方法是通過其實flatMap的簡化版本,它返回的可以是任何類型。其中R是返回類型。
public func map<R>(selector: Self.E throws -> R) -> RxSwift.Observable<R>
- 1
例子:
Observable<String>
.of("ab", "cd", "aef", "gh") .map({ (str) -> String in return str+"ss" }) .subscribeNext({ (n) -> Void in if !n.isEmpty { print(n) } }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
結果
abss
cdss
aefss
ghss
- 1
- 2
- 3
- 4
4、scan方法
scan方法有兩個參數,第一個參數是種子,第二個參數是加速器。所謂的種子就是最初的狀態,加速器就是將每一次運行的結果延續到下一次。scan方法也是擴展自ObservableType
public func scan<A>(seed: A, accumulator: (A, E) throws -> A) -> Observable<A> { return Scan(source: self.asObservable(), seed: seed, accumulator: accumulator) }
- 1
- 2
- 3
- 4
例子:
Observable<String>
.of("a", "b", "c", "d", "e") .scan("s", accumulator: { (a, b) -> String in return a+b }) .subscribeNext({ (n) -> Void in print(n) }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
這個例子中是將所有的字符依次串起來,運行結果是:
sa
sab
sabc
sabcd
sabcde
- 1
- 2
- 3
- 4
- 5
5、window
window方法同樣擴展自ObservableType,它有三個參數,第一個是時間間隔,第二個是數量,第三個是線程。時間間隔指的的是window方法開窗的時間間隔;第二個參數數量指的的是每次通過窗口的個數;線程就是這種操作執行在什么線程上。起源碼如下:
public func window(timeSpan timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType) -> Observable<Observable<E>> { return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler) }
- 1
- 2
- 3
- 4
需要特別注意的是window方法之后,返回的是Observable
Observable<String>
.of("ab", "bc", "cd", "de", "ef") .window(timeSpan: 1, count: 2, scheduler: MainScheduler.instance) .subscribeNext({ (n) -> Void in n.subscribeNext({ (ss) -> Void in print(ss) }) }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
結果:
ab
bc
cd de ef
- 1
- 2
- 3
- 4
- 5
變換的方法基本就這些,但是開發者可以通過自定義的方式擴展變換的方法以達到所需的目的。接下來我們看看過濾方法。
1、debounce / throttle
debounce/throttle 方法在規定的時間中過濾序列元素,就如上圖描述的一樣,當debounce打開的時候,剛好那個黃色的序列元素過來,那么它就不會通知到事件(onNext、onError、onComplete)上去。下面是debounce方法源碼。
public func throttle(dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<E> { return Throttle(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler) }
- 1
- 2
- 3
- 4
例子:
Observable<String>
.of("a", "b", "c", "d", "e", "f") .debounce(1, scheduler: MainScheduler.instance) .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
輸出結果
f
- 1
2、distinctUntilChanged
distinctUntilChanged 主要是過濾相鄰兩個元素是否重復,重復的話就過濾掉其中之一。
public func distinctUntilChanged<K>(keySelector: (E) throws -> K, comparer: (lhs: K, rhs: K) throws -> Bool) -> Observable<E> { return DistinctUntilChanged(source: self.asObservable(), selector: keySelector, comparer: comparer) }
- 1
- 2
- 3
- 4
例子:
Observable<String>
.of("a", "a", "c", "e", "e", "f") .distinctUntilChanged({ (lhs, rhs) -> Bool in return lhs==rhs }) .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
輸出結果:
a c e f
- 1
- 2
- 3
- 4
3、elementAt
elementAt方法其實就挑選出所需要的序列元素,上圖描述的很清楚。
這個方法很簡單。沒有什么難點。當index超界的時候,throwOnEmpty參數是否拋出異常。
public func elementAt(index: Int) -> Observable<E> { return ElementAt(source: asObservable(), index: index, throwOnEmpty: true) }
- 1
- 2
- 3
- 4
例子:
Observable<String>
.of("aa", "av", "cs", "ed", "ee", "ff") .elementAt(2) .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
輸出結果
cs
- 1
4、filter
filter方法很簡單,指出過濾條件就行,滿足過濾條件的就能執行事件通知,否則不行
public func filter(predicate: (E) throws -> Bool) -> Observable<E> { return Filter(source: asObservable(), predicate: predicate) }
- 1
- 2
- 3
- 4
例子:
Observable<String>
.of("aa", "av", "cs", "ed", "ee", "ff") .filter({ (ss) -> Bool in return ss.hasPrefix("a") }) .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
輸出結果
aa
av
- 1
- 2
接下來的幾個方法都是類似的,這里不就在詳細敘述啦。
RxSwift的Observable事件處理以及線程調度
由第一部分可以知道,幾乎在創建所有的Observable的時候都要用到Producer,而在事件處理(onNext、onError、onComplete)過程中經常要用到線程調度(Scheduler),它們之間存在一種很巧妙的設計。首先先看看Producer源碼。
class Producer<Element> : Observable<Element> { override init() { super.init() } override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable { if !CurrentThreadScheduler.isScheduleRequired { return run(observer) } else { return CurrentThreadScheduler.instance.schedule(()) { _ in return self.run(observer) } } } func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable { abstractMethod() } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
Producer是繼承了Observable類,我們在創建Observable類時候都用到了Producer,那么Producer主要做了兩件事情,第一個實現subscribe方法,在subscribe方法中傳入了observer參數,observer類型是ObserverType,在上一部分介紹了ObserverType中有一個類型別名E,那么在Producer的范型element就必須和ObserverType中類型別名E一樣。回過頭來說subscribe,我們首先看CurrentThreadScheduler 的源碼,CurrentThreadScheduler 是繼承ImmediateSchedulerType協議,它里面就定義了一個方法:
func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable
- 1
而這個方法在CurrentThreadScheduler 具體實現了。
public class CurrentThreadScheduler : ImmediateSchedulerType { typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>> /** The singleton instance of the current thread scheduler. */ public static let instance = CurrentThreadScheduler() static var queue : ScheduleQueue? { get { return NSThread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKeyInstance) } set { NSThread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKeyInstance) } } /** Gets a value that indicates whether the caller must call a `schedule` method. */ public static private(set) var isScheduleRequired: Bool { get { let value: CurrentThreadSchedulerValue? = NSThread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerKeyInstance) return value == nil } set(isScheduleRequired) { NSThread.setThreadLocalStorageValue(isScheduleRequired ? nil : CurrentThreadSchedulerValueInstance, forKey: CurrentThreadSchedulerKeyInstance) } } /** Schedules an action to be executed as soon as possible on current thread. If this method is called on some thread that doesn't have `CurrentThreadScheduler` installed, scheduler will be automatically installed and uninstalled after all work is performed. - parameter state: State passed to the action to be executed. - parameter action: Action to be executed. - returns: The disposable object used to cancel the scheduled action (best effort). */ public func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable { if CurrentThreadScheduler.isScheduleRequired { CurrentThreadScheduler.isScheduleRequired = false let disposable = action(state) defer { CurrentThreadScheduler.isScheduleRequired = true CurrentThreadScheduler.queue = nil } guard let queue = CurrentThreadScheduler.queue else { return disposable } while let latest = queue.value.dequeue() { if latest.disposed { continue } latest.invoke() } return disposable } let existingQueue = CurrentThreadScheduler.queue let queue: RxMutableBox<Queue<ScheduledItemType>> if let existingQueue = existingQueue { queue = existingQueue } else { queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1)) CurrentThreadScheduler.queue = queue } let scheduledItem = ScheduledItem(action: action, state: state) queue.value.enqueue(scheduledItem) return scheduledItem } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
其實主要是根據CurrentThreadScheduler.isScheduleRequired參數來選擇是否需要當前線程運行,如果需要,首調用action方法,而這個action方法其實就是onNext、onError、onCompelete方法。然后做了一個延遲清除(defer)和一個判斷(guard)。然后循環一個queue其實主要是看看是否還有沒有執行完的onNext時間。latest.invoke()其實就是做action(state),然后返回Disposable。如果不需要,則組合queue,生成Disposable返回。接下來我們看看怎么設置線程執行的,首選看看subscribleOn方法,這個方法就是指定接下來事情要發生在那個線程中,具體看一下代碼:
public func observeOn(scheduler: ImmediateSchedulerType) -> Observable<E> { if let scheduler = scheduler as? SerialDispatchQueueScheduler { return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler) } else { return ObserveOn(source: self.asObservable(), scheduler: scheduler) } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
方法是定義在extension ObservableType 中的,它指定ObservableType 運行線程,這里面指定了兩種運行方式,第一種是運行ObserveOnSerialDispatchQueue,第二種是ObserveOn這兩個都繼承自Producer,上面我們已經敘述了Producer,不管是ObserveOnSerialDispatchQueue還是ObserveOn都重寫了run方法,他們返回的都是ObserverBase。ObserverBase其實就是在執行onNext、onError、onComplete方法。
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
private var _isStopped: AtomicInt = 0 func on(event: Event<E>) { switch event { case .Next: if _isStopped == 0 { onCore(event) } case .Error, .Completed: if !AtomicCompareAndSwap(0, 1, &_isStopped) { return } onCore(event) } } func onCore(event: Event<E>) { abstractMethod() } func dispose() { _isStopped = 1 } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
onCore方法是由繼承者實現,比如在ObserveOnSink類中及具體實現了onCore方法
override func onCore(event: Event<E>) { let shouldStart = _lock.calculateLocked { () -> Bool in self._queue.enqueue(event) switch self._state { case .Stopped: self._state = .Running return true case .Running: return false } } if shouldStart { _scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run) } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
這個onCore方法是判斷當前運行到那一步(onNext,onError,onComplete)。現在我們回過頭來看Producer中的subscribe其實就是執行事件,只不過這個事件是在某個線程上執行的。我們可以繪制一個簡單的流程圖描述這些。
Observable執行subscribleOn方法,產生一個新的Observable,這個新Observable是Produce,他繼承了Observable,當Observable執行subscrible方法的時候,會根據線程來執行,如果指定了線程,那么就會通過run方法去執行事件。如果沒有指定線程,就用當前線程執行run方法去執行事件。當然如果要用到變換或者過濾,也可以通過指定線程來執行變換和過濾,其原理是一樣的。
RxSwift的觀察者對象的合並(Conbinate Observable)和鏈接器(Connect Observable)
對觀察着合並就是將多個觀察着(Observables)合並起來處理,使用起來更方便。它主要由以下方法:
- merge
- startWith
- switchLatest
- combineLatest
- zip
鏈接器
- multicast
- publish
- refCount
- replay
- shareReplay
當然為了將多個相同類型觀察者對象合並起來處理,可以極大減少重復代碼的工作量。從本節開始我們將會敘述觀察者對象的合並和發布。
1、merge
從圖中很容易看出merge方法就是將多個Observable對象合並處理。
public func merge() -> Observable<E.E> { return Merge(source: asObservable()) }
- 1
- 2
- 3
例子:
Observable.of( Observable.of("a", "b"), Observable.of("c", "d"), Observable.of("e", "f"), Observable.of("g", "h")) .merge() .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
結果:
a b c d e f g h
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
merge方法看起來特別簡單。
2、startWith
startWith方法可以說是定制開始位置的,是一種比較特殊的merge方法。
public func startWith(elements: E ...) -> Observable<E> { return StartWith(source: self.asObservable(), elements: elements) }
- 1
- 2
- 3
- 4
startWith方法其實就是指定一個特殊的開頭,
例子:
Observable.of( Observable.of("a", "b"), Observable.of("c", "d"), Observable.of("e", "f"), Observable.of("g", "h")) .merge() .startWith("x") .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
結果:
x
a b c d e f g h
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
3、switchLatest
RxSwift中switchLatest相當與其他語言的switch方法,從圖中可以很明顯的看出來,第一個序列的最后一個元素被去掉了,沒有執行OnNext方法。
public func switchLatest() -> Observable<E.E> { return Switch(source: asObservable()) }
- 1
- 2
- 3
例子:
let var1 = Variable(0) let var2 = Variable(200) // var3 is like an Observable<Observable<Int>> let var3 = Variable(var1.asObservable()) let d = var3 .asObservable() .switchLatest() .subscribeNext { (str) -> Void in print(str) } var1.value = 1 var1.value = 2 var1.value = 3 var1.value = 4 var3.value = var2.asObservable() var2.value = 201 var1.value = 5 var1.value = 6 var1.value = 7
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
結果:
0
1
2
3
4
200
201
- 1
- 2
- 3
- 4
- 5
- 6
- 7
Variable是一個類,它里面包含了一個Observable對象(BehaviorSubject),另外Variable中還是實現了asObservable方法,而這個方法返回的就是里面的Observable對象。Variable源碼很簡單,這里不做特別的介紹。至於var1.value=5\6\7沒有執行,這個正是switchLatest的作用,當var1的作用完成后,切換到var2的Observable。那么var1后續變化,是不會通知到var3的。
4、combineLatest
combineLatest和其他方法一樣都是擴展自ObservableType協議,
public func combineLatest<R>(resultSelector: [Generator.Element.E] throws -> R) -> Observable<R> { return CombineLatestCollectionType(sources: self, resultSelector: resultSelector) }
- 1
- 2
- 3
例子:
let var1 = Variable(0) let var2 = Variable(200) // var3 is like an Observable<Observable<Int>> let var3 = Variable(var1.asObservable()) let d = var3 .asObservable() .switchLatest() .subscribeNext { (str) -> Void in print(str) } var1.value = 1 var1.value = 2 var1.value = 3 var1.value = 4 var3.value = var2.asObservable() var2.value = 201 var1.value = 5 var1.value = 6 var1.value = 7 Observable.combineLatest(var1.asObservable(), var2.asObservable()) { (as1, as2) -> Int in return as1 + as2 } .subscribeNext { (mon) -> Void in print(mon) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
輸出結果:
208
- 1
簡單的分析一下,就可以看出來,var2.value = 201; var1.value = 7,最后就是208;combineLatest是將多個Observable方法按照一定意願組合起來。它提供了開發者組合的方法,自己實現就行了。
5、zip
zip和combineLatest差不多,可以將多個Observable合並起來處理,上面的例子同樣可以用zip來實現,具體看例子,下面是zip方法的源碼
public static func zip<O1: ObservableType, O2: ObservableType>
(source1: O1, _ source2: O2, resultSelector: (O1.E, O2.E) throws -> E) -> Observable<E> { return Zip2( source1: source1.asObservable(), source2: source2.asObservable(), resultSelector: resultSelector ) }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
例子:
Observable.zip(var1.asObservable(), var2.asObservable()) { (s1, s2) -> Int in return s1+s2 } .subscribeNext { (mon) -> Void in print(mon) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
當所有的工作都做完之后,我們要的觀察着對象進行發布,那么這個時候就要用到Connect,它是鏈接觀察着對象和被觀察着之間的一個鏈接器。它主要由以下方法:
1、multicast/publish
multicast和publish方法一樣,它們都是通過發布/多播將Observable發出去,當然發出去必須要有一個連接(connect)的過程。只有鏈接的對象才會收到publish/multicast的通知。下面是源碼:
public func multicast<S: SubjectType where S.SubjectObserverType.E == E>(subject: S) -> ConnectableObservable<S.E> { return ConnectableObservableAdapter(source: self.asObservable(), subject: subject) } public func publish() -> ConnectableObservable<E> { return self.multicast(PublishSubject()) }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
例子:
let subject1 = PublishSubject<Int>()
_ = subject1
.subscribe { print("Subject \($0)") } let int1 = Observable<Int>.interval(1, scheduler: MainScheduler.instance) .multicast(subject1) _ = int1 .subscribe { print("first subscription \($0)") } int1.connect()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
這個例子通過使用一個interval方法一直放送,然后通過multicast將subject1通知(多播)出去,int1.subscribe來接受subject1的變化。
輸出的結果:
Subject Next(0) first subscription Next(0) Subject Next(1) first subscription Next(1) Subject Next(2) first subscription Next(2) Subject Next(3) first subscription Next(3) Subject Next(4) first subscription Next(4)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
當然publish也有類似的功能。
let subject1 = PublishSubject<Int>()
_ = subject1
.subscribe { print("Subject \($0)") } let int1 = Observable<Int>.interval(1, scheduler: MainScheduler.instance) .publish() _ = int1 .subscribe { print("first subscription \($0)") } int1.connect()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
輸出結果:
first subscription Next(0) first subscription Next(1) first subscription Next(2) first subscription Next(3) first subscription Next(4) first subscription Next(5) first subscription Next(6) first subscription Next(7) first subscription Next(8) first subscription Next(9) first subscription Next(10)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
publish方法只是將發布出去的結果的變化告知,原變化沒有告知出來,當然這個事publish和multicast一點小區別。
2、refCount
refCount是結合了publish方法使用的,當Observable發布出去,通過一個引用計數(refCount)方法來記錄,其實refCount就是相當於connect方法。refCount源碼如下,
public func refCount() -> Observable<E> { return RefCount(source: self) }
- 1
- 2
- 3
例子:
let int1 = Observable<Int>.interval(1, scheduler: MainScheduler.instance) .publish() .refCount() .subscribeNext({ (ss) -> Void in print(ss) })
- 1
- 2
- 3
- 4
- 5
- 6
輸出結果:
0 1 2 3 4 ...
- 1
- 2
- 3
- 4
- 5
- 6
3、replay/shareReplay
其實replay和multicast是一樣,在其源碼中其實也是調用multicast方法。起源碼如下:
public func replay(bufferSize: Int) -> ConnectableObservable<E> { return self.multicast(ReplaySubject.create(bufferSize: bufferSize)) } public func shareReplay(bufferSize: Int) -> Observable<E> { if bufferSize == 1 { return ShareReplay1(source: self.asObservable()) } else { return self.replay(bufferSize).refCount() } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
只不過,之前multicast傳入的PublishSubject,這里是ReplaySubject,兩者其實區別不大。
ReplaySubject中調用了ReplayMany,ReplayMany有一個事件隊列來輪循事件。ReplaySubject和ReplayMany比較簡單,這里不再敘述。
例子:
let int2 = Observable<Int>.interval(1, scheduler: MainScheduler.instance) .replay(1) _ = int2.subscribeNext({ (ss) -> Void in print(ss) }) int2.connect() _ = int2.subscribeNext({ (ss) -> Void in print("a..\(ss)") })
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
輸出結果:
0 a..0 1 a..1 2 a..2 3 a..3 4 a..4 5 a..5
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
其中shareReplay和replay是一回事,從源碼中酒可以看出來,shareReplay是replay和refCount的組合,這里不再敘述。
至此為止,關於Swift的一些基本用法和基本的概念都講述完畢,當然還有一些相關的擴展,但是這個都和上面講述原理是一樣的,大家可以參考源碼理解。
RxSwift、RxCocoa
從本節開始講敘述與RxSwift配套使用的RxCocoa,RxCocoa主要是針對上層Ui做擴展,這些擴展主要是將上面所敘述的東西結合Ui控件使用。將RxSwift和RxCocoa結合起來使用對ios開發可以節省很大開發的時間,RxCocoa擴展性和RxSwift一樣靈活,可以針對不同的業務進行不同的擴展,很方便開發者使用。由於本人對於ios sdk不是很了解。所有這里不具體討論
總結
整篇博客寫下來字數達到2w多,可以當作一個小論文了,作為一個swift新手,我花了10天左右的時間熟悉swift基本語法,為了能夠更加熟悉swift,於是選擇研究RxSwift,已達到更加熟練掌握swift的目的,整篇博客下下來花費時間就長達一個多月,很多swift的基本東西看完之后就忘記,又不得不重新看起語法來。不過總算完成了,當然研究完RxSwift,我更加可以熟練的swift進行ios開發。之后我會繼續研究ReactiveX其他語言。期待越來越好吧。
參考鏈接:http://m.blog.csdn.net/article/details?id=50823558