Reactive Extensions(Rx)是對LINQ的一種擴展,他的目標是對異步的集合進行操作,也就是說,集合中的元素是異步填充的,比如說從Web或者雲端獲取數據然后對集合進行填充。Rx起源於Microsoft DevLabs小組的研究,他擴展了LINQ的一些特性,目前Rx支持多種平台如JavaScript,Windows Phone,ios,Android 。隨着數據處理變得復雜,LINQ使得我們的處理邏輯變得簡單清晰,同樣地,隨着越來越多的數據通過從雲端異步獲取,Rx使得這種異步數據處理操作變得簡單和容易維護。
在處理靜態集合數據方面,LINQ使用類似SQL的語法來操作和使用不同來源的數據。相反,Rx被設計出來用來處理將來才會填充好的集合,也就是說,集合類型定義好了,但是集合中的元素可能在未來的某一時刻才會被填充。
LINQ和Rx在技術上有很多相似的地方。在LINQ對集合進行一系列操作如添加,移除,修改,提取后,會得到一個新的集合,新集合只是原始集合的一個修改版本。Rx也是一樣,集合和數據流看起來非常不同,但是他們在很多關鍵的地方有聯系,這就是我們將數據流稱之為未來的集合的原因。集合和數據流都是多數據按某種順序進行排列。LINQ和Rx可以這些序列進行一系列操作然后得到一個新的序列。
Rx提供了一種新的組織和協調異步事件的方式,例如協調多個從雲端返回的多個異步的數據流。Rx能夠是的我們用一個簡單的方式來處理這些數據流,極大的簡化了代碼的編寫。例如,.NET中傳統的Begin/End異步編程模式在處理單個異步操作時可以應付,但是如果同時多個異步調用時,線程控制就會使得代碼變得比較復雜。使用Rx,Begin/End模式就變成了一條簡單的方法,這使得代碼更加清晰和容易理解。
Rx最顯著的特性是使用可觀察集合(Observable Collection)來達到集成異步(composing asynchronous)和基於事件(event-based)的編程的效果。Rx有一些幾個特性。
- 組合(Composing): Reactive Extension的首要目標之一就是將多種異步操作組合起來是的代碼更加簡單。要做到這一點,數據流必須定義清楚,這樣代碼就很清晰集中,使得異步操作代碼異步處理代碼不會充斥整個應用程序。
- 異步(Asynchronous): 雖然Rx不僅僅能處理異步操作,但是使用Rx,大大簡化了異步操作的實現,並且代碼容易理解進而容易維護。
- 基於事件(Event-based): Rx簡化了傳統的異步編程方式
- 可觀察集合(Observable collections): Obervable Collection是Rx的核心,它是一種集合,集合的元素在第一次訪問的時候肯能還沒有填充。它對與Rx的重要性類始於enumerable集合對LINQ的重要性。
下面來看看一個簡單的例子來說明Rx的用法:
新建一個工程RxDemo,通過Nuget 獲取Rx的最新版本:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Reactive.Linq; using System.IO; namespace RxDemo { class Program { static void Main(string[] args) { //使用Range方法返回Observable集合 IObservable<Int32> input = Observable.Range(1, 15); input.Where(i => i % 2 == 0).Subscribe(x => Console.Write("{0} ", x)); Console.WriteLine(); //使用Array返回Observabale集合 var myArray = new[] { 1, 3, 5, 7, 9 }; IObservable<Int32> varmyObservable = myArray.ToObservable(); varmyObservable.Subscribe(x => Console.WriteLine("Integer:{0}", x)); Console.WriteLine(); //Take操作符,用來指定獲取集合中的前幾項 var take = new[] { 1, 2, 3, 4, 5, 4, 3, 2, 1 }.ToObservable(); take.Take(5).Select(x => x * 10).Subscribe(x => Console.WriteLine(x)); Console.WriteLine(); //Skip操作符表示跳過集合中的n條記錄。 var skip = new[] { 1, 2, 3, 4, 5, 4, 3, 2, 1 }.ToObservable(); skip.Skip(6).Select(x => x * 10).Subscribe(x => Console.WriteLine(x)); Console.WriteLine(); //Distinct操作符用來去除集合中的非重復數據。 var distinct = new[] { 1, 2, 3, 4, 5, 4, 3, 2, 1 }.ToObservable(); distinct.Distinct().Select(x => x * 10).Subscribe(x => Console.WriteLine(x)); //Rx也需要釋放資源 Console.WriteLine(); var ObservableStrings = Observable.Using<char, StreamReader>( () => new StreamReader(new FileStream("randomtext.txt", FileMode.Open)), streamReader => (streamReader.ReadToEnd().Select(str => str)).ToObservable() ); ObservableStrings.Subscribe(Console.Write); Console.WriteLine(); //在Rx中Zip是將兩個Observable對象合並為一個新的Observable對象。 var numberCitys = varmyObservable.Zip(input, (range, array) => range + ":" + array); numberCitys.Subscribe(Console.WriteLine); Console.ReadKey(); } } }
上述代碼使用Observable.Range返回個生產Observable對象,他和Enumerable對象的Range方法含義類似,該方法接受兩個參數,一個開始值,以及產生值的個數。
Mono 3.2已經包含了Rx框架,我們的代碼都是Mono中執行的,看下效果:
Rx中的一些操作符和LINQ操作符有很多功能是相同的。下面對最常用的take,skip,distinct,using和zip這個操作符進行說明。
Take
Rx中的Take操作符和LINQ中的功能一樣,它用來指定獲取集合中的前幾項。
Skip
Skip語句表示跳過集合中的n條記錄。這在有些情況下非常有用,比如解析文本的時候,可能第一行是表頭,所以可以使用skip跳過第一行,從第二行開始讀取。還有就是在分頁的時候和take一起使用非常方便。
Distinct
Distinct用來去除集合中的非重復數據。
Using
Rx也需要清理資源,當使用到了一些受限制資源或者非托管資源時,需要我們去管理這些資源的釋放。
當然,我們可以調用Observable對象的一個稱之為Using的靜態方法。方法返回一個IObservable<char>類型對象,接受兩個參數,第一個參數是一個返回StreamReaderde的Func類型參數,第二個是一個接受第一Func參數返回的StreamReader對象,返回一個類型為char的IObservable集合。
Zip
和LINQ中的Zip操作類似。LINQ中的Zip是將兩個集合合並為一個新的集合,在Rx中Zip是將兩個Observable對象合並為一個新的Observable對象。