實現無鎖的棧與隊列(3)


怎樣實現一個無鎖隊列,網絡上有很多的介紹,其中流傳最廣,影響最大的恐怕就屬於以下兩篇論文:

 a) "Implementing lock free queue" by  John.D.Valois

 b) "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" by M.M. Michael & M.L. Scott。

很多基於鏈表的無鎖隊列的實現基本參考了該兩篇論文里的算法,網絡上也充斥着各種各樣的文章,而在眾多的文章中,Christian Hergert 的這篇:Introduction to lock-free/wait-free and the ABA problem(可能需要翻牆) 介紹了他基於 M.M.Michael & M.L.Scott 的論文,用 c++ 做的實現,后來被廣泛流傳。本文接下來的討論,也基本來自上述三個出處, 站在巨人的肩膀上,容易看得更遠。

前面兩篇博客簡單介紹了怎樣基於數組來實現一個無鎖的棧,雖然最后沒能如願寫出一個滿足理想條件的,但還好它們至少給了我們一些新的思路。現在我們就來探討探討怎樣用鏈表來實現一個無鎖隊列,以下的計論均假設基於強類型的內存模型,所有的讀寫操作至少帶有 acquire/release  語義,開始之前,先貼一張以鏈表為基礎的隊列圖。

 

上面這個隊列應該都不陌生,在 tail 指向的一端插入,在 head 指向的一端取出。有了前兩篇博文的基礎,對於這樣的隊列,理論上我們只要處理好 tail, head 兩個指針及頭尾兩個結點就可以實現無鎖了,事情看起來也比較簡單,那現在我們來嘗試寫一個 Push 操作,先定義結點:

// list based queue
1
struct Node 2 { 3 Node* next; 4 void* data; 5 }; 6 7 static Node* tail = NULL; 8 static Node* head = NULL;

然后准備插入:

 1 void Push(void* data)
 2  {
 3     Node* old_tail;
 4     Node* node = new Node();
 5     node->data = data;
 6     
 7     do 
 8     {
 9        old_tail = tail;
10  
11        if (!cas(&tail, old_tail, node))
12            continue
13  
14        if (old_tail)
15           old_tail->next = node;
16        else
17           cas(&head, NULL, node);
18  
19        break;
20  
21     }while (1);
22     
23  }

上面一段代碼乍看起來好像沒什么大問題,應該能正常工作吧??

--- 很遺憾,它是有問題的:

   1) 首先是第 4 行,我們用 new 來分配節點,但是 new 這個操作本身很有可能是有鎖的(如果你沒重載它實現自己的分配方式),至少在標准庫中這個 new 是有鎖的,我們在一個無鎖的操作里調用了有鎖的函數,后面還有必要去展示你精妙的算法嗎?

   2) 第 15 行,我們的原意是想把尾結點指向我們新插入的結點,想法太單純太一廂情願,你怎么保證當你執行這個操作時,old_tail->next 這個語句中的 old_tail 指向的結點沒有被別的線程所釋放掉?設想一下,當你從 11 行執行完 CAS 操作后,當前線程就可能會被切換了出去,再被切換回來時,或許又是滄海桑田,或許 old_tail 已經不復存在了。

前面我說過,用鏈表來實現無鎖隊列,有幾個麻煩問題要解決,上面這兩個問題中關於 new 的使用就是其中一個,而這個問題已經有點兒類似於蛋生雞還是雞生蛋的問題了,所以最開始我就想着要不干脆避開動態內存分配的問題,用數組來試試,只是后面發現這不是一個可以躲避的問題,所以現在又老老實實跑回來。對於這個問題,目前來說,比較直接的解法方法是把分配節點的那套用另外的方法來實現,比如說用 tls(thread local storage),分配內存完全在線程內進行,不和別的線程競爭,這樣就沒必要用鎖了,缺點是不好寫出有好移值性的代碼來,因為 tls 與平台,OS 相關。對於 tls 的使用,不熟悉的讀者可以參考一下這篇文檔,  我根據自己的需要,做了一個簡單的封裝,代碼放這里

對於上面提到的第二個問題, 我們再仔細看一下,old_tail 所指向的節點之所以有可能被釋放,原因是它現在是在鏈表上,而任何在鏈表上的節點,都屬於線程間的公共變量,隨時有可能被某個線程取下來做其它事情,因此一定要避免直接在鏈表上修改某個節點,這種操作不能保證原子性及線程獨有(exclusive)?  如果需要修改它,就先把它取下來,確保了它只屬於當前線程,再進行相關的修改。

現在回到之前的問題,我們是不是應該像剛剛討論的那樣,直接把節點從鏈表上摘下來,修改完再放回去呢? 很顯然不可行,因為節點一但取出來,我們很難再把它放回原來的位置了,而隊列這個數據結構對結點順序有嚴格要求,不應該因為我們的操作而導致里面的節點亂序。怎么辦呢?John.D.Valois 在它的論文里介紹了一種做法,下圖展示了他設計的鏈表的樣子:

  

他的做法是引入一個 dummy 的頭,head 指向這個 dummy 的頭,鏈表中真正包含數據的結點由 dummy->next 指向。這個改進保證了,在進行 push 操作的時候,tail 指向的節點永遠都是存在的(空隊列的時候,指向 dummy 頭), 因此也就避免了之前所遇到的問題, 除此之外,這個 dummy header 也使得在處理更空隊列時,更加的容易。

Jobh.D.valois 算法偽代碼如下:

 1 Enqueue(x)
 2 {  
 3     q = new record
 4     q.value = x
 5     q.next  = NULL
 6      
 7     do
 8     {
 9         p = tail
10         succ = cas(p.next, NULL, q)
11         if succ != true
12            cas(tail, p, p.next)
13     } while(succ == false)
14  
15     cas(tail, p,q)
16 }
17 
18 Dequeue
19 {
20      do
21      {
22          p = head
23          if p.next == NULL
24              return NULL
25        }while(false == cas(head, p, p.next)
26      
27       return p.next.value
28 }

其中第 10 行是嘗試把新節點掛到隊列的尾巴上,這個操作有可能失敗,因此用一個 while 來反復嘗試,第 11,12 的代碼是個很關鍵的兩行,假設當前的線程 1 在第 10 行執行失敗了,那證明已經有別的線程,假設為線程 2,成功把它的結點掛到了鏈表的尾端,正常情況下,線程 1 也沒必要執行 11,12 行,但在某些情況下,如果不幸,線程 2 在把節點掛到鏈表的尾巴上后,還沒有來得及更新 tail 指針時,就掛了。這時線程 1 執行 11,12 行就能幫助線程 2 把 tail 更新一下,否則線程 1 就只能一直在 while 里面一直打傳了,這兩行代碼事實上保證了任何 push 操作都會在有限的時間內能完成。

上面的偽代碼是算法的原始版本,John.D.valois 在論文里指出上面的第 11,12行雖然保證了任何 push 操作不會等太久,但有一個缺陷,在並發比較快的場景下,第 11,12 行可能會被反復執行,而 cas 操作相對是一個比較費時的操作,因此這里的效率相對受影響,回頭仔細再想想,第 11,12 行那么費勁,只是為了更新一下 tail 指針,這個是有必要的嗎?是不是 tail 如果不指向最后的節點,就沒法完成插入了呢?

答案是否定的,tail 不必隨時都指着尾巴,我們之所以固定思維地覺得一定要 tail 指向尾巴,不過是因為我們插入新節點時,總是需要在尾巴的后面,但是我們忘了,tail 即使不指向尾巴,我們可以也是可以找到尾巴節點的:順着 tail->next 往下搜索不就行了嗎,於是得到如下的改進:

 

 1 Enqueue
 2 {        
 3       q = new record
 4       q.value = x
 5       q.next  = NULL
 6         
 7        p = tail
 8        oldp = p
 9 
10      do
11      {
12          
13            while( p.next != NULL)
14                p = p.next         
15 
16      } while(cas(p.next, NULL, q) == false)
17   
18      cas(tail, oldp, q)
19  }

其中第 13,14 行就是為找到最尾巴上的節點,注意此時的 tail 並不一定是指向最后一個節點的。改進的版本減少了對 cas 的使用,而把大部分時間花在找尾巴節點上了,但是有研究表明,這個找節點的循環不會太長,假如當前有 p 個線程在 enqueue,那 tail 離尾巴最多就隔着 2p-1 個節點,具體的證明可以參看原論文,不難理解。

到目前為止,一切看起來都還好,仿佛實現一個無鎖隊列馬上就能完成了,現在我們來看看別人是怎么做的,根據 John.D.Valois 的論文,Christian Hergert 在他的博客中用 c++ 實現了一個版本,代碼如下:

 1     typedef struct _Node Node;
 2     typedef struct _Queue Queue;
 3 
 4     struct _Node {
 5         void *data;
 6         Node *next;
 7     };
 8 
 9     struct _Queue {
10         Node *head;
11         Node *tail;
12     };
13 
14     Queue*
15     queue_new(void)
16     {
17         Queue *q = g_slice_new(sizeof(Queue));
18         q->head = q->tail = g_slice_new0(sizeof(Node));
19         return q;
20     }
21 
22     void
23     queue_enqueue(Queue *q, gpointer data)
24     {
25         Node *node, *tail, *next;
26 
27         node = g_slice_new(Node);
28         node->data = data;
29         node->next = NULL;
30 
31         while (TRUE) {
32             tail = q->tail;
33             next = tail->next; // 改為 q->tail->next 會更好 34             if (tail != q->tail)
35                 continue;
36 
37             if (next != NULL) {
38                 CAS(&q->tail, tail, next);
39                 continue;
40             }
41 
42             if (CAS(&tail->next, null, node) // 應改為 CAS(&q->tail->next, null, node) 43                 break;
44         }
45 
46         CAS(&q->tail, tail, node);
47     }
48 
49     gpointer
50     queue_dequeue(Queue *q)
51     {
52         Node *node, *tail, *next;
53 
54         while (TRUE) {
55             head = q->head;
56             tail = q->tail;
57             next = head->next; // 應改為 q->head->next 58             if (head != q->head)
59                 continue;
60 
61             if (next == NULL)
62                 return NULL; // Empty
63 
64             if (head == tail) {
65                 CAS(&q->tail, tail, next); // next 不空,head == tail, 即 tail 並沒有指向真正的尾巴 66                 continue;
67             }
68 
69             data = next->data;
70             if (CAS(&q->head, head, next))
71                 break;
72         }
73 
74         g_slice_free(Node, head); // This isn't safe
75         return data;
76     }

 

上面的代碼看起來和論文里的偽代碼是完全一致的,前面我們一直在討論 enqueue,現在我們來看 dequeue.  

a) 有人可能注意到 33/57/69 行有問題,tail/head/next 有可能已經被釋放了。確實有可能,33/57 行的問題好解決(純粹只是寫的不對,參看我的注釋),69 行的問題卻非常麻煩了,這個問題是實現 lock free queue 最難解決的問題之一:memory reclamation,但是這個問題在現代的操作系統中,也往往較難觸發,比如在有 virtual memory 的系統中,上面的操作只是讀取可能被 freed 掉的內存,而標准庫對內存分配常常有緩沖處理,free 掉的內存並不一定會立即返回給 OS, 而且即使被返回 OS,只要相應的內存沒有真正釋放(如在 linux 中,unmap),哪怕這塊內存再次被重新分配了,也是可以去讀的,因此問題不容易出現,但問題仍然是存在的,因此不能存在繞幸的想法,想一想,用着一個明知有問題的東西,然后祈禱問題不會出現,那感覺,仿佛心里有只蒼蠅。

b) 還有問題的地方是在第 70 行,這一行想要做什么呢?它的作用是把 dummy head 指向的節點取下來,看似比較簡單的事情,但上面的代碼在 c, c++ 中卻是不安全的。。。當然,這個不安全不容易看出來, 現在我們看看這兒究竟怎么不安全。假設某時刻,隊列如下:

    Node1 -> node2 -> node3

假設線程 1,開始執行 dequeue, 在執行到 70 行時(還沒開始 cas), 它被停下,切換了出去,此時線程 1 來說,第 70 行中,head, next 分別指向 node1, node2。

另一個線程 2,開始執行,然后它成功把 node1 pop 出去了,然后第 74 行,free(node1),它運氣比較好,然后又把 node2,node3 也 Pop 出去了,此時隊列為空!

然后又切換到線程 3,它要執行 enqueue 操作,此時它會在 27 行分配一個節點,壞事來了,如果十分巧合的情況下,它分配得到了線程 1 所 free 掉的 node1,enqueue 之后,隊列成了如下樣子:

   Node1-> NULL

線程 3 執行完后,如果恰好又切換回線程 1,此時,對線程 1 來說,第 70 行中的 q->head == head == node1, 因此 cas 會成功!但是 next 呢?next 指向的是被釋放掉的 node2! 嚴重問題!

這個就是無鎖世界里所謂的 ABA 問題,這個問題就是我為什么最開始想嘗試用數組來實現無鎖操作的原因之二,而且,ABA 問題不容易解決。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM