Net中的反應式編程(Reactive Programming)
系列主題:基於消息的軟件架構模型演變
一、反應式編程(Reactive Programming)
1、什么是反應式編程:反應式編程(Reactive programming)簡稱Rx,他是一個使用LINQ風格編寫基於觀察者模式的異步編程模型。簡單點說Rx = Observables + LINQ + Schedulers。
2、為什么會產生這種風格的編程模型?我在本系列文章開始的時候說過一個使用事件的例子:
1
2
3
4
5
6
7
8
9
|
var
watch =
new
FileSystemWatcher();
watch.Created += (s, e) =>
{
var
fileType = Path.GetExtension(e.FullPath);
if
(fileType.ToLower() ==
"jpg"
)
{
//do some thing
}
};
|
這個代碼定義了一個FileSystemWatcher,然后在Watcher事件上注冊了一個匿名函數。事件的使用是一種命令式代碼風格,有沒有辦法寫出聲明性更強的代碼風格?我們知道使用高階函數可以讓代碼更具聲明性,整個LINQ擴展就是一個高階函數庫,常見的LINQ風格代碼如下:
1
2
3
4
|
var
list = Enumerable.Range(1, 10)
.Where(x => x > 8)
.Select(x => x.ToString())
.First();
|
能否使用這樣的風格來編寫事件呢?
3、事件流
LINQ是對IEnumerable<T>的一系列擴展方法,我們可以簡單的將IEnumerable<T>認為是一個集合。當我們將事件放在一個時間范圍內,事件也變成了集合。我們可以將這個事件集合理解為事件流。
事件流的出現給了我們一個能夠對事件進行LINQ操作的靈感。
二、反應式編程中的兩個重要類型
事件模型從本質上來說是觀察者模式,所以IObservable<T>和IObserver<T>也是該模型的重頭戲。讓我們來看看這兩個接口的定義:
1
2
3
4
5
|
public
interface
IObservable<
out
T>
{
//Notifies the provider that an observer is to receive notifications.
IDisposable Subscribe(IObserver<T> observer);
}
|
1
2
3
4
5
6
7
8
9
10
11
|
public
interface
IObserver<
in
T>
{
//Notifies the observer that the provider has finished sending push-based notifications.
void
OnCompleted();
//Notifies the observer that the provider has experienced an error condition.
void
OnError(Exception error);
//Provides the observer with new data.
void
OnNext(T value);
}
|
這兩個名稱准確的反應出了它兩的職責:IObservable<T>-可觀察的事物,IObserver<T>-觀察者。
IObservable<T>只有一個方法Subscribe(IObserver<T> observer),此方法用來對事件流注冊一個觀察者。
IObserver<T>有三個回調方法。當事件流中有新的事件產生的時候會回調OnNext(T value),觀察者會得到事件中的數據。OnCompleted()和OnError(Exception error)則分別用來通知觀察者事件流已結束,事件流發生錯誤。
顯然事件流是可觀察的事物,我們用Rx改寫上面的例子:
1
2
3
4
5
6
|
Observable.FromEventPattern<FileSystemEventArgs>(watch,
"Created"
)
.Where(e => Path.GetExtension(e.EventArgs.FullPath).ToLower() ==
"jpg"
)
.Subscribe(e =>
{
//do some thing
});
|
注:在.net下使用Rx編程需要安裝以下Nuget組件:
- Rx-Core
- Rx-Interfaces
- Rx-Linq
三、UI編程中使用Rx
Rx模型不但使得代碼更加具有聲明性,而且使得UI編程更加簡單。
1、UI編程中的第一段Rx代碼
為了簡單的展示如何在UI編程中使用Rx,我們以Winform中的Button為例,看看事件模型和Rx有何不同。
1
2
3
4
5
6
7
8
9
|
private
void
BindFirstGroupButtons()
{
btnFirstEventMode.Click += btnFirstEventMode_Click;
}
void
btnFirstEventMode_Click(
object
sender, EventArgs e)
{
MessageBox.Show(
"hello world"
);
}
|
添加了一個Button,點擊Button的時候彈出一個對話框。使用Rx做同樣的實現:
1
2
3
4
|
//得到了Button的Click事件流。
var
clickedStream = Observable.FromEventPattern<EventArgs>(btnFirstReactiveMode,
"Click"
);
//在事件流上注冊了一個觀察者。
clickedStream.Subscribe(e => MessageBox.Show(
"Hello world"
));
|
有朋友指出字符串“Click”非常讓人不爽,這確實是個問題。由於Click是一個event類型,無法用表達式樹獲取其名稱,最終我想到使用擴展方法來實現:
1
2
3
4
5
6
7
8
9
|
public
static
IObservable<EventPattern<EventArgs>> FromClickEventPattern(
this
Button button)
{
return
Observable.FromEventPattern<EventArgs>(button,
"Click"
);
}
public
static
IObservable<EventPattern<EventArgs>> FromDoubleClickEventPattern(
this
Button button)
{
return
Observable.FromEventPattern<EventArgs>(button,
"DoubleClick"
);
}
|
我們平時常用的事件類型也就那么幾個,可以暫時通過這種方案來實現,該方案算不上完美,但是比起直接使用字符串又能優雅不少。
1
2
|
btnFirstReactiveMode.FromClickEventPattern()
.Subscribe(e => MessageBox.Show(
"hello world"
));
|
2、UI編程中存在一個很常見的場景:當一個事件的注冊者阻塞了線程時,整個界面都處於假死狀態。.net中的異步模型也從APM,EAP,TPL不斷演化直至async/await模型的出現才使得異步編程更加簡單易用。我們來看看界面假死的代碼:
1
2
3
4
5
6
|
void
btnSecondEventMode_Click(
object
sender, EventArgs e)
{
btnSecondEventMode.BackColor = Color.Coral;
Thread.Sleep(5000);
MessageBox.Show(
"hello world"
);
}
|
Thread.Sleep(5000);模擬了一個長時間的操作,當你點下Button時整個界面處於假死狀態並且此時的程序無法響應其他的界面事件。傳統的解決方案是使用多線程來解決假死:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
void
BtnSecondEventAsyncModel_Click(
object
sender, EventArgs e)
{
Action action = () =>
{
Task.Run(() =>
{
BtnSecondEventAsyncModel.BackColor = Color.Coral;
Thread.Sleep(5000);
MessageBox.Show(
"hello world"
);
});
};
BeginInvoke(action);
}
|
這個代碼的復雜點在於:普通的多線程無法對UI進行操作,在Winform中需要用Control.BeginInvoke(Action action)經過包裝后,多線程中的UI操作才能正確執行,WPF則要使用Dispatcher.BeginInvoke(Action action)包裝。
在Rx中選擇合適的Scheduler類型就可以輕松解決多線程更新UI的問題。
1
2
3
4
5
6
7
8
|
btnSecondReactiveMode.FromClickEventPattern()
.ObserveOn(
new
NewThreadScheduler())
.Subscribe(e =>
{
btnSecondReactiveMode.BackColor = Color.Coral;
Thread.Sleep(5000);
MessageBox.Show(
"hello world"
);
});
|
一句ObserveOn(new NewThreadScheduler())成功讓后面的觀察者跑在了新線程中,並且避免了多線程更新UI的問題,使用者無需再被這種問題所困擾。——抱歉,這一段描述有問題,簡單的使用NewThreadScheduler並不能保證異步更新UI。待我周末了詳細補充一下Rx中的異步能力。
注:使用Scheduler需要從Nuget中安裝Rx-PlatformServices。
常用的Scheduler有:
- DefaultScheduler:默認的Scheduler,會將觀察者添加在調用隊列中
- ImmediateScheduler:立即執行
- TaskPoolScheduler:用來執行短時間的任務
- ...
3、再來一個例子,讓我們感受一下Rx的魅力
界面上有兩個Button分別為+和-操作,點擊+按鈕則+1,點擊-按鈕則-1,最終的結果顯示在一個Label中。
這樣的一個需求使用經典事件模型只需要維護一個內部變量,兩個按鈕的Click事件分別對變量做加1或減1的操作即可。
Rx作為一種函數式編程模型講求immutable-不可變性,即不使用變量來維護內部狀態。
1
2
3
4
5
6
7
8
|
var
increasedEventStream = btnIncreasement.FromClickEventPattern()
.Select(_ => 1);
var
decreasedEventStream = btnDecrement.FromClickEventPattern()
.Select(_ => -1);
increasedEventStream.Merge(decreasedEventStream)
.Scan(0, (result, s) => result + s)
.Subscribe(x => lblResult.Text = x.ToString());
|
這個例子使用了IObservable<T>的”謂詞”來對事件流做了一些操作。
- Select跟Linq操作有點類似,分別將兩個按鈕的事件變形為IObservable<int>(1)和IObservable<int>(-1);
- Merge操作將兩個事件流合並為一個;
- Scan稍顯復雜,對事件流做了一個折疊操作,給定了一個初始值,並通過一個函數來對結果和下一個值進行累加;
下面就讓我們來看看IObservable<T>中常用的“謂詞”
四、IObservable<T>中的謂詞
IObservable<T>的靈感來源於LINQ,所以很多操作也跟LINQ中的操作差不多,例如Where、First、Last、Single、Max、Any。
還有一些“謂詞”則是新出現的,例如上面提到的”Merge”、“Scan”等,為了理解這些“謂詞”的含義,我們請出一個神器RxSandbox。
1、Merge操作,從下面的圖中我們可以清晰的看出Merge操作將三個事件流中的事件合並在了同一個時間軸上。
2、Where操作則是根據指定的條件篩選出事件。
有了這個工具我們可以更加方便的了解這些“謂詞”的用途。
五、IObservable<T>的創建
Observable類提供了很多靜態方法用來創建IObservable<T>,之前的例子我們都使用FromEventPattern方法來將事件轉化為IObservable<T>,接下來再看看別的方法。
Range方法可以產生一個指定范圍內的IObservable<T>
1
2
|
Observable.Range(1, 10)
.Subscribe(x => Console.WriteLine(x.ToString()));
|
Interval方法可以每隔一定時間產生一個IObservable<T>:
1
2
|
Observable.Interval(TimeSpan.FromSeconds(1))
.Subscribe(x => Console.WriteLine(x.ToString()));
|
Subscribe方法有一個重載,可以分別對Observable發生異常和Observable完成定義一個回調函數。
1
2
|
Observable.Range(1, 10)
.Subscribe(x => Console.WriteLine(x.ToString()), e => Console.WriteLine(
"Error"
+ e.Message), () => Console.WriteLine(
"Completed"
));
|
還可以將IEnumerable<T>轉化為IObservable<T>類型:
1
2
|
Enumerable.Range(1, 10).ToObservable()
.Subscribe(x => Console.WriteLine(x.ToString()));
|
也可以將IObservable<T>轉化為IEnumerable<T>
1
|
var
list= Observable.Range(1, 10).ToEnumerable();
|
六、其他Rx資源
除了.net中的Rx.net,其他語言也紛紛推出了自己的Rx框架。