生產者消費者模式下的並發無鎖環形緩沖區


上一篇記錄了幾種環形緩沖區的設計方法和環形緩沖區在生產者消費者模式下的使用(並發有鎖),這一篇主要看看怎么實現並發無鎖。

0、簡單的說明

首先對環形緩沖區做下說明:

  1. 環形緩沖區使用改進的數組版本,緩沖區容量為2的冪
  2. 緩沖區滿阻塞生產者,消費者進行消費后,緩沖區又有可用資源,由消費者喚醒生產者
  3. 緩沖區空阻塞消費者,生產者進程生產后,緩沖區又有可用資源,由生產者喚醒消費者

然后對涉及到的幾個技術做下說明:

⑴CAS,Compare & Set,X86下對應的是CMPXCHG 匯編指令,原子操作,基本語義如下:

int CAS(address,old_value,new_value)
{
    int ret = *addresss;
    if(ret == old_value){
        *address = new_value;
    }
    return ret;
}

為了方便調用者知道調用結果,通常被編譯器改寫成如下形式:

bool CAS(address,old_value,new_value)
{
    int ret = *addresss;
    if(ret == old_value){
        *address = new_value;
        return true;
    }
    return ret;
}

⑵sched_yield(),調用sched_yield可以使當前線程讓出cpu,內核會把當前線程插入到線程優先級所對應的就緒隊列,然后調度新的線程占有cpu,Stack Overflow上的解釋

  1. sched_yield() causes the calling thread to relinquish the CPU. The thread is moved to the end of the queue for its static priority and a new thread gets to run.
  2. If the calling thread is the only thread in the highest priority list at that time, it will continue to run after a call to sched_yield().

1、無鎖環形隊列的實現

無鎖隊列的實現依托3個變量,in,out,max_out

in,被所有生產者共享,表示第一個可寫入的位置,每次成功讀取,值加1

out,被消費者共享,表示第一個課讀取的位置,每次成功讀取,值加1

max_out,生產者用於發布數據,[out,max_out]這一段表示消費者可消費的數據,max_out只能被生產者有序修改

out=max_out時,表示沒有數據可消費

que_size,環形緩沖區大小,總是2的冪

in-out表示緩沖區中有多少數據

in-out = que_size,表示緩沖區滿

 1 template<typename ELEM_TYPE>
 2 class queue{
 3 public:
 4     queue(int size);//把size上調至2的冪保存到que_size中
 5     bool enqueue(const ELEM_TYPE& in_data);
 6     bool dequeue(ELEM_TYPE& out_data );
 7     ...//其他成員
 8 private:
 9     ELEM_TYPE *arry;
10     int in;
11     int out;
12     int max_out;//用於讀寫線程同步
13     int que_size;
14     ... //其他成員
15 }
16 
17 template <typename ELEM_T>
18 bool queue<ELEM_T>::enqueue(const ELEM_T& in_data)
19 {
20     int cur_in ;
21     int cur_out;
22 
23     do{
24         cur_in  = in;
25         cur_out = out;
26 
27         if(cur_in - cur_out == que_size){
28             return false;
29         }
30         
31     }while(!CAS(&in,cur_in,cur_in+1))//如果cur_in==in依然成立,說明沒有其他線程修改in,cur_in是可用的,並修改in;如果cur_in!=in,說明in值已被其他線程修改
32     
33     arry[cur_in&(que_size-1)] = in_data;
34     
35     while(!CAS(&max_out,cur_in,cur_in+1)){//發布數據
36         sched_yield();//發布數據失敗,cur_in之前還有數據沒有發布出去,讓出cpu,讓其他線程先執行
37     }
38     
39     return true;
40 }
41 
42 template <typename ELEM_T>
43 bool queue<ELEM_T,QUE_SIZE>::dequeue(ELEM_T& out_data)
44 {
45     int cur_out;
46     int cur_max_out;
47 
48     do{
49         cur_out   = out;
50         cur_max_out = max_out;
51 
52         if(cur_out == cur_max_out){
53             return false;
54         }
55 
56         out_data = arry[cur_out&(que_size-1)] ;//先嘗試獲取數據
57         
58     }while(!CAS(&out,cur_out,cur_out+1))
59 
60     return true;
61 }

enqueue操作:

23行到30行:

先預取可插入位置cur_in

然后判斷緩沖區是否滿,滿了就不插入了,直接退出,返回false,表示緩沖區滿

然后使用CAS(&in,cur_in,cur_in+1)判斷該位置是否被使用過,在單線程中,25行到31行,cur_in和in永遠相等,因為這之間根本就沒有修過cur_in或in的語句,但是,在多線程中,in是所有生產者共享的,可能被其他線程在31行修改,所以,執行到31行時,要么cur_in<in,要么cur_in=in,CAS操作剛好可以判斷相等的這個關系,如果相等,說明沒有別的線程修改過in,也就是則預取的插入位置cur_in有效;如果不等,忙等待。

35行到37行:

這是生產者發布自己生產的數據,發布的方式就是把max_out指向當前線程的插入位置,同樣,max_out可能會被其他線程修改,所以可能導致CAS失敗,這時就不做忙等待了,而是讓出cpu,仔細想一下,CAS失敗的原因就是現在這個線程執行的太快了,導致這個線程插入位置cur_in之前還有數據沒有發布出去,所以這個線程先讓出cpu,先讓沒發布好數據的生產者先發布數據。

其實這樣做會存在問題:如果線程A發布數據時,發現在他之前,還有沒發布好數據的線程,假設為B,那么線程B在32行到34行之間掛掉之后,B之后的所有線程會一直處於忙等待的狀態。我現在還不知道這個問題要怎么解決。

dequeue操作類似 

2、生產者消費者

 說明一下,下面18行、21行、31行、34行是偽碼,看注釋就知道其含義了。

 1 queue<int> dataque(1024);//環形緩沖區
 2 
 3 queue<int> asleep_producers(32);//緩沖區滿時,生產者應該阻塞,該隊列就是生產者等待隊列
 4 queue<int> asleep_consumers(32);//緩沖區空時,消費者應該阻塞,該隊列就是消費者等待隊列
 5 
 6 
 7 void produce(){
 8     if(!dataque.enqueue(in_data)){//生產的數據入隊列
 9     
10         /* 數據入隊列失敗,把當前數據丟掉,或者保存到磁盤中等等*/
11         ... 
12         ...
13 
14         if(!asleep_producers.enqueue(this->gettid())){//把當前生產者線程加入等待隊列
15                 pthread_exit();//如果加入失敗,說明已經達到了等待隊列的最大值,那么線程主動退出
16         }
17 
18         producer_poller.wait();//睡眠,等待信號的到來
19     }
20     
21     wakeup_consumers(asleep_consumers);//生產了一個數據,應該從消費者等待隊列中取一個線程消費數據,本質上可以發送一個數據可消費者的poller
22 }
23 
24 
25 void consume()
26 {
27     if(!dataque.dequeue(out_data)){
28 
29         asleep_consumers.enqueue(this->gettid());//沒能成功取得數據,把自己加入到消費者等待隊列中
30     
31         consumer_poller.wait();//睡眠,等待信號到來
32     }
33     
34     wakeup_producers(asleep_producers);//消費了一個數據,應該從生產者等待隊列中取一個線程生產數據,本質上可以發送一個數據給生產者的poller
35 }

關於producer_poller、consumer_poller、wait、wakeup的實現可以用下面的方案:

每個線程內含一個socketpair和select,select用於監聽socketpair的讀端。

生產者線程起來的時候先去嘗試一次生產(調用produce函數),失敗之后使用select監聽消費者發過來的可生產信號,select成功監聽到事件后,再去生產(調用produce)

消費者線程起來的時候先去嘗試一次消費(調用consume函數),失敗之后使用select監聽生產者發過來的可消費信號,select成功監聽到事件后,再去消費(調用consume)

具體的實現方案可以參考:zeromq源碼分析筆記之線程間收發命令(2)

 

參考資料

基於數組的無鎖隊列(譯)

ABA problem

無鎖隊列的實現

sched_yield(2) - Linux man page


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM