【DPDK】【ring】從DPDK的ring來看x86無鎖隊列的實現


【前言】

  隊列是眾多數據結構中最常見的一種之一。曾經有人和我說過這么一句話,叫做“程序等於數據結構+算法”。因此在設計模塊、寫代碼時,隊列常常作為一個很常見的結構出現在模塊設計中。DPDK不僅是一個加速網絡IO的框架,其內部還提供眾多的功能組件,rte_ring就是DPDK內部提供的一種無鎖隊列,本篇文章將從使用的角度出發闡述DPDK的ring怎么用?在怎么用的角度上再來闡述ring無鎖的實現,最后將探討實現無鎖隊列的關鍵以及在不通平台上如何實現,本文將會探討x86平台下無鎖隊列的實現。

  權當拋磚引玉,有問題請留言指正,感激不盡。

【場景】

  程序等於數據結構+算法。但是場景仍然是最重要的,因為場景取決於我們到底“用不用”某個技術或者是某個組件,亦或是某種數據結構。

  做數據面的都應該見過如圖1的這種線程模型。

圖1.常見的數據面線程模型

  圖1是一種常見的數據面模型,比如linux基金會的FD.IO(VPP)采用的就是這種線程模型,這種線程模型下分工明確:

  1. Main Thread做管理。常常使用協程驅動實現單線程多任務(VPP內部實現了一套類似於協程的調度機制,以此來實現單線程多任務的調度)
  2. fwd Thread做純轉發。通常為了性能考慮,在轉發路徑上嚴禁有內存拷貝和系統調用(但是凡事都有例外)。

  那么現在有一種需求,fwd線程需要將一些信息上傳至控制面進程那么最好的做法是什么呢?這里通常有很多種實現方式,但是均和本篇文章的主要討論對象無關,因此不多做討論。

  其中一種常見的手段就是通過ring,還有一種場景就是DPDK的multiprocess場景,也同樣可以通過ring來講數據包分發到其他process中。如圖2這種情況(這個圖畫錯了,應該是指向main thread...)

圖2.另外一種常見的場景

  這種場景是典型的“僧多肉少”型,就是“processer的數量多於rx隊列數量”,那么這種場景下注定有一些processer是無法接管網卡隊列的,但是我還想發揮這些processer的處理能力,怎么辦?

  那么常見的方案就是在接管到rx隊列的processer將數據包從rx queue上收上來后,計算數據包的rss,然后將數據包“盡量均勻”的通過ring來發送到那些沒有分配到rx queue的fwd thread上。其實也不光是雲計算的數據面場景,在很多場景下我們都需要用到隊列,因為隊列是一個再基礎不過的數據結構,因此我們拿DPDK的ring出發,最終闡述無鎖隊列的常見實現方式。

【DPDK ring 從使用出發】

  我個人覺得任何一種技術,出發點肯定是“先用再分析”,說白了就是對一種技術或對某一個模塊的直觀印象都不是直接分析代碼就能得到的,都是“先跑起來,玩一下,看看情況”得到的第一印象,因此這里還是會先從使用的角度出發,先會用再分析實現。如果有用過DPDK Ring,那么本節可以直接跳過,直接看后面的分析章節。

  DPDK的ring代碼主要以lib的形式集成在DPDK源代碼中,具體代碼位置為:DPDK根目錄/lib/librte_ring目錄中。以下代碼均已DPDK 19.11版本作為參照(其他版本基本都是大同小異)。

  先介紹一下主要的函數接口:

struct rte_ring * rte_ring_create(const char *name, unsigned count, int socket_id, unsigned flags) //創建dpdk的rte_ring
void rte_ring_free(struct rte_ring *r) //釋放已經創建的dpdk的rte_ring
struct rte_ring * rte_ring_lookup(const char *name) //去尋找一個已經創建好的dpdk的rte_ring
static __rte_always_inline unsigned int __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, unsigned int n, enum rte_ring_queue_behavior behavior, unsigned int is_sp, unsigned int *free_space) //此函數為內部方法,所有入隊函數都是此函數的上層封裝
static __rte_always_inline unsigned int __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, unsigned int n, enum rte_ring_queue_behavior behavior, unsigned int is_sc, unsigned int *available) //此函數為內部方法,所有出隊函數都是此函數的上層封裝
static __rte_always_inline unsigned int rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) //此函數為批量入隊函數,為多生產者安全(multi producer)
static __rte_always_inline unsigned int
rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) //此函數為批量入隊函數,為單生產者安全(single producer)
static __rte_always_inline unsigned int rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) //此函數為批量入隊函數,具體安全性質取決於創建隊列時的標志(flags)
static __rte_always_inline unsigned int rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) //此函數為批量出隊函數,為多消費者安全(multi consumer)
static __rte_always_inline unsigned int rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) //此函數為批量出隊函數,為單消費者安全(single consumer)
static __rte_always_inline unsigned int rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) //此函數為批量出隊函數,具體安全性質取決於創建隊列時的標志(flags)
static inline unsigned rte_ring_count(const struct rte_ring *r) //此函數用於查看隊列中元素的數量

  可以看到上述函數列表(只是代表性的一部分)基本分為三類接口

  1. 創建、銷毀、尋找隊列實例;
  2. 入隊、出隊;
  3. 查看隊列狀態,例如查看隊列是否為滿,查看隊列元素個數等等。

圖3.出隊的操作函數

  實際上如果讓我們自己設計一個隊列,基本上也逃離不出去這些接口,並且根據圖3可以看出,所有的出隊函數基本都是基於__rte_ring_do_enqueue的封裝而已。

那么實際使用起來的步驟可以基本可以為以下流程圖描述

圖3.dpdk ring常見的使用流程

  使用流程還是非常簡單的,因為隊列本身作為一個常見的數據結構使用起來並不復雜,具體使用的例子可以看dpdk的example/multiprocess/中的例子。

  但是使用的時候有幾個地方需要注意

  1. ring在創建時調用的rte_ring_create函數中最后兩個參數socket_id和flags一定要注意。socket_id這里的socket不是unix網絡編程中的socket,而是指的numa節點,numa架構下,如果processer訪問的內存和自己不在一個numa node上會產生非常嚴重的性能損耗。flags決定了這個隊列的性質,也就是是“什么性質的安全”,例如如果指定RING_F_SP_ENQ那么就會創建一個單生產者安全的隊列(實際上完全是扯淡,創建時的flags實際上影響的並不是隊列本身的性質而是調用隊列的函數__rte_ring_do_enqueue參數)
  2. ring在創建時調用的rte_ring_create函數中,大小必須是2的N次冪大小。
  3. ring的push或者是pop,不是對整個對象進行操作,而是對對象的內存進行操作,換句話說push和pop塞入/得到的其實只能是對象的內存地址而已,所以性能很高。(這點也符合數據面的設計原則,嚴禁內存拷貝,如果是拷貝整個對象那么勢必會產生額外的內存拷貝,傳內存既不發生內存拷貝,性能又強,為何不這么做呢?)

  可以看到dpdk的rte_ring使用上還是蠻簡單的,因此接下來就從源碼出發解析一下dpdk的rte_ring的無鎖實現。

【DPDK ring 的無鎖實現】

  先說結論:

無鎖的實現依賴於一個匯編指令: cmpxchg
翻譯過來就是compare and change

  我們先看看dpdk的ring是如何實現無鎖的,我們拿__rte_ring_do_enqueue和__rte_ring_do_dequeue這兩個函數開刀,這兩個函數分別是入隊和出隊的底層實現函數,其余所有的入隊和出隊函數都是基於這兩個函數進行了上層封裝而已。

  先想一下,在多生產者和多消費者場景下,分別要應付哪些問題?

  1. 多個生產者,生產位置有沖突,比如生產者A要push 3個元素,生產者B要push 3個元素,如何做到不沖突不覆蓋?
  2. 生產者和消費者,生產了之后要讓消費者可以消費,消費了之后要讓生產者進行生產。
  3. 多消費者,和多生產者的問題類似,消費位置沖突,比如消費者A要消費3個元素,消費者B要消費3個元素,如何做到消費不沖突讓每一個消費者都能有元素可以消費?

  我們先看第一個問題和第二個問題是如何實現的,但是在分析實際函數的實現之前,我們要先分析一下rte_ring。

struct rte_ring {
    char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; // ring的名稱,lookup的時候就是根據名稱進行查找對應的ring

    int flags;                                           // 標記,用來描述隊列是單/多生產者還是單/多消費者安全

    const struct rte_memzone *memzone;                     // 所屬的memzone,memzone是dpdk內存管理底層的數據結構

    uint32_t size;                                        // 隊列長,為2^n。如果flags為RING_F_EXACT_SZ
                                                         // 隊列size為初始化時隊列長度的向上取2的n次冪,例如如果為
                                                         // 7,那么向上取最近的2^n冪的數為8.如果flags不為
                                                         // RING_F_EXACT_SZ,那么初始化隊列的時候隊列長必須為2^n冪                                                         

    uint32_t mask;                                         // 掩碼,為隊列長 - 1,用來計算位置的時候取余用

    uint32_t capacity;                                    // 隊列容量,一般不等於隊列長度,把隊列容量理解為實際可以
                                                         // 使用的元素個數即可。例如初始化時count為7並且指定標志為
                                                         // RING_F_EXACT_SZ,那么count最后為8,但是capacity為7,因為
                                                         // 8是向上取2^n冪取出來的,實際上仍然是創建時所需的個數,8.

    char pad0 __rte_cache_aligned;                          // 填充,考慮到性能,要使用填充法保證cache line

    struct rte_ring_headtail prod __rte_cache_aligned;   // 生產者位置,里面有一個生產者頭,即prod.head,還有一個生
                                                         // 產者尾,即prod.tail。prod.head代表着下一次生產時的起始
                                                         // 生產位置。prod.tail代表消費者可以消費的位置界限,到達
                                                         // prod.tail后就無法繼續消費,通常情況下生產完成后,
                                                         // prod.tail = prod.head,意味着剛生產的元素皆可以被消費

    char pad1 __rte_cache_aligned;                          

    struct rte_ring_headtail cons __rte_cache_aligned;   // 消費者位置,里面有一個消費者頭,即cons.head,還有一個消
                                                         // 費者尾,即cons.tail。cons.head代表着下一次消費時的起始
                                                         // 消費位置。cons.tail代表生產者可以生產的位置界限,到達
                                                         // cons.tail后就無法繼續生產,通常情況下消費完成后,
                                                         // cons.tail = cons.head,意味着剛消費的位置皆可以被生產
                                                         
    char pad2 __rte_cache_aligned; /**< empty cache line */
};

上述數據結構為rte_ring的數據結構,rte_ring就代表着一條ring,是ring的抽象。其中重要的是兩個地方,一個是prod,一個是cons,前者代表生產者,后者代表消費者,里面分別有兩個標記,關於標記的用途已經在上述代碼的注釋中闡述。

但是還有一點,ring中存放的數據在哪?dpdk的ring中存放的數據位置可以見圖4.

圖4.dpdk ring的內存分布圖

  可以看到,rte_ring的data中存放的是指針(就因為是指針才能利用cmpxchg實現“無鎖”),並且data分布在struct rte_ring緊鄰的空間中(圖中青色的內存塊)。在分析實際的函數前,再看幾個流程圖,結合rte_ring中的數據結構來看,理解會更加深刻(當然這部分的內容在《深入淺出dpdk》一書中的4.4.2節也有描述)。

  1.入隊操作,以單生產者單消費者(多生產者和多消費者基本差不多)為例。初始狀態為圖5所示。初始狀態中隊列中有4個元素,分別是obj1、obj2、obj3、obj4.

圖5.初始狀態

  2.第一步,新元素入隊,先偏移prod.head到新的生產者頭位置,例如現在位置為5,若生產元素的個數為2,那么新位置即為index = 7,但是由於涉及到多生產者,其中多生產者無鎖的奧秘就在這一步,因此先占位置,如圖6。

圖6.入隊的第一步操作

  3.第二步,元素寫入。

圖7.入隊的第二步操作

  4.第三步,更新生產者的尾指針,也就是prod.tail,因為第二步只是將元素寫入而已,涉及生產-消費的流程,還要告訴消費者“可以消費”,prod.tail的作用便是如此,所以需要更新,但是假設當前消費者開始消費,那么流程便如圖7所示,消費者的頭標記只能到達生產者尾標記的位置。

圖8.出隊的第一步操作

  5.第四步,消費者開始消費元素,此時生產者的tail標記開始更新。

圖9.出隊的第二步操作

  6.第五步,與生產者相同,消費者消費數據后,被消費后的空間不能立即用於生產,還需要更新tail標記才可以(cons.tail)

圖10.生產-消費后的最終狀態

  接下來,理解了上述生產-消費的流程后,既可以分析具體的函數了,接下來將站在生產者的視角進行分析代碼實現(消費者與生產者幾乎相同),拿生產者的入隊函數__rte_ring_do_enqueue來分析。

static __rte_always_inline unsigned int
__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
         unsigned int n, enum rte_ring_queue_behavior behavior,
         unsigned int is_sp, unsigned int *free_space)
{
    uint32_t prod_head, prod_next;
    uint32_t free_entries;
    //第一步,先偏移頭指針,搶占生產位置
    n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
            &prod_head, &prod_next, &free_entries);
    if (n == 0)
        goto end;
    //第二步,塞數據
    ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
    //第三部,更新尾指針,讓消費者可以消費
    update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
end:
    if (free_space != NULL)
        *free_space = free_entries - n;
    return n;
}

  上述代碼是一個典型的“三步走”。

  1. 先偏移頭指針,說白了就是搶位置。這步主要是為了對付多生產者的情況。
  2. 搶到位置后寫數據。
  3. 更新尾指針,讓消費者可以消費剛塞入的數據。

  那么很顯然,第一步就是對付第一個問題的,即在多生產者下如何讓生產者可以順利生產並且多個生產者之間不會互相沖突,所以需要分析一下__rte_ring_move_prod_head函數。

static __rte_always_inline unsigned int
__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
        unsigned int n, enum rte_ring_queue_behavior behavior,
        uint32_t *old_head, uint32_t *new_head,
        uint32_t *free_entries)
{
    const uint32_t capacity = r->capacity;
    unsigned int max = n;
    int success;

    do {
        //1.先確定生產者要生產多少個元素
        n = max;
        //2.拿到現在生產者的head位置,也就是即將生產的位置
        *old_head = r->prod.head;

        //內存屏障
        rte_smp_rmb();

        //3.計算剩余的空間
        *free_entries = (capacity + r->cons.tail - *old_head);

        //4.比較生產的元素個數和剩余空間
        if (unlikely(n > *free_entries))
            n = (behavior == RTE_RING_QUEUE_FIXED) ?
                    0 : *free_entries;

        if (n == 0)
            return 0;
        //5.計算生產后的新位置
        *new_head = *old_head + n;
        if (is_sp)
            r->prod.head = *new_head, success = 1;
        else //6.如果是多生產者的話調用cpmset函數實現生產位置搶占
            success = rte_atomic32_cmpset(&r->prod.head,
                    *old_head, *new_head);
    } while (unlikely(success == 0));
    return n;
}

  上述函數邏輯是一個非常簡單的實現邏輯,而關鍵在於第6點和do while循環,cmpset函數是什么?又是如何實現的生產位置搶占呢?

 1 static inline int
 2 rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t exp, uint32_t src)
 3 {
 4     uint8_t res;
 5 
 6     asm volatile(
 7             MPLOCKED
 8             "cmpxchgl %[src], %[dst];"
 9             "sete %[res];"
10             : [res] "=a" (res),     /* output */
11               [dst] "=m" (*dst)
12             : [src] "r" (src),      /* input */
13               "a" (exp),
14               "m" (*dst)
15             : "memory");            /* no-clobber list */
16     return res;
17 }

  上述cmpset為x86體系下的實現,可以看到,是一段GCC內聯的匯編指令,這段內聯的嵌入匯編指令由三個匯編指令構成,最核心的一個指令便是第8行的“cmpxchg”,這便是我們最開始說的 "無鎖的實現依賴於cmpxchg指令",那么這個指令究竟是什么意思呢?

cmpxchg指令的意思就是“compare and change”,即“比較並交換”。
舉個例子,如果A等於B,則將C賦值給A;如果A不等於B,則拒絕將C賦值給A。

  根據這個特征我們可以知道,在多生產者場景下,最擔心的事情是什么呢?最擔心的事情即為“前腳剛計算好生產位置(偏移),后腳還沒等寫入數據,結果就被另外一個生產者把剛剛計算好的生產位置給占了,結果自己沒得空間生產”,將這個場景結合剛才的cmpxchg之后怎么解決呢?

如果生產位置沒有變化(A等於B),那么就將最新的生產位置(計算偏移后的生產位置)賦值給生產者指針;如果生產位置發生了變化(有其他生產者也在生產),那么就取消更新生產者指針

  核心實現就是上面這句話。關於rte_atomic32_cmpset函數,下一章【x86的cas】中會詳細講解。

  那么頭指針偏移部分代碼的流程圖可以總結如下:

  那么至此,第一個問題之“多生產者如何解決生產位置的問題得到了解決”,那么接下來就是第三個問題,“如何讓消費者可以消費剛剛生產的數據?”

  這個問題在“三步走”中的第三部中解決的。

static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
        uint32_t single, uint32_t enqueue)
{
    //1.內存屏障
    if (enqueue)
        rte_smp_wmb();
    else
        rte_smp_rmb();
    //2.如果有其他生產者生產數據,那么需要等待其將數據生產完更新tail指針后,本生產者才能更新tail指針
    if (!single)
        while (unlikely(ht->tail != old_val))
            rte_pause();
    //3.更新tail指針,更新的位置為最新的生產位置,意味着剛剛生產的數據已經全部可以被消費者消費
    ht->tail = new_val;
}

  這里面可能唯一會讓人產生些許疑惑的就是step 2.這里有一個自旋鎖,自旋等待"ht->tail == old_val"條件的成立,這是為什么呢?想一下這樣的場景:

  單生產者單消費者情況下:生產數據成功后,應該講prod.tail指針前移至prod.head處,相當於告訴消費者隊列中的數據都是可以消費的,但是如果此時是多生產者場景,由於有多個生產者,prod.tail指針可能隨時發生變化,例如:

  剛開始的時候,prod.head = prod.tail = 0,生產者A生產了3份數據,prod.head = 3並且prod.tail = 0,隨后生產者B生產了2份數據,prod.head = 5並且prod.tail = 0,那么此時會滿足“ht->tail == old_val”么?不會,ht->tail = prod.tail = 0,而old_val的值卻為生產元素前的prod.head的值,也就是3.那么此時需要做的就是等待生產者A將3份數據完全生產完,並且將prod.tail更新至3,那么此時才會滿足“ht->tail == old_val”。說白了就是得等別的生產者完全生產完才能生產。但是從最終結果而言,生產者A生產了3個元素,生產者B生產了2個元素,最終結果中,prod.tail = 5,也就是剛剛生產的5個元素可以全部被消費者消費。

  所以從上面的“__rte_ring_do_enqueue”函數可以看出,想想所謂的無鎖隊列真的實現了理想的“無鎖”么?

  “rte_ring_do_dequeue”的函數執行流程與“__rte_ring_do_enqueue”的流程基本一致,無法后者為生產者視角,而前者為消費者視角,請讀者根據上述“隊列入隊”的分析過程自行分析“隊列出隊”。

【x86的CAS】

  可能有的讀者在“無鎖”這個概念上知道“無鎖”的實現是一種"CAS"操作,那么什么才是CAS操作呢?

CAS的全程為“Compare And Swap”,意味比較並交換

  “比較並交換”,這個概念和前一章中“cmpxchg”指令的含義基本一致。核心思想就是:

和預期結果比較,相同則賦值,不同則放棄

  如果和預期不同,那么我會一遍一遍的去嘗試,當沒有人和我競爭了,和預期結果自然就會“相同”,再回到之前的內聯匯編。

 1 static inline int
 2 rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t exp, uint32_t src)
 3 {
 4     uint8_t res;
 5 
 6     asm volatile(
 7             MPLOCKED
 8             "cmpxchgl %[src], %[dst];"
 9             "sete %[res];"
10             : [res] "=a" (res),     /* output */
11               [dst] "=m" (*dst)
12             : [src] "r" (src),      /* input */
13               "a" (exp),
14               "m" (*dst)
15             : "memory");            /* no-clobber list */
16     return res;
17 }

  想讀懂這個函數首先需要先了解內聯匯編的正確寫法和格式。當然,接下來要說的內聯匯編格式為intel格式。由於涉及到內聯匯編的文章有許多,在這里不會詳細介紹內聯匯編的格式和寫法,更多的會聚焦於此函數的實現。

內聯匯編的函數格式為:

1 asm ( assembler template
2         : output operands                /* optional */
3         : input operands                   /* optional */
4         : list of clobbered registers   /* optional */
5 );

  很簡單,內聯匯編由4個部分組成:

  1. assembler template。也就是匯編的指令集合。對應到rte_atomic32_cmpset函數中就是line 7、8、9三行的內容。
  2. 輸出操作數,也稱為目的操作數,不懂操作數是什么的可以將它理解為C語言的左值,也就是輸出被賦值的變量,等號左邊的。對應到rte_atomic32_cmpset函數中就是line 10、11的內容。
  3. 輸入操作數,也稱為源操作數,不懂操作數是什么的可以將它理解為C語言的右值,也就是輸入賦值的變量,等號右邊的。對應到rte_atomic32_cmpset函數中就是line 12、13、14的內容。
  4. 被改變的寄存器的值,這個地方看場合,不同的場合不太一樣。對應到rte_atomic32_cmpset函數中就是line 15的內容,也就是內存屏障。
  5. 還有一點需要注意的是,opt-code %1,%2,其中在intel架構下,前者為目的操作數,也就是%1,后者為源操作數%2。
  6. 還有一個額外的概念就是constraints,也就是約束。對應到rte_atomic32_cmpset函數中就是操作數前面的雙引號部分,例如“a” (exp),這里雙引號里面的a就是一個constraints。約束分為很多種,這里只介紹常見的幾種:
    • "a"是一個寄存器約束。用來指定“eax”寄存器,被描述的對象會將值存至eax寄存器;
    • "="不算是一個constraints,而是作為一個修飾符,相當於告訴這個元素是“write-only”;
    • "r"同樣是一個寄存器約束,用來表明是通用寄存器,被修飾的操作數會被存到通用寄存器中,沒有具體指定的話就是任意;
    • “m”是一個內存約束,和寄存器約束的區別是,寄存器約束會將值取到寄存器中,參與完計算后會回寫到內存中,而內存約束就不需要寄存器作為中轉,全程在內存中進行,所以速度也會慢於寄存器。

  那么我們接着回到rte_atomic32_cmpset函數的實現,line 7是一個x86架構下的“lock”指令指令前綴,注意“lock”其實本質上不是一個指令,而是一個指令前綴,也就是用來修飾接下來的指令,支隊接下來的指令有效力,並且修飾的指令必須是對內存有“讀-改-寫”三種操作的指令,就比如說cmpxchg指令就是。

#if RTE_MAX_LCORE == 1
#define MPLOCKED                        /**< No need to insert MP lock prefix. */
#else
#define MPLOCKED        "lock ; "       /**< Insert MP lock prefix. */
#endif

  在x86多核架構下,lock指令通常用來確保多核訪問cache line是具有排他性的(相當於一把鎖)。

  第一個指令是cmpxchg,關於cmpxchg我們前面已經大致講過此命令的作用。此命令的實際作用是:

比較源操作數和eax寄存器中的值,如果相同,則將目的操作數更新為源操作數,並且將標志寄存器中的ZF(zero flags)位置1;如果源操作數和eax寄存器中的值不通,則將源操作數寫入eax寄存器中,並將標志寄存器中的ZF(zero flags)清0

  那么對照上面的場景,一般eax寄存器中存的值都是初始值,也就是還沒有計算入隊偏移的初始值,由於在計算入隊偏移操作時,其他生產者可能也在進行計算入隊偏移,那么就會起沖突,具體體現就是生產者頭指針發生變化,因此在cmpxchg指令中,再拿生產者頭指針和初始值進行比較,如果相同這說明現在沒有其他生產者在更新,那么源操作數(當前生產者頭指針)和eax寄存器中的值(事先備份的初始值)必定相同,此時則可以安全的將目的操作數賦值至源操作數,也就是(prod.head = new_head);如果不同,這說明現在可能有其他生產者在生產導致生產者頭指針發生變化(prod.head發生變化),那么此時便不能更新源操作數(prod.head)。

  第二個指令是sete,這個指令就很簡單了,就是單純的將標志寄存器中的zf位的值賦值給目的操作數,也就是res。那就意味着如果cmpxchg執行交換成功,則zf位為1,那么經過sete設置后,res返回值也就是1;如果cmpxchg執行交換失敗,則zf為0,那么經過sete設置后,res的返回值也就是0.

  那么這個函數便是,如果cmpxchg成功,則函數返回1,如果cmpxchg失敗,則函數返回0,那么根據函數的返回值,上層邏輯便知道更新生產者頭指針是否成功,成功直接返回即可;不成功怎么辦呢?也很簡單,循環,我一次一次試(while循環),總會成功的。

  可以看到,CAS操作實現無鎖的本質上就是“比較”,比較什么呢?這取決於我們最擔心什么?那我們最擔心的是什么呢?我們最擔心的無非就是

生產者的視角:我剛開始根據舊的生產者頭指針 + 生產的元素數量,計算出生產后的指針位置,結果在我計算的過程中,由於有其他生產者干擾,導致舊的生產者頭指針已經發生了變化,那么計算出的生產后的指針位置也是失效的。
消費者的視角:我剛開始根據舊的消費者頭指針 + 消費的元素數量,計算出消費后的指針位置,結果在我計算的過程中,由於有其他消費者干擾,導致舊的消費者頭指針已經發生了變化,那么計算出的消費后的指針位置也是失效的。

  所以需要比較什么呢?比較是“預期值”與“實際值”,預期值是我們希望“舊的生產者頭指針不會發生變化”,那么實際值便是“當前的生產者頭指針位置”,那么我只需要比較兩者便可以得知,是否有其他生產者干擾,只有符合預期的情況,我才可以進行接下來的操作,也就是賦值。

【后續】

  1. CAS操作真的做到了理想的“無鎖”了么?我個人的想法是並沒有做到,因為那段匯編指令仍然是同時只有一個執行者(processer)通過,只不過是和傳統的mutex、spinlock之類的鎖,critical section更小而已。
  2. CAS操作的本質是“比較預期值和實際值”,ARM平台下也有類似的CAS操作,只不過ARM平台下叫做LL/SC,本質上與CAS相同,都是比較,以后有精力會將ARM平台的LL/SC也補上。
  3. 我本人是沒想到一個dpdk ring的分析會寫將近1周...雖然自己明白,但是實際寫成文檔,輸出博客,還是得花上一番心思的。

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM