內容為整理博主文章:https://juejin.im/user/58870f04128fe10065efc8d9/article 個人覺得他對Operators的解說較容易理解和全面,顧把它們整理在一起,也方面查找。
Operators:Observable 的 Operators 是實例應用上最重要的部份,我們需要了解各種 Operators 的使用方式,才能輕松實現各種需求!Operators 就是一個個被附加到 Observable 型別的函數。
Marble diagrams:我們把描繪 observable 的圖示稱為 Marble diagrams。
我們用 來表達一小段時間,這些 串起就代表一個 observable
----------------
X (大寫 X)則代表有錯誤發生
---------------X
| 則代表 observable 結束
----------------|
小括號代表着同步發生
map
Observable 的 map 方法使用上跟數組的 map 是一樣的,我們傳入一個 callback function,這個 callback function 會帶入每次發發送來的元素,然后我們回傳新的元素,如下
var source = Rx.Observable.interval(1000);
var newest = source.map(x => x + 1);
newest.subscribe(console.log);
// 1
// 2
// 3
// 4
// 5..
用 Marble diagrams 表達就是
source: -----0-----1-----2-----3--...
map(x => x + 1)
newest: -----1-----2-----3-----4--...
mapTo
mapTo 可以把傳進來的值改成一個固定的值,如下
var source = Rx.Observable.interval(1000);
var newest = source.mapTo(2);
newest.subscribe(console.log);
// 2
// 2
// 2
// 2..
用 Marble diagrams 表達就是
source: -----0-----1-----2-----3--...
mapTo(2)
newest: -----2-----2-----2-----2--...
filter
filter 在使用上也跟數組的相同,我們要傳入一個 callback function,這個 function 會傳入每個被發送的元素,並且回傳一個 boolean 值,如果為 true 的話就會保留,如果為 false 就會被濾掉,如下
var source = Rx.Observable.interval(1000);
var newest = source.filter(x => x % 2 === 0);
newest.subscribe(console.log);
// 0
// 2
// 4
// 6..
用 Marble diagrams 表達就是
source: -----0-----1-----2-----3-----4-...
filter(x => x % 2 === 0)
newest: -----0-----------2-----------4-...
skip
可以略過前幾個發送元素的 operator: skip,示例如下:
var source = Rx.Observable.interval(1000);
var example = source.skip(3);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 3
// 4
// 5...
原本從 0 開始的就會變成從 3 開始,但是記得原本元素的等待時間仍然存在,也就是說此示例第一個取得的元素需要等 4 秒,用 Marble Diagram 表示如下
source : ----0----1----2----3----4----5--....
skip(3)
example: -------------------3----4----5--...
takeLast
除了可以用 take 取前幾個之外,我們也可以倒過來取最后幾個,示例如下:
var source = Rx.Observable.interval(1000).take(6);
var example = source.takeLast(2);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 4
// 5
// complete
這里我們先取了前 6 個元素,再取最后兩個。所以最后會發送 4, 5, complete,這里有一個重點,就是 takeLast 必須等到整個 observable 完成(complete),才能知道最后的元素有哪些,並且同步發送,如果用 Marble Diagram 表示如下
source : ----0----1----2----3----4----5|
takeLast(2)
example: ------------------------------(45)|
這里可以看到 takeLast 后,比須等到原本的 observable 完成后,才立即同步發送 4, 5, complete
last
跟 take(1) 相同,我們有一個 takeLast(1) 的簡化寫法,那就是 last() 用來取得最后一個元素
var source = Rx.Observable.interval(1000).take(6);
var example = source.last();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 5
// complete
用 Marble Diagram 表示如下
source : ----0----1----2----3----4----5|
last()
example: ------------------------------(5)|
concat
concat 可以把多個 observable 實例合並成一個,示例如下
var source = Rx.Observable.interval(1000).take(3);
var source2 = Rx.Observable.of(3)
var source3 = Rx.Observable.of(4,5,6)
var example = source.concat(source2, source3);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// complete
跟 concatAll 一樣,必須先等前一個 observable 完成(complete),才會繼續下一個,用 Marble Diagram 表示如下
source : ----0----1----2|
source2: (3)|
source3: (456)|
concat()
example: ----0----1----2(3456)|
另外 concat 還可以當作靜態方法使用
var source = Rx.Observable.interval(1000).take(3);
var source2 = Rx.Observable.of(3);
var source3 = Rx.Observable.of(4,5,6);
var example = Rx.Observable.concat(source, source2, source3);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
startWith
startWith 可以在 observable 的一開始塞要發送的元素,有點像 concat 但參數不是 observable 而是要發送的元素,使用示例如下
var source = Rx.Observable.interval(1000);
var example = source.startWith(0);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 0
// 1
// 2
// 3...
這里可以看到我們在 source 的一開始塞了一個 0,讓 example 會在一開始就立即發送 0,用 Marble Diagram 表示如下
source : ----0----1----2----3--...
startWith(0)
example: (0)----0----1----2----3--...
記得 startWith 的值是一開始就同步發出的,這個 operator 很常被用來保存程序的起始狀態
merge
merge 跟 concat 一樣都是用來合並 observable,但他們在行為上有非常大的不同!
讓我們直接來看例子吧
var source = Rx.Observable.interval(500).take(3);
var source2 = Rx.Observable.interval(300).take(6);
var example = source.merge(source2);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 0
// 1
// 2
// 1
// 3
// 2
// 4
// 5
// complete
上面可以看得出來,merge 把多個 observable 同時處理,這跟 concat 一次處理一個 observable 是完全不一樣的,由於是同時處理行為會變得較為復雜,這里我們用 Marble Diagram 會比較好解釋
source : ----0----1----2|
source2: --0--1--2--3--4--5|
merge()
example: --0-01--21-3--(24)--5|
這里可以看到 merge 之后的 example 在時間序上同時在跑 source 與 source2,當兩件事情同時發生時,會同步發送資料(被 merge 的在后面),當兩個 observable 都結束時才會真的結束。
merge 同樣可以當作靜態方法用
var source = Rx.Observable.interval(500).take(3);
var source2 = Rx.Observable.interval(300).take(6);
var example = Rx.Observable.merge(source, source2);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
merge 的邏輯有點像是 OR(||),就是當兩個 observable 其中一個被觸發時都可以被處理,這很常用在一個以上的按鈕具有部分相同的行為。
例如一個影片播放器有兩個按鈕,一個是暫停(II),另一個是結束播放(口)。這兩個按鈕都具有相同的行為就是影片會被停止,只是結束播放會讓影片回到 00 秒,這時我們就可以把這兩個按鈕的事件 merge 起來處理影片暫停這件事。
var stopVideo = Rx.Observable.merge(stopButton, endButton);
stopVideo.subscribe(() => {
// 暫停播放影片
})
combineLatest
它會取得各個 observable 最后送出的值,再輸出成一個值,我們直接看示例會比較好解釋
var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);
var example = source.combineLatest(newest, (x, y) => x + y);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// complete
第一次看到這個 output 應該都會很困惑,我們直接來看 Marble Diagram 吧
source : ----0----1----2|
newest : --0--1--2--3--4--5|
combineLatest(newest, (x, y) => x + y);
example: ----01--23-4--(56)--7|
首先 combineLatest 可以接收多個 observable,最后一個參數是 callback function,這個 callback function 接收的參數數量跟合並的 observable 數量相同,依照示例來說,因為我們這里合並了兩個 observable 所以后面的 callback function 就接收 x, y 兩個參數,x 會接收從 source 發送出來的值,y 會接收從 newest 發送出來的值。
最后一個重點就是一定會等兩個 observable 都曾有送值出來才會呼叫我們傳入的 callback,所以這段程式是這樣運行的
- newest 送出了 0,但此時 source 並沒有送出過任何值,所以不會執行 callback
- source 送出了 0,此時 newest 最后一次送出的值為 0,把這兩個數傳入 callback 得到 0。
- newest 送出了 1,此時 source 最后一次送出的值為 0,把這兩個數傳入 callback 得到 1。
- newest 送出了 2,此時 source 最后一次送出的值為 0,把這兩個數傳入 callback 得到 2。
- source 送出了 1,此時 newest 最后一次送出的值為 2,把這兩個數傳入 callback 得到 3。
- newest 送出了 3,此時 source 最后一次送出的值為 1,把這兩個數傳入 callback 得到 4。
- source 送出了 2,此時 newest 最后一次送出的值為 3,把這兩個數傳入 callback 得到 5。
- source 結束,但 newest 還沒結束,所以 example 還不會結束。
- newest 送出了 4,此時 source 最后一次送出的值為 2,把這兩個數傳入 callback 得到 6。
- newest 送出了 5,此時 source 最后一次送出的值為 2,把這兩個數傳入 callback 得到 7。
- newest 結束,因為 source 也結束了,所以 example 結束。
不管是 source 還是 newest 送出值來,只要另一方曾有送出過值(有最后的值),就會執行 callback 並送出新的值,這就是 combineLatest。
combineLatest 很常用在運算多個因子的結果,例如最常見的 BMI 計算,我們身高變動時就拿上一次的體重計算新的 BMI,當體重變動時則拿上一次的身高計算 BMI,這就很適合用 combineLatest 來處理!
zip
zip 會取每個 observable 相同順位的元素並傳入 callback,也就是說每個 observable 的第 n 個元素會一起被傳入 callback,這里我們同樣直接用示例講解會比較清楚
var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);
var example = source.zip(newest, (x, y) => x + y);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 2
// 4
// complete
Marble Diagram 長這樣
source : ----0----1----2|
newest : --0--1--2--3--4--5|
zip(newest, (x, y) => x + y)
example: ----0----2----4|
以我們的示例來說,zip 會等到 source 跟 newest 都送出了第一個元素,再傳入 callback,下次則等到 source 跟 newest 都送出了第二個元素再一起傳入 callback,所以運行的步驟如下:
-
newest 送出了第一個值 0,但此時 source 並沒有送出第一個值,所以不會執行 callback。
-
source 送出了第一個值 0,newest 之前送出的第一個值為 0,把這兩個數傳入 callback 得到 0。
-
newest 送出了第二個值 1,但此時 source 並沒有送出第二個值,所以不會執行 callback。
-
newest 送出了第三個值 2,但此時 source 並沒有送出第三個值,所以不會執行 callback。
-
source 送出了第二個值 1,newest 之前送出的第二個值為 1,把這兩個數傳入 callback 得到 2。
-
newest 送出了第四個值 3,但此時 source 並沒有送出第四個值,所以不會執行 callback。
-
source 送出了第三個值 2,newest 之前送出的第三個值為 2,把這兩個數傳入 callback 得到 4。
-
source 結束 example 就直接結束,因為 source 跟 newest 不會再有對應順位的值
zip 會把各個 observable 相同順位送出的值傳入 callback,這很常拿來做 demo 使用,比如我們想要間隔 100ms 送出 'h', 'e', 'l', 'l', 'o',就可以這么做
var source = Rx.Observable.from('hello');
var source2 = Rx.Observable.interval(100);
var example = source.zip(source2, (x, y) => x);
這里的 Marble Diagram 就很簡單
source : (hello)|
source2: -0-1-2-3-4-...
zip(source2, (x, y) => x)
example: -h-e-l-l-o|
這里我們利用 zip 來達到原本只能同步送出的資料變成了非同步的,很適合用在建立示范用的資料
建議大家平常沒事不要亂用 zip,除非真的需要。因為 zip 必須 cache 住還沒處理的元素,當我們兩個 observable 一個很快一個很慢時,就會 cache 非常多的元素,等待比較慢的那個 observable。這很有可能造成內存相關的問題
withLatestFrom
withLatestFrom 運行方式跟 combineLatest 有點像,只是他有主從的關系,只有在主要的 observable 送出新的值時,才會執行 callback,附隨的 observable 只是在背景下運行。讓我們看一個例子
var main = Rx.Observable.from('hello').zip(Rx.Observable.interval(500), (x, y) => x);
var some = Rx.Observable.from([0,1,0,0,0,1]).zip(Rx.Observable.interval(300), (x, y) => x);
var example = main.withLatestFrom(some, (x, y) => {
return y === 1 ? x.toUpperCase() : x;
});
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
先看一下 Marble Diagram
main : ----h----e----l----l----o|
some : --0--1--0--0--0--1|
withLatestFrom(some, (x, y) => y === 1 ? x.toUpperCase() : x);
example: ----h----e----l----L----O|
withLatestFrom 會在 main 送出值的時候執行 callback,但請注意如果 main 送出值時 some 之前沒有送出過任何值 callback 仍然不會執行!
這里我們在 main 送出值時,去判斷 some 最后一次送的值是不是 1 來決定是否要切換大小寫,執行步驟如下
- main 送出了 h,此時 some 上一次送出的值為 0,把這兩個參數傳入 callback 得到 h。
- main 送出了 e,此時 some 上一次送出的值為 0,把這兩個參數傳入 callback 得到 e。
- main 送出了 l,此時 some 上一次送出的值為 0,把這兩個參數傳入 callback 得到 l。
- main 送出了 l,此時 some 上一次送出的值為 1,把這兩個參數傳入 callback 得到 L。
- main 送出了 o,此時 some 上一次送出的值為 1,把這兩個參數傳入 callback 得到 O。
withLatestFrom 很常用在一些 checkbox 型的功能,例如說一個編輯器,我們開啟粗體后,打出來的字就都要變粗體,粗體就像是 some observable,而我們打字就是 main observable。
scan
scan 其實就是 Observable 版本的 reduce 只是命名不同。如果熟悉數組操作的話,應該會知道原生的 JS Array 就有 reduce 的方法,使用方式如下
var arr = [1, 2, 3, 4];
var result = arr.reduce((origin, next) => {
console.log(origin)
return origin + next
}, 0);
console.log(result)
// 0
// 1
// 3
// 6
// 10
reduce 方法需要傳兩個參數,第一個是 callback 第二個則是起始狀態,這個 callback 執行時,會傳入兩個參數一個是原本的狀態,第二個是修改原本狀態的參數,最后回傳一個新的狀態,再繼續執行。
所以這段代碼是這樣執行的
- 第一次執行 callback 起始狀態是 0 所以 origin 傳入 0,next 為 arr 的第一個元素 1,相加之后變成 1 回傳並當作下一次的狀態。
- 第二次執行 callback,這時原本的狀態(origin)就變成了 1,next 為 arr 的第二個元素 2,相加之后變成 3 回傳並當作下一次的狀態。
- 第三次執行 callback,這時原本的狀態(origin)就變成了 3,next 為 arr 的第三個元素 3,相加之后變成 6 回傳並當作下一次的狀態。
- 第三次執行 callback,這時原本的狀態(origin)就變成了 6,next 為 arr 的第四個元素 4,相加之后變成 10 回傳並當作下一次的狀態。
- 這時 arr 的元素都已經遍歷過了,所以不會直接把 10 回傳。
scan 整體的運行方式都跟 reduce 一樣,示例如下
var source = Rx.Observable.from('hello')
.zip(Rx.Observable.interval(600), (x, y) => x);
var example = source.scan((origin, next) => origin + next, '');
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// h
// he
// hel
// hell
// hello
// complete
畫成 Marble Diagram
source : ----h----e----l----l----o|
scan((origin, next) => origin + next, '')
example: ----h----(he)----(hel)----(hell)----(hello)|
這里可以看到第一次傳入 'h' 跟 '' 相加,返回 'h' 當作下一次的初始狀態,一直重復下去。
scan 很常用在狀態的計算處理,最簡單的就是對一個數字的加減,我們可以綁定一個 button 的 click 事件,並用 map 把 click event 轉成 1,之后送處 scan 計算值再做顯示。
下面一個小示例,來示范如何做最簡單的加減
const addButton = document.getElementById('addButton');
const minusButton = document.getElementById('minusButton');
const state = document.getElementById('state');
const addClick = Rx.Observable.fromEvent(addButton, 'click').mapTo(1);
const minusClick = Rx.Observable.fromEvent(minusButton, 'click').mapTo(-1);
const numberState = Rx.Observable.empty()
.startWith(0)
.merge(addClick, minusClick)
.scan((origin, next) => origin + next, 0)
numberState
.subscribe({
next: (value) => { state.innerHTML = value;},
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
這里我們用了兩個 button,一個是 add 按鈕,一個是 minus 按鈕。
我把這兩個按鈕的點擊事件各建立了 addClcik, minusClick 兩個 observable,這兩個 observable 直接 mapTo(1) 跟 mapTo(-1),代表被點擊后會各自送出的數字!
接着我們用了 empty() 建立一個空的 observable 代表畫面上數字的狀態,搭配 startWith(0) 來設定初始值,接着用 merge 把兩個 observable 合並透過 scan 處理之后的邏輯,最后在 subscribe 來更改畫面的顯示。
buffer
buffer 是一整個家族,總共有五個相關的 operators
- buffer
- bufferCount
- bufferTime
- bufferToggle
- bufferWhen
這里比較常用到的是 buffer, bufferCount 跟 bufferTime 這三個,我們直接來看示例。
var source = Rx.Observable.interval(300);
var source2 = Rx.Observable.interval(1000);
var example = source.buffer(source2);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// [0,1,2]
// [3,4,5]
// [6,7,8]...
畫成 Marble Diagram 則像是
source : --0--1--2--3--4--5--6--7..
source2: ---------0---------1--------...
buffer(source2)
example: ---------([0,1,2])---------([3,4,5])
buffer 要傳入一個 observable(source2),它會把原本的 observable (source)送出的元素緩存在數組中,等到傳入的 observable(source2) 送出元素時,就會觸發把緩存的元素送出。
這里的示例 source2 是每一秒就會送出一個元素,我們可以改用 bufferTime 簡潔的表達,如下
var source = Rx.Observable.interval(300);
var example = source.bufferTime(1000);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// [0,1,2]
// [3,4,5]
// [6,7,8]...
除了用時間來作緩存外,我們更常用數量來做緩存,示例如下
var source = Rx.Observable.interval(300);
var example = source.bufferCount(3);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// [0,1,2]
// [3,4,5]
// [6,7,8]...
在示例上,我們可以用 buffer 來做某個事件的過濾,例如像是滑鼠連點才能真的執行,這里我們一樣寫了一個小示例
const button = document.getElementById('demo');
const click = Rx.Observable.fromEvent(button, 'click')
const example = click
.bufferTime(500)
.filter(arr => arr.length >= 2);
example.subscribe({
next: (value) => { console.log('success'); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
這里我們只有在 500 毫秒內連點兩下,才能成功印出 'success',這個功能在某些特殊的需求中非常的好用,也能用在批次處理來降低 request 傳送的次數
delay
delay 可以延遲 observable 一開始發送元素的時間點,示例如下
var source = Rx.Observable.interval(300).take(5);
var example = source.delay(500);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
當然直接從 log 出的訊息看,是完全看不出差異的
讓我們直接看 Marble Diagram
source : --0--1--2--3--4|
delay(500)
example: -------0--1--2--3--4|
從 Marble Diagram 可以看得出來,第一次送出元素的時間變慢了,雖然在這里看起來沒什么用,但是在 UI 操作上是非常有用的,這個部分我們最后示范。
delay 除了可以傳入毫秒以外,也可以傳入 Date 型別的資料,如下使用方式
var source = Rx.Observable.interval(300).take(5);
var example = source.delay(new Date(new Date().getTime() + 1000));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
delayWhen
delayWhen 的作用跟 delay 很像,最大的差別是 delayWhen 可以影響每個元素,而且需要傳一個 callback 並回傳一個 observable,示例如下
var source = Rx.Observable.interval(300).take(5);
var example = source
.delayWhen(
x => Rx.Observable.empty().delay(100 * x * x)
);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
這時我們的 Marble Diagram 如下
source : --0--1--2--3--4|
.delayWhen(x => Rx.Observable.empty().delay(100 * x * x));
example: --0---1----2-----3-----4|
這里傳進來的 x 就是 source 送出的每個元素,這樣我們就能對每一個做延遲。
這里我們用 delay 來做一個小功能,這個功能很簡單就是讓多張照片跟着滑鼠跑,但每張照片不能跑一樣快!
首先我們准備六張大頭照,並且寫進 HTML
<img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover6.jpg" alt="">
<img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover5.jpg" alt="">
<img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover4.jpg" alt="">
<img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover3.jpg" alt="">
<img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover2.jpg" alt="">
<img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover1.jpg" alt="">
用 CSS 把 img 改成圓形,並加上邊筐以及絕對位置
img{
position: absolute;
border-radius: 50%;
border: 3px white solid;
transform: translate3d(0,0,0);
}
再來寫 JS,一樣第一步先抓 DOM
var imgList = document.getElementsByTagName('img');
第二步建立 observable
var movePos = Rx.Observable.fromEvent(document, 'mousemove')
.map(e => ({ x: e.clientX, y: e.clientY }))
第三步撰寫邏輯
function followMouse(DOMArr) {
const delayTime = 600;
DOMArr.forEach((item, index) => {
movePos
.delay(delayTime * (Math.pow(0.65, index) + Math.cos(index / 4)) / 2)
.subscribe(function (pos){
item.style.transform = 'translate3d(' + pos.x + 'px, ' + pos.y + 'px, 0)';
});
});
}
followMouse(Array.from(imgList))
這里我們把 imgList 從 Collection 轉成 Array 后傳入 followMouse(),並用 forEach 把每個 omg 取出並利用 index 來達到不同的 delay 時間,這個 delay 時間的邏輯大家可以自己想,最后 subscribe 就完成啦!
完整示例效果:
https://jsbin.com/hayixa/2/edit?html,css,js,output
debounce
跟 buffer、bufferTime 一樣,Rx 有 debounce 跟 debounceTime 一個是傳入 observable 另一個則是傳入毫秒,比較常用到的是 debounceTime,這里我們直接來看一個示例
var source = Rx.Observable.interval(300).take(5);
var example = source.debounceTime(1000);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 4
// complete
這里只印出 4 然后就結束了,因為 debounce 運行的方式是每次收到元素,他會先把元素 cache 住並等待一段時間,如果這段時間內已經沒有收到任何元素,則把元素送出;如果這段時間內又收到新的元素,則會把原本 cache 住的元素釋放掉並重新計時,不斷反復。
以現在這個示例來講,我們每 300 毫秒就會送出一個數值,但我們的 debounceTime 是 1000 毫秒,也就是說每次 debounce 收到元素還等不到 1000 毫秒,就會收到下一個新元素,然后重新等待 1000 毫秒,如此重復直到第五個元素送出時,observable 結束(complete)了,debounce 就直接送出元素。
以 Marble Diagram 表示如下
source : --0--1--2--3--4|
debounceTime(1000)
example: --------------4|
debounce 會在收到元素后等待一段時間,這很適合用來處理間歇行為,間歇行為就是指這個行為是一段一段的,例如要做 Auto Complete 時,我們要打字搜尋不會一直不斷的打字,可以等我們停了一小段時間后再送出,才不會每打一個字就送一次 request!
const searchInput = document.getElementById('searchInput');
const theRequestValue = document.getElementById('theRequestValue');
Rx.Observable.fromEvent(searchInput, 'input')
.debounceTime(300)
.map(e => e.target.value)
.subscribe((value) => {
theRequestValue.textContent = value;
// 在這里發 request
})
throttle
基本上每次看到 debounce 就會看到 throttle,他們兩個的作用都是要降低事件的觸發頻率,但行為上有很大的不同。
跟 debounce 一樣 RxJS 有 throttle 跟 throttleTime 兩個方法,一個是傳入 observable 另一個是傳入毫秒,比較常用到的也是 throttleTime,讓我們直接來看示例
var source = Rx.Observable.interval(300).take(5);
var example = source.throttleTime(1000);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 4
// complete
跟 debounce 的不同是 throttle 會先開放送出元素,等到有元素被送出就會沉默一段時間,等到時間過了又會開放發送元素。
throttle 比較像是控制行為的最高頻率,也就是說如果我們設定 1000 毫秒,那該事件頻率的最大值就是每秒觸發一次不會再更快,debounce 則比較像是必須等待的時間,要等到一定的時間過了才會收到元素。
throttle 更適合用在連續性行為,比如說 UI 動畫的運算過程,因為 UI 動畫是連續的,像我們之前在做拖拉時,就可以加上 throttleTime(12) 讓 mousemove event 不要發送的太快,避免畫面更新的速度跟不上樣式的切換速度。
distinct
它能幫我們把相同值的資料濾掉只留一筆,讓我們直接來看示例
var source = Rx.Observable.from(['a', 'b', 'c', 'a', 'b'])
.zip(Rx.Observable.interval(300), (x, y) => x);
var example = source.distinct()
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// a
// b
// c
// complete
如果用 Marble Diagram 表示如下
source : --a--b--c--a--b|
distinct()
example: --a--b--c------|
從上面的示例可以看得出來,當我們用 distinct 后,只要有重復出現的值就會被過濾掉。
另外我們可以傳入一個 selector callback function,這個 callback function 會傳入一個接收到的元素,並回傳我們真正希望比對的值,舉例如下
var source = Rx.Observable.from([{ value: 'a'}, { value: 'b' }, { value: 'c' }, { value: 'a' }, { value: 'c' }])
.zip(Rx.Observable.interval(300), (x, y) => x);
var example = source.distinct((x) => {
return x.value
});
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// {value: "a"}
// {value: "b"}
// {value: "c"}
// complete
這里可以看到,因為 source 送出的都是實例,而 js 事件的比對是比對內存位置,所以在這個例子中這些實例永遠不會相等,但實際上我們想比對的是實例中的 value,這時我們就可以傳入 selector callback,來選擇我們要比對的值。
distinct 傳入的 callback 在 RxJS 5 幾個 bate 版本中有過很多改變,現在網路上很多文章跟教學都是過時的,請讀者務必小心!
實際上 distinct() 會在背地里建立一個 Set,當接收到元素時會先去判斷 Set 內是否有相同的值,如果有就不送出,如果沒有則存到 Set 並送出。所以記得盡量不要直接把 distinct 用在一個無限的 observable 里,這樣很可能會讓 Set 越來越大,建議大家可以放第二個參數 flushes,或用 distinctUntilChanged
這里指的 Set 其實是 RxJS 自己實現的,跟 ES6 原生的 Set 行為也都一致,只是因為 ES6 的 Set 支持程度還並不理想,所以這里是直接用 JS 實現。
distinct 可以傳入第二個參數 flushes observable 用來清除暫存的資料,示例如下
var source = Rx.Observable.from(['a', 'b', 'c', 'a', 'c'])
.zip(Rx.Observable.interval(300), (x, y) => x);
var flushes = Rx.Observable.interval(1300);
var example = source.distinct(null, flushes);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// a
// b
// c
// c
// complete
這里我們用 Marble Diagram 比較好表示
source : --a--b--c--a--c|
flushes: ------------0---...
distinct(null, flushes);
example: --a--b--c-----c|
其實 flushes observable 就是在送出元素時,會把 distinct 的暫存清空,所以之后的暫存就會從頭來過,這樣就不用擔心暫存的 Set 越來愈大的問題,但其實我們平常不太會用這樣的方式來處理,通常會用另一個方法 distinctUntilChanged。
distinctUntilChanged
distinctUntilChanged 跟 distinct 一樣會把相同的元素過濾掉,但 distinctUntilChanged 只會跟最后一次送出的元素比較,不會每個都比,舉例如下
var source = Rx.Observable.from(['a', 'b', 'c', 'c', 'b'])
.zip(Rx.Observable.interval(300), (x, y) => x);
var example = source.distinctUntilChanged()
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// a
// b
// c
// b
// complete
這里 distinctUntilChanged 只會暫存一個元素,並在收到元素時跟暫存的元素比對,如果一樣就不送出,如果不一樣就把暫存的元素換成剛接收到的新元素並送出。
source : --a--b--c--c--b|
distinctUntilChanged()
example: --a--b--c-----b|
從 Marble Diagram 中可以看到,第二個 c 送出時剛好上一個就是 c 所以就被濾掉了,但最后一個 b 則跟上一個不同所以沒被濾掉。
distinctUntilChanged 是比較常在開發中上使用的,最常見的狀況是我們在做多方同步時。當我們有多個 Client,且每個 Client 有着各自的狀態,Server 會再一個 Client 需要變動時通知所有 Client 更新,但可能某些 Client 接收到新的狀態其實跟上一次收到的是相同的,這時我們就可用 distinctUntilChanged 方法只處理跟最后一次不相同的訊息,像是多方通話、多裝置的資訊同步都會有類似的情境。
catch
catch 是很常見的非同步錯誤處理方法,在 RxJS 中也能夠直接用 catch 來處理錯誤,在 RxJS 中的 catch 可以回傳一個 observable 來送出新的值,讓我們直接來看示例:
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.catch(error => Rx.Observable.of('h'));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
這個示例我們每隔 500 毫秒會送出一個字串(String),並用字串的方法 toUpperCase() 來把字串的英文字母改成大寫,過程中可能未知的原因送出了一個數值(Number) 2 導致發生例外(數值沒有 toUpperCase 的方法),這時我們在后面接的 catch 就能抓到錯誤。
catch 可以回傳一個新的 Observable、Promise、Array 或任何 Iterable 的事件,來傳送之后的元素。
以我們的例子來說最后就會在送出 X 就結束,畫成 Marble Diagram 如下
source : ----a----b----c----d----2|
map(x => x.toUpperCase())
----a----b----c----d----X|
catch(error => Rx.Observable.of('h'))
example: ----a----b----c----d----h|
這里可以看到,當錯誤發生后就會進到 catch 並重新處理一個新的 observable,我們可以利用這個新的 observable 來送出我們想送的值。
也可以在遇到錯誤后,讓 observable 結束,如下
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.catch(error => Rx.Observable.empty());
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
回傳一個 empty 的 observable 來直接結束(complete)。
另外 catch 的 callback 能接收第二個參數,這個參數會接收當前的 observalbe,我們可以回傳當前的 observable 來做到重新執行,示例如下
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.catch((error, obs) => obs);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
這里可以看到我們直接回傳了當前的 obserable(其實就是 example)來重新執行,畫成 Marble Diagram 如下
source : ----a----b----c----d----2|
map(x => x.toUpperCase())
----a----b----c----d----X|
catch((error, obs) => obs)
example: ----a----b----c----d--------a----b----c----d--..
因為是我們只是簡單的示范,所以這里會一直無限循環,實務上通常會用在斷線重連的情境。
另上面的處理方式有一個簡化的寫法,叫做 retry()。
retry
如果我們想要一個 observable 發生錯誤時,重新嘗試就可以用 retry 這個方法,跟我們前一個講示例的行為是一致
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.retry();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
通常這種無限的 retry 會放在即時同步的重新連接,讓我們在連線斷掉后,不斷的嘗試。另外我們也可以設定只嘗試幾次,如下
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.retry(1);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// a
// b
// c
// d
// a
// b
// c
// d
// Error: TypeError: x.toUpperCase is not a function
這里我們對 retry 傳入一個數值 1,能夠讓我們只重復嘗試 1 次后送出錯誤,畫成 Marble Diagram 如下
source : ----a----b----c----d----2|
map(x => x.toUpperCase())
----a----b----c----d----X|
retry(1)
example: ----a----b----c----d--------a----b----c----d----X|
這種處理方式很適合用在 HTTP request 失敗的場景中,我們可以設定重新發送幾次后,再秀出錯誤訊息。
retryWhen
RxJS 還提供了另一種方法 retryWhen,他可以把例外發生的元素放到一個 observable 中,讓我們可以直接操作這個 observable,並等到這個 observable 操作完后再重新訂閱一次原本的 observable。
這里我們直接來看代碼
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.retryWhen(errorObs => errorObs.delay(1000));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
這里 retryWhen 我們傳入一個 callback,這個 callback 有一個參數會傳入一個 observable,這個 observable 不是原本的 observable(example) 而是例外事件送出的錯誤所組成的一個 observable,我們可以對這個由錯誤所組成的 observable 做操作,等到這次的處理完成后就會重新訂閱我們原本的 observable。
這個示例我們是把錯誤的 observable 送出錯誤延遲 1 秒,這會使后面重新訂閱的動作延遲 1 秒才執行,畫成 Marble Diagram 如下
source : ----a----b----c----d----2|
map(x => x.toUpperCase())
----a----b----c----d----X|
retryWhen(errorObs => errorObs.delay(1000))
example: ----a----b----c----d-------------------a----b----c----d----...
從上圖可以看到后續重新訂閱的行為就被延后了,但實務上我們不太會用 retryWhen 來做重新訂閱的延遲,通常是直接用 catch 做到這件事。這里只是為了示范 retryWhen 的行為,實務上我們通常會把 retryWhen 拿來做錯誤通知或是例外收集,如下
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.retryWhen(
errorObs => errorObs.map(err => fetch('...')));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
這里的 errorObs.map(err => fetch('...')) 可以把 errorObs 里的每個錯誤變成 API 的發送,通常這里個 API 會像是送訊息到公司的通訊頻道(Slack 等等),這樣可以讓工程師馬上知道可能哪個 API 掛了,這樣我們就能即時地處理。
retryWhen 實際上是在背地里建立一個 Subject 並把錯誤放入,會在對這個 Subject 進行內部的訂閱,另外記得這個 observalbe 預設是無限的,如果我們把它結束,原本的 observable 也會跟着結束。
repeat
我們有時候可能會想要 retry 一直重復訂閱的效果,但沒有錯誤發生,這時就可以用 repeat 來做到這件事,示例如下
var source = Rx.Observable.from(['a','b','c'])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source.repeat(1);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// a
// b
// c
// a
// b
// c
// complete
這里 repeat 的行為跟 retry 基本一致,只是 retry 只有在例外發生時才觸發,畫成 Marble Diagram 如下
source : ----a----b----c|
repeat(1)
example: ----a----b----c----a----b----c|
同樣的我們可以不給參數讓他無限循環,如下
var source = Rx.Observable.from(['a','b','c'])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source.repeat();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
這樣我們就可以做動不斷重復的行為,這個可以在建立輪詢時使用,讓我們不斷地發 request 來更新畫面。
最后我們來看一個錯誤處理在實際應用中的小示例
const title = document.getElementById('title');
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x)
.map(x => x.toUpperCase());
// 通常 source 會是建立即時同步的連線,像是 web socket
var example = source.catch(
(error, obs) => Rx.Observable.empty()
.startWith('連線發生錯誤: 5秒后重連')
.concat(obs.delay(5000))
);
example.subscribe({
next: (value) => { title.innerText = value },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
這個示例其實就是模仿在即時同步斷線時,利用 catch 返回一個新的 observable,這個 observable 會先送出錯誤訊息並且把原本的 observable 延遲 5 秒再做合並,雖然這只是一個模仿,但它清楚的展示了 RxJS 在做錯誤處理時的靈活性。
concatAll
concatAll 最重要的重點就是他會處理完前一個 observable 才會在處理下一個 observable,讓我們來看一個示例
var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000));
var example = source.concatAll();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// (點擊后)
// 0
// 1
// 2
// 3
// 4
// 5 ...
上面這段代碼,當我們點擊畫面時就會開始送出數值,如果用 Marble Diagram 表示如下
click : ---------c-c------------------c--..
map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
\ \
\ ----0----1----2----3----4--...
----0----1----2----3----4--...
concatAll()
example: ----------------0----1----2----3----4--..
從 Marble Diagram 可以看得出來,當我們點擊一下 click 事件會被轉成一個 observable 而這個 observable 會每一秒送出一個遞增的數值,當我們用 concatAll 之后會把二維的 observable 攤平成一維的 observable,但 concatAll 會一個一個處理,一定是等前一個 observable 完成(complete)才會處理下一個 observable,因為現在送出 observable 是無限的永遠不會完成(complete),就導致他永遠不會處理第二個送出的 observable!
我們再看一個例子
var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000).take(3));
var example = source.concatAll();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
現在我們把送出的 observable 限制只取前三個元素,用 Marble Diagram 表示如下
click : ---------c-c------------------c--..
map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
\ \ \
\ ----0----1----2| ----0----1----2|
----0----1----2|
concatAll()
example: ----------------0----1----2----0----1----2--..
這里我們把送出的 observable 變成有限的,只會送出三個元素,這時就能看得出來 concatAll 不管兩個 observable 送出的時間多么相近,一定會先處理前一個 observable 再處理下一個。
switch
switch 同樣能把二維的 observable 攤平成一維的,但他們在行為上有很大的不同,我們來看下面這個示例
var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000));
var example = source.switch();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
用 Marble Diagram 表示如下
click : ---------c-c------------------c--..
map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
\ \ \----0----1--...
\ ----0----1----2----3----4--...
----0----1----2----3----4--...
switch()
example: -----------------0----1----2--------0----1--...
switch 最重要的就是他會在新的 observable 送出后直接處理新的 observable 不管前一個 observable 是否完成,每當有新的 observable 送出就會直接把舊的 observable 退訂(unsubscribe),永遠只處理最新的 observable!
所以在這上面的 Marble Diagram 可以看得出來第一次送出的 observable 跟第二次送出的 observable 時間點太相近,導致第一個 observable 還來不及送出元素就直接被退訂了,當下一次送出 observable 就又會把前一次的 observable 退訂。
mergeAll
我們之前講過 merge 他可以讓多個 observable 同時送出元素,mergeAll 也是同樣的道理,它會把二維的 observable 轉成一維的,並且能夠同時處理所有的 observable,讓我們來看這個示例
var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000));
var example = source.mergeAll();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
上面這段代碼用 Marble Diagram 表示如下
click : ---------c-c------------------c--..
map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
\ \ \----0----1--...
\ ----0----1----2----3----4--...
----0----1----2----3----4--...
switch()
example: ----------------00---11---22---33---(04)4--...
從 Marble Diagram 可以看出來,所有的 observable 是並行(Parallel)處理的,也就是說 mergeAll 不會像 switch 一樣退訂(unsubscribe)原先的 observable 而是並行處理多個 observable。以我們的示例來說,當我們點擊越多下,最后送出的頻率就會越快。
另外 mergeAll 可以傳入一個數值,這個數值代表他可以同時處理的 observable 數量,我們來看一個例子
var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000).take(3));
var example = source.mergeAll(2);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
這里我們送出的 observable 改成取前三個,並且讓 mergeAll 最多只能同時處理 2 個 observable,用 Marble Diagram 表示如下
click : ---------c-c----------o----------..
map(e => Rx.Observable.interval(1000))
source : ---------o-o----------c----------..
\ \ \----0----1----2|
\ ----0----1----2|
----0----1----2|
mergeAll(2)
example: ----------------00---11---22---0----1----2--..
當 mergeAll 傳入參數后,就會等處理中的其中一個 observable 完成,再去處理下一個。以我們的例子來說,前面兩個 observabel 可以被並行處理,但第三個 observable 必須等到第一個 observable 結束后,才會開始。
我們可以利用這個參數來決定要同時處理幾個 observable,如果我們傳入 1 其行為就會跟 concatAll 是一模一樣的,這點在原始碼可以看到他們是完全相同的。
concatMap
concatMap 其實就是 map 加上 concatAll 的簡化寫法,我們直接來看一個示例
var source = Rx.Observable.fromEvent(document.body, 'click');
var example = source
.map(e => Rx.Observable.interval(1000).take(3))
.concatAll();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
上面這個示例就可以簡化成
var source = Rx.Observable.fromEvent(document.body, 'click');
var example = source
.concatMap(
e => Rx.Observable.interval(100).take(3)
);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
前后兩個行為是一致的,記得 concatMap 也會先處理前一個送出的 observable 在處理下一個 observable,畫成 Marble Diagram 如下
source : -----------c--c------------------...
concatMap(c => Rx.Observable.interval(100).take(3))
example: -------------0-1-2-0-1-2---------...
這樣的行為也很常被用在發送 request 如下
function getPostData() {
return fetch('https://jsonplaceholder.typicode.com/posts/1')
.then(res => res.json())
}
var source = Rx.Observable.fromEvent(document.body, 'click');
var example = source.concatMap(
e => Rx.Observable.from(getPostData()));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
這里我們每點擊一下畫面就會送出一個 HTTP request,如果我們快速的連續點擊,大家可以在開發者工具的 network 看到每個 request 是等到前一個 request 完成才會送出下一個 request
concatMap 還有第二個參數是一個 selector callback,這個 callback 會傳入四個參數,分別是
- 外部 observable 送出的元素
- 內部 observable 送出的元素
- 外部 observable 送出元素的 index
- 內部 observable 送出元素的 index
回傳值我們想要的值,示例如下
function getPostData() {
return fetch('https://jsonplaceholder.typicode.com/posts/1')
.then(res => res.json())
}
var source = Rx.Observable.fromEvent(document.body, 'click');
var example = source.concatMap(
e => Rx.Observable.from(getPostData()),
(e, res, eIndex, resIndex) => res.title);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
這個示例的外部 observable 送出的元素就是 click event 實例,內部 observable 送出的元素就是 response 實例,這里我們回傳 response 實例的 title 屬性,這樣一來我們就可以直接收到 title,這個方法很適合用在 response 要選取的值跟前一個事件或順位(index)相關時。
switchMap
switchMap 其實就是 map 加上 switch 簡化的寫法,如下
var source = Rx.Observable.fromEvent(document.body, 'click');
var example = source
.map(e => Rx.Observable.interval(1000).take(3))
.switch();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
上面的代碼可以簡化成
var source = Rx.Observable.fromEvent(document.body, 'click');
var example = source
.switchMap(
e => Rx.Observable.interval(100).take(3)
);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
畫成 Marble Diagram 表示如下
source : -----------c--c-----------------...
concatMap(c => Rx.Observable.interval(100).take(3))
example: -------------0--0-1-2-----------...
只要注意一個重點 switchMap 會在下一個 observable 被送出后直接退訂前一個未處理完的 observable
另外我們也可以把 switchMap 用在發送 HTTP request
function getPostData() {
return fetch('https://jsonplaceholder.typicode.com/posts/1')
.then(res => res.json())
}
var source = Rx.Observable.fromEvent(document.body, 'click');
var example = source.switchMap(
e => Rx.Observable.from(getPostData()));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
如果我們快速的連續點擊五下,可以在開發者工具的 network 看到每個 request 會在點擊時發送
雖然我們發送了多個 request 但最后真正印出來的 log 只會有一個,代表前面發送的 request 已經不會造成任何的 side-effect 了,這個很適合用在只看最后一次 request 的情境,比如說 自動完成(auto complete),我們只需要顯示使用者最后一次打在畫面上的文字,來做建議選項而不用每一次的。
switchMap 跟 concatMap 一樣有第二個參數 selector callback 可用來回傳我們要的值,這部分的行為跟 concatMap 是一樣的,這里就不再贅述。
mergeMap
mergeMap 其實就是 map 加上 mergeAll 簡化的寫法,如下
var source = Rx.Observable.fromEvent(document.body, 'click');
var example = source
.map(e => Rx.Observable.interval(1000).take(3))
.mergeAll();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
上面的代碼可以簡化成
var source = Rx.Observable.fromEvent(document.body, 'click');
var example = source
.mergeMap(
e => Rx.Observable.interval(100).take(3)
);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
畫成 Marble Diagram 表示
source : -----------c-c------------------...
concatMap(c => Rx.Observable.interval(100).take(3))
example: -------------0-(10)-(21)-2----------...
記得 mergeMap 可以並行處理多個 observable,以這個例子來說當我們快速點按兩下,元素發送的時間點是有機會重疊的
另外我們也可以把 switchMap 用在發送 HTTP request
function getPostData() {
return fetch('https://jsonplaceholder.typicode.com/posts/1')
.then(res => res.json())
}
var source = Rx.Observable.fromEvent(document.body, 'click');
var example = source.mergeMap(
e => Rx.Observable.from(getPostData()));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
如果我們快速的連續點擊五下,大家可以在開發者工具的 network 看到每個 request 會在點擊時發送並且會 log 出五個實例
mergeMap 也能傳入第二個參數 selector callback,這個 selector callback 跟 concatMap 第二個參數也是完全一樣的,但 mergeMap 的重點是我們可以傳入第三個參數,來限制並行處理的數量
function getPostData() {
return fetch('https://jsonplaceholder.typicode.com/posts/1')
.then(res => res.json())
}
var source = Rx.Observable.fromEvent(document.body, 'click');
var example = source.mergeMap(
e => Rx.Observable.from(getPostData()),
(e, res, eIndex, resIndex) => res.title, 3);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
這里我們傳入 3 就能限制,HTTP request 最多只能同時送出 3 個,並且要等其中一個完成在處理下一個
連續點按了五下,但第四個 request 是在第一個完成后才送出的,這個很適合用在特殊的需求下,可以限制同時發送的 request 數量。
RxJS 5 還保留了 mergeMap 的別名叫 flatMap,雖然官方文件上沒有,但這兩個方法是完全一樣的。請參考https://github.com/ReactiveX/RxJS/issues/333
switchMap, mergeMap, concatMap
這三個 operators 還有一個共同的特性,那就是這三個 operators 可以把第一個參數所回傳的 promise 實例直接轉成 observable,這樣我們就不用再用 Rx.Observable.from 轉一次,如下
function getPersonData() {
return fetch('https://jsonplaceholder.typicode.com/posts/1')
.then(res => res.json())
}
var source = Rx.Observable.fromEvent(document.body, 'click');
var example = source.concatMap(e => getPersonData());
//直接回傳 promise 實例
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
至於在使用上要如何選擇這三個 operators? 其實都還是看使用情境而定,這里筆者簡單列一下大部分的使用情境
- concatMap 用在可以確定內部的 observable 結束時間比外部 observable 發送時間來快的情境,並且不希望有任何並行處理行為,適合少數要一次一次完成到底的的 UI 動畫或特別的 HTTP request 行為。
- switchMap 用在只要最后一次行為的結果,適合絕大多數的使用情境。
- mergeMap 用在並行處理多個 observable,適合需要並行處理的行為,像是多個 I/O 的並行處理。
建議初學者不確定選哪一個時,使用 switchMap
在使用 concatAll 或 concatMap 時,請注意內部的 observable 一定要能夠的結束,且外部的 observable 發送元素的速度不能比內部的 observable 結束時間快太多,不然會有 memory issues
window
window 是一整個家族,總共有五個相關的 operators
- window
- windowCount
- windowTime
- windowToggle
- windowWhen
這里我們只介紹 window 跟 windowToggle 這兩個方法,其他三個的用法相對都簡單很多,大家如果有需要可以再自行到官網查看。
window 很類似 buffer 可以把一段時間內送出的元素拆出來,只是 buffer 是把元素拆分到數組中變成
Observable<T> => Observable<Array<T>>
而 window 則是會把元素拆分出來放到新的 observable 變成
Observable<T> => Observable<Observable<T>>
buffer 是把拆分出來的元素放到數組並送出數組;window 是把拆分出來的元素放到 observable 並送出 observable,讓我們來看一個例子
var click = Rx.Observable.fromEvent(document, 'click');
var source = Rx.Observable.interval(1000);
var example = source.window(click);
example
.switch()
.subscribe(console.log);
// 0
// 1
// 2
// 3
// 4
// 5 ...
首先 window 要傳入一個 observable,每當這個 observable 送出元素時,就會把正在處理的 observable 所送出的元素放到新的 observable 中並送出,這里看 Marble Diagram 會比較好解釋
click : -----------c----------c------------c--
source : ----0----1----2----3----4----5----6---..
window(click)
example: o----------o----------o------------o--
\ \ \
---0----1-|--2----3--|-4----5----6|
switch()
: ----0----1----2----3----4----5----6---...
這里可以看到 example 變成發送 observable 會在每次 click 事件發送出來后結束,並繼續下一個 observable,這里我們用 switch 才把它攤平。
當然這個范例只是想單存的表達 window 的作用,沒什么太大的意義,實務上 window 會搭配其他的 operators 使用,例如我們想計算一秒鍾內觸發了幾次 click 事件
var click = Rx.Observable.fromEvent(document, 'click');
var source = Rx.Observable.interval(1000);
var example = click.window(source)
example
.map(innerObservable => innerObservable.count())
.switch()
.subscribe(console.log);
注意這里我們把 source 跟 click 對調了,並用到了 observable 的一個方法 count(),可以用來取得 observable 總共送出了幾個元素,用 Marble Diagram 表示如下
source : ---------0---------1---------2--...
click : --cc---cc----c-c----------------...
window(source)
example: o--------o---------o---------o--..
\ \ \ \
-cc---cc|---c-c---|---------|--..
count()
: o--------o---------o---------o--
\ \ \ \
-------4|--------2|--------0|--..
switch()
: ---------4---------2---------0--...
從 Marble Diagram 中可以看出來,我們把部分元素放到新的 observable 中,就可以利用 Observable 的方法做更靈活的操作
windowToggle
windowToggle 不像 window 只能控制內部 observable 的結束,windowToggle 可以傳入兩個參數,第一個是開始的 observable,第二個是一個 callback 可以回傳一個結束的 observable,讓我們來看范例
var source = Rx.Observable.interval(1000);
var mouseDown = Rx.Observable.fromEvent(document, 'mousedown');
var mouseUp = Rx.Observable.fromEvent(document, 'mouseup');
var example = source
.windowToggle(mouseDown, () => mouseUp)
.switch();
example.subscribe(console.log);
一樣用 Marble Diagram 會比較好解釋
source : ----0----1----2----3----4----5--...
mouseDown: -------D------------------------...
mouseUp : ---------------------------U----...
windowToggle(mouseDown, () => mouseUp)
: -------o-------------------------...
\
-1----2----3----4--|
switch()
example : ---------1----2----3----4---------...
從 Marble Diagram 可以看得出來,我們用 windowToggle 拆分出來內部的 observable 始於 mouseDown 終於 mouseUp。
groupBy
最后我們來講一個開發上比較常用的 operators - groupBy,它可以幫我們把相同條件的元素拆分成一個 Observable,其實就跟平常在 SQL 下是一樣個概念,我們先來看個簡單的例子
var source = Rx.Observable.interval(300).take(5);
var example = source
.groupBy(x => x % 2);
example.subscribe(console.log);
// GroupObservable { key: 0, ...}
// GroupObservable { key: 1, ...}
上面的例子,我們傳入了一個 callback function 並回傳 groupBy 的條件,就能區分每個元素到不同的 Observable 中,用 Marble Diagram 表示如下
source : ---0---1---2---3---4|
groupBy(x => x % 2)
example: ---o---o------------|
\ \
\ 1-------3----|
0-------2-------4|
在實際上,我們可以拿 groupBy 做完元素的區分后,再對 inner Observable 操作,例如下面這個例子我們將每個人的分數作加總再送出
var people = [
{name: 'Anna', score: 100, subject: 'English'},
{name: 'Anna', score: 90, subject: 'Math'},
{name: 'Anna', score: 96, subject: 'Chinese' },
{name: 'Jerry', score: 80, subject: 'English'},
{name: 'Jerry', score: 100, subject: 'Math'},
{name: 'Jerry', score: 90, subject: 'Chinese' },
];
var source = Rx.Observable.from(people)
.zip(
Rx.Observable.interval(300),
(x, y) => x);
var example = source
.groupBy(person => person.name)
.map(group => group.reduce((acc, curr) => ({
name: curr.name,
score: curr.score + acc.score
})))
.mergeAll();
example.subscribe(console.log);
// { name: "Anna", score: 286 }
// { name: 'Jerry', score: 270 }
這里我們范例是想把 Jerry 跟 Anna 的分數個別作加總,畫成 Marble Diagram 如下
source : --o--o--o--o--o--o|
groupBy(person => person.name)
: --i--------i------|
\ \
\ o--o--o|
o--o--o--|
map(group => group.reduce(...))
: --i---------i------|
\ \
o| o|
mergeAll()
example: --o---------o------|
Subject
Subject 實際上就是 Observer Pattern 的實例,他會在內部管理一份 observer 的清單,並在接收到值時遍歷這份清單並送出值,所以我們可以這樣用 Subject
var subject = new Rx.Subject();
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
subject.subscribe(observerA);
subject.subscribe(observerB);
subject.next(1);
// "A next: 1"
// "B next: 1"
subject.next(2);
// "A next: 2"
// "B next: 2"
這里我們可以直接用 subject 的 next 方法傳送值,所有訂閱的 observer 就會接收到,又因為 Subject 本身是 Observable,所以這樣的使用方式很適合用在某些無法直接使用 Observable 的前端框架中,例如在 React 想對 DOM 的事件做監聽
class MyButton extends React.Component {
constructor(props) {
super(props);
this.state = { count: 0 };
this.subject = new Rx.Subject();
this.subject
.mapTo(1)
.scan((origin, next) => origin + next)
.subscribe(x => {
this.setState({ count: x })
})
}
render() {
return <button onClick={event => this.subject.next(event)}>{this.state.count}</button>
}
}
從上面的代碼可以看出來,因為 React 本身 API 的關系,如果我們想要用 React 自訂的事件,我們沒辦法直接使用 Observable 的 creation operator 建立 observable,這時就可以靠 Subject 來做到這件事。
Subject 因為同時是 observer 和 observable,所以應用面很廣除了前面所提的之外,還有上一篇文章講的組播(multicase)特性也會在接下來的文章做更多應用的介紹,這里先讓我們來看看 Subject 的三個變形。
BehaviorSubject
很多時候我們會希望 Subject 能代表當下的狀態,而不是單存的事件發送,也就是說如果今天有一個新的訂閱,我們希望 Subject 能立即給出最新的值,而不是沒有回應,例如下面這個例子
var subject = new Rx.Subject();
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
subject.subscribe(observerA);
subject.next(1);
// "A next: 1"
subject.next(2);
// "A next: 2"
subject.next(3);
// "A next: 3"
setTimeout(() => {
subject.subscribe(observerB); // 3 秒后才訂閱,observerB 不會收到任何值。
},3000)
以上面這個例子來說,observerB 訂閱的之后,是不會有任何元素送給 observerB 的,因為在這之后沒有執行任何 subject.next(),但很多時候我們會希望 subject 能夠表達當前的狀態,在一訂閱時就能收到最新的狀態是什么,而不是訂閱后要等到有變動才能接收到新的狀態,以這個例子來說,我們希望 observerB 訂閱時就能立即收到 3,希望做到這樣的效果就可以用 BehaviorSubject。
BehaviorSubject 跟 Subject 最大的不同就是 BehaviorSubject 是用來呈現當前的值,而不是單純的發送事件。BehaviorSubject 會記住最新一次發送的元素,並把該元素當作目前的值,在使用上 BehaviorSubject 建構式需要傳入一個參數來代表起始的狀態,范例如下
var subject = new Rx.BehaviorSubject(0); // 0 為起始值
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
subject.subscribe(observerA);
// "A next: 0"
subject.next(1);
// "A next: 1"
subject.next(2);
// "A next: 2"
subject.next(3);
// "A next: 3"
setTimeout(() => {
subject.subscribe(observerB);
// "B next: 3"
},3000)
從上面這個范例可以看得出來 BehaviorSubject 在建立時就需要給定一個狀態,並在之后任何一次訂閱,就會先送出最新的狀態。其實這種行為就是一種狀態的表達而非單存的事件,就像是年齡跟生日一樣,年齡是一種狀態而生日就是事件;所以當我們想要用一個 stream 來表達年齡時,就應該用 BehaviorSubject。
ReplaySubject
在某些時候我們會希望 Subject 代表事件,但又能在新訂閱時重新發送最后的幾個元素,這時我們就可以用 ReplaySubject,范例如下
var subject = new Rx.ReplaySubject(2); // 重復發送最后 2 個元素
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
subject.subscribe(observerA);
subject.next(1);
// "A next: 1"
subject.next(2);
// "A next: 2"
subject.next(3);
// "A next: 3"
setTimeout(() => {
subject.subscribe(observerB);
// "B next: 2"
// "B next: 3"
},3000)
可能會有人以為 ReplaySubject(1) 是不是就等同於 BehaviorSubject,其實是不一樣的,BehaviorSubject 在建立時就會有起始值,比如 BehaviorSubject(0) 起始值就是 0,BehaviorSubject 是代表着狀態而 ReplaySubject 只是事件的重放而已。
AsyncSubject
AsyncSubject 是最怪的一個變形,他有點像是 operator last,會在 subject 結束后送出最后一個值,范例如下
var subject = new Rx.AsyncSubject();
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
subject.subscribe(observerA);
subject.next(1);
subject.next(2);
subject.next(3);
subject.complete();
// "A next: 3"
// "A complete!"
setTimeout(() => {
subject.subscribe(observerB);
// "B next: 3"
// "B complete!"
},3000)
從上面的代碼可以看出來,AsyncSubject 會在 subject 結束后才送出最后一個值,其實這個行為跟 Promise 很像,都是等到事情結束后送出一個值,但實務上我們非常非常少用到 AsyncSubject,絕大部分的時候都是使用 BehaviorSubject 跟 ReplaySubject 或 Subject。
multicast
multicast 可以用來掛載 subject 並回傳一個可連結(connectable)的 observable,如下
var source = Rx.Observable.interval(1000)
.take(3)
.multicast(new Rx.Subject());
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
source.subscribe(observerA); // subject.subscribe(observerA)
source.connect(); // source.subscribe(subject)
setTimeout(() => {
source.subscribe(observerB); // subject.subscribe(observerB)
}, 1000);
上面這段代碼我們透過 multicast 來掛載一個 subject 之后這個 observable(source) 的訂閱其實都是訂閱到 subject 上。
source.subscribe(observerA); // subject.subscribe(observerA)
必須真的等到 執行 connect() 后才會真的用 subject 訂閱 source,並開始送出元素,如果沒有執行 connect() observable 是不會真正執行的。
source.connect();
另外值得注意的是這里要退訂的話,要把 connect() 回傳的 subscription 退訂才會真正停止 observable 的執行,如下
var source = Rx.Observable.interval(1000)
.do(x => console.log('send: ' + x))
.multicast(new Rx.Subject()); // 無限的 observable
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
var subscriptionA = source.subscribe(observerA);
var realSubscription = source.connect();
var subscriptionB;
setTimeout(() => {
subscriptionB = source.subscribe(observerB);
}, 1000);
setTimeout(() => {
subscriptionA.unsubscribe();
subscriptionB.unsubscribe();
// 這里雖然 A 跟 B 都退訂了,但 source 還會繼續送元素
}, 5000);
setTimeout(() => {
realSubscription.unsubscribe();
// 這里 source 才會真正停止送元素
}, 7000);
上面這段的代碼,必須等到 realSubscription.unsubscribe() 執行完,source 才會真的結束。
雖然用了 multicast 感覺會讓我們處理的對象少一點,但必須搭配 connect 一起使用還是讓代碼有點復雜,通常我們會希望有 observer 訂閱時,就立即執行並發送元素,而不要再多執行一個方法(connect),這時我們就可以用 refCount。
refCount
refCount 必須搭配 multicast 一起使用,他可以建立一個只要有訂閱就會自動 connect 的 observable,范例如下
var source = Rx.Observable.interval(1000)
.do(x => console.log('send: ' + x))
.multicast(new Rx.Subject())
.refCount();
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
var subscriptionA = source.subscribe(observerA);
// 訂閱數 0 => 1
var subscriptionB;
setTimeout(() => {
subscriptionB = source.subscribe(observerB);
// 訂閱數 0 => 2
}, 1000);
上面這段代碼,當 source 一被 observerA 訂閱時(訂閱數從 0 變成 1),就會立即執行並發送元素,我們就不需要再額外執行 connect。
同樣的在退訂時只要訂閱數變成 0 就會自動停止發送
var source = Rx.Observable.interval(1000)
.do(x => console.log('send: ' + x))
.multicast(new Rx.Subject())
.refCount();
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
var subscriptionA = source.subscribe(observerA);
// 訂閱數 0 => 1
var subscriptionB;
setTimeout(() => {
subscriptionB = source.subscribe(observerB);
// 訂閱數 0 => 2
}, 1000);
setTimeout(() => {
subscriptionA.unsubscribe(); // 訂閱數 2 => 1
subscriptionB.unsubscribe(); // 訂閱數 1 => 0,source 停止發送元素
}, 5000);
publish
其實 multicast(new Rx.Subject()) 很常用到,我們有一個簡化的寫法那就是 publish,下面這兩段代碼是完全等價的
var source = Rx.Observable.interval(1000)
.publish()
.refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.Subject())
// .refCount();
加上 Subject 的三種變形
1.
var source = Rx.Observable.interval(1000)
.publishReplay(1)
.refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.ReplaySubject(1))
// .refCount();
2.
var source = Rx.Observable.interval(1000)
.publishBehavior(0)
.refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.BehaviorSubject(0))
// .refCount();
3.
var source = Rx.Observable.interval(1000)
.publishLast()
.refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.AsyncSubject(1))
// .refCount();
share
另外 publish + refCount 可以再簡寫成 share
var source = Rx.Observable.interval(1000)
.share();
// var source = Rx.Observable.interval(1000)
// .publish()
// .refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.Subject())
// .refCount();