A question about LimitedConcurrencyLevelTaskScheduler


1. Introduction to LimitedConcurrencyLevelTaskScheduler

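Anyone who has used this TaskScheduler probably knows it: a task scheduler open-sourced by Microsoft. Its code is short and easy to follow, but what I don't understand is how it actually limits the degree of concurrency. Here is the code first, so everyone can get familiar with it.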
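using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler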
{
    /// <summary>Whether the current thread is processing work items.</summary> 
    [ThreadStatic]
    private static bool _currentThreadIsProcessingItems;
    /// <summary>The list of tasks to be executed.</summary> 
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks) 
    /// <summary>The maximum concurrency level allowed by this scheduler.</summary> 
    private readonly int _maxDegreeOfParallelism;
    /// <summary>The number of worker delegates currently queued to or running on the ThreadPool.</summary> 
    private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks) 

    /// <summary> 
    /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the 
    /// specified degree of parallelism. 
    /// </summary> 
    /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param> 
    public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
        _maxDegreeOfParallelism = maxDegreeOfParallelism;
    }

    /// <summary>
    /// The number of tasks currently executing (not updated anywhere in this listing).
    /// </summary>
    public int CurrentCount { get; set; }

    /// <summary>Queues a task to the scheduler.</summary> 
    /// <param name="task">The task to be queued.</param> 
    protected sealed override void QueueTask(Task task)
    {
        // Add the task to the list of tasks to be processed. If there aren't enough 
        // delegates currently queued or running to process tasks, schedule another. 
        lock (_tasks)
        {
            Console.WriteLine("Task Count : {0} ", _tasks.Count);
            _tasks.AddLast(task);
            if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
            {
                ++_delegatesQueuedOrRunning;
                NotifyThreadPoolOfPendingWork();
            }
        }
    }
    // Not used anywhere in this listing.
    private int executingCount = 0;
    private static readonly object executeLock = new object();
    /// <summary> 
    /// Informs the ThreadPool that there's work to be executed for this scheduler. 
    /// </summary> 
    private void NotifyThreadPoolOfPendingWork()
    {
        ThreadPool.UnsafeQueueUserWorkItem(_ =>
        {
            // Note that the current thread is now processing work items. 
            // This is necessary to enable inlining of tasks into this thread. 
            _currentThreadIsProcessingItems = true;
            try
            {
                // Process all available items in the queue. 
                while (true)
                {
                    Task item;
                    lock (_tasks)
                    {
                        // When there are no more items to be processed, 
                        // note that we're done processing, and get out. 
                        if (_tasks.Count == 0)
                        {
                            --_delegatesQueuedOrRunning;

                            break;
                        }
                        
                        // Get the next item from the queue 
                        item = _tasks.First.Value;
                        _tasks.RemoveFirst();
                    }
                  

                    // Execute the task we pulled out of the queue 
                    base.TryExecuteTask(item);
                }
            }
            // We're done processing items on the current thread 
            finally { _currentThreadIsProcessingItems = false; }
        }, null);
    }

    /// <summary>Attempts to execute the specified task on the current thread.</summary> 
    /// <param name="task">The task to be executed.</param> 
    /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued to this scheduler.</param> 
    /// <returns>Whether the task could be executed on the current thread.</returns> 
    protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {

        // If this thread isn't already processing a task, we don't support inlining 
        if (!_currentThreadIsProcessingItems) return false;

        // If the task was previously queued, remove it from the queue 
        if (taskWasPreviouslyQueued) TryDequeue(task);

        // Try to run the task. 
        return base.TryExecuteTask(task);
    }

    /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary> 
    /// <param name="task">The task to be removed.</param> 
    /// <returns>Whether the task could be found and removed.</returns> 
    protected sealed override bool TryDequeue(Task task)
    {
        lock (_tasks) return _tasks.Remove(task);
    }

    /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary> 
    public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }

    /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary> 
    /// <returns>An enumerable of the tasks currently scheduled.</returns> 
    protected sealed override IEnumerable<Task> GetScheduledTasks()
    {
        bool lockTaken = false;
        try
        {
            Monitor.TryEnter(_tasks, ref lockTaken);
            if (lockTaken) return _tasks.ToArray();
            else throw new NotSupportedException();
        }
        finally
        {
            if (lockTaken) Monitor.Exit(_tasks);
        }
    }
}

Simple usage

Here is the calling code.

static void Main(string[] args)
{
    TaskFactory fac = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(5));

    //TaskFactory fac = new TaskFactory();
    for (int i = 0; i < 1000; i++)
    {
        fac.StartNew(s =>
        {
            Thread.Sleep(1000);
            Console.WriteLine("Current Index {0}, ThreadId {1}", s, Thread.CurrentThread.ManagedThreadId);
        }, i);
    }

    Console.ReadKey();
}

The call is simple.
Stepping through in the debugger shows the call order:
after a TaskFactory has been created with a LimitedConcurrencyLevelTaskScheduler,
calling TaskFactory.StartNew enters the QueueTask method of
LimitedConcurrencyLevelTaskScheduler.

    /// <summary>Queues a task to the scheduler.</summary> 
    /// <param name="task">The task to be queued.</param> 
    protected sealed override void QueueTask(Task task)
    {
        // Add the task to the list of tasks to be processed. If there aren't enough 
        // delegates currently queued or running to process tasks, schedule another. 
        lock (_tasks)
        {
            Console.WriteLine("Task Count : {0} ", _tasks.Count);
            _tasks.AddLast(task);
            if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
            {
                ++_delegatesQueuedOrRunning;
                NotifyThreadPoolOfPendingWork();
            }
        }
    }

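The code is simple: it adds the newly created Task to the task queue, then compares the number of tasks currently executing with the configured maximum concurrency. If the number is lower, it notifies the pending tasks to start executing.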
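One way to look at that check is through a small model. This is only a sketch: GateSketch and WorkersStarted are hypothetical names made up here, not part of the scheduler. Going by the field name and the code, the counter tracks worker delegates handed to the ThreadPool rather than individual tasks. The model assumes every task is queued before any worker finds the queue empty and exits.

```csharp
using System;

public static class GateSketch
{
    // Hypothetical helper: replays only the "if (_delegatesQueuedOrRunning < max)"
    // check from QueueTask and counts how often it would start a new worker.
    // Assumption: no worker decrements the counter while tasks are being queued.
    public static int WorkersStarted(int taskCount, int maxDegreeOfParallelism)
    {
        int delegatesQueuedOrRunning = 0; // stands in for _delegatesQueuedOrRunning
        int workersStarted = 0;           // calls to NotifyThreadPoolOfPendingWork

        for (int i = 0; i < taskCount; i++)
        {
            // Every task is added to the queue, but a worker is started
            // only while the counter is still below the maximum.
            if (delegatesQueuedOrRunning < maxDegreeOfParallelism)
            {
                ++delegatesQueuedOrRunning;
                ++workersStarted;
            }
        }
        return workersStarted;
    }
}
```

Under that assumption, GateSketch.WorkersStarted(1000, 5) returns 5 and GateSketch.WorkersStarted(3, 5) returns 3: the remaining tasks are only added to the queue.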
My question is mainly about the NotifyThreadPoolOfPendingWork method.

    private void NotifyThreadPoolOfPendingWork()
    {
        ThreadPool.UnsafeQueueUserWorkItem(_ =>
        {
            // Note that the current thread is now processing work items. 
            // This is necessary to enable inlining of tasks into this thread. 
            _currentThreadIsProcessingItems = true;
            try
            {
                // Process all available items in the queue. 
                while (true)
                {
                    Task item;
                    lock (_tasks)
                    {
                        // When there are no more items to be processed, 
                        // note that we're done processing, and get out. 
                        if (_tasks.Count == 0)
                        {
                            --_delegatesQueuedOrRunning;

                            break;
                        }
                        
                        // Get the next item from the queue 
                        item = _tasks.First.Value;
                        _tasks.RemoveFirst();
                    }
                    // Execute the task we pulled out of the queue 
                    base.TryExecuteTask(item);
                }
            }
            // We're done processing items on the current thread 
            finally { _currentThreadIsProcessingItems = false; }
        }, null);
    }

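As I read the code, it runs an infinite loop that keeps taking Tasks out of _tasks and executing them
until _tasks is empty, and then exits the loop. I don't see anything here that limits the concurrency. There is only the simple check made in QueueTask, and that doesn't seem to do much,
because once NotifyThreadPoolOfPendingWork has started, it keeps running until every Task has been executed. So how is the concurrency actually limited?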

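I have been puzzled by this for a while. Maybe I have misunderstood something; I would appreciate it if someone who knows could explain.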
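For what it's worth, here is one possible reading, sketched with a simplified stand-in rather than the real scheduler. Each drain loop runs one item at a time, and the check at enqueue time lets at most the configured number of loops exist at once, so the number of loops would act as the concurrency cap. BoundedDrainSketch, Enqueue, Drain and MeasurePeak are hypothetical names; plain Action delegates replace Tasks, and ThreadPool.QueueUserWorkItem replaces UnsafeQueueUserWorkItem to keep the sketch short.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for the scheduler: same gate-and-drain pattern.
public sealed class BoundedDrainSketch
{
    private readonly Queue<Action> _items = new Queue<Action>(); // protected by lock(_items)
    private readonly int _maxWorkers;
    private int _workersQueuedOrRunning;                         // protected by lock(_items)

    public BoundedDrainSketch(int maxWorkers) { _maxWorkers = maxWorkers; }

    public void Enqueue(Action item)
    {
        lock (_items)
        {
            _items.Enqueue(item);
            // Start another drain loop only while fewer than _maxWorkers exist.
            if (_workersQueuedOrRunning < _maxWorkers)
            {
                ++_workersQueuedOrRunning;
                ThreadPool.QueueUserWorkItem(_ => Drain());
            }
        }
    }

    private void Drain()
    {
        while (true)
        {
            Action item;
            lock (_items)
            {
                // Queue empty: this loop gives its slot back and exits.
                if (_items.Count == 0) { --_workersQueuedOrRunning; return; }
                item = _items.Dequeue();
            }
            item(); // each loop executes one item at a time
        }
    }

    // Runs itemCount short items and returns the highest number seen running at once.
    public static int MeasurePeak(int maxWorkers, int itemCount)
    {
        var sketch = new BoundedDrainSketch(maxWorkers);
        int running = 0, peak = 0;
        object peakLock = new object();
        using (var done = new CountdownEvent(itemCount))
        {
            for (int i = 0; i < itemCount; i++)
            {
                sketch.Enqueue(() =>
                {
                    int now = Interlocked.Increment(ref running);
                    lock (peakLock) { if (now > peak) peak = now; }
                    Thread.Sleep(10); // simulate work
                    Interlocked.Decrement(ref running);
                    done.Signal();
                });
            }
            done.Wait(); // block until every item has finished
        }
        return peak;
    }
}
```

In this sketch, BoundedDrainSketch.MeasurePeak(5, 50) can never return more than 5, because an item only runs inside a drain loop and at most five loops exist at any moment. Whether the real scheduler behaves the same way in every case (for example with inlined tasks) is not something this sketch shows.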

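I'm not very good with markdown, so the formatting here has some problems.
If it is hard to read, you can view it here instead: https://www.zybuluo.com/kevinsforever/note/115066
I wrote it in that site's editor, but it does not display properly when copied over.
Thanks.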

