書續上回:實現無鎖棧與隊列(1)
對於下面這個看起來很美好的無鎖棧:
1 //無鎖的棧。 2 3 typedef ELEM int; 4 #define MAX (2048) 5 6 static ELEM Stack[MAX]; 7 static int top = 0; 8 9 bool Push(ELEM val) 10 { 11 int old_top; 12 13 do 14 { 15 old_top = top; 16 if (old_top >= MAX) return false; 17 18 if (cas(&top, old_top, old_top + 1)) 19 break; 20 21 }while(1); 22 23 Stack[old_top] = val; 24 25 return true; 26 } 27 28 29 bool Pop(ELEM& val) 30 { 31 int old_top; 32 do 33 { 34 old_top = top; 35 36 if (old_top == 0) return false; 37 38 val = Stack[old_top - 1]; 39 40 if (cas(&top, old_top, old_top - 1)) 41 break; 42 43 } while(1); 44 45 46 return true; 47 }
我們仔細看一下它的 Push 操作,cas 保證了對 top 的更新是安全,原子的,但是數據的更新呢?這里把數據的更新放后了一步,似乎也是理所當然的:騰出了空間,再往里面寫東西。但是,但是,如果還沒有來得及完成往棧里寫數據,當前線程就被切換了出去呢?有人可能想,換出去就換出去唄,記得再換回來就行了。理想很豐滿,現實卻很骨感,再想一下,如果在換回來之前,有線程要從這個棧里 pop 數據怎么辦?棧的特性是后進先出的,top 被更新之后,在別的線程看來,就是已經完成了數據的插入,如果這時要進行 pop 操作,但之前的線程又沒有真的往里面寫完數據,那顯然結果就不是我們所想要的。
嚴重的錯誤!
回頭看一看,問題的根本所在就是,我們沒有保證數據的更新和 top 的更新是一致的。它們被分開了,在多線程的環境里,兩個相鄰的操作有可能是會相隔很遠的,遠到從前一個操作的完成到后一個操作的完成,中間可能經過了滄海桑田,任何東西都可能變了。問題清楚了,解決的方法看起來無非只有兩個:
1) 保證 top 的更新和數據的插入是同步的,即更新 top 與更新數據在同一個原子操作里完成。
2) 設置標記,在未完成插入數據之前,不允許 pop 操作。
第一個方案應該是最好的,但它不好實現,cas 的使用是有限制的,它並不能對任意長度的內存進行原子操作,而我們這里的設計,是希望設計一個相對泛型一些 stack,它應能適應各種長度的數據類型,顯然這個要求太嚴格,cas基本沒法滿足。
那么我們看看第二個方案,注意到棧的所有操作都是在棧頂,多線程場合下,對棧的操作如果有競發,那肯定就是在爭棧頂,這個特性看起來很有幫助:我們只要保證,在搶到棧頂,完成對棧頂的修改是在同一個線程里完成,而不會被別的線程干擾,那就成了!
1 //無鎖的棧。 2 3 typedef ELEM int; 4 #define MAX (2048) 5 6 static ELEM Stack[MAX]; 7 static int top = 0; 8 static int flag = 0; 9 10 bool Push(ELEM val) 11 { 12 int old_top; 13 14 do 15 { 16 old_top = top; 17 if (old_top >= MAX) return false; 18 19 if (!cas(flag, 0, 1)) continue; 20 21 if (cas(&top, old_top, old_top + 1)) 22 break; 23 24 }while(1); 25 26 Stack[old_top] = val; 27 28 cas(&flag, 1, 0); 29 30 return true; 31 } 32 33 34 bool Pop(ELEM& val) 35 { 36 int old_top; 37 do 38 { 39 old_top = top; 40 41 if (old_top == 0) return false; 42 43 if (!cas(&flag, 0, 1)) continue; 44 45 val = Stack[old_top - 1]; 46 47 if (cas(&top, old_top, old_top - 1)) 48 break; 49 50 } while(1); 51 52 cas(&flag, 1, 0); 53 54 return true; 55 }
上面的代碼顯然解決了之前的問題,真讓人高興,但再認真看看,我們忽然發現,我們其實是自己實現了一個互斥鎖啊,這並不算高明,更重要的是,它沒能符合我們第一篇文章里對無鎖棧的要求,首先,它的互斥性很強,只允許一個線程獨占操作,雖然沒有 sleep 操作導致線程切換,但是它的性能未必比加鎖的高,其二,也是最無法接受的,它不符合我們在前一篇文章里提的第二個要求,它不能避免死鎖,設想一下,如果一個線程在設置了 flag 之后,突然異常退出了,掛了,那后續的任何線程,都無法再操作這個棧了。
這個發現很讓人沮喪,它幾乎表明我們之前的所有工作前功盡棄了,回過頭來看,所有問題的根源就在於我們在實現這個數組為基礎的棧時,需要在鄰界區內做兩步操作,更新棧頂,寫數據,而且這兩個操作又有要格的順序要求,這個要求事實上太嚴格,以致於我現在沒法想到一個合適的方法來解決。但退一步來講,換一個想法,我們能不能干脆就避免這兩個操作同時在鄰界區進行呢?這是一個讓人眼前一亮的思路,用鏈表不就行了嗎?用鏈表來實現無鎖的隊列、棧,網上有很多很多相關的介紹文章及實現的代碼,我最開始也是准備那樣子做的,但是用鏈表來實現要解決幾個很麻煩的問題,這就是為什么我是先嘗試了用數組來實現的根本原因。現在看來,我明白了為什么網上幾乎沒有幾篇文章是介紹怎么用數組來實現無鎖隊列的原因了:用數組根本無法實現一個真正意義上的無鎖隊列。
現在再看看我們之前寫的代碼,它的確無法真正無鎖,但我們還可以對它加以改進讓它變得更適用,比如,最基本的一個,允許並發地寫,允許並發地讀,也就是允許幾個線程同時往里面寫,又或者允許幾線程同時從棧里讀,但不允許同時有讀寫。這是可以做到的,因為只是往棧里寫的時候,我們需要競爭一下 top,獲取一個數組的位置就夠了,讀也同理,這一個改進會讓這個棧的性能有很大的提升,雖然我們還是無法保證它不會死鎖。
為了保證能允許多個線程同時讀(寫),但又要讀寫互斥,我們需要至少檢查兩個標記,一個標記記錄是否有讀在進行,一個標記記錄是否有寫在進行,這看起來又像是要有兩步操作,前面的經驗看來,幾乎又會是失敗的開端。但我從 cas2 這種操作里獲得了一個新思路,我把這兩個標記放到一個變量里,那就可以避免要用兩步來實現檢查兩個變量了!
1 // 無鎖的棧。 2 3 typedef ELEM int; 4 #define MAX (2048) 5 6 static ELEM Stack[MAX]; 7 static int top = 0; 8 9 // 低位一個字節表示有多少線程在讀 10 // 低位第二個字節表示多少線程在寫 11 static size_t mask = 0; 12 13 bool Push(ELEM val) 14 { 15 int old_top; 16 size_t old_mask; 17 size_t append = 0x10; 18 19 do 20 { 21 old_top = top; 22 if (old_top >= MAX) return false; 23 // TODO 檢查正在寫的線程的數量,如果超過255就讓當前線程等待 24 old_mask = mask & (~0x0f);// 低位全為0時,沒有線程在讀,只有當沒有線程讀時,才能往棧里寫 25 if (!cas(&mask, old_mask, old_mask + append)) continue;// 這里可以適當sleep, sched_yield(); 26 27 if (cas(&top, old_top, old_top + 1)) 28 break; 29 30 }while(1); 31 32 Stack[old_top] = val; 33 34 do 35 { 36 old_mask = mask; 37 } while(!cas(&mask, old_mask, old_mask - append)); 38 39 return true; 40 } 41 42 43 bool Pop(ELEM& val) 44 { 45 int old_top; 46 size_t old_mask; 47 size_t append = 0x01; 48 49 do 50 { 51 old_top = top; 52 53 if (old_top == 0) return false; 54 55 old_mask = mask & (~0xf0);// 第二個字節為0時,沒有線程在寫,只有當沒有線程在寫時,才允許讀 56 // TODO, 檢查正在讀的線程的數量,如果超過255,就等待 57 if (!cas(&mask, old_mask, old_mask + append)) continue; 58 59 val = Stack[old_top - 1]; 60 61 if (cas(&top, old_top, old_top - 1)) 62 break; 63 64 } while(1); 65 66 do 67 { 68 old_mask = mask; 69 } while(!cas(&mask, old_mask, old_mask - append)); 70 71 return true; 72 }
經過上面的優化,這個棧在讀寫方面的性能有非常大的提升,它的特點是允許一定數量的線程在並發地寫(讀),已經非常接近我們理想中的無鎖棧了,唯一的遺憾是,它沒法保證讀寫的獨立性,如果有線程在讀,想寫數據的線程就得等待,反之亦然,這個缺點使得上面的代碼也沒法克服不會死鎖的缺點,所以也不能用在中斷,信號處理里面,十分遺憾。但不管怎樣,這算是邁出了一步,也實現了一個還算差強人意的棧,上面的代碼或許很簡潔,但有很多理論藏在了后面,要實現真正的無鎖數據結構不是一件容易的事情,首先就是很難有平台移植性,cas 操作與具體的 cpu 相關,內存模型更是與 cpu 千絲萬縷的關系,其中后一條我這兒只字不提,只是因為我不想讓事情變得復雜。
在實現真正的無鎖隊列的路上,我們還有路要走,接下來我會再介紹一下用鏈表來實現的思路,至於上面所寫的代碼,可以到 GitHub上獲取:https://github.com/kmalloc/server/blob/master/misc/LockFreeContainer.h