RXJS Observable的冷,熱和Subject


一、Observable的冷和熱

 

Observable 熱:直播。所有的觀察者,無論進來的早還是晚,看到的是同樣內容的同樣進度,訂閱的時候得到的都是最新時刻發送的值。

Observable 冷:點播。 新的訂閱者每次從頭開始。

冷的Observable例子:

一開始有個訂閱者,

兩秒后又有個訂閱者,這兩個序列按照自己的節奏走的,不同步。每個流進行都會從interval的0開始。

console.log('RxJS included?', !!Rx);

const count$ = Rx.Observable.interval(1000).take(5);
const sub1 = count$.subscribe((val)=>{
  console.log(val);
});

setTimeout(function(){
  const sub2 = count$.subscribe((val)=>{
  console.log(val);
});
},2000);

熱的Observable例子

第二個訂閱者直接從2開始起,跟第一個訂閱者看到的內容是一樣的。

const count$ = Rx.Observable.interval(1000).take(5).share();
const sub1 = count$.subscribe((val)=>{
  console.log(val);
});

setTimeout(function(){
  const sub2 = count$.subscribe((val)=>{
  console.log(val);
});
},2000);

 

二、Subject

Subject即是觀察者Observer,也是被觀察對象Observable,同時實現了這兩個接口。

意味着

  • 一方面它可以作為流的組成的一方,輸出的一方。
  • 另一方面,它可以作為流的觀察一方,接收一方。

Subject分為ReplaySubject和BehaviorSubject。

ReplaySubject:這種Subject會保留最新的n個值

BehaviorSubject:是ReplaySubject的特殊形式。 保留最新的一個值

【20200529】

拿subject做一個observer觀察者,看Observable會丟什么東西出來,由它對外廣播出去。

再拿subject去訂閱兩個觀察者。

有n個observable去訂閱subject,但是subject只會發出一個訂閱的要求訂閱原始observable。

1、subscribe的等價寫法

subscribe 后面寫的一個函數,相當於語法糖,快捷方式,臨時創建冷一個observer對象。

默認情況應該是傳入一個observer對象

console.log('RxJS included?', !!Rx);


const counter$ = Rx.Observable.interval(1000).take(5);

const subject = new Rx.Subject();

const observer1 = {
  next: (val)=>{console.log('1: ' +val);},
  error: (err)=>{console.log('ERROR>> 1:'+ err);},
  complete: ()=>{console.log('1 is complete');}
}


const observer2 = {
  next: (val)=>{console.log('2: ' +val);},
  error: (err)=>{console.log('ERROR>> 2:'+ err);},
  complete: ()=>{console.log('2 is complete');}
}

//等價寫法
counter$.subscribe(val =>{console.log(val);});
counter$.subscribe(observer2); 

2、兩個observer ,兩次subscribe

console.log('RxJS included?', !!Rx);


const counter$ = Rx.Observable.interval(1000).take(5);

const subject = new Rx.Subject();

const observer1 = {
  next: (val)=>{console.log('1: ' +val);},
  error: (err)=>{console.log('ERROR>> 1:'+ err);},
  complete: ()=>{console.log('1 is complete');}
}


const observer2 = {
  next: (val)=>{console.log('2: ' +val);},
  error: (err)=>{console.log('ERROR>> 2:'+ err);},
  complete: ()=>{console.log('2 is complete');}
}

counter$.subscribe(observer1);

setTimeout(function(){
  counter$.subscribe(observer2);
},2000);
View Code

 

問題:需要在兩處執行subscribe,很多情況下是這樣的,定義好這些序列應該在什么時候被觸發,我執行執行一句subscribe(),兩個序列都會這么執行。這種情況下就需要用subject()。

3、subject

subject即使observable,因為它可以subscribe observer。

也是observer,因為它可以被observable subscribe。

 

console.log('RxJS included?', !!Rx);


const counter$ = Rx.Observable.interval(1000).take(5);

const subject = new Rx.Subject();


const observer1 = {
  next: (val)=>{console.log('1: ' +val);},
  error: (err)=>{console.log('ERROR>> 1:'+ err);},
  complete: ()=>{console.log('1 is complete');}
}


const observer2 = {
  next: (val)=>{console.log('2: ' +val);},
  error: (err)=>{console.log('ERROR>> 2:'+ err);},
  complete: ()=>{console.log('2 is complete');}
}

//不再用counter$去subscribe,用subject去subscribe, 
subject.subscribe(observer1);

setTimeout(function(){
  subject.subscribe(observer2);
},2000);

//定義好兩邊后,用counter$去subscribe
counter$.subscribe(subject);
View Code

 

一句執行counter$.subscribe(subject),把定義好的序列,包括等待2秒的序列全部完成了。

4,subject是一個hot observable

往流里推送新值

 第二個拿不到新值,因為第二個流訂閱的時候,兩個新值已經過去了。

5,ReplaySubject

replay把過去發生的事件進行重播。

ReplaySubject(2)把過去的2個事件進行重播。這樣observer1 subscribe的時候就可以看到10和11。

6、BehaviorSubject只記住最新的值

總有一個最新值,總記住上一次的最新值

console.log('RxJS included?', !!Rx);


const counter$ = Rx.Observable.interval(1000).take(5);

const subject = new Rx.BehaviorSubject();


subject.next(10);
subject.next(11);
const observer1 = {
  next: (val)=>{console.log('1: ' +val);},
  error: (err)=>{console.log('ERROR>> 1:'+ err);},
  complete: ()=>{console.log('1 is complete');}
}


const observer2 = {
  next: (val)=>{console.log('2: ' +val);},
  error: (err)=>{console.log('ERROR>> 2:'+ err);},
  complete: ()=>{console.log('2 is complete');}
}


//不再用counter$去subscribe,用subject去subscribe, 
subject.subscribe(observer1);

setTimeout(function(){
  subject.subscribe(observer2);
},2000);

//定義好兩邊后,用counter$去subscribe
counter$.subscribe(subject);
View Code

 

取值的時候,會取得到最新的data,盡管在取值的時候也就是subscribre的時候值已經發射完了,盡管時機已經錯失了還是能夠得到它上一次發射之后的最新的一個值。

 

 

三、Angular中對Rx的支持

大量內置Observable支持:如Http,ReactiveForms,Route等。

Async Pipe是什么?有什么用?

 

Observable需要subscribe 一下,成員數組變量等於Observable得到的值。

使用Async Pipe可以直接使用Observable,還不用去取消訂閱。

memberResults$: Observable<User[]>; 

 

本文作者starof,因知識本身在變化,作者也在不斷學習成長,文章內容也不定時更新,為避免誤導讀者,方便追根溯源,請諸位轉載注明出處:https://www.cnblogs.com/starof/p/10505617.html 有問題歡迎與我討論,共同進步。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM