聊聊.net 並發控制,lock,Monitor,Semaphore,BlockingQueue,樂觀鎖串講


面試(對,最近在找工作面試...)被問到,.net 並發控制怎么做,BlockingQueue和ConcurrentQueue有什么區別?

多線程問題的核心是控制對臨界資源的訪問,接下來我們聊聊.net並發控制,可能除了第一個”lock”,對於其他的幾個概念都很陌生,那么這篇文章應該對你有幫助。

lock

Monitor

Semaphore

ConcurrentQueue

BlockingQueue

BlockingCollection

 

一、lock

說到並發控制,我們首先想到的肯定是 lock關鍵字。

這里要說一下,lock鎖的究竟是什么?是lock下面的代碼塊嗎,不,是locker對象。

我們想象一下,locker對象相當於一把門鎖(或者鑰匙),后面代碼塊相當於屋里的資源。

哪個線程先控制這把鎖,就有權訪問代碼塊,訪問完成后再釋放權限,下一個線程再進行訪問。

注意:如果代碼塊中的邏輯執行時間很長,那么其他線程也會一直等下去,直到上一個線程執行完畢,釋放鎖。

 1         object locker = new object();
 2 
 3         private void Add()
 4         {
 5             lock (locker)
 6             {
 7                 Thread.Sleep(1000);
 8                 counter++;
 9                 this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Add counter={counter}.");
10             }
11         }

 

二、Moniter

Monitor是一個靜態類(System.Threading.Monitor),功能與lock關鍵字基本一樣,也是加鎖,控制並發。

有兩個重要的方法:

Monitor.Enter()  //獲取一個鎖

Monitor.Exit()   //釋放一個鎖 

另外幾個方法:

public static bool TryEnter(object obj, int millisecondsTimeout)  //相比於 public static void Enter(object obj) 方法,多了超時時間設置,如果等待超過一定時間,就不再等待了,另外,只有TryEnter()返回值為true時,才能進入代碼塊。

public static bool Wait(object obj, int millisecondsTimeout)    //這個方法在已經獲得鎖權限的代碼塊中調用時,或暫時釋放鎖,等待一定時間后,重新獲取鎖權限,繼續執行Wait后面的代碼。(真想不明怎么會有這種相互禮讓的操作)

public static void Pulse(object obj)      //這個方法的解釋是,通知在等待隊列中的線程,鎖對象狀態改變。(測試發現,此方法並不會真正改變鎖定狀態,只是通知的作用)

 TryEnter代碼示例:

 1         int counter = 0;
 2         object locker = new object();
 3 
 4         private void Minus()
 5         {
 6             //加上try -catch-finally,防止由於異常,鎖無法釋放,這也是為什么我們更多使用lock而不是Moniter的原因。
 7             try
 8             {
 9                 //只有TryEnter()返回值為true時,才能進入代碼塊,與Enter()方法不一樣
10                 if (Monitor.TryEnter(locker, 5000))
11                 {
12                     this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Minus in");
13                     Thread.Sleep(1000);
14                     counter--;
15                     this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Minus counter={counter}.");
16                 }
17             }
18             catch (Exception ex)
19             {
20                 this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Minus Exception {ex.Message}");
21             }
22             finally
23             {
24                 Monitor.Exit(locker);
25             }
26         }

  

通過上面的代碼,我們可以看出Monitor和lock實現的功能基本一致,但Monitor的使用要明顯比lock更復雜,也行這就是我們平時更多的使用lock,而不是Monitor的原因。

 

三、Semaphore 信號量

System.Threading.Semaphore 

lock和Monitor加鎖之后,每次只能有一個線程訪問臨界代碼,信號量類似於一個線程池,線程訪問之前獲取一個信號,訪問完成釋放信號,只要信號量內有可用信號便可以訪問,否則等待。

構造函數:

public Semaphore(int initialCount, int maximumCount)  //創建一個信號量,指定初始信號數量和最大信號數量。

幾個重要方法:

public int Release()        //代碼注釋的意思是:退出信號量,並返回之前的(可用信號)數量。實際上,除了退出,這個方法每調用一次會增加一個可用信號,但數量達到最大數量時會拋異常。

public int Release(int releaseCount)   //和上面的方法類似,上面的方法每次只釋放一個信號,這個方法可以指定信號數量。

public virtual bool WaitOne()    //等待一個可用信號

看下面的示例代碼,如果只初始一個信號量,new Semaphore(1, 100),運行結果與lock和Monitor是一樣的,兩個方法交替執行,如果初始信號量為多個時,new Semaphore(3, 100),執行效率高的方法要占用更多的信號,從而執行更多次。

 1         int counter = 0;
 2         int semaphoreCount = 0;
 3         Semaphore semaphore = new Semaphore(3, 100);
 4 
 5         private void Add()
 6         {
 7             semaphore.WaitOne();
 8             Thread.Sleep(1000);
 9             counter++;
10             semaphoreCount = semaphore.Release();
11             this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Add counter={counter}.SemaphoreCount:{semaphoreCount}");
12         }
13 
14         private void Minus()
15         {
16             semaphore.WaitOne();
17             Thread.Sleep(2000);
18             counter--;
19             semaphore.Release();
20             this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Minus counter={counter}.SemaphoreCount:{semaphoreCount}");
21         }

 

Semaphore在生產者/消費者模式下的應用

生產者每次添加一個信號,消費者每次消耗一個信號,如果信號量為0,則消費者進入等待狀態。

 1         int counter = 0;
 2         int semaphoreCount = 0;
 3         Semaphore semaphore = new Semaphore(0, int.MaxValue);
 4 
 5         private void Product()
 6         {
 7             semaphoreCount = semaphore.Release();
 8             Thread.Sleep(1000);
 9             counter++;
10             this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Product counter={counter}.SemaphoreCount:{semaphoreCount}");
11         }
12 
13         private void Consume()
14         {
15             semaphore.WaitOne();
16             Thread.Sleep(2000);
17             counter--;
18             this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Consume counter={counter}.SemaphoreCount:{semaphoreCount}");
19         }

  

四、ConcurrentQueue 和 Queue

.net 集合中有一類線程安全的集合 System.Collections.Concurrent,ConcurrentQueue 就是其中的一個,線程安全的隊列,有普通隊列Queue先進先出的特點,同時又具備多線程安全。

測試過程中發現:

Queue 類的兩個出隊列方法 Dequeue()TryDequeue(out result),在多線程環境下,Dequeue() 會出現並發訪問錯誤,但TryDequeue(out result)不會,即TryDequeue(out result)即使不加鎖,在多線程環境下也運行正常。

ConcurrentQueue 類只有一個出隊列方法 TryDequeue(out result),當然,是線程安全的。

 

五、BlockingQueue

BlockingQueue並不是.net內置的類,如果有人問這個類,那么他多半是在說BlockingCollection

關於 BlockingQueue 有一篇很不錯的文章,可以參考一下:

https://docs.microsoft.com/zh-cn/archive/blogs/toub/blocking-queues

 

六、BlockingCollection

BlockingCollection是.net內置的類,相當於帶有阻塞功能的 ConcurrentQueue ,數據先進先出,相比較ConcurrentQueue ,BlockingCollection在從隊列中讀取數據時,如果隊列為空,那么它會等待(block),直到有數據可讀取。

而ConcurrentQueue ,需要我們自行判斷是否讀取了數據,並且控制循環讀取的頻率。

.net 文檔對這個類解釋的非常詳細,可以仔細閱讀:

https://docs.microsoft.com/zh-cn/dotnet/api/system.collections.concurrent.blockingcollection-1?view=netcore-3.1

七、樂觀鎖

前面講的這些,都是屬於.net提供的並發控制方案,還有另一種更常用的並發控制方式,樂觀鎖。

樂觀鎖本質上並不是加鎖,而是數據版本控制。樂觀鎖的出發點是假定並發錯誤發生的概率很小,從而允許程序並發執行。

首先,數據要有一個版本號,每次數據更新,要產生一個新的版本號。

其次,進入數據處理邏輯之前,記錄該數據的版本號,數據處理結束后,重新讀取數據,比較前后兩個版本號是否一致,如果一致,則提交,處理完成,如果不一致,說明產生了並發錯誤,則拋出異常或已其他方式終止程序執行,從而保證數據的一致性。

 總結

lock是最常用的並發控制方式,Monitor的功能與lock類似,但使用復雜,非必須不建議使用。

Semaphore,信號量,是一個不錯的功能,特定應用場景下非常實用。

ConcurrentQueue 是一個線程安全的隊列,在多線程並發環境下使用,可避免由於並發引起的錯誤。(我們可以使用lock+Queue,實現ConcurrentQueue,自己感興趣可以試一下)

BlockingCollection 帶阻塞功能的 ConcurrentQueue ,沒有可用數據的情況下,進入等待狀態,防止循環訪問,減少CPU資源浪費。(我們可以通過Semaphore+ConcurrentQueue ,實現BlockingCollection ,自己感興趣可以試一下)

   最后,祝大家祝編程快樂。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM