RxJS入門


一、RxJS是什么?

官方文檔使用了一句話總結RxJS: Think of RxJS as Lodash for events。那么Lodash主要解決了什么問題?Lodash主要集成了一系列關於數組、對象、字符串等集合操作,極大的方便了對這些集合數據進行衍生。舉個簡單的例子:求數組偶數元素的平方和

const { pipe, filter, map, reduce } = require('lodash/fp')
const source = [0, 1, 2, 3, 4]
const result = pipe(
  filter(x => x % 2 === 0),
  map(x => x * x),
  reduce((acc, cur) => acc + cur, 0)
)(source)
console.log(result) // 20

那么如果source中的元素序列是異步產生的呢,如何處理?其中一種解決方案是:Observer Pattern(觀察者模式) + Iterator Pattern(迭代器模式)

const event = new (require('events')).EventEmitter()
let count = 0,
  sum = 0
const source = []
const itr = source[Symbol.iterator]()
event.on('pushData', data => {
  source.push(data)
  const { value } = itr.next()
  if (value % 2 === 0) {
    sum += value * value
  }
})

event.on('pushDataComplete', () => {
  console.log(sum) // 20
})
const timer = setInterval(() => {
  if (count > 4) {
    clearInterval(timer)
    event.emit('pushDataComplete')
    return
  }
  event.emit('pushData', count++)
}, 2000)

上述代碼有什么問題?——沒問題,但是結構松散,不易閱讀,不符合函數式編程規范。用RxJS實現則簡單的多,代碼如下:

const { interval } = require('rxjs')
const { reduce, take, filter, map } = require('rxjs/operators')

const source$ = interval(1000)
const result$ = source$.pipe(
  take(5)
  filter(x => x % 2 === 0),
  map(x => x * x),
  reduce((acc, cur) => acc + cur, 0)
)
result$.subscribe(x => console.log(x))

這段代碼與上文中Lodash實現的代碼基本一致,唯一不同的是RxJS處理的是異步數據序列,這個異步數據序列在RxJS中被稱為流(stream)。RxJS提供了很多操作符,可以對單條數據流進行轉化、過濾等操作,也可以對多條數據流進行合並等操作。

二、RxJS數據表示方法

RxJS中表示流的方法是Observable對象,也可以這么說,RxJS就是通過Observable組合各種異步行為的庫。RxJS結合了觀察者和迭代器模式的思想,可以簡單的表示為:

Observable = Publisher + Iterator

下面是一個簡單的例子

const { Observable } = require('rxjs')
const onSubscribe = observer => {
  observer.next(0)
  observer.next(1)
  setTimeout(() => {
    observer.next(3)
    observer.complete()
    observer.next(4)
  }, 1000)
  observer.next(2)
}
// 創建流
const source$ = new Observable(onSubscribe)
// 創建觀察者
const observer = {
  next: item => console.log(item),
  complete: () => console.log('complete'),
  error: error => console.log(error)
}
// 訂閱流
console.log('start')
source$.subscribe(observer)
console.log('end')
// start
// 0
// 1
// 2
// end
// 3
// complete

前面我們說過Observable是Publisher和Iterator的結合,也僅僅是思想上的,實際上還是有很多區別,一般發布-訂閱模式會在內部維護一個listeners清單,在要發布通知時會逐一的調用訂閱者。但是Observable不是這樣的,在其內部並沒有一份訂閱者的清單。訂閱Observable的行為像是執行一個回調方法(onSubscribe),並且這個回調方法是把觀察者observer當做參數的,而這里的觀察者observer是一個具有三個方法屬性的普通對象,觀察者的三個方法(method):

  • next:每當Observable吐出新的值,next方法就會被調用。
  • complete:在Observable再也沒有值吐出時調用,在complete被調用之后,next方法就不會再起作用。
  • error:每當Observable內發生錯誤時,error方法就會被調用。

沒有強制要求observer對象必須要具有這三種方法,但至少需要有next方法,除此之外,Observablesubscribe方法還可以直接依次傳入next/error/complete方法,其內部會自動組成完整的observer對象。

從上面的例子可以看出RxJS可以同時處理同步和異步行為,Observable可以通過創建時傳入的回調onSubscribe方法控制數據吐出的節奏,這種數據流的節奏可以用一個時間軸來表示,在RxJS中被稱為彈珠圖(Marble Diagrams),上面的例子可以使用下面的彈珠圖表示,第一顆彈珠表示同步吐出的0,1,2,第二顆彈珠表示1秒后吐出的3,彈珠上的豎線表示數據流不再產生數據,也就是調用了observercomplete方法.

... ×3 3

理解彈珠圖的意義的話,可以很容易畫出本文第一節中例子對應的彈珠圖

const { interval } = require('rxjs')
const { reduce, take, filter, map } = require('rxjs/operators')

const source$ = interval(1000)
const result$ = source$.pipe(
  take(5)
  filter(x => x % 2 === 0),
  map(x => x * x),
  reduce((acc, cur) => acc + cur, 0)
)
result$.subscribe(x => console.log(x))

interval(1000)

0 1 2 3 4 5 6 7 8 9

take(5)

0 1 2 3 4

filter(x => x % 2 === 0)

0 2 4

map(x => x * x)

0 4 16

reduce((acc, cur) => acc + cur, 0)

20

三、RxJS操作符

如同lodash,RxJS完成復雜異步操作的關鍵是其實現了大量的操作符,RxJS實現了多達100+的操作符,包括創建類、轉換類、過濾類、聯合類、工具類等,如上面的例子中,interval屬於創建類操作符,它創建了一個Observable對象,作為數據的源頭,takefilter屬於過濾類操作符,map屬於轉換類, reduce屬於聚合類。實際應用中,我們會花很多時間在操作符的選擇上,想熟悉掌握這些操作符不是短期內能完成的,但至少初學者要了解大部分操作符能完成什么樣的操作,由於篇幅限制,本文不打算一一介紹所有的操作符,這些操作符可以具體可參考官方文檔,后續例子中如果應用到的操作符會着重介紹一下,下面還是借着前面的例子說一下操作符的實現原理,RxJS中大多數操作符都是Pipeable Operators,例子中除了interval以外都是Pipeable Operators,Pipeable Operators本質上是一個純函數,它將一個Observable作為輸入,生成另一個Observable作為輸出。訂閱輸出Observable也將訂閱輸入Observable。在RxJS中自定義一個操作符非常簡單,只需要符合上述指導原則。下面的代碼自行實現了例子中所有操作符,看起來一目了然。

const { Observable } = require('rxjs')
const interval = duration =>
  new Observable(observer => {
    let count = 0
    setInterval(() => {
      observer.next(count++)
    }, duration)
  })

const take = num => observable =>
  new Observable(observer => {
    let count = 0
    const subscription = observable.subscribe({
      next(value) {
        if (count <= num) {
          observer.next(value)
          ++count
          if (count === num) {
            observer.complete()
            subscription.unsubscribe()
          }
        }
      },
      error(err) {
        observer.error(err)
      },
      complete() {
        observer.complete()
      }
    })

    return () => {
      subscription.unsubscribe()
    }
  })

const filter = handler => observable =>
  new Observable(observer => {
    const subscription = observable.subscribe({
      next(value) {
        if (handler(value)) {
          observer.next(value)
        }
      },
      error(err) {
        observer.error(err)
      },
      complete() {
        observer.complete()
      }
    })
    return () => {
      subscription.unsubscribe()
    }
  })

const map = handler => observable =>
  new Observable(observer => {
    const subscription = observable.subscribe({
      next(value) {
        observer.next(handler(value))
      },
      error(err) {
        observer.error(err)
      },
      complete() {
        observer.complete()
      }
    })
    return () => {
      subscription.unsubscribe()
    }
  })

const reduce = (handler, seed) => observable =>
  new Observable(observer => {
    const arr = []
    const subscription = observable.subscribe({
      next(value) {
        arr.push(value)
      },
      error(err) {
        observer.error(err)
      },
      complete() {
        seed = arr.reduce(handler, seed)
        observer.next(seed)
        observer.complete()
      }
    })
    return () => {
      subscription.unsubscribe()
    }
  })

const source$ = interval(1000).pipe(
  take(5),
  filter(x => x % 2 === 0),
  map(x => x * x),
  reduce((acc, seed) => acc + seed, 0)
)

source$.subscribe(item => console.log(item), null, () =>
  console.log('complete')
)

四、RxJS與Promise

目前主流的異步解決方案是Promise,Await本質也是Promise,那么RxJS解決方案相比Promise有什么優勢呢?
1.Observable可以處理異步事件流,但是Promise只能處理單次事件

const { Observable } = require('rxjs')
const source$ = new Observable(observer => {
  setTimeout(() => observer.next(1), 1000)
  setTimeout(() => observer.next(2), 2000)
  setTimeout(() => observer.next(3), 3000)
  setTimeout(() => observer.complete(), 4000)
})
source$.subscribe(result => console.log(result))

2019-09-29_16-38-40 -4-

2.Observable是懶執行的(Lazyable),而new Promise(executor)executor會立即執行

const { Observable } = require('rxjs')

const source$ = new Observable(observer => {
  setTimeout(() => observer.next(1), 1000)
  setTimeout(() => observer.next(2), 2000)
  setTimeout(() => observer.complete(), 3000)
})
setTimeout(() => {
  console.log(3)
  source$.subscribe(result => console.log(result))
}, 3000)
// 3
// 1
// 2

3.Observable 數據是可丟棄的(Cancellable/Abortable)
如前面例子中的take操作符,實際上只取了前5個數據,而丟棄了后面所有的數據,RxJS中還有很多操作符具有類似的性質,如takeUntil( observable ), takeWhile( predicate ), take( n ), first(), first( predicate )從它們的名稱和參數就大概能猜到它們的作用。
再比如實際應用中可以會遇到需要丟棄網絡請求的結果,如果單純使用Promise是無法實現的,

const delay = wait => {
  return new Promise(resolve => {
    setTimeout(resolve, wait)
  })
}
delay(3000).then(() => console.log('xxxx'))

上面的Promise無論如何都會打印出xxxx,目前未知ES6規范的Promise仍未實現cancellation,但是使用Observable可以很方便的實現

const { defer } = require('rxjs')
const delay = wait => {
  return new Promise(resolve => {
    setTimeout(resolve, wait)
  })
}
const source$ = defer(() => delay(3000))
const subscription = source$.subscribe(() => console.log('xxxx'))
setTimeout(subscription.unsubscribe.bind(subscription), 2000)

只需要在結果返回前取消訂閱就不會打印出結果

4.Observable 是可以重試的(Retryable)

const { Observable } = require('rxjs')
const { retry } = require('rxjs/operators')

const source$ = new Observable(observer => {
  observer.next(1)
  throw 'Error!'
  setTimeout(() => observer.complete(), 4000)
})
source$
  .pipe(retry(3))
  .subscribe(result => console.log(result), err => console.log('Error'))

// 1
// 1
// 1
// 1
// Error

從上面的比較可以看出,RxJS可以處理很多Promise難以處理的場景,而Promise也可以很方便的用過defer操作符轉化成Observable.

const { defer } = require('rxjs')
const delay = wait => {
  return new Promise(resolve => {
    setTimeout(resolve, wait)
  })
}
defer(() => delay(3000)).subscribe(() => console.log('xxx'))

五、RxJS使用案列

1. 搜索類問題

搜索類問題是我們實際開發中常遇到的一類問題,如下面的兩種場景,上圖是查詢翻譯結果,下圖是獲取保險報價,這兩種場景實際上是一類問題——根據特定的條件查詢正確的結果,這類問題在實踐的時候需要注意幾點:

  • 查詢請求需要防抖
  • 檢查查詢條件是否合理
  • 檢查結果是否和條件對應

2019-10-09_10-39-51 -1-
2019-10-09_10-44-22 -1

參照上述三個注意點,我們首先看使用普通JS實現方式.

const input = document.querySelector('#input')
let lastShowedResult = 0
let timer = null
input.addEventListener('change', evt => {
  clearTimeout(timer)
  timer = setTimeout(
    async query => {
      if (query && query.length > 0) {
        const requestTime = +Date.now()
        const data = await fetch(url)
        if (requestTime > lastShowedResult) {
          lastShowedResult = requestTime
          showResult(data)
        }
      }
    },
    500,
    evt.target.value.trim()
  )
})

上面的功能涉及到三個非同步行為:輸入框輸入、防抖、網絡請求,如果使用普通JS實現,可以看到這三種異步行為使用了不同的范式,另外上面的代碼還有一個丑陋的地方是使用了很多外部標識,如timerlastShowedResult。下面是RxJS實現的版本。

import { fromFetch } from "rxjs/fetch"
import {
  debounceTime,
  pluck,
  map,
  filter,
  switchMap
} from "rxjs/operators"

const search$ = fromEvent(document.querySelector('#input'), 'change').pipe(
  debounceTime(500),
  pluck('target', 'value'),
  map(query => query.trim()),
  filter(query => query.length !== 0),
  switchMap(query => fromFetch(`${url}?keyword=${query}`))
)
search$.subscribe(data => {
  showResult(data)
})

上面使用的一些操作符在前面沒有提到過,fromEvent是一個創建類操作符,它可以基於給定事件目標的特定類型事件創建一個Observable對象,debounceTime很好理解,只有在特定時間間隔過去源Observable沒有吐出下個值,才從源Observable獲取一個值,pluck用來獲取嵌套熟悉值,fromFetch創建一個網絡請求的Observable對象,switchMap是一個關鍵的操作符,也是比較難以理解的,實際上switchMap 等價於 pipe(map, switchAll),為了便於演示,將上述模型進行一下簡化。

const { defer, interval, range } = require('rxjs')
const {
  debounceTime,
  take,
  map,
  concatAll,
  scan,
  mapTo,
  switchMap
} = require('rxjs/operators')
const fetchData = query => {
  return new Promise((resolve, reject) => {
    setTimeout(resolve, (query + 1) * 100, query)
  })
}

const source$ = range(2, 6).pipe(
  map(s => interval(s * 100).pipe(take(1))),
  concatAll(),
  mapTo(1),
  scan((acc, one) => acc + one, 0)
)

const search$ = source$.pipe(
  debounceTime(500),
  map(query => defer(() => fetchData(query))),
  switchAll()
  // switchMap(query => defer(() => fetchData(query))
)

search$.subscribe(data => {
  console.log(data)
})

// 3
// 4
// 6

假設數據不是來源於用戶輸入而是來源於模擬的source$可觀測對象,響應數據會原樣返回請求的數據。使用彈珠圖分析異步數據的變化。

source$
這里我們不關注構造source$的原理,需要它能夠模擬用戶輸入,產生不同時間間隔的異步數據序列,如下彈珠圖所示

1 2 3 4 5 6

debounceTime(500)
500ms內無數據吐出,則釋放數據,注意數據6和數據5時間間隔比較近是因為source$吐出數據6后就complete了,后面再無數據了,因此會立即釋放

3 4 5 6

map(query => defer(() => fetchData(query)))
這里map操作符產生的數據也是Observable,因此經過此操作符后產生的Observable是一個高階Observable

3 4 5 6

switchAll()
switchAll操作符的作用是將高階Observable轉換成一階Observable,這個一階Observable吐出的數據為最新的內層Observable產生的數據

3 4 6

2. 網絡請求問題

前面比較Observable和Promise的時候,提到過Observable是可以重試的(Retryable),而支持失敗重試可以保證應用的高可用性。
RxJx中實現重試最簡單的方法是使用retry操作符

source$.pipe(retry(3)).subscribe((data) => console.log(data))

上述操作會在source$出錯時立即重試,最多重試3次,但是在真實的應用中往往由於系統問題,不能即刻恢復正常,解決方案是延時一段時間再重試,借助retryWhen操作符可以實現

source$
  .pipe(retryWhen(err$ => err$.pipe(delay(100))))
  .subscribe(data => console.log(data))

但是不能無限的重試下去,還是需要添加重試上限,借助scan操作符的數據累計功能可以實現

source$
  .pipe(
    retryWhen(
      err$.pipe(
        scan((errorCount, err) => {
          if (errorCount >= 3) {
            throw err
          }
          return errorCount + 1
        }, 0),
        delay(100)
      )
    )
  )
  .subscribe(data => console.log(data))

當訪問某個服務器API,第一次失敗,可以等100毫秒之后再嘗試,結果又失敗了,這時候一個比較經驗性的做法不是再等100毫秒之后重試,過去的100毫秒服務器沒有恢復,那估計再等100毫秒恢復的概率也不高,而且訪問太頻繁對服務器造成壓力也不大好,所以,可以選擇200毫秒之后重試,如果再失敗,就進一步增加重試延遲,400毫秒之后重試,然后800毫秒后重試,以每次失敗選擇2n ×100毫秒的延時,n為失敗次數。

source$
  .pipe(
    retryWhen(
      err$.pipe(
        scan((errorCount, err) => {
          if (errorCount >= 3) {
            throw err
          }
          return errorCount + 1
        }, 0),
        delayWhen(errorCount => {
          const delayTime = Math.pow(2, errorCount - 1) * 100
          return timer(delayTime)
        })
      )
    )
  )
  .subscribe(data => console.log(data))

綜上我們可以自定義一個重試的操作符

const retryWithExpotentialDelay = (
  maxRetry,
  initialDelay,
  delayFunction
) => source$ => {
  return source$.pipe(
    retryWhen(err$ =>
      err$.pipe(
        scan((errorCount, err) => {
          if (errorCount >= maxRetry) {
            throw err
          }
          return errorCount + 1
        }, 0),
        delayWhen(errorCount => {
          const delayTime = delayFunction(initialDelay, errorCount)
          return timer(delayTime)
        })
      )
    )
  )
}

可以對現有應用進行一些小的改造,將網絡請求替換成下面的request方法就能保障應用接口請求的成功率

const request = async (options = {}) => {
  if (!options.url) {
    throw new Error('invalid request options')
  }
  const {
    maxRetry = 2,
    initialDelay = 100,
    delayFunction = (initialDelay, errorCount) =>
      Math.pow(2, errorCount - 1) * initialDelay,
    testResSuccess = rawData => {
      return rawData && rawData.code === 0
    },
    ...restOptions
  } = options
  return new Promise((resovle, reject) => {
    defer(async () => {
      const { data } = await axios(restOptions)
      if (!testResSuccess(data)) {
        return Promise.reject()
      }
      return Promise.resolve(data)
    })
      .pipe(retryWithExpotentialDelay(maxRetry, initialDelay, delayFunction))
      .subscribe(
        data => resovle(data),
        err => {
          reject(err)
        }
      )
  }).catch(e => {
    console.log(e)
  })
}

3. 拖動排序

下面是cms系統中常用的拖拽排序功能,讀者可以對照RxJS官方文檔進行分析

六、總結

這篇文章僅僅介紹了RxJS的冰山一角,使用的操作符不過十幾個,旨在學習RxJS的基本概念和使用場景,還有諸如多播(multicast)、時間調度(Schedule)以及與常用前端技術棧結合等問題都未涉及,文章結尾有一些學習資料/網站/工具可以參考

RxJS
深入淺出RxJS
Thirty-days-RxJS
Learn RxJS
reactive.how
Rx Visualizer


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM