Scenario 5: Thread-Local Variables
Parallel.ForEach provides an overload that supports thread-local variables, declared as follows:
public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally)
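Example usage:
// filterFunction is passed in as a parameter here so that the example compiles on its own.
public static List<T> Filtering<T>(IEnumerable<T> source, Func<T, bool> filterFunction)
{
    var results = new List<T>();
    var resultsLock = new object();
    Parallel.ForEach(source,
        () => new List<T>(),                         // localInit: a private list per worker
        (element, loopstate, localStorage) =>
        {
            bool filter = filterFunction(element);
            if (filter)
                localStorage.Add(element);           // no lock needed: the list is not shared
            return localStorage;
        },
        (finalStorage) =>
        {
            // localFinally: merge each private list into the shared result under a lock.
            lock (resultsLock)
            {
                results.AddRange(finalStorage);
            }
        });
    return results;
}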
What is the advantage of thread-local variables? Consider the following example (a web page downloader):
public static void UnsafeDownloadUrls()
{
    // One WebClient shared by every iteration: this is what causes the failure.
    WebClient webclient = new WebClient();
    Parallel.ForEach(urls, (url, loopstate, index) =>
    {
        webclient.DownloadFile(url, filenames[index] + ".dat");
        Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
    });
}
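This is usually how the first version gets written, but at run time it fails with "System.NotSupportedException -> WebClient does not support concurrent I/O operations." The reason is that multiple threads cannot use the same WebClient object at the same time. The obvious next step is to move the WebClient object into the loop body: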
public static void BAD_DownloadUrls()
{
    Parallel.ForEach(urls, (url, loopstate, index) =>
    {
        // A new WebClient for every single element.
        WebClient webclient = new WebClient();
        webclient.DownloadFile(url, filenames[index] + ".dat");
        Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
    });
}
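This version still has a problem: it instantiates a WebClient for every element, and because your machine is not a server, that many instances quickly hit the limit on virtual connections the machine allows. Thread-local variables solve this: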
public static void downloadUrlsSafe()
{
    Parallel.ForEach(urls,
        () => new WebClient(),                       // localInit: one WebClient per worker
        (url, loopstate, index, webclient) =>
        {
            webclient.DownloadFile(url, filenames[index] + ".dat");
            Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
            return webclient;                        // reused by this worker's next iteration
        },
        (webclient) => webclient.Dispose());         // localFinally: release the instance
}
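Written this way, the loop gets as many WebClient instances as it needs, and each instance stays isolated from the others and belongs only to the thread it is associated with.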
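To see this without touching the network, the following minimal sketch replaces WebClient with a hypothetical FakeClient class (not part of the original example) and counts how many instances the loop constructs. The count depends on the scheduler and on the machine, so no particular number is claimed. The sketch only illustrates that localInit runs once per worker rather than once per element.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an expensive, non-thread-safe resource such as WebClient.
public sealed class FakeClient
{
    public static int Created;                        // instances constructed so far
    public FakeClient() { Interlocked.Increment(ref Created); }
    public int Fetch(int item) { return item * 2; }   // placeholder for real work
}

public static class LocalInitDemo
{
    // Processes every item with a per-worker FakeClient and returns the sum of the results.
    public static long Run(int itemCount)
    {
        long total = 0;
        Parallel.ForEach(
            Enumerable.Range(0, itemCount),
            () => new FakeClient(),                   // localInit: once per worker
            (item, loopState, client) =>
            {
                Interlocked.Add(ref total, client.Fetch(item));
                return client;                        // passed on to this worker's next iteration
            },
            client => { });                           // localFinally: nothing to release here
        return total;
    }
}
```

After calling LocalInitDemo.Run(1000), reading FakeClient.Created typically shows far fewer instances than elements.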
With PLINQ, a ThreadLocal<T> object can be used to achieve something similar:
public static void downloadUrl()
{
    var webclient = new ThreadLocal<WebClient>(() => new WebClient());
    // ForAll returns void, so there is no result to assign.
    urls
        .AsParallel()
        .ForAll(url =>
        {
            webclient.Value.DownloadFile(url, host[url] + ".dat");
            Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
        });
}
Note, however, that ThreadLocal<T> carries comparatively more overhead!
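ForAll also has no counterpart to localFinally, so per-thread values have to be collected or released by hand. Below is a minimal sketch of one way to do that. It assumes .NET 4.5 or later, where the ThreadLocal<T> constructor accepts a trackAllValues flag and the instance exposes Values. The names ThreadLocalDemo and CollectEven are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class ThreadLocalDemo
{
    // Collects the even numbers from source using one private list per thread.
    public static List<int> CollectEven(IEnumerable<int> source)
    {
        // trackAllValues = true makes every thread's list reachable through Values.
        using (var local = new ThreadLocal<List<int>>(() => new List<int>(), true))
        {
            source.AsParallel().ForAll(n =>
            {
                if (n % 2 == 0)
                    local.Value.Add(n);   // no lock: each thread writes to its own list
            });

            // ForAll has returned, so nothing is still writing; merge the per-thread lists.
            return local.Values.SelectMany(list => list).OrderBy(n => n).ToList();
        }
    }
}
```

The merge step after the query is the manual equivalent of what localFinally does in the Parallel.ForEach overload.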
--
Scenario 5: Exiting from Operations (using Parallel.ForEach)
Parallel.ForEach has an overload, declared as follows, whose body receives a ParallelLoopState object:
public static ParallelLoopResult ForEach<TSource>(
    IEnumerable<TSource> source,
    Action<TSource, ParallelLoopState> body)
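ParallelLoopState.Stop() provides a way to exit the loop, and it is faster than the other two approaches. It tells the loop not to start any new iterations and to exit as soon as possible. Iterations that are already running are not interrupted.
The ParallelLoopState.IsStopped property lets an iteration check whether another iteration has called Stop.
Example: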
public static bool FindAny<T>(IEnumerable<T> TSpace, T match) where T : IEquatable<T>
{
    var matchFound = false;
    Parallel.ForEach(TSpace, (curValue, loopstate) =>
    {
        if (curValue.Equals(match))
        {
            matchFound = true;     // only ever set to true, so concurrent writes agree
            loopstate.Stop();
        }
    });
    return matchFound;
}
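When each iteration does more work than a single comparison, it can poll IsStopped and give up early once another iteration has already found a result. The following is a minimal sketch of that idea. StopDemo and ContainsInAnyRow are hypothetical names, and each iteration scans one whole row.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class StopDemo
{
    // Returns true if any row contains target.
    public static bool ContainsInAnyRow(IEnumerable<int[]> rows, int target)
    {
        bool found = false;
        Parallel.ForEach(rows, (row, loopState) =>
        {
            foreach (int value in row)
            {
                // Another iteration already called Stop(): abandon this row.
                if (loopState.IsStopped)
                    return;

                if (value == target)
                {
                    found = true;       // only ever set to true
                    loopState.Stop();   // ask the loop not to start further iterations
                    return;
                }
            }
        });
        return found;
    }
}
```

How often to poll is a trade-off: checking on every element is simplest, while checking every few hundred elements keeps the check out of a hot inner loop.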
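ParallelLoopState.Break() tells the loop to keep executing the iterations that come before the current element, but not to start iterations that come after it. If several iterations call Break, the one with the lowest index takes effect, and that index is recorded in the ParallelLoopState.LowestBreakIteration property. This is typically used for ordered searches. For example, if you have a sorted array and want the smallest index of a matching element, you can use the following code: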
public static long FindLowestIndex<T>(IEnumerable<T> TSpace, T match) where T : IEquatable<T>
{
    var loopResult = Parallel.ForEach(TSpace, (curValue, loopState, curIndex) =>
    {
        if (curValue.Equals(match))
        {
            loopState.Break();
        }
    });
    // LowestBreakIteration is a long? and is null when no iteration called Break().
    var matchedIndex = loopResult.LowestBreakIteration;
    return matchedIndex.HasValue ? matchedIndex.Value : -1;
}
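The ParallelLoopResult returned by the loop also tells a loop that ran to completion apart from one that exited early. The sketch below shows the same search with Parallel.For over an int array. BreakDemo and FirstIndexOf are hypothetical names, and the ShouldExitCurrentIteration check is an optional shortcut rather than something the technique requires.

```csharp
using System;
using System.Threading.Tasks;

public static class BreakDemo
{
    // Returns the lowest index whose element equals target, or -1 if there is none.
    public static long FirstIndexOf(int[] data, int target)
    {
        ParallelLoopResult result = Parallel.For(0, data.Length, (i, loopState) =>
        {
            // True for iterations above an index that has already called Break():
            // their outcome can no longer affect the answer.
            if (loopState.ShouldExitCurrentIteration)
                return;

            if (data[i] == target)
                loopState.Break();
        });

        // IsCompleted is true only when no iteration called Break() or Stop().
        if (result.IsCompleted)
            return -1;
        return result.LowestBreakIteration ?? -1;
    }
}
```

Because Break() still lets every lower-indexed iteration run, an earlier match always gets the chance to call Break() itself, which is why LowestBreakIteration ends up at the first match.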
More:
http://www.tuicool.com/articles/jqaUVj