Redis 跳躍表實現


轉載自:http://www.cnblogs.com/WJ5888/p/4516782.html

Redis中支持的數據結構比Memcached要多,如基本的字符串、哈希表、列表、集合、可排序集,在這些基本數據結構上也提供了針對該數據結構的各種操作,這也是Redis之所以流行起來的一個重要原因,當然Redis能夠流行起來的原因,遠遠不只這一個,如支持高並發的讀寫、數據的持久化、高效的內存管理及淘汰機制...

從Redis的git提交歷史中,可以查到,2009/10/24在1.050版本,Redis開始支持可排序集,在該版本中,只提供了一條命令zadd,宏定義如下所示:

 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},

那么什么是可排序集呢? 從Redis 1.0開始就給我們提供了集合(Set)這種數據結構,集合就跟數學上的集合概念是一個道理【無序性,確定性,互異性】,集合里的元素無法保證元素的順序,而業務上的需求,可能不止是一個集合,而且還要求能夠快速地對集合元素進行排序,於是乎,Redis中提供了可排序集這么一種數據結構,似乎也是合情合理,無非就是在集合的基礎上增加了排序功能,也許有人會問,Redis中不是有Sort命令嘛,下面的操作不也是同樣可以達到對無序集的排序功能嘛,是的,是可以,但是在這里我們一直強調的是快速這兩個字,而Sort命令的時間復雜度為O(N+M*Log(M)),可排序集獲取一定范圍內元素的時間復雜度為O(log(N) + M)

root@bjpengpeng-VirtualBox:/home/bjpengpeng/redis-3.0.1/src# ./redis-cli 
127.0.0.1:6379> sort set
1) "1"
2) "2"
3) "3"
4) "5"
127.0.0.1:6379> sort set desc
1) "5"
2) "3"
3) "2"
4) "1"

在了解可排序集是如何實現之前,需要了解一種數據結構跳表(Skip List),跳表與AVL、紅黑樹...等相比,數據結構簡單,算法易懂,但查詢的時間復雜度與平衡二叉樹/紅黑樹相當,跳表的基本結構如下圖所示:

  Redis的跳躍表由zskiplistNode和skiplist兩個結構定義,其中 zskiplistNode結構用於表示跳躍表節點,而 zskiplist結構則用於保存跳躍表節點的相關信息,比如節點的數量,以及指向表頭節點和表尾節點的指針等等。

Redis跳躍表

      上圖展示了一個跳躍表示例,其中最左邊的是 skiplist結構,該結構包含以下屬性。

  • header:指向跳躍表的表頭節點,通過這個指針程序定位表頭節點的時間復雜度就為O(1)

  • tail:指向跳躍表的表尾節點,通過這個指針程序定位表尾節點的時間復雜度就為O(1)

  • level:記錄目前跳躍表內,層數最大的那個節點的層數(表頭節點的層數不計算在內),通過這個屬性可以再O(1)的時間復雜度內獲取層高最好的節點的層數。

  • length:記錄跳躍表的長度,也即是,跳躍表目前包含節點的數量(表頭節點不計算在內),通過這個屬性,程序可以再O(1)的時間復雜度內返回跳躍表的長度。

    結構右方的是四個 zskiplistNode結構,該結構包含以下屬性

  • 層(level):

        節點中用1、2、L3等字樣標記節點的各個層,L1代表第一層,L代表第二層,以此類推。

        每個層都帶有兩個屬性:前進指針和跨度。前進指針用於訪問位於表尾方向的其他節點,而跨度則記錄了前進指針所指向節點和當前節點的距離(跨度越大、距離越遠)。在上圖中,連線上帶有數字的箭頭就代表前進指針,而那個數字就是跨度。當程序從表頭向表尾進行遍歷時,訪問會沿着層的前進指針進行。

        每次創建一個新跳躍表節點的時候,程序都根據冪次定律(powerlaw,越大的數出現的概率越小)隨機生成一個介於1和32之間的值作為level數組的大小,這個大小就是層的“高度”。

  • 后退(backward)指針:

        節點中用BW字樣標記節點的后退指針,它指向位於當前節點的前一個節點。后退指針在程序從表尾向表頭遍歷時使用。與前進指針所不同的是每個節點只有一個后退指針,因此每次只能后退一個節點。

  • 分值(score):

        各個節點中的1.0、2.0和3.0是節點所保存的分值。在跳躍表中,節點按各自所保存的分值從小到大排列。

  • 成員對象(oj):

        各個節點中的o1、o2和o3是節點所保存的成員對象。在同一個跳躍表中,各個節點保存的成員對象必須是唯一的,但是多個節點保存的分值卻可以是相同的:分值相同的節點將按照成員對象在字典序中的大小來進行排序,成員對象較小的節點會排在前面(靠近表頭的方向),而成員對象較大的節點則會排在后面(靠近表尾的方向)。

zskiplistNode

有了上面的跳表基本結構圖及原理,自已設計及實現跳表吧,這樣當看到Redis里面的跳表結構時我們會更加熟悉,更容易理解些,【下面是對Redis中的跳表數據結構及相關代碼進行精減后形成的可運行代碼】,首先定義跳表的基本數據結構如下所示

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

#include<stdio.h>

#include<stdlib.h>

 

#define ZSKIPLIST_MAXLEVEL 32

#define ZSKIPLIST_P 0.25

#include <math.h>

 

//跳表節點

typedef struct zskiplistNode {

    int key;

    int value;

    struct zskiplistLevel {

        struct zskiplistNode *forward;

    } level[1];

} zskiplistNode;

 

//跳表

typedef struct zskiplist {

    struct zskiplistNode *header;

    int level;

} zskiplist;

 

在代碼中我們定義了跳表結構中保存的數據為Key->Value這種形式的鍵值對,注意的是skiplistNode里面內含了一個結構體,代表的是層級,並且定義了跳表的最大層級為32級,下面的代碼是創建空跳表,以及層級的獲取方式

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

//創建跳表的節點

zskiplistNode *zslCreateNode(int level, int key, int value) {

    zskiplistNode *zn = (zskiplistNode *)malloc(sizeof(*zn)+level*sizeof(zn->level));

    zn->key = key;

    zn->value = value;

    return zn;

}

 

//初始化跳表

zskiplist *zslCreate(void) {

    int j;

    zskiplist *zsl;

    zsl = (zskiplist *) malloc(sizeof(*zsl));

    zsl->level = 1;//將層級設置為1

    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,NULL,NULL);

    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {

        zsl->header->level[j].forward = NULL;

    }

    return zsl;

}

 

//向跳表中插入元素時,隨機一個層級,表示插入在哪一層

int zslRandomLevel(void) {

    int level = 1;

    while ((rand()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))

        level += 1;

    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;

}

在這段代碼中,使用了隨機函數獲取過元素所在的層級,下面就是重點,向跳表中插入元素,插入元素之前先查找插入的位置,代碼如下所示,代碼中注意update[i]

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

//向跳表中插入元素

zskiplistNode *zslInsert(zskiplist *zsl, int key, int value) {

    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;

    int i, level;

    x = zsl->header;

    //在跳表中尋找合適的位置並插入元素

    for (i = zsl->level-1; i >= 0; i--) {

        while (x->level[i].forward &&

            (x->level[i].forward->key < key ||

                (x->level[i].forward->key == key &&

                x->level[i].forward->value < value))) {

            x = x->level[i].forward;

        }

        update[i] = x;

    }

    //獲取元素所在的隨機層數

    level = zslRandomLevel();

    if (level > zsl->level) {

        for (i = zsl->level; i < level; i++) {

            update[i] = zsl->header;

        }

        zsl->level = level;

    }

    //為新創建的元素創建數據節點

    x = zslCreateNode(level,key,value);

    for (i = 0; i < level; i++) {

        x->level[i].forward = update[i]->level[i].forward;

        update[i]->level[i].forward = x;

    }

    return x;

}

下面是代碼中刪除節點的操作,和插入節點類似

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

//跳表中刪除節點的操作

void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {

    int i;

    for (i = 0; i < zsl->level; i++) {

        if (update[i]->level[i].forward == x) {

            update[i]->level[i].forward = x->level[i].forward;

        }

    }

    //如果層數變了,相應的將層數進行減1操作

    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)

        zsl->level--;

}

 

//從跳表中刪除元素

int zslDelete(zskiplist *zsl, int key, int value) {

    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;

    int i;

    x = zsl->header;

    //尋找待刪除元素

    for (i = zsl->level-1; i >= 0; i--) {

        while (x->level[i].forward &&

            (x->level[i].forward->key < key ||

                (x->level[i].forward->key == key &&

                x->level[i].forward->value < value))) {

            x = x->level[i].forward;

        }

        update[i] = x;

    }

    x = x->level[0].forward;

    if (x && key == x->key && x->value == value) {

        zslDeleteNode(zsl, x, update);

        //別忘了釋放節點所占用的存儲空間

        free(x);

        return 1;

    else {

        //未找到相應的元素

        return 0;

    }

    return 0;

}

最后,附上一個不優雅的測試樣例

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

//將鏈表中的元素打印出來

void printZslList(zskiplist *zsl) {

    zskiplistNode  *x;

    x = zsl->header;

    for (int i = zsl->level-1; i >= 0; i--) {

        zskiplistNode *p = x->level[i].forward;

        while (p) {

            printf(" %d|%d ",p->key,p->value);

            p = p->level[i].forward;

        }

        printf("\n");

    }

}

 

int main() {

    zskiplist *list = zslCreate();

    zslInsert(list,1,2);

    zslInsert(list,4,5);

    zslInsert(list,2,2);

    zslInsert(list,7,2);

    zslInsert(list,7,3);

    zslInsert(list,7,3);

    printZslList(list);

    //zslDelete(list,7,2);

    printZslList(list);

}

有了上面的跳表理論基礎,理解Redis中跳表的實現就不是那么難了

Redis中跳表的基本數據結構定義如下,與基本跳表數據結構相比,在Redis中實現的跳表其特點是不僅有前向指針,也存在后向指針,而且在前向指針的結構中存在span跨度字段,這個跨度字段的出現有助於快速計算元素在整個集合中的排名

//定義跳表的基本數據節點
typedef struct zskiplistNode {
    robj *obj; // zset value
    double score;// zset score
    struct zskiplistNode *backward;//后向指針
    struct zskiplistLevel {//前向指針
        struct zskiplistNode *forward;
        unsigned int span;
    } level[];
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

//有序集數據結構
typedef struct zset {
    dict *dict;//字典存放value,以value為key
    zskiplist *zsl;
} zset;

將如上數據結構轉化成更形式化的圖形表示,如下圖所示

 

在上圖中,可以看到header指針指向的是一個具有固定層級(32層)的表頭節點,為什么定義成32,是因為定義成32層理論上對於2^32-1個元素的查詢最優,而2^32=4294967296個元素,對於絕大多數的應用來說,已經足夠了,所以就定義成了32層,到於為什么查詢最優,你可以將其想像成一個32層的完全二叉排序樹,算算這個樹中節點的數量

Redis中有序集另一個值得注意的地方就是當Score相同的時候,是如何存儲的,當集合中兩個值的Score相同,這時在跳表中存儲會比較這兩個值,對這兩個值按字典排序存儲在跳表結構中

有了上述的數據結構相關的基礎知識,來看看Redis對zskiplist/zskiplistNode的相關操作,源碼如下所示(源碼均出自t_zset.c)

創建跳表結構的源碼

//#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;
    //分配內存
    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;//默認層級為1
    zsl->length = 0;//跳表長度設置為0
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        //因為沒有任何元素,將表頭節點的前向指針均設置為0
        zsl->header->level[j].forward = NULL;
        //將表頭節點前向指針結構中的跨度字段均設為0
        zsl->header->level[j].span = 0;
    }
    //表頭后向指針設置成0
    zsl->header->backward = NULL;
    //表尾節點設置成NULL
    zsl->tail = NULL;
    return zsl;
}

在上述代碼中調用了zslCreateNode這個函數,函數的源碼如下所示=

zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
    zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    zn->obj = obj;
    return zn;
}

執行完上述代碼之后會創建如下圖所示的跳表結構

 

創建了跳表的基本結構,下面就是插入操作了,Redis中源碼如下所示

zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; //update[32]
    unsigned int rank[ZSKIPLIST_MAXLEVEL];//rank[32]
    int i, level;
    redisAssert(!isnan(score));
    x = zsl->header;
    //尋找元素插入的位置 
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
            (x->level[i].forward->score < score || //以下是得分相同的情況下,比較value的字典排序
                (x->level[i].forward->score == score &&compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    //產生隨機層數
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        //記錄最大層數
        zsl->level = level;
    }
    //產生跳表節點
    x = zslCreateNode(level,score,obj);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;
        //更新跨度
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }
    //此種情況只會出現在隨機出來的層數小於最大層數時
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}

上述源碼中,有一個產生隨機層數的函數,源代碼如下所示:

int zslRandomLevel(void) {
    int level = 1;
    //#define ZSKIPLIST_P 0.25 
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    //#ZSKIPLIST_MAXLEVEL 32
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

圖形化的形式描述如下圖所示:

理解了插入操作,其他查詢,刪除,求范圍操作基本上類似


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM