- 原文出自《RxJava Essentials》
- 原文作者 : Ivan Morgillo
- 譯文出自 : 開發技術前線 www.devtf.cn
- 轉載聲明: 本譯文已授權開發者頭條享有獨家轉載權,未經允許,不得轉載!
- 譯者 : yuxingxin
- 項目地址 : RxJava-Essentials-CN
為什么是Observables?
在面向對象的架構中,開發者致力於創建一組解耦的實體。這樣的話,實體就可以在不用妨礙整個系統的情況下可以被測試、復用和維護。設計這種系統就帶來一個棘手的負面影響:維護相關對象之間的統一。
在Smalltalk MVC架構中,創建模式的第一個例子就是用來解決這個問題的。用戶界面框架提供一種途徑使UI元素與包含數據的實體對象相分離,並且同時,它提供一種靈活的方法來保持它們之間的同步。
在這本暢銷的四人組編寫的《設計模式——可復用面向對象軟件的基礎》一書中,觀察者模式是最有名的設計模式之一。它是一種行為模式並提供一種以一對多的依賴來綁定對象的方法:即當一個對象發生變化時,依賴它的所有對象都會被通知並且會自動更新。
在本章中,我們將會對觀察者模式有一個概述,它是如何實現的以及如何用RxJava來擴展,Observable是什么,以及Observables如何與Iterables相關聯。
觀察者模式
在今天,觀察者模式是出現的最常用的軟件設計模式之一。它基於subject這個概念。subject是一種特殊對象,當它改變時,那些由它保存的一系列對象將會得到通知。而這一系列對象被稱作Observers,它們會對外暴漏了一個通知方法,當subject狀態發生變化時會調用的這個方法。
在上一章中,我們看到了電子表單的例子。現在我們可以展開這個例子講,展示一個更復雜的場景。讓我們考慮這樣一個填着賬戶數據的電子表單。我們可以把這些數據比作一張表,或者是3D柱狀圖,或者是餅狀圖。它們中每一個代表的意義都取決於同一組要展示的數據。每一個都是一個觀察者,都依賴於那一個subject,維護着全部信息。
3D柱狀圖這個類、餅狀圖類、表這個類以及維護這些數據的類是完全解耦的:它們彼此相互獨立復用,但也能協同工作。這些表示類彼此不清楚對方,但是正如它們所做的:它們知道在哪能找到它們需要展示的信息,它們也知道一旦數據發生變化就通知需要更新數據表示的那個類。
這有一張圖描述了Subject/Observer的關系是怎樣的一對多的關系:
上面這張圖展示了一個Subject為3個Observers提供服務。很明顯,沒有理由去限制Observers的數量:如果有需要,一個Subject可以有無限多個Observers,當subject狀態發生變化時,這些Observers中的每一個都會收到通知。
你什么時候使用觀察者模式?
觀察者模式很適合下面這些場景中的任何一個:
- 當你的架構有兩個實體類,一個依賴另一個,你想讓它們互不影響或者是獨立復用它們時。
- 當一個變化的對象通知那些與它自身變化相關聯的未知數量的對象時。
- 當一個變化的對象通知那些無需推斷具體類型的對象時。
RxJava觀察者模式工具包
在RxJava的世界里,我們有四種角色:
* Observable
* Observer
* Subscriber
* Subjects
Observables和Subjects是兩個“生產”實體,Observers和Subscribers是兩個“消費”實體。
Observable
當我們異步執行一些復雜的事情,Java提供了傳統的類,例如Thread、Future、FutureTask、CompletableFuture來處理這些問題。當復雜度提升,這些方案就會變得麻煩和難以維護。最糟糕的是,它們都不支持鏈式調用。
RxJava Observables被設計用來解決這些問題。它們靈活,且易於使用,也可以鏈式調用,並且可以作用於單個結果程序上,更有甚者,也可以作用於序列上。無論何時你想發射單個標量值,或者一連串值,甚至是無窮個數值流,你都可以使用Observable。
Observable的生命周期包含了三種可能的易於與Iterable生命周期事件相比較的事件,下表展示了如何將Observable async/push 與 Iterable sync/pull相關聯起來。
Event | Iterable(pull) | Observable(push) |
---|---|---|
檢索數據 | T next() |
onNext(T) |
發現錯誤 | throws Exception |
onError(Throwable) |
完成 | !hasNext() |
onCompleted() |
使用Iterable時,消費者從生產者那里以同步的方式得到值,在這些值得到之前線程處於阻塞狀態。相反,使用Observable時,生產者以異步的方式把值推給觀察者,無論何時,這些值都是可用的。這種方法之所以更靈活是因為即便值是同步或異步方式到達,消費者在這兩種場景都可以根據自己的需要來處理。
為了更好地復用Iterable接口,RxJava Observable類擴展了GOF觀察者模式的語義。引入了兩個新的接口:
* onCompleted() 即通知觀察者Observable沒有更多的數據。
* onError() 即觀察者有錯誤出現了。
熱Observables和冷Observables
從發射物的角度來看,有兩種不同的Observables:熱的和冷的。一個”熱”的Observable典型的只要一創建完就開始發射數據,因此所有后續訂閱它的觀察者可能從序列中間的某個位置開始接受數據(有一些數據錯過了)。一個”冷”的Observable會一直等待,直到有觀察者訂閱它才開始發射數據,因此這個觀察者可以確保會收到整個數據序列。
創建一個Observable
在接下來的小節中將討論Observables提供的兩種創建Observable的方法。
Observable.create()
create()方法使開發者有能力從頭開始創建一個Observable。它需要一個OnSubscribe對象,這個對象繼承Action1,當觀察者訂閱我們的Observable時,它作為一個參數傳入並執行call()函數。
1
2
3
4
5
6
7
|
Observable.create(new Observable.OnSubscribe<Object>(){
@Override
public void call(Subscriber<? super Object> subscriber) {
}
});
|
Observable通過使用subscriber變量並根據條件調用它的方法來和觀察者通信。讓我們看一個“現實世界”的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
Observable<Integer> observableString = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
for (int i = 0; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
});
Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(Integer item) {
System.out.println("Item is " + item);
}
});
|
例子故意寫的簡單,是因為即便是你第一次見到RxJava的操作,我想讓你明白接下來要發生什么。
我們創建一個新的Observable<Integer>
,它執行了5個元素的for循環,一個接一個的發射他們,最后完成。
另一方面,我們訂閱了Observable,返回一個Subscription
。一旦我們訂閱了,我們就開始接受整數,並一個接一個的打印出它們。我們並不知道要接受多少整數。事實上,我們也無需知道是因為我們為每種場景都提供對應的處理操作:
* 如果我們接收到了整數,那么就打印它。
* 如果序列結束,我們就打印一個關閉的序列信息。
* 如果錯誤發生了,我們就打印一個錯誤信息。
Observable.from()
在上一個例子中,我們創建了一個整數序列並一個一個的發射它們。假如我們已經有一個列表呢?我們是不是可以不用for循環而也可以一個接一個的發射它們呢?
在下面的例子代碼中,我們從一個已有的列表中創建一個Observable序列:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
List<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(10);
items.add(100);
items.add(200);
Observable<Integer> observableString = Observable.from(items);
Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(Integer item) {
System.out.println("Item is " + item);
}
});
|
輸出的結果和上面的例子絕對是一樣的。
from()
創建符可以從一個列表/數組來創建Observable,並一個接一個的從列表/數組中發射出來每一個對象,或者也可以從Java Future
類來創建Observable,並發射Future對象的.get()
方法返回的結果值。傳入Future
作為參數時,我們可以指定一個超時的值。Observable將等待來自Future
的結果;如果在超時之前仍然沒有結果返回,Observable將會觸發onError()
方法通知觀察者有錯誤發生了。
Observable.just()
如果我們已經有了一個傳統的Java函數,我們想把它轉變為一個Observable又改怎么辦呢?我們可以用create()
方法,正如我們先前看到的,或者我們也可以像下面那樣使用以此來省去許多模板代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
Observable<String> observableString = Observable.just(helloWorld());
Subscription subscriptionPrint = observableString.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(String message) {
System.out.println(message);
}
});
|
helloWorld()
方法比較簡單,像這樣:
1
2
3
4
|
private String helloWorld(){
return "Hello World";
}
|
不管怎樣,它可以是我們想要的任何函數。在剛才的例子中,我們一旦創建了Observable,just()
執行函數,當我們訂閱Observable時,它就會發射出返回的值。
just()
方法可以傳入一到九個參數,它們會按照傳入的參數的順序來發射它們。just()
方法也可以接受列表或數組,就像from()
方法,但是它不會迭代列表發射每個值,它將會發射整個列表。通常,當我們想發射一組已經定義好的值時會用到它。但是如果我們的函數不是時變性的,我們可以用just來創建一個更有組織性和可測性的代碼庫。
最后注意just()
創建符,它發射出值后,Observable正常結束,在上面那個例子中,我們會在控制台打印出兩條信息:“Hello World”和“Observable completed”。
Observable.empty(),Observable.never(),和Observable.throw()
當我們需要一個Observable毫無理由的不再發射數據正常結束時,我們可以使用empty()
。我們可以使用never()
創建一個不發射數據並且也永遠不會結束的Observable。我們也可以使用throw()
創建一個不發射數據並且以錯誤結束的Observable。
Subject = Observable + Observer
subject
是一個神奇的對象,它可以是一個Observable同時也可以是一個Observer:它作為連接這兩個世界的一座橋梁。一個Subject可以訂閱一個Observable,就像一個觀察者,並且它可以發射新的數據,或者傳遞它接受到的數據,就像一個Observable。很明顯,作為一個Observable,觀察者們或者其它Subject都可以訂閱它。
一旦Subject訂閱了Observable,它將會觸發Observable開始發射。如果原始的Observable是“冷”的,這將會對訂閱一個“熱”的Observable變量產生影響。
RxJava提供四種不同的Subject:
* PublishSubject
* BehaviorSubject
* ReplaySubject.
* AsyncSubject
PublishSubject
Publish是Subject的一個基礎子類。讓我們看看用PublishSubject實現傳統的ObservableHello World
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
PublishSubject<String> stringPublishSubject = PublishSubject.create();
Subscription subscriptionPrint = stringPublishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no!Something wrong happened!");
}
@Override
public void onNext(String message) {
System.out.println(message);
}
});
stringPublishSubject.onNext("Hello World");
|
在剛才的例子中,我們創建了一個PublishSubject
,用create()
方法發射一個String
值,然后我們訂閱了PublishSubject。此時,沒有數據要發送,因此我們的觀察者只能等待,沒有阻塞線程,也沒有消耗資源。就在這隨時准備從subject接收值,如果subject沒有發射值那么我們的觀察者就會一直在等待。再次聲明的是,無需擔心:觀察者知道在每個場景中該做什么,我們不用擔心什么時候是因為它是響應式的:系統會響應。我們並不關心它什么時候響應。我們只關心它響應時該做什么。
最后一行代碼展示了手動發射字符串“Hello World”,它觸發了觀察者的onNext()
方法,讓我們在控制台打印出“Hello World”信息。
讓我們看一個更復雜的例子。話說我們有一個private
聲明的Observable,外部不能訪問。Observable在它生命周期內發射值,我們不用關心這些值,我們只關心他們的結束。
首先,我們創建一個新的PublishSubject來響應它的onNext()
方法,並且外部也可以訪問它。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
final PublishSubject<Boolean> subject = PublishSubject.create();
subject.subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Boolean aBoolean) {
System.out.println("Observable Completed");
}
});
|
然后,我們創建“私有”的Observable,只有subject才可以訪問的到。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).doOnCompleted(new Action0() {
@Override
public void call() {
subject.onNext(true);
}
}).subscribe();
|
Observable.create()
方法包含了我們熟悉的for循環,發射數字。doOnCompleted()
方法指定當Observable結束時要做什么事情:在subject上發射true。最后,我們訂閱了Observable。很明顯,空的subscribe()
調用僅僅是為了開啟Observable,而不用管已發出的任何值,也不用管完成事件或者錯誤事件。為了這個例子我們需要它像這樣。
在這個例子中,我們創建了一個可以連接Observables並且同時可被觀測的實體。當我們想為公共資源創建獨立、抽象或更易觀測的點時,這是極其有用的。
BehaviorSubject
簡單的說,BehaviorSubject會首先向他的訂閱者發送截至訂閱前最新的一個數據對象(或初始值),然后正常發送訂閱后的數據流。
1
2
|
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);
|
在這個短例子中,我們創建了一個能發射整形(Integer)的BehaviorSubject。由於每當Observes訂閱它時就會發射最新的數據,所以它需要一個初始值。
ReplaySubject
ReplaySubject會緩存它所訂閱的所有數據,向任意一個訂閱它的觀察者重發:
1
2
|
ReplaySubject<Integer> replaySubject = ReplaySubject.create();
|
AsyncSubject
當Observable完成時AsyncSubject只會發布最后一個數據給已經訂閱的每一個觀察者。
1
2
|
AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
|
總結
本章中,我們了解到了什么是觀察者模式,為什么Observables在今天的編程場景中如此重要,以及如何創建Observables和subjects。
下一章中,我們將創建第一個基於RxJava的Android應用程序,學習如何檢索數據來填充listview,以及探索如何創建一個基於RxJava的響應式UI。