redis zset底層數據結構


參考:

https://blog.csdn.net/xp178171640/article/details/102977210

https://www.cnblogs.com/lfls/p/7864798.html

https://www.cnblogs.com/aspirant/p/11475295.html

https://www.jianshu.com/p/fb7547369655

 

 

 

 

跳躍表原理

一、跳表的基本概念

1、跳表的定義

跳表(SkipList):增加了向前指針的鏈表叫做指針。跳表全稱叫做跳躍表,簡稱跳表。跳表是一個隨機化的數據結構,實質是一種可以進行二分查找的有序鏈表。跳表在原有的有序鏈表上增加了多級索引,通過索引來實現快速查詢。跳表不僅能提高搜索性能,同時也可以提高插入和刪除操作的性能。

跳表是一個隨機化的數據結構,可以被看做二叉樹的一個變種,它在性能上和紅黑樹、AVL樹不相上下,但是跳表的原理非常簡單,目前在Redis和LevelDB中都有用到。

2、跳表的詳解

說明:本文中的圖片均來自極客時間《數據結構與算法之美專欄》

對於一個單鏈表來說,即使鏈表中的數據是有序的,如果我們想要查找某個數據,也必須從頭到尾的遍歷鏈表,很顯然這種查找效率是十分低效的,時間復雜度為O(n)。

那么我們如何提高查找效率呢?我們可以對鏈表建立一級“索引”,每兩個結點提取一個結點到上一級,我們把抽取出來的那一級叫做索引或者索引層,如下圖所示,down表示down指針。

假設我們現在要查找值為16的這個結點。我們可以先在索引層遍歷,當遍歷索引層中值為13的時候,通過值為13的結點的指針域發現下一個結點值為17,因為鏈表本身有序,所以值為16的結點肯定在13和17這兩個結點之間。然后我們通過索引層結點的down指針,下降到原始鏈表這一層,繼續往后遍歷查找。這個時候我們只需要遍歷2個結點(值為13和16的結點),就可以找到值等於16的這個結點了。如果使用原來的鏈表方式進行查找值為16的結點,則需要遍歷10個結點才能找到,而現在只需要遍歷7個結點即可,從而提高了查找效率。

那么我們可以由此得到啟發,和上面建立第一級索引的方式相似,在第一級索引的基礎上,每兩個一級索引結點就抽到一個結點到第二級索引中。再來查找值為16的結點,只需要遍歷6個結點即可,從而進一步提高了查找效率。

上面舉得例子中的數據量不大,所以即便加了兩級索引,查找的效率提升的也不是很明顯,下面通過一個64結點的鏈表來更加直觀的感受下索引提升查找效率,如圖所示,建立了五級索引。

從圖中我們可以看出來,原來沒有索引的時候,查找62需要遍歷62個結點,現在只需要遍歷11個結點即可,速度提高了很多。那么,如果當鏈表的長度為10000、10000000時,通過構件索引之后,查找的效率就會提升的非常明顯。

3、跳表的時間復雜度

單鏈表的查找時間復雜度為:O(n),下面分析下跳表這種數據結構的查找時間復雜度:

我們首先考慮這樣一個問題,如果鏈表里有n個結點,那么會有多少級索引呢?按照上面講的,每兩個結點都會抽出一個結點作為上一級索引的結點。那么第一級索引的個數大約就是n/2,第二級的索引大約就是n/4,第三級的索引就是n/8,依次類推,也就是說,第k級索引的結點個數是第k-1級索引的結點個數的1/2,那么第k級的索引結點個數為:2/n^{k}

假設索引有h級,最高級的索引有2個結點,通過上面的公式,我們可以得到n/(2^{h}) = 2,從而可得:h = \log_{2}n - 1。如果包含原始鏈表這一層,整個跳表的高度就是\log_{2}n。我們在跳表中查找某個數據的時候,如果每一層都要遍歷m個結點,那么在跳表中查詢一個數據的時間復雜度就為:O(m*logn)。

其實根據前面的分析,我們不難得出m=3,即每一級索引都最多只需要遍歷3個結點,分析如下:

如上圖所示,假如我們要查找的數據是x,在第k級索引中,我們遍歷到y結點之后,發現x大於y,小於y后面的結點z。所以我們通過y的down指針,從第k級索引下降到第k-1級索引。在第k-1級索引中,y和z之間只有3個結點(包含y和z)。所以,我們在k-1級索引中最多需要遍歷3個結點,以此類推,每一級索引都最多只需要遍歷3個結點。

因此,m=3,所以跳表查找任意數據的時間復雜度為O(logn),這個查找的時間復雜度和二分查找是一樣的,但是我們卻是基於單鏈表這種數據結構實現的。不過,天下沒有免費的午餐,這種查找效率的提升是建立在很多級索引之上的,即空間換時間的思想。其具體空間復雜度見下文詳解。【面試題:如何讓鏈表的元素查詢接近線性時間】

4、跳表的空間復雜度

比起單純的單鏈表,跳表就需要額外的存儲空間去存儲多級索引。假設原始鏈表的大小為n,那么第一級索引大約有n/2個結點,第二級索引大約有4/n個結點,依次類推,每上升一級索引結點的個數就減少一半,直到剩下最后2個結點,如下圖所示,其實就是一個等比數列。

這幾級索引結點總和為:n/2 + n/4 + n/8 + ... + 8 + 4 + 2 = n - 2。所以跳表的空間復雜度為O(n)。也就是說如果將包含n個結點的單鏈表構造成跳表,我們需要額外再用接近n個結點的存儲空間。

其實從上面的分析,我們利用空間換時間的思想,已經把時間壓縮到了極致,因為每一級每兩個索引結點就有一個會被抽到上一級的索引結點中,所以此時跳表所需要的額外內存空間最多,即空間復雜度最高。其實我們可以通過改變抽取結點的間距來降低跳表的空間復雜度,在其時間復雜度和空間復雜度方面取一個綜合性能,當然也要看具體情況,如果內存空間足夠,那就可以選擇最小的結點間距,即每兩個索引結點抽取一個結點到上一級索引中。如果想降低跳表的空間復雜度,則可以選擇每三個或者每五個結點,抽取一個結點到上級索引中。

如上圖所示,每三個結點抽取一個結點到上一級索引中,則第一級需要大約n/3個結點,第二級索引大約需要n/9個結點。每往上一級,索引的結點個數就除以3,為了方便計算,我們假設最高一級的索引結點個數為1,則可以得到一個等比數列,去下圖所示:

通過等比數列的求和公式,總的索引結點大約是:n/3 + n /9 + n/27 + ... + 9 + 3 + 1 = n/2。盡管空間復雜度還是O(n),但是比之前的每兩個結點抽一個結點的索引構建方法,可以減少了一半的索引結點存儲空間。

實際上,在軟件開發中,我們不必太在意索引占用的額外空間。在講數據結構的時候,我們習慣性地把要處理的數據看成整數,但是在實際的軟件開發中,原始鏈表中存儲的有可能是很大的對象,而索引結點只需要存儲關鍵值和幾個指針,並不需要存儲對象,所以當對象比索引結點大很多時,那索引占用的額外空間就可以忽略了。

5、跳表的插入

跳表插入的時間復雜度為:O(logn),支持高效的動態插入。

在單鏈表中,一旦定位好要插入的位置,插入結點的時間復雜度是很低的,就是O(1)。但是為了保證原始鏈表中數據的有序性,我們需要先找到要插入的位置,這個查找的操作就會比較耗時。

對於純粹的單鏈表,需要遍歷每個結點,來找到插入的位置。但是對於跳表來說,查找的時間復雜度為O(logn),所以這里查找某個數據應該插入的位置的時間復雜度也是O(logn),如下圖所示:

6、跳表的刪除

跳表的刪除操作時間復雜度為:O(logn),支持動態的刪除。

在跳表中刪除某個結點時,如果這個結點在索引中也出現了,我們除了要刪除原始鏈表中的結點,還要刪除索引中的。因為單鏈表中的刪除操作需要拿到刪除結點的前驅結點,然后再通過指針操作完成刪除。所以在查找要刪除的結點的時候,一定要獲取前驅結點(雙向鏈表除外)。因此跳表的刪除操作時間復雜度即為O(logn)。

7、跳表索引動態更新

當我們不斷地往跳表中插入數據時,我們如果不更新索引,就有可能出現某2個索引節點之間的數據非常多的情況,在極端情況下,跳表還會退化成單鏈表,如下圖所示:

作為一種動態數據結構,我們需要某種手段來維護索引與原始鏈表大小之間的平衡,也就是說,如果鏈表中的結點多了,索引結點就相應地增加一些,避免復雜度退化,以及查找、插入和刪除操作性能的下降。

如果你了解紅黑樹、AVL樹這樣的平衡二叉樹,你就會知道它們是通過左右旋的方式保持左右子樹的大小平衡,而跳表是通過隨機函數來維護“平衡性”。

當我們往跳表中插入數據的時候,我們可以通過一個隨機函數,來決定這個結點插入到哪幾級索引層中,比如隨機函數生成了值K,那我們就將這個結點添加到第一級到第K級這個K級索引中。如下圖中要插入數據為6,K=2的例子:

隨機函數的選擇是非常有講究的,從概率上講,能夠保證跳表的索引大小和數據大小平衡性,不至於性能的過度退化。至於隨機函數的選擇,見下面的代碼實現過程,而且實現過程並不是重點,掌握思想即可。

8、跳表的性質

(1) 由很多層結構組成,level是通過一定的概率隨機產生的;
            (2) 每一層都是一個有序的鏈表,默認是升序 ;
            (3) 最底層(Level 1)的鏈表包含所有元素;
            (4) 如果一個元素出現在Level i 的鏈表中,則它在Level i 之下的鏈表也都會出現; 
            (5) 每個節點包含兩個指針,一個指向同一鏈表中的下一個元素,一個指向下面一層的元素。

 

 

 

 

 

聊聊Mysql索引和redis跳表 ---redis的有序集合zset數據結構底層采用了跳表原理 時間復雜度O(logn)(阿里)

redis使用跳表不用B+數的原因是:redis是內存數據庫,而B+樹純粹是為了mysql這種IO數據庫准備的。B+樹的每個節點的數量都是一個mysql分區頁的大小(阿里面試)

還有個幾個姊妹篇:介紹mysql的B+索引原理 參考:一步步分析為什么B+樹適合作為索引的結構 以及索引原理 (阿里面試)

參考:kafka如何實現高並發存儲-如何找到一條需要消費的數據(阿里)

參考:二分查找法:各種排序算法的時間復雜度和空間復雜度(阿里)

關於mysql 存儲引擎 介紹包括默認的索引方式參考:MySql的多存儲引擎架構, 默認的引擎InnoDB與 MYISAM的區別(滴滴 阿里)

敲黑板:

每級遍歷 3 個結點即可,而跳表的高度為 h ,所以每次查找一個結點時,需要遍歷的結點數為 3*跳表高度 ,所以忽略低階項和系數后的時間復雜度就是 ○(㏒n),空間復雜度是O(n) 

數據結構 實現原理 key查詢方式 查找效率 存儲大小 插入、刪除效率
Hash 哈希表 支持單key 接近O(1) 小,除了數據沒有額外的存儲 O(1)
B+樹 平衡二叉樹擴展而來 單key,范圍,分頁 O(Log(n) 除了數據,還多了左右指針,以及葉子節點指針 O(Log(n),需要調整樹的結構,算法比較復雜
跳表 有序鏈表擴展而來 單key,分頁 O(Log(n) 除了數據,還多了指針,但是每個節點的指針小於<2,所以比B+樹占用空間小 O(Log(n),只用處理鏈表,算法比較簡單

 對LSM結構感興趣的可以看下cassandra vs mongo (1)存儲引擎

問題

如果對以下問題感到困惑或一知半解,請繼續看下去,相信本文一定會對你有幫助

  • mysql 索引如何實現
  • mysql 索引結構B+樹與hash有何區別。分別適用於什么場景
  • 數據庫的索引還能有其他實現嗎
  • redis跳表是如何實現的
  • 跳表和B+樹,LSM樹有和區別呢

解析

首先為什么要把mysql索引和redis跳表放在一起討論呢,因為他們解決的都是同一種問題,用於解決數據集合的查找問題,即根據指定的key,快速查到它所在的位置(或者對應的value)

當你站在這個角度去思考問題時,還會不知道B+樹索引和hash索引的區別嗎

數據集合的查找問題

現在我們將問題領域邊界划分清楚了,就是為了解決數據集合的查找問題。這一塊需要考慮哪些問題呢

  1. 需要支持哪些查找方式,單key/多key/范圍查找,
  2. 插入/刪除效率
  3. 查找效率(即時間復雜度)
  4. 存儲大小(空間復雜度)

我們看下幾種常用的查找結構

hash

 在這里插入圖片描述

hash是key,value形式,通過一個散列函數,能夠根據key快速找到value

關於hash算法 ,這也是阿里的必考題 深度的原理 我寫了幾篇博客:尤其是最后一篇resize ,以及resize之前與之后的hashmap的情況,

      參考:HashMap的實現原理--鏈表散列

      參考:Hashtable數據存儲結構-遍歷規則,Hash類型的復雜度為啥都是O(1)-源碼分析 

      參考:HashMap, HashTable,HashSet,TreeMap 的時間復雜度   

      參考:HashMap底層實現原理/HashMap與HashTable區別/HashMap與HashSet區別 

      參考:ConcurrentHashMap原理分析(1.7與1.8)-put和 get 兩次Hash到達指定的HashEntry 

resize 參考:HashMap多線程並發問題分析-正常和異常的rehash1(阿里)

B+ 樹:

注意 這是關於B+樹的總結,如果你掌握到這個程度 是遠遠不夠的,

請參考詳細的B+樹原理:一步步分析為什么B+樹適合作為索引的結構 以及索引原理 (阿里面試)

B+樹 的數據都在葉子節點,非葉子節點存放 索引

 

在這里插入圖片描述

 

B+樹是在平衡二叉樹基礎上演變過來,為什么我們在算法課上沒學到B+樹和跳表這種結構呢。因為他們都是從工程實踐中得到,在理論的基礎上進行了妥協。

B+樹首先是有序結構,為了不至於樹的高度太高,影響查找效率,在葉子節點上存儲的不是單個數據,而是一頁數據,提高了查找效率,而為了更好的支持范圍查詢,B+樹在葉子節點冗余了非葉子節點數據,為了支持翻頁,葉子節點之間通過指針連接。

跳表  

 

跳表:為什么 Redis 一定要用跳表來實現有序集合? 

上幾篇主要是學習二分查找算法,但是二分查找底層依賴的是數組隨機訪問的特性,所以只能用數組來實現。如果數據存儲在鏈表中,就沒辦法使用二分查找了嗎? 

此時跳表出現了,跳表(Skip list) 實際上就是在鏈表的基礎上改造生成的。 

跳表是一種各方面性能都比較優秀的 動態數據結構,可以支持快速的插入、刪除、查找操作,寫起來也不復雜,甚至可以替代 紅黑樹??。 

Redis 一共有5種數據結構,包括:

 

1、字符串(String)
redis對於KV的操作效率很高,可以直接用作計數器。例如,統計在線人數等等,另外string類型是二進制存儲安全的,所以也可以使用它來存儲圖片,甚至是視頻等。

2、哈希(hash)
存放鍵值對,一般可以用來存某個對象的基本屬性信息,例如,用戶信息,商品信息等,另外,由於hash的大小在小於配置的大小的時候使用的是ziplist結構,比較節約內存,所以針對大量的數據存儲可以考慮使用hash來分段存儲來達到壓縮數據量,節約內存的目的,例如,對於大批量的商品對應的圖片地址名稱。比如:商品編碼固定是10位,可以選取前7位做為hash的key,后三位作為field,圖片地址作為value。這樣每個hash表都不超過999個,只要把redis.conf中的hash-max-ziplist-entries改為1024,即可。
3、列表(List)
列表類型,可以用於實現消息隊列,也可以使用它提供的range命令,做分頁查詢功能。

 

4、集合(Set)
集合,整數的有序列表可以直接使用set。可以用作某些去重功能,例如用戶名不能重復等,另外,還可以對集合進行交集,並集操作,來查找某些元素的共同點

 

5、有序集合(zset)
有序集合,可以使用范圍查找,排行榜功能或者topN功能。

其中第五個zset 有序集合 就是用跳表來實現的。那 Redis 為什么會選擇用跳表來實現有序集合呢?  

一、如何理解跳表? 

對於單鏈表來說,我們查找某個數據,只能從頭到尾遍歷鏈表,此時時間復雜度是 ○(n)。 

 
單鏈表 

那么怎么提高單鏈表的查找效率呢?看下圖,對鏈表建立一級 索引,每兩個節點提取一個結點到上一級,被抽出來的這級叫做 索引 或 索引層。 

 
第一級索引 

開發中經常會用到一種處理方式,hashmap 中存儲的值類型是一個 list,這里就可以把索引當做 hashmap 中的鍵,將每 2 個結點看成每個鍵對應的值 list。 

所以要找到13,就不需要將16前的結點全遍歷一遍,只需要遍歷索引,找到13,然后發現下一個結點是17,那么16一定是在 [13,17] 之間的,此時在13位置下降到原始鏈表層,找到16,加上一層索引后,查找一個結點需要遍歷的結點個數減少了,也就是說查找效率提高了 

那么我們再加一級索引呢?
跟前面建立一級索引的方式相似,我們在第一級索引的基礎上,每兩個結點就抽出一個結點到第二級索引。此時再查找16,只需要遍歷 6 個結點了,需要遍歷的結點數量又減少了。 

 
第二級索引 

當結點數量多的時候,這種添加索引的方式,會使查詢效率提高的非常明顯、

 
這種鏈表加多級索引的結構,就是跳表。 

二、用跳表查詢到底有多快 

在一個單鏈表中,查詢某個數據的時間復雜度是 ○(n),那在一個具有多級索引的跳表中,查詢某個數據的時間復雜度是多少呢? 

按照上面的示例,每兩個節點就抽出一個一級索引,每兩個一級索引又抽出一個二級索引,所以第一級索引的結點個數大約就是 n/2,第二級索引的結點個數就是 n/4,第 k 級索引的結點個數就是 n/2^k。 

假設一共建立了 h 級索引,最高級的索引有兩個節點(如果最高級索引只有一個結點,那么這一級索引起不到判斷區間的作用,那么是沒什么意義的),所以有: 

 
時間復雜度的分析 
 
每級遍歷多少個結點 

根據上圖得知,每級遍歷 3 個結點即可,而跳表的高度為 h ,所以每次查找一個結點時,需要遍歷的結點數為 3*跳表高度 ,所以忽略低階項和系數后的時間復雜度就是 ○(㏒n) 

其實此時就相當於基於單鏈表實現了二分查找。但是這種查詢效率的提升,由於建立了很多級索引,會不會很浪費內存呢? 

三、跳表是不是很浪費內存? 

來分析一下跳表的空間復雜度。 為O(n)

 
每層索引結點數 
 
空間復雜度 

所以如果將包含 n 個結點的單鏈表構造成跳表,我們需要額外再用接近 n 個結點的存儲空間,那怎么才能降低索引占用的內存空間呢? 

前面是每兩個結點抽一個結點到上級索引,如果我們每三個,或每五個結點,抽一個結點到上級索引,是不是就不用那么多索引結點了呢? 

 
每三個結點抽取一個上級索引 

計算空間復雜度的過程與前面的一致,盡管最后空間復雜度依然是 ○(n),但我們知道,使用大○表示法忽略的低階項或系數,實際上同樣會產生影響,只不過我們為了關注高階項而將它們忽略。 

 
空間復雜度 

實際上,在實際開發中,我們不需要太在意索引占據的額外空間,在學習數據結構與算法時,我們習慣的將待處理數據看成整數,但是實際開發中,原始鏈表中存儲的很可能是很大的對象,而索引結點只需要存儲關鍵值(用來比較的值)和幾個指針(找到下級索引的指針),並不需要存儲原始鏈表中完整的對象,所以當對象比索引結點大很多時,那索引占用的額外空間就可以忽略了。 

四、高效的動態插入和刪除 

跳表這個動態數據結構,不僅支持查找操作,還支持動態的插入、刪除操作,而且插入、刪除操作的時間復雜度也是 ○(㏒n)。 

對於單純的單鏈表,需要遍歷每個結點來找到插入的位置。但是對於跳表來說,因為其查找某個結點的時間復雜度是 ○(㏒n),所以這里查找某個數據應該插入的位置,時間復雜度也是 ○(㏒n)。 

 
插入操作 

那么刪除操作呢? 

 
刪除操作 

五、跳表索引動態更新 

當我們不停的往跳表中插入數據時,如果我們不更新索引,就可能出現某 2 個索引結點之間數據非常多的情況。極端情況下,跳表會退化成單鏈表。 

 
作為一種動態數據結構,我們需要某種手段來維護索引與原始鏈表大小之間的平滑,也就是說如果鏈表中結點多了,索引結點就相應地增加一些,避免復雜度退化,以及查找、插入、刪除操作性能下降。

跳表是通過隨機函數來維護前面提到的 平衡性。 

我們往跳表中插入數據的時候,可以選擇同時將這個數據插入到第幾級索引中,比如隨機函數生成了值 K,那我們就將這個結點添加到第一級到第 K 級這 K 級索引中。 

 
隨機函數可以保證跳表的索引大小和數據大小的平衡性,不至於性能過度退化。

跳表的實現有點復雜,並且跳表的實現並不是這篇的重點。主要是學習思路。 

六、解答開篇 

Redis 中的有序集合是通過跳表來實現的,嚴格點講,還用到了散列表(關於散列表),如果查看 Redis 開發手冊,會發現 Redis 中的有序集合支持的核心操作主要有下面這幾個: 

  • 插入一個數據
  • 刪除一個數據
  • 查找一個數據
  • 按照區間查找數據(比如查找在[100,356]之間的數據)
  • 迭代輸出有序序列 

其中,插入、查找、刪除以及迭代輸出有序序列這幾個操作,紅黑樹也能完成,時間復雜度和跳表是一樣的,但是,按照區間來查找數據這個操作,紅黑樹的效率沒有跳表高。 

對於按照區間查找數據這個操作,跳表可以做到 ○(㏒n) 的時間復雜度定位區間的起點,然后在原始鏈表中順序往后遍歷就可以了。這樣做非常高效。 

當然,還有其他原因,比如,跳表代碼更容易實現,可讀性好不易出錯。跳表更加靈活,可以通過改變索引構建策略,有效平衡執行效率和內存消耗。 

不過跳表也不能完全替代紅黑樹。因為紅黑樹出現的更早一些。很多編程語言中的 Map 類型都是用紅黑樹來實現的。寫業務的時候直接用就行,但是跳表沒有現成的實現,開發中想用跳表,得自己實現。 

參考:Redis詳解(四)------ redis的底層數據結構

參考:聊聊Mysql索引和redis跳表

參考:redis的五種數據結構原理分析

 
 

 

 

 

 

 

 

redis zset底層數據結構

zset底層存儲結構

 zset底層的存儲結構包括ziplist或skiplist,在同時滿足以下兩個條件的時候使用ziplist,其他時候使用skiplist,兩個條件如下:

  • 有序集合保存的元素數量小於128個
  • 有序集合保存的所有元素的長度小於64字節

 當ziplist作為zset的底層存儲結構時候,每個集合元素使用兩個緊挨在一起的壓縮列表節點來保存,第一個節點保存元素的成員,第二個元素保存元素的分值。

 當skiplist作為zset的底層存儲結構的時候,使用skiplist按序保存元素及分值,使用dict來保存元素和分值的映射關系。

ziplist數據結構

 ziplist作為zset的存儲結構時,格式如下圖,細節就不多說了,我估計大家都看得懂,緊挨着的是元素memeber和分值socore,整體數據是有序格式。


 
skiplist數據結構

 skiplist作為zset的存儲結構,整體存儲結構如下圖,核心點主要是包括一個dict對象和一個skiplist對象。dict保存key/value,key為元素,value為分值;skiplist保存的有序的元素列表,每個元素包括元素和分值。兩種數據結構下的元素指向相同的位置。


 
 

skiplist的源碼格式

 zset包括dict和zskiplist兩個數據結構,其中dict的保存key/value,便於通過key(元素)獲取score(分值)。zskiplist保存有序的元素列表,便於執行range之類的命令。

/* * 有序集合 */ typedef struct zset { // 字典,鍵為成員,值為分值 // 用於支持 O(1) 復雜度的按成員取分值操作 dict *dict; // 跳躍表,按分值排序成員 // 用於支持平均復雜度為 O(log N) 的按分值定位成員操作 // 以及范圍操作 zskiplist *zsl; } zset; 

 zskiplist作為skiplist的數據結構,包括指向頭尾的header和tail指針,其中level保存的是skiplist的最大的層數。

/* * 跳躍表 */ typedef struct zskiplist { // 表頭節點和表尾節點 struct zskiplistNode *header, *tail; // 表中節點的數量 unsigned long length; // 表中層數最大的節點的層數 int level; } zskiplist; 

 skiplist跳躍列表中每個節點的數據格式,每個節點有保存數據的robj指針,分值score字段,后退指針backward便於回溯,zskiplistLevel的數組保存跳躍列表每層的指針。

/* * 跳躍表節點 */ typedef struct zskiplistNode { // 成員對象 robj *obj; // 分值 double score; // 后退指針 struct zskiplistNode *backward; // 層 struct zskiplistLevel { // 前進指針 struct zskiplistNode *forward; // 跨度 unsigned int span; } level[]; } zskiplistNode; 

 

zset存儲過程

 zset的添加過程我們以zadd的操作作為例子進行分析,整個過程如下:

  • 解析參數得到每個元素及其對應的分值
  • 查找key對應的zset是否存在不存在則創建
  • 如果存儲格式是ziplist,那么在執行添加的過程中我們需要區分元素存在和不存在兩種情況,存在情況下先刪除后添加;不存在情況下則添加並且需要考慮元素的長度是否超出限制或實際已有的元素個數是否超過最大限制進而決定是否轉為skiplist對象。
  • 如果存儲格式是skiplist,那么在執行添加的過程中我們需要區分元素存在和不存在兩種情況,存在的情況下先刪除后添加,不存在情況下那么就直接添加,在skiplist當中添加完以后我們同時需要更新dict的對象。
void zaddGenericCommand(redisClient *c, int incr) { static char *nanerr = "resulting score is not a number (NaN)"; robj *key = c->argv[1]; robj *ele; robj *zobj; robj *curobj; double score = 0, *scores = NULL, curscore = 0.0; int j, elements = (c->argc-2)/2; int added = 0, updated = 0; // 輸入的 score - member 參數必須是成對出現的 if (c->argc % 2) { addReply(c,shared.syntaxerr); return; } // 取出所有輸入的 score 分值 scores = zmalloc(sizeof(double)*elements); for (j = 0; j < elements; j++) { if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL) != REDIS_OK) goto cleanup; } // 取出有序集合對象 zobj = lookupKeyWrite(c->db,key); if (zobj == NULL) { // 有序集合不存在,創建新有序集合 if (server.zset_max_ziplist_entries == 0 || server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr)) { zobj = createZsetObject(); } else { zobj = createZsetZiplistObject(); } // 關聯對象到數據庫 dbAdd(c->db,key,zobj); } else { // 對象存在,檢查類型 if (zobj->type != REDIS_ZSET) { addReply(c,shared.wrongtypeerr); goto cleanup; } } // 處理所有元素 for (j = 0; j < elements; j++) { score = scores[j]; // 有序集合為 ziplist 編碼 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { unsigned char *eptr; // 查找成員 ele = c->argv[3+j*2]; if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) { // 成員已存在 // ZINCRYBY 命令時使用 if (incr) { score += curscore; if (isnan(score)) { addReplyError(c,nanerr); goto cleanup; } } // 執行 ZINCRYBY 命令時, // 或者用戶通過 ZADD 修改成員的分值時執行 if (score != curscore) { // 刪除已有元素 zobj->ptr = zzlDelete(zobj->ptr,eptr); // 重新插入元素 zobj->ptr = zzlInsert(zobj->ptr,ele,score); // 計數器 server.dirty++; updated++; } } else { // 元素不存在,直接添加 zobj->ptr = zzlInsert(zobj->ptr,ele,score); // 查看元素的數量, // 看是否需要將 ZIPLIST 編碼轉換為有序集合 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries) zsetConvert(zobj,REDIS_ENCODING_SKIPLIST); // 查看新添加元素的長度 // 看是否需要將 ZIPLIST 編碼轉換為有序集合 if (sdslen(ele->ptr) > server.zset_max_ziplist_value) zsetConvert(zobj,REDIS_ENCODING_SKIPLIST); server.dirty++; added++; } // 有序集合為 SKIPLIST 編碼 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplistNode *znode; dictEntry *de; // 編碼對象 ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]); // 查看成員是否存在 de = dictFind(zs->dict,ele); if (de != NULL) { // 成員存在 // 取出成員 curobj = dictGetKey(de); // 取出分值 curscore = *(double*)dictGetVal(de); // ZINCRYBY 時執行 if (incr) { score += curscore; if (isnan(score)) { addReplyError(c,nanerr); goto cleanup; } } // 執行 ZINCRYBY 命令時, // 或者用戶通過 ZADD 修改成員的分值時執行 if (score != curscore) { // 刪除原有元素 redisAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj)); // 重新插入元素 znode = zslInsert(zs->zsl,score,curobj); incrRefCount(curobj); /* Re-inserted in skiplist. */ // 更新字典的分值指針 dictGetVal(de) = &znode->score; /* Update score ptr. */ server.dirty++; updated++; } } else { // 元素不存在,直接添加到跳躍表 znode = zslInsert(zs->zsl,score,ele); incrRefCount(ele); /* Inserted in skiplist. */ // 將元素關聯到字典 redisAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK); incrRefCount(ele); /* Added to dictionary. */ server.dirty++; added++; } } else { redisPanic("Unknown sorted set encoding"); } } if (incr) /* ZINCRBY */ addReplyDouble(c,score); else /* ZADD */ addReplyLongLong(c,added); cleanup: zfree(scores); if (added || updated) { signalModifiedKey(c->db,key); notifyKeyspaceEvent(REDIS_NOTIFY_ZSET, incr ? "zincr" : "zadd", key, c->db->id); } } 

 

參考文章

漫畫算法:什么是跳躍表



作者:晴天哥_374
鏈接:https://www.jianshu.com/p/fb7547369655
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。
 

 

 

 

 

 

 

 

redis源碼分析之有序集SortedSet

原文地址:http://www.jianshu.com/p/75ca5a359f9f

一、有序集SortedSet命令簡介

redis中的有序集,允許用戶使用指定值對放進去的元素進行排序,並且基於該已排序的集合提供了一系列豐富的操作集合的API。
舉例如下:

//添加元素,table1為有序集的名字,100為用於排序字段(redis把它叫做score),a為我們要存儲的元素 127.0.0.1:6379> zadd table1 100 a (integer) 1 127.0.0.1:6379> zadd table1 200 b (integer) 1 127.0.0.1:6379> zadd table1 300 c (integer) 1 //按照元素索引返回有序集中的元素,索引從0開始 127.0.0.1:6379> zrange table1 0 1 1) "a" 2) "b" //按照元素排序范圍返回有序集中的元素,這里用於排序的字段在redis中叫做score 127.0.0.1:6379> zrangebyscore table1 150 400 1) "b" 2) "c" //刪除元素 127.0.0.1:6379> zrem table1 b (integer) 1 

在有序集中,用於排序的值叫做score,實際存儲的值叫做member。

由於有序集中提供的API較多,這里只舉了幾個常見的,具體可以參考redis文檔。

關於有序集,我們有一個十分常見的使用場景就是用戶評論。在APP或者網站上發布一條消息,下面會有很多評論,通常展示是按照發布時間倒序排列,這個需求就可以使用有序集,以發布評論的時間戳作為score,然后按照展示評論的數量倒序查找有序集。

二、有序集SortedSet命令源碼分析

老規矩,我們還是從server.c文件中的命令表中找到相關命令的處理函數,然后一一分析。
依舊從添加元素開始,zaddCommand函數:

void zaddCommand(client *c) { zaddGenericCommand(c,ZADD_NONE); } 

這里可以看到流程轉向了zaddGenericCommand,並且傳入了一個模式標記。
關於SortedSet的操作模式這里簡單說明一下,先來看一條完整的zadd命令:

zadd key [NX|XX] [CH] [INCR] score member [score member ...] 

其中的可選項我們依次看下:

  1. NX表示如果元素存在,則不執行替換操作直接返回。
  2. XX表示只操作已存在的元素。
  3. CH表示返回修改(包括添加,更新)元素的數量,只能被ZADD命令使用。
  4. INCR表示在原來的score基礎上加上新的score,而不是替換。

上面代碼片段中的ZADD_NONE表示普通操作。

接下來看下zaddGenericCommand函數的源碼,很長,耐心一點點看:

void zaddGenericCommand(client *c, int flags) { //一條錯誤提示信息 static char *nanerr = "resulting score is not a number (NaN)"; //有序集名字 robj *key = c->argv[1]; robj *zobj; sds ele; double score = 0, *scores = NULL; int j, elements; int scoreidx = 0; //記錄元素操作個數 int added = 0; int updated = 0; int processed = 0; //查找score的位置,默認score在位置2上,但由於有各種模式,所以需要判斷 scoreidx = 2; while(scoreidx < c->argc) { char *opt = c->argv[scoreidx]->ptr; //判斷命令中是否設置了各種模式 if (!strcasecmp(opt,"nx")) flags |= ZADD_NX; else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX; else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH; else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR; else break; scoreidx++; } //設置模式 int incr = (flags & ZADD_INCR) != 0; int nx = (flags & ZADD_NX) != 0; int xx = (flags & ZADD_XX) != 0; int ch = (flags & ZADD_CH) != 0; //通過上面的解析,scoreidx為真實的初始score的索引位置 //這里客戶端參數數量減去scoreidx就是剩余所有元素的數量 elements = c->argc - scoreidx; //由於有序集中score,member成對出現,所以加一層判斷 if (elements % 2 || !elements) { addReply(c,shared.syntaxerr); return; } //這里計算score,member有多少對 elements /= 2; //參數合法性校驗 if (nx && xx) { addReplyError(c, "XX and NX options at the same time are not compatible"); return; } //參數合法性校驗 if (incr && elements > 1) { addReplyError(c, "INCR option supports a single increment-element pair"); return; } //這里開始解析score,先初始化scores數組 scores = zmalloc(sizeof(double)*elements); for (j = 0; j < elements; j++) { //填充數組,這里注意元素是成對出現,所以各個score之間要隔一個member if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL) != C_OK) goto cleanup; } //這里首先在client對應的db中查找該key,即有序集 zobj = lookupKeyWrite(c->db,key); if (zobj == NULL) { //沒有指定有序集且模式為XX(只操作已存在的元素),直接返回 if (xx) goto reply_to_client; //根據元素數量選擇不同的存儲結構初始化有序集 if (server.zset_max_ziplist_entries == 0 || server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr)) { //哈希表 + 跳表的組合模式 zobj = createZsetObject(); } else { //ziplist(壓縮鏈表)模式 zobj = createZsetZiplistObject(); } //加入db中 dbAdd(c->db,key,zobj); } else { //如果ZADD操作的集合類型不對,則返回 if (zobj->type != OBJ_ZSET) { addReply(c,shared.wrongtypeerr); goto cleanup; } } //這里開始往有序集中添加元素 for (j = 0; j < elements; j++) { double newscore; //取出client傳過來的score score = scores[j]; int retflags = flags; //取出與之對應的member ele = c->argv[scoreidx+1+j*2]->ptr; //向有序集中添加元素,參數依次是有序集,要添加的元素的score,要添加的元素,操作模式,新的score int retval = zsetAdd(zobj, score, ele, &retflags, &newscore); //添加失敗則返回 if (retval == 0) { addReplyError(c,nanerr); goto cleanup; } //記錄操作 if (retflags & ZADD_ADDED) added++; if (retflags & ZADD_UPDATED) updated++; if (!(retflags & ZADD_NOP)) processed++; //設置新score值 score = newscore; } //操作記錄 server.dirty += (added+updated); //返回邏輯 reply_to_client: if (incr) { if (processed) addReplyDouble(c,score); else addReply(c,shared.nullbulk); } else { addReplyLongLong(c,ch ? added+updated : added); } //清理邏輯 cleanup: zfree(scores); if (added || updated) { signalModifiedKey(c->db,key); notifyKeyspaceEvent(NOTIFY_ZSET, incr ? "zincr" : "zadd", key, c->db->id); } } 

代碼有點長,來張圖看一下存儲結構:
有序集存儲結構
注:每個entry都是由score+member組成

有了上面的結構圖以后,可以想到刪除操作應該就是根據不同的存儲結構進行,如果是ziplist就執行鏈表刪除,如果是哈希表+跳表結構,那就要把兩個集合都進行刪除。真實邏輯是什么呢?
我們來看下刪除函數zremCommand的源碼,相對短一點:

void zremCommand(client *c) { //獲取有序集名 robj *key = c->argv[1]; robj *zobj; int deleted = 0, keyremoved = 0, j; //做校驗 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL || checkType(c,zobj,OBJ_ZSET)) return; for (j = 2; j < c->argc; j++) { //一次刪除指定元素 if (zsetDel(zobj,c->argv[j]->ptr)) deleted++; //如果有序集中全部元素都被刪除,則回收有序表 if (zsetLength(zobj) == 0) { dbDelete(c->db,key); keyremoved = 1; break; } } //同步操作 if (deleted) { notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id); if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); signalModifiedKey(c->db,key); server.dirty += deleted; } //返回 addReplyLongLong(c,deleted); } 

看下具體的刪除操作源碼:

//參數zobj為有序集,ele為要刪除的元素 int zsetDel(robj *zobj, sds ele) { //與添加元素相同,根據不同的存儲結構執行不同的刪除邏輯 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *eptr; //ziplist是一個簡單的鏈表刪除節點操作 if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) { zobj->ptr = zzlDelete(zobj->ptr,eptr); return 1; } } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; dictEntry *de; double score; de = dictUnlink(zs->dict,ele); if (de != NULL) { //查詢該元素的score score = *(double*)dictGetVal(de); //從哈希表中刪除元素 dictFreeUnlinkedEntry(zs->dict,de); //從跳表中刪除元素 int retval = zslDelete(zs->zsl,score,ele,NULL); serverAssert(retval); //如果有需要則對哈希表進行resize操作 if (htNeedsResize(zs->dict)) dictResize(zs->dict); return 1; } } else { serverPanic("Unknown sorted set encoding"); } //沒有找到指定元素返回0 return 0; } 

最后看一個查詢函數zrangeCommand源碼,也是很長,汗~~~,不過放心,有了上面的基礎,大致也能猜到查詢邏輯應該是什么樣子的:

void zrangeCommand(client *c) { //第二個參數,0表示順序,1表示倒序 zrangeGenericCommand(c,0); } void zrangeGenericCommand(client *c, int reverse) { //有序集名 robj *key = c->argv[1]; robj *zobj; int withscores = 0; long start; long end; int llen; int rangelen; //參數校驗 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) || (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return; //根據參數附加信息判斷是否需要返回score if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) { withscores = 1; } else if (c->argc >= 5) { addReply(c,shared.syntaxerr); return; } //有序集校驗 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL || checkType(c,zobj,OBJ_ZSET)) return; //索引值重置 llen = zsetLength(zobj); if (start < 0) start = llen+start; if (end < 0) end = llen+end; if (start < 0) start = 0; //返回空集 if (start > end || start >= llen) { addReply(c,shared.emptymultibulk); return; } if (end >= llen) end = llen-1; rangelen = (end-start)+1; //返回給客戶端結果長度 addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen); //同樣是根據有序集的不同結構執行不同的查詢邏輯 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *zl = zobj->ptr; unsigned char *eptr, *sptr; unsigned char *vstr; unsigned int vlen; long long vlong; //根據正序還是倒序計算起始索引 if (reverse) eptr = ziplistIndex(zl,-2-(2*start)); else eptr = ziplistIndex(zl,2*start); serverAssertWithInfo(c,zobj,eptr != NULL); sptr = ziplistNext(zl,eptr); while (rangelen--) { serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL); //注意嵌套的ziplistGet方法就是把eptr索引的值讀出來保存在后面三個參數中 serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); //返回value if (vstr == NULL) addReplyBulkLongLong(c,vlong); else addReplyBulkCBuffer(c,vstr,vlen); //如果需要則返回score if (withscores) addReplyDouble(c,zzlGetScore(sptr)); //倒序從后往前,正序從前往后 if (reverse) zzlPrev(zl,&eptr,&sptr); else zzlNext(zl,&eptr,&sptr); } } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; zskiplistNode *ln; sds ele; //找到起始節點 if (reverse) { ln = zsl->tail; if (start > 0) ln = zslGetElementByRank(zsl,llen-start); } else { ln = zsl->header->level[0].forward; if (start > 0) ln = zslGetElementByRank(zsl,start+1); } //遍歷並返回給客戶端 while(rangelen--) { serverAssertWithInfo(c,zobj,ln != NULL); ele = ln->ele; addReplyBulkCBuffer(c,ele,sdslen(ele)); if (withscores) addReplyDouble(c,ln->score); ln = reverse ? ln->backward : ln->level[0].forward; } } else { serverPanic("Unknown sorted set encoding"); } } 

上面就是關於有序集SortedSet的添加,刪除,查找的源碼。可以看出SortedSet會根據存放元素的數量選擇ziplist或者哈希表+跳表兩種數據結構進行實現,之所以源碼看上去很長,主要原因也就是要根據不同的數據結構進行不同的代碼實現。只要掌握了這個核心思路,再看源碼就不會太難。

三、有序集SortedSet命令總結

有序集的邏輯不難,就是代碼有點長,涉及到ziplist,skiplist,dict三套數據結構,其中除了常規的dict之外,另外兩個數據結構內容都不少,准備專門寫文章進行總結,就不在這里贅述了。本文主要目的是總結一下有序集SortedSet的實現原理。

 
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM