Redis有序集內部實現原理分析


Redis技術交流群 481804090

Redis:https://github.com/zwjlpeng/Redis_Deep_Read

Redis中支持的數據結構比Memcached要多的多啦,如基本的字符串哈希表列表集合可排序集,在這些基本數據結構上也提供了針對該數據結構的各種操作,這也是Redis所以流行起來的一個重要原因,當然Redis能夠流行起來的原因,遠遠不只這一個,如支持高並發的讀寫、數據的持久化、高效的內存管理及淘汰機制...

Redisgit提交歷史中,可以查到,2009/10/241.050版本,Redis開始支持可排序集,在該版本中,只提供了一條命令zadd,宏定義如下所示:

1     {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},

那么什么是可排序集呢? 從Redis 1.0開始就給我們提供了集合(Set)這種數據結構,集合就跟數學上的集合概念是一個道理【無序性,確定性,互異性】,集合里的元素無法保證元素的順序,而業務上的需求,可能不止是一個集合,而且還要求能夠快速地對集合元素進行排序,於是乎,Redis中提供了可排序集這么一種數據結構,似乎也是合情合理,無非就是在集合的基礎上增加了排序功能,也許有人會問,Redis中不是有Sort命令嘛,下面的操作不也是同樣可以達到對無序集的排序功能嘛,是的,是可以,但是在這里我們一直強調的是快速這兩個字,而Sort命令的時間復雜度為O(N+M*Log(M)),可排序集獲取一定范圍內元素的時間復雜度為O(log(N) + M)

root@bjpengpeng-VirtualBox:/home/bjpengpeng/redis-3.0.1/src# ./redis-cli 
127.0.0.1:6379> sort set
1) "1"
2) "2"
3) "3"
4) "5"
127.0.0.1:6379> sort set desc
1) "5"
2) "3"
3) "2"
4) "1"
127.0.0.1:6379>

在了解可排序集是如何實現之前,需要了解一種數據結構跳表(Skip List),跳表與AVL、紅黑樹...等相比,數據結構簡單,算法易懂,但查詢的時間復雜度與平衡二叉樹/紅黑樹相當,跳表的基本結構如下圖所示

上圖中整個跳表結構存放了4個元素5->10->20->30,圖中的紅色線表示查找元素30時,走的查找路線,從Head指針數組里最頂層的指針所指的20開始比較,與普通的鏈表查找相比,跳表的查詢可以跳躍元素,上圖中查詢30,發現3020大,則查找就是20開始,而普通鏈表的查詢必須一個元素一個元素的比較,時間復雜度為O(n)

有了上圖所示的跳表基本結構,再看看如何向跳表中插入元素,向跳表中插入元素,由於元素所在層級的隨機性,平均起來也是O(logn),說白了,就是查找元素應該插入在什么位置,然后就是普通的移動指針問題,再想想往有序單鏈表的插入操作吧,時間復雜度是不是也是O(n),下圖所示是往跳表中插入元素28的過程,圖中紅色線表示查找插入位置的過程,綠色線表示進行指針的移動,將該元素插入

 

有了跳表的查找及插入那么就看看在跳表中如何刪除元素吧,跳表中刪除元素的個程,查找要刪除的元素,找到后,進行指針的移動,過程如下圖所示,刪除元素30

有了上面的跳表基本結構圖及原理,自已設計及實現跳表吧,這樣當看到Redis里面的跳表結構時我們會更加熟悉,更容易理解些,【下面是對Redis中的跳表數據結構及相關代碼進行精減后形成的可運行代碼】,首先定義跳表的基本數據結構如下所示

 

#include<stdio.h>
#include<stdlib.h>

#define ZSKIPLIST_MAXLEVEL 32
#define ZSKIPLIST_P 0.25
#include <math.h>

//跳表節點
typedef struct zskiplistNode {
    int key;
    int value;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
    } level[1];
} zskiplistNode;

//跳表
typedef struct zskiplist {
    struct zskiplistNode *header;
    int level;
} zskiplist;

 

在代碼中我們定義了跳表結構中保存的數據為Key->Value這種形式的鍵值對,注意的是skiplistNode里面內含了一個結構體,代表的是層級,並且定義了跳表的最大層級為32級,下面的代碼是創建空跳表,以及層級的獲取方式

//創建跳表的節點
zskiplistNode *zslCreateNode(int level, int key, int value) {
    zskiplistNode *zn = (zskiplistNode *)malloc(sizeof(*zn)+level*sizeof(zn->level));
    zn->key = key;
    zn->value = value;
    return zn;
}

//初始化跳表
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;
    zsl = (zskiplist *) malloc(sizeof(*zsl));
    zsl->level = 1;//將層級設置為1
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,NULL,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
    }
    return zsl;
}

//向跳表中插入元素時,隨機一個層級,表示插入在哪一層
int zslRandomLevel(void) {
    int level = 1;
    while ((rand()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

在這段代碼中,使用了隨機函數獲取過元素所在的層級,下面就是重點,向跳表中插入元素,插入元素之前先查找插入的位置,代碼如下所示,代碼中注意update[i]

//向跳表中插入元素
zskiplistNode *zslInsert(zskiplist *zsl, int key, int value) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i, level;
    x = zsl->header;
    //在跳表中尋找合適的位置並插入元素
	for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->key < key ||
                (x->level[i].forward->key == key &&
                x->level[i].forward->value < value))) {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
	//獲取元素所在的隨機層數
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            update[i] = zsl->header;
        }
        zsl->level = level;
    }
	//為新創建的元素創建數據節點
    x = zslCreateNode(level,key,value);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;
    }
    return x;
}

下面是代碼中刪除節點的操作,和插入節點類似

//跳表中刪除節點的操作
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].forward = x->level[i].forward;
        }
    }
	//如果層數變了,相應的將層數進行減1操作
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
}

//從跳表中刪除元素
int zslDelete(zskiplist *zsl, int key, int value) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;
    x = zsl->header;
	//尋找待刪除元素
	for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->key < key ||
                (x->level[i].forward->key == key &&
                x->level[i].forward->value < value))) {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    x = x->level[0].forward;
    if (x && key == x->key && x->value == value) {
        zslDeleteNode(zsl, x, update);
        //別忘了釋放節點所占用的存儲空間
		free(x);
        return 1;
    } else {
		//未找到相應的元素
        return 0; 
    }
    return 0;
}

最后,附上一個不優雅的測試樣例

//將鏈表中的元素打印出來
void printZslList(zskiplist *zsl) {
	zskiplistNode  *x;
	x = zsl->header;
	for (int i = zsl->level-1; i >= 0; i--) {
        zskiplistNode *p = x->level[i].forward; 
		while (p) {
			printf(" %d|%d ",p->key,p->value);
            p = p->level[i].forward;
        }
		printf("\n");
    }
}

int main() {
	zskiplist *list = zslCreate();
	zslInsert(list,1,2);
	zslInsert(list,4,5);
	zslInsert(list,2,2);
	zslInsert(list,7,2);
	zslInsert(list,7,3);
	zslInsert(list,7,3);
	printZslList(list);
	//zslDelete(list,7,2);
	printZslList(list);
}

有了上面的跳表理論基礎,理解Redis中跳表的實現就不是那么難了,先分析到這,下回續寫,【代碼以Redis 2.9為例】~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM