Winform同步調用異步函數死鎖原因分析、為什么要用異步


1、前言

幾年前,一個開發同學遇到同步調用異步函數出現死鎖問題,導致UI界面假死。我解釋了一堆,關於狀態機、線程池、WindowsFormsSynchronizationContext.Post、control.BeginInvoke、APC、IOCP,結果我也沒講明白、他也沒聽明白。后來路過他座位時看到他在各種摸索、嘗試,使用Task、await、async各種組合,當時的場景是這樣的:

1。問題有點復雜,隨着那個開發同學離職轉做產品后,就不了了之了。工作中許多同事對於同步、異步也不是特別了解,我會以執行流程圖表加源碼的形式表述,希望通過這篇文章最少能讓大家了解.NET的async await出現deadlock的原因,最好能粗略了解async狀態機機制、.NET在不同平台網絡調用實現機制。如果文章中表述存在問題,歡迎指正。

2、場景再現、執行過程解析

Winform死鎖場景

如下代碼,如果點擊按鈕觸發btn_realDead_Click事件,Ui線程將掛起在DeadTask().Result陷入死鎖。

死鎖產生的原因: Ui線程阻塞等待Task完成,Task需要通過Ui線程設置完成結果。

        private void btn_realDead_Click(object sender, EventArgs e)
        {
            var result = DeadTask().Result; // UI線程掛起位置
            PrintInfo(result);
        }

        /// <summary>
        /// 
        /// </summary>
        /// <returns></returns>
        private async Task<string> DeadTask()
        {
            await Task.Delay(500);
            return await Task.FromResult("Hello world");
        }

場景模擬,解析WindowsFormsSynchronizationContext.Post執行過程

Demo代碼地址 : https://gitee.com/RiverBied/async-demo

死鎖模擬代碼

使用async關鍵字將會由編譯器生成狀態機代碼,反編譯的代碼也不太直觀,所以我先使用非async代碼進行簡化模擬,async代碼下文在解析。

死鎖產生的原因: Ui線程阻塞等待Task完成,Task需要通過Ui線程設置完成結果。

解除死鎖: 通過其他線程設置Task完成結果,Ui線程等到Task完成信號繼續執行,死鎖得到解除。

image-20211016172925842

點擊模擬死鎖后,輸出信息:

image-20211016175145305

執行過程

相信大家看完下面這個圖,會有更直觀認識。可以看到CurrentSynchronizationContext.Post的SendOrPostCallback內容被包裝為ThreadMethodEntry寫入到窗體的隊列對象的_threadCallbackList。但是 _threadCallbackList什么觸發的,采用的是User32 MessageW異步消息接口,最后在UI線程空閑時系統觸發窗體回調函數WndProc。

image-20211017114752877

CurrentSynchronizationContext=WindowsFormsSynchronizationContext

WindowsFormsSynchronizationContext設置代碼:

        // 示例代碼
		public Form1()
        {
            InitializeComponent();
            CurrentSynchronizationContext = SynchronizationContext.Current;
            var controlToSendToField = typeof(WindowsFormsSynchronizationContext).GetField("controlToSendTo", BindingFlags.Instance | BindingFlags.NonPublic);
            // controlToSendTo設置為當前窗口對象,讓重寫的WndProc執行接收到消息
            controlToSendToField.SetValue(CurrentSynchronizationContext, this);
        }

WindowsFormsSynchronizationContext.Post源碼:

SynchronizationContext.Post功能為發送一個異步委托消息,不阻塞當前線程,委托消息需要在SynchronizationContext綁定線程進行執行。在死鎖模擬場景中SynchronizationContext綁定的為Ui線程,所以委托消息需要在Ui線程進行執行。

//源碼地址: //https://github.com/dotnet/winforms/blob/release/5.0/src/System.Windows.Forms/src/System/Windows/Forms/WindowsFormsSynchronizationContext.cs#L90
		public override void Post(SendOrPostCallback d, object state)
        {
            // 調用form1窗口對象的BeginInvoke
            controlToSendTo?.BeginInvoke(d, new object[] { state });
        }

Control.BeginInvoke

BeginInvoke關鍵源碼:

 // 定義保證在整個系統中唯一的窗口消息,消息值可用於發送或發布消息,返回窗口消息標識(int)。
 s_threadCallbackMessage = User32.RegisterWindowMessageW(Application.WindowMessagesVersion + "_ThreadCallbackMessage"); 
 // 將回調函數執行信息添加到回調函數隊列,回調函數即為WindowsFormsSynchronizationContext.Post的SendOrPostCallback參數,_threadCallbackList為Control字段
 _threadCallbackList.Enqueue(tme);
 // 在與創建指定窗口的線程關聯的消息隊列中放置(發布)一條消息,並在不等待線程處理消息的情況下返回
 User32.PostMessageW(this, s_threadCallbackMessage);   

BeginInvoke源碼:

//源碼地址:
//https://github.com/dotnet/winforms/blob/release/5.0/src/System.Windows.Forms/src/System/Windows/Forms/Control.cs#L4678
private object MarshaledInvoke(Control caller, Delegate method, object[] args, bool synchronous)
        {
            if (!IsHandleCreated)
            {
                throw new InvalidOperationException(SR.ErrorNoMarshalingThread);
            }

            ActiveXImpl activeXImpl = (ActiveXImpl)Properties.GetObject(s_activeXImplProperty);

            // We don't want to wait if we're on the same thread, or else we'll deadlock.
            // It is important that syncSameThread always be false for asynchronous calls.
            bool syncSameThread = false;

            if (User32.GetWindowThreadProcessId(this, out _) == Kernel32.GetCurrentThreadId())
            {
                if (synchronous)
                {
                    syncSameThread = true;
                }
            }

            ExecutionContext executionContext = null;
            if (!syncSameThread)
            {
                executionContext = ExecutionContext.Capture();
            }
            ThreadMethodEntry tme = new ThreadMethodEntry(caller, this, method, args, synchronous, executionContext);

            lock (this)
            {
                if (_threadCallbackList is null)
                {
                    _threadCallbackList = new Queue();
                }
            }

            lock (_threadCallbackList)
            {
                if (s_threadCallbackMessage == User32.WM.NULL)
                {
                    // 注冊消息返回消息標識(int)
                    s_threadCallbackMessage = User32.RegisterWindowMessageW(Application.WindowMessagesVersion + "_ThreadCallbackMessage");
                }
				// 將回調函數執行信息添加到回調函數隊列
                _threadCallbackList.Enqueue(tme);
            }
			// 同一個線程則直接執行
            if (syncSameThread)
            {
                InvokeMarshaledCallbacks();
            }
            else
            {
                // 將一個消息放入(寄送)到與指定窗口創建的線程相聯系消息隊列里
                User32.PostMessageW(this, s_threadCallbackMessage);
            }

            if (synchronous)
            {
                if (!tme.IsCompleted)
                {
                    WaitForWaitHandle(tme.AsyncWaitHandle);
                }
                if (tme._exception != null)
                {
                    throw tme._exception;
                }
                return tme._retVal;
            }
            else
            {
                return tme;
            }
        }

WndProc

應用程序中定義的回調函數,用於處理發送到窗口的消息。

示例中的代碼:

        /// <summary>
        ///  重寫接收窗口的消息的回調函數
        /// </summary>
        /// <param name="m"></param>
        protected override void WndProc(ref Message m)
        {
            if (m.Msg == GetThreadCallbackMessage())
            {
                var threadCallbackList = GetThreadCallbackList();
                PrintInfo($"觸發WndProc:msg={m.Msg},threadCallbackList.Count={threadCallbackList.Count}");
                base.WndProc(ref m);
            }
            else
            {
                base.WndProc(ref m);
            }
        }

		/// <summary>
        /// 獲取需要在Ui線程執行的回調委托隊列
        /// </summary>
        /// <returns></returns>
        private System.Collections.Queue GetThreadCallbackList()
        {
            var threadCallbackListFiled = typeof(Control).GetField("_threadCallbackList", BindingFlags.NonPublic | BindingFlags.Instance);
            return (System.Collections.Queue)threadCallbackListFiled.GetValue(this);
        }

        private static int _threadCallbackMessage = 0;

        /// <summary>
        /// 獲取觸發回調委托的窗口消息標識
        /// </summary>
        /// <returns></returns>
        private int GetThreadCallbackMessage()
        {
            if (_threadCallbackMessage == 0)
            {
                var threadCallbackMessageFiled = typeof(Control).GetField("s_threadCallbackMessage", BindingFlags.NonPublic | BindingFlags.Static);
                _threadCallbackMessage = Convert.ToInt32(threadCallbackMessageFiled.GetValue(null));
            }
            return _threadCallbackMessage;
        }

WndProc源碼:

WndProc接收到s_threadCallbackMessage消息觸發執行隊列_threadCallbackList的消息。

//源碼地址:
//https://github.com/dotnet/winforms/blob/release/5.0/src/System.Windows.Forms/src/System/Windows/Forms/Control.cs#L12681
		/// <summary>
        ///  Base wndProc. All messages are sent to wndProc after getting filtered
        ///  through the preProcessMessage function. Inheriting controls should
        ///  call base.wndProc for any messages that they don't handle.
        /// </summary>
        protected virtual void WndProc(ref Message m)
        {
            // 此處省略代碼未知行
            // If you add any new messages below (or change the message handling code for any messages)
            // please make sure that you also modify AxHost.WndProc to do the right thing and intercept
            // messages which the Ocx would own before passing them onto Control.WndProc.
            switch ((User32.WM)m.Msg)
            {
                // 此處省略代碼未知行
                default:
                    // If we received a thread execute message, then execute it.
                    if (m.Msg == (int)s_threadCallbackMessage && m.Msg != 0)
                    {
                        InvokeMarshaledCallbacks();
                        return;
                    }
                    break;
                 // 此處省略代碼未知行
            }
            // 此處省略代碼未知行
        }

		/// <summary>
        ///  Called on the control's owning thread to perform the actual callback.
        ///  This empties this control's callback queue, propagating any exceptions
        ///  back as needed.
        /// </summary>
        private void InvokeMarshaledCallbacks()
        {
            ThreadMethodEntry current = null;
            lock (_threadCallbackList)
            {
                if (_threadCallbackList.Count > 0)
                {
                    current = (ThreadMethodEntry)_threadCallbackList.Dequeue();
                }
            }

            // Now invoke on all the queued items.
            while (current != null)
            {
                if (current._method != null)
                {
                    try
                    {
                        // If we are running under the debugger, don't wrap asynchronous
                        // calls in a try catch.  It is much better to throw here than pop up
                        // a thread exception dialog below.
                        if (NativeWindow.WndProcShouldBeDebuggable && !current._synchronous)
                        {
                            InvokeMarshaledCallback(current);
                        }
                        else
                        {
                            try
                            {
                                InvokeMarshaledCallback(current);
                            }
                            catch (Exception t)
                            {
                                current._exception = t.GetBaseException();
                            }
                        }
                    }
                    finally
                    {
                        current.Complete();
                        if (!NativeWindow.WndProcShouldBeDebuggable &&
                            current._exception != null && !current._synchronous)
                        {
                            Application.OnThreadException(current._exception);
                        }
                    }
                }

                lock (_threadCallbackList)
                {
                    if (_threadCallbackList.Count > 0)
                    {
                        current = (ThreadMethodEntry)_threadCallbackList.Dequeue();
                    }
                    else
                    {
                        current = null;
                    }
                }
            }
        }

3、async deadlock代碼解析

死鎖代碼示例、反編譯代碼查看

打開鏈接查看反編譯代碼: https://sharplab.io/#v2:CYLg1APgAgTAjAWAFBQAwAIpwKwG5nJQDMmM6AYgPYBOAtnOgN7LqvotucAO1AlgG4BDAC4BTTABZ0AI2EA7APrVRggDYARFcAUBhVbwDGAawAUlaQCtRB4egDOoucFHUANOgCi/R8ICC1AHM7dFEASg5OVmYkSNihanRlOwBXVVsAXnRNQWAoADYTUIA6ACVRFLT8GNj0AF8CatiAehb0AB4U2lpBagBPAD4IyJam9kbh1ramzu6+wfHOEfaoAHY7fqnV9aHuPiExTAAOTDy2rFR+rK18wp22aJrIqABOE6LNVUFek2xUVFCqo9OKtMK98kVyNRKLQyhVhCYAEQACVEqlUlHQAHcaKpgAiAXdWPUkLUgA==

https://sharplab.io/很不錯的一個網站,可在線查看C#編譯后代碼、中間語言代碼。

image-20211016203612036

image-20211016204719967

執行過程:

可以看到9和10都在UI線程執行,但是UI線程已經被10的執行流程占用,導致9無法將任務設置為完成狀態,陷入死鎖。

image-20211017114845175

編譯后的DeadTask函數

由於編譯的代碼不清晰,我進行重命名和代碼精簡。

可以看到DeadTask返回DeadTaskAsyncStateMachine.Task,看來要整明白AsyncTaskMethodBuilder執行過程,才能清楚來龍去脈了。

	    private Task<string> DeadTask()
        {
            DeadTaskAsyncStateMachine stateMachine = new DeadTaskAsyncStateMachine();
            stateMachine.tBuilder = AsyncTaskMethodBuilder<string>.Create();
            stateMachine.form1 = this;
            stateMachine.state1 = -1;
            stateMachine.tBuilder.Start(ref stateMachine);
            return stateMachine.tBuilder.Task;
        }

編譯生成的DeadTaskAsyncStateMachine類

由於編譯的代碼不清晰,我進行重命名。

	  private sealed class DeadTaskAsyncStateMachine : IAsyncStateMachine
        {
            public int state1;
            public AsyncTaskMethodBuilder<string> tBuilder;
            public Form1 form1;
            private string taskResult;
            private TaskAwaiter delay500Awaiter;
            private TaskAwaiter<string> helloWorldAwaiter;
            private void MoveNext()
            {
                int num = state1;
                string finalResult;
                try
                {
                    TaskAwaiter<string> awaiter;
                    TaskAwaiter awaiter2;
                    if (num != 0)
                    {
                        if (num == 1)
                        {
                            awaiter = helloWorldAwaiter;
                            helloWorldAwaiter = default(TaskAwaiter<string>);
                            num = (state1 = -1);
                            goto finalTag;
                        }
                        awaiter2 = Task.Delay(500).GetAwaiter();
                        if (!awaiter2.IsCompleted)
                        {
                            num = (state1 = 0);
                            delay500Awaiter = awaiter2;
                            DeadTaskAsyncStateMachine stateMachine = this;
                            tBuilder.AwaitUnsafeOnCompleted(ref awaiter2, ref stateMachine);
                            return;
                        }
                    }
                    else
                    {
                        awaiter2 = delay500Awaiter;
                        delay500Awaiter = default(TaskAwaiter);
                        num = (state1 = -1);
                    }
                    awaiter2.GetResult();
                    awaiter = Task.FromResult("Hello world").GetAwaiter();
                    // 因為awaiter.IsCompleted == true,部分代碼進行移除
                    goto finalTag;
                finalTag:
                    finalResult = awaiter.GetResult();
                }
                catch (Exception exception)
                {
                    state1 = -2;
                    tBuilder.SetException(exception);
                    return;
                }
                state1 = -2;
                tBuilder.SetResult(finalResult); // 設置結果,同時設置任務為完成狀態
            }

            void IAsyncStateMachine.MoveNext()
            {
                //ILSpy generated this explicit interface implementation from .override directive in MoveNext
                this.MoveNext(); // 執行狀態機當前任務,初始狀態state1 = -1
            }

            private void SetStateMachine(IAsyncStateMachine stateMachine)
            {
            }

            void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
            {
                //ILSpy generated this explicit interface implementation from .override directive in SetStateMachine
                this.SetStateMachine(stateMachine);
            }
        }

關鍵代碼:

MoveNext源碼

image-20211016224555545

AsyncTaskMethodBuilder .AwaitUnsafeOnCompleted源碼:

可以看到將會調用函數TaskAwaiter.UnsafeOnCompletedInternal(ta.m_task, box, continueOnCapturedContext: true)。


public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine =>
            AsyncMethodBuilderCore.Start(ref stateMachine);

//源碼地址:
//https://github.com/dotnet/runtime/blob/release/5.0/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncTaskMethodBuilderT.cs#L101
        internal static void AwaitUnsafeOnCompleted<TAwaiter>(
            ref TAwaiter awaiter, IAsyncStateMachineBox box)
            where TAwaiter : ICriticalNotifyCompletion
        {
			// 執行位置,默認continueOnCapturedContext = true即為繼續在上下文執行
            // 最終SynchronizationContext.Current.Post觸發執行stateMachine.MoveNext
            if ((null != (object?)default(TAwaiter)) && (awaiter is ITaskAwaiter))
            {
                ref TaskAwaiter ta = ref Unsafe.As<TAwaiter, TaskAwaiter>(ref awaiter); // relies on TaskAwaiter/TaskAwaiter<T> having the same layout
                TaskAwaiter.UnsafeOnCompletedInternal(ta.m_task, box, continueOnCapturedContext: true);
            }
          // ConfigureAwait(false).GetAwaiter()返回類型為IConfiguredTaskAwaiter,可以避免死鎖
            else if ((null != (object?)default(TAwaiter)) && (awaiter is IConfiguredTaskAwaiter))
            {
                ref ConfiguredTaskAwaitable.ConfiguredTaskAwaiter ta = ref Unsafe.As<TAwaiter, ConfiguredTaskAwaitable.ConfiguredTaskAwaiter>(ref awaiter);
                TaskAwaiter.UnsafeOnCompletedInternal(ta.m_task, box, ta.m_continueOnCapturedContext);
            }
           // 省略代碼未知行
        }
AsyncMethodBuilderCore.Start源碼:
//源碼地址
//https://github.com/dotnet/runtime/blob/release/5.0/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncMethodBuilderCore.cs#L21
        public static void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
        {
            if (stateMachine == null) // TStateMachines are generally non-nullable value types, so this check will be elided
            {
                ThrowHelper.ThrowArgumentNullException(ExceptionArgument.stateMachine);
            }

            // enregistrer variables with 0 post-fix so they can be used in registers without EH forcing them to stack
            // Capture references to Thread Contexts
            Thread currentThread0 = Thread.CurrentThread;
            Thread currentThread = currentThread0;
            ExecutionContext? previousExecutionCtx0 = currentThread0._executionContext;
            ExecutionContext? previousExecutionCtx = previousExecutionCtx0;
            SynchronizationContext? previousSyncCtx = currentThread0._synchronizationContext;

            try
            {
                // 執行DeadTaskAsyncStateMachine.MoveNext()
                stateMachine.MoveNext();
            }
            finally
            {
                // Re-enregistrer variables post EH with 1 post-fix so they can be used in registers rather than from stack
                SynchronizationContext? previousSyncCtx1 = previousSyncCtx;
                Thread currentThread1 = currentThread;
                // The common case is that these have not changed, so avoid the cost of a write barrier if not needed.
                if (previousSyncCtx1 != currentThread1._synchronizationContext)
                {
                    // Restore changed SynchronizationContext back to previous
                    currentThread1._synchronizationContext = previousSyncCtx1;
                }

                ExecutionContext? previousExecutionCtx1 = previousExecutionCtx;
                ExecutionContext? currentExecutionCtx1 = currentThread1._executionContext;
                if (previousExecutionCtx1 != currentExecutionCtx1)
                {
                    ExecutionContext.RestoreChangedContextToThread(currentThread1, previousExecutionCtx1, currentExecutionCtx1);
                }
            }
        }
TaskAwaiter.UnsafeOnCompletedInternal源碼:
// 源碼地址
//https://github.com/dotnet/runtime/blob/release/5.0/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/TaskAwaiter.cs#L220
	internal static void UnsafeOnCompletedInternal(Task task, IAsyncStateMachineBox stateMachineBox, bool continueOnCapturedContext)
        {
            task.UnsafeSetContinuationForAwait(stateMachineBox, continueOnCapturedContext);
        }
Task.UnsafeSetContinuationForAwait源碼:
// 源碼地址
//https://github.com/dotnet/runtime/blob/release/5.0/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs#L2513

        internal void UnsafeSetContinuationForAwait(IAsyncStateMachineBox stateMachineBox, bool continueOnCapturedContext)
        {
            // continueOnCapturedContext == true,走這個分支
            if (continueOnCapturedContext)
            {
                SynchronizationContext? syncCtx = SynchronizationContext.Current;
                if (syncCtx != null && syncCtx.GetType() != typeof(SynchronizationContext))
                {
                    var tc = new SynchronizationContextAwaitTaskContinuation(syncCtx, stateMachineBox.MoveNextAction, flowExecutionContext: false);
                    // 添加到m_continuationObject,如果添加失敗則代表任務已經完成,tc直接執行
                    if (!AddTaskContinuation(tc, addBeforeOthers: false))
                    {
                        tc.Run(this, canInlineContinuationTask: false);
                    }
                    return;
                }
                else
                {
                    TaskScheduler? scheduler = TaskScheduler.InternalCurrent;
                    if (scheduler != null && scheduler != TaskScheduler.Default)
                    {
                        var tc = new TaskSchedulerAwaitTaskContinuation(scheduler, stateMachineBox.MoveNextAction, flowExecutionContext: false);
                        if (!AddTaskContinuation(tc, addBeforeOthers: false))
                        {
                            tc.Run(this, canInlineContinuationTask: false);
                        }
                        return;
                    }
                }
            }

            // Otherwise, add the state machine box directly as the continuation.
            // If we're unable to because the task has already completed, queue it.
            if (!AddTaskContinuation(stateMachineBox, addBeforeOthers: false))
            {
                ThreadPool.UnsafeQueueUserWorkItemInternal(stateMachineBox, preferLocal: true);
            }
        }
