第31課 std::atomic原子變量


一. std::atomic_flag和std::atomic

(一)std::atomic_flag

  1. std::atomic_flag是一個bool類型的原子變量,它有兩個狀態set和clear,對應着flag為true和false。

  2. std::atomic_flag使用前必須被ATOMIC_FLAG_INIT初始化,此時的flag為clear狀態,相當於靜態初始化。

  3. 三個原子化操作

  (1)test_and_set():檢查當前flag是否被設置。若己設置直接返回true,若沒設置則將flag置為true ,並返回false

  (2)clear();清除flag標志,即flag=false。

  (3)析構函數

  4. 和所有atomic類型一樣,std::atomic_flag不支持拷貝和賦值等操作。因為賦值和拷貝調用了兩個對象,從第一個對象中讀值,然后再寫入另一個。對於兩個獨立的對象,這里就有兩個獨立的操作,合並這兩個操作必定不是原子的。

  5. std::atomic_flag 類型不提供is_lock_free()。 該類型是一個簡單的布爾標志, 並且在這種類型上的操作都是無鎖的。但atomic_flag的可操作性不強,導致其應用局限性,還不如std::atomic<bool>。

(二)std::atomic<T>模板類

 

  1. std::atomic_flag是無鎖類型的,但是atomic<bool>不一定是lock free的,可以用atomic<T>::is_lock_free()來判斷。通常情況下,編譯器不會為std::atomic<UDT>生成無鎖代碼,所有操作使用一個內部鎖(UDT為用戶自定義類型,如果其類型大小如同int或void*時,大多數平台仍會使用原子指令)。

  2. fetch_系列函數返回的是舊值,復合賦值運算返回的是新值但它們返回的都不是引用類型。因為任何依賴與這個結果的代碼都需要顯式加載該值。潛在的問題是,結果可能會被其他線程修改。而通過非原子值進行賦值,可以避免多余的加載過程,並且得到實際存儲的值。

  3. compare_exchange_weak/strong函數可以保證“比較-交換”的原子化。compare_exchange_weak可能失敗,即此函數可能與expected值相等的情形下atomic的T值沒有替換為disired(atomic值未變)且返回false,這可能發生在缺少單條CAS操作(“比較-交換”指令)的機器上,所以通常使用一個循環中。

bool compare_exchange_strong(T& expected, const T desired) volatile noexcept {
    if (*this == expected)
       *this = desired;
    else
      expected = *this;
  
    return (*this == expected)
}
compare_exchange_strong偽代碼

  4. 整數原子類型沒有乘法、除法、位移操作,因為整數原子類型通常用於計數或者掩碼等,如果需要,可以將compare_change_weak()放入循環中完成。

  5. std::atomic<UDT>,UDT類不能有任何虛函數或虛基類,以及必須使用編譯器創建的拷貝賦值操作。所有的基類和非靜態數據成員也都需要支持拷貝賦值操作。其比較-交換操作就類似於memcmp使用按位比較,而非為UDT類定義一個比較操作符。

  6. 每個函數的操作都有一個內存排序參數。這個參數可以用來指定存儲的順序。

  (1)store操作,可選如下順序:memory_order_relaxed、memory_order_release、memory_order_seq_cst。

  (2)load操作,可選順序:memory_order_relaxed、memory_order_consume、memory_order_acquire和memory_order_seq_cst。

  (3)讀-改-寫操作(RMW):memory_order_relaxed、memory_order_consume、memory_order_acquire、memory_order_release、memory_order_acq_rel和memory_order_seq_cst。

【編程實驗】std::atomic<T> 初體驗

#include <atomic>
#include <thread>
#include <iostream>
#include <vector>
#include <sstream>
#include <mutex>
#include <cassert>

std::mutex io_mutex;
class Foo {};

//自己封裝的自旋鎖
class spinlock
{
    std::atomic_flag flag = ATOMIC_FLAG_INIT;
public:
    void lock()
    {
        while (flag.test_and_set(std::memory_order_acquire)) 
        {
            //std::this_thread::yield(); //這里應注釋掉,否則會引起線程切換,成本較高達不到自
                                         //旋鎖要的效率
        }
    }

    void unlock()
    {
        flag.clear(std::memory_order_release);
    }
};

spinlock g_lock;
std::stringstream stream;
void append_number(int x)
{
    g_lock.lock();

    stream << "thread(" << std::this_thread::get_id() << "): " << x << "\n";

    g_lock.unlock();
}

std::vector<int> data;
std::atomic<bool> data_read(false);
void reader_thread()
{
    while (!data_read.load())
    {
        std::this_thread::sleep_for(std::chrono::microseconds(1));
    }

    std::cout << "The answer=" << data[0] << std::endl; //1
}

void writer_thread()
{
    data.push_back(42); //2。 由於1和2處發生了data race,所以使用atomic<boo>進行同步。
    data_read = true;
}

std::atomic<int> g_count = 0;
void test()
{
    for (int i = 0; i < 100000; ++i)
    {
        //count++; //原子操作
        //g_count += 1; //原子操作

        g_count = g_count + 1; //注意,不是原子操作
    }
}

//線程安全的簡單鏈表
struct linkList
{
    struct Node
    {
        int value;
        Node* next;
    };

    void append(int val)
    {
        Node* newNode = new Node{ val,list_head }; //將頭結點移動到newNode的后面(即next下)
                                                  //注意,由於線程的data race,這里可能出現多個線程
                                                  //同時創建新結點,並將newNode->next = list_head

        // 頭插法
        //這里需要用while循環,因為第1個搶到compare_exchange_strong的線程,會成功將list_head改為
        //newNode,使得newNode成為新的list_head(頭插法) 。而其余線程第1次調用compare_exchange_strong
        //時會因為newNode->next不等於新的list_head而失敗,該函數會將新的list_head掛在newNode->next下面,
        //如此當再次循環時,就能夠成功將newNode設置為新的list_head(這里可能再次遇到data race,會重復上述
        //過程,直到成功)。
        while (!(list_head.compare_exchange_strong(newNode->next, newNode))); //函數體為空
    }

    void print()
    {
        for (Node* it = list_head; it != nullptr; it = it->next)
        {
            std::cout << " " << it->value;
        }

        std::cout << std::endl;
    }

    void clear()
    {
        Node* it;
        while (it = list_head)
        {
            list_head = it->next;
            delete it;
        }
    }

    std::atomic<Node*> list_head = nullptr;
};

//實現一個無鎖的線程安全棧
template<typename T>
class stack
{
    struct node
    {
        std::shared_ptr<T> data;
        node* next;
        node(const T& d) :next(nullptr), data(std::make_shared<T>(d)) {} //用shared_ptr指向新分配出來的T
    };
private:
    std::atomic<node*> head;
private:
    std::atomic<unsigned> threads_in_pop; //正在調用pop函數的線程數量
    std::atomic<node*> toDeleted;

public:
    void push(const T& data)
    {
        node* newNode = new node(data); //創建新節點
        newNode->next = head.load();  //將原head移到新節點的后驅節點。這里可能出現多線程
                                      //同時將head節點作為其后驅

        //compare_exchange_weak是原子操作。先進行比較再交換。由於多線程的競爭,每次只允許
        //一個線程compared_exchange_weak,第1個搶到的線程會交換成功,成為新的head。而其它
        //線程則會失敗,並將newNode的后驅改為新的head並繼續循環比較-交換操作,直至所有
        //線程的push操作結束。
        while (!head.compare_exchange_weak(newNode->next, newNode)); //新節點變為新的head
    }

    std::shared_ptr<T> pop()
    {
        ++threads_in_pop; //引用計數,統計有多少個線程正在使用該函數
        node* oldHead = head.load(); //① 多線程時,會出現head被多個線程搶到。

        //每次只有一個線程可以執行compare_exchange_weak操作,並將head的后驅變成新的head
        //其它線程會返回失敗,並重置oldhead為新的head,然后開始另一輪的循環。
        while (oldHead && !head.compare_exchange_weak(oldHead, oldHead->next));

        //以下存在內存泄漏,雖然oldHead->data被取走了,但oldHead這個結點本身沒被釋放!
        //return oldHead ? oldHead->data : std::shared_ptr<T>();//由於①處oldHead的競爭,其他線程可能正在使
                                                                //用該節點,這里如果直接delete oldHead可能造
                                                                //成未定義行為,怎么辦?
        //解決方案:
        std::shared_ptr<T> res;
        if (oldHead) res.swap(oldHead->data); //從節點中提取數據

        //當多線程同時調用pop時,先將這個要刪除的結點加入到待刪除節點鏈表中,然后等
        //最后一個線程pop時,再將這個鏈表delete。
        try_reclaim(oldHead); //嘗試回收節點

        return res;
    }

    //刪除某個節點
    static void delete_nodes(node* nodes)
    {
        while (nodes) {
            node* next = nodes->next;
            delete nodes;
            nodes = next;
        }
    }

    void try_reclaim(node* old_head)
    {
        if (threads_in_pop == 1) //只有當前線程使用。沒有競爭,可以直接刪除old_head
        {
            node* nodes_to_delete = toDeleted.exchange(nullptr);//返回toDeleted舊值
            if (!--threads_in_pop) //雙重檢查,因為在上面的賦值時,可能另一個線程再pop()進來
            {
                delete_nodes(nodes_to_delete); //確實只剩一個線程,就刪除待刪除節點鏈表
            }
            else if (nodes_to_delete) {
                chain_pending_nodes(nodes_to_delete);//將已存在的鏈表鏈接到刪除鏈表后面
            }

            delete old_head; //刪除節點本身
        }
        else {
            chain_pending_node(old_head); //將節點加入到待刪除鏈表中(頭插法)
            --threads_in_pop;
        }
    }

    //將已存在的鏈表鏈接到刪除鏈表后面
    void chain_pending_nodes(node* nodes)
    {
        node* last = nodes;
        while (node * next = last->next) { //讓next指針指向鏈表的末尾
            last = next;
        }

        chain_pending_nodes(nodes, last);
    }

    void chain_pending_nodes(node* first, node* last)
    {
        last->next = toDeleted; //將原鏈表放在last->next下。
        while (!toDeleted.compare_exchange_weak(last->next, first)); //first成為新的鏈表頭部
    }

    void chain_pending_node(node* n)
    {
        chain_pending_nodes(n, n);
    }
};

int main()
{
    //1. 自旋瑣的實現
    std::vector<std::thread> threads;

    for (int i = 1; i < 10; ++i) {
        threads.push_back(std::thread(append_number, i));
    }

    for (auto& th : threads) th.join();

    std::cout << stream.str();

    //2. std::atomic<bool>的使用
    std::thread reader(reader_thread);
    std::thread writer(writer_thread);
    reader.join();
    writer.join();

    //3. std::atomic<T*>的使用(指針的原子相加減操作)
    Foo fArr[5];
    std::atomic<Foo*> pF(fArr);
    Foo* x = pF.fetch_add(2); //fetch_add返回舊值,即x==fArr
    assert(x == fArr); 
    
    x = (pF -= 1); //x ==pF ==  &fArr[1];
    assert(x == &fArr[1]);
    assert(pF.load() == &fArr[1]);

    //4. 線程安全的無鎖結構
    //4.1  無鎖鏈表
    std::vector<std::thread> ths;
    linkList lst;
    for (int i = 0; i < 10; ++i)
        ths.push_back(std::thread(&linkList::append, &lst, i));
    for (auto& th : ths) th.join();

    //打印內容
    lst.print();
    lst.clear();

    //4.2 無鎖棧結構
    stack<int> st;
    std::vector<std::thread> producers;

    for (int i = 0; i < 10; ++i)
    {
        producers.emplace_back([&st, i]() {st.push(i); });
    }

    for (auto& p : producers) p.join();

    std::vector<std::thread> consumers;
    for (int i = 0; i < 10; ++i)
    {
        consumers.emplace_back([&st]() {
            std::lock_guard<std::mutex> lck(io_mutex);

            std::cout << *st.pop() << " ";
            });
    }

    for (auto& c : consumers) c.join();
    std::cout << std::endl;

    //5.原子和非原子的加減操作
    std::thread t1(test);
    std::thread t2(test);
    t1.join();
    t2.join();

    std::cout << g_count << std::endl;

    return 0;
}
/*輸出結果
thread(2200): 2
thread(13492): 3
thread(19072): 4
thread(11624): 5
thread(16712): 6
thread(14632): 1
thread(14124): 7
thread(2092): 8
thread(2820): 9
The answer=42
 8 9 7 6 5 4 3 1 2 0
9 8 7 6 5 4 3 2 1 0
106675
*/

二. 內存屏障與同步

(一)亂序的原因

  從源碼變成可以被機器識別的程序,至少要經過編譯期和運行期。重排序分為兩類:編譯期重排序和運行期重排序。由於重排序的存在,指令實際的執行順序,並不是源碼中看到的順序。

  1. 編譯器出於優化的目的,在編譯階段將源碼的順序進行交換。

  2. 對計算機來說,通常內存的寫操作相對於讀操作要昂貴很多。因此,寫操作一般會在 CPU 內部的store buffer緩存。這就導致在一個 CPU 里執行一個寫操作之后,不一定馬上就被另一個 CPU 所看到。這從另一個角度講,效果上其實就是讀寫亂序了。

(二)CPU cache 工作機制及內存屏障【非權威,僅供參考】

 

  1. 為了提高CPU效率,每個CPU中都有兩塊私有的特殊內存:store buffer和Invalidate隊列。其中,store buffer,用於CPU臨時寫入數據的緩沖區。Invalidate隊列為該CPU cache即將無效數據項的請求隊列。只有位於cache或memory中的數據才會被CPUs共享並且他們通過MESI協議保證了數據的一致性。即不論哪個CPU發生了cache miss,它們從其他CPU嗅探到的或從內存加載的數據都是一樣的。

  2. 存在於store buffer中的數據是最新的(因為執行了寫操作),該CPU下次的讀操作往往優先從這里而不是cache中獲取。但store buffer中的數據是該CPU私有的,不能被其他CPU共享。而且經常也不會立即刷新到cache或memory中一般需要等到寫這個數據的CPU接收到來自其他CPU的invalidate acknowledgement回復后才會進行刷新store buffer和cache中數據可能是不一樣的,store buffer的較新,而cache中是舊值。此外,讀操作時,只有該項數據是無效的才會從其他CPU或memory中讀取,如果仍處於有效狀態則會從本地cache中讀取。因此,如果invalidate請求還保存在隊列中的話,就可能讀到舊值。只有等該CPU處理完隊列中的這條invalidate消息,才會從cache或memory中讀取。因此,store buffer和Invalidate隊列都可能引起memory oder問題。

  3. 內存屏障的作用

  (1)寫屏障指令:作用於store buffer,它只是約束執行CPU上的store操作的順序。該指令有兩個作用A. 確保寫屏障之前的store操作不會被重排到指令之后B.執行當遇到該指令時首先flush store buffer(也就是將指令之前store操作寫入於store buffer中的值刷新到cacheline中)。在C++11中可以使用std::atomic_thread_fence(std::memory_order_release)強制加入寫屏障指令。

  (2)讀屏障指令:作用於Invalidate queue上,它只是約束執行CPU上的load操作的順序。該指令有兩個作用A. 確保讀屏障之后的load操作不會被重排到指令之前B.執行后續的load操作之前,先將Invalidate Queue隊列中相關項執行完,確保后續的load操作觸發cache miss,從而讀到最新的數據。讀屏障指令就象一道柵欄,嚴格區分了之前和之后的load操作。在C++11中可以使用std::atomic_thread_fence(std::memory_order_acquire) 強制加入讀屏障指令。

(三)C++11的6種memory order

內存順序

作用

memory_order_relexed

只保證當前操作的原子性。

不考慮線程間的同步,其它線程可能讀到舊值(因為不指定內存屏障,所以內存操作執行時可能是亂序的)

memory_order_acquire

①對讀取(load)實施acquire語義。相當於插入一個內存讀屏障,保證之后的讀操作不會被重排到該操作之前

類似於mutex的lock操作,但不會阻塞。只是保證如果其它線程的release己經發生,則本線程其后對該原子變量的load操作一定會獲取到該變量release之前發生的寫入。因此,acuquire一般需要用循環,等待其他線程release的發生

memory_order_release

①對寫入(store)實施release語義(類似於mutex的unlock操作)。保證之前的寫操作不會被重排到該操作之后,同時確保該操作之前的store操作將數據寫入cache或memory中,確保cache或memory是最新數據。(相當於插入一個寫屏障)。

②該操作之前當前線程內的所有寫操作,對於其他對這個原子變量進行acquire或assume的線程可見。(對於assume,寫操作指對依賴變量的寫操作)

③該操作本身是原子操作。

memory_order_consume

類似memory_order_acquire,但只對與這塊內存有關的讀寫操作起作用。

memory_order_acq_rel

對讀取和寫入施加acquire-release語義,無法被重排(相當於同時插入讀寫兩種內存屏障)。

②可以看見其他線程施加release語義的所有寫入,同時自己release結束后所有寫入對其他acquire線程可見。

memory_order_seq_cst

如果是讀取就是acquire語義,寫入就是release語義,讀寫就施加acquire-release語義

②同時會對所有使用此memory-order的原子操作建立一個全局順序這樣,所有線程都將看到同樣一個的內存操作順序。

【編程實驗】內存順序

#include <iostream>
#include <thread>
#include <atomic>
#include <cassert>

using namespace std;

//Acuqire-Release、relaxed語義
class CAcqRel
{
    int m;
    std::atomic<bool> x, y;
    std::atomic<int> z;
public:
    CAcqRel():x(false),y(false),z(0),m(0){}
    void write_x_then_y()
    {
        m = 1; //非原子變量
        x.store(true, std::memory_order_relaxed); //relaxed語義,無法保證a和x寫入的先后順序。

        y.store(true, std::memory_order_release); //對x實施release語義,保證a/x一定在y之前被寫入
    }

    void read_y_then_x()
    {
        while (!y.load(std::memory_order_acquire)); //對y實施acuqire語義,同時使用循
                                                    //等待y原子變量的release的發生。
        if (x.load(std::memory_order_relaxed)) //acquire線程可見到release之前的寫操作,因此x為true。
            ++z;
    }

    void test()
    {
        std::thread a(&CAcqRel::write_x_then_y, this);
        std::thread b(&CAcqRel::read_y_then_x, this);

        a.join(); b.join(); 
        assert(z.load() != 0); //條件成立
        assert(m == 1);  //條件成立
    }
};

//Consume語義
class CConsume
{
    std::atomic<std::string*> ptr;
    int data;
    std::atomic<int> atData;
public:
    CConsume():data(0),atData(0),ptr(nullptr){}

    void producer()
    {
        std::string* p = new std::string("Hello");
        data = 42;
        atData.store(2019, std::memory_order_relaxed);

        ptr.store(p, std::memory_order_release);
    }

    void consumer()
    {
        std::string* p2;
        while (!(p2 = ptr.load(std::memory_order_consume))); //consume語義,只能保證額ptr
                                                             //依賴的變量p己被存儲,但不保證
                                                             //data和atData的值。

        assert(*p2 == "Hello"); //條件一定成立。
        assert(data == 42);  //無法保證data一定等於42。因為ptr對其無依賴。
        assert(atData == 2019); //無法保證atData一定等於2019,因為ptr對其無依賴
    }

    void test()
    {
        std::thread t1(&CConsume::consumer, this);
        std::thread t2(&CConsume::producer, this);
        t1.join(); t2.join();
    }
};

//seq_cst語義
class CSeqCst
{
    std::string work;
    std::atomic<bool> ready;
    std::atomic<int> data;
public:
    CSeqCst():ready(false),data(0){}

    void write()
    {
        //以下的寫操作由於采用memory_order_seq_cst語義,因此當寫入時會產生一個全局
        //的寫入順序,即先work再data,最后寫入ready。這個順序對所有使用該語義的
        //線程可見,即
        work = "done";
        data.store(2019, std::memory_order_seq_cst);
        ready = true; //默認采用memory_order_seq_cst語義
    }

    void read()
    {
        //默認采用memory_order_seq_cst語義。當ready發生時,由於全局順序必然知道data
        //和work的存儲己發生
        while (!ready.load());

        std::cout << work << std::endl; //done,全局順序,work一定為done
        std::cout << data << std::endl; //2019,全局順序,data一定等於2019
    }

    void test()
    {
        std::thread t1(&CSeqCst::write, this);
        std::thread t2(&CSeqCst::read, this);
        t1.join(); t2.join();
    }
};

class CMemoryBarriers
{
    bool x = false; //x為一個非原子變量
    std::atomic<bool> y;
    std::atomic<int> z;
public:
    CMemoryBarriers():x(false),y(false),z(0){}

    void write_x_then_y()
    {
        x = true; //①在柵欄前存儲x
        std::atomic_thread_fence(std::memory_order_release); //②“釋放柵欄”
        y.store(true, std::memory_order_relaxed); //③在柵欄之后存儲y
    }

    void read_y_then_x()
    {
        while (!y.load(std::memory_order_relaxed)); //④  在③寫入前持續等待

        std::atomic_thread_fence(std::memory_order_acquire); //⑤“讀取柵欄”。②與⑤是同步關系
                                                             //②與⑤兩個柵欄都是必要的,這樣才
                                                             //能在兩個線程間建立同步關系。
        if (x) ++z;  //⑥。這里的x讀取是在⑤之后,由於柵欄之間的同步關系,也發生在①之后。所以
                     //這里讀取到的值是①寫入的。
    }

    void test()
    {
        std::thread a(&CMemoryBarriers::write_x_then_y, this);
        std::thread b(&CMemoryBarriers::read_y_then_x, this);

        a.join(); b.join();
        assert(z.load() != 0); //條件成立
    }

};

int main()
{
    //測試memory-order
    CAcqRel ar;
    ar.test();

    CConsume cs;
    cs.test();

    CSeqCst sc;
    sc.test();

    CMemoryBarriers mb;
    mb.test();

    return 0;
}
/*輸出結果
done
2019
*/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM