可指定並行度的任務調度器
https://social.msdn.microsoft.com/Forums/zh-CN/b02ba3b4-539b-46b7-af6b-a5ca3a61a309/task?forum=visualcshartzhchs
/// <summary>
/// 指定最大並行度的任務調度器
/// </summary>
public class SpecifyDegreeOfParallelismTaskScheduler : TaskScheduler
{
/// <summary>
/// 信號量鎖
/// </summary>
private static System.Threading.SemaphoreSlim _lock = new System.Threading.SemaphoreSlim(1);
/// <summary>
/// 當前線程是否正在處理任務
/// </summary>
[ThreadStatic]
private static bool _currentThreadIsProcessingItems;
/// <summary>
/// 執行的任務隊列
/// </summary>
private readonly LinkedList<Task> _tasks = new LinkedList<Task>();
/// <summary>
/// 指定的最大並行度
/// </summary>
private readonly int _maxDegressOfParallelism;
/// <summary>
///當前調度器中正在執行的任務數
/// </summary>
private int _runingTasks = 0;
/// <summary>
/// 指示此調度器能夠支持的最大並發級別。
/// </summary>
public override int MaximumConcurrencyLevel { get { return this._maxDegressOfParallelism; } }
/// <summary>
/// 初始化一個可指定最大並行度的任務調度器
/// </summary>
/// <param name="maxDegreeOfParallelism">最大並行度</param>
public SpecifyDegreeOfParallelismTaskScheduler(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism < 1)
throw new ArgumentOutOfRangeException("maxDegreeOfParallelism至少為1");
this._maxDegressOfParallelism = maxDegreeOfParallelism;
}
/// <summary>
/// 將Task排隊到調度器中
/// </summary>
/// <param name="task">要排隊的任務</param>
protected override void QueueTask(Task task)
{
_lock.Wait();
try
{
this._tasks.AddLast(task);
if (this._runingTasks < this._maxDegressOfParallelism)
{
++this._runingTasks;
ConsumeTaskOfPending();
}
}
finally
{
_lock.Release();
}
}
/// <summary>
/// 嘗試在當前線程上執行指定的任務
/// </summary>
/// <param name="task">被執行的任務</param>
/// <param name="taskWasPreviouslyQueued">指定的任務之前是否已經排隊</param>
/// <returns>是否能在當前線程執行此任務</returns>
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
//如果當前前程沒有正在處理項目,無法內聯
if (!_currentThreadIsProcessingItems)
return false;
//如果任務之前已經被排隊,將其從隊列中刪除
if (taskWasPreviouslyQueued)
TryDequeue(task);
return base.TryExecuteTask(task);
}
/// <summary>
/// 消費隊列中等待的任務
/// </summary>
private void ConsumeTaskOfPending()
{
ThreadPool.UnsafeQueueUserWorkItem(p =>
{
_currentThreadIsProcessingItems = true;
try
{
while (true)
{
Task item;
_lock.Wait();
try
{
if (this._tasks.Count == 0)
{
--this._runingTasks;
break;
}
item = this._tasks.First.Value;
this._tasks.RemoveFirst();
}
finally
{
_lock.Release();
}
base.TryExecuteTask(item);
}
}
finally
{
_currentThreadIsProcessingItems = false;
}
}, null);
}
/// <summary>
/// 嘗試將任務從隊列移除
/// </summary>
/// <param name="task">要移除的任務</param>
/// <returns>是否成功將任務從隊列中移除</returns>
protected override bool TryDequeue(Task task)
{
_lock.Wait();
try
{
return this._tasks.Remove(task);
}
finally
{
_lock.Release();
}
}
/// <summary>
/// 獲取當前調度器中已調度任務序列
/// </summary>
/// <returns>可遍歷已調度任務序列</returns>
protected override IEnumerable<Task> GetScheduledTasks()
{
_lock.Wait();
try
{
return this._tasks.ToArray();
}
finally
{
_lock.Release();
}
}
}