最近在學習.NET4.5關於“並行任務”的使用。“並行任務”有自己的同步機制,沒有顯示給出類似如舊版本的:事件等待句柄、信號量、lock、ReaderWriterLock……等同步基元對象,但我們可以沿溪這一編程習慣,那么這系列翻譯就是給“並行任務”封裝同步基元對象。翻譯資源來源《(譯)關於Async與Await的FAQ》
1. 構建Async同步基元,Part 1 AsyncManualResetEvent
2. 構建Async同步基元,Part 2 AsyncAutoResetEvent
3. 構建Async同步基元,Part 3 AsyncCountdownEvent
4. 構建Async同步基元,Part 4 AsyncBarrier
5. 構建Async同步基元,Part 5 AsyncSemaphore
6. 構建Async同步基元,Part 6 AsyncLock
7. 構建Async同步基元,Part 7 AsyncReaderWriterLock
開始:Async同步基元,Part 1 AsyncManualResetEvent
基於任務異步模式(TAP)不僅僅是關於開始然后異步等待完成的異步操作,更概括的說,任務可以用來指代各種事件,使你能夠等待任何事的條件發生。我們甚至可以使用任務來構建簡單的同步基元,這些同步基元類似.NET原生提供的非任務版本,但是它們允許等待異步完成。
線程同步基元之一:事件等待句柄,它們存在於.NET Framework。ManualResetEvent 和AutoResetEvent和.NET 4為ManualResetEvent新增的優化版本ManualResetEventSlim。事件等待句柄就是一方等待另一方提供信號。比如ManualResetEvent,他會在調用Set()后保持信號直到顯示調用Reset()。
TaskCompletionSource<TResult>本身基於SpinWait結構與Task的IsCompleted屬性實現類似事件等待句柄,僅僅是缺少Reset()方法。
// 表示未綁定到委托的 System.Threading.Tasks.Task<TResult> 的制造者方,
// 並通過 Tasks.TaskCompletionSource<TResult>.Task屬性提供對使用者方的訪問。
public class TaskCompletionSource<TResult>
{
public TaskCompletionSource();
// 獲取由此 Tasks.TaskCompletionSource<TResult> 創建的 Tasks.Task<TResult>。
public Task<TResult> Task { get; }
// 將基礎 Tasks.Task<TResult> 轉換為 Tasks.TaskStatus.Canceled狀態。
public void SetCanceled();
public bool TrySetCanceled();
// 將基礎 Tasks.Task<TResult> 轉換為 Tasks.TaskStatus.Faulted狀態。
public void SetException(Exception exception);
public void SetException(IEnumerable<Exception> exceptions);
public bool TrySetException(Exception exception);
public bool TrySetException(IEnumerable<Exception> exceptions);
// 嘗試將基礎 Tasks.Task<TResult> 轉換為 TaskStatus.RanToCompletion狀態。
public bool TrySetResult(TResult result);
……
}
TaskCompletionSource<TResult>開始於無信號,它指代的任務不能完成,因此,等待這個任務的“異步方法”也不能完成。(Try)Set*方法充當信號,將任務切換到完成狀態,這樣才能完成等待任務。因此我們可以很容易基於TaskCompletionSource<TResult>來構建一個AsyncManualResetEvent。它可以為我們提供缺失的Reset()能力。接下來我們構建此AsyncManualResetEvent。
這是我們將構建的目標類型:
public class AsyncManualResetEvent
{
public Task WaitAsync();
public void Set();
public void Reset();
}
WaitAsync()和Set()方法非常簡單,直接封裝TaskCompletionSource<bool>實例成員,如下:
public class AsyncManualResetEvent
{
private volatile TaskCompletionSource<bool> m_tcs = new TaskCompletionSource<bool>();
public Task WaitAsync() { return m_tcs.Task; }
public void Set() { m_tcs.TrySetResult(true); }
…
}
剩下的只有Reset()方法了。我們的目標是使隨后調用的WaitAsync()中返回的Task無法完成。因為Task最終狀態只有完成狀態(即,正常完成、取消、異常),所以我們需要切換一個新的TaskCompletionSource<bool>實例。這樣做,我們只需要確保如果多個線程同時調用Reset()、Set()和WaitAsync(), WaitAsync()不會返回孤立的Task(即,我們不希望一個線程調用WaitAsync()返回一個不能完成的Task(已經被Reset())后,另一個線程又在新的Task上調用Set())。為了達到此目的,我們將確保如果當前Task已經完成就切換一個新的Task,並且還確保這個切換操作的原子性。(當然,還有其他策略實現此目標,這僅僅是我選擇的一個特定例子)
注意:關鍵字volatile和Interlocked類的使用。
public class AsyncManualResetEvent
{
private volatile TaskCompletionSource<bool> m_tcs = new TaskCompletionSource<bool>();
public Task WaitAsync() { return m_tcs.Task; }
public void Set() { m_tcs.TrySetResult(true); }
public void Reset()
{
while (true)
{
var tcs = m_tcs;
if (!tcs.Task.IsCompleted ||
Interlocked.CompareExchange(ref m_tcs, new TaskCompletionSource<bool>(), tcs) == tcs)
return;
}
}
}
到此,我們的AsyncManualResetEvent已經完成。然而,還有一個重要的潛在行為要記住。在之前的文章中,我們談論過延續任務和他們是如何同步執行,這意味着延續任務將作為任務完成的一部分執行,在同一個線程上同步完成任務。對於TaskCompletionSource<TResult>,這意味着同步延續任務將作為(Try)Set*方法的一部分執行,也就是說,在AsyncManualResetEvent例子中,延續任務將作為Set()方法的一部分執行。根據你的需求,如果你不希望這種事情發生,有一些替代的方法。一種方法是異步運行(Try)Set*方法,並使Set()調用阻塞,直到任務真真完成(只是任務本身,不包括任務的延續任務)。Eg:
public void Set()
{
var tcs = m_tcs;
Task.Factory.StartNew(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs
, CancellationToken.None, TaskCreationOptions.PreferFairness, TaskScheduler.Default);
tcs.Task.Wait();
}
當然,還有其他可能的方法,如何實現取決於你的需求。
這就是本節要講的AsyncManualResetEvent。
完整源碼如下:
public class AsyncManualResetEvent
{
private volatile TaskCompletionSource<bool> m_tcs = new TaskCompletionSource<bool>();
public Task WaitAsync() { return m_tcs.Task; }
public void Set()
{
var tcs = m_tcs;
Task.Factory.StartNew(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs
, CancellationToken.None, TaskCreationOptions.PreferFairness, TaskScheduler.Default);
tcs.Task.Wait();
}
public void Reset()
{
while (true)
{
var tcs = m_tcs;
// 短邏輯單元 確保如果當前Task已經完成就切換一個新的Task。
if (!tcs.Task.IsCompleted ||
Interlocked.CompareExchange(ref m_tcs, new TaskCompletionSource<bool>(), tcs) == tcs)
return;
}
}
}
下一節,我將實現一個async版本的AutoResetEvent。
推薦閱讀:
感謝你的觀看……
原文:《Building Async Coordination Primitives, Part 1: AsyncManualResetEvent》
