為了實現一個快速無鎖的 logging 模塊, 這幾天花了不少時間去了解怎樣實現一些無鎖的操作及與之相對應的數據結構。對多線程場景下的無鎖操作的研究一直是個熱點,理想中的無鎖操作,它應能天然地避開有鎖操作的一些缺陷,比如:
1)減少線程切換,能夠相對快速高效地讀寫(不使用 mutex, semaphore)
2)避免死鎖的可能,任何操作都應能在有限的等待時間內完成,
這些優點是很有吸引力的,它們從根本上繞開了有鎖操作可能引起的令人頭疼的同步死鎖問題,那么它會是我們的救世主嗎? 要了解無鎖的數據結構,我們不妨先來回顧一下常規的數據結構是怎么寫。
1 // 一般的棧。 2 3 typedef ELEM int; 4 #define MAX (2048) 5 6 static ELEM Stack[MAX]; 7 static int top = 0; 8 9 bool Push(const ELEM& val) 10 { 11 if (top >= MAX) return false; 12 13 Stack[top] = val; 14 ++top; 15 return true; 16 } 17 18 19 bool Pop(ELEM& val) 20 { 21 if (top == 0) return false; 22 23 --top; 24 val = Stack[top]; 25 26 return true; 27 }
這樣的棧在單線程場合下是常見的,也很簡潔明了,但它卻不適用於多線程的場合,試想一下,如果兩個線程,線程 a, 線程 b, 同一時間對同一個棧進行 Push 操作,參考上面的代碼,假設此時 top = 0, 如果線程 a 在執行到第13 行時停了下來,切換到線程 b 進行 Push,線程 b 執行完 13 行,但沒有執行 14 行的時候,這時 Stack[top] 中已經插入了線程 b 要插入的值,但 top 還沒更新,如果這時線程 b 不幸又被切換了出去,換到線程 a 繼續執行,那么線程 a 又會在同樣一個位置 top = 0 的地方插入,從而破壞了線程b的操作!
我們可以觀察到,上面的代碼在多線程下之所以不安全,是因為 Stack 被多個線程同時修改,但各個線程又沒有對關鍵的變量在訪問順序上作保護。對此,我們可以引入一些同步的機制來修改它,使得它能在多線程的場合里是操作安全的。
1 //帶鎖的棧。 2 3 typedef ELEM int; 4 #define MAX (2048) 5 6 static ELEM Stack[MAX]; 7 static int top = 0; 8 9 static Mutex mutex; 10 11 bool Push(ELEM val) 12 { 13 if (top >= MAX) return false; 14 15 Lock(&mutex); 16 17 Stack[top] = val; 18 ++top; 19 20 Unlock(&mutex); 21 22 return true; 23 } 24 25 26 bool Pop(ELEM& val) 27 { 28 if (top == 0) return false; 29 30 Lock(&mutex); 31 32 --top; 33 val = Stack[top]; 34 35 Unlock(&mutex); 36 return true; 37 }
上面的代碼就是我們常說的有鎖操作了,mutex 保證了各個線程對公共變量的訪問是安全的,各個線程在同時對 Stack 進行操作時,需要先搶占 mutex,搶到就可以對 stack 進行操作,沒搶到就先等着。這里付出了些代價,但保證了操作的安全可靠性。那么這些保護是有必要的嗎?再觀察一下前面的代碼,多個線程有可能,有需要同時修改的變量就一個而已: top. 只要我們參保證 top 在多線程的環境里能夠安全地被修改,那對整個 stack 的修改也都是安全的。事情看起來,好像比較簡單。要保證對 top 變量的原子操作,我們需要 cpu 提供一些特殊的支持,來保證我們在對某些內存進行修改時,不會被線程所中斷,它要么就完成,要么就不完成,而不會在完成到一半時,被別的線程中斷。在 intel 平台上,從 80486 開始,CMPXCHG 匯編指令可以幫助我們完全這件事情,這就是我們通常所說 CAS 操作的基礎。
下面我們嘗試用 cas 來寫一個無鎖的 stack.
1 // 無鎖的棧。 2 3 typedef ELEM int; 4 #define MAX (2048) 5 6 static ELEM Stack[MAX]; 7 static int top = 0; 8 9 bool Push(ELEM val) 10 { 11 int old_top; 12 13 do 14 { 15 old_top = top; 16 if (old_top >= MAX) return false; 17 18 if (cas(&top, old_top, old_top + 1)) 19 break; 20 21 }while(1); 22 23 Stack[old_top] = val; 24 25 return true; 26 } 27 28 29 bool Pop(ELEM& val) 30 { 31 int old_top; 32 do 33 { 34 old_top = top; 35 36 if (old_top == 0) return false; 37 38 val = Stack[old_top - 1]; 39 40 if (cas(&top, old_top, old_top - 1)) 41 break; 42 43 } while(1); 44 45 46 return true; 47 }
上面的實現乍看起來很美好, 它會是我們想要的東西嗎?