C++ std::atomic 原子類型
原子操作:一個不可分割的操作。
標准原子類型可以在
is_lock_free()的成員函數,讓用戶決定在給定類型上的操作是否用原子指令完成。唯一不提供
is_lock_free()成員函數的類型是
std::atomic_flag,在此類型上的操作要求是無鎖的。可以利用
std::atomic_flag實現一個簡單的鎖。
#include <iostream>
#include <thread>
#include <atomic>
#include <assert.h>
class spinlock_mutex
{
public:
spinlock_mutex() : flag_(ATOMIC_FLAG_INIT) { }
void lock()
{
while(flag_.test_and_set(std::memory_order_acquire)) ;
}
void unlock()
{
flag_.clear(std::memory_order_release);
}
private:
std::atomic_flag flag_;
};
int value = 0;
spinlock_mutex mutex;
void test_function()
{
for(int i = 0; i < 100000; i++)
{
std::unique_lock<spinlock_mutex> lock(mutex);
++ value;
}
}
int main()
{
std::thread t1(test_function);
std::thread t2(test_function);
t1.join();
t2.join();
assert(value == 200000);
return 0;
}
C++ 11中的內存模型都是圍繞std::atomic展開的,下面依次介紹C++ 11中引入的內存順序。
參考: Memory Model
順序一致順序
默認的的順序被命名為順序一致,因為這意味着程序的行為和一個簡單的世界觀是一致的。如果所有原子類型實例上的操作是順序一致的,多線程的行為就好像是所有這些操作由單個線程以某種特定的順序進行執行的一樣。
在一個帶有多處理器的弱順序的機器上,它可能導致顯著的性能懲罰,因為操作的整體順序必須與處理器之間保持一致,可能需要處理器之間進行密集的同步操作。
#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<bool> x, y;
std::atomic<int> z;
void write_x()
{
x.store(true, std::memory_order_seq_cst);
}
void write_y()
{
y.store(true, std::memory_order_seq_cst);
}
void read_x_then_y()
{
while(!x.load(std::memory_order_seq_cst)) ;
if(y.load(std::memory_order_seq_cst))
{
printf("x,y\n");
++ z;
}
}
void read_y_then_x()
{
while(!y.load(std::memory_order_seq_cst)) ;
if(x.load(std::memory_order_seq_cst))
{
printf("y,x\n");
++ z;
}
}
int main()
{
x = false;
y = false;
z = 0;
std::thread a(write_x);
std::thread b(write_y);
std::thread c(read_x_then_y);
std::thread d(read_y_then_x);
a.join();
b.join();
c.join();
d.join();
assert(z.load() != 0);
return 0;
}
上述代碼中的assert永遠不會觸發,因為while循環總能保證x或者y的值已經修改為true,如果線程c或d中有一個線程if條件不滿足,那么另一個線程的if條件總能保障,所以最后z的值一定不為0。請注意memory_order_seq_cst的語義需要在所有標記memory_order_seq_cst的操作上有單一的總體順序。
順序一致是最直觀的順序,但是也是最為昂貴的內存順序,因為它要求所有線程之間的全局同步。在多處理器系統中,這可能需要處理器之間相當密集和耗時的通信。
松散順序
以松散順序執行的原子類型上的操作不參與synchronizes-with關系。單線程中的同一變量的操作仍然服從happens-before的關系,但相對於其他線程的順序幾乎沒有任何要求。唯一的要求是,從同一線程對單個原子變量的訪問不能重排,一旦給定的線程已經看到了原子變量的特定值,該線程之后的讀取就不能獲取該變量更早的值。以下程序展現了這種松散性。
#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<bool> x, y;
std::atomic<int> z;
void write_x_then_y()
{
x.store(true, std::memory_order_relaxed);
y.store(true, std::memory_order_relaxed);
}
void read_x_then_y()
{
while(!y.load(std::memory_order_relaxed)) ;
if(x.load(std::memory_order_relaxed))
++ z;
}
int main()
{
x = false;
y = false;
z = 0;
std::thread a(write_x_then_y);
std::thread b(read_x_then_y);
a.join();
b.join();
assert(z.load() != 0);
}
這一次,assert可能會觸發。因為x和y是不同的變量,每個操作所產生的值的可見性沒有順序的保證。
為了理解松散順序是如何工作的,可以想象每個變量是一個小隔間里使用記事本的人。在他的記事本上有一列值。你可以打電話給他,要求他給你一個值,或者你可以告訴他寫下了一個新值。如果你告訴他寫下新值,他就將其寫在列表的底部。如果你向他要一個值,他就為你從列表之中讀取一個數字。第一次你和這個人交談,如果你向他要一個值,此時他可能從他的記事本上的列表里任意選一個給你。如果你接着向他要另一個值,他可能會再給你同一個值,或者從列表的下方給一個給你。他永遠不會給你一個在列表上更上面的值。
獲取釋放順序
獲取釋放順序是松散順序的進步,操作仍然沒有總的順序,但是引入了一些同步。在這個順序模型下,原子載入是acquire操作memory_order_acquire,原子存儲是release操作memory_order_release,原子的讀,修改,寫操作是獲取,釋放或者兩者兼有memory_order_acq_rel。不同的線程仍然可以看到不同的順序,但是這些順序受到了限制。
#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<bool> x, y;
std::atomic<int> z;
void write_x()
{
x.store(true, std::memory_order_release);
}
void write_y()
{
y.store(true, std::memory_order_release);
}
void read_x_then_y()
{
while(!x.load(std::memory_order_acquire)) ;
if(y.load(std::memory_order_acquire))
{
++ z;
}
}
void read_y_then_x()
{
while(!y.load(std::memory_order_seq_cst)) ;
if(x.load(std::memory_order_seq_cst))
{
++ z;
}
}
int main()
{
x = false;
y = false;
z = 0;
std::thread a(write_x);
std::thread b(write_y);
std::thread c(read_x_then_y);
std::thread d(read_y_then_x);
a.join();
b.join();
c.join();
d.join();
assert(z.load() != 0);
return 0;
}
上述代碼中的斷言仍然可能觸發,因為對x的載入和對y的載入都讀取false也是有可能的。x與y由不同的線程寫入,所以每種情況從釋放到獲取的順序對另一個線程的操作是沒有影響的。
但是對於同一個線程來說,使用獲取-釋放操作可以在松散操作之中施加順序。
#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<bool> x, y;
std::atomic<int> z;
void write_x_then_y()
{
x.store(true, std::memory_order_relaxed);
y.store(true, std::memory_order_release);
}
void read_y_then_x()
{
while(!y.load(std::memory_order_acquire));
if(x.load(std::memory_order_relaxed))
++ z;
}
int main()
{
x = false;
y = false;
z = 0;
std::thread a(write_x_then_y);
std::thread b(read_y_then_x);
a.join();
b.join();
assert(z.load() != 0);
return 0;
}
因為存儲使用memory_order_release並且載入使用memory_order_acquire,存儲與載入同步。對x的存儲發生在y的存儲之前,因為它們在同一個線程之中。因為對y的存儲與對y的載入同步,對x的載入必然讀到true,所以斷言並不會觸發。配合使用release和acquire可以達到跨線程同步的功能,如下代碼所示:
#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<int> data[5];
std::atomic<bool> sync1(false), sync2(false);
void thread_1()
{
data[0].store(42, std::memory_order_relaxed);
data[1].store(97, std::memory_order_relaxed);
data[2].store(17, std::memory_order_relaxed);
data[3].store(1, std::memory_order_relaxed);
data[4].store(2, std::memory_order_relaxed);
sync1.store(true, std::memory_order_release);
}
void thread_2()
{
while(!sync1.load(std::memory_order_acquire)) ;
sync2.store(true, std::memory_order_release);
}
void thread_3()
{
while(!sync2.load(std::memory_order_acquire));
assert(data[0].load(std::memory_order_relaxed) == 42);
assert(data[1].load(std::memory_order_relaxed) == 97);
assert(data[2].load(std::memory_order_relaxed) == 17);
assert(data[3].load(std::memory_order_relaxed) == 1);
assert(data[4].load(std::memory_order_relaxed) == 2);
}
int main()
{
std::thread a(thread_1);
std::thread b(thread_2);
std::thread c(thread_3);
a.join();
b.join();
c.join();
return 0;
}
獲取釋放順序與MEMORY_ORDER_CONSUME的數據依賴
通過在載入上使用memory_order_consume以及在之前的存儲上使用memory_order_release,你可以確保所指向的數據得到正確的同步,並且無需再其他非依賴的數據上強制任何同步需求。以下代碼展示了這種用途:
#include <atomic>
#include <thread>
#include <assert.h>
#include <string>
#include <unistd.h>
struct X
{
int i;
std::string s;
};
std::atomic<X*> p;
std::atomic<int> a;
void create_x()
{
X* x = new X;
x->i = 42;
x->s = "hello world";
a.store(99, std::memory_order_relaxed);
// 因為這里依賴了x,所以這一句代碼執行時保證了x已經初始化完畢,並且已經完成賦值。
// 要點,有依賴關系的都已賦值完畢
p.store(x, std::memory_order_release);
}
void use_x()
{
X * x;
while(!(x=p.load(std::memory_order_consume)))
sleep(1);
assert(x->i == 42);
assert(x->s == "hello world");
// 可能斷言出錯
assert(a.load(std::memory_order_relaxed) == 99);
}
int main()
{
std::thread t1(create_x);
std::thread t2(use_x);
t1.join();
t2.join();
}
上述代碼中的前兩個斷言不會出錯,因為p的載入帶有對那些通過變量x的表達式的依賴。另一方面,在a的值上的斷言或許會被觸發。此操作並不依賴從p載入的值,因而對讀到的值就沒有保證。
內存屏障
內存屏障分為寫內存屏障和讀內存屏障。寫內存屏障std::atomic_thread_fence(std::memory_order_release)保證所有在屏障之前的寫入操作都會在屏障之后的寫入操作之前完成,而讀內存屏障std::atomic_thread_fence(std::memory_order_acquire)確保所有屏障之前的讀取操作都會在屏障之后的讀取操作前執行。內存屏障使得特定的操作無法穿越。以下代碼演示了內存屏障的用法。
#include <atomic>
#include <thread>
#include <assert.h>
#include <string>
#include <unistd.h>
std::atomic<bool> x, y;
std::atomic<int> z;
void write_x_then_y()
{
x.store(true, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_release);
y.store(true, std::memory_order_release);
}
void read_y_then_x()
{
while(!y.load(std::memory_order_acquire));
std::atomic_thread_fence(std::memory_order_acquire);
if(x.load(std::memory_order_relaxed))
++ z;
}
int main()
{
x = false;
y = false;
z = 0;
std::thread a(write_x_then_y);
std::thread b(read_y_then_x);
a.join();
b.join();
assert(z.load() != 0);
}
釋放屏障與獲取屏障同步,因為線程b中從y載入在線程a中存儲的值,這意味着線程a對x的存儲發生在線程b從x的load之前,所以讀取的值一定為true,斷言永遠不會觸發。
參考: 《 C++並發編程實戰 》
