關於Reactive Extensions(Rx)
關於Reactive Extensions(Rx),先來看一下來自微軟的官方描述:
The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. Using Rx, developers represent asynchronous data streams with Observables, query asynchronous data streams using LINQ operators, and parameterize the concurrency in the asynchronous data streams using Schedulers. Simply put, Rx = Observables + LINQ + Schedulers.
“Reactive Extensions(Rx)是一個類庫,它集成了異步、基於可觀察(observable)序列的事件驅動編程和LINQ-style的查詢操作。使用Rx,開發人員可以用observable對象描述異步數據流,使用LINQ操作符異步查詢數據和使用Schedulers控制異步過程中的並發。簡而言之,Rx = Observables + LINQ + Schedulers。”
Reactive Extensions(Rx)就一定是多線程?
在以上的描述中,反復出現了一個詞“異步”。一般來講,提到“異步”,首先反應到的就是多線程。那問題來了,使用Reactive Extensions就一定意味着多線程嗎?先來看一個示例,代碼來了:
1 static void Main(string[] args) 2 { 3 Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 4 var sub = new Subject<Object>(); 5 6 sub.Subscribe(o => Console.WriteLine("Received {1} on threadId:{0}", //為Observable訂閱處理器(handler),輸出handler thread id 7 Thread.CurrentThread.ManagedThreadId, 8 o)); 9 ParameterizedThreadStart notify = obj => //委托定義,其內輸出被觀察對象的thread id 10 { 11 Console.WriteLine("OnNext({1}) on threadId:{0}", 12 Thread.CurrentThread.ManagedThreadId, 13 obj); 14 sub.OnNext(obj); 15 }; 16 notify(1); 17 new Thread(notify).Start(2); 18 new Thread(notify).Start(3); 19 20 Console.Read(); 21 }
代碼中,分別輸出了通知者的thread id和callback handler的thread id。這里使用的是Rx默認的線程並發方式。輸出結果如下:

無論是在當前線程調用,還是新啟線程執行,通知者和處理方法所在線程均為同一個。在該示例中,Rx的線程分配是在free-threaded模式下工作的,free-threaded就意味着我們不強行指其Rx中的subscription, notification執行線程。這是Rx的默認工作模式,而這種模式下subscribing/call OnNext並沒有引發新的線程來處理observable序列,線處理方式是單線程(Single Threaded Apartment,STA)。所以,我們可以這樣說:單線程是Reactive Extensions(Rx)的默認處理方式。
使用SubscribeOn控制訂閱(subscribing)的上下文
IObservable<TSource>的擴展方法SubscribeOn<TSource>(IScheduler)允許我們傳入一調度器(Scheduler),控制訂閱執行的上下文。
1 static void Main(string[] args) 2 { 3 Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 4 var source = Observable.Create<int>( 5 o => 6 { 7 Console.WriteLine("Invoked on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 8 o.OnNext(1); 9 o.OnNext(2); 10 o.OnNext(3); 11 o.OnCompleted(); 12 Console.WriteLine("Finished on threadId:{0}", 13 Thread.CurrentThread.ManagedThreadId); 14 return Disposable.Empty; 15 }); 16 source 17 //.SubscribeOn(Scheduler.ThreadPool) 18 .Subscribe( 19 o => Console.WriteLine("Received {1} on threadId:{0}", 20 Thread.CurrentThread.ManagedThreadId, 21 o), 22 () => Console.WriteLine("OnCompleted on threadId:{0}", 23 Thread.CurrentThread.ManagedThreadId)); 24 Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 25 26 Console.Read(); 27 }
代碼中,使用Observable.Create創建一Observable序列,隨后訂閱該序列。輸出結果為:

當序列被訂閱source.Subscribe,代理Observable.Create被調用執行。首先是OnNext(1) handler,依次是OnNext(2) OnNext(3) handler和OnCompleted handler,最后執行到“Subscribed on threadId:10”。整個過程是線性的,阻塞(block)式的。這是符合上面分析的Rx默認單線程的模式的。
我們放開.SubscribeOn(Scheduler.ThreadPool)的注釋,指定Rx工作在線程池內完成。執行結果如下:

可以看到,所有的handler都是在一新線程內完成的。這是一個非阻塞的(no=-block)模式。
SubscribeOn方法常用來指定Observable notifications的線程執行模式(哪里執行)。其常用於以下的場景中:
》 UI線程不允許阻塞
》 不需在UI線程中更新顯示
常用的Scheduler屬性:
| CurrentThread | 在當前進程中盡快的調度工作,同步(synchronous,block) |
| Immediate | 在當前進程中立即調度工作,同步(synchronous,block) |
| NewThread | 在新線程中調度工作(asynchronous,no-block) |
| TaskPool | 在任務工廠中調度工作(asynchronous,no-block) |
| ThreadPool | 在線程池中調度工作(asynchronous,no-block) |
參考資料:
The Reactive Extensions(Rx)...
