《C#並行編程高級教程》第4章 並發集合 筆記


這一章主要介紹了System.Collections.Concurrent下的幾個類。

ConcurrentQueue<T>

並發隊列。完全無鎖,使用CAS(compare-and-swap)比較並交換和自旋重試來實現線程安全。
//加入隊尾
public void Enqueue(T item)
//嘗試刪除隊頭,並將元素通過out返回,返回值表示是否操作成功
public bool TryDequeue( out T result)
//嘗試獲取隊頭,通過out返回元素,返回值為代表是否操作成功
public bool TryPeek( out T result)

ConcurrentStack<T>

並發棧,完全無鎖,使用CAS(compare-and-swap)比較並交換和自旋重試來實現線程安全。
 
public void Push(T item)
public void PushRange(T[] items)
public void PushRange(T[] items, int startIndex, int count)
 
public bool TryPeek( out T result)
 
public bool TryPop( out T result)
public int TryPopRange(T[] items)
public int TryPopRange(T[] items, int startIndex, int count)

ConcurrentBag<T>

這是一個無序的對象集合,而且支持對象重復。在同一線程下添加和刪除效率高,又是需要鎖。
public void Add(T item)
public bool TryPeek( out T result)
public bool TryTake( out T result)

ConcurrentDictionary<TKey,TValue>

並發的鍵值對,讀是沒有鎖的,寫會有細粒度的鎖。
 
public TValue GetOrAdd(TKey key,TValue value)
public bool TryAdd(TKey key,TValue value)
public bool TryGetValue(TKey key, out TValue value)
public bool TryRemove(TKey key, out TValue value)
public bool TryUpdate(TKey key,TValue newValue,TValue comparisonValue)

IProducerConsumer<T>與BlockingCollection<T>

IProducerConsumerCollection<T>是對生產者-消費者模型的一個操作抽象,BlockingCollection<T>是一個實現。
針對生產者消費者模型,給出了很多方便的功能。
構造的時候可以設置最大容量,當集合達到最大容量,添加請求會被阻塞。
添加完成的判斷也不需要自己在維護標識符
更有GetConsumingEnumerable(),獲取集合迭代器,可以使用foreach,如果集合為空着會阻塞,並等待添加新的元素,如果集合沒有元素並且IsAddingCompleted為true那么循環終止,可以省去不必要的自旋。
還支持超時和取消功能
 
private const int NUM_SENTENCES = 2000000;
private static BlockingCollection < string > _sentencesBC;
private static BlockingCollection < string > _capWordsInSentencesBC;
private static BlockingCollection < string > _finalSentencesBC;
 
private static void ProduceSentences(System.Threading.CancellationToken ct)
{
     //...
     if ( !_sentencesBC.TryAdd(newSentence, 2000, ct))
    {
         throw new TimeoutException(...);
    }
     //...
    _sentencesBC.CompleteAdding();
}
 
private static void CapitalizeWordsInSentences()
{
     //...
     while ( !_sentencesBC.IsCompleted)
    {
         string sentence;
         if (_sentencesBC.TryTake( out sentence))
        {
            _capWordsInSentencesBC.Add(...);
        }
    }
    _capWordsInSentencesBC.CompleteAdding();
}
 
private static void RemoveLettersInSentences()
{
     //...
     foreach (var sentence in _capWordsInSentencesBC.GetConsumingEnumerable())
    {
        _finalSentencesBC.Add(RemoveLetters(letterChars, sentence));
    }
    _finalSentencesBC.CompleteAdding();
}
 
static void Main( string[] args)
{
    _sentencesBC = new BlockingCollection < string >(NUM_SENTENCES);
    _capWordsInSentencesBC = new BlockingCollection < string >(NUM_SENTENCES);
    _finalSentencesBC = new BlockingCollection < string >(NUM_SENTENCES);
 
    var cts = new System.Threading.CancellationTokenSource();
    var ct = cts.Token;
    var deferredCancelTask = Task.Factory.StartNew(() = >
    {
        System.Threading.Thread.Sleep( 500);
        cts.Cancel();
    });
 
    Parallel.Invoke(
        () = > ProduceSentences(ct),
        () = > CapitalizeWordsInSentences(),
        () = > RemoveLettersInSentences()
        );
}

任務計數器

常常需要知道還在運行的Task的數量。所以需要對計數器進行原子的加減
可以在任務新建的時候使用System.Threading.Interlocked.Increment(ref tasksRunning)
在任務結束后System.Threading.Interlocked.Decrement(ref tasksRunning);
private static int tasksRunning = 0;
 
//inti method
for ( int i = 0; i < taskMax; i ++)
{
    System.Threading.Interlocked.Increment( ref tasksRunning);
    tasks[i] = Task.Factory.StartNew(() = >
    {
         try
        {
             //...
        }
         finally
        {
            System.Threading.Interlocked.Decrement( ref tasksRunning);
        }
    });
}
 
//other method
while ((tasksRunning > 0))
{
     //...
}

並發集合和不安全集合互轉

並發集合可以結構一個IEnumerable接口的集合做構造函數參數。
例如
string[] _invalidHexValues = { "AF", "BD", "BF", "CF", "DA", "FA", "FE", "FF" };
var invalidHexValuesStack = new ConcurrentStack < string >(_invalidHexValues);
要把並發集合轉成不安全的可以使用CopyTo ToArray等方法

volatile關鍵字

使用volatile可以確保在不同的線程中進行訪問的時候,可以得到最新值。這些變量不會被編譯器按照只在一個線程中的進行訪問的假定進行優化。
private static volatile bool _flag = false;
 

 




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM