基於循環數組的無鎖隊列


在之前的兩篇博客(線程安全的無鎖RingBuffer的實現多個寫線程一個讀線程的無鎖隊列實現)中,分別寫了在只有一個讀線程、一個寫線程的情況下,以及只有一個寫線程、兩個讀線程的情況下,不采用加鎖技術,甚至原子運算的循環隊列的實現。但是,在其他的情況下,我們也需要盡可能高效的線程安全的隊列的實現。本文實現了一種基於循環數組和原子運算的無鎖隊列。采用原子運算(compare and swap)而不是加鎖同步,可以很大的提高運行效率。之所以用循環數組,是因為這樣在使用過程中不需要反復開辟內存空間,可以達到較好的效率。本文的實現參考了論文Implementing Lock-Free Queues所提到的方法。但是,在這篇論文中,作者並沒有給出具體的實現。本文的實現也和論文中的方法有所不同。本文的實現基於Windows操作系統,代碼在Visual Studio 2010下測試通過。

本文的實現基於compare and swap這個原子操作,簡寫為CAS。其原型為

long CAS(long* ptr, long old_value, long new_value).

其操作為,如果ptr所指向的變量和old_value相等,則將其置為new_value. 否則什么也不做。返回值為ptr所指向的變量。

本文提出的方法的基本原理如下。代碼會在其后附上。

首先,我們約定,有一個特殊的數值,叫做EMPTY,存入數組的所有數都不能是這個數值。在一開始,將整個buffer的值都初始化為EMPTY.

第二,以兩個變量,readCnt和writeCnt,來記錄讀和寫的次數。這兩個數模數組長度,就可以得到下一次讀和寫的位置。如果readCnt和writeCnt模buffer大小相等,則說明當前隊列為控;而如果在寫的位置不是EMPTY,則說明buffer已滿。

第三,也就是最重要的,線程同步的原理。在進行寫buffer操作時,首先通過CAS操作將buffer對應位置置為指定值,而writeCnt不變,因此其他線程無法在同樣的位置進行寫操作,這樣防止了其他寫線程的覆蓋。而由於此時writeCnt還沒有變,讀線程此時也無法讀該位置的數據,這樣防止了其他讀線程的沖突。接下來,第二步,則是通過CAS操作,將writeCnt加一。可以看到,代碼中在第一個CAS操作失敗的情況下,會執行一個增大writeCnt的原子操作。這樣做的目的是避免某個線程停掉導致其他所有線程都停掉。

而在進行寫操作時,首先通過判斷readCnt和writeCnt是否相等來判斷當前隊列是否已滿。這樣做的原因如前所述,是為了避免和寫線程之間的沖突。接下來,如果當前位置可以讀,則通過一個CAS操作將readCnt加一。這樣,其他讀線程就無法再讀這個位置。而由於這個位置值還不是EMPTY,其他寫線程也無法寫這個位置。接下來,在讀走數值之后,再通過CAS操作將buffer中的這個位置置為EMPTY. 此時,寫線程可以在這個位置寫入數據。

但是,這個實現其實還有點問題。如果讀線程比較多,例如,讀線程個數和數組長度一樣,就可能兩個讀線程同時讀同一個位置。這樣的讀沖突這個程序是無法避免的。另外,如果讀數據過程中有一個讀線程停掉了,那么其他線程在讀或者寫到這個位置時,也會被阻塞。

代碼如下。

 1 #include <windows.h>
 2 #include <stdlib.h>
 3 
 4 #define CAS(ptr, oldval, newval) (InterlockedCompareExchange(ptr, newval, oldval))
 5 
 6 static const long EMPTY=-1;
 7 
 8 class NBQueue {
 9 public:
10   NBQueue()
11     : queueSize(0)
12     , buffer(NULL)
13     , readCnt(0)
14     , writeCnt(0){}
15 
16   ~NBQueue() {
17     uninit();
18   }
19 
20   bool init(int size) {
21     queueSize = size;
22     try {
23       buffer = new long[queueSize];
24     }
25     catch (...) {
26       queueSize = 0;
27       buffer = NULL;
28       return false;
29     }
30     for (int i = 0; i < queueSize; i++) {
31       buffer[i] = EMPTY;
32     }
33     return true;
34   }
35   void uninit() {
36     if (buffer != NULL) {
37       delete[] buffer;
38       buffer = NULL;
39     }
40     queueSize = 0;
41   }
42 
43   bool put(long v) {
44     bool succ;
45     long tail;
46     long index;
47     do {
48       tail = writeCnt;
49       index = tail % queueSize;
50       if (buffer[index] != EMPTY) {
51         return false;
52       }
53       long oldval = CAS(&buffer[index], EMPTY, v);
54       succ = oldval == EMPTY;
55       if (!succ) {
56         CAS(&writeCnt, tail, tail + 1);
57       }
58     } while (!succ);
59 
60     CAS(&writeCnt, tail, tail + 1);
61 
62     return true;
63   }
64 
65   bool get(long* v) {
66     long head;
67     bool succ;
68     do {
69       head = readCnt;
70       if (head == writeCnt) {
71         return false;
72       }
73       long oldval = CAS(&readCnt, head, head + 1);
74       succ = oldval == head;
75     } while (!succ);
76 
77     long index = head % queueSize;
78     *v = buffer[index];
79     CAS(&buffer[index], *v, EMPTY);
80 
81     return true;
82   }
83 
84 private:
85   long* buffer;
86   int queueSize;
87   long readCnt;
88   long writeCnt;
89 };

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM