C# 線程安全集合


轉載

對於並行任務,與其相關緊密的就是對一些共享資源,數據結構的並行訪問。經常要做的就是對一些隊列進行加鎖-解鎖,然后執行類似插入,刪除等等互斥操作。 .NetFramework 4.0 中提供了一些封裝好的支持並行操作數據容器,可以減少並行編程的復雜程度。




基本信息

.NetFramework中並行集合的名字空間: System.Collections.Concurrent

並行容器:

•ConcurrentQueue
•ConcurrentStack
•ConcurrentBag : 一個無序的數據結構集,當不需要考慮順序時非常有用。
•BlockingCollection : 與經典的阻塞隊列數據結構類似
•ConcurrentDictionary




這些集合在某種程度上使用了無鎖技術(CAS Compare-and-Swap和內存屏障 Memory Barrier),與加互斥鎖相比獲得了性能的提升。但在串行程序中,最好不用這些集合,它們必然會影響性能。




關於CAS: 

•http://www.tuicool.com/articles/zuui6z
•http://www.360doc.com/content/11/0914/16/7656248_148221200.shtml

關於內存屏障

•http://en.wikipedia.org/wiki/Memory_barrier




用法與示例

ConcurrentQueue

其完全無鎖,但當CAS面臨資源競爭失敗時可能會陷入自旋並重試操作。




•Enqueue:在隊尾插入元素
•TryDequeue:嘗試刪除隊頭元素,並通過out參數返回
•TryPeek:嘗試將對頭元素通過out參數返回,但不刪除該元素。




程序示例:




using System;
using System.Text;

using System.Threading.Tasks;
using System.Collections.Concurrent;

namespace Sample4_1_concurrent_queue
{
    class Program
    {
        internal static ConcurrentQueue<int> _TestQueue;

        class ThreadWork1  // producer
        {
            public ThreadWork1()
            { }

            public void run()
            {
                System.Console.WriteLine("ThreadWork1 run { ");
                for (int i = 0; i < 100; i++)
                {
                    System.Console.WriteLine("ThreadWork1 producer: " + i);
                    _TestQueue.Enqueue(i);
                }
                System.Console.WriteLine("ThreadWork1 run } ");
            }
        }

        class ThreadWork2  // consumer
        {
            public ThreadWork2()
            { }

            public void run()
            {
                int i = 0;
                bool IsDequeuue = false;
                System.Console.WriteLine("ThreadWork2 run { ");
                for (; ; )
                {
                    IsDequeuue = _TestQueue.TryDequeue(out i);
                    if (IsDequeuue)
                        System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====");

                    if (i == 99)
                        break;
                }
                System.Console.WriteLine("ThreadWork2 run } ");
            }
        }

        static void StartT1()
        {
            ThreadWork1 work1 = new ThreadWork1();
            work1.run();
        }

        static void StartT2()
        {
            ThreadWork2 work2 = new ThreadWork2();
            work2.run();
        }
        static void Main(string[] args)
        {
            Task t1 = new Task(() => StartT1());
            Task t2 = new Task(() => StartT2());

            _TestQueue = new ConcurrentQueue<int>();

            Console.WriteLine("Sample 3-1 Main {");

            Console.WriteLine("Main t1 t2 started {");
            t1.Start();
            t2.Start();
            Console.WriteLine("Main t1 t2 started }");

            Console.WriteLine("Main wait t1 t2 end {");
            Task.WaitAll(t1, t2);
            Console.WriteLine("Main wait t1 t2 end }");

            Console.WriteLine("Sample 3-1 Main }");

            Console.ReadKey();
        }
    }
}







ConcurrentStack

其完全無鎖,但當CAS面臨資源競爭失敗時可能會陷入自旋並重試操作。






•Push:向棧頂插入元素
•TryPop:從棧頂彈出元素,並且通過out 參數返回
•TryPeek:返回棧頂元素,但不彈出。




程序示例:


using System;
using System.Text;

using System.Threading.Tasks;
using System.Collections.Concurrent;

namespace Sample4_2_concurrent_stack
{
    class Program
    {
        internal static ConcurrentStack<int> _TestStack;

        class ThreadWork1  // producer
        {
            public ThreadWork1()
            { }

            public void run()
            {
                System.Console.WriteLine("ThreadWork1 run { ");
                for (int i = 0; i < 100; i++)
                {
                    System.Console.WriteLine("ThreadWork1 producer: " + i);
                    _TestStack.Push(i);
                }
                System.Console.WriteLine("ThreadWork1 run } ");
            }
        }

        class ThreadWork2  // consumer
        {
            public ThreadWork2()
            { }

            public void run()
            {
                int i = 0;
                bool IsDequeuue = false;
                System.Console.WriteLine("ThreadWork2 run { ");
                for (; ; )
                {
                    IsDequeuue = _TestStack.TryPop(out i);
                    if (IsDequeuue)
                        System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====" + i);

                    if (i == 99)
                        break;
                }
                System.Console.WriteLine("ThreadWork2 run } ");
            }
        }

        static void StartT1()
        {
            ThreadWork1 work1 = new ThreadWork1();
            work1.run();
        }

        static void StartT2()
        {
            ThreadWork2 work2 = new ThreadWork2();
            work2.run();
        }
        static void Main(string[] args)
        {
            Task t1 = new Task(() => StartT1());
            Task t2 = new Task(() => StartT2());

            _TestStack = new ConcurrentStack<int>();

            Console.WriteLine("Sample 4-1 Main {");

            Console.WriteLine("Main t1 t2 started {");
            t1.Start();
            t2.Start();
            Console.WriteLine("Main t1 t2 started }");

            Console.WriteLine("Main wait t1 t2 end {");
            Task.WaitAll(t1, t2);
            Console.WriteLine("Main wait t1 t2 end }");

            Console.WriteLine("Sample 4-1 Main }");

            Console.ReadKey();
        }
    }
}



測試中一個有趣的現象:




雖然生產者已經在棧中插入值已經到了25,但消費者第一個出棧的居然是4,而不是25。很像是出錯了。但仔細想想入棧,出棧和打印語句是兩個部分,而且並不是原子操作,出現這種現象應該也算正常。



Sample 3-1 Main {
Main t1 t2 started {
Main t1 t2 started }
Main wait t1 t2 end {
ThreadWork1 run {
ThreadWork1 producer: 0
 ThreadWork2 run {
ThreadWork1 producer: 1
ThreadWork1 producer: 2
 ThreadWork1 producer: 3
ThreadWork1 producer: 4
ThreadWork1 producer: 5
ThreadWork1 producer: 6
ThreadWork1 producer: 7
ThreadWork1 producer: 8
ThreadWork1 producer: 9
ThreadWork1 producer: 10
ThreadWork1 producer: 11
ThreadWork1 producer: 12
ThreadWork1 producer: 13
 ThreadWork1 producer: 14
ThreadWork1 producer: 15
ThreadWork1 producer: 16
ThreadWork1 producer: 17
ThreadWork1 producer: 18
ThreadWork1 producer: 19
ThreadWork1 producer: 20
ThreadWork1 producer: 21
 ThreadWork1 producer: 22
ThreadWork1 producer: 23
ThreadWork1 producer: 24
ThreadWork1 producer: 25
ThreadWork2 consumer: 16   =====4
ThreadWork2 consumer: 625   =====25
ThreadWork2 consumer: 576   =====24
ThreadWork2 consumer: 529   =====23
ThreadWork1 producer: 26
ThreadWork1 producer: 27
 ThreadWork1 producer: 28










ConcurrentBag

一個無序的集合,程序可以向其中插入元素,或刪除元素。

在同一個線程中向集合插入,刪除元素的效率很高。




• Add:向集合中插入元素
• TryTake:從集合中取出元素並刪除
• TryPeek:從集合中取出元素,但不刪除該元素。





程序示例:




using System;
using System.Text;

using System.Threading.Tasks;
using System.Collections.Concurrent;

namespace Sample4_3_concurrent_bag
{
    class Program
    {
        internal static ConcurrentBag<int> _TestBag;

        class ThreadWork1  // producer
        {
            public ThreadWork1()
            { }

            public void run()
            {
                System.Console.WriteLine("ThreadWork1 run { ");
                for (int i = 0; i < 100; i++)
                {
                    System.Console.WriteLine("ThreadWork1 producer: " + i);
                    _TestBag.Add(i);
                }
                System.Console.WriteLine("ThreadWork1 run } ");
            }
        }

        class ThreadWork2  // consumer
        {
            public ThreadWork2()
            { }

            public void run()
            {
                int i = 0;
                int nCnt = 0;
                bool IsDequeuue = false;
                System.Console.WriteLine("ThreadWork2 run { ");
                for (;;)
                {
                    IsDequeuue = _TestBag.TryTake(out i);
                    if (IsDequeuue)
                    {
                        System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====" + i);
                        nCnt++;
                    }

                    if (nCnt == 99)
                        break;
                }
                System.Console.WriteLine("ThreadWork2 run } ");
            }
        }

        static void StartT1()
        {
            ThreadWork1 work1 = new ThreadWork1();
            work1.run();
        }

        static void StartT2()
        {
            ThreadWork2 work2 = new ThreadWork2();
            work2.run();
        }
        static void Main(string[] args)
        {
            Task t1 = new Task(() => StartT1());
            Task t2 = new Task(() => StartT2());

            _TestBag = new ConcurrentBag<int>();

            Console.WriteLine("Sample 4-3 Main {");

            Console.WriteLine("Main t1 t2 started {");
            t1.Start();
            t2.Start();
            Console.WriteLine("Main t1 t2 started }");

            Console.WriteLine("Main wait t1 t2 end {");
            Task.WaitAll(t1, t2);
            Console.WriteLine("Main wait t1 t2 end }");

            Console.WriteLine("Sample 4-3 Main }");

            Console.ReadKey();
        }
    }
}







BlockingCollection

一個支持界限和阻塞的容器




•Add :向容器中插入元素
•TryTake:從容器中取出元素並刪除
•TryPeek:從容器中取出元素,但不刪除。
•CompleteAdding:告訴容器,添加元素完成。此時如果還想繼續添加會發生異常。
•IsCompleted:告訴消費線程,生產者線程還在繼續運行中,任務還未完成。




示例程序:




程序中,消費者線程完全使用  while (!_TestBCollection.IsCompleted) 作為退出運行的判斷條件。

在Worker1中,有兩條語句被注釋掉了,當i 為50時設置CompleteAdding,但當繼續向其中插入元素時,系統拋出異常,提示無法再繼續插入。




using System;
using System.Text;

using System.Threading.Tasks;
using System.Collections.Concurrent;


namespace Sample4_4_concurrent_bag
{
    class Program
    {
        internal static BlockingCollection<int> _TestBCollection;

        class ThreadWork1  // producer
        {
            public ThreadWork1()
            { }

            public void run()
            {
                System.Console.WriteLine("ThreadWork1 run { ");
                for (int i = 0; i < 100; i++)
                {
                    System.Console.WriteLine("ThreadWork1 producer: " + i);
                    _TestBCollection.Add(i);
                    //if (i == 50)
                    //    _TestBCollection.CompleteAdding();
                }
                _TestBCollection.CompleteAdding();

                System.Console.WriteLine("ThreadWork1 run } ");
            }
        }

        class ThreadWork2  // consumer
        {
            public ThreadWork2()
            { }

            public void run()
            {
                int i = 0;
                int nCnt = 0;
                bool IsDequeuue = false;
                System.Console.WriteLine("ThreadWork2 run { ");
                while (!_TestBCollection.IsCompleted)
                {
                    IsDequeuue = _TestBCollection.TryTake(out i);
                    if (IsDequeuue)
                    {
                        System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====" + i);
                        nCnt++;
                    }
                }
                System.Console.WriteLine("ThreadWork2 run } ");
            }
        }

        static void StartT1()
        {
            ThreadWork1 work1 = new ThreadWork1();
            work1.run();
        }

        static void StartT2()
        {
            ThreadWork2 work2 = new ThreadWork2();
            work2.run();
        }
        static void Main(string[] args)
        {
            Task t1 = new Task(() => StartT1());
            Task t2 = new Task(() => StartT2());

            _TestBCollection = new BlockingCollection<int>();

            Console.WriteLine("Sample 4-4 Main {");

            Console.WriteLine("Main t1 t2 started {");
            t1.Start();
            t2.Start();
            Console.WriteLine("Main t1 t2 started }");

            Console.WriteLine("Main wait t1 t2 end {");
            Task.WaitAll(t1, t2);
            Console.WriteLine("Main wait t1 t2 end }");

            Console.WriteLine("Sample 4-4 Main }");

            Console.ReadKey();
        }
    }
}




當然可以嘗試在Work1中注釋掉 CompleteAdding 語句,此時Work2陷入循環無法退出。




ConcurrentDictionary

對於讀操作是完全無鎖的,當很多線程要修改數據時,它會使用細粒度的鎖。




•AddOrUpdate:如果鍵不存在,方法會在容器中添加新的鍵和值,如果存在,則更新現有的鍵和值。
•GetOrAdd:如果鍵不存在,方法會向容器中添加新的鍵和值,如果存在則返回現有的值,並不添加新值。
•TryAdd:嘗試在容器中添加新的鍵和值。
•TryGetValue:嘗試根據指定的鍵獲得值。
•TryRemove:嘗試刪除指定的鍵。
•TryUpdate:有條件的更新當前鍵所對應的值。
•GetEnumerator:返回一個能夠遍歷整個容器的枚舉器。








程序示例:




using System;
using System.Text;

using System.Threading.Tasks;
using System.Collections.Concurrent;


namespace Sample4_5_concurrent_dictionary
{
    class Program
    {
        internal static ConcurrentDictionary<int, int> _TestDictionary;

        class ThreadWork1  // producer
        {
            public ThreadWork1()
            { }

            public void run()
            {
                System.Console.WriteLine("ThreadWork1 run { ");
                for (int i = 0; i < 100; i++)
                {
                    System.Console.WriteLine("ThreadWork1 producer: " + i);
                    _TestDictionary.TryAdd(i, i);
                }

                System.Console.WriteLine("ThreadWork1 run } ");
            }
        }

        class ThreadWork2  // consumer
        {
            public ThreadWork2()
            { }

            public void run()
            {
                int i = 0, nCnt = 0;
                int nValue = 0;
                bool IsOk = false;
                System.Console.WriteLine("ThreadWork2 run { ");
                while (nCnt < 100)
                {
                    IsOk = _TestDictionary.TryGetValue(i, out nValue);
                    if (IsOk)
                    {
                        System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====" + i);
                        nValue = nValue * nValue;
                        _TestDictionary.AddOrUpdate(i, nValue, (key, value) => { return value = nValue; });
                        nCnt++;
                        i++;
                    }
                }
                System.Console.WriteLine("ThreadWork2 run } ");
            }
        }

        static void StartT1()
        {
            ThreadWork1 work1 = new ThreadWork1();
            work1.run();
        }

        static void StartT2()
        {
            ThreadWork2 work2 = new ThreadWork2();
            work2.run();
        }
        static void Main(string[] args)
        {
            Task t1 = new Task(() => StartT1());
            Task t2 = new Task(() => StartT2());
            bool bIsNext = true;
            int  nValue = 0;

            _TestDictionary = new ConcurrentDictionary<int, int>();

            Console.WriteLine("Sample 4-5 Main {");

            Console.WriteLine("Main t1 t2 started {");
            t1.Start();
            t2.Start();
            Console.WriteLine("Main t1 t2 started }");

            Console.WriteLine("Main wait t1 t2 end {");
            Task.WaitAll(t1, t2);
            Console.WriteLine("Main wait t1 t2 end }");

            foreach (var pair in _TestDictionary)
            {
                Console.WriteLine(pair.Key + " : " + pair.Value);
            }

            System.Collections.Generic.IEnumerator<System.Collections.Generic.KeyValuePair<int, int>> 
                enumer = _TestDictionary.GetEnumerator();

            while (bIsNext)
            {
                bIsNext = enumer.MoveNext();
                Console.WriteLine("Key: " + enumer.Current.Key +
                                  "  Value: " + enumer.Current.Value);

                _TestDictionary.TryRemove(enumer.Current.Key, out nValue);
            }

            Console.WriteLine("\n\nDictionary Count: " + _TestDictionary.Count);

            Console.WriteLine("Sample 4-5 Main }");

            Console.ReadKey();
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM