C++11 並發指南六(atomic 類型詳解一 atomic_flag 介紹) 一文介紹了 C++11 中最簡單的原子類型 std::atomic_flag,但是 std::atomic_flag 過於簡單,只提供了 test_and_set 和 clear 兩個 API,不能滿足其他需求(如 store, load, exchange, compare_exchange 等),因此本文將介紹功能更加完善的 std::atomic 類。
std::atomic 基本介紹
std::atomic 是模板類,一個模板類型為 T 的原子對象中封裝了一個類型為 T 的值。
template <class T> struct atomic;
原子類型對象的主要特點就是從不同線程訪問不會導致數據競爭(data race)。因此從不同線程訪問某個原子對象是良性 (well-defined) 行為,而通常對於非原子類型而言,並發訪問某個對象(如果不做任何同步操作)會導致未定義 (undifined) 行為發生。
C++11 標准中的基本 std::atomic 模板定義如下:
template < class T > struct atomic { bool is_lock_free() const volatile; bool is_lock_free() const; void store(T, memory_order = memory_order_seq_cst) volatile; void store(T, memory_order = memory_order_seq_cst); T load(memory_order = memory_order_seq_cst) const volatile; T load(memory_order = memory_order_seq_cst) const; operator T() const volatile; operator T() const; T exchange(T, memory_order = memory_order_seq_cst) volatile; T exchange(T, memory_order = memory_order_seq_cst); bool compare_exchange_weak(T &, T, memory_order, memory_order) volatile; bool compare_exchange_weak(T &, T, memory_order, memory_order); bool compare_exchange_strong(T &, T, memory_order, memory_order) volatile; bool compare_exchange_strong(T &, T, memory_order, memory_order); bool compare_exchange_weak(T &, T, memory_order = memory_order_seq_cst) volatile; bool compare_exchange_weak(T &, T, memory_order = memory_order_seq_cst); bool compare_exchange_strong(T &, T, memory_order = memory_order_seq_cst) volatile; bool compare_exchange_strong(T &, T, memory_order = memory_order_seq_cst); atomic() = default; constexpr atomic(T); atomic(const atomic &) = delete; atomic & operator=(const atomic &) = delete; atomic & operator=(const atomic &) volatile = delete; T operator=(T) volatile; T operator=(T); };
另外,C++11 標准庫 std::atomic 提供了針對整形(integral)和指針類型的特化實現,分別定義如下:
針對整形(integal)的特化,其中 integal 代表了如下類型char, signed char, unsigned char, short, unsigned short, int, unsigned int, long, unsigned long, long long, unsigned long long, char16_t, char32_t, wchar_t:
template <> struct atomic<integral> {
bool is_lock_free() const volatile;
bool is_lock_free() const;
void store(integral, memory_order = memory_order_seq_cst) volatile;
void store(integral, memory_order = memory_order_seq_cst);
integral load(memory_order = memory_order_seq_cst) const volatile;
integral load(memory_order = memory_order_seq_cst) const;
operator integral() const volatile;
operator integral() const;
integral exchange(integral, memory_order = memory_order_seq_cst) volatile;
integral exchange(integral, memory_order = memory_order_seq_cst);
bool compare_exchange_weak(integral&, integral, memory_order, memory_order) volatile;
bool compare_exchange_weak(integral&, integral, memory_order, memory_order);
bool compare_exchange_strong(integral&, integral, memory_order, memory_order) volatile;
bool compare_exchange_strong(integral&, integral, memory_order, memory_order);
bool compare_exchange_weak(integral&, integral, memory_order = memory_order_seq_cst) volatile;
bool compare_exchange_weak(integral&, integral, memory_order = memory_order_seq_cst);
bool compare_exchange_strong(integral&, integral, memory_order = memory_order_seq_cst) volatile;
bool compare_exchange_strong(integral&, integral, memory_order = memory_order_seq_cst);
integral fetch_add(integral, memory_order = memory_order_seq_cst) volatile;
integral fetch_add(integral, memory_order = memory_order_seq_cst);
integral fetch_sub(integral, memory_order = memory_order_seq_cst) volatile;
integral fetch_sub(integral, memory_order = memory_order_seq_cst);
integral fetch_and(integral, memory_order = memory_order_seq_cst) volatile;
integral fetch_and(integral, memory_order = memory_order_seq_cst);
integral fetch_or(integral, memory_order = memory_order_seq_cst) volatile;
integral fetch_or(integral, memory_order = memory_order_seq_cst);
integral fetch_xor(integral, memory_order = memory_order_seq_cst) volatile;
integral fetch_xor(integral, memory_order = memory_order_seq_cst);
atomic() = default;
constexpr atomic(integral);
atomic(const atomic&) = delete;
atomic& operator=(const atomic&) = delete;
atomic& operator=(const atomic&) volatile = delete;
integral operator=(integral) volatile;
integral operator=(integral);
integral operator++(int) volatile;
integral operator++(int);
integral operator--(int) volatile;
integral operator--(int);
integral operator++() volatile;
integral operator++();
integral operator--() volatile;
integral operator--();
integral operator+=(integral) volatile;
integral operator+=(integral);
integral operator-=(integral) volatile;
integral operator-=(integral);
integral operator&=(integral) volatile;
integral operator&=(integral);
integral operator|=(integral) volatile;
integral operator|=(integral);
integral operator^=(integral) volatile;
integral operator^=(integral);
};
針對指針的特化:
template <class T> struct atomic<T*> {
bool is_lock_free() const volatile;
bool is_lock_free() const;
void store(T*, memory_order = memory_order_seq_cst) volatile;
void store(T*, memory_order = memory_order_seq_cst);
T* load(memory_order = memory_order_seq_cst) const volatile;
T* load(memory_order = memory_order_seq_cst) const;
operator T*() const volatile;
operator T*() const;
T* exchange(T*, memory_order = memory_order_seq_cst) volatile;
T* exchange(T*, memory_order = memory_order_seq_cst);
bool compare_exchange_weak(T*&, T*, memory_order, memory_order) volatile;
bool compare_exchange_weak(T*&, T*, memory_order, memory_order);
bool compare_exchange_strong(T*&, T*, memory_order, memory_order) volatile;
bool compare_exchange_strong(T*&, T*, memory_order, memory_order);
bool compare_exchange_weak(T*&, T*, memory_order = memory_order_seq_cst) volatile;
bool compare_exchange_weak(T*&, T*, memory_order = memory_order_seq_cst);
bool compare_exchange_strong(T*&, T*, memory_order = memory_order_seq_cst) volatile;
bool compare_exchange_strong(T*&, T*, memory_order = memory_order_seq_cst);
T* fetch_add(ptrdiff_t, memory_order = memory_order_seq_cst) volatile;
T* fetch_add(ptrdiff_t, memory_order = memory_order_seq_cst);
T* fetch_sub(ptrdiff_t, memory_order = memory_order_seq_cst) volatile;
T* fetch_sub(ptrdiff_t, memory_order = memory_order_seq_cst);
atomic() = default;
constexpr atomic(T*);
atomic(const atomic&) = delete;
atomic& operator=(const atomic&) = delete;
atomic& operator=(const atomic&) volatile = delete;
T* operator=(T*) volatile;
T* operator=(T*);
T* operator++(int) volatile;
T* operator++(int);
T* operator--(int) volatile;
T* operator--(int);
T* operator++() volatile;
T* operator++();
T* operator--() volatile;
T* operator--();
T* operator+=(ptrdiff_t) volatile;
T* operator+=(ptrdiff_t);
T* operator-=(ptrdiff_t) volatile;
T* operator-=(ptrdiff_t);
};
std::atomic 成員函數
好了,對 std::atomic 有了一個最基本認識之后我們來看 std::atomic 的成員函數吧。
std::atomic 構造函數
std::atomic 的構造函數如下:
| default (1) | atomic() noexcept = default; |
|---|---|
| initialization (2) | constexpr atomic (T val) noexcept; |
| copy [deleted] (3) | atomic (const atomic&) = delete; |
- 默認構造函數,由默認構造函數創建的 std::atomic 對象處於未初始化(uninitialized)狀態,對處於未初始化(uninitialized)狀態 std::atomic對象可以由 atomic_init 函數進行初始化。
- 初始化構造函數,由類型 T初始化一個 std::atomic對象。
- 拷貝構造函數被禁用。
請看下例:
#include <iostream> // std::cout
#include <atomic> // std::atomic, std::atomic_flag, ATOMIC_FLAG_INIT
#include <thread> // std::thread, std::this_thread::yield
#include <vector> // std::vector
// 由 false 初始化一個 std::atomic<bool> 類型的原子變量
std::atomic<bool> ready(false);
std::atomic_flag winner = ATOMIC_FLAG_INIT;
void do_count1m(int id)
{
while (!ready) { std::this_thread::yield(); } // 等待 ready 變為 true.
for (volatile int i=0; i<1000000; ++i) {} // 計數
if (!winner.test_and_set()) {
std::cout << "thread #" << id << " won!\n";
}
}
int main ()
{
std::vector<std::thread> threads;
std::cout << "spawning 10 threads that count to 1 million...\n";
for (int i=1; i<=10; ++i) threads.push_back(std::thread(count1m,i));
ready = true;
for (auto& th : threads) th.join();
return 0;
}
std::atomic::operator=() 函數
std::atomic 的賦值操作函數定義如下:
| set value (1) | T operator= (T val) noexcept; T operator= (T val) volatile noexcept; |
|---|---|
| copy [deleted] (2) | atomic& operator= (const atomic&) = delete; atomic& operator= (const atomic&) volatile = delete; |
可以看出,普通的賦值拷貝操作已經被禁用。但是一個類型為 T 的變量可以賦值給相應的原子類型變量(相當與隱式轉換),該操作是原子的,內存序(Memory Order) 默認為順序一致性(std::memory_order_seq_cst),如果需要指定其他的內存序,需使用 std::atomic::store()。
#include <iostream> // std::cout
#include <atomic> // std::atomic
#include <thread> // std::thread, std::this_thread::yield
std::atomic <int> foo = 0;
void set_foo(int x)
{
foo = x; // 調用 std::atomic::operator=().
}
void print_foo()
{
while (foo == 0) { // wait while foo == 0
std::this_thread::yield();
}
std::cout << "foo: " << foo << '\n';
}
int main()
{
std::thread first(print_foo);
std::thread second(set_foo, 10);
first.join();
second.join();
return 0;
}
基本 std::atomic 類型操作
本節主要介紹基本 std::atomic 類型所具備的操作(即成員函數)。我們知道 std::atomic 是模板類,一個模板類型為 T 的原子對象中封裝了一個類型為 T 的值。本文<std::atomic 基本介紹>一節中也提到了 std::atomic 類模板除了基本類型以外,還針對整形和指針類型做了特化。 特化的 std::atomic 類型支持更多的操作,如 fetch_add, fetch_sub, fetch_and 等。本小節介紹基本 std::atomic 類型所具備的操作:
- is_lock_free
bool is_lock_free() const volatile noexcept; bool is_lock_free() const noexcept;
- 判斷該 std::atomic 對象是否具備 lock-free 的特性。如果某個對象滿足 lock-free 特性,在多個線程訪問該對象時不會導致線程阻塞。(可能使用某種事務內存 transactional memory 方法實現 lock-free 的特性)。
- store
void store (T val, memory_order sync = memory_order_seq_cst) volatile noexcept; void store (T val, memory_order sync = memory_order_seq_cst) noexcept;
-
修改被封裝的值,std::atomic::store 函數將類型為 T 的參數 val 復制給原子對象所封裝的值。T 是 std::atomic 類模板參數。另外參數 sync 指定內存序(Memory Order),可能的取值如下:
| Memory Order 值 | Memory Order 類型 |
|---|---|
| memory_order_relaxed | Relaxed |
| memory_order_release | Release |
| memory_order_seq_cst | Sequentially consistent |
- 請看下面例子:
#include <iostream> // std::cout
#include <atomic> // std::atomic, std::memory_order_relaxed
#include <thread> // std::thread
std::atomic<int> foo(0); // 全局的原子對象 foo
void set_foo(int x)
{
foo.store(x, std::memory_order_relaxed); // 設置(store) 原子對象 foo 的值
}
void print_foo()
{
int x;
do {
x = foo.load(std::memory_order_relaxed); // 讀取(load) 原子對象 foo 的值
} while (x == 0);
std::cout << "foo: " << x << '\n';
}
int main ()
{
std::thread first(print_foo); // 線程 first 打印 foo 的值
std::thread second(set_foo, 10); // 線程 second 設置 foo 的值
first.join();
second.join();
return 0;
}
- load
T load (memory_order sync = memory_order_seq_cst) const volatile noexcept; T load (memory_order sync = memory_order_seq_cst) const noexcept;
- 讀取被封裝的值,參數 sync 設置內存序(Memory Order),可能的取值如下:
| Memory Order 值 | Memory Order 類型 |
|---|---|
| memory_order_relaxed | Relaxed |
| memory_order_consume | Consume |
| memory_order_acquire | Acquire |
| memory_order_seq_cst | Sequentially consistent |
- 請看下面例子:
#include <iostream> // std::cout
#include <atomic> // std::atomic, std::memory_order_relaxed
#include <thread> // std::thread
std::atomic<int> foo(0); // 全局的原子對象 foo
void set_foo(int x)
{
foo.store(x, std::memory_order_relaxed); // 設置(store) 原子對象 foo 的值
}
void print_foo()
{
int x;
do {
x = foo.load(std::memory_order_relaxed); // 讀取(load) 原子對象 foo 的值
} while (x == 0);
std::cout << "foo: " << x << '\n';
}
int main ()
{
std::thread first(print_foo); // 線程 first 打印 foo 的值
std::thread second(set_foo, 10); // 線程 second 設置 foo 的值
first.join();
second.join();
return 0;
}
- operator T
operator T() const volatile noexcept; operator T() const noexcept;
- 與 load 功能類似,也是 讀取被封裝的值,operator T() 是類型轉換( type-cast) 操作,默認的內存序是 std::memory_order_seq_cst,如果需要指定其他的內存序,你應該使用 load() 函數。請看下面例子:
#include <iostream> // std::cout
#include <atomic> // std::atomic
#include <thread> // std::thread, std::this_thread::yield
std::atomic<int> foo = 0;
std::atomic<int> bar = 0;
void set_foo(int x)
{
foo = x;
}
void copy_foo_to_bar()
{
// 如果 foo == 0,則該線程 yield,
// 在 foo == 0 時, 實際也是隱含了類型轉換操作,
// 因此也包含了 operator T() const 的調用.
while (foo == 0) std::this_thread::yield();
// 實際調用了 operator T() const, 將foo 強制轉換成 int 類型,
// 然后調用 operator=().
bar = static_cast<int>(foo);
}
void print_bar()
{
// 如果 bar == 0,則該線程 yield,
// 在 bar == 0 時, 實際也是隱含了類型轉換操作,
// 因此也包含了 operator T() const 的調用.
while (bar == 0) std::this_thread::yield();
std::cout << "bar: " << bar << '\n';
}
int main ()
{
std::thread first(print_bar);
std::thread second(set_foo, 10);
std::thread third(copy_foo_to_bar);
first.join();
second.join();
third.join();
return 0;
}
- exchange
T exchange (T val, memory_order sync = memory_order_seq_cst) volatile noexcept; T exchange (T val, memory_order sync = memory_order_seq_cst) noexcept;
- 讀取並修改被封裝的值,exchange 會將 val 指定的值替換掉之前該原子對象封裝的值,並返回之前該原子對象封裝的值,整個過程是原子的(因此exchange 操作也稱為 read-modify-write 操作)。sync參數指定內存序(Memory Order),可能的取值如下:
| Memory Order 值 | Memory Order 類型 |
|---|---|
| memory_order_relaxed | Relaxed |
| memory_order_consume | Consume |
| memory_order_acquire | Acquire |
| memory_order_release | Release |
| memory_order_acq_rel | Acquire/Release |
| memory_order_seq_cst | Sequentially consistent |
請看下面例子,各個線程計數至 1M,首先完成計數任務的線程打印自己的 ID,
#include <iostream> // std::cout
#include <atomic> // std::atomic
#include <thread> // std::thread
#include <vector> // std::vector
std::atomic<bool> ready(false);
std::atomic<bool> winner(false);
void count1m (int id)
{
while (!ready) {} // wait for the ready signal
for (int i = 0; i < 1000000; ++i) {} // go!, count to 1 million
if (!winner.exchange(true)) { std::cout << "thread #" << id << " won!\n"; }
};
int main ()
{
std::vector<std::thread> threads;
std::cout << "spawning 10 threads that count to 1 million...\n";
for (int i = 1; i <= 10; ++i) threads.push_back(std::thread(count1m,i));
ready = true;
for (auto& th : threads) th.join();
return 0;
}
- compare_exchange_weak
| (1) | bool compare_exchange_weak (T& expected, T val,
memory_order sync = memory_order_seq_cst) volatile noexcept;
bool compare_exchange_weak (T& expected, T val,
memory_order sync = memory_order_seq_cst) noexcept;
|
|---|---|
| (2) | bool compare_exchange_weak (T& expected, T val,
memory_order success, memory_order failure) volatile noexcept;
bool compare_exchange_weak (T& expected, T val,
memory_order success, memory_order failure) noexcept; |
- 比較並交換被封裝的值(weak)與參數 expected 所指定的值是否相等,如果:
- 相等,則用 val 替換原子對象的舊值。
- 不相等,則用原子對象的舊值替換 expected ,因此調用該函數之后,如果被該原子對象封裝的值與參數 expected 所指定的值不相等,expected 中的內容就是原子對象的舊值。
-
該函數通常會讀取原子對象封裝的值,如果比較為 true(即原子對象的值等於
expected),則替換原子對象的舊值,但整個操作是原子的,在某個線程讀取和修改該原子對象時,另外的線程不能對讀取和修改該原子對象。
在第 (2)種情況下,內存序(Memory Order)的選擇取決於比較操作結果,如果比較結果為 true(即原子對象的值等於 expected),則選擇參數 success 指定的內存序,否則選擇參數 failure 所指定的內存序。 -
注意,該函數直接比較原子對象所封裝的值與參數 expected 的物理內容,所以某些情況下,對象的比較操作在使用 operator==() 判斷時相等,但 compare_exchange_weak 判斷時卻可能失敗,因為對象底層的物理內容中可能存在位對齊或其他邏輯表示相同但是物理表示不同的值(比如 true 和 2 或 3,它們在邏輯上都表示"真",但在物理上兩者的表示並不相同)。 -
與 compare_exchange_strong 不同, weak 版本的 compare-and-exchange 操作允許( spuriously 地)返回 false(即原子對象所封裝的值與參數 expected 的物理內容相同,但卻仍然返回 false),不過在某些需要循環操作的算法下這是可以接受的,並且在一些平台下 compare_exchange_weak 的性能更好 。如果 compare_exchange_weak 的判斷確實發生了偽失敗( spurious failures)——即使原子對象所封裝的值與參數 expected 的物理內容相同,但判斷操作的結果卻為 false,compare_exchange_weak函數返回 false,並且參數expected 的值不會改變。 -
對於某些不需要采用循環操作的算法而言, 通常采用 compare_exchange_strong 更好。另外,該函數的內存序由 sync 參數指定,可選條件如下:
| Memory Order 值 | Memory Order 類型 |
|---|---|
| memory_order_relaxed | Relaxed |
| memory_order_consume | Consume |
| memory_order_acquire | Acquire |
| memory_order_release | Release |
| memory_order_acq_rel | Acquire/Release |
| memory_order_seq_cst | Sequentially consistent |
- 請看下面的例子( 參考):
#include <iostream> // std::cout
#include <atomic> // std::atomic
#include <thread> // std::thread
#include <vector> // std::vector
// a simple global linked list:
struct Node { int value; Node* next; };
std::atomic<Node*> list_head(nullptr);
void append(int val)
{
// append an element to the list
Node* newNode = new Node{val, list_head};
// next is the same as: list_head = newNode, but in a thread-safe way:
while (!list_head.compare_exchange_weak(newNode->next,newNode)) {}
// (with newNode->next updated accordingly if some other thread just appended another node)
}
int main ()
{
// spawn 10 threads to fill the linked list:
std::vector<std::thread> threads;
for (int i = 0; i < 10; ++i) threads.push_back(std::thread(append, i));
for (auto& th : threads) th.join();
// print contents:
for (Node* it = list_head; it!=nullptr; it=it->next)
std::cout << ' ' << it->value;
std::cout << '\n';
// cleanup:
Node* it; while (it=list_head) {list_head=it->next; delete it;}
return 0;
}
- 可能的執行結果如下:
9 8 7 6 5 4 3 2 1 0
- compare_exchange_strong
| (1) | bool compare_exchange_strong (T& expected, T val,
memory_order sync = memory_order_seq_cst) volatile noexcept;
bool compare_exchange_strong (T& expected, T val,
memory_order sync = memory_order_seq_cst) noexcept;
|
|---|---|
| (2) | bool compare_exchange_strong (T& expected, T val,
memory_order success, memory_order failure) volatile noexcept;
bool compare_exchange_strong (T& expected, T val,
memory_order success, memory_order failure) noexcept; |
- 比較並交換被封裝的值(strong)與參數 expected 所指定的值是否相等,如果:
- 相等,則用 val 替換原子對象的舊值。
- 不相等,則用原子對象的舊值替換 expected ,因此調用該函數之后,如果被該原子對象封裝的值與參數 expected 所指定的值不相等,expected 中的內容就是原子對象的舊值。
-
該函數通常會讀取原子對象封裝的值,如果比較為 true(即原子對象的值等於
expected),則替換原子對象的舊值,但整個操作是原子的,在某個線程讀取和修改該原子對象時,另外的線程不能對讀取和修改該原子對象。
在第 (2)種情況下,內存序(Memory Order)的選擇取決於比較操作結果,如果比較結果為 true(即原子對象的值等於 expected),則選擇參數 success 指定的內存序,否則選擇參數 failure 所指定的內存序。 -
注意,該函數直接比較原子對象所封裝的值與參數 expected 的物理內容,所以某些情況下,對象的比較操作在使用 operator==() 判斷時相等,但 compare_exchange_weak 判斷時卻可能失敗,因為對象底層的物理內容中可能存在位對齊或其他邏輯表示相同但是物理表示不同的值(比如 true 和 2 或 3,它們在邏輯上都表示"真",但在物理上兩者的表示並不相同)。
- 與 compare_exchange_weak 不同, strong版本的 compare-and-exchange 操作不允許( spuriously 地)返回 false,即原子對象所封裝的值與參數 expected 的物理內容相同,比較操作一定會為 true。不過在某些平台下,如果算法本身需要循環操作來做檢查, compare_exchange_weak 的性能會更好。
-
因此對於某些不需要采用循環操作的算法而言, 通常采用 compare_exchange_strong 更好。另外,該函數的內存序由 sync 參數指定,可選條件如下:
| Memory Order 值 | Memory Order 類型 |
|---|---|
| memory_order_relaxed | Relaxed |
| memory_order_consume | Consume |
| memory_order_acquire | Acquire |
| memory_order_release | Release |
| memory_order_acq_rel | Acquire/Release |
| memory_order_seq_cst | Sequentially consistent |
- 請看下面的例子:
#include <iostream> // std::cout
#include <atomic> // std::atomic
#include <thread> // std::thread
#include <vector> // std::vector
// a simple global linked list:
struct Node { int value; Node* next; };
std::atomic<Node*> list_head(nullptr);
void append(int val)
{
// append an element to the list
Node* newNode = new Node{val, list_head};
// next is the same as: list_head = newNode, but in a thread-safe way:
while (!(list_head.compare_exchange_strong(newNode->next, newNode)));
// (with newNode->next updated accordingly if some other thread just appended another node)
}
int main ()
{
// spawn 10 threads to fill the linked list:
std::vector<std::thread> threads;
for (int i = 0; i < 10; ++i) threads.push_back(std::thread(append, i));
for (auto& th : threads) th.join();
// print contents:
for (Node* it = list_head; it!=nullptr; it=it->next)
std::cout << ' ' << it->value;
std::cout << '\n';
// cleanup:
Node* it; while (it=list_head) {list_head=it->next; delete it;}
return 0;
}
好了,本文花了大量的篇幅介紹 std::atomic 基本類型,下一篇博客我會給大家介紹 C++11 的標准庫中std::atomic 針對整形(integral)和指針類型的特化版本做了哪些改進。
