多線程編程學習筆記——使用並發集合(二)


接上文 多線程編程學習筆記——使用並發集合(一)

 

二、   使用ConcurrentQueue來實現異步處理

本示例將學習如何創建一個能被多個線程異步處理的一組任務的例子。

 一、程序示例代碼如下:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading; 

namespace ThreadCollectionDemo
{

    class Program
    {
        const string item = "Dict Name";
        public static string CurrentItem;
        static double time1;
        static void Main(string[] args)
        {
            Console.WriteLine(string.Format("-----  ConcurrentQueue 操作----"));

            Task task = TaskRun1();
            task.Wait();

            Console.Read();
        }

        private static async Task TaskRun1()
        {

            var queue = new ConcurrentQueue<CustomTask>();
            var cts = new CancellationTokenSource();
            var taskSrc = Task.Run(() => TaskProduct(queue));
            Task[] process = new Task[4];
            for (int i = 1; i <= 4; i++)
            {
                string processId = i.ToString();
                process[i - 1] = Task.Run(() => TaskProcess(queue, "Processer " + processId, cts.Token));
            }
            await taskSrc;
            cts.CancelAfter(TimeSpan.FromSeconds(2));
            await Task.WhenAll(process);
        }
 

        static async Task TaskProduct(ConcurrentQueue<CustomTask> queue)
        {
            for (int i = 0; i < 20; i++)
            {
                await Task.Delay(50);
                var workitem = new CustomTask { Id = i };
                queue.Enqueue(workitem);
                Console.WriteLine(string.Format("把{0} 元素添加到ConcurrentQueue",workitem.Id));
            }
        }

        static async Task TaskProcess(ConcurrentQueue<CustomTask> queue,string name,CancellationToken token)
        {
            CustomTask workitem;
            bool dequeueSuccesfl = false;
            await GetRandomDely();
            do
            {
                dequeueSuccesfl = queue.TryDequeue(out workitem);

                if (dequeueSuccesfl)
                {
                    Console.WriteLine(string.Format("元素 {0} 從ConcurrentQueue中取出 ,名稱:{1} ", workitem.Id, name));

                }

                await GetRandomDely();
            }

            while (!token.IsCancellationRequested);
        }

        static Task GetRandomDely()
        {
            int dely = new Random(DateTime.Now.Millisecond).Next(1, 1000);
            return Task.Delay(dely);
        }
 
    }

    public class CustomTask
    {
        public int Id { get; set; }
    }
}

 

 

 2.程序運行結果如下圖。

      當程序運行時,我們使用ConcurrentQueue集合實現創建了一個任務隊列。然后創建了一個取消標志,它是用來在我們將任務放入隊列后停止工作 的。接下來啟動了一個單獨的工作線程來將任務放入任務隊列中。這部分分為異步處理產生了工作 量。

       現在定義這個程序中消費任務的部分。我們創建了四個工作 線程,它們會隨機等待一段時間,然后從任務隊列中獲取一個任務,處理這個任務,一直重復整個過程直到我們發出取消標志信號。最后,我們啟動產生任務的線程,等待這個線程完成。然后使用取消標志給消費發信號 我們完成了工作。最后一步將等待所有的消費完成。

       我們看到隊列中的任務按從前到后的順序被 處理,但一個后面的任務是有可能會比前面的任務先處理的,因為我們有四個工作 線程獨立地運行,而且任務處理時間並不是恆定的。我們看到 訪問這個隊列是線程安全的,沒有一個元素會被提取兩次。

 

 

三、   改變ConcurrentStack異步處理順序

      本示例是對上一面的示例的修改版。我們又一次創建了被多個工作線程異步處理的一組任務,但是這次使用ConcurrentStack來實現,我們來看看這兩個示例會有什么不同。

 1. 程序的代碼如下圖。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading; 

namespace ThreadCollectionDemo
{

    class Program
    {       

        static void Main(string[] args)
        {
            Console.WriteLine(string.Format("-----  ConcurrentStack 操作----"));
            Task task = TaskStack();
            task.Wait();
            Console.Read();
        }

        private static async Task TaskStack()
        { 

            var stack = new ConcurrentStack<CustomTask>();
            var cts = new CancellationTokenSource();
            var taskSrc = Task.Run(() => TaskProduct(stack));
            Task[] process = new Task[4];
            for (int i = 1; i <= 4; i++)
            {
                string processId = i.ToString();
                process[i - 1] = Task.Run(() => TaskProcess(stack, "Processer " + processId, cts.Token));
            }

            await taskSrc;
            cts.CancelAfter(TimeSpan.FromSeconds(2));
            await Task.WhenAll(process);
        }
 

        static async Task TaskProduct(ConcurrentStack<CustomTask> stack)
        {
            for (int i = 0; i < 20; i++)
            {
                await Task.Delay(50);
                var workitem = new CustomTask { Id = i };
                stack.Push(workitem);
                Console.WriteLine(string.Format("把{0} 元素添加到ConcurrentStack",workitem.Id));
            }
        }

        static async Task TaskProcess(ConcurrentStack<CustomTask> stack,string name,CancellationToken token)
        {

            CustomTask workitem;
            bool popSuccesful = false;
            await GetRandomDely();
            do
            {
                popSuccesful = stack.TryPop(out workitem);
                if (popSuccesful)
                {
                    Console.WriteLine(string.Format("元素 {0} ConcurrentStack  取出 ,名稱:{1} ", workitem.Id, name));
                }
                await GetRandomDely();
            }
            while (!token.IsCancellationRequested);
        }

        static Task GetRandomDely()
        {
            int dely = new Random(DateTime.Now.Millisecond).Next(1, 1000);
            return Task.Delay(dely);
        } 
    } 
}

 

 

 

 2.程序的運行結果如下圖。

        當程序運行時,我們創建了一個ConcurrentStack集合的實例。其余的代碼與前一示例幾乎一樣,唯一不同之年是我們對並發堆棧使用了Push和TryPop方法,而對並發隊列使用Enqueue和TryDequeue方法。

       從上圖結果中可以扯到任務處理的順序被改變了。堆棧是一個LIFO集合,工作線程先處理最近的任務。在並發隊列中,任務被處理的順序與被添加的順序幾乎一樣。這說明根據工作線程的數量,我們將在一定時間內處理先被創建的任務。而在堆棧中,早先創建的任務具有較低的優先級,而且直到生產者停止向堆棧中放入更多任務后,這個任務才有可能被處理。這行為是確定 的,最好在這種場景下使用隊列。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM