Nodejs事件引擎libuv源碼剖析之:高效隊列(queue)的實現


     聲明:本文為原創博文,轉載請注明出處。

     在libuv中,有一個只使用簡單的宏封裝成的高效隊列(queue),現在我們就來看一下它是怎么實現的。

     首先,看一下queue中最基本的幾個宏:

1 typedef void *QUEUE[2];
2 
3 /* Private macros. */
4 #define QUEUE_NEXT(q)       (*(QUEUE **) &((*(q))[0]))
5 #define QUEUE_PREV(q)       (*(QUEUE **) &((*(q))[1]))
6 #define QUEUE_PREV_NEXT(q)  (QUEUE_NEXT(QUEUE_PREV(q)))
7 #define QUEUE_NEXT_PREV(q)  (QUEUE_PREV(QUEUE_NEXT(q)))

     首先,QUEUE被聲明成一個"具有兩個char*元素的指針數組",如下圖:

                       

 

      接下來看第一個宏: QUEUE_NEXT(q) ,其中q代表一個指向QUEUE數組的指針,其返回值是下一個節點QUEUE的指針,其用法大致如下:

1 static QUEUE queue;
2 QUEUE_NEXT(&queue);

      可以看到,非常簡單的操作便可以取得queue的下一個節點地址,那么它是如何做到的呢,來看一下QUEUE_NEXT的實現:

  (*(QUEUE **) &((*(q))[0]))

     這個表達式看似復雜,其實它就相當於"(*q)[0]",也就是代表QUEUE數組的第一個元素,那么它為什么要寫這么復雜呢,主要有兩個原因:類型保持、成為左值。   

     QUEUE_NEXT(&queue) 擴展之后相當於:(*(QUEUE **)&((*(&queue))[0])),我們將其拆開來看(如下圖所示),共分為四個部分:

     第(1)個部分,先對數組取地址(&)再對其解引用(*),最后再作[0]運算,就相當於queue[0],這里補充一下知識:假設有一個數組int a[10],當訪問數組時,a[1]相當於*(a+1),而數組名相當於數組首元素首地址,而&a在數值上雖然與a的值相同,但是&a從含義上講是代表整個數組的首地址(類型為整個數組),因此&a + 1操作將跨域整個數組的長度,因此(&a)[1]並不是訪問a[1],(*(&a))[1]才是訪問a[1],具體原理可以看我的另一篇博文:圖解多級指針與多維數組

     第(2)個部分,對數組首元素queue[0]取地址。

     第(3)個部分,對第二部分取得的地址進行強制類型轉換,將其強轉為QUEUE **,因為QUEUE的元素類型本身為void *,而實際中每一個元素都需要指向QUEUE地址,因此對於&queue[0](二級指針),就需要將其強轉為QUEUE **。

     第(4)個部分,對上文強轉后的地址進行“解引用”操作,也就是對&queue[0]解引用之后相當於queue[0],為什么要這么做呢?這是為了使其成為左值,左值的簡單定義是:占用實際的內存、可以對其進行取地址操作的變量都是左值,而c語言中(其實其他語言也是一樣),對於一個變量(或者表達式)進行強制類型轉換時,其實並不是改變該變量本身的類型,而是產生一個變量的副本,而這個副本並不是左值(因為並不能對其取地址),它是一個右值,舉個例子:int a = 1; (char) a = 2;這樣會報錯。而如果改成這樣:int a = 1; (*(char *)(&a)) = 2;就正確了。

     至此,這個稍微有點復雜的表達式算是分析清楚了,對於QUEUE_PREV原理類似,在此不再贅述。

         

        接下來看看對隊列的其他操作,這些操作都是建立在前面四個基礎宏定義基礎上的(注:以下所有宏的參數類型都為:QUEUE *):

1)隊列初始化

1 #define QUEUE_INIT(q)                                                         \
2   do {                                                                        \
3     QUEUE_NEXT(q) = (q);                                                      \
4     QUEUE_PREV(q) = (q);                                                      \
5   }                                                                           \
6   while (0)

      初始化隊列q就是將其next和prev的指針指向自己。

2)隊列為空判斷

1 #define QUEUE_EMPTY(q)                                                        \
2   ((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q))

      只要q的next指針還是指向自己,就說明隊列為空(只有鏈表頭結點)。

3)隊列遍歷

1 #define QUEUE_FOREACH(q, h)                                                   \
2   for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))

    遍歷隊列q,直到遍歷到h為止。注意:在遍歷時,不要同時對隊列q進行插入、刪除操作,否則會出現未知錯誤。

4)獲取隊列頭

1 #define QUEUE_HEAD(q)                                                         \
2   (QUEUE_NEXT(q))

    鏈表頭節點的next返回的就是隊列的head節點(具體原理可以看下文的圖解)。

5)隊列相加

1 #define QUEUE_ADD(h, n)                                                       \
2   do {                                                                        \
3     QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n);                                       \
4     QUEUE_NEXT_PREV(n) = QUEUE_PREV(h);                                       \
5     QUEUE_PREV(h) = QUEUE_PREV(n);                                            \
6     QUEUE_PREV_NEXT(h) = (h);                                                 \
7   }                                                                           \
8   while (0)

      將隊列n加入到隊列h的尾部,假設兩個對象的初始狀態為:

      經過以上的ADD步驟后,狀態為:

 

6)隊列分割

 1 #define QUEUE_SPLIT(h, q, n)                                                  \
 2   do {                                                                        \
 3     QUEUE_PREV(n) = QUEUE_PREV(h);                                            \
 4     QUEUE_PREV_NEXT(n) = (n);                                                 \
 5     QUEUE_NEXT(n) = (q);                                                      \
 6     QUEUE_PREV(h) = QUEUE_PREV(q);                                            \
 7     QUEUE_PREV_NEXT(h) = (h);                                                 \
 8     QUEUE_PREV(q) = (n);                                                      \
 9   }                                                                           \
10   while (0)

     隊列分割就是上述ADD的逆過程,將隊列h以q為分割點進行分割,分割出來的新隊列為n(n為分出來的雙向循環鏈表的頭結點)。此處不再單獨提供圖示。

7)隊列移動

 1 #define QUEUE_MOVE(h, n)                                                      \
 2   do {                                                                        \
 3     if (QUEUE_EMPTY(h))                                                       \
 4       QUEUE_INIT(n);                                                          \
 5     else {                                                                    \
 6       QUEUE* q = QUEUE_HEAD(h);                                               \
 7       QUEUE_SPLIT(h, q, n);                                                   \
 8     }                                                                         \
 9   }                                                                           \
10   while (0)

      將隊列h移動到n隊里中,首先如果h隊列為空,那么就把n初始化為空;如果h不為空,那么就先取出h隊列的head節點,然后調用前面論述過的隊列分割宏,從head節點開始分割,等價於把h隊列的所有內容(輸了h自身,因為它是鏈表頭節點)全部轉移到n隊里里面。

8)向隊列頭插入節點

1 #define QUEUE_INSERT_HEAD(h, q)                                               \
2   do {                                                                        \
3     QUEUE_NEXT(q) = QUEUE_NEXT(h);                                            \
4     QUEUE_PREV(q) = (h);                                                      \
5     QUEUE_NEXT_PREV(q) = (q);                                                 \
6     QUEUE_NEXT(h) = (q);                                                      \
7   }                                                                           \
8   while (0)

     假設h隊列起始狀態為空,則兩個節點起始狀態為:  

 

 

     則執行插入后的狀態為:

    現在假設再插入一個節點n,則初始狀態為:


 

     插入之后的狀態為:

    

 

9)向隊列尾部插入節點

1 #define QUEUE_INSERT_TAIL(h, q)                                               \
2   do {                                                                        \
3     QUEUE_NEXT(q) = (h);                                                      \
4     QUEUE_PREV(q) = QUEUE_PREV(h);                                            \
5     QUEUE_PREV_NEXT(q) = (q);                                                 \
6     QUEUE_PREV(h) = (q);                                                      \
7   }                                                                           \
8   while (0)

      將q節點插入h隊列的尾部,假設h隊列目前為空,則初始狀態為:

      執行插入之后的狀態為:

 

     現在假設再插入一個n,則初始狀態為:

 

      執行插入之后的狀態為:

    

      不容易看嗎?稍微調整一下,就是這樣(循環雙向鏈表):

 

      由此,可以清楚的看到,QUEUE(h)作為隊列頭,它的next就是隊列的第一個head節點。

10)隊列刪除

1 #define QUEUE_REMOVE(q)                                                       \
2   do {                                                                        \
3     QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q);                                       \
4     QUEUE_NEXT_PREV(q) = QUEUE_PREV(q);                                       \
5   }                                                                           \
6   while (0)

      隊列刪除的原理很簡單,現將q前一個節點的next指針修改為指向q的next指針指向的下一個節點,再q的下一個節點的prev指針修改為指向q當前指向的前一個節點。

11)在隊列中存取用戶數據

1 #define QUEUE_DATA(ptr, type, field)                                          \
2   ((type *) ((char *) (ptr) - offsetof(type, field)))

      在前面的論述中我們清楚了隊列節點的增刪查等操作,但是我們絲毫沒有看到可以存用戶數據的地方,其實,如果你熟悉linux內核的話就會很容易理解,這種隊列並不限制你的用戶數據類型,你需要做的,只是將QUEUE節點內嵌到自己定義的數據類型中即可,然后讓它們串起來。大致概念如下:

 1 struct user_s1 {
 2     int age;
 3     char* name;
 4 
 5     QUEUE node;
 6 };
 7 
 8 struct user_s2 {
 9     int age;
10     char* name;
11     char* address;
12 
13     QUEUE node;
14 };

       兩種結構體雖然是不同的數據類型,但是它們都包含了QUEUE節點,可以將他們的node成員組成雙向循環鏈表進行管理,這樣就可以以隊列方式來管理它們的node成員了,但是拿到node成員(其實是地址)之后,怎么拿到用戶數據呢?這就用到了QUEUE_DATA宏(熟悉Linux內核編程的人都熟悉,他就是container_of),拿到node成員的地址之后,只要將該地址減去node成員在結構體中的偏移,就可以拿到整個結構體的起始地址,也就拿到了用戶數據了。下面再來一張圖:

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM