Redis數據類型使用場景及有序集合SortedSet底層實現詳解


  Redis常用數據類型有字符串String、字典dict、列表List、集合Set、有序集合SortedSet,本文將簡單介紹各數據類型及其使用場景,並重點剖析有序集合SortedSet的實現。

  List的底層實現是類似Linked List雙端鏈表的結構,而不是數組,插入速度快,不需要節點的移動,但不支持隨機訪問,需要順序遍歷到索引所在節點。List有兩個主要的使用場景:

  1. 記住用戶最新發表的博文,每次用戶發表了文章,將文章id使用LPUSH加入到列表中,用戶訪問自己的主頁時,使用LRANGE 0 9獲取最新10條博文(使用LTRIM 0 9可以取出最新10條文章的同時,刪除舊的文章),而不用使用order by sql語句去后端數據庫取數據。
  2. 生產者/消費者模式,生產者往List中加入數據,消費者從List中取數據。當List為空時,消費者rpop返回為NULL,這是會進行輪詢,等待一段時間繼續去取。輪詢模式有如下缺點:
    1. 客戶端和redis耗費cpu和網絡帶寬等資源執行無效命令。
    2. 取回NULL后,sleep會使有新數據時,客戶端消費不夠及時。

  為了解決輪詢的問題,Redis提供了brpop和blpop實現Blocking讀,當List為空時,等待一段時間再返回,當有數據時,按請求順序返回給各客戶端。(當List為空時,可以將請求Blocking讀命令的客戶端加入此List的Blocking讀列表中,有數據時按列表序返回)

  集合Set的底層實現是類似Hash,不過value全為NULL,set有求並、交、差集及隨機取的功能。使用場景如下:

  1. 表示對象之間的聯系,比如求擁有標簽1、2、10的新聞,使用sinter tag:1:news tag:2:news tag:10:news。
  2. 隨機發牌,使用spop,spop隨機返回集合中的元素,比如給每位玩家發五張牌,每位玩家調用五次spop即可,為了下次發牌不需要再將牌加入set,可以在這次發牌前調用sunionstore將牌復制。

  有序集合SortedSet(t_zset.c),集合中的每個值都帶有分數,按分數排序,底層實現較為復雜,用到了ziplist、skiplist和dict數據結構,后文將進行詳細介紹。使用場景如下:

  1. 排行榜問題,比如游戲排行榜,按用戶分數排序,並取top N個用戶。

  在redis中,所有數據類型都被封裝在一個redisObject結構中,用於提供統一的接口,結構如下表1:

 表1 redisObject

redisObject源碼(server.h)
typedef struct redisObject {
    unsigned type:4;//對象類型,用於分辨字符串、列表、
//集合、有序集合、字典,有序集合為REDIS_ZSET
unsigned encoding:4;//編碼,標識底層數據結構,
//有序集合有REDIS_ENCODING_ZIPLIST(壓縮列表)、REDIS_ENCODING_SKIPLIST(跳表)
//記錄鍵最近一次被訪問的時間,長時間不被訪問的對象可被內存回收 unsigned lru:LRU_BITS;

/* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ int refcount;//引用計數,用於對象內存回收,
//當為0時回收內存,引用計數可實現不同鍵相同值的共享,
//事實上,redis會初始化創建0到9999個整數對象用於共享,從而節約內存
void *ptr;//指向底層數據結構實例的指針 } robj;

 

 

   有序列表有壓縮列表ziplist和跳表skiplist兩種實現方式,通過encoding識別,當數據項數目小於zset_max_ziplist_entries(默認為128),且保存的所有元素長度不超過zset_max_ziplist_value(默認為64)時,則用ziplist實現有序集合,否則使用zset結構,zset底層使用skiplist跳表和dict字典。創建有序集合的關鍵代碼如下表2:

表2 創建有序集合

zaddGenericCommand函數
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            zobj = createZsetObject(); //創建zset
        } else {
            zobj = createZsetZiplistObject();//創建ziplist
        }

 

  ziplist是一個內存連續的特殊雙向鏈表LinkList,減少了內存碎片和指針的占用,用於節省內存,但對ziplist進行操作會導致內存的重新分配,影響性能,故在元素較少時用ziplist。ziplist內存布局如下:

<zlbytes> <zltail> <zllen> <entry> <entry> ... <entry> <zlend>

表3 ziplist在內存中各字節含義

Field
含義
zlbytes(uint32_t)
ziplist占用的內存字節數,包括zlbytes本身
zltail(uint32_t)
最后一個entry的offset偏移值
zllen(uint16_t)
數據項entry的個數
entry(變長)
數據項
zlend(uint8_t)
標識ziplist的結束,值為255

  數據項entry的內存結構如下:<prevlen> <encoding> <entry-data>,當保存的是小整型數據時,entry沒有entry-data域, encoding本身包含了整型元素值。Entry各字節含義如下表4:

表4 entry各Field含義

Field
含義
prevlen
上一個數據項entry的長度。當長度小於254字節,則prevlen占1字節,當長度大於或等於254字節,則prevlen占5字節,首字節值為254,剩下4字節表示上一entry長度。
encoding
encoding的值依賴於數據entry-data。首字節的前兩個bit為00、01、10,標識entry-data為字符串,同時表示encoding的長度分別為1、2、5字節,除前兩個bit,剩下的bit表示字符串長度;前兩個bit為11,表示entry-data為整型,接下來的2 bit表示整數類型。entry-data不同類型及encoding如下:
1)       |00pppppp| - 1 byte,字符串且長度小於等於63字節(6bit)
2)       |01pppppp|qqqqqqqq| - 2 bytes,字符串且長度小於等於16383字節(14bit)
3)       |10000000|qqqqqqqq|rrrrrrrr|ssssssss|tttttttt| - 5 bytes,字符串且長度大於等於16384(后面四個字節表示長度,首字節的低位6bit設為0)
4)       |11000000| - 1 bytes,len字段為1字節,后面的entry-data為整型且類型為int16_t (2 bytes)
5)       |11010000| - 1 bytes, entry-data為整型且類型為int32_t (4 bytes)
6)       |11100000| - 1 bytes, entry-data為整型且類型為int64_t (8 bytes)
7)       |11110000| - 1 bytes, entry-data為整型且占3 bytes
8)       |11111110| - 1 bytes, entry-data為整型且占1 bytes
9)       |1111xxxx| - (with xxxx between 0000 and 1101),xxxx的值從1到13,可用於表示entry-data(1到12),encoding包含entry-data的值,從而不需要entry-data域
10)    |11111111| - 用於標識ziplist的結束
entry-data
具體的數據

  ziplist在內存中的實例如圖1,zibytes占4字節(小端存儲),值為15,表示此ziplist占用內存15字節;zltail占4字節,值為12,表示最后一個數據項entry(這里是5所在的entry),距離ziplist的開頭offset為12字節;entries占2字節,表示數據項數目為2; "00 f3"表示第一個entry(值為2),”00”表示前一個entry的長度為0(prevlen),”f3”對應encoding中的第9種情況(“11110011”),表示數據為整型且值為2;”02 f6”表示第二個entry,”02”表示前一個entry的長度為2(prevlen),”f6”也對應encoding的第9種情況(“11110110”),表示數據為整型且值為6.

圖1 ziplist在內存中的實例

  ziplist在redis中插入數據的源碼及注釋如表5:

表5 ziplist插入數據源碼

ziplist插入邏輯源碼(ziplist.c)

/* Insert item at "p". */

unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {

    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;

    unsigned int prevlensize, prevlen = 0;

    size_t offset;

    int nextdiff = 0;

    unsigned char encoding = 0;

    long long value = 123456789; /* initialized to avoid warning. Using a value

                                    that is easy to see if for some reason

                                    we use it uninitialized. */

    zlentry tail;

 

    /* Find out prevlen for the entry that is inserted. */

    //插入位置前面一個entry節點占用的字節數prevlen

    if (p[0] != ZIP_END) {//插入節點不在末尾節點,直接從p的前面字節讀

        ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);

    } else {//插入節點在末尾位置,找到末尾節點

        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);

        if (ptail[0] != ZIP_END) {

            prevlen = zipRawEntryLength(ptail);

        }

    }

    /* See if the entry can be encoded */

    if (zipTryEncoding(s,slen,&value,&encoding)) {//判斷s是否可以轉化為整數,並將整數值和enconding分別存在value和encoding指針

        /* 'encoding' is set to the appropriate integer encoding */

        reqlen = zipIntSize(encoding);//整數值長度

    } else {

        /* 'encoding' is untouched, however zipStoreEntryEncoding will use the

         * string length to figure out how to encode it. */

        reqlen = slen;//字符串長度

    }

 

    /* We need space for both the length of the previous entry and

     * the length of the payload. */

    //得出新插入節點占用的總字節數reqlen

    reqlen += zipStorePrevEntryLength(NULL,prevlen);

    reqlen += zipStoreEntryEncoding(NULL,encoding,slen);

 

    /* When the insert position is not equal to the tail, we need to

     * make sure that the next entry can hold this entry's length in

     * its prevlen field. */

    //插入新節點不在末尾位置,則插入位置p所指向的entry節點的prevlen,

    //值會變成新插入節點的總長度,且prevlen所占用的字節數可能會變化,

    //nextdiff表示新插入節點下一節點的prevlen需要空間的變化,負值表示變小,

    //正值表示擴大

    int forcelarge = 0;

    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;

    if (nextdiff == -4 && reqlen < 4) {

        nextdiff = 0;

        forcelarge = 1;

    }

 

    /* Store offset because a realloc may change the address of zl. */

    offset = p-zl;

    zl = ziplistResize(zl,curlen+reqlen+nextdiff);//重新分配空間,並將zl的每字節都填充到新分配的內存中

    p = zl+offset;

    //將p后面的數據項進行移動

    /* Apply memory move when necessary and update tail offset. */

    if (p[0] != ZIP_END) {

        /* Subtract one because of the ZIP_END bytes */

        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

 

        /* Encode this entry's raw length in the next entry. */

        if (forcelarge)//設置下一個節點的prevlen

            zipStorePrevEntryLengthLarge(p+reqlen,reqlen);

        else

            zipStorePrevEntryLength(p+reqlen,reqlen);

 

        /* Update offset for tail */

        ZIPLIST_TAIL_OFFSET(zl) =

            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

 

        /* When the tail contains more than one entry, we need to take

         * "nextdiff" in account as well. Otherwise, a change in the

         * size of prevlen doesn't have an effect on the *tail* offset. */

        zipEntry(p+reqlen, &tail);

        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {

            ZIPLIST_TAIL_OFFSET(zl) =

                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);

        }

    } else {

        /* This element will be the new tail. */

        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);

    }

 

    /* When nextdiff != 0, the raw length of the next entry has changed, so

     * we need to cascade the update throughout the ziplist */

    if (nextdiff != 0) {

        offset = p-zl;

        zl = __ziplistCascadeUpdate(zl,p+reqlen);

        p = zl+offset;

    }

 

    /* Write the entry */

    //將新數據項放入插入位置

    p += zipStorePrevEntryLength(p,prevlen);

    p += zipStoreEntryEncoding(p,encoding,slen);

    if (ZIP_IS_STR(encoding)) {

        memcpy(p,s,slen);

    } else {

        zipSaveInteger(p,value,encoding);

    }

    ZIPLIST_INCR_LENGTH(zl,1);

    return zl;

}

 

  zset在redis中的定義如表6:

表6 zset源碼

zset定義(server.h)

typedef struct zset {

    dict *dict;//字典

    zskiplist *zsl;//跳表

} zset;

 

  zset同時使用dict和zskiplist實現有序集合的功能,dict是為了快速獲得指定元素的分值(zscore命令,時間復雜度為O(1)),zskiplist是為了快速范圍查詢(zrank、zrange命令)。本文重點講解跳表的知識。

  skiplist是在有序鏈表的基礎上發展而來,在有序鏈表中進行查找,需要進行順序遍歷,時間復雜度為O(n),同樣,進行插入也需要順序遍歷到插入位置,時間復雜度也為O(n)。

 

圖2 有序鏈表

  利用有序的性質,每兩個節點多加一個指針,指向下下個節點,如圖3所示,新增加的指針可以構成一個新的有序鏈表,新鏈表節點個數只有下層鏈表的一半,當查找元素時,可以從新鏈表開始向右查找,碰到比查找元素大的節點,則回到下一層鏈表查找,比如查找元素20,查找路徑如下圖中標記為紅的路徑(head->8->17->23,23比20大,到下一層查找,17->20),由於新增的指針,查找元素時不需要和每個節點進行比較,需要比較的節點大概為原來的一半。

圖3 雙層有序鏈表

  可以在新產生的鏈表之上,每隔兩個節點,再增加一個指針,從而產生第三層鏈表,如圖4所示,紅色箭頭代表查找路徑,從最上層鏈表開始查找,一次可以跳過四個節點,進一步加快了查找速度。

圖4 多層有向鏈表

 

  skiplist借鑒了多層鏈表的思想,但多層鏈表這種嚴格的2:1關系,會導致插入和刪除節點破壞上下層之間的2:1關系,導致插入位置和刪除位置及后續的所有節點都需要進行調整。skiplist並不采用這種嚴格的2:1對應關系,每個節點的層數采用隨機生成的方法,節點插入例子如下圖5所示,插入節點不會影響其它節點的層數,且只需調整插入節點前后的指針,不需要對所有節點進行調整,降低了插入的復雜度。

圖5 skiplist插入節點過程

  skiplist隨機生成層數level的的代碼如表7:

  表7 隨機生成節點層數

zslRandomLevel函數(t_zset.c)

int zslRandomLevel(void) {

    //隨機生成節點層數,當第i層節點存在時,第i+1節點存在的概率為ZSKIPLIST_P = 1/4

    //ZSKIPLIST_MAXLEVEL 64,表示節點的最大層數

    int level = 1;

    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))

        level += 1;

    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;

}

 

  skiplist時間復雜度為o(),所占用空間的大小依賴於插入元素隨機生成的層數,每個元素level至少為1,層數越高,生成的概率越低,節點的層數服從一定的概率分布,如下:

  1. 節點恰好只有一層的概率為1-p
  2. 節點層數大於等於2的概率為p,恰好等於2的概率為p(1-p)
  3. 節點層數大於等於k的概率為pk-1,恰好等於k的概率為pk-1(1-p)

  每個節點的平均層數計算如下:

  

  平均層數代表每個節點的平均指針數目,在redis中,p=1/4,因此平均指針數目為1.33。

  在redis中skiplist的定義代碼如表8,zskiplist表示跳表, zskiplistNode表示跳表中的節點, zskiplistNode包含了分值,每個節點按分值排序,且節點包含后退指針,用於雙向遍歷。

表8 redis中跳表結構

zskiplist及zskiplistNode(server.h)

/* ZSETs use a specialized version of Skiplists */

typedef struct zskiplistNode {

    sds ele;//實際存儲的數據

    double score;//分值

    struct zskiplistNode *backward;//后退指針,指向前一個節點

    struct zskiplistLevel {

        struct zskiplistNode *forward;//前進指針,指向下一個節點

        unsigned long span;//跨度,表示該層鏈表的這一節點到下一節點跨越的節點數,用於計算rank

    } level[];//層級數組,每個層級都有到下一個節點的指針和跨度

} zskiplistNode;//跳表節點

 

typedef struct zskiplist {

    struct zskiplistNode *header, *tail;//跳表頭節點和尾節點

    unsigned long length;//跳表元素個數

    int level;//跳表的最高層數(不包括頭節點,頭節點實際上並不存儲數據)

} zskiplist;

 

   redis中,zskiplist插入元素的代碼如表9,在查找插入位置的過程中,記下每層需要更新的前一節點在update數組中。

表9 跳表插入節點源代碼

zslInsert(t_zset.c)

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {

    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;

    unsigned int rank[ZSKIPLIST_MAXLEVEL];

    int i, level;

 

    serverAssert(!isnan(score));

    x = zsl->header;

    for (i = zsl->level-1; i >= 0; i--) {

        /* store rank that is crossed to reach the insert position */

        //rank[i]初始化為rank[i+1],所以rank[i]-rank[i+1]表示在i層走過的節點數

        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];

        while (x->level[i].forward &&

                (x->level[i].forward->score < score ||

                    (x->level[i].forward->score == score &&

                    sdscmp(x->level[i].forward->ele,ele) < 0)))

        {

            rank[i] += x->level[i].span;

            x = x->level[i].forward;

        }

        // 記錄將要和新節點相連接的節點,x表示新節點在i層連接的上一節點

        update[i] = x;

    }

    /* we assume the element is not already inside, since we allow duplicated

     * scores, reinserting the same element should never happen since the

     * caller of zslInsert() should test in the hash table if the element is

     * already inside or not. */

    level = zslRandomLevel();//隨機生成此節點的層數

    if (level > zsl->level) {

        for (i = zsl->level; i < level; i++) {

            rank[i] = 0;

            update[i] = zsl->header;

            update[i]->level[i].span = zsl->length;

        }

        zsl->level = level;

    }

    x = zslCreateNode(level,score,ele);

    for (i = 0; i < level; i++) {

        x->level[i].forward = update[i]->level[i].forward;

        update[i]->level[i].forward = x;

 

        /* update span covered by update[i] as x is inserted here */

        //rank[0]表示0層鏈表,插入節點x左邊的節點數

        //rank[i]表示i層鏈表,插入節點x左邊的節點數

        //rank[0] - rank[i]+1表示i層鏈表,x前一節點到x的跨度

        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);

        update[i]->level[i].span = (rank[0] - rank[i]) + 1;

    }

 

    /* increment span for untouched levels */

    //在level及之上的每層,update[i]到下一節點的距離由於插入了x節點而加1

    for (i = level; i < zsl->level; i++) {

        update[i]->level[i].span++;

    }

    //更新后退指針

    x->backward = (update[0] == zsl->header) ? NULL : update[0];

    if (x->level[0].forward)

        x->level[0].forward->backward = x;

    else

        zsl->tail = x;

    zsl->length++;

    return x;

}

 

  與平衡樹(AVL、紅黑樹)比,skiplist有如下優點,這也是redis使用跳表做有序集合底層結構而不選用平衡樹的原因。

  1. 占用內存少。通過調節概率p,可以使每個節點的平均指針數發生變化,redis中為1.33,而二叉樹每個節點都有兩個指針。
  2. ZRANGE or ZREVRANGE等范圍查詢更簡單。Skiplist可以看作特殊的雙向鏈表,只需找到范圍中的最小節點,順序遍歷即可,而平衡樹找到范圍中的最小節點,仍需中序遍歷。
  3. 和紅黑樹等比,skiplist實現和調試簡單。

參考文獻

  1. An introduction to Redis data types and abstractions
  2. Redis內部數據結構詳解(4)——ziplist
  3. Pugh W. Skip lists: a probabilistic alternative to balanced trees[J]. Communications of the ACM, 1990, 33(6): 668-677.
  4. Redis為什么用跳表而不用平衡樹?
  5. Is there any particular reason you chose skip list instead of btrees except for simplicity? 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM