並發編程的幾種形式
在並發編程中我們經常聽到以下一些概念,今天我將嘗試進行闡述。
一、並發
同時干多件事情,這就是並發的作用。
web服務器可以利用並發同時處理大量用戶的請求。
只要我們需要程序同時干多件事情,我們就需要並發。
二、多線程
並發編程的一種形式,其采用多個線程執行程序。
線程是一個獨立的運行單元,每個進程內部有多個線程,每個線程可以各自同時執行指令。
每個線程有自己獨立的棧,但是與進程內的其他線程共享內存。
線程池是線程更廣泛的一種應用形式,其維護着一定數量的工作線程,這些線程等待着執行分配下來的任務。線程池可以隨時監測線程的數量
線程池催生了另外一種重要的並發形式:並行處理。
多線程並不是並發編程的唯一形式,雖然.NET和Java等語言框架都對底層線程類型提供了支持,但是對開發人員並不友好,最新的.NET和Java
都提供了更高級別的抽象,讓我們開發並發程序更加方便高效。
三、並行處理
將大塊的任務分割成相互獨立的小塊,並分配給多個同時運行的線程處理。
並行處理采用多線程,提高了處理器的利用效率。
並行編程通常不適合服務器系統,服務器本身都具有並發處理能力。
數據並行可以處理大量的彼此獨立的數據,比如Hadoop等大數據處理框架。
任務並行可以將彼此獨立的拆分任務同時執行。
下邊看下.NET中提供的並行編程
使用Parallel.ForEach進行數據並行
void RotateMatrices(IEnumerable<Matrix> matrices, float degrees) { Parallel.ForEach(matrices, matrix => matrix.Rotate(degrees)); }
使用Parallel.ForEach進行數據並行
IEnumerable<bool> PrimalityTest(IEnumerable<int> values) { return values.AsParallel().Select(val => IsPrime(val)); }
數據的獨立性是並行性最大化的前提,否為了確保安全性就需要引入同步,從而影響程序的並行程度。
只能最大程度的並行,但是總是消滅不了同步,數據並行的結果總是需要進行聚合,Parallel實現了響應的重載及map/reduce函數。
Parallel類的Invoke方式可以實現任務並行
void ProcessArray(double[] array) { Parallel.Invoke( () => ProcessPartialArray(array, 0, array.Length / 2), () => ProcessPartialArray(array, array.Length / 2, array.Length) ); } void ProcessPartialArray(double[] array, int begin, int end) { // CPU 密集型的操作...... }
任務並行也依賴任務的獨立性,同時要注意閉包對變量的引用,即使是值類型也是引用。
任務不要特別短,也不要特別長。如果任務太短,把數據分割進任務和在線程池中調度任務的開銷會很大。如果任務太長,線程池就不能進行
有效的動態調整以達到工作量的平衡。
四、異步編程
並發編程的一種形式,它采用future模式或者回調(callback)機制,以避免產生不必要的線程。
回調和事件作為老式的異步編程,在服務器端和GUI中都有廣泛的應用。
一個future或者promise代表一些即將完成的操作,在.NET中的TPL中有Task和Task<TResult>,在Java中有FutureTask,在JS中有fetch(新版Firefox
和Chorm支持)。
異步編程可以在啟動一個操作之后,可以繼續執行而不會被阻塞,待操作執行完之后,通知future或者執行回調函數,以便告知操作結束。
異步編程是一種功能強大的並發形式,但傳統的異步編程特別復雜而且不易於代碼維護。.NET和Node.JS支持的async和await,讓異步編程變得
跟串行編程一樣簡單。
下面看下.NET 的兩個關鍵字: async 和 await 。 async 關鍵字加在方法聲明上,它的主要目的是使方法內的 await 關鍵字生效。如果 async 方法有
返回值,應返回 Task<T> ;如果沒有返回值,應返回 Task 。這些task 類型相當於 future,用來在異步方法結束時通知主程序。下邊的例子同時請求兩
個服務地址,只要有一個返回結果即可完成。
// 返回第一個響應的 URL 的數據長度。 private static async Task<int> FirstRespondingUrlAsync(string urlA, string urlB) { var httpClient = new HttpClient(); // 並發地開始兩個下載任務。 Task<byte[]> downloadTaskA = httpClient.GetByteArrayAsync(urlA); Task<byte[]> downloadTaskB = httpClient.GetByteArrayAsync(urlB); // 等待任意一個任務完成。 Task<byte[]> completedTask = await Task.WhenAny(downloadTaskA, downloadTaskB); // 返回從 URL 得到的數據的長度。 byte[] data = await completedTask; return data.Length; }
五、響應式編程
一種聲明式的編程模式,程序在該模式中對事件進行響應。
程序針對不同的事件進行響應並更新自己的狀態。
異步編程針對啟動的操作,響應編程針對可以任何事件重復發生的異步事件。
響應式編程基於“可觀察的流”(observable stream)。一旦申請了可觀察流,就可以收到任意數量的數據項( OnNext ),並且流在結束時會發出一個錯誤(
OnError )或一個結束的通知( OnCompleted )。實際的接口如下
interface IObserver<in T> { void OnNext(T item); void OnCompleted(); void OnError(Exception error); } interface IObservable<out T> { IDisposable Subscribe(IObserver<T> observer); }
微軟的 Reactive Extensions(Rx)庫已經實現了所有接口。下面的代碼中,前面是我們不熟悉的操作符( Interval 和 Timestamp ),最后是一個 Subscribe ,
但是中間部分是我們在 LINQ 中熟悉的操作符: Where 和 Select 。LINQ 具有的特性,Rx也都有。Rx 在此基礎上增加了很多它自己的操作符,特別
是與時間有關的操作符:
Observable.Interval(TimeSpan.FromSeconds(1)) .Timestamp() .Where(x => x.Value % 2 == 0) .Select(x => x.Timestamp) .Subscribe(x => Trace.WriteLine(x));
上面的代碼中,首先是一個延時一段時間的計數器( Interval ),隨后、后為每個事件加了一個時間戳( Timestamp )。接着對事件進行過濾,只包含偶數
值( Where ),選擇了時間戳的值( Timestamp ),然后當每個時間戳值到達時,把它輸入調試器( Subscribe )。可觀察流的定義和其訂閱是互相獨立的。
上面最后一個例子與下面的代碼等效:
IObservable<DateTimeOffset> timestamps = Observable.Interval(TimeSpan.FromSeconds(1)) .Timestamp() .Where(x => x.Value % 2 == 0) .Select(x => x.Timestamp); timestamps.Subscribe(x => Trace.WriteLine(x));
一種常規的做法是把可觀察流定義為一種類型,然后將其作為 IObservable<T> 資源使用。其他類型可以訂閱這些流,或者把這些流與其他操作符
組合,創建另一個可觀察流Rx 的訂閱也是一個資源。 Subscribe 操作符返回一個 IDisposable ,即表示訂閱完成。當你響應了那個可觀察流,就得處
理這個訂閱。對於hot observable(熱可觀察流)和 cold observable(冷可觀察流)這兩種對象,訂閱的做法各有不同。一個 hot observable 對象是指一直
在發生的事件流,如果在事件到達時沒有訂閱者,事件就丟失了。例如,鼠標的移動就是一個 hot observable 對象。cold observable 對象是始終沒有
輸入事件(不會主動產生事件)的觀察流,它只會通過啟動一個事件隊列來響應訂閱。例如,HTTP 下載是一個 cold observable 對象,只有在訂閱后
才會發出 HTTP 請求。
六、並發集合和不可變集合
大多數並發集合通過快照,既可以確保一個線程修改數據,同時也可以允許多個線程同時枚舉數據。
不可變集合的無法修改性確保了所有操作的簡潔性,特別適合在並發編程中使用。
七、並發編程與函數編程
大多數並發編程技術本質上都是函數式(functional) 的。
函數式編程理念簡化並發編程的設計。每一個並行的片段都有輸入和輸出。他們不依賴於全局(或共享)變量,也不會修改全局(或共享)數據結構。
函數式編程的數據不變性在確保並發安全性的前提下,同時也防止了並發的活躍性問題。