SynchronizationContextAwaitTaskContinuation源碼:
// 源碼地址
//https://github.com/dotnet/runtime/blob/release/5.0/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/TaskContinuation.cs#L364
/// <summary>Task continuation for awaiting with a current synchronization context.</summary>
    internal sealed class SynchronizationContextAwaitTaskContinuation : AwaitTaskContinuation
    {
        /// <summary>SendOrPostCallback delegate to invoke the action.</summary>
        private static readonly SendOrPostCallback s_postCallback = static state =>
        {
            Debug.Assert(state is Action);
            ((Action)state)();
        };
        /// <summary>Cached delegate for PostAction</summary>
        private static ContextCallback? s_postActionCallback;
        /// <summary>The context with which to run the action.</summary>
        private readonly SynchronizationContext m_syncContext;

        internal SynchronizationContextAwaitTaskContinuation(
            SynchronizationContext context, Action action, bool flowExecutionContext) :
            base(action, flowExecutionContext)
        {
            Debug.Assert(context != null);
            m_syncContext = context;
        }

        internal sealed override void Run(Task task, bool canInlineContinuationTask)
        {
            // If we're allowed to inline, run the action on this thread.
            if (canInlineContinuationTask &&
                m_syncContext == SynchronizationContext.Current)
            {
                RunCallback(GetInvokeActionCallback(), m_action, ref Task.t_currentTask);
            }
            // Otherwise, Post the action back to the SynchronizationContext.
            else
            {
                TplEventSource log = TplEventSource.Log;
                if (log.IsEnabled())
                {
                    m_continuationId = Task.NewId();
                    log.AwaitTaskContinuationScheduled((task.ExecutingTaskScheduler ?? TaskScheduler.Default).Id, task.Id, m_continuationId);
                }
                // 執行PostAction
                RunCallback(GetPostActionCallback(), this, ref Task.t_currentTask);
            }
            // Any exceptions will be handled by RunCallback.
        }

        private static void PostAction(object? state)
        {
            Debug.Assert(state is SynchronizationContextAwaitTaskContinuation);
            var c = (SynchronizationContextAwaitTaskContinuation)state;

            TplEventSource log = TplEventSource.Log;
            if (log.IsEnabled() && log.TasksSetActivityIds && c.m_continuationId != 0)
            {
                // 調用Control.BeginInvoke
                c.m_syncContext.Post(s_postCallback, GetActionLogDelegate(c.m_continuationId, c.m_action));
            }
            else
            {
                c.m_syncContext.Post(s_postCallback, c.m_action); // s_postCallback is manually cached, as the compiler won't in a SecurityCritical method
            }
        }

        private static Action GetActionLogDelegate(int continuationId, Action action)
        {
            return () =>
                {
                    Guid activityId = TplEventSource.CreateGuidForTaskID(continuationId);
                    System.Diagnostics.Tracing.EventSource.SetCurrentThreadActivityId(activityId, out Guid savedActivityId);
                    try { action(); }
                    finally { System.Diagnostics.Tracing.EventSource.SetCurrentThreadActivityId(savedActivityId); }
                };
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private static ContextCallback GetPostActionCallback() => s_postActionCallback ??= PostAction;
    }

Task.Delay實現過程

Task.Delay有多種實現,我精簡后畫了大致實現流程,感興趣的同學可以閱讀一下源碼,部分在coreclr實現。

QueueUseAPC: https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-queueuserapc

SleepEx: https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-sleepex

image-20211017154545021

為什么IO型、延時任務要采用async

原因: 線程池默認的最小工作線程數量為CPU核心數,如果不采用async會導致線程同步阻塞,需要線程池創建更多的工作線程來應對的並發。當線程池工作線程的數量大於最小工作線程數量時,工作線程的創建速度受限於最小工作線程數量,每秒不超過2個,這時候程序會出現假死的情況。線程池默認設置最小工作線程數量為CPU核心數,主要是希望使用async通過多路復用來提升程序的並發性能。如果舊程序不好改造,快速解決的方法就是通過ThreadPool.SetMinThreads設置最小工作線程數量,放開工作線程創建速度限制,以多線程模型應對更多的並發,雖然系統性能差一些,至少不會假死。

小實驗:

Demo源碼地址: https://gitee.com/RiverBied/async-demo

啟動Web.Api站點,運行WinForms.App進行測試,不過不要在調試狀態運行

image-20211017162854078

HttpClient.GetStringAsync執行過程

可以看到在Windows平台是通過IOCP觸發回調事件。在Unix平台是在SocketAsyncEngine類創建while(true)循環的執行線程,再通過Wait epoll或kqueue獲取IO事件,最后觸發回調事件。IOPC為異步非阻塞IO、epoll為同步非阻塞IO,IOCP、epoll會涉及IO模型、IO多路復用等知識,網上介紹較多,可以自行查閱。同時需要注意AwaitableSocketAsyncEventArgs既繼承SocketAsyncEventArgs類也實現IValueTaskSource接口。

HttpClient.GetStringAsync請求:

image-20211017200529706

NetworkStream.WriteAsync在Windows平台實現:

image-20211017200552834

NetworkStream.WriteAsync在Unix平台實現:

image-20211017200610472

async await推薦實踐方法

  • async/await適用於IO型(文件讀取、網絡通信)、延時型任務。對於計算型任務可以使用Task.Factory創建LongRunning任務,該任務會獨立新建一個后台線程進行處理。

  • 關於MySql驅動組件: 建議采用MySqlConnector組件。因為MySqlConnector組件支持異步IO,MySql.Data組件不支持真實的異步IO。

  • 如果條件允許、盡量使用ConfigureAwait(false)。如果不設置在Winform場景下會調用SynchronizationContext.Post通過UI線程執行回調函數,同步方法調用異步方式時會出現死鎖。

  • Task方法替代清單:

同步方法 異步方式 描述信息
task.Wait await task 等待一個任務執行完成
task.Result await task 獲取任務返回結果
Task.WaitAny await Task.WhenAny 等待其中一個任務執行完成,繼續執行
Task.WaitAll await Task.WhenAll 等待所有任務執行完成,繼續執行
Thread.Sleep await Task.Delay 延時幾秒繼續執行

Demo代碼地址 : https://gitee.com/RiverBied/async-demo


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM