C++11 CAS無鎖函數compare_exchange_weak的使用


 

#include <iostream>       // std::cout
#include <atomic>         // std::atomic
#include <thread>         // std::thread
#include <vector>         // std::vector

// a simple global linked list:
struct Node { int value; Node* next; };
std::atomic<Node*> list_head (nullptr);

void append (int val) {     // append an element to the list
  Node* oldHead = list_head;
  Node* newNode = new Node {val,oldHead};

  // what follows is equivalent to: list_head = newNode, but in a thread-safe way:
  while (!list_head.compare_exchange_weak(oldHead,newNode)) {
    newNode->next = oldHead;
  }
}

int main ()
{
  // spawn 10 threads to fill the linked list:
  std::vector<std::thread> threads;
  for (int i=0; i<30; ++i) threads.push_back(std::thread(append,i));
  for (auto& th : threads) th.join();

  // print contents:
  for (Node* it = list_head; it!=nullptr; it=it->next)
    std::cout << ' ' << it->value;
  std::cout << '\n';

  // cleanup:
  Node* it; while (it=list_head) {list_head=it->next; delete it;}

  return 0;
}
#include <iostream>       // std::cout
#include <atomic>         // std::atomic
#include <thread>         // std::thread
#include <vector>         // std::vector

// a simple global linked list:
struct Node { int value; Node* next; };
std::atomic<Node*> list_head (nullptr);

void append (int val) {     // append an element to the list
  Node* oldHead = list_head;
  Node* newNode = new Node {val,oldHead};

  // what follows is equivalent to: list_head = newNode, but in a thread-safe way:
  while (!list_head.compare_exchange_weak(oldHead,newNode)) {
    newNode->next = oldHead;
  }
}

int main ()
{
  // spawn 10 threads to fill the linked list:
  std::vector<std::thread> threads;
  for (int i=0; i<30; ++i) threads.push_back(std::thread(append,i));
  for (auto& th : threads) th.join();

  // print contents:
  for (Node* it = list_head; it!=nullptr; it=it->next)
    std::cout << ' ' << it->value;
  std::cout << '\n';

  // cleanup:
  Node* it; while (it=list_head) {list_head=it->next; delete it;}

  return 0;
}

 

 

root@ubuntu:~/c++# g++ -std=c++11  weak.cpp -o weak -pthread
root@ubuntu:~/c++# ./weak
 29 28 27 23 26 25 24 22 21 20 19 18 17 16 15 14 13 12 11 10 9 8 7 6 5 4 3 2 1 0
root@ubuntu:~/c++# 

 

 

在看c++11的CAS用法的時候,主要是產生了兩個問題:

  1. compare_swap_strong 與 compare_swap_weak 有啥區別?
  2. c++11 CAS原語系列后面還有兩個memory_order參數,有什么作用?
/*
* @brief:compare & swap(CAS)。如果等於expect則swap,否則就返回--是否交換成功, 注意expect如果不相等,會把當前值寫入到expected里面。
* 相比於strong,weak可能會出現[spurious wakeup](<http://en.wikipedia.org/wiki/Spurious_wakeup>).
* @param          若x等於expect,則設置為desired 返回true,
*                 否則最新值寫入expect,返回false
*/
class atomic {
bool compare_exchange_strong(T& expect /*用來比較的值*/, T desired/*用來設置的值*/)
bool compare_exchange_weak(T& expect, T desired)
}

compare_swap_strong 與 compare_swap_weak

看一下compare_swap_strong()的實現是如何的?

首先介紹一下幾個定義好的與操作,實際上就是幾個enum

// 幾個與操作是經過了重載的。
/// Enumeration for memory_order
typedef enum memory_order {
    memory_order_relaxed,
    memory_order_consume,
    memory_order_acquire,
    memory_order_release,
    memory_order_acq_rel,
    memory_order_seq_cst
} memory_order;

enum __memory_order_modifier {
    __memory_order_mask          = 0x0ffff,
    __memory_order_modifier_mask = 0xffff0000,
    __memory_order_hle_acquire   = 0x10000,
    __memory_order_hle_release   = 0x20000
};

// & operator
constexpr memory_order
operator&(memory_order __m, __memory_order_modifier __mod) {
  return memory_order(__m & int(__mod));
}

可以簡單地理解為幾個常量在操作。接下來看真正的實現部分。

/c++/atomic 頭文件
compare_exchange_weak(__pointer_type& __p1,
                     __pointer_type __p2,
                     memory_order __m1,
                     memory_order __m2) noexcept {
    return _M_b.compare_exchange_strong(__p1, __p2, __m1, __m2);
}

// atomic_base
_GLIBCXX_ALWAYS_INLINE bool
compare_exchange_strong(__pointer_type& __p1, __pointer_type __p2,
                        memory_order __m1,
                        memory_order __m2) noexcept
{
  memory_order __b2 = __m2 & __memory_order_mask;
  memory_order __b1 = __m1 & __memory_order_mask;
  __glibcxx_assert(__b2 != memory_order_release);
  __glibcxx_assert(__b2 != memory_order_acq_rel);
  __glibcxx_assert(__b2 <= __b1);

  return __atomic_compare_exchange_n(&_M_p, &__p1, __p2, 0, __m1, __m2);
}

最后發現__atomic_compare_exchange_n是由GCC內置的函數。

gcc 內置

Built-in Function:

bool __atomic_compare_exchange_n(
    type *ptr,              // 需要進行比較的ptr
    type *expected,         // 舊的值,會返回ptr里面的值
    type desired,           // 想要設置的新的值
    bool weak,              // 強一致,還是弱一致
    int success_memorder,   // 成功時的內存序
    int failure_memorder    // 失敗時的內存序
)

weak與strong

參考文檔gcc內置CAS說明

詳細的說明如下,如果有耐心那么可以直接看完。如果沒有耐心,可以直接看一下我自己的結論。

  • weak = true的時候,對應c++11的compare_exchange_weak函數。
  • weak = false的時候,對應的是compare_exchange_strong函數。

區別這兩個函數的區別在於,weak在有的平台上(注意,是有的平台,這里不包括x86)會存在失敗的可能性。即,當*ptr == *expected依然有可能什么都不做而返回false

所以,在x86平台來說,這兩者可以說是沒什么區別。只是如果想要代碼可移值性好,那么采用compare_exchange_weak並且使用循環來判斷,那么是一種比較好的辦法。

結論就是:

  • 想要性能,使用compare_exchange_weak+循環來處理。
  • 想要簡單,使用compare_exchange_strong
  • 如果是x86平台,兩者沒區別
  • 如果想在移值的時候,拿到高性能,用compare_exchange_weak

詳細的說明

需要注意的是,weak = true表示弱CAS,在這種情況下,就是交換成功,也有可能返回失敗。

在某些平台上,即使 atomic的值和expected一樣,weak 版仍有可能會失敗。但是絕大多數情況不會失敗,且 weak 版的運行比strong版快。所以若本來就需要在循環里執行 compare-and-swap,用 weak 版較有效率;反之,只執行一次的話,用 strong 版可以省去不必要的循環。

所以weak和strong的區別在於,weak仍然在*ptr == expected的時候,執行依然會有小概率失敗。也就是說, 即使*ptr == expected,此時也不會發生值的設置,也會返回false。(不會是設置成功了,但是返回的是false)。

Many targets only offer the strong variation and ignore the parameter. When in doubt, use the strong variation.

一般而言,當你拿不准,就使用strong的版本。

If desired is written into *ptr then true is returned and memory is affected according to the memory order specified by success_memorder. There are no restrictions on what memory order can be used here.

Otherwise, false is returned and memory is affected according to failure_memorder. This memory order cannot be ATOMIC_RELEASE nor ATOMIC_ACQ_REL. It also cannot be a stronger order than that specified by success_memorder.

如果需要寫入* ptr則返回true,並根據success_memorder指定的內存順序影響內存。 這里可以使用什么內存順序沒有限制。

否則,返回false並根據failure_memorder影響內存。 此內存順序不能是ATOMIC_RELEASE和ATOMIC_ACQ_REL 。 它也不可能比success_memorder指定的順序更強大。

兩個內存序

這個函數系列有趣的是,后面還有兩個有趣的memory_order序。那么這兩個memory_order起什么作用呢?

結論就是,針對x86平台來說,取什么值都是一樣的。可以拿一段簡單的代碼進行嘗試,注意查看生成的匯編代碼。

#include <pthread.h>
#include <stdbool.h>

extern int a_lock;

int a_lock = 0;

static bool lock(int *l)
{
    int tmp = 0;
    return __atomic_compare_exchange_n(l, &tmp, 1,
            true , __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
}
static void unlock(int *l)
{
    __atomic_store_n(l, 0, __ATOMIC_SEQ_CST);
}

int main(int argc, char *argv[])
{
    lock(&a_lock);
    unlock(&a_lock);
}

 

 

root@ubuntu:~/c++# g++ -S -O0  -std=c++11  weak.cpp -o weak -pthread
root@ubuntu:~/c++# cat weak
        .arch armv8-a
        .file   "weak.cpp"
        .global a_lock
        .bss
        .align  2
        .type   a_lock, %object
        .size   a_lock, 4 a_lock:
        .zero   4
        .text
        .align  2
        .type   _ZL4lockPi, %function
_ZL4lockPi:
.LFB14:
        .cfi_startproc
        stp     x29, x30, [sp, -48]!
        .cfi_def_cfa_offset 48
        .cfi_offset 29, -48
        .cfi_offset 30, -40
        add     x29, sp, 0
        .cfi_def_cfa_register 29
        str     x0, [x29, 24]
        adrp    x0, :got:__stack_chk_guard
        ldr     x0, [x0, #:got_lo12:__stack_chk_guard]
        ldr     x1, [x0]
        str     x1, [x29, 40]
        mov     x1,0
        str     wzr, [x29, 36]
        ldr     x1, [x29, 24]
        add     x0, x29, 36
        ldr     w3, [x0]
        mov     w4, 1
        ldaxr   w2, [x1]
        cmp     w2, w3
        bne     .L5
        stlxr   w5, w4, [x1]
        cmp     w5, 0
.L5:
        cset    w1, eq
        cmp     w1, 0
        bne     .L2
        str     w2, [x0]
.L2:
        mov     w0, w1
        adrp    x1, :got:__stack_chk_guard
        ldr     x1, [x1, #:got_lo12:__stack_chk_guard]
        ldr     x2, [x29, 40]
        ldr     x1, [x1]
        eor     x1, x2, x1
        cmp     x1, 0
        beq     .L4
        bl      __stack_chk_fail
.L4:
        ldp     x29, x30, [sp], 48
        .cfi_restore 30
        .cfi_restore 29
        .cfi_def_cfa 31, 0
        ret
        .cfi_endproc
.LFE14:
        .size   _ZL4lockPi, .-_ZL4lockPi
        .align  2
        .type   _ZL6unlockPi, %function
_ZL6unlockPi:
.LFB15:
        .cfi_startproc
        sub     sp, sp, #16
        .cfi_def_cfa_offset 16
        str     x0, [sp, 8]
        ldr     x0, [sp, 8]
        stlr    wzr, [x0]
        nop
        add     sp, sp, 16
        .cfi_def_cfa_offset 0
        ret
        .cfi_endproc
.LFE15:
        .size   _ZL6unlockPi, .-_ZL6unlockPi
        .align  2
        .global main
        .type   main, %function
main:
.LFB16:
        .cfi_startproc
        stp     x29, x30, [sp, -32]!
        .cfi_def_cfa_offset 32
        .cfi_offset 29, -32
        .cfi_offset 30, -24
        add     x29, sp, 0
        .cfi_def_cfa_register 29
        str     w0, [x29, 28]
        str     x1, [x29, 16]
        adrp    x0, a_lock
        add     x0, x0, :lo12:a_lock
        bl      _ZL4lockPi
        adrp    x0, a_lock
        add     x0, x0, :lo12:a_lock
        bl      _ZL6unlockPi
        mov     w0, 0
        ldp     x29, x30, [sp], 32
        .cfi_restore 30
        .cfi_restore 29
        .cfi_def_cfa 31, 0
        ret
        .cfi_endproc
.LFE16:
        .size   main, .-main
        .ident  "GCC: (Ubuntu/Linaro 5.5.0-12ubuntu1) 5.5.0 20171010"
        .section        .note.GNU-stack,"",@progbits
root@ubuntu:~/c++# 

 

 

root@ubuntu:~/c++# cat weak 
        .arch armv8-a
        .file   "weak.cpp"
        .section        .text.startup,"ax",@progbits
        .align  2
        .p2align 3,,7
        .global main
        .type   main, %function
main:
.LFB17:
        .cfi_startproc
        adrp    x0, :got:__stack_chk_guard
        stp     x29, x30, [sp, -32]!
        .cfi_def_cfa_offset 32
        .cfi_offset 29, -32
        .cfi_offset 30, -24
        adrp    x1, .LANCHOR0
        add     x29, sp, 0
        .cfi_def_cfa_register 29
        ldr     x4, [x0, #:got_lo12:__stack_chk_guard]
        add     x2, x1, :lo12:.LANCHOR0
        ldr     x5, [x4]
        str     x5, [x29, 24]
        mov     x5,0
        str     wzr, [x29, 20]
        ldaxr   w4, [x2]
        cmp     w4, 0
        bne     .L4
        mov     w3, 1
        stlxr   w5, w3, [x2]
        cmp     w5, 0
.L4:
        beq     .L2
        str     w4, [x29, 20]
.L2:
        add     x1, x1, :lo12:.LANCHOR0
        stlr    wzr, [x1]
        ldr     x1, [x0, #:got_lo12:__stack_chk_guard]
        mov     w0, 0
        ldr     x2, [x29, 24]
        ldr     x1, [x1]
        eor     x1, x2, x1
        cbnz    x1, .L7
        ldp     x29, x30, [sp], 32
        .cfi_remember_state
        .cfi_restore 30
        .cfi_restore 29
        .cfi_def_cfa 31, 0
        ret
.L7:
        .cfi_restore_state
        bl      __stack_chk_fail
        .cfi_endproc
.LFE17:
        .size   main, .-main
        .global a_lock
        .bss
        .align  2
.LANCHOR0 = . + 0
        .type   a_lock, %object
        .size   a_lock, 4
a_lock:
        .zero   4
        .ident  "GCC: (Ubuntu/Linaro 5.5.0-12ubuntu1) 5.5.0 20171010"
        .section        .note.GNU-stack,"",@progbits
root@ubuntu:~/c++# 

可以嘗試去修改一下__atomic_compare_exchange_n后面兩個內存序。可以發現最后生成的匯編代碼沒有什么區別。

小結

  • 為了跨平台性以及高性能,那么使用compare_exchange_weak好的辦法。
  • 后面的兩個內存序,使用默認的std::memory_order_seq_cst即可。這是因為,在x86平台上,微調這兩個參數生成的代碼都是一樣的,沒有什么收益。其他弱內存序的cpu上面,可能每種cpu得到的最合適的這兩個參數都會有所不同。為了避免去適配各個cpu,那么一種偷懶的辦法就是直接使用std::memory_order_seq_cst

一句話就是,直接使用var.compare_exchange_weak(a,b);這種形式最省事。比如:

void link_nodes_atomic(node * new_top_node, node * end_node)
    {
        tagged_node_handle old_tos = tos.load(detail::memory_order_relaxed);
        for (;;) {
            tagged_node_handle new_tos (pool.get_handle(new_top_node), old_tos.get_tag());
            end_node->next = pool.get_handle(old_tos);

            if (tos.compare_exchange_weak(old_tos, new_tos))
                break;
        }
    }

 

 

#include <atomic>

template<class T>
struct node {
   T data;
   node* next;
   node(const T& data) : data(data), next(nullptr) {}
};

template<class T>
class stack {
   std::atomic<node<T>*> head;
   public:
      void push(const T& data) {
         node<T>* new_node = new node<T>(data);
         new_node->next = head.load(std::memory_order_relaxed);
         while(!std::atomic_compare_exchange_weak_explicit(&head, &new_node->next,
            new_node, std::memory_order_release, std::memory_order_relaxed))
            ;
      }
};

int main() {
   stack<int> s;
   s.push(1);
   s.push(2);
   s.push(3);
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM