ThreadPool and Task Scheduling Deadlock Analysis


Over the past year, we occasionally saw certain operations time out during application startup, especially after switching to a 4-core Surface. Laptops and desktops hit it less often, and servers basically never did.

For years my applications have shared one habit: doing a lot of preparation work asynchronously at startup. I was fairly sure the problem was related to that.

Over the last two months I spent some time analyzing the thread-pool scheduling mechanism. It is a bit convoluted, so I am writing it down here before I forget.

(This article is long; if you are short on time, skip straight to the end!)

 

1. Symptoms

Let's analyze a typical WinForms application. Development environment: Surface Pro 4, CPU = 4 cores.

Debugging the application in Visual Studio, you can clearly feel it freeze for 3–5 seconds at startup; hit Pause while it is stuck.

The call stacks show the program is deadlocked. Pseudo call chain: Click => 6 * Task.Run(GetBill) => Init => GetConfig

Business logic: the button Click handler asynchronously calls GetBill six times; each call goes through Init to check initialization, which takes a lock; the first thread to acquire the lock calls GetConfig to fetch configuration data from the config center.

In the Threads window, five threads are stuck at the lock in Init, and one thread has passed Init and entered GetConfig.

GetConfig requests data asynchronously via HttpClient and then calls task.Wait(5000); it is stuck there as well.
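For illustration only, here is a minimal hypothetical sketch of that call pattern (GetBill, Init, GetConfig and the URL are stand-ins, not the real code; whether it actually hangs depends on pool state and timing):

using System;
using System.Net.Http;
using System.Threading.Tasks;

class Demo
{
    static readonly object _sync = new object();
    static readonly HttpClient _http = new HttpClient();
    static bool _inited;

    // Click handler: fire six GetBill calls onto the thread pool.
    static void Click()
    {
        for (var i = 0; i < 6; i++) Task.Run(() => GetBill());
    }

    static void GetBill()
    {
        Init();            // all six workers funnel through this lock
        /* ... business logic ... */
    }

    static void Init()
    {
        lock (_sync)       // five workers block here
        {
            if (_inited) return;
            GetConfig();   // the first worker blocks inside GetConfig
            _inited = true;
        }
    }

    static void GetConfig()
    {
        var task = _http.GetStringAsync("http://config.example.com/get");
        task.Wait(5000);   // blocks a pool thread waiting on pool-scheduled work
    }
}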

And so all six threads sit there dead, not moving at all.

A network capture shows that the HTTP response came back long ago; there was no need to wait anywhere near 5000 ms.

The Tasks window shows many tasks in the "Blocked" and "Scheduled" states, two in "Awaiting", and nothing moves. That is a deadlock.

From the task-scheduling point of view, my guess was that the Task scheduling queues were congested, so that after the HttpClient async request completed there was no thread available to run the continuation and let task.Wait(5000) return.

Task scheduling has always felt complex and hard to analyze in depth.

 

2. Thread Pool

At first I thought the cause was heavy use of Task.Run. After changing most of those calls to ThreadPool.QueueUserWorkItem, the blocking was reduced but still occurred.

Opening ThreadPool in ILSpy shows it has become complex too; it is no longer the simple little thing from the .NET 2.0 era.

To save time, last month I wrote my own thread pool, ThreadPoolX: it manages threads with a concurrent queue; each time a work delegate is queued, it takes a thread out, uses it, and returns it afterwards.

Source code: https://github.com/NewLifeX/X/blob/master/NewLife.Core/Threading/ThreadPoolX.cs

After updating the WinForms application above to use it, the deadlock disappeared immediately.

ThreadPoolX is very simple: every asynchronous work item has an equal chance to get a thread, so there is no situation where an earlier item is stuck and later items never get a chance to run.

Thread utilization is somewhat lower, but this kind of deadlock is easily avoided.
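The real implementation is in the GitHub link above; purely as a rough sketch of the idea (not the actual ThreadPoolX code), it looks something like this:

using System;
using System.Collections.Concurrent;
using System.Threading;

// Sketch of the idea: keep idle dedicated threads in a concurrent queue; each
// queued work item takes one out (or creates one), runs, then returns it.
class SimplePool
{
    readonly ConcurrentQueue<Worker> _idle = new ConcurrentQueue<Worker>();

    public void QueueWorkItem(Action work)
    {
        if (!_idle.TryDequeue(out var w)) w = new Worker(this);
        w.Run(work);
    }

    internal void Return(Worker w) => _idle.Enqueue(w);

    internal class Worker
    {
        readonly SimplePool _pool;
        readonly AutoResetEvent _signal = new AutoResetEvent(false);
        Action _work;

        public Worker(SimplePool pool)
        {
            _pool = pool;
            new Thread(Loop) { IsBackground = true }.Start();
        }

        public void Run(Action work) { _work = work; _signal.Set(); }

        void Loop()
        {
            while (true)
            {
                _signal.WaitOne();
                try { _work?.Invoke(); } catch { /* swallowed in this sketch */ }
                _work = null;
                _pool.Return(this);   // hand the thread back for reuse
            }
        }
    }
}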

So we can conclude that some "smart" mechanism inside Task and ThreadPool scheduling, combined with possibly unreasonable usage in our code, is what caused the deadlock!

 

3. Scheduling Mechanism

Although the problem was solved last month, not understanding the internal mechanism kept nagging at me. Recently I found some evenings to go through various materials and analyze the source code.

By default Task/TPL run work on the ThreadPool, so let's take the most common entry point, Task.Run, as the starting place for the analysis.

 

/// <summary>
/// Queues the specified work to run on the ThreadPool and returns a Task handle for that work.
/// </summary>
/// <param name="action">The work to execute asynchronously</param>
/// <returns>A Task that represents the work queued to execute in the ThreadPool.</returns>
/// <exception cref="T:System.ArgumentNullException">
/// The <paramref name="action"/> parameter was null.
/// </exception>
public static Task Run(Action action)
{
    return Task.InternalStartNew(null, action, null, default(CancellationToken), TaskScheduler.Default,
        TaskCreationOptions.DenyChildAttach, InternalTaskOptions.None);
}

Task.Run uses the default scheduler internally. The other thing to note is DenyChildAttach, which prevents other tasks from attaching themselves as children of this task.
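A small illustration of what that means (a sketch; the delay value is arbitrary):

using System;
using System.Threading;
using System.Threading.Tasks;

// Because Task.Run passes DenyChildAttach, a child created with AttachedToParent
// does not actually attach, so the outer task completes without waiting for it.
var outer = Task.Run(() =>
{
    Task.Factory.StartNew(() => Thread.Sleep(1000),
        TaskCreationOptions.AttachedToParent); // ignored under DenyChildAttach
});
outer.Wait();
Console.WriteLine("outer completed without waiting for the child");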

 

// Implicitly converts action to object and handles the meat of the StartNew() logic.
internal static Task InternalStartNew(
    Task creatingTask, Delegate action, object state, CancellationToken cancellationToken, TaskScheduler scheduler,
    TaskCreationOptions options, InternalTaskOptions internalOptions)
{
    // Validate arguments.
    if (scheduler == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.scheduler);
    }
    Contract.EndContractBlock();

    // Create and schedule the task. This throws an InvalidOperationException if already shut down.
    // Here we add the InternalTaskOptions.QueuedByRuntime to the internalOptions, so that TaskConstructorCore can skip the cancellation token registration
    Task t = new Task(action, state, creatingTask, cancellationToken, options, internalOptions | InternalTaskOptions.QueuedByRuntime, scheduler);

    t.ScheduleAndStart(false);
    return t;
}

InternalStartNew creates a Task instance and then calls ScheduleAndStart to hand it to the scheduler and start it.

 

/// <summary>
/// Schedules the task for execution.
/// </summary>
/// <param name="needsProtection">If true, TASK_STATE_STARTED bit is turned on in
/// an atomic fashion, making sure that TASK_STATE_CANCELED does not get set
/// underneath us.  If false, TASK_STATE_STARTED bit is OR-ed right in.  This
/// allows us to streamline things a bit for StartNew(), where competing cancellations
/// are not a problem.</param>
internal void ScheduleAndStart(bool needsProtection)
{
    Debug.Assert(m_taskScheduler != null, "expected a task scheduler to have been selected");
    Debug.Assert((m_stateFlags & TASK_STATE_STARTED) == 0, "task has already started");

    // Set the TASK_STATE_STARTED bit
    if (needsProtection)
    {
        if (!MarkStarted())
        {
            // A cancel has snuck in before we could get started.  Quietly exit.
            return;
        }
    }
    else
    {
        m_stateFlags |= TASK_STATE_STARTED;
    }

    if (s_asyncDebuggingEnabled)
    {
        AddToActiveTasks(this);
    }

    if (AsyncCausalityTracer.LoggingOn && (Options & (TaskCreationOptions)InternalTaskOptions.ContinuationTask) == 0)
    {
        //For all other task than TaskContinuations we want to log. TaskContinuations log in their constructor
        AsyncCausalityTracer.TraceOperationCreation(CausalityTraceLevel.Required, this.Id, "Task: " + m_action.Method.Name, 0);
    }


    try
    {
        // Queue to the indicated scheduler.
        m_taskScheduler.InternalQueueTask(this);
    }
    catch (ThreadAbortException tae)
    {
        AddException(tae);
        FinishThreadAbortedTask(delegateRan: false);
    }
    catch (Exception e)
    {
        // The scheduler had a problem queueing this task.  Record the exception, leaving this task in
        // a Faulted state.
        TaskSchedulerException tse = new TaskSchedulerException(e);
        AddException(tse);
        Finish(false);

        // Now we need to mark ourselves as "handled" to avoid crashing the finalizer thread if we are called from StartNew(),
        // because the exception is either propagated outside directly, or added to an enclosing parent. However we won't do this for
        // continuation tasks, because in that case we internally eat the exception and therefore we need to make sure the user does
        // later observe it explicitly or see it on the finalizer.

        if ((Options & (TaskCreationOptions)InternalTaskOptions.ContinuationTask) == 0)
        {
            // m_contingentProperties.m_exceptionsHolder *should* already exist after AddException()
            Debug.Assert(
                (m_contingentProperties != null) &&
                (m_contingentProperties.m_exceptionsHolder != null) &&
                (m_contingentProperties.m_exceptionsHolder.ContainsFaultList),
                    "Task.ScheduleAndStart(): Expected m_contingentProperties.m_exceptionsHolder to exist " +
                    "and to have faults recorded.");

            m_contingentProperties.m_exceptionsHolder.MarkAsHandled(false);
        }
        // re-throw the exception wrapped as a TaskSchedulerException.
        throw tse;
    }
}

A lot of preparation, but ultimately it all comes down to queueing the task to the scheduler via m_taskScheduler.InternalQueueTask(this).

 

protected internal override void QueueTask(Task task)
{
    if ((task.Options & TaskCreationOptions.LongRunning) != 0)
    {
        Thread thread = new Thread(s_longRunningThreadWork);
        thread.IsBackground = true;
        thread.Start(task);
    }
    else
    {
        bool forceGlobal = (task.Options & TaskCreationOptions.PreferFairness) != TaskCreationOptions.None;
        ThreadPool.UnsafeQueueCustomWorkItem(task, forceGlobal);
    }
}

The QueueTask method of the thread-pool task scheduler, ThreadPoolTaskScheduler, is the key piece.

First, the LongRunning flag: the scheduler simply starts a brand-new dedicated thread, crude but effective.

Second, the PreferFairness flag: when it is set, forceGlobal is true and the work goes to the global queue. Task.Run does not set it, so its tasks land in per-thread local queues, and that turns out to be at the root of our deadlock.
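As an aside on the first point, LongRunning is a quick way to keep long blocking work off the pool entirely; a minimal sketch:

using System;
using System.Threading;
using System.Threading.Tasks;

// TaskCreationOptions.LongRunning makes ThreadPoolTaskScheduler start a dedicated
// background thread instead of queueing to the pool, so long blocking work cannot
// starve pool workers.
var t = Task.Factory.StartNew(() =>
{
    Thread.Sleep(10000);   // stand-in for long blocking work
}, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
t.Wait();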

 

public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal)
{
    if (loggingEnabled)
        System.Diagnostics.Tracing.FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback);

    ThreadPoolWorkQueueThreadLocals tl = null;
    if (!forceGlobal)
        tl = ThreadPoolWorkQueueThreadLocals.threadLocals;

    if (null != tl)
    {
        tl.workStealingQueue.LocalPush(callback);
    }
    else
    {
        workItems.Enqueue(callback);
    }

    EnsureThreadRequested();
}

When forceGlobal is not set and a thread-local queue exists, the work item is pushed onto the local queue (threadLocals); otherwise it is enqueued to the global queue (workItems).

It is precisely this local-queue optimization that led to our deadlock.

 

When application code calls ThreadPool.QueueUserWorkItem directly, forceGlobal is true, so everything goes to the global queue.

This also explains why the situation improved after we changed some Task.Run calls to ThreadPool.QueueUserWorkItem.
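A sketch of both ways to keep work on the global queue, under the classic ThreadPool behavior analyzed above:

using System;
using System.Threading;
using System.Threading.Tasks;

// 1. QueueUserWorkItem always enqueues to the global queue on this runtime.
ThreadPool.QueueUserWorkItem(_ => Console.WriteLine("global via QueueUserWorkItem"));

// 2. PreferFairness makes ThreadPoolTaskScheduler pass forceGlobal = true.
Task.Factory.StartNew(() => Console.WriteLine("global via PreferFairness"),
    CancellationToken.None, TaskCreationOptions.PreferFairness, TaskScheduler.Default);

Console.ReadLine();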

 

internal void EnsureThreadRequested()
{
    //
    // If we have not yet requested #procs threads from the VM, then request a new thread.
    // Note that there is a separate count in the VM which will also be incremented in this case, 
    // which is handled by RequestWorkerThread.
    //
    int count = numOutstandingThreadRequests;
    while (count < ThreadPoolGlobals.processorCount)
    {
        int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count + 1, count);
        if (prev == count)
        {
            ThreadPool.RequestWorkerThread();
            break;
        }
        count = prev;
    }
}

After the work item is enqueued, EnsureThreadRequested is called. It caps the number of outstanding requests with numOutstandingThreadRequests and asks the runtime for a worker thread via ThreadPool.RequestWorkerThread (a QCall into the VM). At this point things suddenly become clear!

So this is where worker threads are actually requested to process the queued work, and the maximum number of outstanding requests is the processor count!

We can write a simple program to verify this:

Console.WriteLine("CPU={0}", Environment.ProcessorCount);
for (var i = 0; i < 10; i++)
{
    ThreadPool.QueueUserWorkItem(s =>
    {
        var n = (Int32)s;
        Console.WriteLine("{0:HH:mm:ss.fff} th {1} start", DateTime.Now, n);
        Thread.Sleep(2000);
        Console.WriteLine("{0:HH:mm:ss.fff} th {1} end", DateTime.Now, n);
    }, i);
}

CPU=4
18:05:27.936 th 2 start
18:05:27.936 th 3 start
18:05:27.936 th 1 start
18:05:27.936 th 0 start
18:05:29.373 th 4 start
18:05:29.939 th 2 end
18:05:29.940 th 5 start
18:05:29.940 th 0 end
18:05:29.941 th 6 start
18:05:29.940 th 1 end
18:05:29.940 th 3 end
18:05:29.942 th 7 start
18:05:29.942 th 8 start
18:05:30.871 th 9 start
18:05:31.374 th 4 end
18:05:31.942 th 5 end
18:05:31.942 th 6 end
18:05:31.943 th 7 end
18:05:31.943 th 8 end
18:05:32.872 th 9 end

Running on my 4-core CPU, four work items were scheduled first at 27.936; the 5th item was only scheduled more than a second later, and the remaining items got their turn only after the first four had finished.

The fact that the 5th item was scheduled before the first four completed might be related to the Sleep; that is internal machinery. (Later this guess turns out to be wrong.)

What we can conclude so far: the ThreadPool may nominally allow 1000 maximum threads, but in practice only slightly more than the CPU count can run at once! (CPU + 1?)

Of course, it must have other internal mechanisms for injecting more threads over time, for example while workers are blocked in Sleep.

 

Finally, the Dispatch routine:

internal static bool Dispatch()
{
    var workQueue = ThreadPoolGlobals.workQueue;
    //
    // The clock is ticking!  We have ThreadPoolGlobals.TP_QUANTUM milliseconds to get some work done, and then
    // we need to return to the VM.
    //
    int quantumStartTime = Environment.TickCount;

    //
    // Update our records to indicate that an outstanding request for a thread has now been fulfilled.
    // From this point on, we are responsible for requesting another thread if we stop working for any
    // reason, and we believe there might still be work in the queue.
    //
    // Note that if this thread is aborted before we get a chance to request another one, the VM will
    // record a thread request on our behalf.  So we don't need to worry about getting aborted right here.
    //
    workQueue.MarkThreadRequestSatisfied();

    // Has the desire for logging changed since the last time we entered?
    workQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer);

    //
    // Assume that we're going to need another thread if this one returns to the VM.  We'll set this to 
    // false later, but only if we're absolutely certain that the queue is empty.
    //
    bool needAnotherThread = true;
    IThreadPoolWorkItem workItem = null;
    try
    {
        //
        // Set up our thread-local data
        //
        ThreadPoolWorkQueueThreadLocals tl = workQueue.EnsureCurrentThreadHasQueue();

        //
        // Loop until our quantum expires.
        //
        while ((Environment.TickCount - quantumStartTime) < ThreadPoolGlobals.TP_QUANTUM)
        {
            bool missedSteal = false;
            workItem = workQueue.Dequeue(tl, ref missedSteal);

            if (workItem == null)
            {
                //
                // No work.  We're going to return to the VM once we leave this protected region.
                // If we missed a steal, though, there may be more work in the queue.
                // Instead of looping around and trying again, we'll just request another thread.  This way
                // we won't starve other AppDomains while we spin trying to get locks, and hopefully the thread
                // that owns the contended work-stealing queue will pick up its own workitems in the meantime, 
                // which will be more efficient than this thread doing it anyway.
                //
                needAnotherThread = missedSteal;

                // Tell the VM we're returning normally, not because Hill Climbing asked us to return.
                return true;
            }

            if (workQueue.loggingEnabled)
                System.Diagnostics.Tracing.FrameworkEventSource.Log.ThreadPoolDequeueWorkObject(workItem);

            //
            // If we found work, there may be more work.  Ask for another thread so that the other work can be processed
            // in parallel.  Note that this will only ask for a max of #procs threads, so it's safe to call it for every dequeue.
            //
            workQueue.EnsureThreadRequested();

            //
            // Execute the workitem outside of any finally blocks, so that it can be aborted if needed.
            //
            if (ThreadPoolGlobals.enableWorkerTracking)
            {
                bool reportedStatus = false;
                try
                {
                    ThreadPool.ReportThreadStatus(isWorking: true);
                    reportedStatus = true;
                    workItem.ExecuteWorkItem();
                }
                finally
                {
                    if (reportedStatus)
                        ThreadPool.ReportThreadStatus(isWorking: false);
                }
            }
            else
            {
                workItem.ExecuteWorkItem();
            }
            workItem = null;

            // 
            // Notify the VM that we executed this workitem.  This is also our opportunity to ask whether Hill Climbing wants
            // us to return the thread to the pool or not.
            //
            if (!ThreadPool.NotifyWorkItemComplete())
                return false;
        }

        // If we get here, it's because our quantum expired.  Tell the VM we're returning normally.
        return true;
    }
    catch (ThreadAbortException tae)
    {
        //
        // This is here to catch the case where this thread is aborted between the time we exit the finally block in the dispatch
        // loop, and the time we execute the work item.  QueueUserWorkItemCallback uses this to update its accounting of whether
        // it was executed or not (in debug builds only).  Task uses this to communicate the ThreadAbortException to anyone
        // who waits for the task to complete.
        //
        workItem?.MarkAborted(tae);

        //
        // In this case, the VM is going to request another thread on our behalf.  No need to do it twice.
        //
        needAnotherThread = false;
        // throw;  //no need to explicitly rethrow a ThreadAbortException, and doing so causes allocations on amd64.
    }
    finally
    {
        //
        // If we are exiting for any reason other than that the queue is definitely empty, ask for another
        // thread to pick up where we left off.
        //
        if (needAnotherThread)
            workQueue.EnsureThreadRequested();
    }

    // we can never reach this point, but the C# compiler doesn't know that, because it doesn't know the ThreadAbortException will be reraised above.
    Debug.Fail("Should never reach this point");
    return true;
}
public IThreadPoolWorkItem Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal)
{
    WorkStealingQueue localWsq = tl.workStealingQueue;
    IThreadPoolWorkItem callback;

    if ((callback = localWsq.LocalPop()) == null && // first try the local queue
        !workItems.TryDequeue(out callback)) // then try the global queue
    {
        // finally try to steal from another thread's local queue
        WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
        int c = queues.Length;
        Debug.Assert(c > 0, "There must at least be a queue for this thread.");
        int maxIndex = c - 1;
        int i = tl.random.Next(c);
        while (c > 0)
        {
            i = (i < maxIndex) ? i + 1 : 0;
            WorkStealingQueue otherQueue = queues[i];
            if (otherQueue != localWsq && otherQueue.CanSteal)
            {
                callback = otherQueue.TrySteal(ref missedSteal);
                if (callback != null)
                {
                    break;
                }
            }
            c--;
        }
    }

    return callback;
}
internal void MarkThreadRequestSatisfied()
{
    int num2;
    for (int num = numOutstandingThreadRequests; num > 0; num = num2)
    {
        num2 = Interlocked.CompareExchange(ref numOutstandingThreadRequests, num - 1, num);
        if (num2 == num)
        {
            break;
        }
    }
}

Dispatch appears to be called by the worker threads the runtime hands out, and it is quite interesting:

  1. At the start of each Dispatch, MarkThreadRequestSatisfied decrements numOutstandingThreadRequests; in other words that counter is only a backlog, and the real number of threads can exceed the CPU count.
  2. One Dispatch processes multiple work items, as long as the total time stays within the 30 ms quantum (TP_QUANTUM); this reduces thread switching.
  3. Each time an item is dequeued for execution, another thread is requested (capped at the processor count) so that remaining work can proceed in parallel.
  4. Dequeue first pops from the local queue, then tries the global queue, and finally tries to steal a random item from another thread's local queue.
  5. The local queue is a push/pop LIFO stack, i.e. the item queued first is executed last.

The most intricate part is the LIFO local queue, which is designed specifically for Task.

numOutstandingThreadRequests is essentially a backlog: the count of pending requests to borrow a pool thread, capped at the processor count; once a thread is handed out and runs Dispatch, the count is decremented, freeing a slot for another request. A rough demonstration of the local-queue behavior follows.
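This sketch is timing-dependent and not deterministic; it only illustrates that tasks started from inside a pool thread go to that worker's local queue, where the same worker tends to pick them up in LIFO order, while idle workers may steal them from the other end:

using System;
using System.Threading;
using System.Threading.Tasks;

Task.Run(() =>
{
    for (var i = 0; i < 4; i++)
    {
        var n = i;
        // Queued from a pool thread without PreferFairness, so these children go
        // to this worker's local queue.
        Task.Run(() => Console.WriteLine($"child {n} on thread {Environment.CurrentManagedThreadId}"));
    }
    // If this same worker picks its children up, they tend to run in reverse order
    // (3, 2, 1, 0); if idle workers steal them, order and thread ids will vary.
});
Console.ReadLine();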

 

4. Thread Pool Growth

From the earlier test, scheduling beyond 4 concurrent items is indeed throttled: the pool neither kept allocating threads immediately nor stopped allocating; new threads were injected only after a noticeable delay (on the order of half a second or more per thread).

This behavior was already analyzed by others back in the .NET 2.0 era, and some more recent explanations can be found:

https://gist.github.com/JonCole/e65411214030f0d823cb

Once the number of existing (busy) threads hits the "minimum" number of threads, the ThreadPool will throttle the rate at which is injects new threads to one thread per 500 milliseconds. This means that if your system gets a burst of work needing an IOCP thread, it will process that work very quickly. However, if the burst of work is more than the configured "Minimum" setting, there will be some delay in processing some of the work as the ThreadPool waits for one of two things to happen 1. An existing thread becomes free to process the work 2. No existing thread becomes free for 500ms, so a new thread is created.

Before the number of busy ThreadPool threads reaches the configured minimum, work is processed very quickly; once the minimum is reached, each additional thread is injected only after a 500 ms delay, unless an existing busy thread frees up first.

Given the above information, we strongly recommend that customers set the minimum configuration value for IOCP and WORKER threads to something larger than the default value. We can't give one-size-fits-all guidance on what this value should be because the right value for one application will be too high/low for another application. This setting can also impact the performance of other parts of complicated applications, so each customer needs to fine-tune this setting to their specific needs. A good starting place is 100, then test and tweak as needed.

Based on this, users are advised to raise the ThreadPool minimums. Of course the setting can also affect other parts of the application, so a good starting point is 100, then test and tune for the specific application.

5. Deadlock

The problem of deadlocks caused by blocking inside the thread pool while waiting on other operations was still not solved.

We found some official material on this: https://msdn.microsoft.com/en-us/library/ms973903.aspx

The article is fairly long; go straight to the "Deadlocks" section.

Imagine a method in your code that needs to connect via socket with a Web server. A possible implementation is opening the connection asynchronously with the Socket class' BeginConnect method and wait for the connection to be established with the EndConnect method. 

Imagine a scenario where we need to connect to a web server; the implementation connects asynchronously with Socket.BeginConnect and then blocks in EndConnect.

class ConnectionSocket
{
   public void Connect()
   {
      IPHostEntry ipHostEntry = Dns.Resolve(Dns.GetHostName());
      IPEndPoint ipEndPoint = new IPEndPoint(ipHostEntry.AddressList[0],
         80);
      Socket s = new Socket(ipEndPoint.AddressFamily, SocketType.Stream,
         ProtocolType.Tcp);
      IAsyncResult ar = s.BeginConnect(ipEndPoint, null, null);
      s.EndConnect(ar);
   }
}

So far, so good—calling BeginConnect makes the asynchronous operation execute on the thread pool and EndConnect blocks waiting for the connection to be established.

What happens if we use this class from a function executed on the thread pool? Imagine that the size of the pool is just two threads and we launch two asynchronous functions that use our connection class. With both functions executing on the pool, there is no room for additional requests until the functions are finished. The problem is that these functions call our class' Connect method. This method launches again an asynchronous operation on the thread pool, but since the pool is full, the request is queued waiting any thread to be free. Unfortunately, this will never happen because the functions that are using the pool are waiting for the queued functions to finish. The conclusion: our application is blocked.

So far so good, and the code works. But what happens when this class is used from functions that themselves run on the thread pool? Imagine the pool has only two threads and we launch two asynchronous functions that use our connection class: both occupy the pool, leaving no room for further requests until they finish. Each of them calls Connect, which launches yet another asynchronous operation on the pool; since the pool is full, those requests sit in the queue waiting for a thread to free up. That never happens, because the functions holding the pool are themselves waiting for the queued operations to complete. The result: deadlock!

In general, a deadlock can appear whenever a pool thread waits for an asynchronous function to finish. If we change the code so that we use the synchronous version of Connect, the problem will disappear:

In general, a deadlock can occur whenever a pool thread blocks waiting for an asynchronous operation to finish!

Changing the example above to a synchronous call makes the problem go away:

class ConnectionSocket
{
   public void Connect()
   {
      IPHostEntry ipHostEntry = Dns.Resolve(Dns.GetHostName());
      IPEndPoint ipEndPoint = new IPEndPoint(ipHostEntry.AddressList[0], 80);
      Socket s = new Socket(ipEndPoint.AddressFamily, SocketType.Stream,
         ProtocolType.Tcp);
      s.Connect(ipEndPoint);
   }
}

If you want to avoid deadlocks in your applications, do not ever block a thread executed on the pool that is waiting for another function on the pool. This seems to be easy, but keep in mind that this rule implies two more:

  • Do not create any class whose synchronous methods wait for asynchronous functions, since this class could be called from a thread on the pool.
  • Do not use any class inside an asynchronous function if the class blocks waiting for asynchronous functions.

If you want to detect a deadlock in your application, check the available number of threads on the thread pool when your system is hung. The lack of available threads and CPU utilization near 0% are clear symptoms of a deadlock. You should monitor your code to identify where a function executed on the pool is waiting for an asynchronous operation and remove it.

To avoid deadlocks, the core rule is: never block a pool thread waiting for another function that also executes on the pool!

Concretely:

  1. Do not create classes whose synchronous methods wait for asynchronous functions, because those classes may be called from a pool thread.
  2. Do not use, inside an asynchronous function, any class that blocks waiting for asynchronous functions.

 

Our code mixes synchronous and asynchronous calls in many places and needs to be cleaned up gradually; a sketch of a non-blocking config fetch is shown below.
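This is a hypothetical sketch (the names and URL are illustrative, not our real code): instead of calling task.Wait(5000) on a pool thread, make the call chain async end to end so the worker thread is released while the HTTP request is in flight.

using System;
using System.Net.Http;
using System.Threading.Tasks;

class ConfigClient
{
    static readonly HttpClient _http = new HttpClient { Timeout = TimeSpan.FromSeconds(5) };

    public static async Task<string> GetConfigAsync()
    {
        // await frees the worker thread; no pool thread blocks waiting here.
        return await _http.GetStringAsync("http://config.example.com/get").ConfigureAwait(false);
    }
}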

In the end we took a quick and effective measure: we set the thread pool minimums to 256, and the various deadlocks disappeared immediately.
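For reference, a minimal sketch of that fix (256 is the value we used; the right number depends on the application):

using System;
using System.Threading;

// Raise the worker and IOCP minimums so the pool can inject threads up to this
// count without the ~500 ms-per-thread throttle described earlier.
ThreadPool.GetMinThreads(out var worker, out var iocp);
Console.WriteLine($"before: worker={worker}, iocp={iocp}");
if (!ThreadPool.SetMinThreads(256, 256))
    Console.WriteLine("SetMinThreads rejected the values");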

 

End.

