import React from 'react'; import { Observable } from 'rxjs'; const FlowPage = () => { const onSubscribe = observer => { observer.next(1); observer.next(2); observer.next(3); } // 創建一個發布者 // Observable是一個特殊類,它接受一個處理Observer的函數 // 而Observer就是一個普通的對象, // 對於Observer對象要求:它必須包含一個名為next的屬性(是一個函數) // next用於接收被推過來的數據 const source$ = new Observable(onSubscribe); // 參數就是觀察者對象 const theObserver = { next : item => console.log(item) } // 一個觀察者調用Observable對象的subscribe函數 source$.subscribe(theObserver) return <h1>rxjs學習</h1>; }; export default FlowPage;
跨越時間的Observable
// 推送數據可以有時間間隔。 const onSubscribe = observer => { let number = 1; const Timer = setInterval(()=>{ observer.next(number++) if(number > 3){ clearInterval(Timer) } },1000) }
永無止境的Observable
假如我們不中斷這個程序,讓它一直運行下去這個程序也不會消耗更多的內存。
Observable對象每次只吐出一個數據,然后這個數據就被Observer消化處理了,不會存在數據的堆積。
const onSubscribe = observer => { let number = 1; const Timer = setInterval(()=>{ observer.next(number++) },1000) }
Observable的完結
Observer時刻准備着接收數據,如果沒有推送數據了,相關的資源不會被釋放,為了讓Observer明確知道這個數據流已經不會再有新的數據,
需要調用Observer的complete函數。
import React from 'react'; import { Observable } from 'rxjs'; const FlowPage = () => { // 推送數據可以有時間間隔。 const onSubscribe = observer => { let number = 1; const Timer = setInterval(()=>{ observer.next(number++) if(number > 3){ clearInterval(Timer) observer.complete(); } },1000) } const source$ = new Observable(onSubscribe); const theObserver = { next : item => console.log(item), complete:()=> console.log(' no more data') } source$.subscribe(theObserver) return <h1>rxjs學習</h1>; }; export default FlowPage;
observable的錯誤處理
// 一旦進入出錯狀態,observable就終結了,就不會再調用后面的next()和complete() // 調用了complete()函數終結,也不能再調用next()和error() const onSubscribe = observer => { observer.next(1); observer.error('something wrong!') observer.complete() } const source$ = new Observable(onSubscribe); const theObserver = { next : item => console.log(item), error: err => console.log(err), complete:()=> console.log(' no more data') } source$.subscribe(theObserver)
Observable簡潔形式
// 為了讓代碼更加簡潔,沒有必要創建一個參數對象。 // subscribe也可以直接接受函數作為參數, // 第一個參數如果是函數類型,就會被認為是next // 第二參數被認為是error // 第三個參數complete source$.subscribe( item => console.log(item), err => console.log(err), ()=> console.log(' no more data') )
Observable退訂
// 返回一個對象,並且對象包含了unsubscribe函數,表示退訂 const onSubscribe = observer => { let number = 1; const Timer = setInterval(()=>{ observer.next(number++) },1000) return { unsubscribe:()=>{ clearInterval(Timer) } } } const source$ = new Observable(onSubscribe); // subscribe函數的返回結果存為變量subscription const subscription = source$.subscribe( item => console.log(item), err => console.log(err), ()=> console.log(' no more data') ) // 3.5s后調用退訂 // 3.5s后不再接受到被推送的數據,但是Observable的source$資源並沒有終結 // 因為始終沒有調用complete,只不過再也不會調用next函數了 setTimeout(()=>{ subscription.unsubscribe() },3500)
修改以下代碼,便於觀察
// 返回一個對象,並且對象包含了unsubscribe函數,表示退訂 const onSubscribe = observer => { let number = 1; const Timer = setInterval(()=>{ console.log('in onSUbscribe ',number) observer.next(number++) },1000) return { unsubscribe:()=>{ // clearInterval(Timer) } } }
執行結果如下:
由此可見,Observable對象source$在退訂以后依然在不斷調用next函數,
但是已經斷開了source$對象和Observer的連接。
所以onSubscribe中如何調用next,observer都不會做出任何響應