angular2 學習筆記 ( rxjs 流 )


更新: 2020-05-12

rxjs 有很多操作, 如果遇到不夠用的時候也可以自己寫.

寫法超級簡單. 

寫一個方法, 接受 Observable 返回 new Observable 就可以了.

然后就是調用時的參數. 很多時候我們會傳入其它 obs 作為監聽

返回的 obs 我們要記得寫 unsubscribe 方法,一旦被 unsubscribe 就去釋放其它監聽.

這樣就很安全了.

比如 :

function skipAfter<T>(skip$: Observable<void>): (source: Observable<T>) => Observable<T> {
  return (source: Observable<T>) => {
    return new Observable(subscribe => {
      const subscription = new Subscription();
      let skip = false;

      subscription.add(
        skip$.subscribe(() => {
          skip = true;
        })
      );

      subscription.add(
        source.subscribe({
          complete: () => subscribe.complete(),
          error: error => subscribe.error(error),
          next: v => {
            if (skip) {
              skip = false;
            }
            else {
              subscribe.next(v);
            }
          }
        })
      )

      return () => {
        subscription.unsubscribe();
      }
    });
  }
}

使用起來時這樣

    const mainSubject = new Subject<string>();
    const skipSubject = new Subject<void>();

    const s = mainSubject.pipe(
      skipAfter(skipSubject)
    ).subscribe(v => {
      console.log(v);
    });

    mainSubject.next('1');
    mainSubject.next('2');
    skipSubject.next();
    mainSubject.next('3');
    mainSubject.next('4');
    skipSubject.next();
    skipSubject.next();
    mainSubject.next('5');
    mainSubject.next('6');
    s.unsubscribe();

result : 

 

 

 

更新: 2020-04-03

今天寫了一個 bug,來梳理一下 

subject, observalbe, multicast, connect, publishReplay, refCount, ShareReplay

obs 是一個方法, 你 sub 它就等於調用它. 

const o = new Observable((observer => {
  let i = 0;
  const timer = setInterval(() => {
    console.log('interval run');
    observer.next(i++);
  }, 1000);
  return () => {
    window.clearInterval(timer);
  };
}));

比如上面這個, o.sub 就會調用里面的函數, 創建一個 interval, 你 sub 多少次就有多少個 interval 被創建. 

全部是獨立的. 最后返回的是 unsubscribe 后觸發的事情, 就是 clear interval. 

如果我們不希望這樣,而希望它像觀察者那樣,大家公用一個 interval, 然后后面來的 sub 也立馬拿到當前的值, 那么我們需要 subject 

它的做法就是只 sub 一次 obs, 然后通過 ReplaySubject.next 去廣播給后邊的人. 

subject 不是函數,所以它不存在每次 sub 都會創建新的東西那樣. 它只是把 sub 的方法存在 array 而已,觀察者模式的實現. 

const o1 = o.pipe(
  multicast(new ReplaySubject<number>(1)),
) as ConnectableObservable<number>;
const subscrition = o1.connect();
o1.subscribe(v => console.log(v));
subscrition.unsubscribe();

當 connect 被調用, interval 開始

當 connect 返回的 subscrition 被退訂后, interval 就結束了

然后其它人 sub 的其實是 multicast 里得 ReplaySubject.

    const o1 = o.pipe(
      multicast(new ReplaySubject<number>(1)),
      refCount(),
    ) as ConnectableObservable<number>;
    o1.subscribe(v => console.log(v));

refCount 是用來替代手動 connect 的. 它的邏輯是, 當第一個人 sub 的時候去 connect 

同時當所有人都退訂后去 disconnect

今天寫的就是這個 bug, 我的第一個訂閱是 toPromise, 執行完后立馬就退訂了. 導致后續的人 sub 的時候又去發 ajax 了. 

所以 refCount 也不是每次都好用的. 

multicast 可以用 publishReplay 來替代

multicast(new ReplaySubject<number>(1))

可以改寫為 

publishReplay(1)

而 publishReplay + refCount 不可以用 shareReplay 來替代, 在原 source complete 和 error 的時候, share replay 會創建新的訂閱, 而 publish 只是直接返回 complete 狀態, 這點不同。

publishReplay(1),
refCount(),

不可以用 shareReplay 來替代. 

  shareReplay({ refCount: true, bufferSize: 1 })

refCount 默認是 false, true 就是 refCount 咯, false 的話它算一半的 refCount 

它依然具有第一個 sub 它就 connect 但是沒有因為 0 而 disconnect. 這點滿特別的哦 

鼓勵大家讀讀下面這篇, 其實有些細節我沒搞太清楚,但是如果你遇到 bug 不妨認真看一看它們之前的微差.

https://itnext.io/the-magic-of-rxjs-sharing-operators-and-their-differences-3a03d699d255

 

在說說我的情況要怎么處理. 

在一個 component 里, 我的 html 需要 subscribe 這個流. 

而在 init 的時候我也需要這個 source value. 

但是在 init 的時候我只要拿一次就好了. 並不需要 watch. 

所以我一開始的寫法就是 await obs.sub.take(1).toPromise()

由於只是 await + take 1 所以等 ajax 回來后立馬就退訂了. 

這就導致后續的 html | async 的時候又去發 ajax 了. refCount 的原則是至少要有一個訂閱者在線.

那么解決方法有 2 個方向, 第一個就是我在 promise 之前寫一個訂閱. 指導 component destory. 這個訂閱不需要做什么,只是確保在線就好了. 

另一個方法是 refCount : false. 這樣它就不會因為沒有訂閱者在線而退訂。然后再 shareplay 之前放一個 take until 來退訂.當 component destory 的時候就觸發.

 

 

更新: 2020-03-30 

takeUntil vs unsubscribe 

調用 subscribe 后, 通常需要做一個 unsubscribe 的動作, 
因為 subscribe 會調用創建 observable, 內部通常會有 setinterval, event listening 之類的.
所以如果不 unsubscribe 監聽就無法被釋放. 
所以最好的方式就是退訂. 
但是寫起來不太優雅, 所以有些人會鼓勵用 takeUntil 來替代掉 unsubscribe
但是 takeUntil 是不一樣的東西,要用要小心 
takeUntil 不是退訂,而是結束一個流. 
unsubscribe 並不會有任何 complete 的事件觸發. 但是 takeUntil 會.
可以這樣理解 
stream$.pipe(
 takeUntil(stop$),
   switchMap()
).subscribe
其實我們 subscribe 的是 takeUntil 之后的流, 而 stop 觸發后, 這個流會 unsubscrube 之前的流,並自生會 complete.
所以上面這個寫法就和 unsubscrube 差很多了, unsubscrube 是從最低成到最上層.
takeUntil 是從某一層 unsubscribe 上層, 然后 complete 去下層.
所以才有人說要把 takeUntil 放到最后. 因為這樣才能符合退訂的效果
我個人認為不應該把 takeUntil 當成 unsubscrube 來用。因為這 2 個東西並不一樣。所以要清楚才用.
 
最后說說一退訂這個東西。 
組件內如果訂閱某個東西,那么最好就是在 destroy 的時候去退訂. 
如果組件自生有 subject 並不需要在 destory 的時候去 complete 
如果組件是通過 service 去訂閱, 那么要嘛把退訂方法 return 出來,要嘛把 onDestroy 事件傳進去. (這個就是 takeUntil 的用法了咯)
 
糾結: 什么時候可以不需要退訂 ? 
1. http
之所以說可以不需要退訂是因為當 http 回來后會調用 complete, 也算是退訂了. 
但是如果 http 比較慢回來,而這個時候我要退訂呢 ? 我完全不想執行 subscribe 了呢,依然是需要退訂才能實現. 
 
 
總結: 
兩種東西可以訂閱. 
1. Observable 
比如 fromEvent...
這些東西你 subscribe 它就去 add event listener 跑 setinterval 等
 
 
2. subject 
就是觀察者模式. 
一旦 subscribe 它就把方法存在 array 種. 
 
要釋放內存,可以從 2 邊。
源頭消失了.
比如 setinterval 跑到 10 就 complete. clear interval 那就沒有內存了咯. 
或者 subject 的 ref 被垃圾回收掉了. 
一旦源頭消失了,那么就再也不會有新的事件傳出去. (注意: 但之前的事件如果還沒有觸發完依然是會觸發的. 只是源頭沒了而已)
所以如果我在一個 component 內 call 一個 http 我沒有去 unsubscribe 就 destroy 這個 component 
那么內存是不會有問題的. 但是我的 subscribe 方法依然會被觸發. 
form valuechange 也是同樣的, 我沒有退訂並不會有遺漏,因為 formControl 被和 component 一起被垃圾回收掉了.
 
另一種方式是退訂
從結尾去做. 退訂是王道,setinterval 被清楚了. array 被 splice 了 
最后的觸發也直接停掉了. 
從上面可以看出來 takeUntil 其實是用了第一種方式. 所以為什么我們說一定要放在結尾才能模擬的出來退訂的效果. 
 
  
   
 
 

 

更新 : 2020-03-28

swithMap 里的 share replay 
const s = new BehaviorSubject('s1');
const s2 = new BehaviorSubject('s2');

const o = s.pipe(
  switchMap(v => {
    return timer(300).pipe(
      tap(v => console.log('dada'),
      shareReplay({ bufferSize: 1, refCount: true })
    ));
  }),
   shareReplay({ bufferSize: 1, refCount: true }) // 要放哦
);

o.subscribe(v => console.log(v));
s2.next('z');
o.subscribe(v => console.log(v));

里面 share 沒啥用,關鍵是外面要 share,不然 dada 依據跑.

 

更新: 2020-02-12

一旦 Subscription 被 unsubscribe 后, 千萬不要在 add 了, 它是不會觸發的

const s = new Subject();
const a = new Subscription();
a.unsubscribe();
a.add(s.subscribe(v => console.log('done')));
s.next('aaa'); // 不會觸發

我經常會開一個 subscription 然后把所有 sub add 進去,等到 destroy 的時候一次毀掉. 無意間發現原來有上面這種情況。

 

更新: 2020-01-19

小心 share replay 泄漏

refer : 

https://blog.strongbrew.io/share-replay-issue/

https://blog.jerry-hong.com/series/rxjs/thirty-days-RxJS-24/

先看看歷史...

https://stackoverflow.com/questions/47793518/share-operator-that-doesnt-unsubscribe

https://medium.com/angular-in-depth/rxjs-whats-changed-with-sharereplay-65c098843e95

obs 一但被訂閱, 方法就會被調用. 如果不希望每次都重新調用,我們可以把結果存起來.

這時候就需要 subject 的幫忙了. 

new subject 讓往后的人都訂閱 subject, subject 和 obs 不同,它是觀察者模式, 不像 obs 每次重新 run 方法.

接下來就是把一開始的 obs 連接上 subject, 下面這長很好解釋了

 

 source.connect() 會返回一個退訂方法作為 disconnect 用.

通常有 2 種預期的使用效果

1. 當調用 multicast 后立馬執行 obs 然后把 value 存入 replay subject, 一有新值就 update. 往后的人就可以馬上獲取到值了. 

2. 當有人訂閱后才執行 obs, 然后一直保持更新, 當訂閱人數 0 時退訂 obs, 直到下一次訂閱才又開始. 

如果是第一種情況那么就直接 connect 就行了.

第 2 種的做法是通過 refCount()

好了,說主題 shareReplay, 它采用的是第一種, 以前用的是第 2 種. 所以會有點混亂.

所以最好呢,是我們在調用的時候自己 set 一下, 確保它是你要的, 比如

shareReplay({ refCount: false })

refCount false 意味着 obs 的 subscribe 不會因為 subject 退訂而退訂. 它會一直等到 obs complete 

這很容易產生泄露. 所以建議還是用 refCount 的好哦. 

refCount 則又可能導致 last value 丟失, 這個也是要多多留意一下, 當然如果 obs 是源頭這個就不會發生,如果源頭是 subject 就有可能啦.

 

更新: 2019-12-12

finalize
以前都只是拿它當 c# try catch finally 用。今天懂多一點了。
因為 ng form 的一個 bug ! 
v2.4 的 bug, 到 v9.0 還沒有 fix ... 唉...
work around  就是用來這個 finalize. 
首先要知道一個流, 如果發生了 error, 那么它就 stop 了, 這時候 subject.isStopped 是 true
complete 也會讓一個 stream stop 掉. 還有 unsubscribe 也是. 
而 finalize 就是在一個流 stop 掉的時候被調用的. 

 

 這位朋友的 work around 很聰明丫, 利用了 timer + finalize

timer 會觸發 complete 然后 subscribe 會先執行, 最后 finalize 更新 status 多一次, 就觸發了 emit 

 

 

 

 

更新: 2019-11-24

startWith 和 pairwise 

s.pipe(
  startWith('c'),
  map(v => v + 1),
  tap(v => console.log(v)), // c1
  startWith(null),
  pairwise(),
  tap(v => console.log(v)), // [null, c1]
).subscribe(([before, after]) => {
  // console.log(before, after);
});

2 個點要留意

第一,pairwise 需要 2次值才會觸發. 所以 startWith('c') 到 pairwise 就被吃掉了. subscribe 不會觸發

所以需要 startWith(null) 來喂它一次. 

第二, startWith 的次序. 最后一個 tap 的值是 [null, c1] 而不是 [c1, null] 

startWith('c1'),
startWith('c2'),
tap(v => console.log(v)), // [c2, c1]

要記住哦。

 

 

 

 

 

更新: 2019-07-18

unsubscribe vs complete 

在用 ng 的時候, 我們會糾結什么時候要調用 unsubscribe, 因為據說 angular 會幫我們處理...

其實最好是每一次都調用 unsubscribe. 比如下面這個例子, setimeout 代表 component destroy 

當 destroy 時,即使我 subject.complete(), 我也無法阻止 tap 的觸發. 所以還是 unsubscribe 妥當一些.

按邏輯講, 當 component destroy 我們是取消我們對外部的監聽, 意思是我們不再處理了, 而 complete 則是它不再發送了. 

它不再發送, 和我不再處理是 2 個概念. 它不發, 但是我手上的工作還是得做完, 我不處理是我立馬停掉手上工作. 

const s = new Subject();
const o = s.asObservable();
const sub = o.pipe(delay(3000), tap(() => console.log('tap'))).subscribe(() => {
  console.log('done');
});
s.next('dada');
setTimeout(() => {
  s.complete();
  // sub.unsubscribe();
}, 1000);

 

 

更新 : 2019-06-2

defer 用於延后一個 function 執行. 當 subscribe 后才被執行, 通常用於做 promise retry 

比如我有一個 promise 方法

async function getValueAsync(ok: boolean): Promise<string> {
    return new Promise((resolve, reject) => {
        console.log('run');
        setTimeout(() => {
            if (ok) {
                resolve('dada');
            }
            else {
                reject('fail');
            }
        }, 3000);
    });
}
from(getValueAsync(false)).pipe(
    retry(1)
).subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

如果我直接這樣跑是不能 retry 的. refer : https://stackoverflow.com/questions/33072512/rx-frompromise-and-retry 這里有解釋 

用 defer

defer(() =>  getValueAsync(false)).pipe(
    retry(2)
).subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

這樣就可以了

 

 

 

更新 : 2018-03-12 

學 rxjs 最好的就是看官網的文檔,解釋的很清楚. 

http://cn.rx.js.org/manual/overview.html#h39

https://rxjs-cn.github.io/learn-rxjs-operators/

function subscribe(observer) {
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);

  return function unsubscribe() {
    clearInterval(intervalID);
  };
}
var unsubscribe = subscribe({next: (x) => console.log(x)});
var unsubscribe = subscribe({next: (x) => console.log(x)});
// 稍后:
unsubscribe(); // 清理資源

上面這一段代碼讓我明白了幾個重點

1. 惰性 (當 observable 被創建時,沒有 subscribe 是不會開始運行的, 因為 observable 就像函數調用 )

2. 每一個 subscribe 是獨立的 

3. observable 和常用的 addEventListener 不同,它不會吧所有的 observer 保存在列表里頭. 

 

而 subject 則是道道地地的 addEventListener, 它會保存所有 observer 在列表里. 

而把它們結合的方式就是 observable.subscribe(subject); 

因為 subject 就是一個 observer 擁有 next 方法. 

這就是 rxjs 其中 2 大核心, observable and subject 

至於其它的 subject 還有一堆的 operator, create observable 等,都是基於這 2 個核心的擴展而已. 

 

 

更新 : 2017-11-14 

最近從新看了 30 天 rxjs, 這里補上一些筆記.

順便提一下, ng 5.x 開始 rxjs 的寫法換掉了 

參考 : https://github.com/ReactiveX/rxjs/blob/master/doc/lettable-operators.md

import { combineLatest } from 'rxjs/observable/combineLatest';
import { catchError, take } from 'rxjs/operators';

combineLatest(pendingEmitters).pipe(
    take(1),
    catchError(() => {
        reject();
        return '';
    })
).subscribe(() => {
    resolve();
});

不像 jquery 那樣串連了. 

 

1.concat 

https://ithelp.ithome.com.tw/articles/10187520

concat 是把多個 observeable 合並起來, 其特色是只有前面一個 observeable complete 了后邊的 observeable 才開始起作用.

concat(o1,o2,o3), o1 沒有 complete 的話, o2 怎么叫都不會觸發.

另一個說法就是 concat 先 subcribe o1, 然后等到 o1 completed 后再去 subscribe o2, 一直到完, 那它自己也就 completed 了.

concat 是可以調用的 import { concat } from 'rxjs/observable/concat';

 

2. merge

https://ithelp.ithome.com.tw/articles/10187520

它和 concat 都是用來合並的. 區別是它不需要等 complete, 任何一個 observable 觸發都會有效果. 

換句話說就是 merge 會直接 subcribe 所有的 obserable 不像 concat 那樣會等 completed.

merge(o1,o2) o1 沒有 complete, o2 叫一樣有效.

 

3.concatAll

concatAll 屬於 operators 

o1.pipe(map(_ => o2),concatAll()) 

每一次 o1 叫,都會產生多一個 o2 

比如 o1 叫了 3 次 , 那么就有 3 個 o2 

concatAll 就是把這 3 個組合起來( 每一次 o1 叫, 都會 push 新的 o2 去這個 array ) 然后 concat(o2,o2,o2),后續的步驟就和 concat 一摸一樣了. 所以它起到了打平和 concat 的作用.

 

4. mergeAll

mergeAll 和 concat 是同一個原理只是, 最后不是用 concat 而是用 merge(o2,o2,o2);

另外, mergeAll(2) 可以傳入一個變量去控制允許並發的數量 

比如你輸入 2,  那么 a 叫了 3次, 你有 merge(o2,o2,o2), 第3個 o2 叫的時候本來是會有效果的,但是由於限制了 2, 那么只能等第一或第二個 o2 complete, 第 3 o2 叫才有效果了。確保同一時期只有 2 個.

note mergeAll(1) === concatAll()

 

5.switchAll

沒有 switch 只有 switchAll 

它和 concatAll, mergeAll 的區別是, o1 每一次叫, 永遠只保留最新的 o2, 之前的 o2 統統丟掉. 

 

concatAll, switchAll, mergeAll 

https://ithelp.ithome.com.tw/articles/10188325

這 3 個都是用來打平 obs 

concatAll 上面講了重點是會等上一個 complete 才去下一個 

switchAll 則是一旦有新的一個來,舊的就忽略掉. 

mergeAll 則是並發處理.

 

6 concatMap, switchMap, mergeMap 

https://ithelp.ithome.com.tw/articles/10188387

就是 map() + switchAll(), map() + concatAll(), map() + mergeAll() 的縮寫而已. 

還有個好處是它可以傳入一個方法 (a,b,c,d) => next, 可以獲取到 o1,o2 的值然后返回下一個值. 

 

6.5 exhaustMap 

這個和 concatMap 很像. 唯一的區別是, concat 等待第一個 complete 了以后會去 subscribe 下一個(第二個)

而 exhasust 呢, 它會去 subscribe 下下下下一個 (最后一個), 

 

7. combineLatest 

https://ithelp.ithome.com.tw/articles/10187638

它也是用來做合拼處理的,

它需要等 "每一個" observable 至少有一個開始值之后才開始工作. 這和 merge 不同, merge 不需要等

它每一次觸發都可以獲取到所有 observable 當前的值, 這和 merge 不同, merge 每一次觸發只能獲取一個 observable 的值.

 

8.withLatestFrom

https://ithelp.ithome.com.tw/articles/10187638

它和 combineLatest 一樣,唯一的區別是, 它只有 main observable next 值時才會觸發 callback , 其它 observable next 只是記入值而已. 

 2019-06-14 補上一個例子, form value update 但是只有 button click 的時候才 emit

this.formGroup = this.formBuilder.group({
  date: [null, Validators.required]
});

const f = this.formBuilder.group({
  name: [''],
  age: [11]
});
const button = new Subject();

button.pipe(withLatestFrom(f.valueChanges)).subscribe(v => {
  console.log(v);
});


f.setValue({ name: 'dada', age: 15 });
f.setValue({ name: 'dada', age: 18 });
button.next('dada');
button.next('dada2');

 

 

9. zip 

https://ithelp.ithome.com.tw/articles/10187638

它也是合並. 

它的關鍵是順位, 比如 2 個 observables, 2 個都 next 1次 的時候就會 callback 並得到 2 個的第一個值, 如果 2 個不平均, 比如一個 next 了 10 次, 另一個 next 2 次, 那么 callback 就只有 2 次. 

等第 2 個 next 第 3 次時, callback 就會得到 第1和第2個的第3次 next 的值. 

 

10. scan 

https://ithelp.ithome.com.tw/articles/10187882

就是 js 的 reduce, 區別在於它總是返回 observable. 每一次 next 觸發, callback 都可以獲取上一次的值 + 現在的值做一些處理, 返回下一個值. 

還有一個重點就是 第一次的 emit 會直接 skip 掉 scan 因為這時候還沒有 prev value, 通過第二個參數就可以放入初始值了.

這個初始值是不會觸發的,不怕。

 

 

11. buffer 

https://ithelp.ithome.com.tw/articles/10187882

buffer, bufferTime, bufferCount 

它用於累積 next 等待另一個 obserable next 的時候才觸發 callback 

o1.pipe(buffer(o2))...   o2 next 的時候 o1 的 callback 才觸發, 並且返回期間所有的 o1 next 值. 

 

12. delay & delayWhen

https://ithelp.ithome.com.tw/articles/10187999

delay(1000) 就是延遲 1 秒咯, 如果我們要每一次都不用的話. 

就用 delayWhen(v => empty().delay(v + 1000)); 必須返回一個 observable

 

13.debounce & debounceTime 

https://ithelp.ithome.com.tw/articles/10188121

它和 buffer 有點像, 都是會累積值, 但是區別在於, 當一個新值被 next 進來, 它會把之前的值釋放掉, 並且時間從新開始算. 

 

14. throttle & throttleTime

https://ithelp.ithome.com.tw/articles/10188121

它用於限制一個時間內, 最高觸發的頻率. 比如 throttleTime(1000) 就限制了一秒內不管 next 幾次, 只有第一次會 callback 往后的都不會, 直到下一秒開始. 

 

14.1 auditTime

它和 debounceTime 很像,只是 debounceTime 會 clear 掉上一次,而這個不會

3個分別的使用 : 

用戶 keydown 時我想監聽 

1. debounceTime 1000, 用戶連續 keydown 我等 1 秒, 一秒中內用戶又 keydown 了,我重新記時,再等 1 秒... 一直到用戶 1 秒內再也沒有 keydown 我才觸發

2. auditTime 1000 用戶連續 keydown, 我等 1 秒,一秒中內用戶又 keydown 了, 但我 "不" 重新記時了, 1 秒后我就觸發, 也就是說, 用戶 1 秒內按多少次,我都當成 1 次 並且在 1 秒后才觸發. 

3. debounceTime 1000, 用戶連續 keydown, 我直接觸發, 一秒中內用戶又 keydown 了, 我不理,直到 1 秒后 

特色

debounceTime 重新幾時, 一直不觸發

auditTime 等..觸發

debounceTime 觸發...等

 

 

15.distinct, distinctUntilChanged

https://ithelp.ithome.com.tw/articles/10188194

它就是我們熟悉的 distinct, 如果值相同就忽略 callback 

distinct((x) => { return x.value }); 可以提供一個取值的方法,對比估計是用 === 

distinct 會把所有的值都存起來做對比, distinctUntilChanged 的區別是它只會存最后一次的值做對比.

 

16.catchError, retry, retryWhen, repeat

https://ithelp.ithome.com.tw/articles/10188263

catchError 是捕獲錯誤, 

catchError((error, obs) => obs); 返回第 2 個參數 obs 可以實現從跑. 

retry 就是做了上述的事情, 而 retry 多了一個可以指定次數, retry(3) 從試 3 次 

retryWhen(errorObs => errorObs.delay(1000)) retryWhen 可以讓我們操作更多,比如間隔多久才 retry 下一次等等. 

repeat 和 retry 是一樣的,區別在於 retry 必須在 error 時才會有, 而 repeat 則是不管有沒有 error 都會執行. 

 

 

17.  ReplaySubject, BehaviorSubject

behavior 代表一個保持值的 subject, 一旦訂閱馬上會觸發 callback 並獲取到最新的值. 即便不訂閱也可以調用 .value 來獲取當前值.

replay 會緩存之前的 next 一旦新的訂閱加入,就會 playback 之前所有的 next 值. 

 

18. race 

race(s1, s2, s3) 的意思是, 哪一個先觸發,那么之后我就 watch 這一個罷了,另外 2 個 subject 就不理會了。

 

19. mapTo

對比 map, mapTo 的參數是一個值,而不是獲取值的方法, 所以值就只有一個,要留意哦。

 

 

  

rxjs 在處理 dom 事件時是非常好用的. 

步驟一般上是 獲取所有 element, 建立所有的 event 

然后就是各做 rxjs operator 對 event 和 element 的處理. 

參考 : https://ithelp.ithome.com.tw/articles/10187756

 

 

 

 

  

更新 : 2017-10-14 

import 'rxjs/add/observable/combineLatest'; //每一個至少一次后才開始觸發(無需 completed), 一個一次可以獲取大家最新的 value
import 'rxjs/add/observable/forkJoin'; // 類似 promise.all 所有 observable 需要 最少next 一次 and completed 才會觸發,所以只觸發一次獲取所有 value
import 'rxjs/add/observable/merge'; // 用來監聽多個 click event 一個一次, merge(a,b) 如果 a,b 都有初始值, 那么會馬上觸發 2 次.
import 'rxjs/add/observable/concat'; // 和 merge 一樣, 唯一不同的是它需要第一個 completed 才會觸發第 2 個, 第一還沒 completed 第 2 next 都不會叫哦

 

更新 2017-05-17 

今天被 toPromise 給騙了. 

我一直以為, 所有的 "流" 都可以輕松的轉成 await stream.toPromise();

后來我發現有個流一直沒有反應 

let subject = new BehaviorSubject('a');
let o = subject.asObservable();
o.toPromise().then(() => console.log('pro')); //不會跑
setTimeout(()=> {
  subject.next('haha');
  //subject.complete();
}, 1000);

上網找了一下才發現,原來 toPromise().then 必須是 completed 才會跑

所以上面的 subject.complete() 必須要打開才行. 這也意味着 toPromise 只能用在一次的 async 中, 如果是要持續 subscribe 的情況下請使用 .subscribe() 

 

 

 

更新 2017-04-02 

Subject 的主要功能就是觀察者模式. 

我們可以隨時寫入值,完全自己操控. 

但是有時候我們希望它依賴於其它 stream 那么我們使用 connect 

let s1 = new Subject();
let o1 = s1.asObservable(); //我們想以來的 stream 

let s2 = new Subject();
s2.subscribe(v => console.log(v));

o1.subscribe(v => s2.next(v),v => s2.error(v)); //第1種寫法,超麻煩
o1.subscribe(s2); //第2種,可是我沒有要馬上 subscribe 的話呢 ?  
let connector = o1.multicast(s2); //第3種
connector.connect();

s1.next("value");

 

 

更新 2017-03-31

好文,力推 : http://ithelp.ithome.com.tw/articles/10189028?sc=iThomeR
再談談 cold & hot 

observeable 是 default cold 的. 

code 的意思是說, 當有多個 subscribe 時,每一個都是一條獨立的鏈.

比如 http 多個 subscribe 的話,你會發現你的 request 會發了好幾個.

hot 的意思則是每個 subscribe 共享一個鏈, 不管你什么之后插入subscribe 你都不會從新開始. 

把一個 cold 變成 hot 的方法是使用 Subject 充當中間人. 

具體看這 3 篇就明白了 

http://ithelp.ithome.com.tw/articles/10188633

http://ithelp.ithome.com.tw/articles/10188677

http://ithelp.ithome.com.tw/articles/10188750

這里介紹一下 ReplaySubject

Subject.subscirbe() , 不會馬上執行, 因為要等待下一個 Subject.next

BehaviorSubject.subscribe() , 馬上執行, 因為里面一定會有值. 

ReplaySubject.subscribe(), 不一定馬上執行,如果曾經 .next 過才會執行 

multicast, refCount,publish,share 的目的就是把 cold 轉換成 hot .

其原理就是使用了 Subject 系列. 

multicast 后來被 publish 取代了. publish 對應 subject 所以有 publishBehavior, pulishReplay 

refCount 是 connect 的意思,就是把 observer 鏈接上 subject 的動作. 

由於 publish().refCount() 太經常用到,所以發明了 share 寫的更快了嘻嘻。

在使用 ng 的 http 時要注意. observable 是 cold 的。但很多情況下我們更希望它是 hot 的. 

每一次的 subscribe 應該只返回同一個結果, 而這個 http 只發一次請求. 

這時我們需要這樣寫 : http.get().publishReplay(1).refCount() 

publishReplay(1) 之后的每一個 subscribe 都會得到同一個資料了.

 

 

更新 : 2017-03-27 

什么時候需要 unsubscribe ? 

http 不需要, router param 也不需要. 

下面說說幾個情況 

1. Subject.complete 之后, 所有的 subscribe 都不會再觸發, 新的 subscribe 也加不進來了. 

所以如果我們知道訂閱的 Subject 之后會被 complete 那么我們可以無需擔心 unsubscribe 的問題 

2. 使用 async/await toPromise 可以避開 unsubscribe 的問題. 

3. 可以使用 .first(判斷) 表示什么時候開始拿然后停 (比如一個值需要等待 ajax)

4. 使用 takeWhile(判斷) 來決定什么時候取消訂閱. 

5. 使用 OnDestroy 

 

 

更新 : 2017-03-18

Observable, 
Subject,
BehaviorSubject
的區別和用法 
 
