0.前言
Redis中有序集合zset需要使用skiplist作為存儲數據結構, 關於skiplist數據結構描述可以查詢wiki, 本文主要介紹Redis實現的skiplist的細節.
1.數據結構定義
typedef struct zskiplistNode {
/*成員object對象*/
robj *obj;
/*分數字段依賴此值對skiplist進行排序*/
double score;
/*插入層中指向上一個元素level數組*/
struct zskiplistNode *backward;
struct zskiplistLevel {
/*每層中指向下一個元素指針*/
struct zskiplistNode *forward;
/*距離下一個元素之間元素數量, 即forward指向的元素*/
unsigned int span;
} level[];
} zskiplistNode;
typedef struct zskiplist {
/*跳躍表頭節點和尾節點*/
struct zskiplistNode *header, *tail;
/*跳躍表中元素個數*/
unsigned long length;
/*跳躍表當前最大層數*/
int level;
} zskiplist;
2.創建跳躍表
創建跳躍表過程比較簡單, 初始化zskiplist數據結構, 跳躍表默認最大層數32層, 跳躍表是按score進行升序排列.
/*創建跳躍表*/
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
/*初始化創建一個頭節點, 初始化節點信息*/
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
/*創建一個跳躍表節點*/
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->obj = obj;
return zn;
}
3.添加元素
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
redisAssert(!isnan(score));
x = zsl->header;
/*從頭節點開始搜索, 一層一層向下搜索, 直到直到最后一層, update數組中保存着每層應該插入的位置*/
for (i = zsl->level-1; i >= 0; i--) {
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
/*記錄每層距離頭部位置的距離*/
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
/* 隨機一個層數, 如果隨機的層數是新的層數, 則需要給update數組中新的層數賦值*/
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
/*新的一層上一個指針肯定是header*/
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
/*創建新的節點插入到update數組對應的層*/
x = zslCreateNode(level,score,obj);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/*
header update[i] x update[i]->forward
|-----------|-----------|-----------|-----------|-----------|-----------|
|<---update[i].span---->|
|<-------rank[i]------->|
|<-------------------rank[0]------------------->|
更新update數組中span值和新插入元素span值, rank[0]存儲的是x元素距離頭部的距離, rank[i]存儲的是update[i]距離頭部的距離, 上面給出了示意圖
*/
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* level可能小zsl->level, 無變動的元素span依次增加1*/
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
/*上一個元素level數組, 重新賦值*/
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
/*下一個元素為空,則表示x為尾部元素*/
zsl->tail = x;
zsl->length++;
return x;
}
4.獲取排名
排名其實就是元素在skiplist中排列的序號, 獲取排名需要給出分數和成員member, 通過score查找, 匹配member成員, 時間復雜度log(N). 由於skiplist是升序排列的,因此函數返回的rank是score按升序排列的rank, 如果想獲取降序rank應該是(length-rank).
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header;
/*循環遍歷並累加每層的span值, 獲取總的排名*/
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}
/* 判斷成員是否相等 */
if (x->obj && equalStringObjects(x->obj,o)) {
/*升序排列的排名*/
return rank;
}
}
return 0;
}
5.根據排名查找元素
/*通過排名查找元素, rank是從1開始, rank是升序排列的rank值*/
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
unsigned long traversed = 0;
int i;
/*遍歷每一層,並記錄排名, 與待查rank比較, 相等則找到, 找不到則返回NULL*/
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
{
traversed += x->level[i].span;
x = x->level[i].forward;
}
/*找到直接返回*/
if (traversed == rank) {
return x;
}
}
return NULL;
}
6.刪除元素
刪除元素需要精確匹配到分數和member
/*刪除一個元素*/
int zslDelete(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0)))
x = x->level[i].forward;
update[i] = x;
}
/* 由於score值可能相等, 因此需要精確匹配score和obj值 */
x = x->level[0].forward;
if (x && score == x->score && equalStringObjects(x->obj,obj)) {
zslDeleteNode(zsl, x, update);
zslFreeNode(x);
return 1;
}
return 0; /* not found */
}
/* 具體進行刪除元素所在節點*/
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
/*刪除元素需要更新update元素的span值*/
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {
/*非尾部元素則需要重置backforward指針*/
x->level[0].forward->backward = x->backward;
} else {
/*刪除x可能是最后一個元素, 需要重置尾部指針*/
zsl->tail = x->backward;
}
/*刪除元素位於最上層, 並且僅有此一個元素, 刪除之后,需要降低跳躍表層數*/
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}