rxjs簡單的Observable用例


import React from 'react';
import { Observable } from 'rxjs';

const FlowPage = () => {
  
  const onSubscribe = observer => {
    observer.next(1);
    observer.next(2);
    observer.next(3);
  }

  // 創建一個發布者
  // Observable是一個特殊類,它接受一個處理Observer的函數
  // 而Observer就是一個普通的對象,
  // 對於Observer對象要求:它必須包含一個名為next的屬性(是一個函數)
  // next用於接收被推過來的數據
  const source$ = new Observable(onSubscribe);

  // 參數就是觀察者對象
  const theObserver = {
    next : item => console.log(item)
  }

  // 一個觀察者調用Observable對象的subscribe函數
  source$.subscribe(theObserver)

  return <h1>rxjs學習</h1>;
};

export default FlowPage;

 

跨越時間的Observable

// 推送數據可以有時間間隔。
  const onSubscribe = observer => {
    let number = 1;
    const Timer = setInterval(()=>{
      observer.next(number++)
      if(number > 3){
        clearInterval(Timer)
      }
    },1000)
  }

永無止境的Observable

假如我們不中斷這個程序,讓它一直運行下去這個程序也不會消耗更多的內存。

Observable對象每次只吐出一個數據,然后這個數據就被Observer消化處理了,不會存在數據的堆積。

const onSubscribe = observer => {
    let number = 1;
    const Timer = setInterval(()=>{
      observer.next(number++)
    },1000)
  }

Observable的完結

Observer時刻准備着接收數據,如果沒有推送數據了,相關的資源不會被釋放,為了讓Observer明確知道這個數據流已經不會再有新的數據,

需要調用Observer的complete函數。

import React from 'react';
import { Observable } from 'rxjs';

const FlowPage = () => {
  
  // 推送數據可以有時間間隔。
  const onSubscribe = observer => {
    let number = 1;
    const Timer = setInterval(()=>{
      observer.next(number++)
      if(number > 3){
        clearInterval(Timer)
        observer.complete();
      }
    },1000)
  }
  const source$ = new Observable(onSubscribe);

  const theObserver = {
    next : item => console.log(item),
    complete:()=> console.log(' no more data')
  }

  source$.subscribe(theObserver)

  return <h1>rxjs學習</h1>;
};

export default FlowPage;

 observable的錯誤處理

// 一旦進入出錯狀態,observable就終結了,就不會再調用后面的next()和complete()
  // 調用了complete()函數終結,也不能再調用next()和error()
  const onSubscribe = observer => {
    observer.next(1);
    observer.error('something wrong!')
    observer.complete()
  }
  const source$ = new Observable(onSubscribe);

  const theObserver = {
    next : item => console.log(item),
    error: err => console.log(err),
    complete:()=> console.log(' no more data')
  }

  source$.subscribe(theObserver)

 Observable簡潔形式

// 為了讓代碼更加簡潔,沒有必要創建一個參數對象。
  // subscribe也可以直接接受函數作為參數,
  // 第一個參數如果是函數類型,就會被認為是next
  // 第二參數被認為是error
  // 第三個參數complete
  source$.subscribe(
    item => console.log(item),
    err => console.log(err),
    ()=> console.log(' no more data')
  )

 Observable退訂

// 返回一個對象,並且對象包含了unsubscribe函數,表示退訂
  const onSubscribe = observer => {
    let number = 1;
    const Timer = setInterval(()=>{
      observer.next(number++)
    },1000)
    return {
      unsubscribe:()=>{
        clearInterval(Timer)
      }
    }
  }
  const source$ = new Observable(onSubscribe);

  // subscribe函數的返回結果存為變量subscription
  const subscription =  source$.subscribe(
    item => console.log(item),
    err => console.log(err),
    ()=> console.log(' no more data')
  )

  // 3.5s后調用退訂
  // 3.5s后不再接受到被推送的數據,但是Observable的source$資源並沒有終結
  // 因為始終沒有調用complete,只不過再也不會調用next函數了
  setTimeout(()=>{
    subscription.unsubscribe()
  },3500)

 

修改以下代碼,便於觀察

// 返回一個對象,並且對象包含了unsubscribe函數,表示退訂
  const onSubscribe = observer => {
    let number = 1;
    const Timer = setInterval(()=>{
      console.log('in onSUbscribe ',number)
      observer.next(number++)
    },1000)
    return {
      unsubscribe:()=>{
        // clearInterval(Timer)
      }
    }
  }

執行結果如下:

 

 由此可見,Observable對象source$在退訂以后依然在不斷調用next函數,

但是已經斷開了source$對象和Observer的連接。

所以onSubscribe中如何調用next,observer都不會做出任何響應


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM