對無鎖隊列的最初興趣來自梁斌同志的一個英雄帖:http://coderpk.com/。 第一次看到這個題目的時候還不知道CAS,FAA等所謂的“原子操作”,但直覺上感覺,通過對讀寫操作的性能優化來達到大幅提高隊列性能的方法是行不通的,就算讀寫操作全用匯編來寫,也不會和正常的read及 write有數量級上的區別。后來搜索了一下lock free data structure,才知道了關於原子操作的一些東西,同時也糾正了自己由來已久的一個錯誤觀點:C++中的++操作和--操作都不是原子操作。這篇筆記將記錄在我在探索這個問題的過程中產生的一些想法,同時對多線程編程——或者說是並發編程的一些問題進行備忘。
多線程及並發
線程是操作系統進行作業調度的最小單位,也是進程內部的一條執行路徑。與進程不同,線程並沒有對操作系統的資源所有權,也就是說多個線程對資源的訪問權是共享的。一個進程中的所有線程共享一個地址空間或者諸如打開的文件之類的其他資源,一個進程對這些資源的任何修改,都會影響到本進程中其他線程的運行。因此,需要對多個線程的執行進行細致地設計,使它們能夠互不干涉,並且不破壞共享的數據及資源。
在單處理器中的並發系統里,不同進程和線程之間的指令流是交替執行的,但由於調度系統及CPU時鍾的配合,使得程序對外表現出一種同時執行的外部特征;而在並行多處理器系統中,指令流之間的執行則是重疊的。無論是交替執行還是重疊執行,實際上並發程序都面臨這同樣的問題,即指令流的執行速度不可預測,這取決於其他指令流的活動狀態、操作系統處理中斷的方式及操作系統的調度策略。這給並發程序設計帶來了如下的一些問題:
1)多個進程(或線程)對同一全局資源的訪問可能造成未定義的后果。例如,如果兩個並發線程都使用同一個全局變量,並且都對該變量執行讀寫操作,那么程序執行的結果就取決於不同的讀寫執行順序——而這些讀寫執行順序是不可預知的。
2)操作系統難以對資源進行最優化分配。這涉及到死鎖及飢餓的問題。
3)很難定位程序的錯誤。在多數情況下,並發程序設計的失誤都是很難復現的,在一次執行中出現一種結果,而在下一次執行中,往往會出現迥然不同的其他結果。
因此,在進行多線程程序或者並發程序的設計時,尤其需要小心。可以看到的是,絕大多數並發程序的錯誤都出現在對共享資源的訪問上,因此,如何保證對共享資源的訪問以一種確定的、我們可以預知的方式運行,成為並發程序設計的首要問題。在操作系統領域,對共享資源的訪問有個專用的數據,稱為臨界區。
臨界區是一段代碼,在這段代碼中,進程將訪問共享資源。當另外一個進程已經在這段代碼中執行時,這個進程就不能在這段代碼中執行。
也就是說,臨界區是一個代碼段,這個代碼段不允許兩條並行的指令流同時進入。提供這種保證的機制稱為互斥:當一個進程在臨界區訪問共享資源時,其他進程不能進入該臨界區。
鎖及互斥
實現互斥的機制,最重要的是互斥鎖(Mutex)。互斥鎖實際上是一種二元信號量(只有0和1),專用於多任務之間臨界區的互斥操作。(關於信號量及互斥鎖的區別,可以參看操作系統相關知識)
Mutex本質上是一個信號量對象,只有0和1兩個值。同時,mutex還對信號量加1和減1的操作進行了限制,即某個線程對其進行了+1操作,則-1操作也必須由這個線程來完成。mutex的兩個值也分別代表了Mutex的兩種狀態。值為0, 表示鎖定狀態,當前對象被鎖定,用戶進程/線程如果試圖Lock臨界資源,則進入排隊等待;值為1,表示空閑狀態,當前對象為空閑,用戶進程/線程可以Lock臨界資源,之后Mutex值減1變為0。
Mutex可以抽象為創建(Create),加鎖(Lock),解鎖(Unlock),及銷毀(Destroy)等四個操作。在創建Mutex時,可以指定鎖的狀態是空閑或者是鎖定,在linux中,這個屬性的設置主要通過pthread_mutex_init來實現。
在使用mutex的時候,務必需要了解其本質:Mutex實際上是一個在多個線程之間共享的信號量,當其進入鎖定狀態時,再試圖對其加鎖,則會阻塞線程。例如,對於兩個線程A和B,其指令序列如下:
| 線程A |
線程B |
| 1.lock(&mutex); 2.do something; 3.unlock(&mutex); |
|
在線程A的語句1處,線程A對mutex進行了加鎖操作,mutex變為鎖定狀態。在線程A的語句2及線程B的語句1處,A尚未對mutex進行解鎖,而B則試圖是mutex進行加鎖操作,因此線程B被阻塞,直到A的語句3處,線程A對mutex進行了解鎖,B的語句1才得以繼續執行,將mutex進行加鎖並繼續執行語句2和語句3。因此,如果在do something中有對共享資源的訪問操作,那么do something就是一個臨界區,每次都只有一個線程能夠進入這段代碼。
原子操作
無論是信號量,還是互斥,其中最重要的一個概念就是原子操作。所謂原子操作,就是不會被線程調度機制所打斷的操作——從該操作的第一條指令開始到最后一條指令結束,中間不會有任何的上下文切換(context switch)。
在單處理器系統上,原子操作的實現較為簡單:第一種方式是一些單指令即可完成的操作,如compare and swap、test and set等;由於上下文切換只可能出現在指令之間,因此單處理器系統上的單指令操作都是原子操作;另一種方式則是禁用中斷,通過匯編語言支持,在指令執行期間,禁用處理器的所有中斷操作,由於上下文切換都是通過中斷來觸發的,因此禁用中斷后,可以保證指令流的執行不會被外部指令所打斷。
而在多處理器系統上,情況要復雜一些。由於系統中有多個處理器在獨立地運行,即使能在單條指令中完成的操作也有可能受到干擾。如,在不同的CPU運行的兩個進行都在執行一條遞減指令,即對內存中某個內存單元的值-1,則指令流水線可能是這樣:(省略了取指)
A處理器: |--讀內存--|--計數減1--|--寫內存--|
B處理器: |--讀內存--|--計數減1--|--寫內存--|
假設原來內存單元中存儲的值為5,那么,A、B處理器所讀到的內存值都為5,其往內存單元中寫入的值都為4。因此,雖然進行了兩次-1操作,但實際上運行的結果和執行了1次是一樣的。
注:這是一個數據相關問題(關於數據相關問題,可以參考計算機體系結構中指令流水線的設計及數據相關的避免等資料),在單處理機中,這個問題可以通過檢查處理機中的指令寄存器,來檢查在流水線中的指令之間的相關性,如果出現數據相關的情況,可以通過延遲相關指令執行的方法來規避;而在對稱多處理機中,由於CPU之間相互不知道對方的指令寄存器狀態,那么這種流水線作業引起的數據競跑就無法避免。
為了對原子操作提供支持,在x86 平台上,CPU提供了在指令執行期間對總線加鎖的手段。CPU芯片上有一條引線#HLOCK pin,如果匯編語言的程序中在一條指令前面加上前綴"LOCK",經過匯編以后的機器代碼就使CPU在執行這條指令的時候把#HLOCK pin的電位拉低,持續到這條指令結束時放開,從而把總線鎖住,這樣同一總線上別的CPU就暫時不能通過總線訪問內存了,保證了這條指令在多處理器環境中的原子性。
可以看出,其實pthread_mutex_lock及pthread_mutex_unlock就是一個原子操作。它保證了兩個線程不會同時對某個mutex變量加鎖或者解鎖,否則的話,互斥也就無從實現了。
i++和++i是原子操作嗎?
有一個很多人也許都不是很清楚的問題:i++或++i是一個原子操作嗎?在上一節,其實已經提到了,在SMP(對稱多處理器)上,即使是單條遞減匯編指令,其原子性也是不能保證的。那么在單處理機系統中呢?
在編譯器對C/C++源代碼進行編譯時,往往會進行一些代碼優化。例如,對i++這條指令,實際上編譯器編譯出的匯編代碼是類似下面的匯編語句:
1.mov eax,[i]
2.add eax,1
3.mov [i],eax
語句1是將i所在的內存讀取到寄存器中,而語句2是將寄存器的值加1,語句3是將寄存器值寫回到內存中。之所以進行這樣的操作,是為了CPU訪問數據效率的高效。可以看出,i++是由一條語句被編譯成了3條指令,因此,即使在單處理機系統上,i++這種操作也不是原子的。這是由於指令之間的亂序執行而造成的,注意和上節中,指令流水線之間的數據競跑造成的數據不一致的區別。
GCC的內建原子操作
在GCC中,從版本4.1.2起,提供了__sync_*系列的built-in函數,用於提供加減和邏輯運算的原子操作。這些操作通過鎖定總線,無論在單處理機和多處理機上都保證了其原子性。GCC提供的原子操作主要包括:
type __sync_fetch_and_add (type *ptr, type value, ...)
type __sync_fetch_and_sub (type *ptr, type value, ...)
type __sync_fetch_and_or (type *ptr, type value, ...)
type __sync_fetch_and_and (type *ptr, type value, ...)
type __sync_fetch_and_xor (type *ptr, type value, ...)
type __sync_fetch_and_nand (type *ptr, type value, ...)
這六個函數的作用是:取得ptr所指向的內存中的數據,同時對ptr中的數據進行修改操作(加,減,或,與,異或,與后取非)等;
type __sync_add_and_fetch (type *ptr, type value, ...)
type __sync_sub_and_fetch (type *ptr, type value, ...)
type __sync_or_and_fetch (type *ptr, type value, ...)
type __sync_and_and_fetch (type *ptr, type value, ...)
type __sync_xor_and_fetch (type *ptr, type value, ...)
type __sync_nand_and_fetch (type *ptr, type value, ...)
這六個函數與上六個函數基本相同,不同之處在於,上六個函數返回值為修改之前的數據,而這六個函數返回的值為修改之后的數據.
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
比較並交換指令.如果ptr所指向的內存中的數據等於oldval,則設置其為newval,同時返回true;否則返回false.
type __sync_lock_test_and_set (type *ptr, type value, ...)
測試並置位指令.
void __sync_lock_release (type *ptr, ...)
將ptr設置為0;
其中,這些操作的操作數(type)
可以是1,2,4或8字節長度的int類型,即:
int8_t / uint8_t
int16_t / uint16_t
int32_t / uint32_t
int64_t / uint64_t
無鎖隊列的實現
在酷殼的一篇文章(http://coolshell.cn/articles/8239.html )中,給出了一種鏈表無鎖隊列的實現。其中對ABA和double CAS等現象都進行了分析。在文章的結尾,給出了一種數組無鎖隊列的實現,不過這個數組受限於CAS、FAA等操作對操作類型的限制,只能存儲一些較小的數據類型,如32位數據等。而對於鏈表無鎖隊列,每次進行出隊和入隊操作都伴隨着內存的分配和釋放,不可避免地要影響到效率。
而使用環形數組的隊列則避免了頻繁的內存操作,從實現上來說也更加簡單。本節描述如何以環形數組為基礎,實現一個無鎖隊列。
多線程之間的協調
多線程程序或者說並發程序之間協調的關鍵是,要考慮到多個線程同時訪問某個資源的時候,保證它們訪問的順序能夠准確地反映到程序執行的結果上。
先定義一下無鎖隊列的基本結構:
template
class LockFreeQueue {
private:
ElementT * ring_array_;
int size_;
int head_index_;
int tail_index_;
}
由於出隊操作都是在隊首進行,而入隊操作則都是在隊尾進行,因此,我們可以嘗試用head_index_和tail_index_來實現多個線程之間的協調。這其中會用到CAS操作:
入隊進程:
……
do {
獲取當前的tail_index_的值cur_tail_index;
計算新的tail_index_的值:new_tail_index = (cur_tail_index + 1) % size;
} while(!CAS(tail_index_, cur_tail_index, new_tail_index));
插入元素到cur_tail_index;
其中的do-while循環實現的是一個忙式等待:線程試圖獲取當前的隊列尾部空間的控制權;一旦獲取成功,則向其中插入元素。
但是這樣出隊的時候就出現了問題:如何判斷隊首的位置里是否有相應元素呢?僅使用head_index_來判斷是不行的,這只能保證出隊進程不會對同一個索引位置進行出隊操作,而不能保證head_index_的位置中一定有有效的元素。
因此,為了保證出隊隊列與入隊隊列之間的協調,需要在LockFreeQueue中添加一個標志數組:
char * flag_array_;
flag_array中的元素標記ring_array_中與之對應的元素位置是否有效。flag_array_中的元素有4個取值:
0表示對應的ring_array_中的槽位為空;1表示對應槽位已被申請,正在寫入;2表示對應槽位中為有效的元素,可以對其進行出對操作;3則表示正在彈出操作。
修改后的無鎖隊列的代碼如下:
template
class LockFreeQueue {
private:
ElementT * ring_array_;
char * flags_array_; // 標記位,標記某個位置的元素是否被占用
// flags: 0:空節點;1:已被申請,正在寫入
// 2:已經寫入,可以彈出;3,正在彈出操作;
int size_; // 環形數組的大小
int element_num_; //隊列中元素的個數
int head_index_;
int tail_index_;
public:
LockFreeQueue(int s = 0) {
size_ = s;
head_index_ = 0;
tail_index_ = 0;
element_num_ = 0;
}
~LockFreeQueue() {}
public:
// 初始化queue。分配內存,設定size
bool Init(void);
const int GetSize(void) const {
return size_;
}
const int GetElementNum(void) const {
return element_num_;
}
// 入隊函數
bool EnQueue(const ElementT & ele);
// 出隊函數
bool DeQueue(ElementT * ele);
};
// This function is NOT ThreadSafe!
// 應當在單線程環境中使用該函數
// OR should be called in the constructor...
template
bool LockFreeQueue::Init(void) {
flags_array_ = new(std::nothrow) char[size_];
if (flags_array_ == NULL)
return false;
memset(flags_array_, 0, size_);
ring_array_ = reinterpret_cast(
new(std::nothrow) char[size_ * sizeof(ElementT)]);
if (ring_array_ == NULL)
return false;
memset(ring_array_, 0, size_ * sizeof(ElementT));
return true;
}
// ThreadSafe
// 元素入隊尾部
template
bool LockFreeQueue::EnQueue(const ElementT & ele) {
if (!(element_num_ < size_))
return false;
int cur_tail_index = tail_index_;
char * cur_tail_flag_index = flags_array_ + cur_tail_index;
// 忙式等待
// while中的原子操作:如果當前tail的標記為“”已占用(1)“,則更新cur_tail_flag_index,
// 繼續循環;否則,將tail標記設為已經占用
while (!__sync_bool_compare_and_swap(cur_tail_flag_index, 0, 1)) {
cur_tail_index = tail_index_;
cur_tail_flag_index = flags_array_ + cur_tail_index;
}
// 兩個入隊線程之間的同步
// 取模操作可以優化
int update_tail_index = (cur_tail_index + 1) % size_;
// 如果已經被其他的線程更新過,則不需要更新;
// 否則,更新為 (cur_tail_index+1) % size_;
__sync_bool_compare_and_swap(&tail_index_, cur_tail_index, update_tail_index);
// 申請到可用的存儲空間
*(ring_array_ + cur_tail_index) = ele;
// 寫入完畢
__sync_fetch_and_add(cur_tail_flag_index, 1);
// 更新size;入隊線程與出隊線程之間的協作
__sync_fetch_and_add(&element_num_, 1);
return true;
}
// ThreadSafe
// 元素出隊頭部
template
bool LockFreeQueue::DeQueue(ElementT * ele) {
if (!(element_num_ > 0))
return false;
int cur_head_index = head_index_;
char * cur_head_flag_index = flags_array_ + cur_head_index;
while (!__sync_bool_compare_and_swap(cur_head_flag_index, 2, 3)) {
cur_head_index = head_index_;
cur_head_flag_index = flags_array_ + cur_head_index;
}
// 取模操作可以優化
int update_head_index = (cur_head_index + 1) % size_;
__sync_bool_compare_and_swap(&head_index_, cur_head_index, update_head_index);
*ele = *(ring_array_ + cur_head_index);
// 彈出完畢
__sync_fetch_and_sub(cur_head_flag_index, 3);
// 更新size
__sync_fetch_and_sub(&element_num_, 1);
return true;
}
無鎖隊列的分析——死鎖及飢餓
經過上節的分析,LockFreeQueue實現了基本的多線程之間的協調,不會存在多個線程同時對同一個資源進行操作的情況,也就不會產生數據競跑,這保證了對於這個隊列而言,基本的訪問操作(出隊、入隊)的執行都是安全的,其結果是可預期的。
在多線程環境下,LockFreeQueue會不會出現死鎖的情況呢?死鎖有四個必要條件:1:對資源的訪問是互斥的;2,請求和保持請求;3,資源不可剝奪;4,循環等待。在LockFreeQueue中,所有的線程都是對資源進行申請后再使用,一個線程若申請到了資源(這里的資源主要指環形隊列中的內存槽位),就會立即使用,並且在使用完后釋放掉該資源。不存在一個線程使用A資源的同時去申請B資源的情況,因此並不會出現死鎖。
但LockFreeQueue可能出現飢餓狀態。例如,對兩個出隊線程A、B,兩者都循環進行出隊操作。當隊列中有元素時,A總能申請到這個元素並且執行到彈出操作,而B則只能在DeQueue函數的while循環中一直循環下去。
一些優化
對LockFreeQueue可以進行一些優化。比如:
1,對於環形數組大小,可以設定為2的整數倍,如1024。這樣取模的操作即可以簡化為與size_-1的按位與操作。
2,忙式等待的時候可能會出現某個線程一直占用cpu的情況。此時可以使用sleep(0),其功能類似於java中的yield系統調用,可以讓該線程讓出CPU時間片,從就緒態轉為掛起態。
