目錄
返回目錄
1. 限制最大容量:BoundedCapacity
BoundedCapacity屬性和CompleteAdding方法,它們都可以從某種方式上限制元素被加入到集合中。但BoundedCapacity是用來限制集合的最大容量,當容量已滿后,后續的添加操作會被阻塞,一旦有元素被移除,那么阻塞的添加操作會成功執行。
比如下面代碼,試圖將1-50加入到BlockingCollection,此時默認內部是ConcurrentBag,當然你可以指定任意IProducerConsumerCollection。我們把BoundedCapacity設置成2。
var bcollec = new BlockingCollection<int>(2);
//試圖添加1-50
Task.Run(() =>
{
Parallel.For(1, 51, i =>
{
bcollec.Add(i);
Console.WriteLine("加入:" + i);
});
});
Thread.Sleep(1000);
Console.WriteLine("調用一次Take");
bcollec.Take();
Thread.Sleep(Timeout.Infinite);
可能的輸出:
加入:37
加入:13
調用一次Take
加入:25
只有最多兩個可以加入,然后調用Take后,下一個元素才會被加入。(注意此時Parallel.For中會有多個線程處於阻塞狀態,因為無法加入數據)。
返回目錄
2. 禁止加入:CompleteAdding和IsCompleted
CompleteAdding方法則是直接不允許任何元素被加入集合,即使是當前元素的數量小於BoundedCapacity屬性。
代碼:
var bcollec = new BlockingCollection<int>(5);
//試圖添加1-50
Task.Run(() =>
{
Parallel.For(1, 51, i =>
{
Console.WriteLine("准備加入:" + i);
bcollec.Add(i);
Console.WriteLine("== 成功加入:" + i);
Thread.Sleep(1000);
});
});
//等待一小段時間后馬上調用CompleteAdding
Thread.Sleep(500);
Console.WriteLine("調用CompleteAdding");
bcollec.CompleteAdding();
Thread.Sleep(Timeout.Infinite);
上述代碼可能的輸出:
准備加入:1
准備加入:13
准備加入:25
准備加入:37
== 成功加入:13
== 成功加入:1
== 成功加入:37
== 成功加入:25
調用CompleteAdding
准備加入:2
准備加入:38
准備加入:26
准備加入:14
可以看到,雖然BlockingCollection的BoundedCapacity為5,但是由於提前調用了CompleteAdding,即使當前集合只有4個元素,也不會再同意新的加入操作了。
那么CompleteAdding有什么用?當使用了CompleteAdding方法后且集合內沒有元素的時候,另一個屬性IsCompleted此時會為True,這個屬性可以用來判斷是否當前集合內的所有元素都被處理完,而BlockingCollection背后的IProducerConsumerCollection恰恰常用來處理此類生產者-消費者問題的。
下面我們首先在多個線程中試圖往BlockingCollection中加入元素,然后中途調用CompleteAdding,接着通過IsCompleted屬性逐個處理被成功加入的元素。
如下代碼:
var bcollec = new BlockingCollection<int>();
//試圖添加1-50
Task.Run(() =>
{
Parallel.For(1, 51, i =>
{
bcollec.Add(i);
Console.WriteLine("成功加入:" + i);
Thread.Sleep(1000);
});
});
//等待一小段時間后馬上調用CompleteAdding
Thread.Sleep(700);
Console.WriteLine("調用CompleteAdding");
bcollec.CompleteAdding();
Console.WriteLine("容器元素數量:" + bcollec.Count);
while (!bcollec.IsCompleted)
{
var res = bcollec.Take();
Console.WriteLine("取出:" + res);
}
Console.WriteLine("操作完成");
Thread.Sleep(Timeout.Infinite);
可能的輸出:
成功加入:37
成功加入:25
成功加入:13
成功加入:1
調用CompleteAdding
容器元素數量:4
取出:1
取出:37
取出:25
取出:13
操作完成
返回目錄
3. 枚舉:GetConsumingEnumerable和BlockingCollection本身
BlockingCollection有兩種枚舉方法,首先BlockingCollection本身繼承自IEnumerable<T>,所以它自己就可以被foreach枚舉,首先BlockingCollection包裝了一個線程安全集合,那么它自己也是線程安全的,而當多個線程在同時修改或訪問線程安全容器時,BlockingCollection自己作為IEnumerable會返回一個一定時間內的集合片段,也就是只會枚舉在那個時間點上內部集合的元素。
看下面代碼:
var bcollec = new BlockingCollection<int>();
//試圖添加1-10
Task.Run(() =>
{
var forOpt = new ParallelOptions()
{
//防止在某些硬件上並發數太多
MaxDegreeOfParallelism = 2
};
Parallel.For(1, 11, forOpt, i =>
{
bcollec.Add(i);
Console.WriteLine("成功加入:" + i);
Thread.Sleep(500);
});
});
Thread.Sleep(700);
//開始枚舉
Task.Run(() =>
{
foreach (var i in bcollec)
Console.WriteLine("輸出:" + i);
});
Thread.Sleep(Timeout.Infinite);
我們邊加入元素邊進行枚舉(直接在BlockingCollection上foreach),可能的輸出:
成功加入:1
成功加入:6
成功加入:2
成功加入:7
輸出:1
輸出:6
輸出:2
輸出:7
成功加入:8
成功加入:3
成功加入:4
成功加入:9
成功加入:5
成功加入:10
可以看到,BlockingCollection本身的迭代器只能反映出一時的容器內容。
而BlockingCollection還有一個GetConsumingEnumerable方法,同樣返回一個IEnumerable<T>,這個可枚舉的集合背后的迭代器不同於BlockingCollection本身的迭代器,它可以返回最新的加入的元素,如果當前時間段沒有元素被加入,它會阻塞然后等新加進來的元素。
我們把上面的使用BlockingCollection本身枚舉代碼中的枚舉Task改成這樣:
//開始枚舉
Task.Run(() =>
{
foreach (var i in bcollec.GetConsumingEnumerable())
Console.WriteLine("輸出:" + i);
Console.WriteLine("完成枚舉");
});
可能的輸出:
成功加入:6
成功加入:1
成功加入:2
成功加入:7
輸出:6
輸出:1
輸出:2
輸出:7
成功加入:3
成功加入:8
輸出:3
輸出:8
成功加入:4
成功加入:9
輸出:4
輸出:9
成功加入:10
成功加入:5
輸出:10
輸出:5
這個迭代器很給力,一直處於等待和執行的狀態,只要有新的元素被加入,它會找機會去執行foreach的內容,然后再阻塞去等新的元素。
而且在輸出中,代碼里的“完成枚舉”字符串一直沒有被輸出。此時它還在賣力地等……因為它不確定什么時候才不會有新元素被加入。
返回目錄
4. GetConsumingEnumerable和CompleteAdding
好,此時你應該想到了上面學的CompleteAdding方法,它可以禁止新的元素被加入到BlockingCollection的內部線程安全集合中,所以使用這個方法可以通知GetConsumingEnumerable的迭代器您老不用再等了,后面不會有元素被加進來了。
如下代碼:
抱歉,這幾段代碼都不短,而且都類似。但我仍然把完整代碼貼出來,雖然這使文章比較冗長,但是我覺得這樣讀者瀏覽或者復制時從上到下一目了然,總比看到諸如“請把前面xxx個代碼做如下修改:把xxx行改成xxx,在xxx行加入這段代碼……”好吧。
var bcollec = new BlockingCollection<int>();
//試圖添加1-10
Task.Run(() =>
{
var forOpt = new ParallelOptions()
{
//防止在某些硬件上並發數太多
MaxDegreeOfParallelism = 2
};
Parallel.For(1, 11, forOpt, i =>
{
Console.WriteLine("等待加入:" + i);
bcollec.Add(i);
Console.WriteLine("成功加入:" + i);
Thread.Sleep(500);
});
});
Thread.Sleep(600);
//開始枚舉
Task.Run(() =>
{
foreach (var i in bcollec.GetConsumingEnumerable())
Console.WriteLine("輸出:" + i);
Console.WriteLine("完成枚舉");
});
Thread.Sleep(300);
bcollec.CompleteAdding();
Console.WriteLine("=== 調用CompleteAdding");
Thread.Sleep(Timeout.Infinite);
可能的輸出:
等待加入:1
等待加入:6
成功加入:1
成功加入:6
等待加入:2
成功加入:2
等待加入:7
成功加入:7
輸出:1
輸出:6
輸出:2
輸出:7
=== 調用CompleteAdding
完成枚舉
等待加入:3
等待加入:8
可以看到,等CompleteAdding,“枚舉完成”馬上被輸出!
:D
本文版權歸作者所有,歡迎以網址(鏈接)的方式轉載,不歡迎復制文章內容的方式轉載,其一是為了在搜索引擎中去掉重復文章內容,其二復制后的文章往往沒有提供本博客的頁面格式和鏈接,造成文章可讀性很差。望有素質人自覺遵守上述建議。
如果一定要以復制文章內容的方式轉載,必須在文章開頭標明作者信息和原文章鏈接地址。否則保留追究法律責任的權利。