當我們手中有一個 Observable, 我們可以去監聽它,那么往后的事情發生我們都會知道。
但是, 之前發生的事情我們都沒辦法知道. 我們也沒辦法用它廣播一個新的事件。
Observable 能做的事情, Subject 都可以做到. 而 Subject 多了一個能力,就是可以用它廣播一個新事件出去. 
所以如果你站在一個監聽者的角度, Subject 和 Observable 沒啥區別. 你依然沒辦法知道過往發生的事情. 也只有在下一次事件廣播時才會被通知. 
Subject 能做的事情 BehaviorSubject 都可以做,能監聽也能廣播. 最大的特點在於它能知道過往的事情。
當你手中有一個 BehaviorSubject 你可以馬上調用 .value 獲取當前的值, 你訂閱它的話,你也會立馬收到一個事件,而不像 Subject 或 Observable 一直傻傻等下一次廣播才能得到值. 
 
自己選擇用吧.
 
ng 的 http.get 返回的是 Observable, 你一監聽它你會馬上獲得響應, 從這里我們就可以推斷出沿着這條鏈往上走最終就是一個 BehaviorSubject. 因為只有 BehaviorSubject 被監聽的時候才會馬上得到響應。

 

2016-09-23

RxJS 博大精深,看了好幾篇文章都沒有明白. 

范圍牽扯到了函數響應式開發去了... 我對函數式一知半解, 響應式更是第一次聽到... 

唉...不過日子還是得過...混着過先唄

我目前所理解的很淺, 大致上是這樣的概念.

1.某些場景下它比 promise 好用, 它善於過濾掉不關心的東西. 

2.它是觀察者模式 + 迭代器模式組成的 

3.跟時間,事件, 變量有密切關系

4.世界上有一種東西叫 "流" stream, 一個流能表示了一段時間里,一樣東西發生的變化. 

  比如有一個值, 它在某段時間里從 "我" 變成 "你" 再變成 "他". 

  而我們可以對這個流進行觀察,所以只要它發生變化,我們就會發現然后做任何事情。

5.站在游覽器的角度, 服務器推送數據過來, 用戶操作界面, timer 都是我們關心的流.

好,來看例子. 

我們通過 new Subject 來創建流. 也可以使用 new EventEmitter 或者 BehaviorSubject. 這些都繼承了 Subject

EventEmitter 是 ng2 提供的

BehaviorSubject 可以填入初始值

import { Subject } from "rxjs/Subject";
private textEmitter: Subject<string> = new Subject(); 

要改變流中的值,我們使用 .next(value), 這個是迭代器的概念咯

keyup(value : string)
{
    this.textEmitter.next(value);
}

那么訂閱是這樣的 

ngOnInit() {
    this.text$ = this.textEmitter
        .debounceTime(500)
        .distinctUntilChanged()
        .switchMap(v => this.getDataAsync(v));

    this.text$.subscribe((value) => {
        console.log(value);
    });            
}

input keyup 性能優化, 我們通常會寫一個 timeout + cleartimeout 的方式, 這個 debounceTime 就是干這個的 

流更新結束后 500ms 才會通知觀察者 

distinctUntilChanged 是說只有當值和上一次通知時的值不一樣的時候才通知觀察者 

.map 和 .switchMap 都是用來對值進行處理的, 這個和 array.map 概念是一樣的

而 .map 和 .switchMap 的區別是 .swichMap 處理那些返回 Observeable 的值 

getDataAsync(value : string): Observable<string>
{        
    let subject = new Subject();
    setTimeout(() => {
        console.log("after 2second");
        subject.next(value + "final");
    }, 2000);
    return subject;
}

如果我們使用 map 的話,它會直接返回 "subject" 這個對象, 而如果用 switchMap 它會返回這個 subject 對象的響應值.

<input type="text" #input (keyup)="keyup(input.value)" />
<p>{{ text$ | async }}</p>

ng2 提供了一個 async Pipe, 它會監聽左邊這個 text$ stream. 后面加一個 $ 符號通常用來表明這是一個 stream.

還有一個常用的功能是 combineLatest

就是可以同時監聽多個流,只要其中一個有變動,那么所有的最新值都會發布出去, 可以用來實現依賴屬性.

這里需要注意一點 combineLatest 的所有流都必須有值, 不可以是一個從來都沒有 next 過的 Observable 不然它就不會運行了.

最簡單的方法是使用 observable.startWith(null) 讓它有一個值. 

@Component({
    selector: "compute-property",
    template: ` 
        <input type="text" #input1 (keyup)="text1.next(input1.value)" />
        <input type="text" #input2 (keyup)="text2.next(input2.value)" />  
        {{ result$ | async }}                 
    `
})
export class ComputePropertyComponent implements OnInit {

    text1: BehaviorSubject<string> = new BehaviorSubject<string>("a");
    text2: BehaviorSubject<string> = new BehaviorSubject<string>("b");
    result$: Observable<string>;
    constructor() {}
    
    ngOnInit() {
        this.result$ = Observable.combineLatest(this.text1, this.text2).map(values => {          
            return values[0] + " " + values[1];
        }); 
    }     
}

還有 bufferCount, bufferTime 也是常用到

text: Subject<number> = new Subject<number>();
    
ngOnInit() {
    this.text.bufferCount(2)
        .subscribe(v => console.log(v)); //[v1,v2] 存夠 count 了就發布

    this.text.bufferTime(2000)
        .subscribe(v => console.log(v)); //[v1,v2,...]把 2 秒內的所有 next value 放進來
}

Observable.of 可以簡單的返回一個默認值 

Observable.of<string>("").subscribe(v => console.log(v));

rxjs 整個文檔非常大,要按需加載.

通常做法是為項目開一個 rxjs-operators.ts 

import 'rxjs/add/observable/throw'; 
import 'rxjs/add/observable/combineLatest'; 
import 'rxjs/add/observable/from'; 
import 'rxjs/add/observable/of'; 
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/debounceTime';
import 'rxjs/add/operator/distinctUntilChanged';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/switchMap';
import 'rxjs/add/operator/toPromise';
import 'rxjs/add/operator/startWith'; 
import 'rxjs/add/operator/bufferCount';
import 'rxjs/add/operator/bufferTime';

放入常用的方法 

然后再 app.module.ts 里面導入它 

import './rxjs-operators'; 

 

Hot or cold , share or not

refer : 

http://blog.thoughtram.io/angular/2016/06/16/cold-vs-hot-observables.html

http://blog.csdn.net/tianjun2012/article/details/51351823

1. by default, observable is not share.

let sub = new Subject();
let obs = sub.map(v => {
    console.log("ajax call"); 
});
obs.subscribe(v => console.log("subscribe 1"));
obs.subscribe(v => console.log("subscribe 2"));         
sub.next("value");

ajax 發了 2 次. angular2 的 Http 也是 not share 哦. 

所以當我們有多個 subscribe 的時候要想一想是否我們需要 share 

let obs = sub.map(v => {
    console.log("ajax call"); 
}).share();

調用一個 share 方法就可以了,或者是 

let obs = sub.map(v => {
    console.log("ajax call"); 
}).publish().refCount();

效果是一樣的. 

 

by default, observable is cold.

意思是說只有在 subscribe 出現了以后才會啟動. ( 當第一個 subscribe 出現時, observable 就會立刻啟動了哦 ) 

let sub = new Subject();
let obs = sub.map(v => {
    console.log("ajax call");
});
sub.next("aaa");
//obs.subscribe(v => console.log("subscribe 1"));
//obs.subscribe(v => console.log("subscribe 2")); 

ajax 不會觸發. 

如果我們希望它在沒有 subscribe 的情況下觸發的話, 可以這樣寫. 

let sub = new Subject();
let obs = sub.map(v => {
    console.log("ajax call");
}).publish();
obs.connect();
sub.next("aaa");

至於什么情況下使用哪一種,我還沒有實戰,以后再說.

多一個例子解釋: 

let obs = Observable.create(observer => {
    console.log("observer run");
    observer.next(Date.now());
});
obs.subscribe(v => console.log("1st subscriber: " + v));
obs.subscribe(v => console.log("2nd subscriber: " + v));     
//observer run
//1st subscriber: 1474649902498
//observer run
//2nd subscriber: 1474649902501 

no share. 所以 observer run 了 2 次. 

let obs = Observable.create(observer => {
    console.log("observer run");
    observer.next(Date.now());
}).share();
obs.subscribe(v => console.log("1st subscriber: " + v));
obs.subscribe(v => console.log("2nd subscriber: " + v));     
//observer run
//1st subscriber: 1474650049833

share 了, 所以 observer only run 1 次.

cold, 所以當第一個 subcribe 出現后 observer 立刻運行 -> .next 更新了 value -> 第一個 subcribe callback 被調用 -> 整個過程結束 -> 然后第2個 subcribe 注冊 .. 由於是 share 所以 observer 沒有載被觸發. 第2個 subscribe callback 沒有被調用. 

延后觸發的做法 : 

let obs = Observable.create(observer => {
    console.log("observer run");
    observer.next(Date.now());
}).publish();
obs.subscribe(v => console.log("1st subscriber: " + v));
obs.subscribe(v => console.log("2nd subscriber: " + v));
obs.connect();
//observer run 
//1st subscriber: 1474650370505 
//2nd subscriber: 1474650370505

可以看到 .publish() 之后, subscribe 不再能激活 observer 了,而必須手動調用 .connect() 才能激活 observer. 

這幾個例子只是為了讓你了解它們的玩法.

小結:

observer default is cold and not share.

cold 表示只有 subscribe 出現 observer 才會被激活.

not share 表示每一個 subscribe 都會激活 observer 鏈. 

 

常用 : 

1. finally 的使用

import 'rxjs/add/operator/finally';

this
.http.get( "http://localhost:58186/api/products", { headers: new Headers({ "Accept": "application/json" })} ).finally(() => { console.log("finally"); //不管 success or error 最后都會跑這個 }).subscribe(response => { console.log("success"); }, response => { console.log("fail"); }, () => { console.log("success final"); }); //result : //success -> success final -> finally //fail -> finally

 

2. 錯誤處理 throw catch  

 
         

import 'rxjs/add/operator/catch';
import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/throw';


this
.http.get( "http://localhost:58186/api/products", { headers: new Headers({ "Accept": "application/json" }) }) .map(r => r.json()) .catch((r) => { if ("1" == "1") { //do something ... return null; //catch 了在返回真確 } else { return Observable.throw("error"); //catch 了繼續返回錯誤 } }) .subscribe( r => console.log(r), r => { console.log("fail") } );

 

3. previous / current value 

用 .pairwise()

let userSubject = new BehaviorSubject<string>("default value");
let user$ = userSubject.asObservable().pairwise();
user$.subscribe(([before, after]) => { console.log(before), console.log(after); });  
userSubject.next("super");
userSubject.next("ttc");
//result : 
//["default value","super"]
//["super","ttc"] 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM