RxJS Observable與axios、promise


如果用一句話闡述對於observable的簡單理解,我會這么說:事件流(event stream) + 觀察訂閱模式(observe/subscribe pattern)。
observable相比promise,可以更好地實現函數式編程、支持取消、可以有多個事件的訂閱者,等等。 在以往的項目實踐中,如react,我們會用axios發起異步請求,在then中處理返回結果。當嵌套多層后,代碼就會變得混亂。我們當然可以用async/await來避免回調嵌套,不過本文旨在說明如何在RxJS框架下以函數式編程的方式實現多層的異步調用。

建立Node項目

npm init -y
npm install @types/node rxjs typescript axios --save-dev

結果如下

{
  "name": "test",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "@types/node": "^16.9.1",
    "rxjs": "^7.3.0",
    "typescript": "^4.4.3"
  },
  "devDependencies": {
    "axios": "^0.21.4"
  }
}

示例1:自定義實現Observable對Promise的包裝

import { Observable } from 'rxjs';
import axios from 'axios';

let task = new Observable( ( observer: any ) => {
    axios.get( 'https://jsonplaceholder.typicode.com/users' )
    .then( ( response ) => {
        observer.next( response.data );
        observer.complete();
    } )
    .catch( ( error ) => {
        observer.error( error );
    } );
} );

task.subscribe({
    next(data) { console.log('data: ', data); },
    error(err) { console.log('something wrong occurred: ' + err); },
    complete() { console.log('done'); }
});

運行方式:
tsc [文件名].ts
node [文件名].js

示例2:用RxJS提供的from方法將Promise轉換為Observable

import { from, map } from 'rxjs';
import axios from 'axios';

const promise = axios.get('https://jsonplaceholder.typicode.com/users')
const observable = from(promise).pipe(map(d => d.data));

observable.subscribe({
    next(data) { console.log('data: ', data); },
    error(err) { console.log('something wrong occurred: ' + err); },
    complete() { console.log('done'); }
})

此處在獲取到數據后,再使用map將數據進行格式轉換(示例中原封不動地返回)

示例3:多個Observable並行執行

import { Observable, from, forkJoin, map, of } from 'rxjs';
import { mergeMap, reduce, filter } from 'rxjs/operators';
import axios from 'axios';

const task1 = from(axios.get('https://jsonplaceholder.typicode.com/users')).pipe(map(d => d.data));;
const task2 = from(new Promise(function(resolve) { 
    setTimeout(()=>resolve('Hello!'), 1500);
}))

forkJoin([task1, task2]).subscribe({
    next(data) { console.log('data: ', data); },
    error(err) { console.log('something wrong occurred: ' + err); },
    complete() { console.log('done'); }
})

const squareOdd = of(1, 2, 3, 4, 5)
  .pipe(
    filter(n => n % 2 !== 0),
    map(n => n * n)
  );

// Subscribe to get values
squareOdd.subscribe(x => console.log(x));

讀者可能會問,Promise.all也可以實現同樣的結果,async/await也能避免回調嵌套。 那么RxJS的優勢到底在哪里呢?
的確,簡單的應用場景下並無明顯差別,后續我們給出更高級的應用場景,再做分享。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM