Net中的反應式編程


Net中的反應式編程(Reactive Programming)

 

系列主題:基於消息的軟件架構模型演變

 

一、反應式編程(Reactive Programming)

1、什么是反應式編程:反應式編程(Reactive programming)簡稱Rx,他是一個使用LINQ風格編寫基於觀察者模式的異步編程模型。簡單點說Rx = Observables + LINQ + Schedulers。

2、為什么會產生這種風格的編程模型?我在本系列文章開始的時候說過一個使用事件的例子:

1
2
3
4
5
6
7
8
9
var  watch = new  FileSystemWatcher();
   watch.Created += (s, e) =>
   {
       var  fileType = Path.GetExtension(e.FullPath);
       if  (fileType.ToLower() == "jpg" )
       {
           //do some thing
       }
   };

這個代碼定義了一個FileSystemWatcher,然后在Watcher事件上注冊了一個匿名函數。事件的使用是一種命令式代碼風格,有沒有辦法寫出聲明性更強的代碼風格?我們知道使用高階函數可以讓代碼更具聲明性,整個LINQ擴展就是一個高階函數庫,常見的LINQ風格代碼如下:

1
2
3
4
var  list = Enumerable.Range(1, 10)
                 .Where(x => x > 8)
                 .Select(x => x.ToString())
                 .First();

能否使用這樣的風格來編寫事件呢?

3、事件流
LINQ是對IEnumerable<T>的一系列擴展方法,我們可以簡單的將IEnumerable<T>認為是一個集合。當我們將事件放在一個時間范圍內,事件也變成了集合。我們可以將這個事件集合理解為事件流。

事件流的出現給了我們一個能夠對事件進行LINQ操作的靈感。

二、反應式編程中的兩個重要類型

事件模型從本質上來說是觀察者模式,所以IObservable<T>和IObserver<T>也是該模型的重頭戲。讓我們來看看這兩個接口的定義:

1
2
3
4
5
public  interface  IObservable< out  T>
{
       //Notifies the provider that an observer is to receive notifications.
       IDisposable Subscribe(IObserver<T> observer);
}
1
2
3
4
5
6
7
8
9
10
11
public  interface  IObserver< in  T>
{
     //Notifies the observer that the provider has finished sending push-based notifications.
     void  OnCompleted();
 
     //Notifies the observer that the provider has experienced an error condition.
     void  OnError(Exception error);
    
     //Provides the observer with new data.
     void  OnNext(T value);
}

這兩個名稱准確的反應出了它兩的職責:IObservable<T>-可觀察的事物,IObserver<T>-觀察者。

IObservable<T>只有一個方法Subscribe(IObserver<T> observer),此方法用來對事件流注冊一個觀察者。

IObserver<T>有三個回調方法。當事件流中有新的事件產生的時候會回調OnNext(T value),觀察者會得到事件中的數據。OnCompleted()和OnError(Exception error)則分別用來通知觀察者事件流已結束,事件流發生錯誤。

顯然事件流是可觀察的事物,我們用Rx改寫上面的例子:

1
2
3
4
5
6
Observable.FromEventPattern<FileSystemEventArgs>(watch, "Created" )
                 .Where(e => Path.GetExtension(e.EventArgs.FullPath).ToLower() == "jpg" )
                 .Subscribe(e =>
                 {
                     //do some thing
                 });

注:在.net下使用Rx編程需要安裝以下Nuget組件:

  • Rx-Core
  • Rx-Interfaces
  • Rx-Linq

三、UI編程中使用Rx

Rx模型不但使得代碼更加具有聲明性,而且使得UI編程更加簡單。

1、UI編程中的第一段Rx代碼

為了簡單的展示如何在UI編程中使用Rx,我們以Winform中的Button為例,看看事件模型和Rx有何不同。

1
2
3
4
5
6
7
8
9
private  void  BindFirstGroupButtons()
  {
      btnFirstEventMode.Click += btnFirstEventMode_Click;
  }
 
  void  btnFirstEventMode_Click( object  sender, EventArgs e)
  {
      MessageBox.Show( "hello world" );
  }

添加了一個Button,點擊Button的時候彈出一個對話框。使用Rx做同樣的實現:

1
2
3
4
//得到了Button的Click事件流。
var  clickedStream = Observable.FromEventPattern<EventArgs>(btnFirstReactiveMode, "Click" );
//在事件流上注冊了一個觀察者。
clickedStream.Subscribe(e => MessageBox.Show( "Hello world" ));

有朋友指出字符串“Click”非常讓人不爽,這確實是個問題。由於Click是一個event類型,無法用表達式樹獲取其名稱,最終我想到使用擴展方法來實現:

1
2
3
4
5
6
7
8
9
public  static  IObservable<EventPattern<EventArgs>> FromClickEventPattern( this  Button button)
  {
      return  Observable.FromEventPattern<EventArgs>(button, "Click" );
  }
 
  public  static  IObservable<EventPattern<EventArgs>> FromDoubleClickEventPattern( this  Button button)
  {
      return  Observable.FromEventPattern<EventArgs>(button, "DoubleClick" );
  }

我們平時常用的事件類型也就那么幾個,可以暫時通過這種方案來實現,該方案算不上完美,但是比起直接使用字符串又能優雅不少。

1
2
btnFirstReactiveMode.FromClickEventPattern()
                 .Subscribe(e => MessageBox.Show( "hello world" ));

2、UI編程中存在一個很常見的場景:當一個事件的注冊者阻塞了線程時,整個界面都處於假死狀態。.net中的異步模型也從APM,EAP,TPL不斷演化直至async/await模型的出現才使得異步編程更加簡單易用。我們來看看界面假死的代碼:

1
2
3
4
5
6
void  btnSecondEventMode_Click( object  sender, EventArgs e)
  {
      btnSecondEventMode.BackColor = Color.Coral;
      Thread.Sleep(5000);
      MessageBox.Show( "hello world" );
  }

Thread.Sleep(5000);模擬了一個長時間的操作,當你點下Button時整個界面處於假死狀態並且此時的程序無法響應其他的界面事件。傳統的解決方案是使用多線程來解決假死:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void  BtnSecondEventAsyncModel_Click( object  sender, EventArgs e)
{
     Action action = () =>
     {
         Task.Run(() =>
         {
             BtnSecondEventAsyncModel.BackColor = Color.Coral;
             Thread.Sleep(5000);
             MessageBox.Show( "hello world" );
         });
 
     };
     BeginInvoke(action);
}

這個代碼的復雜點在於:普通的多線程無法對UI進行操作,在Winform中需要用Control.BeginInvoke(Action action)經過包裝后,多線程中的UI操作才能正確執行,WPF則要使用Dispatcher.BeginInvoke(Action action)包裝。

在Rx中選擇合適的Scheduler類型就可以輕松解決多線程更新UI的問題。

1
2
3
4
5
6
7
8
btnSecondReactiveMode.FromClickEventPattern()
                 .ObserveOn( new  NewThreadScheduler())
                 .Subscribe(e =>
                 {
                     btnSecondReactiveMode.BackColor = Color.Coral;
                     Thread.Sleep(5000);
                     MessageBox.Show( "hello world" );
                 });

一句ObserveOn(new NewThreadScheduler())成功讓后面的觀察者跑在了新線程中,並且避免了多線程更新UI的問題,使用者無需再被這種問題所困擾。——抱歉,這一段描述有問題,簡單的使用NewThreadScheduler並不能保證異步更新UI。待我周末了詳細補充一下Rx中的異步能力。
注:使用Scheduler需要從Nuget中安裝Rx-PlatformServices。

常用的Scheduler有:

  • DefaultScheduler:默認的Scheduler,會將觀察者添加在調用隊列中
  • ImmediateScheduler:立即執行
  • TaskPoolScheduler:用來執行短時間的任務
  • ...

3、再來一個例子,讓我們感受一下Rx的魅力

界面上有兩個Button分別為+和-操作,點擊+按鈕則+1,點擊-按鈕則-1,最終的結果顯示在一個Label中。
這樣的一個需求使用經典事件模型只需要維護一個內部變量,兩個按鈕的Click事件分別對變量做加1或減1的操作即可。
Rx作為一種函數式編程模型講求immutable-不可變性,即不使用變量來維護內部狀態。

1
2
3
4
5
6
7
8
var  increasedEventStream = btnIncreasement.FromClickEventPattern()
     .Select(_ => 1);
var  decreasedEventStream = btnDecrement.FromClickEventPattern()
     .Select(_ => -1);
 
increasedEventStream.Merge(decreasedEventStream)
     .Scan(0, (result, s) => result + s)
     .Subscribe(x => lblResult.Text = x.ToString());

這個例子使用了IObservable<T>的”謂詞”來對事件流做了一些操作。

  • Select跟Linq操作有點類似,分別將兩個按鈕的事件變形為IObservable<int>(1)和IObservable<int>(-1);
  • Merge操作將兩個事件流合並為一個;
  • Scan稍顯復雜,對事件流做了一個折疊操作,給定了一個初始值,並通過一個函數來對結果和下一個值進行累加;

下面就讓我們來看看IObservable<T>中常用的“謂詞”

四、IObservable<T>中的謂詞

IObservable<T>的靈感來源於LINQ,所以很多操作也跟LINQ中的操作差不多,例如Where、First、Last、Single、Max、Any。
還有一些“謂詞”則是新出現的,例如上面提到的”Merge”、“Scan”等,為了理解這些“謂詞”的含義,我們請出一個神器RxSandbox

1、Merge操作,從下面的圖中我們可以清晰的看出Merge操作將三個事件流中的事件合並在了同一個時間軸上。

2、Where操作則是根據指定的條件篩選出事件。

有了這個工具我們可以更加方便的了解這些“謂詞”的用途。

五、IObservable<T>的創建

Observable類提供了很多靜態方法用來創建IObservable<T>,之前的例子我們都使用FromEventPattern方法來將事件轉化為IObservable<T>,接下來再看看別的方法。

Range方法可以產生一個指定范圍內的IObservable<T>

1
2
Observable.Range(1, 10)
           .Subscribe(x => Console.WriteLine(x.ToString()));

Interval方法可以每隔一定時間產生一個IObservable<T>:

1
2
Observable.Interval(TimeSpan.FromSeconds(1))
            .Subscribe(x => Console.WriteLine(x.ToString()));

Subscribe方法有一個重載,可以分別對Observable發生異常和Observable完成定義一個回調函數。

1
2
Observable.Range(1, 10)
           .Subscribe(x => Console.WriteLine(x.ToString()), e => Console.WriteLine( "Error"  + e.Message), () => Console.WriteLine( "Completed" ));

還可以將IEnumerable<T>轉化為IObservable<T>類型:

1
2
Enumerable.Range(1, 10).ToObservable()
           .Subscribe(x => Console.WriteLine(x.ToString()));

也可以將IObservable<T>轉化為IEnumerable<T>

1
var  list= Observable.Range(1, 10).ToEnumerable();

六、其他Rx資源

除了.net中的Rx.net,其他語言也紛紛推出了自己的Rx框架。

 
分類:  .NET


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM