redis源碼學習-skiplist


1.初步認識跳躍表

圖中所示,跳躍表與普通鏈表的區別在於,每一個節點可以有多個后置節點,圖中是一個4層的跳躍表

第0層: head->3->6->7->9->12->17->19->21->25->26->tail
第1層: head->6->9->17->25->tail
第2層: head->6->25->tail
第3層: head->6->tail

傳統意義的單鏈表是一個線性結構,向有序的鏈表中插入一個節點需要O(n)的時間,查找操作需要O(n)的時間。如果我們使用圖中所示的跳躍表,就可以減少查找所需時間為O(n/2),因為我們可以先通過每個節點的最上面的指針先進行查找,這樣子就能跳過一半的節點。比如我們想查找19,首先和6比較,大於6之后,在和9進行比較,然后在和12進行比較......最后比較到21的時候,發現21大於19,說明查找的點在17和21之間,從這個過程中,我們可以看出,查找的時候跳過了3、7、12等點,因此查找的復雜度為O(n/2)。

2.redis中實現的skiplist

  • 結構體 zskiplist

    typedef struct zskiplist {
    
      // 表頭節點和表尾節點
      struct zskiplistNode *header, *tail;
    
      // 表中節點的數量
      unsigned long length;
    
      // 表中層數最大的節點的層數
      int level;
    
     } zskiplist;
     // 節點
     typedef struct zskiplistNode {
    
      // 成員對象
      robj *obj;
    
      // 分值
      double score;
    
      // 后退指針
      struct zskiplistNode *backward; // 前一個節點
    
      // 層
      struct zskiplistLevel {
    
          // 前進指針
          struct zskiplistNode *forward; // 下一個節點
    
          // 跨度
          unsigned int span;  // 當前節點在第i層到下一個節點forward需要跨過的節點數
    
      } level[];
    
    } zskiplistNode;
    

redis實現的跳躍表特點:

1.zskiplistNode中保存着前置節點backward
2.跳躍表的層數最大值32,每次插入新節點都會生成一個隨機的level(1~32)作為新節點的層數
3.刪除節點可能會引起跳躍表層數的下降,插入節點可能會引起跳躍表層數上升
4.查找節點的時間復雜度平均為 O(logn)
5.插入和刪除的成本都比較低,擁有平衡二叉樹的查找性能
  • 創建一條skiplist

       // 創建一條長度為0的skiplist
         zskiplist *zslCreate(void) {
         int j;
         zskiplist *zsl;
    
     // 分配空間
     zsl = zmalloc(sizeof(*zsl));
    
    
     zsl->level = 1; // 起始層數
     zsl->length = 0; // 跳躍表長度
    
     // 初始化表頭節點
     // T = O(1)
     zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
     for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
         zsl->header->level[j].forward = NULL;
         zsl->header->level[j].span = 0;
     }
     zsl->header->backward = NULL;
    
     // 設置表尾
     zsl->tail = NULL;
    
     return zsl;
    }
    
    // 創建新節點
     zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
    
     // 分配空間
     zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    
     // 設置屬性
     zn->score = score;
     zn->obj = obj;
    
     return zn;
    }
    
  • 插入一個節點

     zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
       zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
       unsigned int rank[ZSKIPLIST_MAXLEVEL];
       int i, level;
    
       redisAssert(!isnan(score)); // 保證score合法性
    
       // level越高每一次forward跨越的節點越多,先大間距的查找,隨着level的減小,查找范圍逐漸縮小
       x = zsl->header;
       for (i = zsl->level-1; i >= 0; i--) {
    
           // rank[i]用來記錄當前節點x與header的距離,隨着x的移動,rank[i]實時更新
           rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
    
           // 沿着前進指針遍歷跳躍表
           // T_wrost = O(N^2), T_avg = O(N log N)
           while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                   // 比對分值
                   (x->level[i].forward->score == score &&
                   // 比對成員, T = O(N)
                   compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
    
               // 記錄沿途跨越了多少個節點
               rank[i] += x->level[i].span;
    
               // 移動至下一指針
               x = x->level[i].forward;
           }
           // 第i層第一個大於 score的節點,將作為插入節點obj在第i層連接的的前一個節點
           update[i] = x;
       }
    
       /* we assume the key is not already inside, since we allow duplicated
        * scores, and the re-insertion of score and redis object should never
        * happen since the caller of zslInsert() should test in the hash table
        * if the element is already inside or not. 
        *
        * zslInsert() 的調用者會確保同分值且同成員的元素不會出現,
        * 所以這里不需要進一步進行檢查,可以直接創建新元素。
        */
    
       // 獲取一個隨機值作為新節點的層數
       // T = O(N)
       level = zslRandomLevel();
    
       // 如果新節點的層數比表中其他節點的層數都要大
       // 那么初始化表頭節點中未使用的層,並將它們記錄到 update 數組中
       // 將來也指向新節點
       if (level > zsl->level) {
    
           // 初始化未使用層
           // T = O(1)
           for (i = zsl->level; i < level; i++) {
               rank[i] = 0;
               update[i] = zsl->header;
               update[i]->level[i].span = zsl->length;
           }
    
           // 更新表中節點最大層數
           zsl->level = level;
       }
    
       // 創建新節點
       x = zslCreateNode(level,score,obj);
    
       // 將前面記錄的指針指向新節點,並做相應的設置
       // update[i]保存着第i層x的前置節點,rank[i]保存的是第i層x的前置節點離header的距離,rank[0]+1即是x離header的距離
       for (i = 0; i < level; i++) {
    
           // 設置新節點的 forward 指針
           x->level[i].forward = update[i]->level[i].forward;
    
           // 將沿途記錄的各個節點的 forward 指針指向新節點
           update[i]->level[i].forward = x;
    
           /* update span covered by update[i] as x is inserted here */
           // 用x前置節點到x后置節點的跨度減去x到前置節點的距離等於x到后置節點的跨度
           x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
    
           // 更新新節點插入之后,沿途節點的 span 值
           // 其中的 +1 計算的是新節點
           update[i]->level[i].span = (rank[0] - rank[i]) + 1;   // (rank[0] - rank[i]) 為x距離update[i]的距離
       }
    
       /* increment span for untouched levels */
       // 未接觸的節點的 span 值也需要增一,因為這些節點到后置節點中間插入了一個節點x
       // T = O(1)
       for (i = level; i < zsl->level; i++) {
           update[i]->level[i].span++;
       }
    
       // 設置新節點的后退指針
       x->backward = (update[0] == zsl->header) ? NULL : update[0];
       if (x->level[0].forward)
           x->level[0].forward->backward = x;
       else
           zsl->tail = x;  // x是跳躍表的尾部節點
    
       // 跳躍表的節點計數增一
       zsl->length++;
    
       return x;
     }
    
  • 刪除一個節點

    int zslDelete(zskiplist *zsl, double score, robj *obj) {
       zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
       int i;
    
     // 遍歷跳躍表,查找目標節點,並記錄所有沿途節點
     // T_wrost = O(N^2), T_avg = O(N log N)
     x = zsl->header;
     for (i = zsl->level-1; i >= 0; i--) {
    
         // 遍歷跳躍表的復雜度為 T_wrost = O(N), T_avg = O(log N)
         while (x->level[i].forward &&
             (x->level[i].forward->score < score ||
                 // 比對分值
                 (x->level[i].forward->score == score &&
                 // 比對對象,T = O(N)
                 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
    
             // 沿着前進指針移動
             x = x->level[i].forward;
    
         // 第i層上obj的前一個節點
         update[i] = x;
     }
    
     /* We may have multiple elements with the same score, what we need
      * is to find the element with both the right score and object. 
      *
      * 檢查找到的元素 x ,只有在它的分值和對象都相同時,才將它刪除。
      */
     x = x->level[0].forward; // 指向目標節點
     if (x && score == x->score && equalStringObjects(x->obj,obj)) { // 目標節點與obj一樣
         // T = O(1)
         zslDeleteNode(zsl, x, update); // 已知目標節點每一層的前置節點,刪除目標節點
         // T = O(1)
         zslFreeNode(x); // 釋放目標節點內存
         return 1;
     } else { // 目標節點與obj不匹配
         return 0; /* not found */
     }
    
     return 0; /* not found */
    }
    
    // update數組存儲着要刪除的節點x的前置節點
    void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
     int i;
    
     // 更新所有和被刪除節點 x 有關的節點的指針,解除它們之間的關系
     // T = O(1)
     for (i = 0; i < zsl->level; i++) {
         if (update[i]->level[i].forward == x) {  // update[i]是第i層在x前面的節點而且是前置節點
             update[i]->level[i].span += x->level[i].span - 1; // 更新前置節點的span
             update[i]->level[i].forward = x->level[i].forward; // 更新前置節點的forward
         } else {  // update[i]是第i層在x前面的節點,沒有和x建立連接
             update[i]->level[i].span -= 1;  // 減去中間少的1個
         }
     }
    
     // 更新被刪除節點 x 的前進和后退指針
     if (x->level[0].forward) {
         x->level[0].forward->backward = x->backward;
     } else { // x是尾部節點
         zsl->tail = x->backward;
     }
    
     // 更新跳躍表最大層數(只在被刪除節點是跳躍表中最高的節點時才執行)
     // T = O(1)
     while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
         zsl->level--;
    
     // 跳躍表節點計數器減一
     zsl->length--;
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM