一、自旋鎖
自旋鎖是一種基礎的同步原語,用於保障對共享數據的互斥訪問。與互斥鎖的相比,在獲取鎖失敗的時候不會使得線程阻塞而是一直自旋嘗試獲取鎖。當線程等待自旋鎖的時候,CPU不能做其他事情,而是一直處於輪詢忙等的狀態。自旋鎖主要適用於被持有時間短,線程不希望在重新調度上花過多時間的情況。實際上許多其他類型的鎖在底層使用了自旋鎖實現,例如多數互斥鎖在試圖獲取鎖的時候會先自旋一小段時間,然后才會休眠。如果在持鎖時間很長的場景下使用自旋鎖,則會導致CPU在這個線程的時間片用盡之前一直消耗在無意義的忙等上,造成計算資源的浪費。
二、CAS操作實現自旋鎖
CAS(Compare and Swap),即比較並替換,實現並發算法時常用到的一種技術,這種操作提供了硬件級別的原子操作(通過鎖總線的方式)。CAS操作的原型可以認為是:
bool CAS(V, A, B)
其中V代表內存中的變量,A代表期待的值,B表示新值。當V的值與A相等時,將V與B的值交換。邏輯上可以用下面的偽代碼表示:
bool CAS(V, A, B)
{
if (V == A)
{
swap(V, B);
return true;
}
return false;
}
需要強調的是上面的操作是原子的,要么不做,要么全部完成。
那么已經擁有CAS操作的情況下如何實現一個自旋鎖呢?首先回憶自旋鎖的用途,本質上我們是希望能夠讓一個線程在不滿足進入臨界區的條件時,不停的忙等輪詢,直到可以運行的時候再繼續(進入臨界區)執行。那么,我們可能自然的想到使用一個bool變量來表示是否可以進入臨界區,例如以下面的偽代碼的邏輯:
while(flag == true);
flag = true;
/*
do something ...
*/
flag = false;
...
這樣做的直觀想法是當flag為true的時候表示已經有線程處於臨界區內,只有當flag為fasle時才能進入,而在進入的時候立即將flag置為true。但是這樣做明顯存在一個問題,判斷flag為false和設置flag為true並不是一個不可分割的整體,有可能出現類似下面這樣的時序, 假設最初flag為false:
step | thread 1 | thread 2 |
---|---|---|
1 | while(flag == true); | |
2 | while(flag == true); | |
3 | flag = true | |
4 | flag = true | |
5 | do something | do something |
6 | flag = false | |
7 | flag = false |
step是虛構的步驟,do something為一系列指令,這里寫在一起表示並發執行。這里可以看出由於thread1讀取判斷flag的值與修改flag的值是兩個獨立的操作,中間插入了thread2的判斷操作,最終使得有兩個線程同時進入了臨界區,這與我們的期望相悖。那么如何解決呢?如果能將讀取判斷與修改的操作合二為一,變成一個不可分割的整體,那么自然就不可能出現這種交錯的場景。對於這樣一個整體操作,我們希望它能讀取內存中變量的值,並且當其等於特定值的時候,修改它為我們需要的另一個值。嗯......沒錯,這樣我們就得到了CAS操作。
現在可以重新修改我們的同步方式,不停的進行期望flag為false的CAS操作 CAS(flag, flase, b) (這里b為true),直到其返回成功為止,再進行臨界區中的操作,離開臨界區時將flag置為false。
b = true;
while(!CAS(flag, false, b));
//do something
flag = false;
現在,判斷操作與寫入操作已經成為了一個整體,當一個線程的CAS操作成功的時候會阻止其他線程進入臨界區,到達互斥訪問的目的。
現在我們已經可以使用CAS操作來解決臨界區的互斥訪問的問題了,但是如果每次都這樣寫一遍實在太過麻煩,因此可以進行一些封裝使得使用更加方便,也就是說...可以封裝成自旋鎖。我們可以用一個類來表示,將一個bool值作為類的數據成員,同時將CAS操作和賦值操作作為其成員函數,CAS操作其實就是加鎖操作,而后面的賦值操作就是解鎖操作。
三、用C++原子量實現
按照上面的思路,接下來用 C++ 11 引入標准庫的原子量來實現一個自旋鎖並且進行測試。
首先,我們需要一個bool值來表示鎖的狀態,這里直接使用標准庫中的原子量 atomic<bool> (C++ 11的原子量可以參考:https://www.cnblogs.com/FateTHarlaown/p/8919235.html) ,在我的平台(Cygwin64、GCC7.3)上 atomic<bool> 的成員函數is_lock_free()返回值為true,是無鎖的實現(如果內部使用了鎖來實現的話那還叫什么自旋鎖 = =)。實際上在大多數平台上 atomic<bool> 都是無鎖的,如果不確定的話也可以使用C++標准規定必須為無鎖實現的atomic_flag。
接下來,我們需要兩個原子操作,CAS和賦值,C++11標准庫在原子量的成員函數中直接提供了這兩個操作。
//CAS
std::atomic::compare_exchange_weak( T& expected, T desired,
std::memory_order order =
std::memory_order_seq_cst ),
std::atomic::compare_exchange_strong( T& expected, T desired,
std::memory_order order =
std::memory_order_seq_cst )
//賦值
void store( T desired, std::memory_order order = std::memory_order_seq_cst )
compare_exchange_weak 與 compare_exchange_strong 主要的區別在於內存中的值與expected相等的時候,CAS操作是否一定能成功,compare_exchange_weak有概率會返回失敗,而compare_exchange_strong則一定會成功。因此,compare_exchange_weak必須與循環搭配使用來保證在失敗的時候重試CAS操作。得到的好處是在某些平台上compare_exchange_weak性能更好。按照上面的模型,我們本來就要和while搭配使用,可以使用compare_exchange_weak。最后內存序的選擇沒有特殊需求直接使用默認的std::memory_order_seq_cst。而賦值操作非常簡單直接,這個調用一定會成功(只是賦值而已 = =),沒有返回值。
實現代碼非常短,下面是源代碼:
#include <atomic>
class SpinLock {
public:
SpinLock() : flag_(false)
{}
void lock()
{
bool expect = false;
while (!flag_.compare_exchange_weak(expect, true))
{
//這里一定要將expect復原,執行失敗時expect結果是未定的
expect = false;
}
}
void unlock()
{
flag_.store(false);
}
private:
std::atomic<bool> flag_;
};
如上面所說,lock操作不停的嘗試CAS操作直到成功為止,unlock操作則將bool標志位復原。使用方式如下:
SpinLock myLock;
myLock.lock();
//do something
myLock.unlock();
接下來,我們進行正確性測試,以經典的i++ 問題為例:
#include <atomic>
#include <thread>
#include <vector>
//自旋鎖類定義
class SpinLock {
public:
SpinLock() : flag_(false)
{}
void lock()
{
bool expect = false;
while (!flag_.compare_exchange_weak(expect, true))
{
expect = false;
}
}
void unlock()
{
flag_.store(false);
}
private:
std::atomic<bool> flag_;
};
//每個線程自增次數
const int kIncNum = 1000000;
//線程數
const int kWorkerNum = 10;
//自增計數器
int count = 0;
//自旋鎖
SpinLock spinLock;
//每個線程的工作函數
void IncCounter()
{
for (int i = 0; i < kIncNum; ++i)
{
spinLock.lock();
count++;
spinLock.unlock();
}
}
int main()
{
std::vector<std::thread> workers;
std::cout << "SpinLock inc MyTest start" << std::endl;
count = 0;
std::cout << "start " << kWorkerNum << " workers_" << "every worker inc " << kIncNum << std::endl;
std::cout << "count_: " << count << std::endl;
//創建10個工作線程進行自增操作
for (int i = 0; i < kWorkerNum; ++i)
workers.push_back(std::move(std::thread(IncCounter)));
for (auto it = workers.begin(); it != workers.end(); it++)
it->join();
std::cout << "workers_ end" << std::endl;
std::cout << "count_: " << count << std::endl;
//驗證結果
if (count == kIncNum * kWorkerNum)
{
std::cout << "SpinLock inc MyTest passed" << std::endl;
return true;
}
else
{
std::cout << "SpinLock inc MyTest failed" << std::endl;
return false;
}
return 0;
}
上面的代碼中創建了10個線程對共享的全局變量count分別進行一百萬次++操作,然后驗證結果是否正確,最終執行的輸出為:
SpinLock inc MyTest start
start 10 workers_every worker inc 1000000
count_: 0
workers_ end
count_: 10000000
SpinLock inc MyTest passed
從結果中可以看出我們實現的自旋鎖起到了保護臨界區(這里就是i++ )的作用,count最后的值等於每個線程執行自增的數目之和。作為對比,可以去掉IncCounter中的加鎖解鎖操作:
void IncCounter()
{
for (int i = 0; i < kIncNum; ++i)
{
//spinLock.lock();
count++;
//spinLock.unlock();
}
}
執行后的輸出為:
SpinLock inc MyTest start
start 10 workers_every worker inc 1000000
count_: 0
workers_ end
count_: 7254522
SpinLock inc MyTest failed
結果由於多個線程同時執行 i++ 造成結果錯誤。
到這里,我們就通過 C++ 11的原子量實現了一個簡單的自旋鎖。這里只是對C++原子量的一個小使用,無論是自旋鎖本身還是原子量都還有許多值得探究的地方。