Parallel.Foreach的並發問題解決方法-比如爬蟲WebClient


場景五:線程局部變量

Parallel.ForEach 提供了一個線程局部變量的重載,定義如下:

public static ParallelLoopResult ForEach<TSource, TLocal>( IEnumerable<TSource> source, Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal,TLocal> body, Action<TLocal> localFinally)

使用的示例:

public static List<R> Filtering<T,R>(IEnumerable<T> source) { var results = new List<R>(); using (SemaphoreSlim sem = new SemaphoreSlim(1)) { Parallel.ForEach(source, () => new List<R>(), (element, loopstate, localStorage) => { bool filter = filterFunction(element); if (filter) localStorage.Add(element); return localStorage; }, (finalStorage) => { lock(myLock) { results.AddRange(finalStorage) }; }); } return results; }

線程局部變量有什么優勢呢?請看下面的例子(一個網頁抓取程序):

public static void UnsafeDownloadUrls () { WebClient webclient = new WebClient(); Parallel.ForEach(urls, (url,loopstate,index) => { webclient.DownloadFile(url, filenames[index] + ".dat"); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); }); }

通常第一版代碼是這么寫的,但是運行時會報錯“System.NotSupportedException -> WebClient does not support concurrent I/O operations.”。這是因為多個線程無法同時訪問同一個 WebClient 對象。所以我們會把 WebClient 對象定義到線程中來:

public static void BAD_DownloadUrls () { Parallel.ForEach(urls, (url,loopstate,index) => { WebClient webclient = new WebClient(); webclient.DownloadFile(url, filenames[index] + ".dat"); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); }); }

修改之后依然有問題,因為你的機器不是服務器,大量實例化的 WebClient 迅速達到你機器允許的虛擬連接上限數。線程局部變量可以解決這個問題:

public static void downloadUrlsSafe()
{
	Parallel.ForEach(urls,
		() => new WebClient(), (url, loopstate, index, webclient) => { webclient.DownloadFile(url, filenames[index]+".dat"); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); return webclient; }, (webclient) => { }); }

這樣的寫法保證了我們能獲得足夠的 WebClient 實例,同時這些 WebClient 實例彼此隔離僅僅屬於各自關聯的線程。

雖然 PLINQ 提供了 ThreadLocal<T> 對象來實現類似的功能:

public static void downloadUrl() { var webclient = new ThreadLocal<WebClient>(()=> new WebClient ()); var res = urls .AsParallel() .ForAll( url => { webclient.Value.DownloadFile(url, host[url] +".dat")); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); }); }

但是請注意:ThreadLocal<T> 相對而言開銷更大!

--

場景五:退出操作 (使用 Parallel.ForEach)

Parallel.ForEach 有個重載聲明如下,其中包含一個 ParallelLoopState 對象:

public static ParallelLoopResult ForEach<TSource >( IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)

ParallelLoopState.Stop() 提供了退出循環的方法,這種方式要比其他兩種方法更快。這個方法通知循環不要再啟動執行新的迭代,並盡可能快的推出循環。

ParallelLoopState.IsStopped 屬性可用來判定其他迭代是否調用了 Stop 方法。

示例:

public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T> { var matchFound = false; Parallel.ForEach(TSpace, (curValue, loopstate) => { if (curValue.Equals(match) ) { matchFound = true; loopstate.Stop(); } }); return matchFound; }

ParallelLoopState.Break() 通知循環繼續執行本元素前的迭代,但不執行本元素之后的迭代。最前調用 Break 的起作用,並被記錄到 ParallelLoopState.LowestBreakIteration 屬性中。這種處理方式通常被應用在一個有序的查找處理中,比如你有一個排序過的數組,你想在其中查找匹配元素的最小 index,那么可以使用以下的代碼:

public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T> { var loopResult = Parallel.ForEach(source, (curValue, loopState, curIndex) => { if (curValue.Equals(match)) { loopState.Break(); } }); var matchedIndex = loopResult.LowestBreakIteration; return matchedIndex.HasValue ? matchedIndex : -1; }

更多:
http://www.tuicool.com/articles/jqaUVj


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM