C# 可指定並行度任務調度器


可指定並行度的任務調度器

https://social.msdn.microsoft.com/Forums/zh-CN/b02ba3b4-539b-46b7-af6b-a5ca3a61a309/task?forum=visualcshartzhchs

/// <summary>
/// 指定最大並行度的任務調度器
/// </summary>
public class SpecifyDegreeOfParallelismTaskScheduler : TaskScheduler
{
    /// <summary>
    /// 信號量鎖
    /// </summary>
    private static System.Threading.SemaphoreSlim _lock = new System.Threading.SemaphoreSlim(1);

    /// <summary>
    /// 當前線程是否正在處理任務
    /// </summary>
    [ThreadStatic]
    private static bool _currentThreadIsProcessingItems;

    /// <summary>
    /// 執行的任務隊列
    /// </summary>
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>();

    /// <summary>
    /// 指定的最大並行度
    /// </summary>
    private readonly int _maxDegressOfParallelism;

    /// <summary>
    ///當前調度器中正在執行的任務數
    /// </summary>
    private int _runingTasks = 0;

    /// <summary>
    /// 指示此調度器能夠支持的最大並發級別。
    /// </summary>
    public override int MaximumConcurrencyLevel { get { return this._maxDegressOfParallelism; } }
    

    /// <summary>
    /// 初始化一個可指定最大並行度的任務調度器
    /// </summary>
    /// <param name="maxDegreeOfParallelism">最大並行度</param>
    public SpecifyDegreeOfParallelismTaskScheduler(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1)
            throw new ArgumentOutOfRangeException("maxDegreeOfParallelism至少為1");
        this._maxDegressOfParallelism = maxDegreeOfParallelism;
    }

    
    /// <summary>
    /// 將Task排隊到調度器中
    /// </summary>
    /// <param name="task">要排隊的任務</param>
    protected override void QueueTask(Task task)
    {
        _lock.Wait();
        try
        {
            this._tasks.AddLast(task);
            if (this._runingTasks < this._maxDegressOfParallelism)
            {
                ++this._runingTasks;
                ConsumeTaskOfPending();
            }
        }
        finally
        {
            _lock.Release();
        }
    }

    /// <summary>
    /// 嘗試在當前線程上執行指定的任務
    /// </summary>
    /// <param name="task">被執行的任務</param>
    /// <param name="taskWasPreviouslyQueued">指定的任務之前是否已經排隊</param>
    /// <returns>是否能在當前線程執行此任務</returns>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        //如果當前前程沒有正在處理項目,無法內聯
        if (!_currentThreadIsProcessingItems)
            return false;

        //如果任務之前已經被排隊,將其從隊列中刪除
        if (taskWasPreviouslyQueued)
            TryDequeue(task);
        return base.TryExecuteTask(task);
    }

    /// <summary>
    /// 消費隊列中等待的任務
    /// </summary>
    private void ConsumeTaskOfPending()
    {
        ThreadPool.UnsafeQueueUserWorkItem(p =>
        {
            _currentThreadIsProcessingItems = true;
            try
            {
                while (true)
                {
                    Task item;
                    _lock.Wait();
                    try
                    {
                        if (this._tasks.Count == 0)
                        {
                            --this._runingTasks;
                            break;
                        }
                        item = this._tasks.First.Value;
                        this._tasks.RemoveFirst();
                    }
                    finally
                    {
                        _lock.Release();
                    }
                    base.TryExecuteTask(item);
                }
            }
            finally
            {
                _currentThreadIsProcessingItems = false;
            }
        }, null);
    }

    /// <summary>
    /// 嘗試將任務從隊列移除
    /// </summary>
    /// <param name="task">要移除的任務</param>
    /// <returns>是否成功將任務從隊列中移除</returns>
    protected override bool TryDequeue(Task task)
    {
        _lock.Wait();
        try
        {
            return this._tasks.Remove(task);
        }
        finally
        {
            _lock.Release();
        }
    }

    /// <summary>
    /// 獲取當前調度器中已調度任務序列
    /// </summary>
    /// <returns>可遍歷已調度任務序列</returns>
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        _lock.Wait();
        try
        {
            return this._tasks.ToArray();
        }
        finally
        {
            _lock.Release();
        }
    }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM