.NET(C#):線程安全集合的阻塞BlockingCollection的使用


 

 

 

返回目錄

1. 限制最大容量:BoundedCapacity

BoundedCapacity屬性和CompleteAdding方法,它們都可以從某種方式上限制元素被加入到集合中。但BoundedCapacity是用來限制集合的最大容量,當容量已滿后,后續的添加操作會被阻塞,一旦有元素被移除,那么阻塞的添加操作會成功執行。

 

比如下面代碼,試圖將1-50加入到BlockingCollection,此時默認內部是ConcurrentBag,當然你可以指定任意IProducerConsumerCollection。我們把BoundedCapacity設置成2。

 

var bcollec = new BlockingCollection<int>(2);

//試圖添加1-50

Task.Run(() =>

    {

        Parallel.For(151, i =>

        {

            bcollec.Add(i);

            Console.WriteLine("加入:" + i);

        });

    });

 

Thread.Sleep(1000);

Console.WriteLine("調用一次Take");

bcollec.Take();

 

Thread.Sleep(Timeout.Infinite);

 

可能的輸出:

加入:37

加入:13

調用一次Take

加入:25

 

只有最多兩個可以加入,然后調用Take后,下一個元素才會被加入。(注意此時Parallel.For中會有多個線程處於阻塞狀態,因為無法加入數據)。

 

 

 

返回目錄

2. 禁止加入:CompleteAdding和IsCompleted

 

CompleteAdding方法則是直接不允許任何元素被加入集合,即使是當前元素的數量小於BoundedCapacity屬性。

 

代碼:

var bcollec = new BlockingCollection<int>(5);

//試圖添加1-50

Task.Run(() =>

    {

        Parallel.For(151, i =>

        {

            Console.WriteLine("准備加入:" + i);

            bcollec.Add(i);

            Console.WriteLine("== 成功加入:" + i);

            Thread.Sleep(1000);

        });

    });

 

//等待一小段時間后馬上調用CompleteAdding

Thread.Sleep(500);

Console.WriteLine("調用CompleteAdding");

bcollec.CompleteAdding();

 

Thread.Sleep(Timeout.Infinite);

 

上述代碼可能的輸出:

准備加入:1

准備加入:13

准備加入:25

准備加入:37

== 成功加入:13

== 成功加入:1

== 成功加入:37

== 成功加入:25

調用CompleteAdding

准備加入:2

准備加入:38

准備加入:26

准備加入:14

 

可以看到,雖然BlockingCollection的BoundedCapacity為5,但是由於提前調用了CompleteAdding,即使當前集合只有4個元素,也不會再同意新的加入操作了。

 

那么CompleteAdding有什么用?當使用了CompleteAdding方法后且集合內沒有元素的時候,另一個屬性IsCompleted此時會為True,這個屬性可以用來判斷是否當前集合內的所有元素都被處理完,而BlockingCollection背后的IProducerConsumerCollection恰恰常用來處理此類生產者-消費者問題的。

 

下面我們首先在多個線程中試圖往BlockingCollection中加入元素,然后中途調用CompleteAdding,接着通過IsCompleted屬性逐個處理被成功加入的元素。

 

如下代碼:

var bcollec = new BlockingCollection<int>();

//試圖添加1-50

Task.Run(() =>

    {

        Parallel.For(151, i =>

        {

            bcollec.Add(i);

            Console.WriteLine("成功加入:" + i);

            Thread.Sleep(1000);

        });

    });

 

//等待一小段時間后馬上調用CompleteAdding

Thread.Sleep(700);

Console.WriteLine("調用CompleteAdding");

bcollec.CompleteAdding();

 

Console.WriteLine("容器元素數量:" + bcollec.Count);

 

while (!bcollec.IsCompleted)

{

    var res = bcollec.Take();

    Console.WriteLine("取出:" + res);

}

 

Console.WriteLine("操作完成");

 

Thread.Sleep(Timeout.Infinite);

 

可能的輸出:

成功加入:37

成功加入:25

成功加入:13

成功加入:1

調用CompleteAdding

容器元素數量:4

取出:1

取出:37

取出:25

取出:13

操作完成

 

 

返回目錄

3. 枚舉:GetConsumingEnumerable和BlockingCollection本身

BlockingCollection有兩種枚舉方法,首先BlockingCollection本身繼承自IEnumerable<T>,所以它自己就可以被foreach枚舉,首先BlockingCollection包裝了一個線程安全集合,那么它自己也是線程安全的,而當多個線程在同時修改或訪問線程安全容器時,BlockingCollection自己作為IEnumerable會返回一個一定時間內的集合片段,也就是只會枚舉在那個時間點上內部集合的元素。

 

看下面代碼:

var bcollec = new BlockingCollection<int>();

//試圖添加1-10

Task.Run(() =>

{

    var forOpt = new ParallelOptions()

    {

        //防止在某些硬件上並發數太多

        MaxDegreeOfParallelism = 2

    };

 

    Parallel.For(111, forOpt, i =>

    {

        bcollec.Add(i);

        Console.WriteLine("成功加入:" + i);

        Thread.Sleep(500);

    });

});

 

Thread.Sleep(700);

//開始枚舉

Task.Run(() =>

{

    foreach (var i in bcollec)

        Console.WriteLine("輸出:" + i);

});

 

Thread.Sleep(Timeout.Infinite);

 

我們邊加入元素邊進行枚舉(直接在BlockingCollection上foreach),可能的輸出:

成功加入:1

成功加入:6

成功加入:2

成功加入:7

輸出:1

輸出:6

輸出:2

輸出:7

成功加入:8

成功加入:3

成功加入:4

成功加入:9

成功加入:5

成功加入:10

 

可以看到,BlockingCollection本身的迭代器只能反映出一時的容器內容。

 

而BlockingCollection還有一個GetConsumingEnumerable方法,同樣返回一個IEnumerable<T>,這個可枚舉的集合背后的迭代器不同於BlockingCollection本身的迭代器,它可以返回最新的加入的元素,如果當前時間段沒有元素被加入,它會阻塞然后等新加進來的元素。

 

我們把上面的使用BlockingCollection本身枚舉代碼中的枚舉Task改成這樣:

//開始枚舉

Task.Run(() =>

{

    foreach (var i in bcollec.GetConsumingEnumerable())

        Console.WriteLine("輸出:" + i);

    Console.WriteLine("完成枚舉");

});

 

可能的輸出:

成功加入:6

成功加入:1

成功加入:2

成功加入:7

輸出:6

輸出:1

輸出:2

輸出:7

成功加入:3

成功加入:8

輸出:3

輸出:8

成功加入:4

成功加入:9

輸出:4

輸出:9

成功加入:10

成功加入:5

輸出:10

輸出:5

 

這個迭代器很給力,一直處於等待和執行的狀態,只要有新的元素被加入,它會找機會去執行foreach的內容,然后再阻塞去等新的元素。

 

而且在輸出中,代碼里的“完成枚舉”字符串一直沒有被輸出。此時它還在賣力地等……因為它不確定什么時候才不會有新元素被加入。

 

 

 

返回目錄

4. GetConsumingEnumerable和CompleteAdding

 

好,此時你應該想到了上面學的CompleteAdding方法,它可以禁止新的元素被加入到BlockingCollection的內部線程安全集合中,所以使用這個方法可以通知GetConsumingEnumerable的迭代器您老不用再等了,后面不會有元素被加進來了。

 

如下代碼:

 

抱歉,這幾段代碼都不短,而且都類似。但我仍然把完整代碼貼出來,雖然這使文章比較冗長,但是我覺得這樣讀者瀏覽或者復制時從上到下一目了然,總比看到諸如“請把前面xxx個代碼做如下修改:把xxx行改成xxx,在xxx行加入這段代碼……”好吧。

 

var bcollec = new BlockingCollection<int>();

//試圖添加1-10

Task.Run(() =>

{

    var forOpt = new ParallelOptions()

    {

        //防止在某些硬件上並發數太多

        MaxDegreeOfParallelism = 2

    };

 

    Parallel.For(111, forOpt, i =>

    {

        Console.WriteLine("等待加入:" + i);

        bcollec.Add(i);

        Console.WriteLine("成功加入:" + i);

        Thread.Sleep(500);

    });

});

 

Thread.Sleep(600);

//開始枚舉

Task.Run(() =>

{

    foreach (var i in bcollec.GetConsumingEnumerable())

        Console.WriteLine("輸出:" + i);

    Console.WriteLine("完成枚舉");

});

 

Thread.Sleep(300);

 

bcollec.CompleteAdding();

Console.WriteLine("=== 調用CompleteAdding");

 

Thread.Sleep(Timeout.Infinite);

 

可能的輸出:

等待加入:1

等待加入:6

成功加入:1

成功加入:6

等待加入:2

成功加入:2

等待加入:7

成功加入:7

輸出:1

輸出:6

輸出:2

輸出:7

=== 調用CompleteAdding

完成枚舉

等待加入:3

等待加入:8

 

可以看到,等CompleteAdding,“枚舉完成”馬上被輸出!

 

:D

作者: Mgen

本文版權歸作者所有,歡迎以網址(鏈接)的方式轉載,不歡迎復制文章內容的方式轉載,其一是為了在搜索引擎中去掉重復文章內容,其二復制后的文章往往沒有提供本博客的頁面格式和鏈接,造成文章可讀性很差。望有素質人自覺遵守上述建議。

如果一定要以復制文章內容的方式轉載,必須在文章開頭標明作者信息和原文章鏈接地址。否則保留追究法律責任的權利。

 

當前標簽: BCL

共9頁: 1  2  3  4  5  6  7  8  9  下一頁 
.NET(C#):線程安全集合的阻塞BlockingCollection的使用  _Mgen 2012-03-15 00:50 閱讀:1091 評論:3  
 
.NET(C#):多個await和和異常處理  _Mgen 2012-03-13 22:22 閱讀:24 評論:0  
 
.NET 4.5 beta中關於Task的未覺察異常的更新  _Mgen 2012-03-13 20:12 閱讀:26 評論:0  
 
.NET(C#):await返回Task的async方法  _Mgen 2012-03-12 23:56 閱讀:132 評論:0  
 
WPF:談談各種多線程去修改或訪問UI線程數據的方法  _Mgen 2012-03-10 21:15 閱讀:28 評論:0  
 
.NET(C#) TPL:TaskFactory.FromAsync與委托的異步調用  _Mgen 2012-03-02 14:09 閱讀:1259 評論:3  
 
.NET(C#):設置文件系統對象的訪問控制  _Mgen 2012-02-28 14:26 閱讀:146 評論:0  
 
.NET(C#):ConcurrentBag<T>同線程元素的添加和刪除  _Mgen 2012-02-21 13:45 閱讀:42 評論:0  
 
.NET(C#) TPL:Parallel循環和多個Task的異常處理  _Mgen 2012-02-20 13:40 閱讀:38 評論:0  
 
.NET(C#):DebuggerDisplay特性碉堡了!  _Mgen 2012-02-19 11:33 閱讀:42 評論:0  
 
.NET(C#) TPL:ParallelLoopState的Break方法和LowestBreakIteration屬性  _Mgen 2012-02-18 20:53 閱讀:25 評論:0  
 
.NET(C#):警惕PLINQ結果的無序性  _Mgen 2012-02-17 23:23 閱讀:26 評論:0  
 
.NET(C#):DLR有趣的調用自己的動態對象  _Mgen 2012-02-17 20:28 閱讀:28 評論:0  
 
.NET(C#):計算HttpWebResponse的下載速度  _Mgen 2012-01-30 16:34 閱讀:109 評論:0  
 
.NET(C#):使用HttpWebRequest頭中的Range下載文件片段  _Mgen 2012-01-30 10:38 閱讀:103 評論:0  
 
.NET(C#):將數據字節大小轉換成易讀的單位字符串  _Mgen 2012-01-28 18:29 閱讀:59 評論:0  
 
共9頁: 1  2  3  4  5  6  7  8  9  下一頁 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM