何為Reactive Extensions(Rx)
Rx是一個遵循函數式編程的類庫,它引用觀察者以及迭代器設計模式對可觀察對象產生的數據進行異步消費。使用Rx,
開發人員將使用LINQ運算符操作異步數據流,並使用調度程序參數化異步數據流中的並發性,簡單地說,Rx = Observables + LINQ + Schedulers。
使用Rx需要Nuget安裝System.Reactive Nuget包
Rx的使用場景
響應式UI
UI界面上,用戶對一個綁定數據集合的控件進行關鍵字查詢。常規的流程是我們必須在等待用戶鍵盤按下指定的完成鍵(如回車)或鼠標點擊查詢按鈕后程序才開始執行相應的查詢處理。但假設需求變更:“用戶希望在每輸入一個關鍵字后就能及時將關鍵字相應的查詢結果集綁定到控件” 如果面臨這個需求,那你會如何實現呢? 你會少不了定義相應的全局狀態字段,少不了相應的時間間隔刷新。我相信寫出來的代碼也會讓你很煩惱。 其實你有更好的選擇,那就是我們的主角Rx。
Rx 核心
Rx有兩個核心接口 IObservable<T>、IObserver<T>
IObservable<T>
先來看此接口的結構:
IObservable<T>接口就提供一個Subscribe(訂閱)方法,入參是一個觀察者對象接口。
我們可以將IObservable<T>稱之為被觀察者(可觀察者),IObserver<T>稱之為觀察者。
通過可接口簽名可以看出被觀察者需要輸出T類型的對象。需要理解被觀察者IObservable<T>我們需要與現有的一些常規知識點做出比較,這里我們用IEnumerable<T>比較。
我想我們都使用過Linq,操作過IEnumerable<T>集合,IEnumerable<T>集合有個明顯的狀態就是它所存儲的元素是靜態的。集合內的元素狀態除非代碼顯示的新增或刪除、修改,否則這個集合基本是靜態(數據未變動)的。但是IObservable<T>則不同,它的元素是根據被觀察者提供的數據而變動的(不可預測的),就好比在UI上你無法預測用戶的操作行為一樣。
下面這個表格可以看出兩者區別
IEnumerable |
可方便的列舉集合元素值 |
---|---|
IObservable |
可觀察對象變動的值 |
IObserver<T>
IObserver<T>接口可以理解為消費被貫徹着提供數據的一個接口,它的三個方法決定了本次數據流的觀察行為的走向。
通俗理解就是被觀察者生成數據,觀察者消費數據。
來看下IObserver<T>的結構
- OnNext 表示消費新數據
- OnError 表示觀察數據流出現異常
- OnCompleted 表示明確關閉觀察數據流
代碼示例
下面代碼定義了一個可觀察的隊列,該隊列會提供給觀察者三個int類型的入參 1、2、3 供觀察者對象的OnNext方法消費。 MyConsoleObserver(觀察者)在得到數據后打印出來。
1 class Program 2 { 3 4 static void Main(string[] args) 5 { 6 Test(); 7 } 8 9 private static void Test() 10 { 11 var numbers = new MySequenceOfNumbers(); 12 var observer = new MyConsoleObserver<int>(); 13 numbers.Subscribe(observer); 14 Console.ReadLine(); 15 } 16 17 } 18 19 /// <summary> 20 /// 自定義被觀察隊列 21 /// </summary> 22 public class MySequenceOfNumbers : IObservable<int> 23 { 24 public IDisposable Subscribe(IObserver<int> observer) 25 { 26 observer.OnNext(1); 27 observer.OnNext(2); 28 observer.OnNext(3); 29 observer.OnCompleted(); 30 return Disposable.Empty; 31 } 32 } 33 34 /// <summary> 35 /// 自定義觀察者對象 36 /// </summary> 37 /// <typeparam name="T"></typeparam> 38 public class MyConsoleObserver<T> : IObserver<T> 39 { 40 public void OnNext(T value) 41 { 42 Console.WriteLine("接收到 value {0}", value); 43 } 44 public void OnError(Exception error) 45 { 46 Console.WriteLine("出現異常! {0}", error); 47 } 48 public void OnCompleted() 49 { 50 Console.WriteLine("關閉觀察行為"); 51 } 52 }
通過示例代碼我們得知了Rx.Net的數據流訂閱、消費流程。
Subject<T>
我們再來認識下Subject<T>,Subject是一個IObservable,它以命令形式生成一個值,並將該值推送給觀察者對象。我們看下Subject<T>的結構。
看這繼承關系,我們繼續看SubjectBase<T>里面有些啥~
哎呀,這個類真不得了啊,把IObserver<T>和IObservable<T>都給繼承了。這不是可以自己提供數據自己進行訂閱和消費了么....
我們來看看Subject<T>是怎么玩的:
1 private static void SubjectTest() 2 { 3 //定義一個 類型string的Subject對象 4 var inputs = new Subject<string>(); 5 //訂閱數據流 6 inputs.Subscribe((p => Console.WriteLine($"得到的值:{p}"))); 7 //循環造數據 8 for (int i = 0; i < 4; i++) 9 { 10 inputs.OnNext($"時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}, 下標:{i}"); 11 } 12 Console.ReadLine(); 13 }
執行結果如下圖:
需要注意的是訂閱方法需要在數據生成前聲明。
總結
好啦,要睡覺啦,基本上Rx核心的幾個點就先講到這,我們也來總結下Rx.Net的幾個核心知識點:
- 可觀察(被觀察)對象生產數據;
- 觀察者總是被動接收數據;
- 需要明確訂閱后觀察者才得以消費數據;
這里也只是自己的一個學習總結,Rx也不是眼前的幾個小示例就可以一目了然,不過核心的知識基本上就是這些。至於還有一些操作符的話相信用過Linq的話上手不難,難點在於需要在實際業務中找到合適的場景使用,只有不斷使用才會融會貫通。但是任何技術都不能濫用,每一項新技術都有它最佳使用場景,優秀的開發者需要做好權衡。