一、基元用戶模式和內核模式。
基元(Primitive):指代碼中可以使用的最簡單的構造。
有兩種基元構造:用戶模式(user-mode)和內核模式(kernel-mode)。
1,用戶模式。
它是用CPU指令來協調線程,這種協調是在硬件中發生的,所以速度會快於內核模式。但是也意味着,Windows操作系統永遠也檢測不到一個線程在一個基元用戶模式構造上阻塞了。由於在一個基元用戶模式構造上阻塞的線程永遠不認為已經阻塞,所以線程池不會創建新的線程來替換這種阻塞的線程。另外,這些CPU指令只是阻塞線程極短的時間。
缺點:只有Windows系統的內核才能停止一個線程的執行。用戶模式中的線程可能會被系統搶占,但很快就會被再次調度。如果一個線程想獲得資源又暫時取不到資源,它會一直在用戶模式中運行,這會大大浪費CPU的時間。
2,內核模式。
內核模式的構造是由Windows操作系統自身提供的。它們要求在應用程序的線程中調用操作系統內核的函數。將線程從用戶模式切換成內核模式(或相反)會導致巨大的新能損失,這也是為什么要避免使用內核模式的原因。
優點:一個線程使用一個內核模式的構造獲取一個其它線程擁有的資源時,Windows會阻塞線程,使它不浪費CPU的時間。然后當資源變得可用時,Windows會恢復線程,允許它訪問資源。
3,“活鎖”和“死鎖”
對於在一個構造上等待的線程,如果擁有這個構造的線程一直不釋放它,前者就可能一直阻塞。
“活鎖”:如果這是一個用戶模式的構造,線程將一直在CPU上運行,我們稱之為“活鎖”。
“死鎖”:如果這是一個內核模式的構造,線程將一直阻塞,我們稱之為“死鎖”。
“死鎖”總是優於“活鎖”,因為“活鎖”既浪費CPU時間又浪費內存,而“死鎖”只浪費內存。
4,原子操作
對簡單數據類型進行原子性的讀和寫。
比如:對於32位的cpu,4字節及以下是原子操作。64位的cpu,8字節及以下是原子操作。
如對於32位的cpu
class SomeType { public class Int32 x=0; }
我們對它進行賦值:
SomeType.x=0x01234567
x變量的值會一次性(原子性)從0x00000000變成0x01234567,這期間另一個線程不可能看到一個中間狀態的值。但是如果x是一個Int64的類型。
SomeType.x=0x0123456789abcdef
另一個線程查詢x的值,可能得到一個0x0123456700000000或0x0000000089abcdef。因為讀寫操作不是原子性的。對於32位的cpu必須分兩次寫入這個數據。
5,用戶模式構造
這種方式是非阻止的同步,主要原理就是用了上面描述的原子操作特性。因為它不刻意阻塞線程,所以速度非常快。Thread.VolatileRead、Thread.VolatileWrite、System.Threading.Interlocked類提供的方法,以及以及C#的volatile關鍵字都支持原子性的操作。
重中之重:上面提到的方法,並不是真正意義上的不阻塞,而是這個阻塞發生在cpu上,是用指令來協調的,阻塞時間非常短而已。例如:如果有5個線程同時訪問到Thread.VolatileRead方法,其中4個線程肯定會被阻塞。
5.1 VolatileRead和VolatileWrite
這兩個方法的解釋讓人非常迷惑,很不容易理解。從字面意思來看,就是進行易失性讀取。
對於Jeffrey總結的這條規則:當線程通過共享內存相互通信時,調用VolatileWrite來寫入最后一個值,調用VolatileRead來讀取第一個值。
上面的這條規則是怎么的出來的?難道因為Jeffrey是牛人,我們就不動腦袋盲目接受?適度的探尋是必要的。
先來看看Thread.MemoryBarrier這個方法的作用:它強迫按照程序的順序,之前的加載和存儲操作必須在MemoryBarrier方法之前完成;之后的加載和存儲操作必須在MemoryBarrier方法之后完成。這個方法是一個完整的柵欄(full fence),關於內存柵欄的概念可以google搜索。VolatileRead和VolatileWrite在內部都調用了這個類。
public static int VolatileRead(ref int address) { int num = address; MemoryBarrier(); return num; }
public static void VolatileWrite(ref int address, int value) { MemoryBarrier(); address = value; }
所以真正起作用的是Thread.MemoryBarrier方法:該方法可以阻止CPU指令的重新排列(也可阻止編譯器的優化),在調用MemoryBarrier之后的內存訪問不能在這之前就完成(也就是不能緩存的意思)。到現在明白了,MemoryBarrier方法后的變量訪問,都會去讀內存最新的值。
有了這個解釋,我們在來理解VolatileRead方法就相對容易了。在調用MemoryBarrier之前,它做了一步int num = address;這會造成到內存中去取address的值賦給num,並且因為下面調用了MemoryBarrier方法,所以這一步不能被編譯器優化掉,最后在MemoryBarrier方法后,返回這個最新的值。背后的實質就是利用了MemoryBarrier的特性,對要取的值做一步計算(簡單賦值),然后返回,每次調用這個函數它都會重新取值。
而VolatileWrite方法,它只調用了MemoryBarrier保證前面的代碼都執行了並寫入到了內存,最后寫入新值。所以,如果你的代碼和順序無關,或代碼就只有一句,你完全可以直接賦值,而不用調用這個方法。
有點混亂,再歸納2點:
1)調用這兩個方法,可以保證程序代碼的順序,因為寫入(write)一個值,其他線程可能馬上就會用這個值,所以要保證VolatileWrite放在函數塊的最后(這樣編譯器就不會優化代碼,移動代碼的順序)。以保證VolatileWrite前面的內容都正確的計算和存儲到內存中了。其他線程根據VolatileWrite寫的值,可能會用到我們剛才計算的內容,這樣就不會出錯。對於read一個值,把VolatileRead放在函數塊的最前面(個人覺得位置不是很重要),它在這里的主要作用是保證對變量的讀取是從內存中讀取。
2)這兩個方法中並沒有保證是不是原子操作,看反編譯代碼你就知道。所以要自己控制使用的變量類型。這和你的CPU是32和64位密切相關。(這一點有待進一步考證)
備注:從volatile關鍵字不支持Int64,double等64位類型,也可以間接推斷出這一點。盡管這兩個方法中提供了Int64,double等版本,但我覺得和cpu的位數是相關的。
案例1:對於一個可能被多線程訪問的變量x,如果你在另外一個線程中輪詢這個變量是否被改變,必須要用VolatileRead,以保證讀到的是內存中得最新值,否則可能會出現死循環。
private void VolatileRW() { m_stopWork = 0; Thread t = new Thread(new ParameterizedThreadStart(Worker)); t.Start(0); Thread.Sleep(5000); Thread.VolatileWrite(ref m_stopWork, 1);//設定m_stopWrok為1,這里和順序有關,這里應該用VolatileWrite,不要妄圖去猜想編譯器的優化順序 LogHelper.WriteInfo("Main thread waiting for worker to stop"); t.Join(); } private void Worker(object o) { int x = 0; //while (m_stopWork == 0) //如果這樣判定,m_stopWork被緩存后可能不會再去讀取內存的值(循序變量可能會被編譯器優化),所以可能會是個死循環 while (Thread.VolatileRead(ref m_stopWork) == 0)//用VolatileRead每次就會去讀新的值 { x++; } LogHelper.WriteInfo(string.Format("worker stoped:x={0}", x)); }
實際測試中,用release模式編譯,並且不用vs直接調試程序,上面的第一個while就會出現死循環。現在應該知道這兩個函數的作用了吧。
5.2 Interlocked類
這個類的每個方法執行的是一次原子性的讀取以及寫入操作。這個類的所有方法都建立了完整的內存柵欄(和Thread.MemoryBarrier一樣),保證了對內存的讀取是最新數據。並且它能對Int64,double等執行原子操作(內部做了處理,但不是lock處理,而是用循環判斷不成功就繼續嘗試),這一點和上面的兩個方法是有區別的。
練習1(Interlocked.Add,Interlocked.Decrement,Interlocked.Read):
private long m_threadNum = 5;//工作線程計數 private long m_Total = 0;//總和 private void InterlockTest() { //開啟5個線程,分別計算10以內的和 ThreadPool.QueueUserWorkItem(o => AddNumber(10)); ThreadPool.QueueUserWorkItem(o => AddNumber(10)); ThreadPool.QueueUserWorkItem(o => AddNumber(10)); ThreadPool.QueueUserWorkItem(o => AddNumber(10)); ThreadPool.QueueUserWorkItem(o => AddNumber(10)); //上面5個都計算完了,打印出結果 ThreadPool.QueueUserWorkItem(o => DisplaySum()); } private void AddNumber(int num) { int sum = 0; while (num > 0) { sum += num; num--; } Console.WriteLine("thread {0} ok. sum={1}", Thread.CurrentThread.ManagedThreadId, sum); Interlocked.Add(ref m_Total, sum);//把結果加到總和上面 Interlocked.Decrement(ref m_threadNum);//線程計數減一 } private void DisplaySum() { while (Interlocked.Read(ref m_threadNum) != 0) { //其他線程沒有做完就在這里自旋,故意浪費cpu時間 } //都做完了就輸出結果 Console.WriteLine("thread {0} all done,total sum = {1}", Thread.CurrentThread.ManagedThreadId, m_Total); }
運行結果:
thread 7 ok. sum=55 thread 11 ok. sum=55 thread 7 ok. sum=55 thread 11 ok. sum=55 thread 7 ok. sum=55 thread 11 all done,total sum = 275
這里的thread ID為7和11,感覺好像只有兩個線程一樣,其實不是這樣的。因為這里用了Theadpool,由於我們的計算量太小(10的累加),所以CLR重復利用了空閑的線程,這也是為什么倡導多用線程池的原因。
另外:上面代碼的Interlocked.Read方法只能用於Int64位的讀取,所以對於Int32等應該用Thread.VolatileRead方法會比較合理。
練習2:Interlocked.Exchange方法用於單例模式出現的問題
private void InterExchangeTest() { //開啟5個線程 new Thread(() => { Singleton.Instance().ToString(); }).Start(); new Thread(() => { Singleton.Instance().ToString(); }).Start(); new Thread(() => { Singleton.Instance().ToString(); }).Start(); new Thread(() => { Singleton.Instance().ToString(); }).Start(); new Thread(() => { Singleton.Instance().ToString(); }).Start(); } private class Singleton { private static Singleton _instance; private Guid _id; private Singleton(Guid id) { this._id = id; } public override string ToString() { return _id.ToString();//為了便於標識對象,這里用了Guid來表示 } public static Singleton Instance() { if (_instance == null) { Singleton temp = new Singleton(Guid.NewGuid()); Console.WriteLine("temp:" + temp.ToString()); if (Interlocked.Exchange(ref _instance, temp) == null) //如果是null,代表是第一次初始化值,否者不是 { Console.WriteLine("single thead entered.thread id ={0}", Thread.CurrentThread.ManagedThreadId); } else { Console.WriteLine("other thead entered.thread id ={0}", Thread.CurrentThread.ManagedThreadId); } Console.WriteLine("_instace:" + _instance.ToString()); } return _instance; } }
上面的代碼用Interlocked.Exchange方法來初始化單例對象,這個方法返回的是改變之前的值。如果返回的是null,則代表是第一次訪問;如果不是null,則一定是多個線程進入到了代碼段,第一個線程改變了_instance變量,其他線程訪問時返回的就不是null了,但它們還會繼續覆蓋初始化好的值。測試中確實有多個線程進入,實例被多次初始化了:
temp:1d20c10a-be7b-4352-ade7-bb15e0a6ee38 single thead entered.thread id =13 _instace:1d20c10a-be7b-4352-ade7-bb15e0a6ee38//_instance第一次被初始化 temp:e333502c-757d-4526-b3c3-74f052ae0951 other thead entered.thread id =12 _instace:e333502c-757d-4526-b3c3-74f052ae0951//_instance第二次被改變
上面的問題出在Interlocked.Exchange方法會強制每個線程對_instance變量賦值,這會導致后面的線程覆蓋了前面線程創建號的對象,這和單例模式相違背了。解決這個問題的關鍵在於,當_instance不為null時,我們希望不要強制交換值。幸好,Interlocked類提供了一個可用於比較后再賦值的原子操作方法CompareExchange,下面對上面的代碼進行一個小改動:
private class Singleton { private static Singleton _instance; private Guid _id; private Singleton(Guid id) { this._id = id; } public override string ToString() { return _id.ToString();// } public static Singleton Instance() { if (_instance == null) { Singleton temp = new Singleton(Guid.NewGuid()); Console.WriteLine("temp:" + temp.ToString()); if (Interlocked.CompareExchange(ref _instance, temp, null) == null) //這里換用compareExchange方法 { Console.WriteLine("single thead entered.thread id ={0}", Thread.CurrentThread.ManagedThreadId); } else { Console.WriteLine("other thead entered.thread id ={0}", Thread.CurrentThread.ManagedThreadId); } Console.WriteLine("_instace:" + _instance.ToString()); } return _instance; }
CompareExchange方法,判斷原始值是我們期望的值時,才進行交換。如這句Interlocked.CompareExchange(ref _instance, temp, null),我們就是希望_instance為null是才把temp賦給它。運行結果:
temp:6be7b8d8-ff1e-49b0-abf4-fafbf89c40e7 single thead entered.thread id =12 _instace:6be7b8d8-ff1e-49b0-abf4-fafbf89c40e7//第一次初始化 temp:90cc00e8-98be-4b44-be22-9957384dfd86 other thead entered.thread id =11 _instace:6be7b8d8-ff1e-49b0-abf4-fafbf89c40e7//第二次有其他線程進來,它的值還是第一次的值,並沒有被覆蓋
特別注意:可能你會認為下面的代碼和CompareExchange是等價的,這是不對的!
if (_instance == null) { //這里仍然可能有多個線程進來 Interlocked.Exchange(ref _instance, temp); }
CompareExchange相當於把判斷放在了函數內部,但它是一個原子性的操作,其他線程進不來的。
有了以上的驗證,我們實現單例模式完全可以這樣寫,而不用lock:
private class Singleton { private static Singleton _instance; private Singleton(){} public static Singleton Instance() { if (_instance == null) { Singleton temp = new Singleton(); Interlocked.CompareExchange(ref _instance, temp, null); } return _instance; } }
5.3 volatile關鍵字
這是為簡化VolatileWrite和VolatileRead的編程而提供的關鍵字。對於標注為volatile的變量:就是表明該變量可能會被多線程訪問,每次訪問該變量都應從內存中讀取新值,而不應該用寄存器中保留的值。對於多cpu的機器而言,每個cpu的寄存器可能都需要刷新,在一定程度上會損害部分性能。(上面的幾個方法同樣會有這個問題)
private volatile bool m_finish = false; private void VolatileKeywordTest() { m_finish = false; ThreadPool.QueueUserWorkItem(o => WaitingForFinish()); Thread.Sleep(5000); m_finish = true; } private void WaitingForFinish() { while (m_finish == false) { } Console.WriteLine("the work is down"); }
有了這個關鍵字,我們就不需要再用VolatileRead方法去讀取一個變量了。
下一篇,繼續寫基元線程的阻塞同步(內核模式)。
主要參考:
CLR Via C# 第三版