C#(07):並發集合 System.Collections.Concurrent 命名空間




一、概述:

System.Collections.Concurrent 命名空間提供多個線程安全集合類。

當有多個線程並發訪問集合時,應使用這些類代替 System.CollectionsSystem.Collections.Generic 命名空間中的對應類型。

為了對集合進行線程安全的訪問,定義了 IProducerConsumerCollection 接口。這個接口中最重 要的方法是TryAdd()和TryTake()。

  1. TryAdd()方法嘗試給集合添加一項,但如果集合禁止添加項,這個操作就可能失敗。為了給出相關信息,TryAdd()方法返回一個布爾值,以說明操作是成功還是失敗。
  2. TryTake()方法也以這種方式工作,以通知調用者操作是成功還是失敗,並在操作成功時返回集合中的項。

二、空間中包含的類

ConcurrentXXX :這些集合是線程安全的,如果某個動作不適用於線程的當前狀態,它們就返回false。在繼續之前,總是霈要確認添加或提取元素是否成功。不能相信集合會完成任務。

1、ConcurrentQueue 隊列

這個集合類用一種免鎖定的算法實現,使用在內部合並到一個鏈表中的32項數組。

訪問隊列元素的方法有Enqueue()、TryDequeue()和TryPeek()。這些方法的命名非常類似於前面Queue 類的方法,只是給可能調用失敗的方法加上了前綴Try。

因為這個類實現了IProducerConsumerCollection 接口,所以 TryAdd()和 TryTake()方法僅調用 Enqueue()和 TryDequeue()方法。

2、ConcurrentStack 堆棧

非常類似於ConcurrentQueue 類,只是帶有另外的元素訪問方法。

ConcurrcntStack 類定義了 Push()、PushRange()、TryPeek()、TiyPop()和 TryPopRange() 方法。在內部這個類使用其元素的鏈表。

3、ConcurrentBag

該類沒有定義添加或提取項的任何順序。這個類使用一個把線程映射到內部使用的數組上的概念,因此嘗試減少鎖定。

訪問元素的方法有Add()、TryPeek()和 TryTake()。

4、ConcurrentDictionary 字典

——這是一個線程安全的鍵值集合。

TryAdd()、TryGetValue()、TryRemove()和TryUpdate()方法以非阻塞的方式訪問成員。

因為元素基於鍵和值, 所以 ConcurrentDictionary 沒有實現 IProducerConsumerCollection

5、BlockingCollection 集合

這個集合在可以添加或提取元素之前,會阻塞線程並一直等待。

BlockingCollection 集合提供了一個接口,以使用Add()和Take()方法來添加和刪除元素。 這些方法會阻寒線程,一直等到任務可以執行為止。

Add()方法有一個重載版本,其中可以給該重載版本傳遞一個CancellationToken令牌。

這個令牌允許取消被阻塞的調用。如果不希望線程無限期地等待下去,且不希望從外部取消調用,就可以使用TryAdd()和 TryTake()方法,在這些方法中,也可以指定一個超時值,它表示在調用失敗之前應阻塞線程和等待的最長時間。

6、BlockingCollection 阻塞式集合

這是對實現了 IProducerConsumerCollection 接口的任意類的修飾器,它默認使用ConcurrentQueue 類。

還可以給構造函數傳遞任何其他實現了 IProducerConsumerCollection 接口的類。

下面的代碼示例簡單演示了使用BlockingCollection 類和多個線程的過程。

一個線程是生成器,它使用Add()方法給集合寫入元素,另一個線程是使用者,它使用Take()方法從集合中提取元素:

internal static BlockingCollection<int> _TestBCollection;

class ThreadWork1  // 生產者
{
    public ThreadWork1()
    { }
    public void run()
    {
        System.Console.WriteLine("ThreadWork1 run { ");
        for (int i = 0; i < 100; i++)
        {
            System.Console.WriteLine("ThreadWork1 producer: " + i);
            _TestBCollection.Add(i);
        }
        _TestBCollection.CompleteAdding();
        System.Console.WriteLine("ThreadWork1 run } ");
    }
}

class ThreadWork2  // 消費者
{
    public ThreadWork2()
    { }
    public void run()
    {
        int i = 0;
        int nCnt = 0;
        bool IsDequeuue = false;
        System.Console.WriteLine("ThreadWork2 run { ");
        while (!_TestBCollection.IsCompleted)
        {
            IsDequeuue = _TestBCollection.TryTake(out i);
            if (IsDequeuue)
            {
                System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====" + i);
                nCnt++;
            }
        }
        System.Console.WriteLine("ThreadWork2 run } ");
    }
}
static void StartT1()
{
    ThreadWork1 work1 = new ThreadWork1();
    work1.run();
}
static void StartT2()
{
    ThreadWork2 work2 = new ThreadWork2();
    work2.run();
}
static void Main(string[] args)
{
    Task t1 = new Task(() => StartT1());
    Task t2 = new Task(() => StartT2());
    _TestBCollection = new BlockingCollection<int>();//可以跟容量
    Console.WriteLine("Sample 4-4 Main {");
    Console.WriteLine("Main t1 t2 started {");
    t1.Start();
    t2.Start();
    Console.WriteLine("Main t1 t2 started }");
    Console.WriteLine("Main wait t1 t2 end {");
    Task.WaitAll(t1, t2);
    Console.WriteLine("Main wait t1 t2 end }");
    Console.WriteLine("Sample 4-4 Main }");
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM