使用C++11原子量實現自旋鎖


一、自旋鎖

自旋鎖是一種基礎的同步原語,用於保障對共享數據的互斥訪問。與互斥鎖的相比,在獲取鎖失敗的時候不會使得線程阻塞而是一直自旋嘗試獲取鎖。當線程等待自旋鎖的時候,CPU不能做其他事情,而是一直處於輪詢忙等的狀態。自旋鎖主要適用於被持有時間短,線程不希望在重新調度上花過多時間的情況。實際上許多其他類型的鎖在底層使用了自旋鎖實現,例如多數互斥鎖在試圖獲取鎖的時候會先自旋一小段時間,然后才會休眠。如果在持鎖時間很長的場景下使用自旋鎖,則會導致CPU在這個線程的時間片用盡之前一直消耗在無意義的忙等上,造成計算資源的浪費。

二、CAS操作實現自旋鎖

CAS(Compare and Swap),即比較並替換,實現並發算法時常用到的一種技術,這種操作提供了硬件級別的原子操作(通過鎖總線的方式)。CAS操作的原型可以認為是:

bool CAS(V, A, B)

其中V代表內存中的變量,A代表期待的值,B表示新值。當V的值與A相等時,將V與B的值交換。邏輯上可以用下面的偽代碼表示:

bool CAS(V, A, B)
{
    if (V == A)
    {
        swap(V, B);
        return true;
    }
    
    return false;
}

需要強調的是上面的操作是原子的,要么不做,要么全部完成。

那么已經擁有CAS操作的情況下如何實現一個自旋鎖呢?首先回憶自旋鎖的用途,本質上我們是希望能夠讓一個線程在不滿足進入臨界區的條件時,不停的忙等輪詢,直到可以運行的時候再繼續(進入臨界區)執行。那么,我們可能自然的想到使用一個bool變量來表示是否可以進入臨界區,例如以下面的偽代碼的邏輯:

while(flag == true);
flag = true;
/*
do something ...
*/
flag = false;
    ...

這樣做的直觀想法是當flag為true的時候表示已經有線程處於臨界區內,只有當flag為fasle時才能進入,而在進入的時候立即將flag置為true。但是這樣做明顯存在一個問題,判斷flag為false和設置flag為true並不是一個不可分割的整體,有可能出現類似下面這樣的時序, 假設最初flag為false:

step thread 1 thread 2
1 while(flag == true);
2 while(flag == true);
3 flag = true
4 flag = true
5 do something do something
6 flag = false
7 flag = false

step是虛構的步驟,do something為一系列指令,這里寫在一起表示並發執行。這里可以看出由於thread1讀取判斷flag的值與修改flag的值是兩個獨立的操作,中間插入了thread2的判斷操作,最終使得有兩個線程同時進入了臨界區,這與我們的期望相悖。那么如何解決呢?如果能將讀取判斷與修改的操作合二為一,變成一個不可分割的整體,那么自然就不可能出現這種交錯的場景。對於這樣一個整體操作,我們希望它能讀取內存中變量的值,並且當其等於特定值的時候,修改它為我們需要的另一個值。嗯......沒錯,這樣我們就得到了CAS操作。

現在可以重新修改我們的同步方式,不停的進行期望flag為false的CAS操作 CAS(flag, flase, b) (這里b為true),直到其返回成功為止,再進行臨界區中的操作,離開臨界區時將flag置為false。

b = true;
while(!CAS(flag, false, b));
//do something
flag = false;

現在,判斷操作與寫入操作已經成為了一個整體,當一個線程的CAS操作成功的時候會阻止其他線程進入臨界區,到達互斥訪問的目的。

現在我們已經可以使用CAS操作來解決臨界區的互斥訪問的問題了,但是如果每次都這樣寫一遍實在太過麻煩,因此可以進行一些封裝使得使用更加方便,也就是說...可以封裝成自旋鎖。我們可以用一個類來表示,將一個bool值作為類的數據成員,同時將CAS操作和賦值操作作為其成員函數,CAS操作其實就是加鎖操作,而后面的賦值操作就是解鎖操作。

三、用C++原子量實現

按照上面的思路,接下來用 C++ 11 引入標准庫的原子量來實現一個自旋鎖並且進行測試。

首先,我們需要一個bool值來表示鎖的狀態,這里直接使用標准庫中的原子量 atomic<bool> (C++ 11的原子量可以參考:https://www.cnblogs.com/FateTHarlaown/p/8919235.html) ,在我的平台(Cygwin64、GCC7.3)上 atomic<bool> 的成員函數is_lock_free()返回值為true,是無鎖的實現(如果內部使用了鎖來實現的話那還叫什么自旋鎖 = =)。實際上在大多數平台上 atomic<bool> 都是無鎖的,如果不確定的話也可以使用C++標准規定必須為無鎖實現的atomic_flag。

接下來,我們需要兩個原子操作,CAS和賦值,C++11標准庫在原子量的成員函數中直接提供了這兩個操作。

//CAS
std::atomic::compare_exchange_weak( T& expected, T desired,
                                    std::memory_order order =
                                    std::memory_order_seq_cst ),
                                    
std::atomic::compare_exchange_strong( T& expected, T desired,
                                    std::memory_order order =
                                    std::memory_order_seq_cst )
//賦值
void store( T desired, std::memory_order order = std::memory_order_seq_cst )

compare_exchange_weak 與 compare_exchange_strong 主要的區別在於內存中的值與expected相等的時候,CAS操作是否一定能成功,compare_exchange_weak有概率會返回失敗,而compare_exchange_strong則一定會成功。因此,compare_exchange_weak必須與循環搭配使用來保證在失敗的時候重試CAS操作。得到的好處是在某些平台上compare_exchange_weak性能更好。按照上面的模型,我們本來就要和while搭配使用,可以使用compare_exchange_weak。最后內存序的選擇沒有特殊需求直接使用默認的std::memory_order_seq_cst。而賦值操作非常簡單直接,這個調用一定會成功(只是賦值而已 = =),沒有返回值。
實現代碼非常短,下面是源代碼:

#include <atomic>

class SpinLock {

public:
    SpinLock() : flag_(false)
    {}

    void lock()
    {
        bool expect = false;
        while (!flag_.compare_exchange_weak(expect, true))
        {
            //這里一定要將expect復原,執行失敗時expect結果是未定的
            expect = false;
        }
    }

    void unlock()
    {
        flag_.store(false);
    }

private:
    std::atomic<bool> flag_;
};

如上面所說,lock操作不停的嘗試CAS操作直到成功為止,unlock操作則將bool標志位復原。使用方式如下:

SpinLock myLock;
myLock.lock();

//do something

myLock.unlock();

接下來,我們進行正確性測試,以經典的i++ 問題為例:

#include <atomic>
#include <thread>
#include <vector>

//自旋鎖類定義
class SpinLock {

public:
    SpinLock() : flag_(false)
    {}

    void lock()
    {
        bool expect = false;
        while (!flag_.compare_exchange_weak(expect, true))
        {
            expect = false;
        }
    }

    void unlock()
    {
        flag_.store(false);
    }

private:
    std::atomic<bool> flag_;
};

//每個線程自增次數
const int kIncNum = 1000000;
//線程數
const int kWorkerNum = 10;
//自增計數器
int count = 0;
//自旋鎖
SpinLock spinLock;
//每個線程的工作函數
void IncCounter()
{
    for (int i = 0; i < kIncNum; ++i)
    {
        spinLock.lock();
        count++;
        spinLock.unlock();
    }
}

int main()
{
    std::vector<std::thread> workers;
    std::cout << "SpinLock inc MyTest start" << std::endl;
    count = 0;

    std::cout << "start " << kWorkerNum << " workers_" << "every worker inc " << kIncNum << std::endl;
    std::cout << "count_: " << count << std::endl;
    //創建10個工作線程進行自增操作
    for (int i = 0; i < kWorkerNum; ++i)
        workers.push_back(std::move(std::thread(IncCounter)));

    for (auto it = workers.begin(); it != workers.end(); it++)
        it->join();

    std::cout << "workers_ end" << std::endl;
    std::cout << "count_: " << count << std::endl;
    //驗證結果
    if (count == kIncNum * kWorkerNum)
    {
        std::cout << "SpinLock inc MyTest passed" << std::endl;
        return true;
    }
    else
    {
        std::cout << "SpinLock inc MyTest failed" << std::endl;
        return false;
    }

    return 0;
}

上面的代碼中創建了10個線程對共享的全局變量count分別進行一百萬次++操作,然后驗證結果是否正確,最終執行的輸出為:

SpinLock inc MyTest start
start 10 workers_every worker inc 1000000
count_: 0
workers_ end
count_: 10000000
SpinLock inc MyTest passed

從結果中可以看出我們實現的自旋鎖起到了保護臨界區(這里就是i++ )的作用,count最后的值等於每個線程執行自增的數目之和。作為對比,可以去掉IncCounter中的加鎖解鎖操作:

void IncCounter()
{
    for (int i = 0; i < kIncNum; ++i)
    {
        //spinLock.lock();
        count++;
        //spinLock.unlock();
    }
}

執行后的輸出為:

SpinLock inc MyTest start
start 10 workers_every worker inc 1000000
count_: 0
workers_ end
count_: 7254522
SpinLock inc MyTest failed

結果由於多個線程同時執行 i++ 造成結果錯誤。

到這里,我們就通過 C++ 11的原子量實現了一個簡單的自旋鎖。這里只是對C++原子量的一個小使用,無論是自旋鎖本身還是原子量都還有許多值得探究的地方。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM