面試八股文:你寫過自定義任務調度器嗎?


最近入職了新公司,嘗試閱讀祖傳代碼,記錄並更新最近的編程認知。
思緒由Q1引發,后續Q2、Q3基於Q1的發散探究。

Q1. Task.Run、Task.Factory.StartNew 的區別?


我們常使用`Task.Run`和`Task.Factory.StartNew`創建並啟動任務,但是他們的區別在哪里?在哪種場景下使用前后者?

Task.Factory.StartNew於.NET Framework 4.0時代引入,衍生到.netcore,.net8

Task.Factory.StartNew通過TaskCreationOptions、TaskScheduler參數提供了精細化控制任務調度的能力

精細化控制的的背景是:
比如:一個長時間運行的任務,如果由線程池線程執行,可能濫用線程池線程(因為線程池線程數量有限,一般處理快&短的任務),這個時候最好在獨立線程中執行這個任務。
對於這樣的任務就可以: Task.Factory.StartNew(..., TaskCreationOptions.LongRunning);

Task.Run方法於.netframework 4.5時代引入,衍生到現在。

官方引入Task.Run並不是為廢棄Task.Factory, 而是因為Task.Run提供了一種簡寫,或者說是Task.Run是Task.Factory.StartNew的一個特例,Task.Run 只是提供了一個無參、默認的任務創建和調度方式。
當你在Task.Run傳遞委托 Task.Run(someAction);
實際上等價於Task.Factory.StartNew(someAction, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);


源碼驗證:

        /// <summary>
        /// Queues the specified work to run on the ThreadPool and returns a Task handle for that work.
        /// </summary>
        /// <param name="action">The work to execute asynchronously</param>
        /// <returns>A Task that represents the work queued to execute in the ThreadPool.</returns>
        /// <exception cref="T:System.ArgumentNullException">
        /// The <paramref name="action"/> parameter was null.
        /// </exception>
        [MethodImplAttribute(MethodImplOptions.NoInlining)] // Methods containing StackCrawlMark local var have to be marked non-inlineable            
        public static Task Run(Action action)
        {
            StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
            return Task.InternalStartNew(null, action, null, default(CancellationToken), TaskScheduler.Default,
                TaskCreationOptions.DenyChildAttach, InternalTaskOptions.None, ref stackMark);
        }
     // Implicitly converts action to object and handles the meat of the StartNew() logic.
        internal static Task InternalStartNew(
            Task creatingTask, Delegate action, object state, CancellationToken cancellationToken, TaskScheduler scheduler,
            TaskCreationOptions options, InternalTaskOptions internalOptions, ref StackCrawlMark stackMark)
        {
            // Validate arguments.
            if (scheduler == null)
            {
                throw new ArgumentNullException("scheduler");
            }
            Contract.EndContractBlock();

            // Create and schedule the task. This throws an InvalidOperationException if already shut down.
            // Here we add the InternalTaskOptions.QueuedByRuntime to the internalOptions, so that TaskConstructorCore can skip the cancellation token registration
            Task t = new Task(action, state, creatingTask, cancellationToken, options, internalOptions | InternalTaskOptions.QueuedByRuntime, scheduler);
            t.PossiblyCaptureContext(ref stackMark);

            t.ScheduleAndStart(false);
            return t;
        }
    

僅此無他。

另外 Task.Run 和Task.Factory.StartNew創建的任務大部分時候會利用線程池任務調度器ThreadPoolTaskScheduler

   // [github: TaskScheduler](https://github.com/dotnet/coreclr/blob/master/src/System.Private.CoreLib/shared/System/Threading/Tasks/TaskScheduler.cs) 251行顯示`TaskSchedule.Dafult`確實是線程池任務調度器。  
   // An AppDomain-wide default manager.
   private static readonly TaskScheduler s_defaultTaskScheduler = new ThreadPoolTaskScheduler();

thread/task, threadpool taskscheduler 的關系?



Q2. 既然說到Task.Run使用線程池線程,線程池線程有哪些特征? 為什么有自定義任務調度器一說?


線程池線程的特征:
① 池中線程都是后台線程
② 線程可重用,一旦線程池中的線程完成任務,將返回到等待線程隊列中, 避免了創建線程的開銷
③ 池中預熱了工作者線程、IO線程,

  • 線程池最大線程數:線程池線程都忙碌,后續任務將排隊等待空閑線程;
  • 最小值:線程池根據需要提供 工作線程/IO完成線程, 直到達到某最小值; 達到某最小值,線程池可以創建或者等待。

我啟動一個腳手架項目: 默認最大工作者線程32767,最大IO線程1000 ; 默認最小工作線程數、最小IO線程數均為8個

github: ThreadPoolTaskScheduler 顯示線程池任務調度器是這樣調度任務的:

/// <summary>
/// Schedules a task to the ThreadPool.
/// </summary>
/// <param name="task">The task to schedule.</param>
protected internal override void QueueTask(Task task)
{
     TaskCreationOptions options = task.Options;
     if ((options & TaskCreationOptions.LongRunning) != 0)
     {
          // Run LongRunning tasks on their own dedicated thread.
          Thread thread = new Thread(s_longRunningThreadWork);
          thread.IsBackground = true; // Keep this thread from blocking process shutdown
          thread.Start(task);
    }
    else
    {
         // Normal handling for non-LongRunning tasks.
        bool preferLocal = ((options & TaskCreationOptions.PreferFairness) == 0);
        ThreadPool.UnsafeQueueUserWorkItemInternal(task, preferLocal);
    }
}

注意8-14行:若上層使用者將LongRunning任務應用到默認的任務調度器(也即線程池任務調度器),線程池任務調度器會有一個兜底方案,會將任務放在獨立線程上執行。

何時不使用線程池線程

有幾種應用場景,其中適合創建並管理自己的線程,而非使用線程池線程:

  • 需要一個前台線程。
  • 需要具有特定優先級的線程。
  • 擁有會導致線程長時間阻塞的任務。 線程池具有最大線程數,因此大量被阻塞的線程池線程可能會阻止任務啟動。
  • 需將線程放入單線程單元。 所有 ThreadPool 線程均位於多線程單元中。
  • 需具有與線程關聯的穩定標識,或需將一個線程專用於一項任務。

Q3. 既然要自定義任務調度器,那我們就來自定義一下?

實現TaskScheduler 抽象類,其中的抓手是調度,也就是 QueueTask 方法,之后你自由定義數據結構, 從數據結構中調度出線程來執行任務。

public sealed class CustomTaskScheduler : TaskScheduler, IDisposable
    {
        private BlockingCollection<Task> tasksCollection = new BlockingCollection<Task>();
        private readonly Thread mainThread = null;
        public CustomTaskScheduler()
        {
            mainThread = new Thread(new ThreadStart(Execute));
            if (!mainThread.IsAlive)
            {
                mainThread.Start();
            }
        }
        private void Execute()
        {
            foreach (var task in tasksCollection.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
        }
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return tasksCollection.ToArray();
        }
        protected override void QueueTask(Task task)
        {
            if (task != null)
                tasksCollection.Add(task);           
        }
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }
        private void Dispose(bool disposing)
        {
            if (!disposing) return;
            tasksCollection.CompleteAdding();
            tasksCollection.Dispose();
        }
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
    }

應用我們的自定義任務調度器:

CustomTaskScheduler taskScheduler = new CustomTaskScheduler();
Task.Factory.StartNew(() => SomeMethod(), CancellationToken.None, TaskCreationOptions.None, taskScheduler);

文末總結

  1. Task.Run提供了創建任務的默認方式,是Task.Factory.StartNew的特例, 兩者大部分時候是利用線程池執行任務。
  2. 線程池任務調度器對長時間運行的任務 做了兜底方案。
  3. 自定義任務調度器。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM