在上一篇中說到了mailbox_t的底層實際上使用了管道ypipe_t來存儲命令。而ypipe_t實質上是一個無鎖隊列,其底層使用了yqueue_t隊列,ypipe_t是對yueue_t的再包裝,所以我們先來看看yqueue_t是怎么實現的。
1、yqueue_t
yqueue_t是一個高效的隊列,高效體現在她的內存配置上,盡量少的申請內存,盡量重用將要釋放的內存。其實,容器的設計都會涉及這點--高效的內存配置器,像sgi stl容器的內存配置器,使用了內存池,預先分配一塊較大內存,用不同大小的桶管理,容器申請內存時從相應的桶里拿一塊內存,釋放內存時又把內存回收到相應的桶里,這樣就能做到盡量少的malloc調用。yqueue_t並沒有使用內存池,但是利用了同樣的思想,一次性分配一個chunk_t減少內存分配次數,並用spare_chunk管理將要釋放的塊用於內存回收,詳細的實現后面再說,先看一下yqueue_t的整個概況,源碼位於Yqueue.hpp
// T is the type of the object in the queue.隊列中元素的類型 // N is granularity(粒度) of the queue,簡單來說就是yqueue_t一個結點可以裝載N個T類型的元素,可以猜想yqueue_t的一個結點應該是個數組 template <typename T, int N> class yqueue_t { public: inline yqueue_t ();// Create the queue. inline ~yqueue_t ();// Destroy the queue. inline T &front ();// Returns reference to the front element of the queue. If the queue is empty, behaviour is undefined. inline T &back ();// Returns reference to the back element of the queue.If the queue is empty, behaviour is undefined. inline void push ();// Adds an element to the back end of the queue. inline void pop ();// Removes an element from the front of the queue. inline void unpush ()// 用於回滾操作,暫時先不管這個函數,用到再說 private: // Individual memory chunk to hold N elements. struct chunk_t { T values [N]; chunk_t *prev; chunk_t *next; }; chunk_t *begin_chunk; int begin_pos; chunk_t *back_chunk; int back_pos; chunk_t *end_chunk; int end_pos; atomic_ptr_t<chunk_t> spare_chunk; //空閑塊(我把所有元素都已經出隊的塊稱為空閑塊),讀寫線程的共享變量 };
可以看到,yqueue_t是采用雙向鏈表實現的,鏈表結點稱之為chunk_t,每個chunk_t可以容納N個T類型的元素,以后就以一個chunk_t為單位申請內存,begin_chunk可以理解為鏈表頭結點,back_chunk可以理解為隊列中最后一個元素所在的鏈表結點,我們知道容器都應該要能動態擴容的,end_chunk就是拿來擴容的,總是指向鏈表的最后一個結點,而spare_chunk表示最近的被踢出隊列的鏈表結點。入隊操作back_chunk和back_pos,back_chunk結點填滿元素時該擴容,讓end_chunk指向新的鏈表結點或者之前釋放的鏈表結點,出隊操作begin_chunk和begin_pos,begin_chunk所有元素都出完后並不釋放內存,而是讓spare_chunk指向他,然后釋放spare_chunk上一次的指針,這樣擴容的時候就可以重新使用這個結點了。begin_chunk,begin_pos,back_chunk,back_pos,end_chunk,end_pos的關系大致如下:
這里需要重點說一下spare_chunk,根據上面的描述,擴容(寫線程的事)和出隊列(寫線程的事)都會用到這個變量,所以這個變量是讀寫共享的,有同步的語義,zmq用了atomic_ptr_t<T>來做同步,atomic_ptr_t同樣可以看成是一個指針,結構上atomic_ptr_t內含一個指針,提供了兩個原子操作和一個非原子操作,在yqueue_t中就需要用到其中一個原子操作xchg。
// This class encapsulates several atomic operations on pointers. template <typename T> class atomic_ptr_t { public: inline void set (T *ptr_);//非原子操作 inline T *xchg (T *val_);//原子操作 inline T *cas (T *cmp_, T *val_);//原子操作 private: volatile T *ptr; }
- set函數,把私有成員ptr指針設置成參數ptr_的值,不是一個原子操作,需要使用者確保執行set過程沒有其他線程使用ptr的值
- xchg函數,把私有成員ptr指針設置成參數val_的值,並返回ptr設置之前的值。原子操作,操作系統保證線程安全
- cas函數,把私有成員ptr指針與參數cmp_指針比較,如果相等,就把ptr設置為參數val_的值,返回ptr設置之前的值;如果直接返回ptr值。原子操作,操作系統保證線程安全
在實現上xchg和cas函數就是包裝了各種cpu提供的xchg和cas原子操作,想了解原理的可以查一查着方面的資料,這里只需要知道有這個功能就可以了。
有了這個指針,就可以保證單個讀線程和單個寫線程時的線程安全了。
接着,來看下yqueue_t是如何構造、push、pop的。后面我會把begin_chunk和begin_pos合起來成為隊頭指針,back_chunk和back_pos合起來成為隊尾指針,end_chunk和end_pos合起來稱為容器指針
①構造yqueue_t
inline yqueue_t () { begin_chunk = (chunk_t*) malloc (sizeof (chunk_t)); alloc_assert (begin_chunk); begin_pos = 0; back_chunk = NULL;//back_chunk總是指向隊列中最后一個元素所在的鏈表結點,現在還沒有元素,所以初始為空 back_pos = 0; end_chunk = begin_chunk;//end_chunk總是指向鏈表的最后一個結點 end_pos = 0; }
②pop
// Removes an element from the front end of the queue. inline void pop () { if (++ begin_pos == N) { chunk_t *o = begin_chunk; begin_chunk = begin_chunk->next; begin_chunk->prev = NULL; begin_pos = 0; // 'o' has been more recently used than spare_chunk, // so for cache reasons we'll get rid of the spare and // use 'o' as the spare. chunk_t *cs = spare_chunk.xchg (o);//由於局部性原理,總是保存最新的空閑塊而釋放先前的空閑快 free (cs); } }
主要是鏈表的基本操作。pop雖然只有幾行代碼,卻也有兩個點需要注意:
- pop掉的元素,其銷毀工作交給調用者完成
- 空閑塊的保存,要求是原子操作。這得想明白為什么。原因是,空閑塊是讀寫線程的共享變量,需要做同步,我們會在push中看到,push使用了spare_chunk。
③push
inline void push ()
{
back_chunk = end_chunk; back_pos = end_pos; if (++end_pos != N)//end_pos==N表明這個鏈表結點已經滿了 return; chunk_t *sc = spare_chunk.xchg (NULL); if (sc) { end_chunk->next = sc; sc->prev = end_chunk; } else { end_chunk->next = (chunk_t*) malloc (sizeof (chunk_t)); alloc_assert (end_chunk->next); end_chunk->next->prev = end_chunk; } end_chunk = end_chunk->next; end_pos = 0; }
push操作並未真正的push一個元素,只是把隊尾指針指向容器指針,然后讓容器指針加1,所以,這二者的值總是差1,二者的關系在第2節中的四個圖中可以看的更清楚。擴容的條件是容器指針到達了容器尾(所以end_chunk是拿來擴容的),擴容時先去spare_chunk拿之前廢棄的塊(所有元素都被pop的塊),拿到了就重用,沒拿到就得重新申請。同樣需要注意,拿空閑塊需要做同步操作。
當end_pos==N時,需要擴容,如下:
④front、back
這兩個函數需要注意的點是,返回的是引用,是個左值,調用者可以通過二者修改容器的值。
// Returns reference to the front element of the queue. // If the queue is empty, behaviour is undefined. inline T &front () { return begin_chunk->values [begin_pos]; } // Returns reference to the back element of the queue. // If the queue is empty, behaviour is undefined. inline T &back () { return back_chunk->values [back_pos]; }
總的來說yqueue_t還是比較好理解,現在可以來看一看ypipe_t的實現。
2、ypipe_t
先看下ypipe_t的介紹(注釋)、類繼承關系、類接口及數據成員
// Lock-free queue implementation. // Only a single thread can read from the pipe at any specific moment. // Only a single thread can write to the pipe at any specific moment. // T is the type of the object in the queue. // N is granularity of the pipe, i.e. how many items are needed to // perform next memory allocation. template <typename T, int N> class ypipe_t : public ypipe_base_t<T,N> template <typename T, int N> class ypipe_base_t { public: virtual ~ypipe_base_t () {} virtual void write (const T &value_, bool incomplete_) = 0; virtual bool unwrite (T *value_) = 0; virtual bool flush () = 0; virtual bool check_read () = 0; virtual bool read (T *value_) = 0; virtual bool probe (bool (*fn)(T &)) = 0; }; template <typename T, int N> class ypipe_t : public ypipe_base_t<T,N> { protected: // Allocation-efficient queue to store pipe items. // Front of the queue points to the first prefetched item, back of the pipe points to last un-flushed item. // Front is used only by reader thread, while back is used only by writer thread. yqueue_t <T, N> queue;//底層容器 // Points to the first un-flushed item. This variable is used exclusively by writer thread. T *w;//指向第一個未刷新的元素,只被寫線程使用 // Points to the first un-prefetched item. This variable is used exclusively by reader thread. T *r;//指向第一個還沒預提取的元素,只被讀線程使用 // Points to the first item to be flushed in the future. T *f;//指向下一輪要被刷新的一批元素中的第一個 // The single point of contention between writer and reader thread. // Points past the last flushed item. If it is NULL,reader is asleep. // This pointer should be always accessed using atomic operations. atomic_ptr_t <T> c;//讀寫線程共享的指針,指向每一輪刷新的起點(看代碼的時候會詳細說)。當c為空時,表示讀線程睡眠(只會在讀線程中被設置為空) }
ypipe_t繼承自ypipe_base_t,其提供了一組操作管道的接口,從ypipe_t源碼來看,他只是實現了這組接口,並沒有提供其他的方法了。T、N在yqueue_t中已經詳細說過了。從數據成員來看,其底層使用了上面講到的yqueue_t,可以猜想,ypipe_t的write、read等操作往yqueue_t中寫數據讀數據。ypipe_t開頭的的注釋中寫道,ypipe_t是一個無鎖隊列的實現,單個讀單個寫同時操作ypipe_t是線程安全的。有沒有發現這就是一個生產者消費者的問題,寫線程是生產者,讀線程是消費者,yqueue_t就是緩沖區,由於只有一個生產者、一個消費者,並不涉及同類線程間的互斥,只需讀線程和寫線程同步就可以了,操作系統課程中的解法就是兩個信號量+PV操作,涉及到鎖,而這里是無鎖,其實就是使用了數據成員中的三個指針w、r、c來實現的。至於f指針,是用來保證數據完整性的,zmq中一個完整的數據是可以分成多段往ypipe_t中寫的(下面源碼的write函數incomplete_參數),只有數據寫完整了才允許讀線程去讀數據,關於這點,可以在session與socket_base_t實例的通信中看到,后面會有文章專門詳細介紹,我們前面說過的mailbox_t底層也使用了ypipe_t,但mailbox不會把命令分段,每次都是完整的數據。那么接下來就看看這幾個指針時如何協同工作的吧。
先看ypipe_t構造的時候做了什么事情:
// Initialises the pipe. inline ypipe_t () { // Insert terminator element into the queue. queue.push ();//yqueue_t的尾指針加1,開始back_chunk為空,現在back_chunk指向第一個chunk_t塊的第一個位置 // Let all the pointers to point to the terminator. r = w = f = &queue.back (); c.set (&queue.back ()); }
在ypipe_t中,back_chunk+back_pos類似vector的end迭代器,上面的注釋"Let all the pointers to point to the terminator."也是這個意思,就是讓r、w、f、c四個指針都指向這個end迭代器,有關這點在write的時候能看清晰的感受到。那么做完這一步,他們關系像下面這個樣子:
ps.后面7個格子都屬於同一個chunk_t塊(yqueue_t介紹了chunk_t結點內含一個數組)
現在看看如何往queue寫數據:
inline void write (const T &value_, bool incomplete_) { // Place the value to the queue, add new terminator element. queue.back () = value_; queue.push (); // Move the "flush up to here" poiter. if (!incomplete_) f = &queue.back (); }
write往ypipe_t的end迭代器寫入內容,然后讓end迭代器下移一個位置,參數incomplete_=true表示數據分段,現在寫的只是其中一段,當incomplete=false時所有數據段都寫完了,把指針f指向end迭代器,所以從w指針到f指針這一段表示一個完整的數據。假設完整的數據為ABC,現在把數據分三段A、B、C寫入ypipe_t,調用write的形式為:
write(A,true); some code; write(B,true); some code; write(C,false); some code;
這時w,f,r,c的關系如下圖:
當一個完整的數據寫完后,寫線程會調用flush函數,讓讀線程看到這個完整的數據,如果讀線程睡眠了,寫線程有義務喚醒讀線程。這里涉及了幾個點:
- 如何讓讀線程看到這個數據?
- 如何判斷讀線程睡眠?
- 讀線程睡眠時,寫線程如何通知讀線程?
- 如何刷新
然后帶着這四個問題來看看flush函數的實現:
inline bool flush () { // If there are no un-flushed items, do nothing. if (w == f) return true; // Try to set 'c' to 'f'. if (c.cas (w, f) != w) { // Compare-and-swap was unseccessful because 'c' is NULL. // This means that the reader is asleep. Therefore we don't care about thread-safeness and update c in non-atomic manner. // We'll return false to let the caller know that reader is sleeping. c.set (f); w = f; return false; } // Reader is alive. Nothing special to do now. Just move the 'first un-flushed item' pointer to 'f'. w = f; return true; }
可以看到w==f 時,flush是直接返回的,什么也沒做,而當write函數的incomplete_=false時,把 f 指向了新的結點,這個時候 w!=f 了,flush函數才有所作為,所以w、f指針合作可用來告知flush函數現在能否刷新。
當w!=f時,真正執行刷新,可以看到所謂的刷新只做了兩件事情,c=f和w=f,其中c是讀寫線程共享的指針,所以用了原子操作cas來完成c=f的功能。cas這個函數我們在queue_t中也說過了,這里有必要再說一下她的大概實現:
T* cas(w,f){ ret=c ; if(c==w) c = f; return ret; }
可以看到c!=w時,設置失敗,並返回當前c的值。在寫線程中c和w總是指向同一個值的,都是指向f(刷新的目的就是兩個賦值,c=f和w=f),只有被讀線程改寫的情況下c!=w才成立,而讀線程只在一種情況下改寫c,那就是隊列中沒有數據時,讀線程把c置NULL然后睡眠,這點在說read的源碼時會看到。所以,cas的返回值代表了讀線程的狀態,返回NULL,說明讀線程睡眠了,此時cas沒有成功,所以需要使用set函數,把c指向f。這樣就完成了刷新工作。
先來回答上面的4個問題
- 如何讓讀線程看到這個數據?
令c=f,讀線程會檢查指針c,判斷是否有數據 - 如何判斷讀線程睡眠?
c.cas(w,f)返回NULL,讀線程睡眠 - 讀線程睡眠時,寫線程如何通知讀線程?
flush函數返回false,表明讀線程睡眠了,寫線程看到flush返回false之后會發送一個消息給讀線程。關於這點可以看上一篇中mailbox的send函數源碼 - 如何刷新
c=f ; w=f
再看一下刷新之后,w、f、c、r的關系:
再來看一下讀線程如何read:
// Reads an item from the pipe. Returns false if there is no value available. inline bool read (T *value_) { // Try to prefetch a value. if (!check_read ()) return false; // There was at least one value prefetched.Return it to the caller. *value_ = queue.front (); queue.pop (); return true; }
可以看到,read函數會先檢查隊列中是否有數據可讀,如果沒有數據可讀直接就返回了,如果有數據可讀,會在check_read中預取數據。這里面有兩個點,一個是檢查是否有數據可讀,一個是預取,所以帶着這兩個問題來看看check_read函數的源碼:
// Check whether item is available for reading. inline bool check_read () { // Was the value prefetched already? If so, return. if (&queue.front () != r && r)//判斷是否在前幾次調用read函數時已經預取數據了return true; // There's no prefetched value, so let us prefetch more values. // Prefetching is to simply retrieve the pointer from c in atomic fashion. // If there are no items to prefetch, set c to NULL (using compare-and-swap). r = c.cas (&queue.front (), NULL);//嘗試預取數據 // If there are no elements prefetched, exit. // During pipe's lifetime r should never be NULL, however,it can happen during pipe shutdown when items are being deallocated. if (&queue.front () == r || !r)//判斷是否成功預取數據 return false; // There was at least one value prefetched. return true; }
可以看到,check_read是通過指針r的位置來判斷是否有數據可讀的:如果指針r指向的是隊頭元素(r==&queue.front())或者r沒有指向任何元素(NULL)則說明隊列中並沒有可讀的數據,這個時候check_read嘗試去預取數據。所謂的預取就是令 r=c (cas函數就是返回c本身的值,看上面關於cas的實現), 而c在write中被指向f(見上圖),這時從queue.front()到f這個位置的數據都被預取出來了,然后每次調用read都能取出一段。值得注意的是,當c==&queue.front()時,代表數據被取完了,這時把c指向NULL,接着讀線程會睡眠,這也是給寫線程檢查讀線程是否睡眠的標志。
繼續上面寫入ABC數據的場景,第一次調用read時,會先check_read,把指針r指向指針c的位置(所謂的預取),這時r,c,w,f的關系如下:
這時,&queue.front()!=r,讀線程的就可以一直讀數據了,直到隊頭到達了指針r的位置,表示沒數據了,然后陷入睡眠。
綜上,就是ypipe_t無鎖隊列的實現,再總結一下過程:
數據可分段,寫線程一次寫入一段,所有數據都寫完后把f指向隊列的end迭代器位置,用以表示下一輪的寫位置,然后flush,把c和w指向end的位置,通知讀線程,讀線程check_read預取數據,把r也指向end位置,每次read的時候隊頭指針都下移一個位置,直到隊頭移動到r的位置,也就是end,表示沒數據了,讀線程把c指針置空,表示數據我都讀完了,我要睡了,下次再喊我。
下一篇將介紹session與socket_base_t的消息通信,還要一段時間再更新。