AsyncLocal
的實現很簡單,將AsyncLocal
實例和當前線程的值以鍵值對的形式保存在Thread.CurrentThread.ExecutionContext.m_localValues.中。由於使用[ThreadStatic] 修飾了 Thread.CurrentThread屬性對應的字段,所以實現了多個線程之間各自維護不同的一份數據。同時,在每一次修改AsyncLocal
的時候,都新建了ExecutionContext和IAsyncLocalValueMap對象並賦值給當前的線程。
以下為AsyncLocal
的測試代碼
class AsyncLocalTests : Singleton<AsyncLocalTests>,ITestMethod
{
private readonly AsyncLocal<int> asyncLocalVariable = new AsyncLocal<int>();
public async Task MethodAsync()
{
asyncLocalVariable.Value = 88;
await Task.Run(() =>
{
Console.WriteLine($"進入 Task,值:{asyncLocalVariable.Value};線程Id:{Thread.CurrentThread.ManagedThreadId};ExecutionContext:Hashcode:{Thread.CurrentThread.ExecutionContext.GetHashCode()}");
asyncLocalVariable.Value = 888;
});
Console.WriteLine($"await Task 后,值:{asyncLocalVariable.Value};線程Id:{Thread.CurrentThread.ManagedThreadId};ExecutionContext:Hashcode:{Thread.CurrentThread.ExecutionContext.GetHashCode()}");
}
public void RunTest()
{
asyncLocalVariable.Value = 1;
Console.WriteLine($"初始值:{asyncLocalVariable.Value};線程Id:{Thread.CurrentThread.ManagedThreadId};ExecutionContext:Hashcode:{Thread.CurrentThread.ExecutionContext.GetHashCode()}");
MethodAsync();
Thread.Sleep(1000);
Console.WriteLine($"async方法后,值:{asyncLocalVariable.Value};線程Id:{Thread.CurrentThread.ManagedThreadId};ExecutionContext:Hashcode:{Thread.CurrentThread.ExecutionContext.GetHashCode()}");
Console.ReadKey();
}
}
測試結果

從上面的測試結果我們看出:
- 在
MethodAsync()異步方法前后,AsyncLocal的值相同.Value - 在
await MethodAsync()異步方法前后,AsyncLocal的值相同.Value - 在
await Task.Run()前后代碼塊中,AsyncLocal的值相同.Value
由於AsyncLocal
是從 Thread.CurrentThread.ExecutionContext 獲取實際的值,那么理解ExecutionContext在 async、Task、Thread 的中流動就十分重要。先說結論,並簡單描述一下原因:
1、進入Task.Run()前后,ExecutionContext相同(處於不同線程)
原因:這是因為 Task.Run()和Thread.Start() 會捕獲當前線程的 ExecutionContext 傳遞給工作線程,並且在工作線程修改 AsyncLocal
的值, 不會影響原線程的ExecutionContext 。因為每次修改 AsyncLocal
的值,都會新建 ExecutionContext 實例並保存到工作線程
2、 MethodAsync() 前后代碼塊的 ExecutionContext 相同(不使用await)
原因:在狀態機第一次執行前后會備份、恢復ExecutionContext(線程並沒有進行切換)
3、await Task.Run() 前后代碼塊的 ExecutionContext 相同(處於不同線程)
原因:我們知道await Task.Run()肯定位於一個異步方法中,該異步方法會被編譯成一個狀態機,通過狀態的切換,將await前后的代碼分成了兩步來執行。第一次執行由當前線程執行,在開啟新 Task 后、當前線程返回之前,會保存當前線程的 ExecutionContext,供狀態機第二次執行使用(工作線程)。從第一點我們知道新建Task實例的時候會捕獲一次ExecutionContext給工作線程,碰到await返回之前會捕獲一次ExecutionContext給狀態機,這兩次捕獲的實際上是同一個對象。
4、 await MethodAsync() 前后代碼塊的 ExecutionContext 相同(處於不同線程)
原因:在狀態機第一次執行(當前線程)的時候,會捕獲當前線程的 ExecutionContext,供狀態機第二次執行使用(工作線程)。
2. async關鍵字對ExecutionContext的影響
async關鍵字實際上是編譯器的語法糖,可以通過Dnspy 反編譯查看去除語法糖的原始代碼。
Dnspy配置如下,去除勾選"反編譯異步方法(async/await)"

原代碼:
public async Task MethodAsyncWithAwait()
{
asyncLocalVariable.Value = 88;
await Task.Run(() =>
{
asyncLocalVariable.Value = 888;
});
asyncLocalVariable.Value = 8888;
}
去除async/await 語法糖代碼:
異步方法代碼:
可以看到編譯器生成了一個實現 IAsyncStateMachine 接口的異步狀態機,並生成了一個私有方法,保存了Task.Run()中的的代碼塊。異步方法中實際上做了以下四個步驟:
- 實例化異步狀態機, 將狀態置為 -1
- 創建
AsyncTaskMethodBuilder - 通過
AsyncTaskMethodBuilder.Sratr(ref IAsyncStateMachine)啟動狀態機 - 返回
Task
public Task MethodAsyncWithAwait()
{
AsyncLocalTests.<MethodAsyncWithAwait>d__1 <MethodAsyncWithAwait>d__ = new AsyncLocalTests.<MethodAsyncWithAwait>d__1();
<MethodAsyncWithAwait>d__.<>4__this = this;
<MethodAsyncWithAwait>d__.<>t__builder = AsyncTaskMethodBuilder.Create();
<MethodAsyncWithAwait>d__.<>1__state = -1;
AsyncTaskMethodBuilder <>t__builder = <MethodAsyncWithAwait>d__.<>t__builder;
<>t__builder.Start<AsyncLocalTests.<MethodAsyncWithAwait>d__1>(ref <MethodAsyncWithAwait>d__);
return <MethodAsyncWithAwait>d__.<>t__builder.Task;
}
狀態機啟動代碼:
可以看到在執行 stateMachine.MoveNext() 之前備份了當前線程的 _executionContext 和 _synchronizationContext,並且在 finally 代碼塊中恢復了備份的數據。
這樣也就解釋了:在不使用 await 等待異步方法的情況下,雖然在原線程修改了AsyncLocal
的值,但是離開async方法后,我們獲取的還是原來的值。值得注意的是,這里的備份恢復針對的都是當前線程,而不涉及到工作線程。
public void Start<[Nullable(0)] TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
{
AsyncMethodBuilderCore.Start<TStateMachine>(ref stateMachine);
}
internal static class AsyncMethodBuilderCore
{
[DebuggerStepThrough]
public static void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
{
if (stateMachine == null)
{
ThrowHelper.ThrowArgumentNullException(ExceptionArgument.stateMachine);
}
Thread currentThread = Thread.CurrentThread;
Thread thread = currentThread;
ExecutionContext executionContext = currentThread._executionContext;
ExecutionContext executionContext2 = executionContext;
SynchronizationContext synchronizationContext = currentThread._synchronizationContext;
try
{
stateMachine.MoveNext();
}
finally
{
SynchronizationContext synchronizationContext2 = synchronizationContext;
Thread thread2 = thread;
if (synchronizationContext2 != thread2._synchronizationContext)
{
thread2._synchronizationContext = synchronizationContext2;
}
ExecutionContext executionContext3 = executionContext2;
ExecutionContext executionContext4 = thread2._executionContext;
if (executionContext3 != executionContext4)
{
ExecutionContext.RestoreChangedContextToThread(thread2, executionContext3, executionContext4);
}
}
}
狀態機代碼:
實際上編譯器將標記為 async 的方法分成了兩部分,一部分是 await 之前的代碼(包括新建並啟動啟動Task部分),另一部分是 await之后的代碼。通過狀態的改變,這兩部分代碼分兩次執行。如果沒有使用await修飾異步方法,那么該狀態機沒有 else代碼塊, 只會執行一次stateMachine.MoveNext()。可以看到在第一次執行stateMachine.MoveNext() 之后,當前線程就直接返回了,然后一層層的返回到最外層。這也是為什么說碰到await之后,當前線程就直接返回,當然最內層的返回是在開啟新Task之后。
[CompilerGenerated]
private void <MethodAsyncWithAwait>b__1_0()
{
this.asyncLocalVariable.Value = 888;
}
[CompilerGenerated]
private sealed class <MethodAsyncWithAwait>d__1 : IAsyncStateMachine
{
void IAsyncStateMachine.MoveNext()
{
int num = this.<>1__state;
try
{
TaskAwaiter awaiter;
if (num != 0)
{
this.<>4__this.asyncLocalVariable.Value = 88;
awaiter = Task.Run(new Action(this.<>4__this.<MethodAsyncWithAwait>b__1_0)).GetAwaiter();
if (!awaiter.IsCompleted)
{
this.<>1__state = 0;
this.<>u__1 = awaiter;
AsyncLocalTests.<MethodAsyncWithAwait>d__1 <MethodAsyncWithAwait>d__ = this;
this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, AsyncLocalTests.<MethodAsyncWithAwait>d__1>(ref awaiter, ref <MethodAsyncWithAwait>d__);
return;
}
}
else
{
awaiter = this.<>u__1;
this.<>u__1 = default(TaskAwaiter);
this.<>1__state = -1;
}
awaiter.GetResult();
this.<>4__this.asyncLocalVariable.Value = 8888;
}
catch (Exception exception)
{
this.<>1__state = -2;
this.<>t__builder.SetException(exception);
return;
}
this.<>1__state = -2;
this.<>t__builder.SetResult();
}
[DebuggerHidden]
void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
{
}
public int <>1__state;
public AsyncTaskMethodBuilder <>t__builder;
public AsyncLocalTests <>4__this;
private TaskAwaiter <>u__1;
}
我們可以看到在第一次執行stateMachine.MoveNext()的時候,會通過當前線程執行 await 之前的代碼塊,並通過Task.Run()啟用工作線程去完成任務。IAsyncStateMachine.MoveNext() 里面有三句代碼比較重要,這里先大概描述一下作用:
1、 Task.Run():
新建Task實例、捕獲當前線程的ExecutionContext 保存到Task實例中、啟動新任務
2、 AsyncTaskMethodBuilder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine):
將狀態機和當前線程的上下文包裝成 AsyncStateMachineBox:Task 對象,保存到在 Task(第一步新建的 字段中,最后將其傳遞到 Task實例).m_continuationObjectstateMachine.AsyncTaskMethodBuilder.Task屬性中, 這樣外層狀態機可以通過MethodAsync().GetAwaiter().m_task獲取到內層狀態機的AsyncStateMachineBox:Task 對象,同樣的外層狀態機再次執行本步驟,將自身的 AsyncStateMachineBox:Task 對象賦值給內層 AsyncStateMachineBox:Task 對象的 m_continuationObject字段,這樣的話,就構建了一個單向鏈表,該鏈表保存了每一層異步方法的stateMachine 和 ExecutionContext。
3、 this.<>t__builder.SetResult():
設置異步方法的結果,並檢查 Task.m_continuationObject 是否為空,不為空的情況下,執行外層狀態機的第二次 stateMachine.MoveNext()。最內層 Task.m_continuationObject的執行會在Task完成之后調用,接下來通過SetResult()一層層調用了外層狀態機的第二次 stateMachine.MoveNext()
3. AwaitUnsafeOnCompleted() 代碼分析
以下只放出了簡化的代碼。這里首先構建 IAsyncStateMachineBox實例,並將其賦值給 m_task 供外層狀態機使用。IAsyncStateMachineBox實例保存了本層的狀態機,並捕獲了當前線程的ExecutionContext。awaiter.m_task 是調用內層異步方法返回的 Task實例。最后在 TaskAwaiter.UnsafeOnCompletedInternal() 方法中,將構建的IAsyncStateMachineBox實例保存到 awaiter(內層).m_task.m_continuationObject字段中,使得內層狀態機指向本層狀態機。因為每一層狀態機都會調用AwaitUnsafeOnCompleted 方法,所以一層層構建了 await 后的所有回調,並且每一層回調的 ExecutionContext 都不同。
public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine
{
IAsyncStateMachineBox box = GetStateMachineBox(ref stateMachine);
if ((null != (object)default(TAwaiter)!) && (awaiter is ITaskAwaiter))
{
ref TaskAwaiter ta = ref Unsafe.As<TAwaiter, TaskAwaiter>(ref awaiter);
TaskAwaiter.UnsafeOnCompletedInternal(ta.m_task, box, continueOnCapturedContext: true);
}
}
private IAsyncStateMachineBox GetStateMachineBox<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
{
ExecutionContext? currentContext = ExecutionContext.Capture();
AsyncStateMachineBox<TStateMachine> box = AsyncMethodBuilderCore.TrackAsyncMethodCompletion ?
CreateDebugFinalizableAsyncStateMachineBox<TStateMachine>() :
new AsyncStateMachineBox<TStateMachine>();
m_task = box;
box.StateMachine = stateMachine;
box.Context = currentContext;
return box;
}
4. SetResult() 代碼分析
該方法最后調用了 Task
方法,讀取了Task.m_continuationObject 來獲取外一層的回調(包括狀態機、ExecutionContext),並通過FinishContinuations()執行外層狀態機的第二次執行。
internal bool TrySetResult([AllowNull] TResult result)
{
bool result2 = false;
if (base.AtomicStateUpdate(67108864, 90177536))
{
this.m_result = result;
Interlocked.Exchange(ref this.m_stateFlags, this.m_stateFlags | 16777216);
Task.ContingentProperties contingentProperties = this.m_contingentProperties;
if (contingentProperties != null)
{
base.NotifyParentIfPotentiallyAttachedTask();
contingentProperties.SetCompleted();
}
base.FinishContinuations();
result2 = true;
}
return result2;
}
FinishContinuations() 方法接下來的調用在 Task.Run() 部分會有介紹。
internal void FinishContinuations()
{
object obj = Interlocked.Exchange(ref this.m_continuationObject, Task.s_taskCompletionSentinel);
if (obj != null)
{
this.RunContinuations(obj);
}
}
5. Task.Run() 代碼分析
實際上在Task.Run()內部也是先新建一個Task 實例,然后通過Task.ScheduleAndStart()方法來調度並啟動任務。兩者的區別在於傳入的Options 和scheduler 是不相同的。
var task1 = Task.Run(() => { });
//默認配置無法更改: InternalTaskOptions.QueuedByRuntime, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default
//t.ScheduleAndStart(false);
var task2 = new Task(() => { }, TaskCreationOptions.LongRunning);
//默認配置可以修改: InternalTaskOptions.None, TaskCreationOptions.None, scheduler:null
task2.Start();
//可以傳入scheduler
//默認使用TaskScheduler.Current: 先取[ThreadStatic]Task.InternalCurrent,如果為空取 TaskScheduler.Default
//t.ScheduleAndStart(true);
通過在Task的構造函數中調用ExecutionContext.Capture() 方法來保存當前線程的ExecutionContext到Task實例中,這樣的話,只要將到Task實例作為參數傳入到工作線程中,工作線程就可以獲取到ExecutionContext
internal Task(Delegate action, object state, Task parent, CancellationToken cancellationToken, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler)
{
if (action == null)
{
ThrowHelper.ThrowArgumentNullException(ExceptionArgument.action);
}
if (parent != null && (creationOptions & TaskCreationOptions.AttachedToParent) != TaskCreationOptions.None)
{
this.EnsureContingentPropertiesInitializedUnsafe().m_parent = parent;
}
this.TaskConstructorCore(action, state, cancellationToken, creationOptions, internalOptions, scheduler);
this.CapturedContext = ExecutionContext.Capture();
}
Task.ScheduleAndStart()方法:
Task的調度分兩種情況:1、配置了TaskCreationOptions.LongRunning 的Task實例直接新建一個后台 Thread,並將Task實例作為啟動參數來啟動工作線程;2、對於沒有配置TaskCreationOptions.LongRunning 的Task實例,將其加入ThreadPool的線程池,由線程池來調度運行
internal sealed class ThreadPoolTaskScheduler : TaskScheduler
{
protected internal override void QueueTask(Task task)
{
TaskCreationOptions options = task.Options;
if ((options & TaskCreationOptions.LongRunning) != 0)
{
// Run LongRunning tasks on their own dedicated thread.
Thread thread = new Thread(s_longRunningThreadWork);
thread.IsBackground = true; // Keep this thread from blocking process shutdown
thread.Start(task);
}
else
{
// Normal handling for non-LongRunning tasks.
bool preferLocal = ((options & TaskCreationOptions.PreferFairness) == 0);
ThreadPool.UnsafeQueueUserWorkItemInternal(task, preferLocal);
}
}
}
雖然已經在Task的構造函數中,捕獲了ExecutionContext,但是對於直接新建的Thread實例,啟動的時候同樣也需要捕獲當前線程的ExecutionContext
public void Start()
{
this.StartupSetApartmentStateInternal();
if (this._delegate != null)
{
ThreadHelper threadHelper = (ThreadHelper)this._delegate.Target;
ExecutionContext executionContextHelper = ExecutionContext.Capture();
threadHelper.SetExecutionContextHelper(executionContextHelper);
}
this.StartInternal();
}
線程池調度Task、IThreadPoolWorkItem邏輯:
從線程池取出工作線程,工作線程調用Dispatch()方法,對於IThreadPoolWorkItem,直接執行IThreadPoolWorkItem.Execute() 方法,所以線程池處理IThreadPoolWorkItem是不涉及到上下文切換的。對於 Task ,將ExecutionContext賦值給工作線程,調用委托,然后清除工作線程的上下文,最后調用Finish(true) 來執行任務完成的回調方法。調用鏈路很長,這里直接跳到RunContinuations()方法。
調用鏈路:
// 最內層 async 狀態機 執行await Task.Run()之后的代碼塊
//=>Task.Finish(true);
//=>FinishStageTwo();
//===>FinishStageThree();
//=====>FinishContinuations();
//=======>RunContinuations(continuationObject);
//=========>AwaitTaskContinuation.RunOrScheduleAction(asyncStateMachineBox, flag); 狀態機的回調執行這個
//==========>box.ExecuteFromThreadPool(threadPoolThread); 或者 box.MoveNext();
我們知道,異步方法的最內層肯定有一個 await Task。 正是Task.Finish(true)這個方法調用了最內層狀態機,去執行第二次stateMachine.MoveNext()方法,並且在MoveNext()方法中都會調用SetResult() 方法,從而觸發外層狀態機的第二次stateMachine.MoveNext()執行,就這樣一層層的調用完成了所有的層次的回調。可以看到,工作線程在執行 await Task.Run()/MethodAsnc() 后代碼塊時,傳入的是在 AwaitUnsafeOnCompleted() 方法中捕獲的 ExecutionContext。
s_callback字段保存了狀態機的MoveNext()方法。
private class AsyncStateMachineBox<TStateMachine> :Task<TResult>,IAsyncStateMachineBox where TStateMachine : IAsyncStateMachine
{
private static readonly ContextCallback s_callback = ExecutionContextCallback;
private static void ExecutionContextCallback(object? s)
{
Unsafe.As<AsyncStateMachineBox<TStateMachine>>(s).StateMachine!.MoveNext();
}
public TStateMachine StateMachine = default;
public ExecutionContext? Context;
internal sealed override void ExecuteFromThreadPool(Thread threadPoolThread) => MoveNext(threadPoolThread);
internal sealed override void ExecuteFromThreadPool(Thread threadPoolThread) => MoveNext(threadPoolThread);
public void MoveNext() => MoveNext(threadPoolThread: null);
private void MoveNext(Thread? threadPoolThread)
{
bool loggingOn = AsyncCausalityTracer.LoggingOn;
if (loggingOn)
{
AsyncCausalityTracer.TraceSynchronousWorkStart(this, CausalitySynchronousWork.Execution);
}
ExecutionContext? context = Context;
if (context == null)
{
Debug.Assert(StateMachine != null);
StateMachine.MoveNext();
}
else
{
if (threadPoolThread is null)
{
ExecutionContext.RunInternal(context, s_callback, this);
}
else
{
ExecutionContext.RunFromThreadPoolDispatchLoop(threadPoolThread, context, s_callback, this);
}
}
if (IsCompleted)
{
if (System.Threading.Tasks.Task.s_asyncDebuggingEnabled)
{
System.Threading.Tasks.Task.RemoveFromActiveTasks(this);
}
StateMachine = default;
Context = default;
}
if (loggingOn)
{
AsyncCausalityTracer.TraceSynchronousWorkCompletion(CausalitySynchronousWork.Execution);
}
}
}
最后在 RunInternal 和 RunFromThreadPoolDispatchLoop 中,都會使用在AwaitUnsafeOnCompleted()方法里面捕獲的 ExecutionContext,這也就解釋了為什么在 await Task.Run()/MethodAsync() 前后的代碼塊中,ExecutionContext始終相同。需要注意的一點是,不管在Task 任務執行之后,還是 await 回調執行之后,都會把工作線程的上下文清空。
internal static void RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, object state)
{
if (executionContext != null && !executionContext.m_isDefault)
{
ExecutionContext.RestoreChangedContextToThread(threadPoolThread, executionContext, null);
}
ExceptionDispatchInfo exceptionDispatchInfo = null;
try
{
callback(state);
}
catch (Exception source)
{
exceptionDispatchInfo = ExceptionDispatchInfo.Capture(source);
}
ExecutionContext executionContext2 = threadPoolThread._executionContext;
threadPoolThread._synchronizationContext = null;
if (executionContext2 != null)
{
ExecutionContext.RestoreChangedContextToThread(threadPoolThread, null, executionContext2);
}
if (exceptionDispatchInfo != null)
{
exceptionDispatchInfo.Throw();
}
}
