在之前的兩篇博客(線程安全的無鎖RingBuffer的實現,多個寫線程一個讀線程的無鎖隊列實現)中,分別寫了在只有一個讀線程、一個寫線程的情況下,以及只有一個寫線程、兩個讀線程的情況下,不采用加鎖技術,甚至原子運算的循環隊列的實現。但是,在其他的情況下,我們也需要盡可能高效的線程安全的隊列的實現。本文實現了一種基於循環數組和原子運算的無鎖隊列。采用原子運算(compare and swap)而不是加鎖同步,可以很大的提高運行效率。之所以用循環數組,是因為這樣在使用過程中不需要反復開辟內存空間,可以達到較好的效率。本文的實現參考了論文Implementing Lock-Free Queues所提到的方法。但是,在這篇論文中,作者並沒有給出具體的實現。本文的實現也和論文中的方法有所不同。本文的實現基於Windows操作系統,代碼在Visual Studio 2010下測試通過。
本文的實現基於compare and swap這個原子操作,簡寫為CAS。其原型為
long CAS(long* ptr, long old_value, long new_value).
其操作為,如果ptr所指向的變量和old_value相等,則將其置為new_value. 否則什么也不做。返回值為ptr所指向的變量。
本文提出的方法的基本原理如下。代碼會在其后附上。
首先,我們約定,有一個特殊的數值,叫做EMPTY,存入數組的所有數都不能是這個數值。在一開始,將整個buffer的值都初始化為EMPTY.
第二,以兩個變量,readCnt和writeCnt,來記錄讀和寫的次數。這兩個數模數組長度,就可以得到下一次讀和寫的位置。如果readCnt和writeCnt模buffer大小相等,則說明當前隊列為控;而如果在寫的位置不是EMPTY,則說明buffer已滿。
第三,也就是最重要的,線程同步的原理。在進行寫buffer操作時,首先通過CAS操作將buffer對應位置置為指定值,而writeCnt不變,因此其他線程無法在同樣的位置進行寫操作,這樣防止了其他寫線程的覆蓋。而由於此時writeCnt還沒有變,讀線程此時也無法讀該位置的數據,這樣防止了其他讀線程的沖突。接下來,第二步,則是通過CAS操作,將writeCnt加一。可以看到,代碼中在第一個CAS操作失敗的情況下,會執行一個增大writeCnt的原子操作。這樣做的目的是避免某個線程停掉導致其他所有線程都停掉。
而在進行寫操作時,首先通過判斷readCnt和writeCnt是否相等來判斷當前隊列是否已滿。這樣做的原因如前所述,是為了避免和寫線程之間的沖突。接下來,如果當前位置可以讀,則通過一個CAS操作將readCnt加一。這樣,其他讀線程就無法再讀這個位置。而由於這個位置值還不是EMPTY,其他寫線程也無法寫這個位置。接下來,在讀走數值之后,再通過CAS操作將buffer中的這個位置置為EMPTY. 此時,寫線程可以在這個位置寫入數據。
但是,這個實現其實還有點問題。如果讀線程比較多,例如,讀線程個數和數組長度一樣,就可能兩個讀線程同時讀同一個位置。這樣的讀沖突這個程序是無法避免的。另外,如果讀數據過程中有一個讀線程停掉了,那么其他線程在讀或者寫到這個位置時,也會被阻塞。
代碼如下。
1 #include <windows.h> 2 #include <stdlib.h> 3 4 #define CAS(ptr, oldval, newval) (InterlockedCompareExchange(ptr, newval, oldval)) 5 6 static const long EMPTY=-1; 7 8 class NBQueue { 9 public: 10 NBQueue() 11 : queueSize(0) 12 , buffer(NULL) 13 , readCnt(0) 14 , writeCnt(0){} 15 16 ~NBQueue() { 17 uninit(); 18 } 19 20 bool init(int size) { 21 queueSize = size; 22 try { 23 buffer = new long[queueSize]; 24 } 25 catch (...) { 26 queueSize = 0; 27 buffer = NULL; 28 return false; 29 } 30 for (int i = 0; i < queueSize; i++) { 31 buffer[i] = EMPTY; 32 } 33 return true; 34 } 35 void uninit() { 36 if (buffer != NULL) { 37 delete[] buffer; 38 buffer = NULL; 39 } 40 queueSize = 0; 41 } 42 43 bool put(long v) { 44 bool succ; 45 long tail; 46 long index; 47 do { 48 tail = writeCnt; 49 index = tail % queueSize; 50 if (buffer[index] != EMPTY) { 51 return false; 52 } 53 long oldval = CAS(&buffer[index], EMPTY, v); 54 succ = oldval == EMPTY; 55 if (!succ) { 56 CAS(&writeCnt, tail, tail + 1); 57 } 58 } while (!succ); 59 60 CAS(&writeCnt, tail, tail + 1); 61 62 return true; 63 } 64 65 bool get(long* v) { 66 long head; 67 bool succ; 68 do { 69 head = readCnt; 70 if (head == writeCnt) { 71 return false; 72 } 73 long oldval = CAS(&readCnt, head, head + 1); 74 succ = oldval == head; 75 } while (!succ); 76 77 long index = head % queueSize; 78 *v = buffer[index]; 79 CAS(&buffer[index], *v, EMPTY); 80 81 return true; 82 } 83 84 private: 85 long* buffer; 86 int queueSize; 87 long readCnt; 88 long writeCnt; 89 };