前言
相信大家在使用C#進行開發的時候,特別是使用異步的場景,多多少少會接觸到CancellationTokenSource。看名字就知道它和取消異步任務相關的,而且一看便知大名鼎鼎的CancellationToken就是它生產出來的。不看不知道,一看嚇一跳。它在取消異步任務、異步通知等方面效果還是不錯的,不僅好用而且夠強大。無論是微軟底層類庫還是開源項目涉及到Task相關的,基本上都能看到它的身影,而微軟近幾年也是很重視框架中的異步操作,特別是在.NET Core上基本上能看到Task的地方就能看到CancellationTokenSource的身影。這次我們抱着學習的態度,來揭開它的神秘面紗。
簡單示例
相信對於CancellationTokenSource基本的使用,許多同學已經非常熟悉了。不過為了能夠讓大家帶入文章的節奏,我們還是打算先展示幾個基礎的操作,讓大家找找感覺,回到那個熟悉的年代。
基礎操作
首先呈現一個最基礎的操作。
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = cancellationTokenSource.Token;
cancellationToken.Register(() => System.Console.WriteLine("取消了???"));
cancellationToken.Register(() => System.Console.WriteLine("取消了!!!"));
cancellationToken.Register(state => System.Console.WriteLine($"取消了。。。{state}"),"啊啊啊");
System.Console.WriteLine("做了點別的,然后取消了.");
cancellationTokenSource.Cancel();
這個操作是最簡單的操作,我們上面提到過CancellationTokenSource就是用來生產CancellationToken的,還可以說CancellationToken是CancellationTokenSource的表現,這個待會看源碼的時候我們會知道為啥這么說。這里呢我們給CancellationToken
注冊幾個操作,然后使用CancellationTokenSource的Cancel方法
取消操作,這時候控制台就會打印結果如下
做了點別的,然后取消了.
取消了。。。啊啊啊
取消了!!!
取消了???
通過上面簡單的示例,大家應該非常輕松的理解了它的簡單使用。
定時取消
有的時候呢我們可能需要超時操作,比如我不想一直等着,到了一個固定的時間我就要取消操作,這時候我們可以利用CancellationTokenSource的構造函數給定一個限定時間,過了這個時間CancellationTokenSource就會被取消了,操作如下
//設置3000毫秒(即3秒)后取消
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(3000);
CancellationToken cancellationToken = cancellationTokenSource.Token;
cancellationToken.Register(() => System.Console.WriteLine("我被取消了."));
System.Console.WriteLine("先等五秒鍾.");
await Task.Delay(5000);
System.Console.WriteLine("手動取消.")
cancellationTokenSource.Cancel();
然后在控制台打印的結果是這個樣子的,活脫脫的為我們實現了內建的超時操作。
先等五秒鍾.
我被取消了.
手動取消.
上面的寫法是在構造CancellationTokenSource的時候設置超時等待,還有另一種寫法等同於這種寫法,使用的是CancelAfter
方法,具體使用如下
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.Token.Register(() => System.Console.WriteLine("我被取消了."));
//五秒之后取消
cancellationTokenSource.CancelAfter(5000);
System.Console.WriteLine("不會阻塞,我會執行.");
這個操作也是定時取消操作,需要注意的是CancelAfter
方法並不會阻塞執行,所以打印的結果是
不會阻塞,我會執行.
我被取消了.
關聯取消
還有的時候是這樣的場景,就是我們設置一組關聯的CancellationTokenSource,我們期望的是只要這一組里的任意一個CancellationTokenSource被取消了,那么這個被關聯的CancellationTokenSource就會被取消。說得通俗一點就是,我們幾個當中只要一個不在了,那么你也可以不在了,具體的實現方式是這樣的
//聲明幾個CancellationTokenSource
CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationTokenSource tokenSource2 = new CancellationTokenSource();
CancellationTokenSource tokenSource3 = new CancellationTokenSource();
tokenSource2.Token.Register(() => System.Console.WriteLine("tokenSource2被取消了"));
//創建一個關聯的CancellationTokenSource
CancellationTokenSource tokenSourceNew = CancellationTokenSource.CreateLinkedTokenSource(tokenSource.Token, tokenSource2.Token, tokenSource3.Token);
tokenSourceNew.Token.Register(() => System.Console.WriteLine("tokenSourceNew被取消了"));
//取消tokenSource2
tokenSource2.Cancel();
上述示例中因為tokenSourceNew關聯了tokenSource、tokenSource2、tokenSource3所以只要他們其中有一個被取消那么tokenSourceNew也會被取消,所以上述示例的打印結果是
tokenSourceNew被取消了
tokenSource2被取消了
判斷取消
上面我們使用的方式,都是通過回調的方式得知CancellationTokenSource被取消了,沒辦法通過標識去得知CancellationTokenSource是否可用。不過微軟貼心的為我們提供了IsCancellationRequested
屬性去判斷,需要注意的是它是CancellationToken
的屬性,具體使用方式如下
CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = tokenSource.Token;
//打印被取消
cancellationToken.Register(() => System.Console.WriteLine("被取消了."));
//模擬傳遞的場景
Task.Run(async ()=> {
while (!cancellationToken.IsCancellationRequested)
{
System.Console.WriteLine("一直在執行...");
await Task.Delay(1000);
}
});
//5s之后取消
tokenSource.CancelAfter(5000);
上述代碼五秒之后CancellationTokenSource被取消,因此CancellationTokenSource的Token也會被取消。反映到IsCancellationRequested上就是值為true說明被取消,為false說明沒被取消,因此控制台輸出的結果是
一直在執行...
一直在執行...
一直在執行...
一直在執行...
一直在執行...
被取消了.
還有另一種方式,也可以主動判斷任務是否被取消,不過這種方式簡單粗暴,直接是拋出了異常。如果是使用異步的方式的話,需要注意的是Task內部異常的捕獲方式,否則對外可能還沒有感知到具體異常的原因,它的使用方式是這樣的,這里為了演示方便我直接換了一種更直接的方式
CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = tokenSource.Token;
cancellationToken.Register(() => System.Console.WriteLine("被取消了."));
tokenSource.CancelAfter(5000);
while (true)
{
//如果操作被取消則直接拋出異常
cancellationToken.ThrowIfCancellationRequested();
System.Console.WriteLine("一直在執行...");
await Task.Delay(1000);
}
執行五秒之后則直接拋出 System.OperationCanceledException: The operation was canceled.
異常,異步情況下注意異常處理的方式即可。通過上面這些簡單的示例,相信大家對CancellationTokenSource有了一定的認識,大概知道了在什么時候可以使用它,主要是異步取消通知,或者限定時間操作通知等等。CancellationTokenSource是個不錯的神器,使用簡單功能強大。
源碼探究
通過上面的示例,相信大家對CancellationTokenSource有了一個基本的認識,真的是非常強大,而且使用起來也非常的簡單,這也是c#語言的精妙之處,非常實用,讓你用起來的時候非常舒服,有種用着用着就想跪下的沖動。步入正題,接下來讓我們來往深處看看CancellationTokenSource的源碼,看看它的工作機制是啥。本文貼出的源碼是博主精簡過的,畢竟源碼太多不太可能全部粘貼出來,主要是跟着它的思路了解它的工作方式。
構造入手
因為這一次呢CancellationTokenSource
的初始化函數中有一個比較重要的構造函數,那就是可以設置定時超時的操作,那么我們就從它的構造函數入手[點擊查看源碼👈]
//全局狀態
private volatile int _state;
//未取消狀態值
private const int NotCanceledState = 1;
/// <summary>
/// 無參構造初始化狀態
/// </summary>
public CancellationTokenSource() => _state = NotCanceledState;
/// <summary>
/// 定時取消構造
/// </summary>
public CancellationTokenSource(TimeSpan delay)
{
//獲取timespan的毫秒數
long totalMilliseconds = (long)delay.TotalMilliseconds;
if (totalMilliseconds < -1 || totalMilliseconds > int.MaxValue)
{
throw new ArgumentOutOfRangeException(nameof(delay));
}
//調用InitializeWithTimer
InitializeWithTimer((int)totalMilliseconds);
}
public CancellationTokenSource(int millisecondsDelay)
{
if (millisecondsDelay < -1)
{
throw new ArgumentOutOfRangeException(nameof(millisecondsDelay));
}
//調用InitializeWithTimer
InitializeWithTimer(millisecondsDelay);
}
無參構造函數沒啥好說的,就是給全局state狀態初始化NotCanceledState的初始值,也就是初始化狀態。我們比較關注的是可以定時取消的構造函數,雖然是兩個構造函數,但是殊途同歸,本質都是傳遞的毫秒整形參數,而且調用的核心方法都是InitializeWithTimer
,看來是一個定時器操作,這樣不奇怪了,我們看下InitializeWithTimer
方法的實現[點擊查看源碼👈]
//任務完成狀態值
private const int NotifyingCompleteState = 2;
//定時器
private volatile TimerQueueTimer? _timer;
//定時器回調初始化
private static readonly TimerCallback s_timerCallback = TimerCallback;
//定時器回調委托本質是調用的CancellationTokenSource的NotifyCancellation方法
private static void TimerCallback(object? state) =>
((CancellationTokenSource)state!).NotifyCancellation(throwOnFirstException: false);
private void InitializeWithTimer(uint millisecondsDelay)
{
if (millisecondsDelay == 0)
{
//如果定時的毫秒為0,則設置全局狀態為NotifyingCompleteState
_state = NotifyingCompleteState;
}
else
{
//如果超時毫秒不為0則初始化定時器,並設置定時器定時的回調
_timer = new TimerQueueTimer(s_timerCallback, this, millisecondsDelay, Timeout.UnsignedInfinite, flowExecutionContext: false);
}
}
通過這個方法,我們可以可以非常清晰的看到定時初始化的核心操作其實就是初始化一個定時器,而定時的時間就是我們初始化傳遞的毫秒數,其中s_timerCallback
是定時的回調函數,即如果等待超時之后則調用這個委托,其本質正是CancellationTokenSource的NotifyCancellation方法
,這個方法正是處理超時之后的操作[點擊查看源碼👈]
//信號控制類,通過信號判斷是否需要繼續執行或阻塞
private volatile ManualResetEvent? _kernelEvent;
//throwOnFirstException函數是指示如果被取消了是否拋出異常
private void NotifyCancellation(bool throwOnFirstException)
{
//如果任務已經取消則直接直接釋放定時器
if (!IsCancellationRequested && Interlocked.CompareExchange(ref _state, NotifyingState, NotCanceledState) == NotCanceledState)
{
TimerQueueTimer? timer = _timer;
if (timer != null)
{
_timer = null;
timer.Close();
}
//信號量涉及到了一個重要的屬性WaitHandle接下來會說
_kernelEvent?.Set();
//執行取消操作,是取消操作的核心,講取消操作的時候咱們會着重說這個
ExecuteCallbackHandlers(throwOnFirstException);
Debug.Assert(IsCancellationCompleted, "Expected cancellation to have finished");
}
}
NotifyCancellation正是處理定時器到時的操作,說白了就是到了指定的時間但是沒有手動取消執行的操作,其實也是執行的取消操作,這個方法里涉及到了兩個比較重要的點,也是接下來我們會分析的點,這里做一下說明
- 首先是
ManualResetEvent
這個實例,這個類的功能是通過信號機制控制是否阻塞或執行后續操作,與之相輔的還有另一個類AutoResetEvent
。這兩個類實現的效果是一致的,只是ManualResetEvent需要手動重置初始狀態,而AutoResetEvent則會自動重置。有關兩個類的說明,這里不做過多介紹,有需要了解的同學們可以自行百度。而CancellationTokenSource類的一個重要屬性WaitHandle
正是使用的它。 - 還有一個是
ExecuteCallbackHandlers
方法,這個是CancellationTokenSource執行取消操作的核心操作。為了保證閱讀的順序性,咱們在講取消操作的時候在重點講這個方法。
上面提到了,為了保證閱讀的順序性方便理解,咱們在本文接下來會講解這兩部分,就不再初始化這里講解了,這里做一下標記,以防大家覺得沒講清楚就繼續了。
小插曲WaitHandle
上面我們提到了CancellationTokenSource的WaitHandle屬性,它是基於ManualResetEvent實現的。這個算是一個稍微獨立的地方,我們可以先進行講解一下[點擊查看源碼👈]
private volatile ManualResetEvent? _kernelEvent;
internal WaitHandle WaitHandle
{
get
{
ThrowIfDisposed();
//如果初始化過了則直接返回
if (_kernelEvent != null)
{
return _kernelEvent;
}
//初始化一個ManualResetEvent,給定初始值為false
var mre = new ManualResetEvent(false);
//線程安全操作如果有別的線程初始了則釋放上面初始化的操作
if (Interlocked.CompareExchange(ref _kernelEvent, mre, null) != null)
{
mre.Dispose();
}
//如果任務已取消則后續操作不阻塞
if (IsCancellationRequested)
{
_kernelEvent.Set();
}
return _kernelEvent;
}
}
通過這段代碼我們可以看到,如果使用了WaitHandle屬性則可以使用它實現簡單的阻塞通知操作,也就是收到取消通知操作之后我們可以執行WaitHandle之后的操作,但是WaitHandle是internal修飾的,我們該怎么使用呢?莫慌,我們知道CancellationTokenSource的Token屬性獲取的是CancellationToken
實例[點擊查看源碼👈]
public CancellationToken Token
{
get
{
ThrowIfDisposed();
return new CancellationToken(this);
}
}
直接實例化了一個CancellationToken實例返回去了,並傳遞了當前CancellationTokenSource實例,找到CancellationToken
的這個構造函數[點擊查看源碼👈]
private readonly CancellationTokenSource? _source;
internal CancellationToken(CancellationTokenSource? source) => _source = source;
public WaitHandle WaitHandle => (_source ?? CancellationTokenSource.s_neverCanceledSource).WaitHandle;
通過上面的代碼我們可以看到通過CancellationToken實例便可以使用WaitHandle屬性,實現我們訪問到它的效果,光是說的話可能有點迷糊,通過一個簡單的示例我們來了解WaitHandle的使用方式,簡單來看下
CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = tokenSource.Token;
cancellationToken.Register(() => System.Console.WriteLine("被取消了."));
tokenSource.CancelAfter(5000);
Task.Run(()=> {
System.Console.WriteLine("阻塞之前");
cancellationToken.WaitHandle.WaitOne();
System.Console.WriteLine("阻塞取消,執行到了.");
});
System.Console.WriteLine("執行到了這里");
在CancellationTokenSource為被取消之前WaitHandle.WaitOne()
方法會阻塞后續執行,也就是下面的輸出暫時不會輸出。等到CancellationTokenSource執行了Cancel操作里調用了ManualResetEvent的Set方法停止阻塞,后續的輸出才會被執行到這是一個同步操作,如果了解ManualResetEvent
的同學相信對這個不難理解。為了演示效果我用Task演示異步的情況,所以執行的結果如下所示
執行到了這里
阻塞之前
阻塞取消,執行到了.
被取消了.
注冊操作
上面我們大概講解了一些初始化相關的和一些輔助的操作,接下來我們看一下核心的注冊操作,注冊操作的用途就是注冊CancellationTokenSource取消或超時后需要執行的動作,而注冊Register
的操作並未由CancellationTokenSource直接進行,而是通過它的Token
屬性即CancellationToken實例操作的,話不多說直接找到CancellationToken的Register方法[點擊查看源碼👈]
public CancellationTokenRegistration Register(Action callback) =>
Register(
s_actionToActionObjShunt,
callback ?? throw new ArgumentNullException(nameof(callback)),
useSynchronizationContext: false,
useExecutionContext: true);
它是直接調用自己的重載方法,注意幾個參數,如果看細節的話還是要關注方法參數的。過程就省略了,直接找到最底層的方法[點擊查看源碼👈]
private CancellationTokenRegistration Register(Action<object?> callback, object? state, bool useSynchronizationContext, bool useExecutionContext)
{
if (callback == null)
throw new ArgumentNullException(nameof(callback));
//_source就是傳遞下來的CancellationTokenSource
CancellationTokenSource? source = _source;
//本質是調用的CancellationTokenSource的InternalRegister方法
return source != null ?
source.InternalRegister(callback, state, useSynchronizationContext ? SynchronizationContext.Current : null, useExecutionContext ? ExecutionContext.Capture() : null) :
default;
從這個最底層的方法我們可以得知,其本質還是調用CancellationTokenSource的InternalRegister方法,核心操作都不在CancellationToken還是在CancellationTokenSource類,CancellationToken更像是依賴CancellationTokenSource的表現類,看一下InternalRegister方法[點擊查看源碼👈]
//初始化CallbackPartition數組
private volatile CallbackPartition?[]? _callbackPartitions;
//獲取初始化上面數組的長度,根據當前CPU核心數獲取的
private static readonly int s_numPartitions = GetPartitionCount();
internal CancellationTokenRegistration InternalRegister(
Action<object?> callback, object? stateForCallback, SynchronizationContext? syncContext, ExecutionContext? executionContext)
{
//判斷有沒有被取消
if (!IsCancellationRequested)
{
//如果已被釋放直接返回
if (_disposed)
{
return default;
}
CallbackPartition?[]? partitions = _callbackPartitions;
if (partitions == null)
{
//首次調用初始化CallbackPartition數組
partitions = new CallbackPartition[s_numPartitions];
//判斷_callbackPartitions如果為null,則把partitions賦值給_callbackPartitions
partitions = Interlocked.CompareExchange(ref _callbackPartitions, partitions, null) ?? partitions;
}
//獲取當前線程使用的分區下標
int partitionIndex = Environment.CurrentManagedThreadId & s_numPartitionsMask;
//獲取一個CallbackPartition
CallbackPartition? partition = partitions[partitionIndex];
if (partition == null)
{
//初始化CallbackPartition實例
partition = new CallbackPartition(this);
//如果partitions的partitionIndex下標位置為null則使用partition填充
partition = Interlocked.CompareExchange(ref partitions[partitionIndex], partition, null) ?? partition;
}
long id;
CallbackNode? node;
bool lockTaken = false;
//鎖住操作
partition.Lock.Enter(ref lockTaken);
try
{
id = partition.NextAvailableId++;
//獲取CallbackNode,這事真正存儲回調的地方,不要被List名字迷惑,其實是要構建鏈表
node = partition.FreeNodeList;
if (node != null)
{
//這個比較有意思如果CallbackNode不是首次,則把最新的賦值給FreeNodeList
partition.FreeNodeList = node.Next;
}
else
{
//首次的時候初始化一個CallbackNode實例
node = new CallbackNode(partition);
}
node.Id = id;
//Register的回調操作賦值給了CallbackNode的Callback
node.Callback = callback;
node.CallbackState = stateForCallback;
node.ExecutionContext = executionContext;
node.SynchronizationContext = syncContext;
//構建一個CallbackNode鏈表,從下面的代碼可以看出來構建的其實是倒序鏈表,最新的CallbackNode是表頭
node.Next = partition.Callbacks;
if (node.Next != null)
{
node.Next.Prev = node;
}
//Callbacks記錄的是當前的節點,如果下一次進來新節點則作為新節點的Next節點
partition.Callbacks = node;
}
finally
{
//釋放鎖
partition.Lock.Exit(useMemoryBarrier: false);
}
//用當前注冊回調生成的CallbackNode節點生成CancellationTokenRegistration實例
var ctr = new CancellationTokenRegistration(id, node);
//如果未被取消則直接返回
if (!IsCancellationRequested || !partition.Unregister(id, node))
{
return ctr;
}
}
//走到這里說明IsCancellationRequested已經等於true了也就是被取消了,則直接執行該回調
callback(stateForCallback);
return default;
}
這里涉及到一個比較核心的類那就是CallbackPartition
,這是一個內部類,它的主要用途就是輔助構建執行回調的鏈表操作,其大概實現是這個樣子的[點擊查看源碼👈]
internal sealed class CallbackPartition
{
public readonly CancellationTokenSource Source;
//使用了自旋鎖
public SpinLock Lock = new SpinLock(enableThreadOwnerTracking: false);
public CallbackNode? Callbacks;
public CallbackNode? FreeNodeList;
public long NextAvailableId = 1;
public CallbackPartition(CancellationTokenSource source)
{
Source = source;
}
internal bool Unregister(long id, CallbackNode node)
{
//這里面有內容,就不羅列了,判斷CallbackNode是否被取消注冊,如果為false說明未被取消注冊
}
}
這里面我暫時沒有列出Unregister
的內容,因為它是和取消相關的,說到取消的時候咱們再看,如果返回true則說明取消成功。這個類核心就是輔助構建Register回調鏈表的,它的核心都是在操作CallbackNode
節點和其構建的回調鏈表,而CallbackNode則是鏈表的一個節點定義,其大致結構如下[點擊查看源碼👈]
internal sealed class CallbackNode
{
public readonly CallbackPartition Partition;
//構建鏈表的核心Prev和Next
public CallbackNode? Prev;
public CallbackNode? Next;
public long Id;
//回調操作被這個委托記錄
public Action<object?>? Callback;
public object? CallbackState;
public ExecutionContext? ExecutionContext;
public SynchronizationContext? SynchronizationContext;
public CallbackNode(CallbackPartition partition)
{
Partition = partition;
}
public void ExecuteCallback()
{
//這里也有代碼,暫時不列出來,講取消的時候單獨講解
}
}
到了這里關於Register
涉及到的核心操作都羅列出來了,由於貼出來的是源碼相關看着是比較蒙圈的,但是如果順着看的話其實還是大致的實現思路還是可以理解的,這里我大致的總結一下它的實現思路
- 首先是構建了
CallbackPartition
數組,構建這個數組的長度是根據CPU的核心數來決定,每個CallbackPartition是操作的核心,為了防止過多的線程同時操作一個CallbackPartition實例,它采用了為不同線程分區的思路,CallbackPartition維護了構建鏈表節點的類CallbackNode。 CallbackNode
是組成鏈表的核心,CallbackNode每個實例都是鏈表的一個節點,從它自包含Prev和Next屬性便可以看出是一個雙向鏈表。- CallbackPartition的核心功能就是為了構建
Register
進來的回調,從上面的InternalRegister
方法里的操作我們可以得知,通過CallbackPartition的輔助將CallbackNode節點構建為一個倒序鏈表,也就是最新的CallbackNode實例是鏈表的首節點,而最老的CallbackNode實例則是鏈表的尾節點。每一次Register進來的回調,都被包裝成了CallbackNode添加到這個鏈表中。
上面InternalRegister方法里我們看到操作CallbackNode的時候,使用了SpinLock自旋鎖。短時間鎖定的情況下SpinLock更快,因為自旋鎖本質上不會讓線程休眠,而是一直循環嘗試對資源訪問,直到可用。所以自旋鎖線程被阻塞時,不進行線程上下文切換,而是空轉等待。對於多核CPU而言,減少了切換線程上下文的開銷,從而提高了性能。
取消操作
上面我們看到了注冊相關的操作,注冊還是比較統一的,就一種操作方式。取消卻有兩種方式,一種是超時取消,另一種是主動取消,接下來我們就分別看一下這兩種方式分別是如何操作的。
Cancel操作
首先我們來看主動取消的操作方式這個是最簡單最直接的方式,而且這個方法屬於CancellationTokenSource類,話不多說直接看實現[點擊查看源碼👈]
public void Cancel() => Cancel(false);
public void Cancel(bool throwOnFirstException)
{
ThrowIfDisposed();
NotifyCancellation(throwOnFirstException);
}
重點來了Cancel
方法居然也是調用的NotifyCancellation
方法,這個方法咱們上面已經看過了。在說定時的方式構造CancellationTokenSource的時候有一個自動取消的操作,提到了NotifyCancellation
方法的核心是ExecuteCallbackHandlers
方法,這個是CancellationTokenSource執行取消操作的核心操作。還說了為了保證閱讀的順序性,咱們在講取消操作的時候在重點講這個方法。看來這個時刻終於還是到來了,直接打開ExecuteCallbackHandlers方法[點擊查看源碼👈]
private volatile int _threadIDExecutingCallbacks = -1;
private volatile CallbackPartition?[]? _callbackPartitions;
private const int NotifyingCompleteState = 3;
private void ExecuteCallbackHandlers(bool throwOnFirstException)
{
//獲取當前線程ID
ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId;
//將_callbackPartitions置為null,但是partitions不為null,因為Exchange返回的是改變之前的值
CallbackPartition?[]? partitions = Interlocked.Exchange(ref _callbackPartitions, null);
//如果partitions為null說明是回調已經通知完成狀態了直接返回
if (partitions == null)
{
Interlocked.Exchange(ref _state, NotifyingCompleteState);
return;
}
List<Exception>? exceptionList = null;
try
{
//遍歷CallbackPartition數組
foreach (CallbackPartition? partition in partitions)
{
//CallbackPartition實例為null說明這個分區未被使用直接跳過
if (partition == null)
{
continue;
}
//循環處理CallbackNode鏈表
while (true)
{
CallbackNode? node;
bool lockTaken = false;
//鎖住當前操作
partition.Lock.Enter(ref lockTaken);
try
{
//獲取鏈表的節點
node = partition.Callbacks;
//為null說明沒Register過直接中斷
if (node == null)
{
break;
}
else
{
//如果鏈表遍歷不是尾節點,切斷和下一個節點的關聯
if (node.Next != null) node.Next.Prev = null;
//把下一個節點賦值給Callbacks
partition.Callbacks = node.Next;
}
//當前執行節點ID
_executingCallbackId = node.Id;
node.Id = 0;
}
finally
{
//退出鎖
partition.Lock.Exit(useMemoryBarrier: false);
}
try
{
//如果當時傳遞了同步上下文則直接在當時的上下文調用ExecuteCallback委托
if (node.SynchronizationContext != null)
{
node.SynchronizationContext.Send(static s =>
{
var n = (CallbackNode)s!;
n.Partition.Source.ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId;
n.ExecuteCallback();
}, node);
ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId;
}
else
{
//如果沒有傳遞SynchronizationContext則直接調用ExecuteCallback委托
//即調用Register的注冊的委托
node.ExecuteCallback();
}
}
catch (Exception ex) when (!throwOnFirstException)
{
(exceptionList ??= new List<Exception>()).Add(ex);
}
}
}
}
finally
{
//將全局狀態置為通知完成狀態
//即已經調用過Register回調
_state = NotifyingCompleteState;
Volatile.Write(ref _executingCallbackId, 0);
Interlocked.MemoryBarrier();
}
//如果中途存在異常則拋出
if (exceptionList != null)
{
Debug.Assert(exceptionList.Count > 0, $"Expected {exceptionList.Count} > 0");
throw new AggregateException(exceptionList);
}
}
關於ExecuteCallback
方法是CallbackNode
類的方法,也就是咱們上面羅列CallbackNode類結構時被省略的方法,它的主要功能就是調用Register的回調,也就是執行Register里的委托。欠下的我會補上來,注意這里是CallbackNode
類,接下來看下實現[點擊查看源碼👈]
public ExecutionContext? ExecutionContext;
public void ExecuteCallback()
{
ExecutionContext? context = ExecutionContext;
//如果Register的時候允許傳遞ExecutionContext則直接用這個上下文執行回調Callback
//Callback委托也就是承載Register的委托操作
if (context != null)
{
ExecutionContext.RunInternal(context, static s =>
{
Debug.Assert(s is CallbackNode, $"Expected {typeof(CallbackNode)}, got {s}");
CallbackNode n = (CallbackNode)s;
Debug.Assert(n.Callback != null);
n.Callback(n.CallbackState);
}, this);
}
else
{
Debug.Assert(Callback != null);
//直接在當前線程調用Callback
//Callback委托也就是承載Register的委托操作
Callback(CallbackState);
}
}
關於取消的核心方法ExecuteCallbackHandlers
的重要操作,咱們已經羅列出來了,其實我們看到注冊的思路的時候,就已經能猜到執行取消回調的大致思路了,既然Register的時候進行了拉鏈,那么取消執行注冊回調肯定是變量鏈表執行里面的Callback了,大致總結一下
- 執行Cancel之后核心操作還是針對構建的CallbackNode鏈表進行遍歷,咱們之前說過構建的CallbackNode鏈表是倒序鏈表,最新的節點放在鏈表的首部,這也就解釋了為啥我們上面的示例Register多個委托的時候,最先輸出的是最后注冊委托。
- Register注冊時候有參數判斷是否需要傳遞當前同步上下文SynchronizationContext和執行上下文ExecutionContext,作用就是為了是否在當時的上下文環境執行Callback回調操作。
- 上面的遍歷代碼我們看到了會執行
CallbackNode.Next.Prev=null
的操作,是為了斷開當前鏈表節點和上下節點的關系,個人感覺是為了切斷對象引用方便釋放的,防止內存泄漏,同時也說明了默認情況下Register的的回調函數執行是一次性的,當執行完Cancel操作之后當前CancellationToken實例也就失效了。
CancelAfter操作
之前我們演示的時候說過有兩種方式可以執行超時取消操作,一種是在構建CancellationTokenSource實例構造的時候傳遞超時時間,還有另一種是使用CancelAfter
操作,這個方法表示在指定時間之后取消,效果上等同於實例化CancellationTokenSource的時候傳遞超時時間的操作,廢話不多說直接羅列代碼[點擊查看源碼👈]
public void CancelAfter(TimeSpan delay)
{
long totalMilliseconds = (long)delay.TotalMilliseconds;
if (totalMilliseconds < -1 || totalMilliseconds > int.MaxValue)
{
throw new ArgumentOutOfRangeException(nameof(delay));
}
//調用的是重載的CancelAfter方法
CancelAfter((int)totalMilliseconds);
}
private static readonly TimerCallback s_timerCallback = obj =>
{
((CancellationTokenSource)obj).NotifyCancellation(throwOnFirstException: false);
};
public void CancelAfter(int millisecondsDelay)
{
//傳遞的毫秒數不能小於-1
if (millisecondsDelay < -1)
{
throw new ArgumentOutOfRangeException(nameof(millisecondsDelay));
}
//如果已經取消則直接返回
if (IsCancellationRequested)
{
return;
}
//注冊一個定時器執行s_timerCallback
//s_timerCallback在上面我們介紹過了 本這就是調用CancellationTokenSource的NotifyCancellation方法
TimerQueueTimer? timer = _timer;
if (timer == null)
{
timer = new TimerQueueTimer(s_timerCallback, this, Timeout.UnsignedInfinite, Timeout.UnsignedInfinite, flowExecutionContext: false);
TimerQueueTimer? currentTimer = Interlocked.CompareExchange(ref _timer, timer, null);
if (currentTimer != null)
{
timer.Close();
timer = currentTimer;
}
}
try
{
timer.Change((uint)millisecondsDelay, Timeout.UnsignedInfinite);
}
catch (ObjectDisposedException)
{
}
}
通過上面的源碼我們可以看到CancelAfter的操作代碼和傳遞超時時間構造CancellationTokenSource的代碼基本上是一致的,都是通過TimerQueueTimer的方式定時觸發調用CancellationTokenSource的NotifyCancellation方法,而NotifyCancellation
方法的核心實現就是ExecuteCallbackHandlers
方法,這些方法咱們上面都有講解過,就不重復介紹了,這樣關於取消相關的操作我們也就全部講解完成了。
總結
本文我們主要講解了C#取消令牌CancellationTokenSource,雖然設計到的類並不多,但是這部分源碼並不少,而且也只是講解核心功能的部分源碼,有興趣的同學可以自行閱讀這個類相關代碼,如果你覺得你的GitHub比較不給力推薦一個可以閱讀CoreCLR源碼的網站source.dot.net這個網站看到的是目前CoreCLR最新的源碼,可以直接連接到GitHub非常方便,但是最新版本的源碼和穩定版本的有些差別,這個還需要注意。由於文章比較長,再加上筆者技術能力和文筆能力都有限,這里做一下簡單的總結
- CancellationTokenSource的用途就是可以感知到取消操作,其中涉及到的Register回調、WaitHandle、IsCancellationRequested都能實現這個功能,當然它還支持超時取消操作。
- CancellationTokenSource的Register和Cancel相關成雙成對的,雖然有CancelAfter和構造傳遞超時時間的方式,其本質和Cancel操作是一樣的。
- CancellationTokenSource的核心操作原理,是通過
CallbackPartition
和CallbackNode
構建倒序鏈表,Register的時候通過Callback委托構建鏈表,Cancel的時候遍歷構建的鏈表執行Callback,雖然有一堆額外操作,但是核心工作方式就是鏈表操作。 - 需要注意的是,默認情況下CancellationTokenSource產生的CancellationToken是一次性的,取消了之后是沒有辦法進行重置的,當然微軟已經為我們提供了
IChangeToken
去解決了CancellationToken重復觸發的問題,請放心使用。
由於本篇文章篇幅較長,加上筆者能力有限,文筆更是一般,如果講解的不清楚還望諒解,或者感興趣的同學可以自行閱讀源碼。關於看源碼每個人都有自己的關注點,我一般的初衷都是弄明白它的原理,順便學習下它代碼風格或思路。學無止境,結果有時候並不那么重要,過程才重要。就和許多人追求自己能有到達什么樣的高度,成功其實只是成長過程中順便的一種表現,就和你如果不滿現狀,說明你在很早之前沒想過改變自己一樣。
