面試(對,最近在找工作面試...)被問到,.net 並發控制怎么做,BlockingQueue和ConcurrentQueue有什么區別?
多線程問題的核心是控制對臨界資源的訪問,接下來我們聊聊.net並發控制,可能除了第一個”lock”,對於其他的幾個概念都很陌生,那么這篇文章應該對你有幫助。
lock
Monitor
Semaphore
ConcurrentQueue
BlockingQueue
BlockingCollection
一、lock
說到並發控制,我們首先想到的肯定是 lock關鍵字。
這里要說一下,lock鎖的究竟是什么?是lock下面的代碼塊嗎,不,是locker對象。
我們想象一下,locker對象相當於一把門鎖(或者鑰匙),后面代碼塊相當於屋里的資源。
哪個線程先控制這把鎖,就有權訪問代碼塊,訪問完成后再釋放權限,下一個線程再進行訪問。
注意:如果代碼塊中的邏輯執行時間很長,那么其他線程也會一直等下去,直到上一個線程執行完畢,釋放鎖。
1 object locker = new object(); 2 3 private void Add() 4 { 5 lock (locker) 6 { 7 Thread.Sleep(1000); 8 counter++; 9 this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Add counter={counter}."); 10 } 11 }
二、Moniter
Monitor是一個靜態類(System.Threading.Monitor),功能與lock關鍵字基本一樣,也是加鎖,控制並發。
有兩個重要的方法:
Monitor.Enter() //獲取一個鎖
Monitor.Exit() //釋放一個鎖
另外幾個方法:
public static bool TryEnter(object obj, int millisecondsTimeout) //相比於 public static void Enter(object obj) 方法,多了超時時間設置,如果等待超過一定時間,就不再等待了,另外,只有TryEnter()返回值為true時,才能進入代碼塊。
public static bool Wait(object obj, int millisecondsTimeout) //這個方法在已經獲得鎖權限的代碼塊中調用時,或暫時釋放鎖,等待一定時間后,重新獲取鎖權限,繼續執行Wait后面的代碼。(真想不明怎么會有這種相互禮讓的操作)
public static void Pulse(object obj) //這個方法的解釋是,通知在等待隊列中的線程,鎖對象狀態改變。(測試發現,此方法並不會真正改變鎖定狀態,只是通知的作用)
TryEnter代碼示例:
1 int counter = 0; 2 object locker = new object(); 3 4 private void Minus() 5 { 6 //加上try -catch-finally,防止由於異常,鎖無法釋放,這也是為什么我們更多使用lock而不是Moniter的原因。 7 try 8 { 9 //只有TryEnter()返回值為true時,才能進入代碼塊,與Enter()方法不一樣 10 if (Monitor.TryEnter(locker, 5000)) 11 { 12 this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Minus in"); 13 Thread.Sleep(1000); 14 counter--; 15 this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Minus counter={counter}."); 16 } 17 } 18 catch (Exception ex) 19 { 20 this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Minus Exception {ex.Message}"); 21 } 22 finally 23 { 24 Monitor.Exit(locker); 25 } 26 }
通過上面的代碼,我們可以看出Monitor和lock實現的功能基本一致,但Monitor的使用要明顯比lock更復雜,也行這就是我們平時更多的使用lock,而不是Monitor的原因。
三、Semaphore 信號量
System.Threading.Semaphore
lock和Monitor加鎖之后,每次只能有一個線程訪問臨界代碼,信號量類似於一個線程池,線程訪問之前獲取一個信號,訪問完成釋放信號,只要信號量內有可用信號便可以訪問,否則等待。
構造函數:
public Semaphore(int initialCount, int maximumCount) //創建一個信號量,指定初始信號數量和最大信號數量。
幾個重要方法:
public int Release() //代碼注釋的意思是:退出信號量,並返回之前的(可用信號)數量。實際上,除了退出,這個方法每調用一次會增加一個可用信號,但數量達到最大數量時會拋異常。
public int Release(int releaseCount) //和上面的方法類似,上面的方法每次只釋放一個信號,這個方法可以指定信號數量。
public virtual bool WaitOne() //等待一個可用信號
看下面的示例代碼,如果只初始一個信號量,new Semaphore(1, 100),運行結果與lock和Monitor是一樣的,兩個方法交替執行,如果初始信號量為多個時,new Semaphore(3, 100),執行效率高的方法要占用更多的信號,從而執行更多次。
1 int counter = 0; 2 int semaphoreCount = 0; 3 Semaphore semaphore = new Semaphore(3, 100); 4 5 private void Add() 6 { 7 semaphore.WaitOne(); 8 Thread.Sleep(1000); 9 counter++; 10 semaphoreCount = semaphore.Release(); 11 this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Add counter={counter}.SemaphoreCount:{semaphoreCount}"); 12 } 13 14 private void Minus() 15 { 16 semaphore.WaitOne(); 17 Thread.Sleep(2000); 18 counter--; 19 semaphore.Release(); 20 this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Minus counter={counter}.SemaphoreCount:{semaphoreCount}"); 21 }
Semaphore在生產者/消費者模式下的應用
生產者每次添加一個信號,消費者每次消耗一個信號,如果信號量為0,則消費者進入等待狀態。
1 int counter = 0; 2 int semaphoreCount = 0; 3 Semaphore semaphore = new Semaphore(0, int.MaxValue); 4 5 private void Product() 6 { 7 semaphoreCount = semaphore.Release(); 8 Thread.Sleep(1000); 9 counter++; 10 this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Product counter={counter}.SemaphoreCount:{semaphoreCount}"); 11 } 12 13 private void Consume() 14 { 15 semaphore.WaitOne(); 16 Thread.Sleep(2000); 17 counter--; 18 this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Consume counter={counter}.SemaphoreCount:{semaphoreCount}"); 19 }
四、ConcurrentQueue 和 Queue
.net 集合中有一類線程安全的集合 System.Collections.Concurrent,ConcurrentQueue 就是其中的一個,線程安全的隊列,有普通隊列Queue先進先出的特點,同時又具備多線程安全。
測試過程中發現:
Queue 類的兩個出隊列方法 Dequeue() 和 TryDequeue(out result),在多線程環境下,Dequeue() 會出現並發訪問錯誤,但TryDequeue(out result)不會,即TryDequeue(out result)即使不加鎖,在多線程環境下也運行正常。
ConcurrentQueue 類只有一個出隊列方法 TryDequeue(out result),當然,是線程安全的。
五、BlockingQueue
BlockingQueue並不是.net內置的類,如果有人問這個類,那么他多半是在說BlockingCollection
關於 BlockingQueue 有一篇很不錯的文章,可以參考一下:
https://docs.microsoft.com/zh-cn/archive/blogs/toub/blocking-queues
六、BlockingCollection
BlockingCollection是.net內置的類,相當於帶有阻塞功能的 ConcurrentQueue ,數據先進先出,相比較ConcurrentQueue ,BlockingCollection在從隊列中讀取數據時,如果隊列為空,那么它會等待(block),直到有數據可讀取。
而ConcurrentQueue ,需要我們自行判斷是否讀取了數據,並且控制循環讀取的頻率。
.net 文檔對這個類解釋的非常詳細,可以仔細閱讀:
七、樂觀鎖
前面講的這些,都是屬於.net提供的並發控制方案,還有另一種更常用的並發控制方式,樂觀鎖。
樂觀鎖本質上並不是加鎖,而是數據版本控制。樂觀鎖的出發點是假定並發錯誤發生的概率很小,從而允許程序並發執行。
首先,數據要有一個版本號,每次數據更新,要產生一個新的版本號。
其次,進入數據處理邏輯之前,記錄該數據的版本號,數據處理結束后,重新讀取數據,比較前后兩個版本號是否一致,如果一致,則提交,處理完成,如果不一致,說明產生了並發錯誤,則拋出異常或已其他方式終止程序執行,從而保證數據的一致性。
總結
lock是最常用的並發控制方式,Monitor的功能與lock類似,但使用復雜,非必須不建議使用。
Semaphore,信號量,是一個不錯的功能,特定應用場景下非常實用。
ConcurrentQueue 是一個線程安全的隊列,在多線程並發環境下使用,可避免由於並發引起的錯誤。(我們可以使用lock+Queue,實現ConcurrentQueue,自己感興趣可以試一下)
BlockingCollection 帶阻塞功能的 ConcurrentQueue ,沒有可用數據的情況下,進入等待狀態,防止循環訪問,減少CPU資源浪費。(我們可以通過Semaphore+ConcurrentQueue ,實現BlockingCollection ,自己感興趣可以試一下)
最后,祝大家祝編程快樂。