一,響應式編程
響應式編程是一種關注於數據流(data streams)和變化傳遞(propagation of change)的異步編程方式。
1.1 異步編程
傳統的編程方式是順序執行的,必須在完成了上一個任務之后才能執行下一個任務。無論是提升機器的性能還是代碼的性能,本質上都需要依賴上一個任務的完成。如果需要響應迅速,就得把同步執行的方式換成異步執行,方法執行變成消息發送。這樣的優點是,當你有一堆不依賴於彼此的任務時,你可以同時啟動所有任務,而不是等到每個任務完成后再啟動下一個。這就是異步編程的方法,它是響應式編程的重要特性之一。
1.2 什么是數據流?
數據流是一種編程范式,數據流也被稱為流處理或響應式編程。
傳統上的程序指的是按照特定的順序所執行的一系列操作,這通常被稱為控制流或者命令式編程,這種程序側重於命令,這與馮·諾依曼順序編程的觀點一致。
相比之下,數據流編程強調數據的傳遞,並將程序構建為一系列的連接。它顯式地定義了輸入和輸出的連接操作,其功能類似於黑匣子。當一個操作的所有輸入都有效時,它就會運行。因此,數據流語言本質上是並行的,可以在大型分布式系統中很好地工作。
舉一個例子,App 的歡迎界面通常會需要加載廣告,本地數據庫等操作,這些都可以通過並行的方式來進行處理,全部成功之后才會進入主頁,如下圖:
通過這個例子,我們能夠發現數據流的概念其實非常簡單。它通過將不同的程序模塊(每個程序模塊可獨立執行)連接起來,構建出並行的應用程序模型。
這里提到的不同的程序模塊,在響應式編程中都被稱為流。任何東西,數據庫,共享內存,用戶輸入,變量等等都可以用流來表示。
1.3 變化傳播
簡單來說就是以一個數據流為輸入,經過一系列的操作之后轉換為另一個數據流,然后再分發給各個訂閱者的過程。因為構建了數據之間的訂閱依賴關系,將有助於自動傳播改變的數據。
正是因為具備了以上這些特點,使用響應式編程可以很方便的表達數據流,同時相關的計算模型也能夠自動變化並通過數據流進行傳播。
響應式編程在處理 UI 事件,處理嵌套回調的異步事件,復雜的列表過濾等等方面都有很好的表現。
二,Reactive Extensions
Reactive Extensions(簡稱 ReactiveX) 是對響應式編程理念的一個實現。它最初是由微軟的一個團隊所開發的響應式擴展庫(Reactive Extensions libraray,Rx),隨后越來流行,目前已經支持了幾乎全部的流行編程語言。
社區網站 reactivex.io 對其給出的定義是:
ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.
Rx 是一個使用可觀測的序列來組成異步的、基於事件的程序的庫。
定義中提到的序列即是指的數據序列,數據序列可以采用多種形式,例如來自文件或者Web服務的數據流,Web服務請求,系統通知或者用戶輸入的一系列事件。Rx 將所有這些數據序列表示為可觀察序列 。應用程序可以訂閱這些可觀察序列,以在新數據到達時接收異步通知。(Rx 是一個編程模型,目標是提供一致的編程接口,幫助開發者更方便的處理異步數據流)。
在響應式編程中,通過訂閱數據流(在Rx中稱為可觀察序列)來向應用程序提供數據,應用程序在數據檢索過程中是被動的:除了訂閱了可觀察的數據源之外,它不會主動輪詢數據源,而只是對推送到它的數據做出反應。當流不再沒有更多數據或者出錯時,數據源也會向訂閱者發送通知。
Reactive Extension采用的是這種推送(push)模式,它使得應用程序更具響應性。
ReactiveX 結合了觀察者模式、迭代器模式和函數式編程的精華,為什么這么說呢?接着往下看:
現在假設有一個生產者 A 和一個消費者 B,生產者 A 持有一個可迭代的數據集合。當消費者 B 想要得到這個集合中的數據時,如果使用迭代器模式,消費者 B 就需要去從生產者 A 提供的迭代器中主動的讀取數據,而如果使用的是觀察者模式,數據將會由生產者 A 推送給消費者 B(本質上是使用回調之類的操作)。
這兩種設計模式的區別就在於:迭代器模式中,消費者從生產者那里以同步的方式得到值,是消費者在控制何時取出數據。而在觀察者模式中,生產者以異步的方式把值推給消費者,是生產者在控制消費者何時將接收到數據。
觀察者模式適合使用在 UI 點擊事件上,而迭代器模式的優勢又在於能夠使用一種更通用的方式來遍歷不同類型的數據集合。
Reactive Extension(響應式擴展)結合了迭代器模式的思想,擴展了 GOF 觀察者模式。簡單來說就是 Rx 將可觀察的數據類型和可迭代的數據類型統一成了一種名為 Observable 的新類型。
我們可以將 Observable 類視為與 Iterable 的 “拉” 操作功能相當的 “推” 操作。在 Iterable 中,消費者從生產者和線程中同步地提取值,而 Observable 中,只要值可用,生產者就會將值推送給消費者,這種方法更靈活,因為值可以同步或者異步到達。
Observables 和 Iterables(可迭代對象)共用一個相似的 API:在 Iterables 可以執行的許多操作也都同樣可以在 Observables 上執行。不過由於 Observables 流的本質,所以並沒有類似 Iterable.remove() 這樣的方法。
Observables 支持異步推送多個值:
在傳統的 GOF 觀察者模式缺少了這樣兩個語義(或者說接口):
- 生產者在沒有更多數據的時候能夠發出通知。
- 生產者在發生錯誤的時候能夠發出通知。
所以為了更好復用 Iterable 接口,Rx 的 Observable 類擴展了 GOF 觀察者模式,將 Observable async/push(異步推送)與 Iterable sync/pull(同步拉取)關聯了起來。引入了兩個新接口:
- onCompleted():通知觀察者Observable沒有更多的數據。
- onError():觀察者有錯誤出現了。
同時 Rx 還提供了一些操縱符,讓你可以使用聲明式的方式來組合這些序列,而無須關注底層的實現,比如線程、同步、線程安全、並發數據結構和非阻塞I/O。
正是這種 Observable 模型讓開發者可以像使用集合數據一樣操作數據流,並對數據流進行諸如過濾,查詢,組合等一系列操作。
2.1 總結Rx模式
- 使用訂閱模式
- 創建:Rx 可以方便的創建事件流和數據流
- 組合:Rx 使用查詢式的操作符組合和變換數據流
- 監聽:Rx可以訂閱任何可觀察的數據流並執行操作
- 簡化代碼
- 函數式風格:對可觀察的數據流使用無副作用的輸入/輸出函數,避免程序里錯綜復雜的狀態
- 簡化代碼:Rx 操作符有助於簡化代碼,避免嵌套
- 異常處理機制:傳統的 try/catch 無法處理異步計算,Rx 提供了合適的錯誤處理機制
- 更容易處理同步與並發問題
參考:
ReactiveX/RxJava文檔中文版
響應式編程
數據流
https://docs.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh242985(v=vs.103)
http://www.jonathanbeard.io/blog/2015/09/19/streaming-and-dataflow.html
http://reactivex.io/intro.html