一、ziplist簡介
從上一篇分析我們知道quicklist的底層存儲使用了ziplist(壓縮列表),由於壓縮列表本身也有不少內容,所以重新開了一篇,在正式源碼之前,還是先看下ziplist的特點:
1. ziplist是一種特殊編碼的雙向列表,特殊編碼是為了節省存儲空間。
2. ziplist允許同時存放字符串和整型類型,並且整型數被編碼成真實的整型數而不是字符串序列(節省空間)。
3. ziplist列表支持在頭部和尾部進行push和pop操作的時間復雜度都在常量范圍O(1),但是每次操作都涉及內存重新分配,尤其在頭部操作時,會涉及大段的內存移動操作,增加了操作的復雜性。
上面粗體部分會在下面的代碼分析中一一體現(ziplist.h和ziplist.c)。
二、ziplist數據結構
下面我們先看一下ziplist的結構示意圖:
上面示意圖展示了ziplist的整體結構,由於ziplist和entry的長度是不定長的,因此代碼中也沒有這兩個接口的定義,這里先給出一個示意結構定義,方便理解:
struct ziplist<T>{ unsigned int zlbytes; // ziplist的長度字節數,包含頭部、所有entry和zipend。 unsigned int zloffset; // 從ziplist的頭指針到指向最后一個entry的偏移量,用於快速反向查詢 unsigned short int zllength; // entry元素個數 T[] entry; // 元素值 unsigned char zlend; // ziplist結束符,值固定為0xFF } struct entry{ char[var] prevlen; // 前面一個entry的字節長度值。 char[var] encoding; // 元素編碼類型 char[] content; // 元素內容 }
代碼中對ziplist的變量的讀取和賦值都是通過宏來實現的,如下:
#define ZIPLIST_BYTES(zl) (*((uint32_t*)(zl))) #define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t)))) #define ZIPLIST_LENGTH(zl) (*((uint16_t*)((zl)+sizeof(uint32_t)*2))) #define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t)) #define ZIPLIST_END_SIZE (sizeof(uint8_t)) #define ZIPLIST_ENTRY_HEAD(zl) ((zl)+ZIPLIST_HEADER_SIZE) #define ZIPLIST_ENTRY_TAIL(zl) ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) #define ZIPLIST_ENTRY_END(zl) ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)
entry的結構要稍微復雜一些了,里面對prevlen和encoding做了特殊編碼以節省空間,ziplist的精髓也正是在這里體現的。
首先看prevlen賦值方法的源代碼:
/* Encode the length of the previous entry and write it to "p". Return the * number of bytes needed to encode this length if "p" is NULL. 把前一個entry的長度編碼后寫入當前entry的prevLen字段,編碼規則: 1. 如果len<254,prevLen占用一個字節,並寫入當前entry的第一個字節。 2. 如果len>=254,prevLen占用五個字節,第一個字節固定寫入254,第二個至第五個字節寫入實際的長度。 */ static unsigned int zipPrevEncodeLength(unsigned char *p, unsigned int len) { if (p == NULL) { // 此時只是計算len所需的存儲長度 return (len < ZIP_BIGLEN) ? 1 : sizeof(len)+1; } else { if (len < ZIP_BIGLEN) { p[0] = len; return 1; } else { p[0] = ZIP_BIGLEN; memcpy(p+1,&len,sizeof(len)); memrev32ifbe(p+1); return 1+sizeof(len); } } }
下面再來分析encoding字段,redis根據存儲元素的值做不同的編碼(long long類型和String類型),long long類型編碼也是為了節省空間,其是在zipTryEncoding方法中進行:
/* Check if string pointed to by 'entry' can be encoded as an integer. * Stores the integer value in 'v' and its encoding in 'encoding'. 當存儲內容可以轉化為long long類型時,encoding占用一個字節,其中前2位固定都是1,后面6位根據value值大小不同,具體如下: a. OX11000000 表示content內容是int16,長度是2個字節。 b. OX11010000 表示content內容是int32,長度是4個字節。 c. OX11100000 表示content內容是int64,長度是8個字節。 d. OX11110000 表示content內容是int24,長度是3個字節。 e. OX11111110 表示content內容是int8,長度是1個字節。 f. OX11111111 表示ziplist的結束。 g. 0X1111xxxx 表示極小數,存儲0-12的值,由於0000和1111都不能使用,所以它的實際值將是1至13,程序在取得這4位的值之后,還需要減去1,才能計算出正確的值,比如說,如果后4位為0001 = 1,那么程序返回的值將是1-1=0。 */ static int zipTryEncoding(unsigned char *entry, unsigned int entrylen, long long *v, unsigned char *encoding) { long long value; if (entrylen >= 32 || entrylen == 0) return 0; if (string2ll((char*)entry,entrylen,&value)) { /* Great, the string can be encoded. Check what's the smallest * of our encoding types that can hold this value. */ if (value >= 0 && value <= 12) { *encoding = ZIP_INT_IMM_MIN+value; } else if (value >= INT8_MIN && value <= INT8_MAX) { *encoding = ZIP_INT_8B; } else if (value >= INT16_MIN && value <= INT16_MAX) { *encoding = ZIP_INT_16B; } else if (value >= INT24_MIN && value <= INT24_MAX) { *encoding = ZIP_INT_24B; } else if (value >= INT32_MIN && value <= INT32_MAX) { *encoding = ZIP_INT_32B; } else { *encoding = ZIP_INT_64B; } *v = value; return 1; } return 0; }
上述方法定義了是否可以編碼為long long類型,如果不能,則編碼為String類型並賦值,編碼代碼在zipEncodeLength方法:
/* Encode the length 'rawlen' writing it in 'p'. If p is NULL it just returns * the amount of bytes required to encode such a length. 本方法對encoding是String類型時,進行編碼並賦值(如果entry內容可以轉化為long long類型,在zipTryEncoding方法中進行編碼),並根據不同長度的字符串來編碼encoding的值,具體如下: a. 0X00xxxxxx 前兩位00表示最大長度為63的字符串,后面6位表示實際字符串長度,encoding占用1個字節。 b. 0X01xxxxxx xxxxxxxx 前兩位01表示中等長度的字符串(大於63小於等於16383),后面14位表示實際長度,encoding占用兩個字節。 c. OX10000000 xxxxxxxx xxxxxxxx xxxxxxxx 表示特大字符串,第一個字節固定128(0X80),后面四個字節存儲實際長度,encoding占用5個字節。 */ static unsigned int zipEncodeLength(unsigned char *p, unsigned char encoding, unsigned int rawlen) { unsigned char len = 1, buf[5]; if (ZIP_IS_STR(encoding)) { /* Although encoding is given it may not be set for strings, * so we determine it here using the raw length. */ if (rawlen <= 0x3f) { if (!p) return len; buf[0] = ZIP_STR_06B | rawlen; } else if (rawlen <= 0x3fff) { len += 1; if (!p) return len; buf[0] = ZIP_STR_14B | ((rawlen >> 8) & 0x3f); buf[1] = rawlen & 0xff; } else { len += 4; if (!p) return len; buf[0] = ZIP_STR_32B; buf[1] = (rawlen >> 24) & 0xff; buf[2] = (rawlen >> 16) & 0xff; buf[3] = (rawlen >> 8) & 0xff; buf[4] = rawlen & 0xff; } } else { /* Implies integer encoding, so length is always 1. */ if (!p) return len; buf[0] = encoding; } /* Store this length at p */ memcpy(p,buf,len); return len; }
三、ziplist增刪改查
1. 創建ziplist
在執行lpush命令時,如果當前quicklistNode是新建的,則需要新建一個ziplist:
/* Add new entry to head node of quicklist. * * Returns 0 if used existing head. * Returns 1 if new head created. 在quicklist的頭部節點添加新元素: 如果新元素添加在head中,返回0,否則返回1. */ int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) { quicklistNode *orig_head = quicklist->head; // 如果head不為空,且空間大小滿足新元素的存儲要求,則新元素添加到head中,否則新加一個quicklistNode if (likely( _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) { quicklist->head->zl = ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD); quicklistNodeUpdateSz(quicklist->head); } else { // 創建新的quicklistNode quicklistNode *node = quicklistCreateNode(); // 把新元素添加到新建的ziplist中 node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD); // 更新ziplist的長度到quicklistNode的sz字段 quicklistNodeUpdateSz(node); // 把新node添加到quicklist中,即添加到原head前面 _quicklistInsertNodeBefore(quicklist, quicklist->head, node); } quicklist->count++; quicklist->head->count++; return (orig_head != quicklist->head); }
/* Create a new empty ziplist. */ unsigned char *ziplistNew(void) { unsigned int bytes = ZIPLIST_HEADER_SIZE+1; unsigned char *zl = zmalloc(bytes); ZIPLIST_BYTES(zl) = intrev32ifbe(bytes); ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE); ZIPLIST_LENGTH(zl) = 0; zl[bytes-1] = ZIP_END; return zl; }
2. 添加entry
添加entry的代碼在ziplistPush方法中:
unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) { unsigned char *p; p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl); return __ziplistInsert(zl,p,s,slen); } /* Insert item at "p". zl中添加一個元素 */ static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) { size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen; unsigned int prevlensize, prevlen = 0; size_t offset; int nextdiff = 0; unsigned char encoding = 0; long long value = 123456789; /* initialized to avoid warning. Using a value that is easy to see if for some reason we use it uninitialized. */ zlentry tail; /* Find out prevlen for the entry that is inserted. */ if (p[0] != ZIP_END) { ZIP_DECODE_PREVLEN(p, prevlensize, prevlen); } else { // 當之前的操作從尾巴刪除元素時,ZIPLIST_ENTRY_TAIL指針會向前遷移,此時ptail[0] != ZIP_END unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl); if (ptail[0] != ZIP_END) { prevlen = zipRawEntryLength(ptail); } } /* See if the entry can be encoded */ // 檢查entry的value是否可以編碼為long long類型,如果可以就把值保存在value中, // 並把所需最小字節長度保存在encoding if (zipTryEncoding(s,slen,&value,&encoding)) { /* 'encoding' is set to the appropriate integer encoding */ reqlen = zipIntSize(encoding); } else { /* 'encoding' is untouched, however zipEncodeLength will use the * string length to figure out how to encode it. */ reqlen = slen; } /* We need space for both the length of the previous entry and * the length of the payload. */ reqlen += zipPrevEncodeLength(NULL,prevlen); reqlen += zipEncodeLength(NULL,encoding,slen); /* When the insert position is not equal to the tail, we need to * make sure that the next entry can hold this entry's length in * its prevlen field. */ nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0; // reqlen是zlentry所需大小,nextdiff是待插入位置原entry中prelen與新entry中prelen所需存儲空間的大小差值。 /* Store offset because a realloc may change the address of zl. */ offset = p-zl; zl = ziplistResize(zl,curlen+reqlen+nextdiff); p = zl+offset; /* Apply memory move when necessary and update tail offset. */ if (p[0] != ZIP_END) { /* Subtract one because of the ZIP_END bytes */ // 原數據向后移動,騰出空間寫入新的zlentry memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff); /* Encode this entry's raw length in the next entry. */ // 新entry的長度寫入下一個zlentry的prelen zipPrevEncodeLength(p+reqlen,reqlen); /* Update offset for tail */ // 更新ZIPLIST_TAIL_OFFSET指向原來的tail entry。 ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen); /* When the tail contains more than one entry, we need to take * "nextdiff" in account as well. Otherwise, a change in the * size of prevlen doesn't have an effect on the *tail* offset. */ zipEntry(p+reqlen, &tail); // 如果原插入位置的entry不是最后的tail元素,需要調整ZIPLIST_TAIL_OFFSET值(增加nextdiff) if (p[reqlen+tail.headersize+tail.len] != ZIP_END) { ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff); } } else { /* This element will be the new tail. */ // ZIPLIST_TAIL_OFFSET指向新加的entry,即新加的entry是tail元素 ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl); } /* When nextdiff != 0, the raw length of the next entry has changed, so * we need to cascade the update throughout the ziplist */ if (nextdiff != 0) { // 如果nextdiff不為0,需要循環更新后續entry中的prelen,最差情況下,所有entry都需要更新一遍 offset = p-zl; zl = __ziplistCascadeUpdate(zl,p+reqlen); p = zl+offset; } /* Write the entry */ // 給新加的entry賦值 p += zipPrevEncodeLength(p,prevlen); p += zipEncodeLength(p,encoding,slen); if (ZIP_IS_STR(encoding)) { memcpy(p,s,slen); } else { zipSaveInteger(p,value,encoding); } ZIPLIST_INCR_LENGTH(zl,1); return zl; }
從上面代碼可以看出,如果是在頭部添加元素時,需要把執行memmove方法把當前ziplist中的所有元素后移一段距離,消耗還是比較大的。
3. 刪除entry
刪除操作在ziplistDelete方法中實現,其邏輯和添加剛剛相反,就不再贅述了。
至此,ziplist的主體代碼就分析結束了,從代碼可以看到,ziplist的實現非常精妙,盡可能的節省存儲空間,但是在頭部操作時,會有大量的內存移動操作,消耗挺大,在尾部操作時,無內存移動,效率則要高很多。
本篇內容參考了錢文品的《Redis深度歷險:核心原理與應用實踐》,特此感謝!