.Net中的反應式編程(Reactive Programming)


系列主題:基於消息的軟件架構模型演變

 

一、反應式編程(Reactive Programming)

1、什么是反應式編程:反應式編程(Reactive programming)簡稱Rx,他是一個使用LINQ風格編寫基於觀察者模式的異步編程模型。簡單點說Rx = Observables + LINQ + Schedulers。

2、為什么會產生這種風格的編程模型?我在本系列文章開始的時候說過一個使用事件的例子:

          var watch = new FileSystemWatcher();
            watch.Created += (s, e) =>
            {
                var fileType = Path.GetExtension(e.FullPath);
                if (fileType.ToLower() == "jpg")
                {
                    //do some thing
                }
            };

這個代碼定義了一個FileSystemWatcher,然后在Watcher事件上注冊了一個匿名函數。事件的使用是一種命令式代碼風格,有沒有辦法寫出聲明性更強的代碼風格?我們知道使用高階函數可以讓代碼更具聲明性,整個LINQ擴展就是一個高階函數庫,常見的LINQ風格代碼如下:

var list = Enumerable.Range(1, 10)
                .Where(x => x > 8)
                .Select(x => x.ToString())
                .First();

能否使用這樣的風格來編寫事件呢?

3、事件流
LINQ是對IEnumerable<T>的一系列擴展方法,我們可以簡單的將IEnumerable<T>認為是一個集合。當我們將事件放在一個時間范圍內,事件也變成了集合。我們可以將這個事件集合理解為事件流。

事件流的出現給了我們一個能夠對事件進行LINQ操作的靈感。

二、反應式編程中的兩個重要類型

事件模型從本質上來說是觀察者模式,所以IObservable<T>和IObserver<T>也是該模型的重頭戲。讓我們來看看這兩個接口的定義:

    public interface IObservable<out T>
    {
          //Notifies the provider that an observer is to receive notifications.
          IDisposable Subscribe(IObserver<T> observer);
    }
    public interface IObserver<in T>
    {
        //Notifies the observer that the provider has finished sending push-based notifications.
        void OnCompleted();

        //Notifies the observer that the provider has experienced an error condition.
        void OnError(Exception error);
       
        //Provides the observer with new data.
        void OnNext(T value);
    }

這兩個名稱准確的反應出了它兩的職責:IObservable<T>-可觀察的事物,IObserver<T>-觀察者。

IObservable<T>只有一個方法Subscribe(IObserver<T> observer),此方法用來對事件流注冊一個觀察者。

IObserver<T>有三個回調方法。當事件流中有新的事件產生的時候會回調OnNext(T value),觀察者會得到事件中的數據。OnCompleted()和OnError(Exception error)則分別用來通知觀察者事件流已結束,事件流發生錯誤。

顯然事件流是可觀察的事物,我們用Rx改寫上面的例子:

Observable.FromEventPattern<FileSystemEventArgs>(watch, "Created")
                .Where(e => Path.GetExtension(e.EventArgs.FullPath).ToLower() == "jpg")
                .Subscribe(e =>
                {
                    //do some thing
                });

注:在.net下使用Rx編程需要安裝以下Nuget組件:

Install-Package Rx-main

三、UI編程中使用Rx

Rx模型不但使得代碼更加具有聲明性,Rx還可以用在UI編程中。

1、UI編程中的第一段Rx代碼

為了簡單的展示如何在UI編程中使用Rx,我們以Winform中的Button為例,看看事件模型和Rx有何不同。

       private void BindFirstGroupButtons()
        {
            btnFirstEventMode.Click += btnFirstEventMode_Click;
        }

        void btnFirstEventMode_Click(object sender, EventArgs e)
        {
            MessageBox.Show("hello world");
        }

添加了一個Button,點擊Button的時候彈出一個對話框。使用Rx做同樣的實現:

            //得到了Button的Click事件流。
            var clickedStream = Observable.FromEventPattern<EventArgs>(btnFirstReactiveMode, "Click");
            //在事件流上注冊了一個觀察者。 
            clickedStream.Subscribe(e => MessageBox.Show("Hello world"));

有朋友指出字符串“Click”非常讓人不爽,這確實是個問題。由於Click是一個event類型,無法用表達式樹獲取其名稱,最終我想到使用擴展方法來實現:

        public static IObservable<EventPattern<EventArgs>> FromClickEventPattern(this Button button)
         {
             return Observable.FromEventPattern<EventArgs>(button, "Click");
         }

         public static IObservable<EventPattern<EventArgs>> FromDoubleClickEventPattern(this Button button)
         {
             return Observable.FromEventPattern<EventArgs>(button, "DoubleClick");
         }

我們平時常用的事件類型也就那么幾個,可以暫時通過這種方案來實現,該方案算不上完美,但是比起直接使用字符串又能優雅不少。

btnFirstReactiveMode.FromClickEventPattern()
                .Subscribe(e => MessageBox.Show("hello world"));

2、UI編程中存在一個很常見的場景:當一個事件的注冊者阻塞了線程時,整個界面都處於假死狀態。.net中的異步模型也從APM,EAP,TPL不斷演化直至async/await模型的出現才使得異步編程更加簡單易用。我們來看看界面假死的代碼:

       void btnSecondEventMode_Click(object sender, EventArgs e)
        {
            btnSecondEventMode.BackColor = Color.Coral;
            Thread.Sleep(2000);
            lblMessage.Text = "event mode";
        }

Thread.Sleep(2000);模擬了一個長時間的操作,當你點下Button時整個界面處於假死狀態並且此時的程序無法響應其他的界面事件。傳統的解決方案是使用多線程來解決假死:

          BtnSecondEventAsyncModel.BackColor = Color.Coral;

            Task.Run(() =>
            {
                Thread.Sleep(2000);
                Action showMessage = () => lblMessage.Text = "async event mode";
                lblMessage.Invoke(showMessage);
            });

這個代碼的復雜點在於:普通的多線程無法對UI進行操作,在Winform中需要用Control.BeginInvoke(Action action)經過包裝后,多線程中的UI操作才能正確執行,WPF則要使用Dispatcher.BeginInvoke(Action action)包裝。

Rx方案:

btnSecondReactiveMode.FromClickEventPattern()
                .Subscribe(e =>
                {
                    Observable.Start(() =>
                    {
                        btnSecondReactiveMode.BackColor = Color.Coral;
                        Thread.Sleep(2000);
                        return "reactive mode";
                    })
                        .SubscribeOn(ThreadPoolScheduler.Instance)
                        .ObserveOn(this)
                        .Subscribe(x =>
                        {
                            lblMessage.Text = x;
                        });
                });

一句SubscribeOn(ThreadPoolScheduler.Instance)將費時的操作跑在了新線程中,ObserveOn(this)讓后面的觀察者跑在了UI線程中。

注:使用ObserveOn(this)需要使用Rx-WinForms

Install-Package Rx-WinForms

這個例子雖然成功了,但是並沒有比BeginInvoke(Action action)的方案有明顯的進步之處。在一個事件流中再次使用Ovservable.Start()開啟新的觀察者讓人更加摸不着頭腦。這並不是Rx的問題,而是事件模型在UI編程中存在局限性:不方便使用異步,不具備可測試性等。以XMAL和MVVM為核心的UI編程模型將在未來處於主導地位,由於在MVVM中可以將UI綁定到一個Command,從而解耦了事件模型。

開源項目ReactiveUI提供了一個以Rx基礎的UI編程方案,可以使用在XMAL和MVVM為核心的UI編程中,例如:Xamarin,WFP,Windows Phone8等開發中。

注:在WPF中使用ObserveOn()需要安裝Rx-WPF

Install-Package Rx-WPF

3、再來一個例子,讓我們感受一下Rx的魅力

界面上有兩個Button分別為+和-操作,點擊+按鈕則+1,點擊-按鈕則-1,最終的結果顯示在一個Label中。
這樣的一個需求使用經典事件模型只需要維護一個內部變量,兩個按鈕的Click事件分別對變量做加1或減1的操作即可。
Rx作為一種函數式編程模型講求immutable-不可變性,即不使用變量來維護內部狀態。

            var increasedEventStream = btnIncreasement.FromClickEventPattern()
                .Select(_ => 1);
            var decreasedEventStream = btnDecrement.FromClickEventPattern()
                .Select(_ => -1);

            increasedEventStream.Merge(decreasedEventStream)
                .Scan(0, (result, s) => result + s)
                .Subscribe(x => lblResult.Text = x.ToString());

這個例子使用了IObservable<T>的”謂詞”來對事件流做了一些操作。

  • Select跟Linq操作有點類似,分別將兩個按鈕的事件變形為IObservable<int>(1)和IObservable<int>(-1);
  • Merge操作將兩個事件流合並為一個;
  • Scan稍顯復雜,對事件流做了一個折疊操作,給定了一個初始值,並通過一個函數來對結果和下一個值進行累加;

下面就讓我們來看看IObservable<T>中常用的“謂詞”

四、IObservable<T>中的謂詞

IObservable<T>的靈感來源於LINQ,所以很多操作也跟LINQ中的操作差不多,例如Where、First、Last、Single、Max、Any。
還有一些“謂詞”則是新出現的,例如上面提到的”Merge”、“Scan”等,為了理解這些“謂詞”的含義,我們請出一個神器RxSandbox

1、Merge操作,從下面的圖中我們可以清晰的看出Merge操作將三個事件流中的事件合並在了同一個時間軸上。

2、Where操作則是根據指定的條件篩選出事件。

有了這個工具我們可以更加方便的了解這些“謂詞”的用途。

五、IObservable<T>的創建

Observable類提供了很多靜態方法用來創建IObservable<T>,之前的例子我們都使用FromEventPattern方法來將事件轉化為IObservable<T>,接下來再看看別的方法。

Return可以創建一個具體的IObservable<T>:

       public static void UsingReturn()
        {
            var greeting = Observable.Return("Hello world");
            greeting.Subscribe(Console.WriteLine);
        }

Create也可以創建一個IObservable<T>,並且擁有更加豐富的重載:

       public static void UsingCreate()
        {
            var greeting = Observable.Create<string>(observer =>
            {
                observer.OnNext("Hello world");
                return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
            });

            greeting.Subscribe(Console.WriteLine);
        }

Range方法可以產生一個指定范圍內的IObservable<T>

 Observable.Range(1, 10)
           .Subscribe(x => Console.WriteLine(x.ToString()));

Generate方法是一個折疊操作的逆向操作,又稱Unfold方法:

       public static void UsingGenerate()
        {
            var range = Observable.Generate(0, x => x < 10, x => x + 1, x => x);
            range.Subscribe(Console.WriteLine);
        }

Interval方法可以每隔一定時間產生一個IObservable<T>:

Observable.Interval(TimeSpan.FromSeconds(1))
           .Subscribe(x => Console.WriteLine(x.ToString()));

Subscribe方法有一個重載,可以分別對Observable發生異常和Observable完成定義一個回調函數。

 Observable.Range(1, 10)
           .Subscribe(x => Console.WriteLine(x.ToString()), e => Console.WriteLine("Error" + e.Message), () => Console.WriteLine("Completed"));

還可以將IEnumerable<T>轉化為IObservable<T>類型:

Enumerable.Range(1, 10).ToObservable()
          .Subscribe(x => Console.WriteLine(x.ToString()));

也可以將IObservable<T>轉化為IEnumerable<T>

var list= Observable.Range(1, 10).ToEnumerable();

六、Scheduler

Rx的核心是觀察者模式和異步,Scheduler正是為異步而生。我們在之前的例子中已經接觸過一些具體的Scheduler了,那么他們都具體是做什么的呢?

1、先看下面的代碼:

        public static void UsingScheduler()
        {
            Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
            var source = Observable.Create<int>(
            o =>
            {
                Console.WriteLine("Invoked on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
                o.OnNext(1);
                o.OnNext(2);
                o.OnNext(3);
                o.OnCompleted();
                Console.WriteLine("Finished on threadId:{0}",Thread.CurrentThread.ManagedThreadId);
                return Disposable.Empty;
            });
            source
            //.SubscribeOn(NewThreadScheduler.Default)
            //.SubscribeOn(ThreadPoolScheduler.Instance)
            .Subscribe(
            o => Console.WriteLine("Received {1} on threadId:{0}",Thread.CurrentThread.ManagedThreadId,o),
            () => Console.WriteLine("OnCompleted on threadId:{0}",Thread.CurrentThread.ManagedThreadId));
            Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
        } 

當我們不使用任何Scheduler的時候,整個Rx的觀察者和主題都跑在主線程中,也就是說並沒有異步執行。正如下面的截圖,所有的操作都跑在threadId=1的線程中。

當我們使用SubscribeOn(NewThreadScheduler.Default)或者SubscribeOn(ThreadPoolScheduler.Instance)的時候,觀察者和主題都跑在了theadId=3的線程中。

這兩個Scheduler的區別在於:NewThreadScheduler用於執行一個長時間的操作,ThreadPoolScheduler用來執行短時間的操作。

2、SubscribeOn和ObserveOn的區別

上面的例子僅僅展示了SubscribeOn()方法,Rx中還有一個ObserveOn()方法。stackoverflow上有一個這樣的問題:What's the difference between SubscribeOn and ObserveOn,其中一個簡單的例子很好的詮釋了這個區別。

        public static void DifferenceBetweenSubscribeOnAndObserveOn()
        {
            Thread.CurrentThread.Name = "Main";

            IScheduler thread1 = new NewThreadScheduler(x => new Thread(x) { Name = "Thread1" });
            IScheduler thread2 = new NewThreadScheduler(x => new Thread(x) { Name = "Thread2" });

            Observable.Create<int>(o =>
            {
                Console.WriteLine("Subscribing on " + Thread.CurrentThread.Name);
                o.OnNext(1);
                return Disposable.Create(() => { });
            })
            //.SubscribeOn(thread1)
            //.ObserveOn(thread2)
            .Subscribe(x => Console.WriteLine("Observing '" + x + "' on " + Thread.CurrentThread.Name));
        }
  • 當我們注釋掉:SubscribeOn(thread1)和ObserveOn(thread2)時的結果如下:

    觀察者和主題都跑在name為Main的thread中。

  • 當我們放開SubscribeOn(thread1):

    主題和觀察者都跑在了name為Thread1的線程中

  • 當我們注釋掉:SubscribeOn(thread1),放開ObserveOn(thread2)時的結果如下:

    主題跑在name為Main的主線程中,觀察者跑在了name=Thread2的線程中。

  • 當我們同時放開SubscribeOn(thread1)和ObserveOn(thread2)時的結果如下:

    主題跑在name為Thread1的線程中,觀察者跑在了name為Thread2的線程中。

至此結論應該非常清晰了:SubscribeOn()和ObserveOn()分別控制着主題和觀察者的異步。

七、其他Rx資源

除了.net中的Rx.net,其他語言也紛紛推出了自己的Rx框架。

 

參考資源:

http://rxwiki.wikidot.com/101samples

http://introtorx.com/Content/v1.0.10621.0/01_WhyRx.html#WhyRx

http://www.codeproject.com/Articles/646361/Reactive-Programming-For-NET-And-Csharp-Developers


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM