Redis在最新的5.0.0版本中也加入了消息隊列的功能,這就是Stream。
8.1 Stream簡介

圖8-1 Redis Stream結構圖
命令: xadd mystream1 * name hb age 20
mystream1為Stream的名稱;
*代表由Redis自行生成消息ID;
name、age為該消息的field;
hb、20則為對應的field的值。
每個消息都由以下兩部分組成。
·每個消息有唯一的消息ID,消息ID嚴格遞增。
·消息內容由多個field-value對組成。
當消費者不歸屬於任何消費組時,該消費者可以消費消息隊列中的任何消息。
消費組特點。
·每個消費組通過組名稱唯一標識,每個消費組都可以消費該消息隊列的全部消息,多個消費組之間相互獨立。
·每個消費組可以有多個消費者,消費者通過名稱唯一標識,消費者之間的關系是競爭關系,也就是說一個消息只能由該組的一個成員消費。
·組內成員消費消息后需要確認,每個消息組都有一個待確認消息隊列(pending entry list,pel),用以維護該消費組已經消費但沒有確認的消息。
·消費組中的每個成員也有一個待確認消息隊列,維護着該消費者已經消費尚未確認的消息。
RedisStream的底層實現主要使用了listpack以及Rax樹。
8.1.1 Stream底層結構listpack
Redis listpack可用於存儲字符串或者整型。圖8-2為listpack的整體結構圖。

圖8-2 listpack結構圖
listpack由4部分組成:Total Bytes、Num Elem、Entry以及End,
下面介紹各部分的具體含義。
1)Total Bytes為整個listpack的空間大小,占用4個字節,每個listpack最多占用4294967295Bytes。
2)Num Elem為listpack中的元素個數,即Entry的個數,占用2個字節,值得注意的是,這並不意味着listpack最多只能存放65535個Entry,當Entry個數大於等於65535時,Num Elem被設置為65535,此時如果需要獲取元素個數,需要遍歷整個listpack。
3)End為listpack結束標志,占用1個字節,內容為0xFF。
4)Entry為每個具體的元素。其內容可以為字符串或者整型,每個Entry由3部分組成,每部分的具體含義如下。Encode為該元素的編碼方式,占用1個字節,之后是內容字段content,二者緊密相連。表8-1詳細介紹了Encode字段。
表8-1 listpack Encode

backlen記錄了這個Entry的長度(Encode+content),注意並不包括backlen自身的長度,占用的字節數小於等於5。backlen所占用的每個字節的第一個bit用於標識;0代表結束,1代表尚未結束,每個字節只有7 bit有效。值得一提的是,backlen主要用於從后向前遍歷,當我們需要找到當前元素的上一個元素時,我們可以從后向前依次查找每個字節,找到上一個Entry的backlen字段的結束標識,進而可以計算出上一個元素的長度。例如backlen為0000000110001000,代表該元素的長度為00000010001000,即136字節。通過計算即可算出上一個元素的首地址(entry的首地址)。值得注意的是,在整型存儲中,並不實際存儲負數,而是將負數轉換為正數進行存儲。例如,在13位整型存儲中,存儲范圍為[0,8191],其中[0,4095]對應非負的[0,4095](當然,[0,127]將會采用7位無符號整型存儲),而[4096,8191]則對應[-4096,-1]。
8.1.2 Stream底層結構Rax簡介
1.概要
前綴樹是字符串查找時,經常使用的一種數據結構,能夠在一個字符串集合中快速查找到某個字符串,下面給出一個簡單示例,如圖8-3所示。
圖8-3 前綴樹示例 圖8-4 只有一個壓縮節點的Rax
Rax樹通過節點壓縮節省空間,只有一個key(foo)的Rax樹如圖8-4所示,其中中括號代表非壓縮節點,雙引號代表壓縮節點(壓縮節點,非壓縮節點下文將詳細介紹),(iskey=1)代表該節點存儲了一個key,
在上述節點的基礎上插入key(foobar)后,Rax樹結構如圖8-5所示。

圖8-5 包含兩個壓縮節點的Rax
含有兩個key(foobar,footer)的Rax樹結構圖如圖8-6所示。

圖8-6 含有foobar、footer兩個key的Rax
對於非壓縮節點,其內部字符是按照字典序排序的,例如上述第二個節點,含有2個字符b、t,二者是按照字典序排列的。
2.關鍵結構體介紹
1)rax結構代表一個Rax樹,它包含3個字段,指向頭節點的指針,元素個數(即key的個數)以及節點個數。
typedef struct rax {
raxNode *head;
uint64_t numele;
uint64_t numnodes;
} rax;
2)raxNode代表Rax樹中的一個節點,它的定義如下:
typedef struct raxNode {
uint32_t iskey:1; /*當前節點是否包含一個key,占用1bit*/
uint32_t isnull:1; /* 當前key對應的value是否為空,占用1bit */
uint32_t iscompr:1; /* 當前節點是否為壓縮節點,占用1bit*/
uint32_t size:29; /* 為壓縮節點壓縮的字符串長度或者非壓縮節點的子節點個
數,占用29bit; */
unsigned char data[]; // 中包含填充字段,同時存儲了當前節點包含的字符串以及子
節點的指針、key對應的value指針。
} raxNode;
raxNode分為2類,壓縮節點和非壓縮節點,下面分別進行介紹。
1)壓縮節點 。我們假設該節點存儲的內容為字符串ABC,其結構圖如圖8-7所示。

圖8-7 壓縮節點示例圖
·iskey為1且isnull為0時,value-ptr存在,否則value-ptr不存在;
·iscompr為1代表當前節點是壓縮節點,size為3代表存儲了3個字符;
·緊隨size的是該節點存儲的字符串,根據字符串的長度確定是否需要填充字段(填充必要的字節,使得后面的指針地址放到合適的位置上);
·由於是壓縮字段,故而只有最后一個字符有子節點。(c-ptr)
2)非壓縮節點 。我們假設其內容為XY,結構圖如圖8-8所示。

圖8-8 非壓縮節點示例圖
與壓縮節點的不同點在於,每個字符都有一個子節點,值得一提的是,字符個數小於2時,都是非壓縮節點。為了實現Rax樹的遍歷,Redis提供了raxStack及raxIterator兩種結構,下面逐一介紹。
①raxStack結構用於存儲從根節點到當前節點的路徑,具體定義如下:
#define RAX_STACK_STATIC_ITEMS 32
typedef struct raxStack {
void **stack; /*用於記錄路徑,該指針可能指向static_items(路徑較短時)或者堆空間內存; */
size_t items, maxitems; /* 代表stack指向的空間的已用空間以及最大空間 */
void *static_items[RAX_STACK_STATIC_ITEMS];
int oom; /* 代表當前棧是否出現過內存溢出. */
} raxStack;
②raxIterator用於遍歷Rax樹中所有的key,該結構的定義如下:
typedef struct raxIterator {
int flags; //當前迭代器標志位,目前有3種,
RAX_ITER_JUST_SEEKED代表當前迭代器指向的元素是剛剛搜索過的,當需要從迭代器中獲取元素時,直接返回當前元素並清空該標志位即可;
RAX_ITER_EOF代表當前迭代器已經遍歷到rax樹的最后一個節點;
RAX_ITER_SAFE代表當前迭代器為安全迭代器,可以進行寫操作。
rax *rt; /* 當前迭代器對應的rax */
unsigned char *key; /*存儲了當前迭代器遍歷到的key,該指針指向
key_static_string或者從堆中申請的內存。*/
void *data; /* 當前key關聯的value值 */
size_t key_len; /* key指向的空間的已用空間 */
size_t key_max; /*key最大空間 */
unsigned char key_static_string[RAX_ITER_STATIC_LEN]; //默認存儲空間,當key比較大時,會使用堆空間內存。
raxNode *node; /* 當前key所在的raxNode */
raxStack stack; /* 記錄了從根節點到當前節點的路徑,用於raxNode的向上遍歷。*/
raxNodeCallback node_cb; /* 為節點的回調函數,通常為空*/
} raxIterator;
8.1.3 Stream結構

圖8-9 Stream結構示例
每個消息流都包含一個Rax結構。以消息ID為key、listpack結構為value存儲在Rax結構中。每個消息的具體信息存儲在這個listpack中。以下亮點是值得注意的。
1)每個listpack都有一個master entry,該結構中存儲了創建這個listpack時待插入消息的所有field,這主要是考慮同一個消息流,消息內容通常具有相似性,如果后續消息的field與master entry內容相同,則不需要再存儲其field。
2)每個listpack中可能存儲多條消息。
-
消息存儲
(1)消息ID
streamID定義如下,以每個消息創建時的時間(1970年1月1號至今的毫秒數)以及序號組成,共128位。
typedef struct streamID {
uint64_t ms; /* Unix time in milliseconds. */
uint64_t seq; /* Sequence number. */
} streamID;
(2)消息存儲的格式
Stream的消息內容存儲在listpack中, listpack用於存儲字符串或者整型數據,listpack中的單個元素稱為entry,下文介紹的消息存儲格式的每個字段都是一個entry,並不是將整個消息作為字符串儲存的。值得注意的是,每個listpack會存儲多個消息,具體存儲的消息個數是由stream-node-max-bytes(listpack節點最大占用的內存數,默認4096)和stream-node-max-entries(每個listpack最大存儲的元素個數,默認100)決定的。
·每個消息會占用多個listpack entry。
·每個listpack會存儲多個消息。
每個listpack在創建時,會構造該節點的master entry(根據第一個插入的消息構建),其結構如圖8-10所示。

圖8-10 listpack master entry結構
·count 為當前listpack中的所有未刪除的消息個數。
·deleted 為當前listpack中所有已經刪除的消息個數。
·num-fields 為下面的field的個數。
·field-1,…,filed-N 為當前listpack中第一個插入的消息的所有field
域。
·0 為標識位,在從后向前遍歷該listpack的所有消息時使用。
處省略了listpack每個元素存儲時的encoding以及backlen字段;
消息的field域與master entry的域完全相同
存儲一個消息時,如果該消息的field域與master entry的域完全相同,則不需要再次存儲field域,此時其消息存儲如圖8-11所示。

圖8-11 消息存儲
·flags字段為消息標志位,STREAM_ITEM_FLAG_NONE代表無特殊標識, STREAM_ITEM_FLAG_DELETED代表該消息已經被刪除, STREAM_ITEM_FLAG_SAMEFIELDS代表該消息的field域與master entry完全相同。
·streamID.ms以及streamID.seq為該消息ID減去master entry id之后的值。
·value域存儲了該消息的每個field域對應的內容。
·lp-count為該消息占用listpack的元素個數,也就是3+N。
消息的field域與master entry不完全相同
如果該消息的field域與master entry不完全相同,此時消息的存儲如圖8-12所示。
·flags為消息標志位,與上面一致;
·streamID.ms,streamID.seq為該消息ID減去master entry id之后的值;
·num-fields為該消息field域的個數;
·field-value存儲了消息的域值對,也就是消息的具體內容;
·lp-count為該消息占用的listpack的元素個數,也就是4+2N。

圖8-12 消息存儲
2.關鍵結構體介紹
- stream。
typedef struct stream {
rax *rax; /* 存儲生產者生產的具體消息,以消息ID為鍵,消息內容為值存儲在rax中,值得注意的是,rax中的一個節點可能存儲多個消息*/
uint64_t length; /*當前stream中的消息個數(不包括已經刪除的消息)。*/
streamID last_id; /* 當前stream中最后插入的消息的ID,stream空時,設置為0。. */
rax *cgroups; /* 存儲了當前stream相關的消費組,rax中: name -> streamCG */
} stream;
- 消費組。
每個Stream會有多個消費組,每個消費組通過組名稱進行唯一標識,同時關聯一個streamCG結構,該結構定義如下:
typedef struct streamCG {
streamID last_id; // 該消費組已經確認的最后一個消息的ID
rax *pel; // 該消費組尚未確認的消息,消息ID為鍵,streamNACK(一個尚未確認的消息)為值;
rax *consumers; // 該消費組中所有的消費者,消費者的名稱為鍵,streamConsumer(代表一個消費者)為值。
} streamCG;
- 消費者。
每個消費者通過streamConsumer唯一標識,該結構如下:
typedef struct streamConsumer {
mstime_t seen_time; /* 該消費者最后一次活躍的時間; */
sds name; /* C消費者的名稱*/
rax *pel; /* 消費者尚未確認的消息,以消息ID為鍵,streamNACK為值。 */
} streamConsumer;
- 未確認消息。
未確認消息(streamNACK)維護了消費組或者消費者尚未確認的消息,值得注意的是,消費組中的pel的元素與每個消費者的pel中的元素是共享的,即該消費組消費了某個消息,這個消息會同時放到消費組以及該消費者的pel隊列中,並且二者是同一個streamNACK結構。
/* Pending (yet not acknowledged) message in a consumer group. */
typedef struct streamNACK {
mstime_t delivery_time; /* 該消息最后發送給消費方的時間 */
uint64_t delivery_count; /*為該消息已經發送的次數(組內的成員可以通過xclaim命令獲取某個消息的處理權,該消息已經分給組內另一個消費者但其並沒有確認該消息)。*/
streamConsumer *consumer; /* 該消息當前歸屬的消費者 */
} streamNACK;
5)迭代器。為了遍歷stream中的消息:
typedef struct streamIterator {
stream *stream; /*當前迭代器正在遍歷的消息流 */
streamID master_id; /* 消息內容實際存儲在listpack中,每個listpack都有一個masterentry(也就是第一個插入的消息),master_id為該消息id */
uint64_t master_fields_count; /* master entry中field域的個數. */
unsigned char *master_fields_start; /*master entry field域存儲的首地址*/
unsigned char *master_fields_ptr; /*當listpack中消息的field域與master entry的field域完全相同時,該消息會復用master entry的field域,在我們遍歷該消息時,需要記錄
當前所在的field域的具體位置,master_fields_ptr就是實現這個功能的。 */
int entry_flags; /* 當前遍歷的消息的標志位 */
int rev; /*當前迭代器的方向 */
uint64_t start_key[2]; /* 該迭代器處理的消息ID的范圍 */
uint64_t end_key[2]; /* End key as 128 bit big endian. */
raxIterator ri; /*rax迭代器,用於遍歷rax中所有的key. */
unsigned char *lp; /* 當前listpack指針*/
unsigned char *lp_ele; /* 當前正在遍歷的listpack中的元素, cursor. */
unsigned char *lp_flags; /* Current entry flags pointer.指向當前消息的flag域 */
//用於從listpack讀取數據時的緩存
unsigned char field_buf[LP_INTBUF_SIZE];
unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;
8.2 Stream底層結構listpack的實現
結構查詢效率低,並且只適合於末尾增刪。考慮到消息流中,通常只需要向其末尾增加消息,故而可以采用該結構;
8.2.1 初始化

圖8-13 listpack初始化
/* Create a new, empty listpack.
* On success the new listpack is returned, otherwise an error is returned. */
unsigned char *lpNew(void) {
// LP_HDR_SIZE = 6,為listpack的頭部
unsigned char *lp = lp_malloc(LP_HDR_SIZE+1); // 申請空間
if (lp == NULL) return NULL;
lpSetTotalBytes(lp,LP_HDR_SIZE+1);
lpSetNumElements(lp,0);
lp[LP_HDR_SIZE] = LP_EOF; // LP_EOF = 0xFF
return lp;
}
8.2.2 增刪改操作
listpack提供了2種添加元素的方式:
一種是在任意位置插入元素,一種是在末尾插入元素。在末尾插入元素的底層實現通過調用任意位置插入元素進行,具體實現為lpInsert函數。
listpack的刪除操作被轉換為用空元素替換的操作。
listpack的替換操作(即改操作)的底層實現也是通過lpInsrt函數實現的。
lpInsert 函數定義:
unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp);
·lp 為當前待操作的listpack;
·ele 為待插入的新元素或者待替換的新元素,ele為空時,也就是刪除操作;
·size 為ele的長度;
·p 為待插入的位置或者帶替換的元素位置;
·where 有LP_BEFORE(前插)、LP_AFTER(后插)、LP_REPLACE(替換);
·*newp 用於返回插入的元素、替換的元素、刪除元素的下一個元素。
該函數返回null或者插入的元素,替換的元素,刪除元素的下一個元素。
刪除或者替換的主要過程如下:
1)計算需要插入的新元素或者替換舊元素的新元素需要的空間;
2)計算進行插入或者替換后整個listpack所需的空間,通過realloc申請空間;
3)調整新的listpack中的老的元素的位置,為待操作元素預留空間;
4)釋放舊的listpack;
5)在新的listpack中進行插入或替換的操作;
6)更新新的listpack結構頭部的統計信息。
8.2.3 遍歷操作
核心思想是利用每個entry的encode或者backlen字段獲取當前entry的長度;
unsigned char *lpFirst(unsigned char *lp); //獲取第一個元素位置
unsigned char *lpLast(unsigned char *lp); //獲取最后一個元素位置
unsigned char *lpNext(unsigned char *lp, unsigned char *p); //下一個元素位置
unsigned char *lpPrev(unsigned char *lp, unsigned char *p); //上一個元素位置
例如:
unsigned char *lpFirst(unsigned char *lp) {
lp += LP_HDR_SIZE; /* Skip the header. */ LP=+6
if (lp[0] == LP_EOF) return NULL; //0xFF
return lp;
}
此處獲取的僅僅是某個entry首地址的指針,如果要讀取當前元素則需要使用下 lpGet接口;
8.2.4 讀取元素
lpGet用於獲取p指向的Listpack中真正存儲的元素:
①當元素采用字符串編碼時,返回字符串的第一個元素位置,count為元素個數;
②當采用整型編碼時,若intbuf不為空,則將整型數據轉換為字符串存儲在intbuf中,count為元素個數,並返回intbuf。若intbuf為空,直接將數據存儲在count中,返回null。
unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf)
lpGet的實現較為簡單,主要是利用了每個entry的encode字段(p[0];
8.3 Stream底層結構Rax的實現
Stream的消息內容存儲在listpack中,但是如果將所有消息都存儲在一個listpack中,則會存在效率問題。例如,查詢某個消息時,需要遍歷整個listpack;插入消息時,需要重新申請一塊很大的空間。為了解決這些問題,Redis Stream通過Rax組織這些listpack ;
8.3.1 初始化
/* 分配一個新的rax並返回其指針。在內存不足時,函數*返回NULL. */
rax *raxNew(void) {
rax *rax = rax_malloc(sizeof(*rax)); //申請空間
if (rax == NULL) return NULL;
rax->numele = 0; //當前元素個數為0
rax->numnodes = 1; //當前節點個數為1
rax->head = raxNewNode(0,0); //構造頭節點
if (rax->head == NULL) {
rax_free(rax);
return NULL;
} else {
return rax;
}
}

圖8-14 Rax初始化
8.3.2 查找元素
/* 獲取key對應的value值, */
//在rax中查找長度為len的字符串s(s為rax中的一個key), 找到返回該key對應的value
void *raxFind(rax *rax, unsigned char *s, size_t len) {
raxNode *h;
int splitpos = 0;
size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,NULL);
if (i != len || (h->iscompr && splitpos != 0) || !h->iskey)
return raxNotFound; //沒有找到這個key
return raxGetData(h); //查到key, 將key對應的value返回
}
raxLowWalk為查找key的核心函數 ;
static inline size_t raxLowWalk(rax *rax, unsigned char *s, size_t len, raxNode **stopnode, raxNode ***plink, int *splitpos, raxStack *ts) ;
·rax 為待查找的Rax;
·s 為待查找的key;
·len 為s的長度;
·*stopnode 為 找過程中的終止節點,也就意味着,當rax查找到該節點時,待查找的key已經匹配完成,或者當前節點無法與帶查找的key匹配;
·*plink 用於記錄父節點中指向*stopnode的指針的位置,當*stopnode變化時,也需要修改父節點指向該節點的指針;
·*splitpos 用於記錄壓縮節點的匹配位置;
·當ts 不為空時,會將查找該key的路徑寫入該變量。
該函數返回s的匹配長度,當s!=len時,表示未查找到該key;當s==len時,需要檢驗*stopnode是否為key,並且當*stopnode為壓縮節點時,還需要檢查splitpos是否為0(可能匹配到某個壓縮節點中間的某個元素)。
raxLowWalk函數的執行過程可以分為如下幾步。
1)初始化變量。
2)從rax根節點開始查找,知道當前待查找節點無子節點或者s查找完畢。對於每個節點來說,如果為壓縮節點,則需要與s中的字符完全匹配。如果為非壓縮節點,則查找與當前待匹配字符相同的字符。
3)如果當前待匹配節點能夠與s匹配,則移動位置到其子節點,繼續匹配。
raxNode *h = rax->head; // 從根節點開始匹配
raxNode **parentlink = &rax->head;
size_t i = 0; /*當前待匹配字符位置. */
size_t j = 0; /* 當前匹配的節點的位置*/
while(h->size && i < len) { // 當前節點有子節點且尚未走到s字符串的末尾
unsigned char *v = h->data;
if (h->iscompr) { // 壓縮節點是否能夠完全匹配s字符串
for (j = 0; j < h->size && i < len; j++, i++) {
if (v[j] != s[i]) break;
}
if (j != h->size) break; // 當前壓縮節點不能完全匹配或者s已經到達末尾
} else {
/* 非壓縮節點遍歷節點元素, 查找與當前字符匹配的位置*/
for (j = 0; j < h->size; j++) {
if (v[j] == s[i]) break;
}
if (j == h->size) break; // 未在非壓縮節點找到匹配的字符
i++; // 非壓縮節點可以匹配, 移動到s的下一個字符
}
// 當前節點能夠匹配s
if (ts) raxStackPush(ts,h); /* Save stack of parent nodes. */
raxNode **children = raxNodeFirstChildPtr(h);
if (h->iscompr) j = 0; /* Compressed node only child is at index 0. */
memcpy(&h,children+j,sizeof(h)); // 將當前節點移動到其第 j個子節點
parentlink = children+j;
j = 0;
}
if (stopnode) *stopnode = h;
if (plink) *plink = parentlink;
if (splitpos && h->iscompr) *splitpos = j;
return i;
8.3.3 添加元素
對於已存在的key,rax提供了2種方案,覆蓋或者不覆蓋原有的value,對應的接口分別為raxInsert、raxTryInsert,兩個接口的定義如下:
/* 覆蓋插入*/
int raxInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old) {
return raxGenericInsert(rax,s,len,data,old,1);
}
/*非覆蓋插入函數:如果存在具有相同鍵的元素,則不更新值,並且返回0。 */
int raxTryInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old) {
return raxGenericInsert(rax,s,len,data,old,0);
}
raxGenericInsert 函數
函數參數與raxInsert基本一致,只是增加overwrite用於標識key存在時是否覆蓋;
1.查找key是否存在
size_t i;
int j = 0; /* 分割位置。如果raxLowWalk()在壓縮節點中停止,則索引" j"表示我們在壓縮節點中停止的字符,即拆分該節點以進行插入的位置 */
raxNode *h, **parentlink;
i = raxLowWalk(rax,s,len,&h,&parentlink,&j,NULL);
2.找到key
根據raxLowWalk的返回值,如果當前key已經存在,則直接對該節點進行操作 ;
if (i == len && (!h->iscompr || j == 0 /* not in the middle if j is 0 */)) {
/*查看之前是否存儲value,沒有則申請空間 . */
if (!h->iskey || (h->isnull && overwrite)) {
h = raxReallocForData(h,data);
if (h) memcpy(parentlink,&h,sizeof(h));
}
if (h == NULL) {
errno = ENOMEM;
return 0;
}
/* 更新存在的key. */
if (h->iskey) {
if (old) *old = raxGetData(h);
if (overwrite) raxSetData(h,data);
errno = 0;
return 0; /* Element already exists. */
}
/*否則,將節點設置為鍵set h->iskey. */
raxSetData(h,data);
rax->numele++;
return 1; /* Element inserted. */
}
3.key不存在
1)在查找key的過程中,如果最后停留在某個壓縮節點上,此時需要對該壓縮節點進行拆分,具體拆分情況分為以下幾種,以圖8-15為例。

圖8-15 Rax節點拆分
·插入key"ciao",需要將"annibale"節點拆分為2部分:非壓縮節點,壓縮節點。
·插入key"ago",需要將"annibale"節點拆分為3部分:非壓縮節點,非壓縮節點,壓縮節點。
·插入key"annienter",需要將"annibale"節點拆分3部分:壓縮節點,非壓縮節點,壓縮節點。
·插入key"annibaie",需要將"annibale"拆成3部分:壓縮節點,非壓縮節點,非壓縮節點。
·插入key"annibali",需要將"annibale"拆成2部分:壓縮節點,非壓縮節點。
·插入key"a",將"annibale"拆分成2部分:非壓縮節點,壓縮節點。
·插入key"anni",將"annibale"拆分成2個壓縮節點。
總體而言分為2類:
新插入的key是當前節點的一部分: 將壓縮節點進行拆分后直接設置新的key-value即可
新插入的key和壓縮節點的某個位置不匹配: 對 需要在拆分后的相應位置的非壓縮節點中插入新key的相應不匹配字符,之后將新key的剩余部分插入到這個非壓縮節點的子節點中。
2)如果查找key完成后,不匹配節點為某個非壓縮節點,或者某個壓縮節點的某個字符不匹配,進行節點拆分后導致的不匹配位置為拆分后創建的非壓縮節點,此時僅僅需要將當前待匹配字符插入到這個非壓縮節點上(注意字符按照字典序排列),並為其創建子節點。之后,將剩余字符放入新建的子節點中即可(如果字符長度過長,需要進行分割)。
8.3.4 刪除元素
Rax的刪除操作主要有3個接口,可以刪除rax中的某個key,或者釋放整個rax,在釋放rax時,還可以設置釋放回調函數,在釋放rax的每個key時,都會調用這個回調函數;
// 在rax中刪除長度為len的s(s代表待刪除的key), *old用於返回該key對應的value
int raxRemove(rax *rax, unsigned char *s, size_t len, void **old);
// 釋放rax
void raxFree(rax *rax);
// 釋放rax,釋放每個key時,都會調用free_callback函數
void raxFreeWithCallback(rax *rax, void (*free_callback)(void*));
rax的釋放操作,采用的是深度優先算法;
raxRemove函數
當刪除rax中的某個key-value對時,首先查找key是否存在,不存在則直接返回,存在則需要進行刪除操作。
raxNode *h;
raxStack ts;
raxStackInit(&ts);
int splitpos = 0;
size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,&ts);
if (i != len || (h->iscompr && splitpos != 0) || !h->iskey) {
raxStackFree(&ts); // 沒有找到需要刪除的key
return 0;
}
如果key存在,則需要進行刪除操作,刪除操作完成后,Rax樹可能需要進行壓縮。具體可以分為下面2種情況,此處所說的壓縮是指將某個節點與其子節點壓縮成一個節點,葉子節點沒有子節點,不能進行壓縮。
1)某個節點只有一個子節點,該子節點之前是key,經刪除操作后不再是key,此時可以將該節點與其子節點壓縮,如圖8-16所示,刪除foo后,可以將Rax進行壓縮,壓縮后為"foobar"->[](iskey=1)。
圖8-16 Rax節點壓縮 圖8-17 Rax節點壓縮
2)某個節點有兩個子節點,經過刪除操作后,只剩下一個子節點,如果這個子節點不是key,則可以將該節點與這個子節點壓縮。如圖8-17所示,刪除foobar后,可以將Rax樹進行壓縮,壓縮成"footer"->[](iskey=1)。
刪除操作具體可以分為2個階段,刪除階段以及壓縮階段。例如,圖8-17刪除"foobar"時,需要從下向上,刪除可以刪除的節點。圖8-16在刪除"foo"時,則不需要刪除節點。這部分的實現邏輯主要是利用查找key時記錄的匹配路徑,依次向上直到無法刪除為止。
if (h->size == 0) {
raxNode *child = NULL;
while(h != rax->head) {
child = h;
rax_free(child);
rax->numnodes--;
h = raxStackPop(&ts);
/* 如果節點為key或者子節點個數不為1,則無法繼續刪除 */
if (h->iskey || (!h->iscompr && h->size != 1)) break;
}
if (child) {
raxNode *new = raxRemoveChild(h,child);
if (new != h) {
raxNode *parent = raxStackPeek(&ts);
raxNode **parentlink;
if (parent == NULL) {
parentlink = &rax->head;
} else {
parentlink = raxFindParentLink(parent,h);
}
memcpy(parentlink,&new,sizeof(new));
}
/* 刪除后查看是否可以嘗試壓縮 node has just a single child and is not a key, */
if (new->size == 1 && new->iskey == 0) {
trycompress = 1;
h = new;
}
}
} else if (h->size == 1) {
/* 可以嘗試進行壓縮. */
trycompress = 1;
}
壓縮過程可以細化為2步。
①找到可以進行壓縮的第一個元素,之后將所有可進行壓縮的節點進行壓縮。由於raxRowWalk函數已經記錄了查找key的過程,壓縮時只需從記錄棧中不斷彈出元素,即可找到可進行壓縮的第一個元素,過程如下:
raxNode *parent;
while(1) {
parent = raxStackPop(&ts);
if (!parent || parent->iskey ||
(!parent->iscompr && parent->size != 1)) break;
h = parent; //可以進行壓縮
}
raxNode *start = h; /* 可以進行壓縮的第一個節點. */
②找到第一個可壓縮節點后,進行數據壓縮。由於可壓縮的節點都只有一個子節點,壓縮過程只需要讀取每個節點的內容,創建新的節點,並填充新節點的內容即可,此處省略。
8.3.5 遍歷元素
Redis中實現的迭代器為雙向迭代器,可以向前,也可以向后,順序是按照key的字典序排列的。通過rax的結構圖可以看出,如果某個節點為key,則其子節點的key按照字典序比該節點的key大。另外,如果當前節點為非壓縮節點,則其最左側節點的key是其所有子節點的key中最小的。迭代器的主要接口有:
void raxStart(raxIterator *it, rax *rt);
int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len);
int raxNext(raxIterator *it);
int raxPrev(raxIterator *it);
void raxStop(raxIterator *it);
int raxEOF(raxIterator *it);
1.raxStart
raxStart用於初始化raxIterator結構;
void raxStart(raxIterator *it, rax *rt) {
it->flags = RAX_ITER_EOF; /*默認值為迭代結束. */
it->rt = rt;
it->key_len = 0;
it->key = it->key_static_string;
it->key_max = RAX_ITER_STATIC_LEN;
it->data = NULL;
it->node_cb = NULL;
raxStackInit(&it->stack);
}
2.raxSeek
raxStart初始化迭代器后,必須調用raxSeek函數初始化迭代器的位置。
int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len);
·it為raxStart初始化的迭代器。
·op為查找操作符,可以為大於(>)、小於(<)、大於等於(>=)、小於等於(<=)、等於(=)、首個元素(^)、末尾元素($)。
·ele為待查找的key。
·len為ele的長度
查找末尾元素可以直接在Rax中找到最右側的葉子節點,
查找首個元素被轉換為查找大於等於空的操作。 return raxSeek(it,">=",NULL,0);
處理大於、小於、等於等操作主要分為以下幾步。
1) 在rax中查找key:
size_t i = raxLowWalk(it->rt,ele,len,&it->node,NULL,&splitpos,&it->stack);
2)如果key找到,並且op中設置了等於,則操作完成:
if (eq && i == len && (!it->node->iscompr || splitpos == 0) &&
it->node->iskey)
{
/* 找到該key並且op中設置了=. */
if (!raxIteratorAddChars(it,ele,len)) return 0; /* OOM. */
it->data = raxGetData(it->node);
}
3)如果僅僅設置等於,並沒有找到key,則將迭代器的標志位設置為末尾。
4)如果設置了等於但沒有找到key,或者設置了大於或者小於符號,則需要繼續查找,這一步又分為2步。
①首先將查找key的路徑中所有匹配的字符,放入迭代器存儲key的數組中:
//將查找過程的最后一個節點放入路徑棧
if (!raxStackPush(&it->stack,it->node)) return 0;
for (size_t j = 1; j < it->stack.items; j++) {
raxNode *parent = it->stack.stack[j-1];
raxNode *child = it->stack.stack[j];
if (parent->iscompr) {
if (!raxIteratorAddChars(it,parent->data,parent->size))
return 0;
} else {
raxNode **cp = raxNodeFirstChildPtr(parent);
unsigned char *p = parent->data;
while(1) {
raxNode *aux;
memcpy(&aux,cp,sizeof(aux));
if (aux == child) break;
cp++;
p++;
}
if (!raxIteratorAddChars(it,p,1)) return 0;
}
}
raxStackPop(&it->stack); //將最后一個節點從路徑棧中彈出
②根據key的匹配情況以及op的參數,在rax中繼續查找下一個或者上一個key,此時主要利用的是raxIteratorNextStep、raxIteratorPrevStep兩個接口,這兩個接口也是raxNext以及raxPrev的核心處理函數,
3.raxNext&raxPrev
raxNext與raxPrev為逆操作,高度的相似,此處以raxNext為例;
int raxNext(raxIterator *it) {
if (!raxIteratorNextStep(it,0)) {
errno = ENOMEM;
return 0;
}
if (it->flags & RAX_ITER_EOF) {
errno = 0;
return 0;
}
return 1;
}
raxIteratorNextStep函數
int raxIteratorNextStep(raxIterator *it, int noup)
·it 為待移動的迭代器。
·noup 為標志位,可以取0或者1。在raxSeek中,我們有時需要查找比某個key大的下一個key,並且這個待查找的key可能並不存在,此時可能需要將noup設置為1。
raxNext處理過程的重點有3點:
①如果迭代器當前的節點有子節點,則沿着其最左側的節點一直向下,直到找到下一個key;
②如果當前節點沒有子節點,則利用迭代器中的路徑棧,依次彈出其父節點,查找父節點是否有其他比當前key大的子節點(迭代器中已經記錄了當前的key,通過該值可以進行查找);
③注意noup為1時,我們已經假設迭代器當前節點為上一個key的父節點,故而在路徑棧彈出時,第一次需要忽略。
while(1) {
int children = it->node->iscompr ? 1 : it->node->size;
if (!noup && children) {
//當前的節點有子節點
if (!raxStackPush(&it->stack,it->node)) return 0;
raxNode **cp = raxNodeFirstChildPtr(it->node);
if (!raxIteratorAddChars(it,it->node->data,
it->node->iscompr ? it->node->size : 1)) return 0;
memcpy(&it->node,cp,sizeof(it->node));
/* 當前節點為key節點, 直接返回. */
.......
} else {
/*當前節點沒有子節點,找父節點. */
while(1) {
int old_noup = noup;
/* 已經迭代到rax頭部節點,結束 */
......
/* 如果當前節點上沒有子節點,請嘗試父節點的下個子節點。 */
unsigned char prevchild = it->key[it->key_len-1];
if (!noup) { it->node = raxStackPop(&it->stack); }
else { noup = 0; //第一次彈出父節點的操作被跳過 }
int todel = it->node->iscompr ? it->node->size : 1;
raxIteratorDelChars(it,todel);
/* 如果至少有一個*額外的孩子,請嘗試下一個孩子 */
if (!it->node->iscompr && it->node->size > (old_noup ? 0 : 1)) {
raxNode **cp = raxNodeFirstChildPtr(it->node);
int i = 0;
while (i < it->node->size) {
// 遍歷節點所有子節點,找到下一個比當前key大的子節點
if (it->node->data[i] > prevchild) break;
i++; cp++;
}
if (i != it->node->size) {
// 找到了一個子節點比當前key大
raxIteratorAddChars(it,it->node->data+i,1);
if (!raxStackPush(&it->stack,it->node)) return 0;
memcpy(&it->node,cp,sizeof(it->node));
/* 當前節點為key,獲取值后返回,不是key則跳出內部while循環 */
......
}
}
}
}
}
4.raxStop&raxEOF
raxEOF用於標識迭代器迭代結束,raxStop用於結束迭代並釋放相關資源 ;
int raxEOF(raxIterator *it) {
return it->flags & RAX_ITER_EOF;
}
/* Free the iterator. */
void raxStop(raxIterator *it) {
if (it->key != it->key_static_string) rax_free(it->key);
raxStackFree(&it->stack);
}
8.4 Stream結構的實現
Stream可以看作是一個消息鏈表。對一個消息而言,只能新增或者刪除,不能更改消息內容,故而本節主要介紹Stream相關結構的初始化以及增刪查操作。首先介紹消息流的初始化,之后講解消息的增刪查、消費組的增刪查以及消費組中消費者的增刪查,最后,介紹如何遍歷消息流中的所有消息。
8.4.1 初始化
/* Create a new stream data structure. */
stream *streamNew(void) {
stream *s = zmalloc(sizeof(*s));
s->rax = raxNew();
s->length = 0;
s->last_id.ms = 0;
s->last_id.seq = 0;
s->cgroups = NULL; /* 按需創建以在不使用時節省內存 */
return s;
}

圖8-18 Stream結構初始化
8.4.2 添加元素
任何用戶都可以向某個消息流添加消息,或者消費某個消息流中的消息;
1.添加消息
Redis提供了streamAppendItem函數,用於向stream中添加一個新的消息:
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id)
s 為待插入的數據流:
·argv 為待插入的消息內容,argv[0]為field_1,argv[1]為value_1,依此類推;
·numfields 為待插入的消息的field的總數;
·added_id 不為空,並且插入成功時,將新插入的消息id寫入added_id以供調用方使用;
·use_id 為調用方為該消息定義的消息id,該消息id應該大於s中任意一個消息的id。
增加消息的流程如下。
①獲取rax的最后一個key所在的節點,由於Rax樹是按照消息id的順序存儲的,所以最后一個key節點存儲了上一次插入的消息。
②查看該節點是否可以插入這條新的消息。
③如果該節點已經不能再插入新的消息(listpack為空或者已經達到設定的存儲最大值),在rax中插入新的節點(以消息id為key,新建listpack為value),並初始化新建的listpack;
如果仍然可以插入消息,則對比插入的消息與listpack中的master消息對應的fields是否完全一致,完全一致則表明該消息可以復用master的field。
④將待插入的消息內容插入到新建的listpack中或者原來的rax的最后一個key節點對應的listpack中,這一步主要取決於前2步的結果。
該函數主要是利用了listpack以及rax的相關接口。
2.新增消費組
通過streamCreateCG為消息流新增一個消費組,以消費組的名稱為key,該消費組的streamCG結構為value,放入rax中;
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) {
// 如果當前消息流尚未有消費組,則新建消費組
if (s->cgroups == NULL) s->cgroups = raxNew();
// 查看是否已經有該消費組,有則新建失敗
if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound)
return NULL;
// 新建消費組,並初始化相關變量
streamCG *cg = zmalloc(sizeof(*cg));
cg->pel = raxNew();
cg->consumers = raxNew();
cg->last_id = *id;
// 將該消費組插入到消息流的消費組樹中, 以消費組的名稱為key, 對應的streamCG為value
raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);
return cg;
}
3.新增消費者
Stream允許為某個消費組增加消費者,但沒有直接提供在某個消費組中創建消費者的接口,而是在查詢某個消費組的消費者時,發現該消費組沒有該消費者時選擇插入該消費者,該接口在8.4.4節進行介紹。
8.4.3 刪除元素
如何從消息流中刪除消息以及限制消息流的大小。如何釋放消費組中的消費者以及如何釋放整個消費組。
1.刪除消息
streamIteratorRemoveEntry函數用於移除某個消息,值得注意的是,該函數通常只是設置待移除消息的標志位為已刪除,並不會將該消息從所在的listpack中刪除。當消息所在的整個listpack的所有消息都已刪除時,則會從rax中釋放該節點。
void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
unsigned char *lp = si->lp;
int64_t aux;
int flags = lpGetInteger(si->lp_flags);
flags |= STREAM_ITEM_FLAG_DELETED;
lp = lpReplaceInteger(lp,&si->lp_flags,flags); // 設置消息的標志位
/* Change the valid/deleted entries count in the master entry. */
unsigned char *p = lpFirst(lp);
aux = lpGetInteger(p);
if (aux == 1) {
/* 當前Listpack只有待刪除消息,可以直接刪除節點. */
lpFree(lp);
raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL);
} else {
/* 修改listpack master enty中的統計信息 */
lp = lpReplaceInteger(lp,&p,aux-1);
p = lpNext(lp,p); /* Seek deleted field. */
aux = lpGetInteger(p);
lp = lpReplaceInteger(lp,&p,aux+1);
/* 查看listpack是否有變化(listpack中元素變化導致的擴容縮容) . */
if (si->lp != lp)
raxInsert(si->stream->rax,si->ri.key,si->ri.key_len,lp,NULL);
}
.....
}
2.裁剪消息流
就是將消息流的大小(未刪除的消息個數,不包含已經刪除的消息)裁剪到給定大小,刪除消息時,按照消息id,從小到大刪除。該接口為streamTrimByLength:
// stream為待裁剪的消息流; maxlen為消息流中最大的消息個數; approx為是否可以存在偏差
int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) ;
對於消息流的裁剪,主要有以下幾點。
1)消息刪除是按照消息id的順序進行刪除的,先刪除最先插入(即消息id最小的)消息。
2)從效率的角度上說,函數調用時最好加上approx標志位。
具體實現過程
1)獲取stream的Rax樹的第一個key所在的節點:
if (s->length <= maxlen) return 0; // stream中的消息個數小於maxlen,不需要刪除
raxIterator ri; // 初始化rax迭代器
raxStart(&ri,s->rax);
raxSeek(&ri,"^",NULL,0);
int64_t deleted = 0; // 統計已經刪除的消息個數
2)遍歷rax樹的節點,不斷刪除消息,直到剩余消息個數滿足要求:
while(s->length > maxlen && raxNext(&ri)) {
// 遍歷Rax樹刪除消息直到滿足要求
}
3)具體刪除消息的部分可以分為如下幾步。
·查看是否需要刪除當前節點,如果刪除該節點存儲的全部消息后仍然未達到要求,則刪除該節點。
·不需要刪除該節點存儲的全部消息,如果函數參數中設置了"approx",則不再進行處理,可以直接返回。 // if (approx) break;
·不需要刪除該節點的全部消息,則遍歷該節點存儲的消息,將部分消息的標志位設置為已經刪除。
刪除當前節點代碼
if (s->length - entries >= maxlen) { // 需要刪除該節點的全部消息
lpFree(lp);
raxRemove(s->rax,ri.key,ri.key_len,NULL); // 調用Rax的接口刪除key
raxSeek(&ri,">=",ri.key,ri.key_len);
s->length -= entries;
deleted += entries;
continue;
}
遍歷當前節點的消息,將其部分消息設置為已刪除 代碼
while(p) { // 遍歷該節點存儲的全部消息,依次刪除,直到消息個數滿足要求
int flags = lpGetInteger(p);
int to_skip;
/* Mark the entry as deleted. */
if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
flags |= STREAM_ITEM_FLAG_DELETED;
lp = lpReplaceInteger(lp,&p,flags);
deleted++;
s->length--;
if (s->length <= maxlen) break; /* Enough entries deleted. */
}
// 移動到下一個消息 ....
}
3.釋放消費組
接口為streamFreeCG,該接口主要完成2部分內容,首先釋放該消費組的pel鏈表,之后釋放消費組中的每個消費者。
/* Free a consumer group and all its associated data. */
void streamFreeCG(streamCG *cg) {
// 刪除該消費組的pel鏈表,釋放時設置回調函數用於釋放每個消息對應的streamNACK結構
raxFreeWithCallback(cg->pel,(void(*)(void*))streamFreeNACK);
// 釋放每個消費者時,需要釋放該消費者對應的streamConsumer結構
raxFreeWithCallback(cg->consumers,(void(*)(void*))streamFreeConsumer);
zfree(cg);
}
/* Free a NACK entry(未確認). */
void streamFreeNACK(streamNACK *na) {
zfree(na);
}
4.釋放消費者
需要注意的是,不需要釋放該消費者的pel,因為該消費者的未確認消息結構streamNACK是與消費組的pel共享的,直接釋放相關內存即可。
void streamFreeConsumer(streamConsumer *sc) {
raxFree(sc->pel); /*此處僅僅是將存儲streamNACK的Rax樹釋放 /
sdsfree(sc->name);
zfree(sc);
}
8.4.4 查找元素
查找消息、查找消費組、查找消費組中的消費者 ;
(1)查找消息
Stream查找消息是通過迭代器實現的,這部分內容我們將在8.4.5節進行介紹。
(2)查找消費組
Redis提供了streamLookupCG接口用於查找Stream的消費組,該接口較為簡單,主要是利用Rax的查詢接口:
streamCG *streamLookupCG(stream *s, sds groupname) {
if (s->cgroups == NULL) return NULL;
streamCG *cg = raxFind(s->cgroups,(unsigned char*)groupname,
sdslen(groupname));
return (cg == raxNotFound) ? NULL : cg;
}
(3)查找消息組中的消費者
streamLookupConsumer接口用於查詢某個消費組中的消費者。消費者不存在時,可以選擇是否將該消費者添加進消費組。
/*在消費組cg中查找消費者name; 如果沒有查到並且create為1時,將該消費者加入消費組 */
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
int create = !(flags & SLC_NOCREAT);
int refresh = !(flags & SLC_NOREFRESH);
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
sdslen(name));
if (consumer == raxNotFound) {
if (!create) return NULL; // 不需要插入
consumer = zmalloc(sizeof(*consumer));
consumer->name = sdsdup(name);
consumer->pel = raxNew();
raxInsert(cg->consumers,(unsigned char*)name,sdslen(name),
consumer,NULL);
}
if (refresh) consumer->seen_time = mstime(); //已經查詢到該消費者,更新時間戳
return consumer;
}
8.4.5 遍歷
Stream的迭代器streamIterator,用於遍歷Stream中的消息,相關的接口主要有以下4個:
// 用於初始化迭代器,值得注意的是,需要指定迭代器的方向。
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
//streamIteratorGetID與streamIteratorGetField配合使用,用於遍歷所有消息的所有field-value
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
// 釋放迭代器的相關資源。
void streamIteratorStop(streamIterator *si);
1)streamIteratorStart接口
接口負責初始化streamIterator。它的具體實現主要是利用Rax提供的迭代器:
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) {
.....
/* 在基數樹中尋找正確的節點. */
raxStart(&si->ri,s->rax);
if (!rev) { // 正向迭代器
if (start && (start->ms || start->seq)) { // 設置了開始的消息id
raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,
sizeof(si->start_key));
if (raxEOF(&si->ri)) raxSeek(&si->ri,"^",NULL,0);
} else {
raxSeek(&si->ri,"^",NULL,0); // 默認情況為指向Rax樹中第一個key所在的節點
}
} else { // 逆向迭代器
if (end && (end->ms || end->seq)) {
raxSeek(&si->ri,"<=",(unsigned char*)si->end_key,
sizeof(si->end_key));
if (raxEOF(&si->ri)) raxSeek(&si->ri,"$",NULL,0);
} else {
raxSeek(&si->ri,"$",NULL,0);
}
}
si->stream = s;
si->lp = NULL; /* There is no current listpack right now. */
si->lp_ele = NULL; /* Current listpack cursor. */
si->rev = rev; /* Direction, if non-zero reversed, from end to start. */
}
2)streamIteratorGetID接口
該接口負責獲取迭代器當前的消息id,可以分為以下2步。
①查看當前所在的Rax樹的節點是否仍然有其他消息,沒有則根據迭代器方向調用Rax迭代器接口向前或者向后移動。
②在rax key對應的listpack中,查找尚未刪除的消息,此處需要注意streamIterator的指針移動。
3) streamIteratorGetField接口
直接使用該迭代器內部的指針,獲取當前消息的field-value對:
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) {
if (si->entry_flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
// 當前消息的field內容與master_fields一致,讀取master_field域內容
*fieldptr = lpGet(si->master_fields_ptr,fieldlen,si->field_buf);
si->master_fields_ptr = lpNext(si->lp,si->master_fields_ptr);
} else { // 直接獲取當前的field, 移動lp_ele指針
*fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf);
si->lp_ele = lpNext(si->lp,si->lp_ele);
}
// 獲取field對應的value,並將迭代器lp_ele指針向后移動
*valueptr = lpGet(si->lp_ele,valuelen,si->value_buf);
si->lp_ele = lpNext(si->lp,si->lp_ele);
}
4) streamIteratorStop接口
主要利用raxIterator接口釋放相關資源:
void streamIteratorStop(streamIterator *si) {
raxStop(&si->ri);
}
8.5 本章小結
本章主要介紹了Stream的底層實現。首先講解了Stream結構需要依賴的兩種數據結構Listpack以及Rax,並詳細介紹了這兩種結構的基本操作。之后,進一步說明了Stream是如何利用這兩種結構的。
