當多個任務或線程並行運行時,難以避免的對某些有限的資源進行並發的訪問。可以考慮使用信號量來進行這方面的控制(System.Threading.Semaphore)是表示一個Windows內核的信號量對象。如果預計等待的時間較短,可以考慮使用SemaphoreSlim,它則帶來的開銷更小。.NetFrameWork中的信號量通過跟蹤進入和離開的任務或線程來協調對資源的訪問。信號量需要知道資源的最大數量,當一個任務進入時,資源計數器會被減1,當計數器為0時,如果有任務訪問資源,它會被阻塞,直到有任務離開為止。
如果需要有跨進程或AppDomain的同步時,可以考慮使用Semaphore。Semaphore是取得的Windows 內核的信號量,所以在整個系統中是有效的。它主要的接口是Release和WaitOne,使用的方式和SemaphoreSlim是一致的。
信號量Semaphore是另外一個CLR中的內核同步對象。在.net中,類Semaphore封裝了這個對象。與標准的排他鎖對象(Monitor,Mutex,SpinLock)不同的是,它不是一個排他的鎖對象,它與SemaphoreSlim,ReaderWriteLock等一樣允許多個有限的線程同時訪問共享內存資源。
Semaphore就好像一個柵欄,有一定的容量,當里面的線程數量到達設置的最大值時候,就沒有線程可以進去。然后,如果一個線程工作完成以后出來了,那下一個線程就可以進去了。Semaphore的WaitOne或Release等操作分別將自動地遞減或者遞增信號量的當前計數值。當線程試圖對計數值已經為0的信號量執行WaitOne操作時,線程將阻塞直到計數值大於0。在構造Semaphore時,最少需要2個參數。信號量的初始容量和最大的容量。
Semaphore的WaitOne或者Release方法的調用大約會耗費1微秒的系統時間,而優化后的SemaphoreSlim則需要大致四分之一微秒。在計算中大量頻繁使用它的時候SemaphoreSlim還是優勢明顯,加上SemaphoreSlim還豐富了不少接口,更加方便我們進行控制,所以在4.0以后的多線程開發中,推薦使用SemaphoreSlim。SemaphoreSlim的實現如下:
public class SemaphoreSlim : IDisposable { private volatile int m_currentCount; //可用數的資源數,<=0開始阻塞 private readonly int m_maxCount; private volatile int m_waitCount; //阻塞的線程數 private object m_lockObj; private volatile ManualResetEvent m_waitHandle; private const int NO_MAXIMUM = Int32.MaxValue; //Head of list representing asynchronous waits on the semaphore. private TaskNode m_asyncHead; // Tail of list representing asynchronous waits on the semaphore. private TaskNode m_asyncTail; // A pre-completed task with Result==true private readonly static Task<bool> s_trueTask = new Task<bool>(false, true, (TaskCreationOptions)InternalTaskOptions.DoNotDispose, default(CancellationToken)); public SemaphoreSlim(int initialCount) : this(initialCount, NO_MAXIMUM){ } public SemaphoreSlim(int initialCount, int maxCount) { if (initialCount < 0 || initialCount > maxCount) { throw new ArgumentOutOfRangeException("initialCount", initialCount, GetResourceString("SemaphoreSlim_ctor_InitialCountWrong")); } if (maxCount <= 0) { throw new ArgumentOutOfRangeException("maxCount", maxCount, GetResourceString("SemaphoreSlim_ctor_MaxCountWrong")); } m_maxCount = maxCount; m_lockObj = new object(); m_currentCount = initialCount; } public void Wait(){Wait(Timeout.Infinite, new CancellationToken());} public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken) { CheckDispose(); if (millisecondsTimeout < -1) { throw new ArgumentOutOfRangeException("totalMilliSeconds", millisecondsTimeout, GetResourceString("SemaphoreSlim_Wait_TimeoutWrong")); } cancellationToken.ThrowIfCancellationRequested(); uint startTime = 0; if (millisecondsTimeout != Timeout.Infinite && millisecondsTimeout > 0) { startTime = TimeoutHelper.GetTime(); } bool waitSuccessful = false; Task<bool> asyncWaitTask = null; bool lockTaken = false; CancellationTokenRegistration cancellationTokenRegistration = cancellationToken.InternalRegisterWithoutEC(s_cancellationTokenCanceledEventHandler, this); try { SpinWait spin = new SpinWait(); while (m_currentCount == 0 && !spin.NextSpinWillYield) { spin.SpinOnce(); } try { } finally { Monitor.Enter(m_lockObj, ref lockTaken); if (lockTaken) { m_waitCount++; } } // If there are any async waiters, for fairness we'll get in line behind if (m_asyncHead != null) { Contract.Assert(m_asyncTail != null, "tail should not be null if head isn't"); asyncWaitTask = WaitAsync(millisecondsTimeout, cancellationToken); } // There are no async waiters, so we can proceed with normal synchronous waiting. else { // If the count > 0 we are good to move on. // If not, then wait if we were given allowed some wait duration OperationCanceledException oce = null; if (m_currentCount == 0) { if (millisecondsTimeout == 0) { return false; } // Prepare for the main wait... // wait until the count become greater than zero or the timeout is expired try { waitSuccessful = WaitUntilCountOrTimeout(millisecondsTimeout, startTime, cancellationToken); } catch (OperationCanceledException e) { oce = e; } } Contract.Assert(!waitSuccessful || m_currentCount > 0, "If the wait was successful, there should be count available."); if (m_currentCount > 0) { waitSuccessful = true; m_currentCount--; } else if (oce != null) { throw oce; } if (m_waitHandle != null && m_currentCount == 0) { m_waitHandle.Reset(); } } } finally { // Release the lock if (lockTaken) { m_waitCount--; Monitor.Exit(m_lockObj); } // Unregister the cancellation callback. cancellationTokenRegistration.Dispose(); } return (asyncWaitTask != null) ? asyncWaitTask.GetAwaiter().GetResult() : waitSuccessful; } private bool WaitUntilCountOrTimeout(int millisecondsTimeout, uint startTime, CancellationToken cancellationToken) { int remainingWaitMilliseconds = Timeout.Infinite; //Wait on the monitor as long as the count is zero while (m_currentCount == 0) { // If cancelled, we throw. Trying to wait could lead to deadlock. cancellationToken.ThrowIfCancellationRequested(); if (millisecondsTimeout != Timeout.Infinite) { remainingWaitMilliseconds = TimeoutHelper.UpdateTimeOut(startTime, millisecondsTimeout); if (remainingWaitMilliseconds <= 0) { // The thread has expires its timeout return false; } } // ** the actual wait ** if (!Monitor.Wait(m_lockObj, remainingWaitMilliseconds)) { return false; } } return true; } public Task<bool> WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken) { CheckDispose(); // Validate input if (millisecondsTimeout < -1) { throw new ArgumentOutOfRangeException("totalMilliSeconds", millisecondsTimeout, GetResourceString("SemaphoreSlim_Wait_TimeoutWrong")); } // Bail early for cancellation if (cancellationToken.IsCancellationRequested) return Task.FromCancellation<bool>(cancellationToken); lock (m_lockObj) { // If there are counts available, allow this waiter to succeed. if (m_currentCount > 0) { --m_currentCount; if (m_waitHandle != null && m_currentCount == 0) m_waitHandle.Reset(); return s_trueTask; } // If there aren't, create and return a task to the caller. // The task will be completed either when they've successfully acquired // the semaphore or when the timeout expired or cancellation was requested. else { Contract.Assert(m_currentCount == 0, "m_currentCount should never be negative"); var asyncWaiter = CreateAndAddAsyncWaiter(); return (millisecondsTimeout == Timeout.Infinite && !cancellationToken.CanBeCanceled) ? asyncWaiter : WaitUntilCountOrTimeoutAsync(asyncWaiter, millisecondsTimeout, cancellationToken); } } } /// <summary>Creates a new task and stores it into the async waiters list.</summary> /// <returns>The created task.</returns> private TaskNode CreateAndAddAsyncWaiter() { Contract.Assert(Monitor.IsEntered(m_lockObj), "Requires the lock be held"); // Create the task var task = new TaskNode(); // Add it to the linked list if (m_asyncHead == null) { Contract.Assert(m_asyncTail == null, "If head is null, so too should be tail"); m_asyncHead = task; m_asyncTail = task; } else { Contract.Assert(m_asyncTail != null, "If head is not null, neither should be tail"); m_asyncTail.Next = task; task.Prev = m_asyncTail; m_asyncTail = task; } // Hand it back return task; } private async Task<bool> WaitUntilCountOrTimeoutAsync(TaskNode asyncWaiter, int millisecondsTimeout, CancellationToken cancellationToken) { Contract.Assert(asyncWaiter != null, "Waiter should have been constructed"); Contract.Assert(Monitor.IsEntered(m_lockObj), "Requires the lock be held"); using (var cts = cancellationToken.CanBeCanceled ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, default(CancellationToken)) : new CancellationTokenSource()) { var waitCompleted = Task.WhenAny(asyncWaiter, Task.Delay(millisecondsTimeout, cts.Token)); if (asyncWaiter == await waitCompleted.ConfigureAwait(false)) { cts.Cancel(); // ensure that the Task.Delay task is cleaned up return true; // successfully acquired } } // If we get here, the wait has timed out or been canceled. // If the await completed synchronously, we still hold the lock. If it didn't, // we no longer hold the lock. As such, acquire it. lock (m_lockObj) { // Remove the task from the list. If we're successful in doing so, // we know that no one else has tried to complete this waiter yet, // so we can safely cancel or timeout. if (RemoveAsyncWaiter(asyncWaiter)) { cancellationToken.ThrowIfCancellationRequested(); // cancellation occurred return false; // timeout occurred } } // The waiter had already been removed, which means it's already completed or is about to // complete, so let it, and don't return until it does. return await asyncWaiter.ConfigureAwait(false) await asyncWaiter.ConfigureAwait(false); } public int Release(){ return Release(1);} public int Release(int releaseCount) { CheckDispose(); // Validate input if (releaseCount < 1) { throw new ArgumentOutOfRangeException( "releaseCount", releaseCount, GetResourceString("SemaphoreSlim_Release_CountWrong")); } int returnCount; lock (m_lockObj) { // Read the m_currentCount into a local variable to avoid unnecessary volatile accesses inside the lock. int currentCount = m_currentCount; returnCount = currentCount; // If the release count would result exceeding the maximum count, throw SemaphoreFullException. if (m_maxCount - currentCount < releaseCount) { throw new SemaphoreFullException(); } // Increment the count by the actual release count currentCount += releaseCount; // Signal to any synchronous waiters int waitCount = m_waitCount; if (currentCount == 1 || waitCount == 1) { Monitor.Pulse(m_lockObj); } else if (waitCount > 1) { Monitor.PulseAll(m_lockObj); } // Now signal to any asynchronous waiters, if there are any. While we've already // signaled the synchronous waiters, we still hold the lock, and thus // they won't have had an opportunity to acquire this yet. So, when releasing // asynchronous waiters, we assume that all synchronous waiters will eventually // acquire the semaphore. That could be a faulty assumption if those synchronous // waits are canceled, but the wait code path will handle that. if (m_asyncHead != null) { Contract.Assert(m_asyncTail != null, "tail should not be null if head isn't null"); int maxAsyncToRelease = currentCount - waitCount; while (maxAsyncToRelease > 0 && m_asyncHead != null) { --currentCount; --maxAsyncToRelease; // Get the next async waiter to release and queue it to be completed var waiterTask = m_asyncHead; RemoveAsyncWaiter(waiterTask); // ensures waiterTask.Next/Prev are null QueueWaiterTask(waiterTask); } } m_currentCount = currentCount; // Exposing wait handle if it is not null if (m_waitHandle != null && returnCount == 0 && currentCount > 0) { m_waitHandle.Set(); } } // And return the count return returnCount; } ///Removes the waiter task from the linked list.</summary> private bool RemoveAsyncWaiter(TaskNode task) { Contract.Requires(task != null, "Expected non-null task"); Contract.Assert(Monitor.IsEntered(m_lockObj), "Requires the lock be held"); // Is the task in the list? To be in the list, either it's the head or it has a predecessor that's in the list. bool wasInList = m_asyncHead == task || task.Prev != null; // Remove it from the linked list if (task.Next != null) task.Next.Prev = task.Prev; if (task.Prev != null) task.Prev.Next = task.Next; if (m_asyncHead == task) m_asyncHead = task.Next; if (m_asyncTail == task) m_asyncTail = task.Prev; Contract.Assert((m_asyncHead == null) == (m_asyncTail == null), "Head is null iff tail is null"); // Make sure not to leak task.Next = task.Prev = null; // Return whether the task was in the list return wasInList; } private static void QueueWaiterTask(TaskNode waiterTask) { ThreadPool.UnsafeQueueCustomWorkItem(waiterTask, forceGlobal: false); } public int CurrentCount { get { return m_currentCount; } } public WaitHandle AvailableWaitHandle { get { CheckDispose(); if (m_waitHandle != null) return m_waitHandle; lock (m_lockObj) { if (m_waitHandle == null) { m_waitHandle = new ManualResetEvent(m_currentCount != 0); } } return m_waitHandle; } } private sealed class TaskNode : Task<bool>, IThreadPoolWorkItem { internal TaskNode Prev, Next; internal TaskNode() : base() {} [SecurityCritical] void IThreadPoolWorkItem.ExecuteWorkItem() { bool setSuccessfully = TrySetResult(true); Contract.Assert(setSuccessfully, "Should have been able to complete task"); } [SecurityCritical] void IThreadPoolWorkItem.MarkAborted(ThreadAbortException tae) { /* nop */ } } }
SemaphoreSlim類有幾個私有字段很重要,m_currentCount表示可用資源,如果m_currentCount>0每次調用Wait都會減1,當m_currentCount<=0時再次調用Wait方法就會阻塞。每次調用Release方法m_currentCount都會加1.m_maxCount表示最大可用資源數,是在構造函數中指定的。m_waitCount表示當前阻塞的線程數。TaskNode m_asyncHead,m_asyncTail這2個變量主要用於異步方法。
我們首先來看看Wait方法,這里還有它的異步版本WaitAsync。在Wait方法中首先檢查m_currentCount是否為0,如果是我們用SpinWait自旋10次;任意一次Wait都需要鎖住m_lockObj對象,m_asyncHead != null表示當前已經存在異步的對象,所以我們調用WaitAsync方法,如果沒有那么我們調用WaitUntilCountOrTimeout方法,該方法在m_currentCount==0會阻塞到到m_currentCount不為0或者超時;看到WaitUntilCountOrTimeout方法中【if (!Monitor.Wait(m_lockObj, remainingWaitMilliseconds))】,就很明了Wait方法中【CancellationTokenRegistration cancellationTokenRegistration = cancellationToken.InternalRegisterWithoutEC(s_cancellationTokenCanceledEventHandler, this)】存在的原因了,確實很巧妙【這里和ManualResetEventSlim相似】。現在我們回到WaitAsync方法,該方法也是首先檢查m_currentCount是否大於0,大於直接返回。否者調用CreateAndAddAsyncWaiter創建一個Task<bool>【Task<bool>是一個鏈表結構】,如果沒有取消且超時大於-1,那么就調用WaitUntilCountOrTimeoutAsync方法,該方法首先包裝一個Task【var waitCompleted = Task.WhenAny(asyncWaiter, Task.Delay(millisecondsTimeout, cts.Token))】然后等待線程【await waitCompleted.ConfigureAwait(false)】返回的是asyncWaiter或者另一個Delay的Task。如果返回的不是asyncWaiter說明已經超時需要調用RemoveAsyncWaiter,然后返回 await asyncWaiter.ConfigureAwait(false),如果返回的是asyncWaiter,那么就調用Cancel方法。那么這里的asyncWaiter.ConfigureAwait(false)什么時候退出了【或者說不阻塞】,這就要看Release中的QueueWaiterTask方法了。
QueueWaiterTask方法或調用TaskNode的ExecuteWorkItem方法。
那現在我們來看看Release方法,該方法會把currentCount加1,然后把等待線程轉為就緒線程【Monitor.Pulse(m_lockObj)或 Monitor.PulseAll(m_lockObj)】,如果存在異步的話,看看還可以釋放幾個異步task【 int maxAsyncToRelease = currentCount - waitCount】,這里Release的注釋很重要,只是沒怎么明白,現釋同步的waiters,然后在釋放異步的waiters,但是釋放同步后鎖的資源沒有釋放,在釋放異步的waiters時候是把currentCount減1,這樣感覺異步waiters優先獲取資源。也不知道我的理解是否正確?
1)當ConfigureAwait(true),代碼由同步執行進入異步執行時,當前同步執行的線程上下文信息(比如HttpConext.Current,Thread.CurrentThread.CurrentCulture)就會被捕獲並保存至SynchronizationContext中,供異步執行中使用,並且供異步執行完成之后(await之后的代碼)的同步執行中使用(雖然await之后是同步執行的,但是發生了線程切換,會在另外一個線程中執行「ASP.NET場景」)。這個捕獲當然是有代價的,當時我們誤以為性能問題是這個地方的開銷引起,但實際上這個開銷很小,在我們的應用場景不至於會帶來性能問題。
2)當Configurewait(flase),則不進行線程上下文信息的捕獲,async方法中與await之后的代碼執行時就無法獲取await之前的線程的上下文信息,在ASP.NET中最直接的影響就是HttpConext.Current的值為null。