0. 參考資料
- C++ atomic http://www.cplusplus.com/reference/atomic/atomic/
- C++ 原子操作庫 std::atomic https://www.apiref.com/cpp-zh/cpp/atomic/atomic/compare_exchange.html
- CAS https://www.cnblogs.com/muhe221/articles/5089918.html
- c++並發編程3. CAS原語 https://zhuanlan.zhihu.com/p/56055215
- C++11:原子交換函數compare_exchange_weak和compare_exchange_strong https://blog.csdn.net/feikudai8460/article/details/107035480/
- 理解memory order https://blog.csdn.net/jiang4357291/article/details/110753759
- C++11中的內存模型下篇 - C++11支持的幾種內存模型 https://www.codedump.info/post/20191214-cxx11-memory-model-2/
- 調試經驗 | C++ memory order和一個相關的穩定性問題 https://juejin.cn/post/6844904096671989773
- std::memory_order https://en.cppreference.com/w/cpp/atomic/memory_order
- std::memory_order https://zh.cppreference.com/w/cpp/atomic/memory_order
1. 背景
多線程讀寫非線程安全的數據結構時,為了保證結果正確性,一種方式是對數據結構加鎖后進行讀寫。為了解決加鎖帶來的性能損耗問題,可使用CAS。
2. CAS
Compare-and-Swap (CAS)是用於多線程以實現同步的原子指令。它將存儲位置的內容與給定值進行比較,當它們逐位相等,才將該存儲位置的內容修改為新的給定值。整個流程為一個原子操作。
2.1 C++的CAS方法
compare_exchange_weak
compare_exchange_strong
其位於atomic庫中 http://www.cplusplus.com/reference/atomic/atomic/
2.2 std::atomic的使用
定義一個原子對象,以鏈表Node為例:
struct Node {
int value;
Node *next;
};
std::atomic<Node *> list_head;
atomic中的主要方法有:
方法名 | 功能 | 備注 |
---|---|---|
is_lock_free | Is lock-free | |
store | Modify contained value | |
load | Read contained value | |
operator T | Access contained value | |
exchange | Access and modify contained value | |
compare_exchange_weak | Compare and exchange contained value (weak) | |
compare_exchange_strong | Compare and exchange contained value (strong) |
2.3 CAS函數說明
2.3.1compare_exchange_weak
bool compare_exchange_weak (T& expected, T val, memory_order sync = memory_order_seq_cst) volatile noexcept;
bool compare_exchange_weak (T& expected, T val, memory_order sync = memory_order_seq_cst) noexcept;
bool compare_exchange_weak (T& expected, T val, memory_order success, memory_order failure) volatile noexcept;
bool compare_exchange_weak (T& expected, T val, memory_order success, memory_order failure) noexcept;
2.3.2compare_exchange_strong
bool compare_exchange_strong (T& expected, T val, memory_order sync = memory_order_seq_cst) volatile noexcept;
bool compare_exchange_strong (T& expected, T val, memory_order sync = memory_order_seq_cst) noexcept;
bool compare_exchange_strong (T& expected, T val, memory_order success, memory_order failure) volatile noexcept;
bool compare_exchange_strong (T& expected, T val, memory_order success, memory_order failure) noexcept;
函數釋義:
將原子對象包含的值的內容與expected值進行比較:
-如果為true,則用val
替換原子對象包含的值
(類似於store)。
-如果為false,則用原子對象包含的值
替換expected
。
此函數直接將包含值的物理內容與預期值的內容進行比較
與compare_exchange_strong
不同,compare_exchange_weak
返回false時,仍然可能是expected值
與原子對象包含的對象
相等的情況。對於某些循環算法來說,這可能是可以接受,並且在某些平台上可能會有明顯更好的性能。對於這種情況,函數返回false,但沒有修改expected
。
對於非循環算法,通常首選compare_exchange_strong
。
方法參數釋義
expected
: 對一個對象的引用,該對象的值與原子對象包含的值
進行比較,如果不匹配,該對象可能會
被包含的值覆蓋。
val
: 當expected原子對象包含的值
匹配時,copy val到原子對象內包含的值
sync
: CAS操作的同步模式。參數類型為memory_order,為一組枚舉值,其具體功能后文介紹
2.3.3例程
Demo: 線程安全的無鎖鏈表
// a simple global linked list:
struct Node {
int value;
Node *next;
};
class ConcurrentLinkList {
public:
ConcurrentLinkList() { list_head = nullptr; }
void append(int val) { // append an element to the list
Node *old_head = list_head.load();
Node *new_node = new Node{val, old_head};
// equivalent to: list_head = new_node, but in a thread-safe way:
while (!list_head.compare_exchange_weak(old_head, new_node)) {
new_node->next = old_head;
}
}
void clean() {
Node *it;
while (it = list_head) {
list_head = it->next;
delete it;
}
}
public:
std::atomic<Node *> list_head;
};
點擊打開完整示例
#include <atomic> // std::atomic
#include <iostream> // std::cout
#include <thread> // std::thread
#include <vector> // std::vector
// a simple global linked list:
struct Node {
int value;
Node *next;
};
class ConcurrentLinkList {
public:
ConcurrentLinkList() { list_head = nullptr; }
void append(int val) { // append an element to the list
Node *old_head = list_head.load();
Node *new_node = new Node{val, old_head};
// equivalent to: list_head = new_node, but in a thread-safe way:
while (!list_head.compare_exchange_weak(old_head, new_node)) {
new_node->next = old_head;
}
}
void clean() {
Node *it;
while (it = list_head) {
list_head = it->next;
delete it;
}
}
public:
std::atomic<Node *> list_head;
};
int main() {
// spawn 10 threads to fill the linked list:
ConcurrentLinkList list;
std::vector<std::thread> threads;
for (int i = 0; i < 10; ++i) {
threads.push_back(std::thread(&ConcurrentLinkList::append, &list, i));
}
for (auto &th : threads) {
th.join();
}
// print contents:
for (Node *it = list.list_head; it != nullptr; it = it->next) {
std::cout << it->value << ' ';
}
std::cout << '\n';
// cleanup:
list.clean();
return 0;
}
compare_exchange_strong使用的結果
點擊打開完整示例
#include <atomic>
#include <iostream>
std::atomic<int> ai;
int tst_val= 4;
int new_val= 5;
bool exchanged= false;
void valsout()
{
std::cout << "ai= " << ai
<< " tst_val= " << tst_val
<< " new_val= " << new_val
<< " exchanged= " << std::boolalpha << exchanged
<< "\n";
}
int main()
{
ai= 3;
valsout();
// tst_val != ai ==> tst_val 被修改
exchanged= ai.compare_exchange_strong( tst_val, new_val );
valsout();
// tst_val == ai ==> ai 被修改
exchanged= ai.compare_exchange_strong( tst_val, new_val );
valsout();
}
結果:
ai= 3 tst_val= 4 new_val= 5 exchanged= false
ai= 3 tst_val= 3 new_val= 5 exchanged= false
ai= 5 tst_val= 3 new_val= 5 exchanged= true
3. memory order
compare_exchange_weak
和compare_exchange_strong
方法的參數 sync
表示CAS操作的同步模式,參數類型為memory_order
,為一組枚舉值,以下是枚舉類型memory_order
的所有可能值:
value | memory order | description |
---|---|---|
memory_order_relaxed | Relaxed | 沒有同步或順序制約,僅對此操作要求原子性 |
memory_order_consume | Consume | 1. 對當前要讀取的內存施加 release 語義(store),在代碼中這條語句后面所有與這塊內存有關的讀寫操作都無法被重排到這個操作之前 2. 在這個原子變量上施加 release 語義的操作發生之后,consume 可以保證讀到所有在 release 前發生的並且與這塊內存有關的寫入 |
memory_order_acquire | Acquire | 1. 向前保證,本線程中所有讀寫操作都不能重排到memory_order_acquire的load之前 2. 其他線程中所有memory_order_release的寫操作都對當前線程可見 |
memory_order_release | Release | 1. 向后保證,本線程中所有讀寫操作都不能重排到memory_order_acquire的store之后 2. 本線程中的所有寫都對其他對同一atomic變量帶有 memory_order_acquire的線程可見 3. 本線程中的所有寫都對其他所有有依賴且consume該變量的線程可見 |
memory_order_acq_rel | Acquire/Release | 1. 是release+acquire的結合,前后都保證,本線程中所有讀寫操作既不能重排到memory_order_acquire的load之前也不能到之后 2. 其他線程的memory_order_release寫在本線程寫之前都是可見的 3. 本線程的的寫對其他線程的memory_order_acquire讀都是可見的 |
memory_order_seq_cst | Sequentially consistent | 1. 順序一致性 2. 如果是讀取就是 acquire 語義,如果是寫入就是 release 語義,如果是讀取+寫入就是 acquire-release 語義 3. 同時會對所有使用此 memory order 的原子操作進行同步,所有線程看到的內存操作的順序都是一樣的,就像單個線程在執行所有線程的指令一樣 |
可參考這篇文章的圖: https://blog.csdn.net/jiang4357291/article/details/110753759
當多個線程訪問一個原子對象時,所有原子操作的行為特性:在任何其他原子操作可以訪問該對象之前,每個原子操作都是在該對象上完全執行的。這保證了這些對象上沒有數據競爭。
但是,每個線程可能會對原子對象本身以外的內存位置執行操作:這些其他操作可能會對其他線程產生明顯的副作用。memory_order這種類型的參數允許為操作指定內存順序(memory order),該操作確定如何在線程之間同步這些(可能是非原子的)可見副作用。
在 C11/C++11 中,引入了六種不同的 memory order,可以讓程序員在並發編程中根據自己需求盡可能降低同步的粒度,以獲得更好的程序性能。這六種 order 分別是:
relaxed, acquire, release, consume, acq_rel, seq_cst
Relaxed ordering 示例
點擊打開完整示例
#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<bool> x,y;
void thread1_fun() {
x.store(true, memory_order_relaxed);
y.store(true, memory_order_relaxed);
}
void thread2_fun() {
while(!y.load(memory_order_relaxed));
assert(x.load(memory_order_relaxed));
}
int main() {
x=false;
y=false;
std::thread t1(thread1_fun);
std::thread t2(thread2_fun);
t1.join();
t2.join();
return 0;
}
結果
thread2_fun 內assert可能失敗。 因為thread1中的x和y的store可能會重排(因為這個重排並不影響在thread1內的執行的結果)
Release-consume ordering 示例
點擊打開完整示例
#include <atomic>
#include <thread>
#include <assert.h>
atomic<int> a;
atomic<int> b;
void fun1() {
a.store(1, memory_order_relaxed);
b.store(2, memory_order_release);
}
void fun2() {
while (b.load(memory_order_consume) != 2) {
// do nothing
}
assert(a == 1);
}
int main() {
a=0;
b=0;
std::thread t1(fun1);
std::thread t2(fun2);
t1.join();
t2.join();
return 0;
}
結果:fun2() 內assert可能失敗。 線程1中a和b並不相關,a的寫入就可能被重排到b之后,這樣在b線程load時就有可能a還未store,此時a=0,斷言失敗。(據[8]說memory_order_consume的設計有缺陷,建議大家不要使用)
Release-acquire ordering示例
點擊打開完整示例
memory_order_acquire禁止了該load操作之后的所有讀寫操作(不管是原子對象還是非原子對象)被重排到它之前去運行。[8]
memory_order_release禁止了該store操作之前的所有讀寫操作(不管是原子對象還是非原子對象)被重排到它之后去運行。[8]
當flag.load在時間上晚發生於flag.store時,Thread 1上flag.store之前的所有操作對於Thread 2上flag.load之后的所有操作都是可見的。如果flag.load發生的時間早於flag.store,那么兩個線程間則不擁有任何數據可見性。[8]
#include <thread>
#include <atomic>
#include <cassert>
std::atomic<bool> x = {false};
std::atomic<bool> y = {false};
std::atomic<int> z = {0};
void write_x()
{
x.store(true, std::memory_order_release);
}
void write_y()
{
y.store(true, std::memory_order_release);
}
void read_x_then_y()
{
while (!x.load(std::memory_order_acquire))
;
if (y.load(std::memory_order_acquire)) {
++z;
}
}
void read_y_then_x()
{
while (!y.load(std::memory_order_acquire))
;
if (x.load(std::memory_order_acquire)) {
++z;
}
}
int main()
{
std::thread a(write_x);
std::thread b(write_y);
std::thread c(read_x_then_y);
std::thread d(read_y_then_x);
a.join(); b.join(); c.join(); d.join();
assert(z.load() != 0);
}
結果:不少博客內認為assert可能失敗,解釋:
write_x線程中x的store使用了memory_order_release,和read_x_then_y中的load配對,保證了read_x_then_y線程中load x時能看到其他線程對x的所有release,所以load y之前x一定為true退出了循環
同理,read_y_then_x線程中,load x之前y一定為true
但是,read_x_then_y中y load之前y為0和1確都是有可能的,因為y.load(std::memory_order_acquire)只是保證了load y之前其他線程所有對y的release對當前都是可見的,但是完全有可能write_y線程還沒調度到,y就是0
同理,read_y_then_x中load x也無法保證x必定不為0,所以z最終就可能為0,斷言失敗
文章[7] https://www.codedump.info/post/20191214-cxx11-memory-model-2/ 內也有相同解釋
這里想實際有點存疑:
read_x_then_y 和 read_y_then_x
執行到if判斷時,x/y至少有1個已經store完成了並且對所有acquire線程可見了。所以if判斷至少有1個會執行到。 assert 必定成功。
解釋一下,只可能存在兩種情況:
1:read_x_then_y() 內的 if (y.load(std::memory_order_acquire)) 先執行
2:read_y_then_x() 內的 if (x.load(std::memory_order_acquire)) 先執行
如果是情況1,則x必定已經load完了,所以2內的if判斷成功
如果是情況2,則y必定已經load完了,所以1內的if判斷成功
文章[7]的評論內也有人有相同疑問,不過也有評論說明:
zh.cppreference.com/w/cpp/atomic/memory_order 的“釋放獲得順序”那一節里面說道到“同步僅建立在釋放和獲得同一原子對象的線程之間。其他線程可能看到與被同步線程的一者或兩者相異的內存訪問順序”,也就是說“在 thread c 可能看到是先寫 x 后寫 y,但在 thread d 可能是先寫 y 后寫 x”
這部分待證實
Sequentially-consistent ordering示例
點擊打開完整示例
#include <thread>
#include <atomic>
#include <cassert>
std::atomic<bool> x = {false};
std::atomic<bool> y = {false};
std::atomic<int> z = {0};
void write_x()
{
x.store(true, std::memory_order_seq_cst);
}
void write_y()
{
y.store(true, std::memory_order_seq_cst);
}
void read_x_then_y()
{
while (!x.load(std::memory_order_seq_cst))
;
if (y.load(std::memory_order_seq_cst)) {
++z;
}
}
void read_y_then_x()
{
while (!y.load(std::memory_order_seq_cst))
;
if (x.load(std::memory_order_seq_cst)) {
++z;
}
}
int main()
{
std::thread a(write_x);
std::thread b(write_y);
std::thread c(read_x_then_y);
std::thread d(read_y_then_x);
a.join(); b.join(); c.join(); d.join();
assert(z.load() != 0); // 決不發生
}