Redis之壓縮列表ziplist


Redis之壓縮列表ziplist 

 

Redis是基於內存的nosql,有些場景下為了節省內存redis會用“時間”換“空間”。
ziplist就是很典型的例子。

ziplist是list鍵、hash鍵以及zset鍵的底層實現之一(3.0之后list鍵已經不直接用ziplist和linkedlist作為底層實現了,取而代之的是quicklist)
這些鍵的常規底層實現如下:

  • list鍵:雙向鏈表
  • hash鍵:字典dict
  • zset鍵:跳躍表zskiplist

但是當list鍵里包含的元素較少、並且每個元素要么是小整數要么是長度較小的字符串時,redis將會用ziplist作為list鍵的底層實現。同理hash和zset在這種場景下也會使用ziplist。

既然已有底層結構可以實現list、hash、zset鍵,為什么還要用ziplist呢?
當然是為了節省內存空間
我們先來看看ziplist是如何壓縮的

原理

整體布局

ziplist是由一系列特殊編碼的連續內存塊組成的順序存儲結構,類似於數組,ziplist在內存中是連續存儲的,但是不同於數組,為了節省內存 ziplist的每個元素所占的內存大小可以不同(數組中叫元素,ziplist叫節點entry,下文都用“節點”),每個節點可以用來存儲一個整數或者一個字符串。
下圖是ziplist在內存中的布局

 

  • zlbytes: ziplist的長度(單位: 字節),是一個32位無符號整數
  • zltail: ziplist最后一個節點的偏移量,反向遍歷ziplist或者pop尾部節點的時候有用。
  • zllen: ziplist的節點(entry)個數
  • entry: 節點
  • zlend: 值為0xFF,用於標記ziplist的結尾

普通數組的遍歷是根據數組里存儲的數據類型 找到下一個元素的,例如int類型的數組訪問下一個元素時每次只需要移動一個sizeof(int)就行(實際上開發者只需讓指針p+1就行,在這里引入sizeof(int)只是為了說明區別)。
上文說了,ziplist的每個節點的長度是可以不一樣的,而我們面對不同長度的節點又不可能直接sizeof(entry),那么它是怎么訪問下一個節點呢?
ziplist將一些必要的偏移量信息記錄在了每一個節點里,使之能跳到上一個節點或下一個節點。
接下來我們看看節點的布局

節點的布局(entry)

每個節點由三部分組成:prevlength、encoding、data

  • prevlengh: 記錄上一個節點的長度,為了方便反向遍歷ziplist
  • encoding: 當前節點的編碼規則,下文會詳細說
  • data: 當前節點的值,可以是數字或字符串 

為了節省內存,根據上一個節點的長度prevlength 可以將ziplist節點分為兩類:

 

  • entry的前8位小於254,則這8位就表示上一個節點的長度
  • entry的前8位等於254,則意味着上一個節點的長度無法用8位表示,后面32位才是真實的prevlength。用254 不用255(11111111)作為分界是因為255是zlend的值,它用於判斷ziplist是否到達尾部。

根據當前節點存儲的數據類型及長度,可以將ziplist節點分為9類:
其中整數節點分為6類: 

 整數節點的encoding的長度為8位,其中高2位用來區分整數節點和字符串節點(高2位為11時是整數節點),低6位用來區分整數節點的類型,定義如下:

#define ZIP_INT_16B (0xc0 | 0<<4)//整數data,占16位(2字節)
 
#define ZIP_INT_32B (0xc0 | 1<<4)//整數data,占32位(4字節)
 
#define ZIP_INT_64B (0xc0 | 2<<4)//整數data,占64位(8字節)
 
#define ZIP_INT_24B (0xc0 | 3<<4)//整數data,占24位(3字節)
 
#define ZIP_INT_8B 0xfe //整數data,占8位(1字節)
 
/* 4 bit integer immediate encoding */
 
//整數值1~13的節點沒有data,encoding的低四位用來表示data
 
#define ZIP_INT_IMM_MASK 0x0f
 
#define ZIP_INT_IMM_MIN 0xf1 /* 11110001 */
 
#define ZIP_INT_IMM_MAX 0xfd /* 11111101 */

值得注意的是 最后一種encoding是存儲整數0~12的節點的encoding,它沒有額外的data部分,encoding的高4位表示這個類型,低4位就是它的data。這種類型的節點的encoding大小介於ZIP_INT_24B與ZIP_INT_8B之間(1~13),但是為了表示整數0,取出低四位xxxx之后會將其-1作為實際的data值(0~12)。在函數zipLoadInteger中,我們可以看到這種類型節點的取值方法:

...
 
} else if (encoding >= ZIP_INT_IMM_MIN && encoding <= ZIP_INT_IMM_MAX) {
 
ret = (encoding & ZIP_INT_IMM_MASK)-1;
 
}
 
...

字符串節點分為3類:

  • 當data小於63字節時(2^6),節點存為上圖的第一種類型,高2位為00,低6位表示data的長度。
  • 當data小於16383字節時(2^14),節點存為上圖的第二種類型,高2位為01,后續14位表示data的長度。
  • 當data小於4294967296字節時(2^32),節點存為上圖的第二種類型,高2位為10,下一字節起連續32位表示data的長度。

上圖可以看出:
不同於整數節點encoding永遠是8位,字符串節點的encoding可以有8位、16位、40位三種長度
相同encoding類型的整數節點 data長度是固定的,但是相同encoding類型的字符串節點,data長度取決於encoding后半部分的值。

 
#define ZIP_STR_06B (0 << 6)//字符串data,最多有2^6字節(encoding后半部分的length有6位,length決定data有多少字節)
 
#define ZIP_STR_14B (1 << 6)//字符串data,最多有2^14字節
 
#define ZIP_STR_32B (2 << 6)//字符串data,最多有2^32字節

上文介紹了ziplist節點(entry)的分類,知道了節點可以細分為9種類型,那么當遍歷一個ziplist時,指針到達某個節點時 如何判斷出節點的類型從而找到data呢?

已知節點的位置,求data的值

根據圖2 entry布局 可以看出,若要算出data的偏移量,得先計算出prevlength所占內存大小(1字節和5字節):

//根據ptr指向的entry,返回這個entry的prevlensize
 
#define ZIP_DECODE_PREVLENSIZE(ptr, prevlensize) do { \
 
if ((ptr)[0] < ZIP_BIGLEN) { \
 
(prevlensize) = 1; \
 
} else { \
 
(prevlensize) = 5; \
 
} \
 
} while(0);

接着再用ZIP_DECODE_LENGTH(ptr + prevlensize, encoding, lensize, len)算出encoding所占的字節,返回給lensize;data所占的字節返回給len

//根據ptr指向的entry求出該entry的len(encoding里存的 data所占字節)和lensize(encoding所占的字節)
 
#define ZIP_DECODE_LENGTH(ptr, encoding, lensize, len) do { \
 
ZIP_ENTRY_ENCODING((ptr), (encoding)); \
 
if ((encoding) < ZIP_STR_MASK) { \
 
if ((encoding) == ZIP_STR_06B) { \
 
(lensize) = 1; \
 
(len) = (ptr)[0] & 0x3f; \
 
} else if ((encoding) == ZIP_STR_14B) { \
 
(lensize) = 2; \
 
(len) = (((ptr)[0] & 0x3f) << 8) | (ptr)[1]; \
 
} else if (encoding == ZIP_STR_32B) { \
 
(lensize) = 5; \
 
(len) = ((ptr)[1] << 24) | \
 
((ptr)[2] << 16) | \
 
((ptr)[3] << 8) | \
 
((ptr)[4]); \
 
} else { \
 
assert(NULL); \
 
} \
 
} else { \
 
(lensize) = 1; \
 
(len) = zipIntSize(encoding); \
 
} \
 
} while(0);
 
 
 
//將ptr的encoding解析成1個字節:00000000、01000000、10000000(字符串類型)和11??????(整數類型)
 
//如果是整數類型,encoding直接照抄ptr的;如果是字符串類型,encoding被截斷成一個字節並清零后6位
 
#define ZIP_ENTRY_ENCODING(ptr, encoding) do { \
 
(encoding) = (ptr[0]); \
 
if ((encoding) < ZIP_STR_MASK) (encoding) &= ZIP_STR_MASK; \
 
} while(0)
 
 
 
//根據encoding返回數據(整數)所占字節數
 
unsigned int zipIntSize(unsigned char encoding) {
 
switch(encoding) {
 
case ZIP_INT_8B: return 1;
 
case ZIP_INT_16B: return 2;
 
case ZIP_INT_24B: return 3;
 
case ZIP_INT_32B: return 4;
 
case ZIP_INT_64B: return 8;
 
default: return 0; /* 4 bit immediate */
 
}
 
assert(NULL);
 
return 0;
 
}

完成以上步驟之后,即可算出data的位置:ptr+prevlensize+lensize,以及data的長度len

ziplist接口

上文已經闡述了ziplist的底層內存布局,接下來看看一些基本的增刪改查操作在ziplist中是如何執行的。

ziplistNew 創建一個ziplist O(1)

 1 /* Create a new empty ziplist. */
 2  
 3 unsigned char *ziplistNew(void) {
 4  
 5 unsigned int bytes = ZIPLIST_HEADER_SIZE+1;//<zlbytes>4字節<zltail>4字節<zllen>2字節<zlend>1字節,沒有entry節點
 6  
 7 unsigned char *zl = zmalloc(bytes);
 8  
 9 ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);//<zlbytes>賦值
10  
11 ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);//<zltail>
12  
13 ZIPLIST_LENGTH(zl) = 0;//<zllen>
14  
15 zl[bytes-1] = ZIP_END;//<zlend>
16  
17 return zl;
18  
19 }
20  
21 #define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t))//空ziplist除了<zlend>的大小
22  
23 #define ZIPLIST_BYTES(zl) (*((uint32_t*)(zl)))//<zlbyte>的指針的值,可讀可寫
24  
25 #define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t))))//<zltail>的指針的值
26  
27 #define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t))//空ziplist除了<zlend>的大小
28  
29 #define ZIPLIST_LENGTH(zl) (*((uint16_t*)((zl)+sizeof(uint32_t)*2)))//<zllen>的指針的值

參照着圖1理解會直觀些,分配了一塊內存並初始化<zlbytes><zltail><zllen><zlend>,沒有entry。

ziplistFind 從ziplist里找出一個entry O(n)

  1 //返回p節點之后data與vstr(長度是vlen)相等的節點,只找p節點之后每隔skip的節點
  2  
  3 //時間復雜度 O(n)
  4  
  5 unsigned char *ziplistFind(unsigned char *p, unsigned char *vstr, unsigned int vlen, unsigned int skip) {
  6  
  7 int skipcnt = 0;
  8  
  9 unsigned char vencoding = 0;
 10  
 11 long long vll = 0;
 12  
 13  
 14  
 15 while (p[0] != ZIP_END) {
 16  
 17 unsigned int prevlensize, encoding, lensize, len;
 18  
 19 unsigned char *q;
 20  
 21  
 22  
 23 ZIP_DECODE_PREVLENSIZE(p, prevlensize);
 24  
 25 ZIP_DECODE_LENGTH(p + prevlensize, encoding, lensize, len);
 26  
 27 q = p + prevlensize + lensize;//當前節點的data
 28  
 29  
 30  
 31 if (skipcnt == 0) {
 32  
 33 /* Compare current entry with specified entry */
 34  
 35 if (ZIP_IS_STR(encoding)) {//判斷當前節點是不是字符串節點
 36  
 37 if (len == vlen && memcmp(q, vstr, vlen) == 0) {
 38  
 39 return p;
 40  
 41 }
 42  
 43 } else {
 44  
 45 /* Find out if the searched field can be encoded. Note that
 46  
 47 * we do it only the first time, once done vencoding is set
 48  
 49 * to non-zero and vll is set to the integer value. */
 50  
 51 if (vencoding == 0) {//這個代碼塊只會執行一次,計算vstr的整數表示
 52  
 53 if (!zipTryEncoding(vstr, vlen, &vll, &vencoding)) {
 54  
 55 //將參數給的節點vstr當做整數節點轉換;將data值返回給vll,節點編碼返回給vencoding
 56  
 57 //進入這個代碼塊說明將vstr轉換成整數失敗,vencoding不變,下次判斷當前節點是整數節點之后可以跳過這個節點
 58  
 59 /* If the entry can't be encoded we set it to
 60  
 61 * UCHAR_MAX so that we don't retry again the next
 62  
 63 * time. */
 64  
 65 vencoding = UCHAR_MAX;//當前節點是整數節點,但是vstr是字符串節點,跳過不用比較了
 66  
 67 }
 68  
 69 /* Must be non-zero by now */
 70  
 71 assert(vencoding);
 72  
 73 }
 74  
 75  
 76  
 77 /* Compare current entry with specified entry, do it only
 78  
 79 * if vencoding != UCHAR_MAX because if there is no encoding
 80  
 81 * possible for the field it can't be a valid integer. */
 82  
 83 if (vencoding != UCHAR_MAX) {
 84  
 85 long long ll = zipLoadInteger(q, encoding);//算出當前節點的data
 86  
 87 if (ll == vll) {
 88  
 89 return p;
 90  
 91 }
 92  
 93 }
 94  
 95 }
 96  
 97  
 98  
 99 /* Reset skip count */
100  
101 skipcnt = skip;
102  
103 } else {
104  
105 /* Skip entry */
106  
107 skipcnt--;
108  
109 }
110  
111  
112  
113 /* Move to next entry */
114  
115 p = q + len;
116  
117 }
118  
119  
120  
121 return NULL;
122  
123 }
124  
125  
126  
127 //嘗試將entry地址的內容轉換成整數,並根據這個整數算出一個合適的encoding返回給encoding參數。
128  
129 //若無法轉換成整數,則encoding不變,返回0,等到下次調用zipEncodeLength時再計算一個該字符串的encoding
130  
131 int zipTryEncoding(unsigned char *entry, unsigned int entrylen, long long *v, unsigned char *encoding) {
132  
133 long long value;
134  
135  
136  
137 if (entrylen >= 32 || entrylen == 0) return 0;
138  
139 if (string2ll((char*)entry,entrylen,&value)) {
140  
141 /* Great, the string can be encoded. Check what's the smallest
142  
143 * of our encoding types that can hold this value. */
144  
145 if (value >= 0 && value <= 12) {
146  
147 *encoding = ZIP_INT_IMM_MIN+value;
148  
149 } else if (value >= INT8_MIN && value <= INT8_MAX) {
150  
151 *encoding = ZIP_INT_8B;
152  
153 } else if (value >= INT16_MIN && value <= INT16_MAX) {
154  
155 *encoding = ZIP_INT_16B;
156  
157 } else if (value >= INT24_MIN && value <= INT24_MAX) {
158  
159 *encoding = ZIP_INT_24B;
160  
161 } else if (value >= INT32_MIN && value <= INT32_MAX) {
162  
163 *encoding = ZIP_INT_32B;
164  
165 } else {
166  
167 *encoding = ZIP_INT_64B;
168  
169 }
170  
171 *v = value;
172  
173 return 1;
174  
175 }
176  
177 return 0;
178  
179 }
180  
181  
182  
183 /* Read integer encoded as 'encoding' from 'p' */
184  
185 int64_t zipLoadInteger(unsigned char *p, unsigned char encoding) {
186  
187 int16_t i16;
188  
189 int32_t i32;
190  
191 int64_t i64, ret = 0;
192  
193 if (encoding == ZIP_INT_8B) {
194  
195 ret = ((int8_t*)p)[0];
196  
197 } else if (encoding == ZIP_INT_16B) {
198  
199 memcpy(&i16,p,sizeof(i16));
200  
201 memrev16ifbe(&i16);
202  
203 ret = i16;
204  
205 } else if (encoding == ZIP_INT_32B) {
206  
207 memcpy(&i32,p,sizeof(i32));
208  
209 memrev32ifbe(&i32);
210  
211 ret = i32;
212  
213 } else if (encoding == ZIP_INT_24B) {
214  
215 i32 = 0;
216  
217 memcpy(((uint8_t*)&i32)+1,p,sizeof(i32)-sizeof(uint8_t));
218  
219 memrev32ifbe(&i32);
220  
221 ret = i32>>8;
222  
223 } else if (encoding == ZIP_INT_64B) {
224  
225 memcpy(&i64,p,sizeof(i64));
226  
227 memrev64ifbe(&i64);
228  
229 ret = i64;
230  
231 } else if (encoding >= ZIP_INT_IMM_MIN && encoding <= ZIP_INT_IMM_MAX) {
232  
233 ret = (encoding & ZIP_INT_IMM_MASK)-1;
234  
235 } else {
236  
237 assert(NULL);
238  
239 }
240  
241 return ret;
242  
243 }
244  
  1. 其他接口
  • ziplistInsert 往ziplist里插入一個entry 時間復雜度 平均:O(n), 最壞:O(n²)
  • ziplistDelete 從siplist里刪除一個entry 時間復雜度 平均:O(n), 最壞:O(n²)

為什么插入節點和刪除節點兩個接口的最壞時間復雜度會是O(n²)呢?這是由於ziplist的“連鎖更新”導致的,連鎖更新在最壞情況下需要對ziplist執行n次空間重分配操作,而且每次空間重分配的最壞時間復雜度為O(n) ----《Redis設計與實現》
但是出現“連鎖更新”的情況並不多見,所以這里基本不會造成性能問題。
篇幅有限這里不能細說連鎖更新,感興趣可以閱讀《Redis設計與實現》的相關章節以及ziplist.c里的__ziplistCascadeUpdate()函數。

總結

  • ziplist是為節省內存空間而生的。
  • ziplist是一個為Redis專門提供的底層數據結構之一,本身可以有序也可以無序。當作為list和hash的底層實現時,節點之間沒有順序;當作為zset的底層實現時,節點之間會按照大小順序排列。
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM