Redis5設計與源碼分析 (第8章 Stream)


Redis在最新的5.0.0版本中也加入了消息隊列的功能,這就是Stream

8.1 Stream簡介

8-1 Redis Stream結構圖

命令: xadd mystream1 * name hb age 20

mystream1Stream的名稱;

*代表由Redis自行生成消息ID

nameage為該消息的field

hb20則為對應的field的值。

每個消息都由以下兩部分組成。

·每個消息有唯一的消息ID,消息ID嚴格遞增。

·消息內容由多個field-value對組成。

 

當消費者不歸屬於任何消費組時,該消費者可以消費消息隊列中的任何消息。

 

消費組特點。

·每個消費組通過組名稱唯一標識,每個消費組都可以消費該消息隊列的全部消息,多個消費組之間相互獨立。

·每個消費組可以有多個消費者,消費者通過名稱唯一標識,消費者之間的關系是競爭關系,也就是說一個消息只能由該組的一個成員消費。

·組內成員消費消息后需要確認,每個消息組都有一個待確認消息隊列pending entry listpel),用以維護該消費組已經消費但沒有確認的消息。

·消費組中的每個成員也有一個待確認消息隊列,維護着該消費者已經消費尚未確認的消息。

RedisStream的底層實現主要使用了listpack以及Rax樹。

8.1.1 Stream底層結構listpack

Redis listpack可用於存儲字符串或者整型。圖8-2為listpack的整體結構圖。

圖8-2 listpack結構圖

listpack由4部分組成:Total Bytes、Num Elem、Entry以及End,

下面介紹各部分的具體含義。

1)Total Bytes為整個listpack的空間大小,占用4個字節,每個listpack最多占用4294967295Bytes。

2)Num Elem為listpack中的元素個數,即Entry的個數,占用2個字節,值得注意的是,這並不意味着listpack最多只能存放65535個Entry,當Entry個數大於等於65535時,Num Elem被設置為65535,此時如果需要獲取元素個數,需要遍歷整個listpack。

3)End為listpack結束標志,占用1個字節,內容為0xFF。

4)Entry為每個具體的元素。其內容可以為字符串或者整型,每個Entry由3部分組成,每部分的具體含義如下。Encode為該元素的編碼方式,占用1個字節,之后是內容字段content,二者緊密相連。表8-1詳細介紹了Encode字段。

表8-1 listpack Encode

backlen記錄了這個Entry的長度(Encode+content),注意並不包括backlen自身的長度,占用的字節數小於等於5。backlen所占用的每個字節的第一個bit用於標識;0代表結束,1代表尚未結束,每個字節只有7 bit有效。值得一提的是,backlen主要用於從后向前遍歷,當我們需要找到當前元素的上一個元素時,我們可以從后向前依次查找每個字節,找到上一個Entry的backlen字段的結束標識,進而可以計算出上一個元素的長度。例如backlen為0000000110001000,代表該元素的長度為00000010001000,即136字節。通過計算即可算出上一個元素的首地址(entry的首地址)。值得注意的是,在整型存儲中,並不實際存儲負數,而是將負數轉換為正數進行存儲。例如,在13位整型存儲中,存儲范圍為[0,8191],其中[0,4095]對應非負的[0,4095](當然,[0,127]將會采用7位無符號整型存儲),而[4096,8191]則對應[-4096,-1]。

 

8.1.2 Stream底層結構Rax簡介

1.概要

前綴樹是字符串查找時,經常使用的一種數據結構,能夠在一個字符串集合中快速查找到某個字符串,下面給出一個簡單示例,如圖8-3所示。

圖8-3 前綴樹示例                 圖8-4 只有一個壓縮節點的Rax

Rax樹通過節點壓縮節省空間,只有一個key(foo)的Rax樹如圖8-4所示,其中中括號代表非壓縮節點雙引號代表壓縮節點(壓縮節點,非壓縮節點下文將詳細介紹),(iskey=1)代表該節點存儲了一個key,

在上述節點的基礎上插入key(foobar)后,Rax樹結構如圖8-5所示。

圖8-5 包含兩個壓縮節點的Rax

含有兩個key(foobar,footer)的Rax樹結構圖如圖8-6所示。

圖8-6 含有foobar、footer兩個key的Rax

對於非壓縮節點,其內部字符是按照字典序排序的,例如上述第二個節點,含有2個字符b、t,二者是按照字典序排列的。

2.關鍵結構體介紹

1)rax結構代表一個Rax樹,它包含3個字段,指向頭節點的指針,元素個數(即key的個數)以及節點個數。

typedef struct rax {

  raxNode *head;

  uint64_t numele;

  uint64_t numnodes;

} rax;

 

2)raxNode代表Rax樹中的一個節點,它的定義如下:

typedef struct raxNode {

uint32_t iskey:1; /*當前節點是否包含一個key,占用1bit*/

uint32_t isnull:1; /* 當前key對應的value是否為空,占用1bit */

uint32_t iscompr:1; /* 當前節點是否為壓縮節點,占用1bit*/

uint32_t size:29; /* 為壓縮節點壓縮的字符串長度或者非壓縮節點的子節點個

數,占用29bit; */

unsigned char data[]; // 中包含填充字段,同時存儲了當前節點包含的字符串以及子

節點的指針、key對應的value指針。

} raxNode;

 

raxNode分為2類,壓縮節點和非壓縮節點,下面分別進行介紹。

1)壓縮節點 。我們假設該節點存儲的內容為字符串ABC,其結構圖如圖8-7所示。

圖8-7 壓縮節點示例圖

·iskey為1且isnull為0時,value-ptr存在,否則value-ptr不存在;

·iscompr為1代表當前節點是壓縮節點,size為3代表存儲了3個字符;

·緊隨size的是該節點存儲的字符串,根據字符串的長度確定是否需要填充字段(填充必要的字節,使得后面的指針地址放到合適的位置上);

·由於是壓縮字段,故而只有最后一個字符有子節點。(c-ptr)

2)非壓縮節點 。我們假設其內容為XY,結構圖如圖8-8所示。

圖8-8 非壓縮節點示例圖

與壓縮節點的不同點在於,每個字符都有一個子節點,值得一提的是,字符個數小於2時,都是非壓縮節點。為了實現Rax樹的遍歷,Redis提供了raxStackraxIterator兩種結構,下面逐一介紹。

①raxStack結構用於存儲從根節點到當前節點的路徑,具體定義如下:

#define RAX_STACK_STATIC_ITEMS 32

typedef struct raxStack {

  void **stack; /*用於記錄路徑,該指針可能指向static_items(路徑較短時)或者堆空間內存; */

  size_t items, maxitems; /* 代表stack指向的空間的已用空間以及最大空間 */

  void *static_items[RAX_STACK_STATIC_ITEMS];

  int oom; /* 代表當前棧是否出現過內存溢出. */

} raxStack;

 

②raxIterator用於遍歷Rax樹中所有的key,該結構的定義如下:

typedef struct raxIterator {

int flags; //當前迭代器標志位,目前有3種,

RAX_ITER_JUST_SEEKED代表當前迭代器指向的元素是剛剛搜索過的,當需要從迭代器中獲取元素時,直接返回當前元素並清空該標志位即可;

RAX_ITER_EOF代表當前迭代器已經遍歷到rax樹的最后一個節點;

RAX_ITER_SAFE代表當前迭代器為安全迭代器,可以進行寫操作。

rax *rt; /* 當前迭代器對應的rax */

unsigned char *key; /*存儲了當前迭代器遍歷到的key,該指針指向

key_static_string或者從堆中申請的內存。*/

void *data; /* 當前key關聯的value值 */

size_t key_len; /* key指向的空間的已用空間 */

size_t key_max; /*key最大空間 */

unsigned char key_static_string[RAX_ITER_STATIC_LEN]; //默認存儲空間,當key比較大時,會使用堆空間內存。

raxNode *node; /* 當前key所在的raxNode */

raxStack stack; /* 記錄了從根節點到當前節點的路徑,用於raxNode的向上遍歷。*/

raxNodeCallback node_cb; /* 為節點的回調函數,通常為空*/

} raxIterator;

 

 

8.1.3 Stream結構

圖8-9 Stream結構示例

每個消息流都包含一個Rax結構。以消息ID為keylistpack結構為value存儲在Rax結構中。每個消息的具體信息存儲在這個listpack中。以下亮點是值得注意的。

1)每個listpack都有一個master entry,該結構中存儲了創建這個listpack時待插入消息的所有field,這主要是考慮同一個消息流,消息內容通常具有相似性,如果后續消息的field與master entry內容相同,則不需要再存儲其field。

2)每個listpack中可能存儲多條消息。

  1. 消息存儲

(1)消息ID

streamID定義如下,以每個消息創建時的時間(1970年1月1號至今的毫秒數)以及序號組成,共128位。

typedef struct streamID {

  uint64_t ms; /* Unix time in milliseconds. */

  uint64_t seq; /* Sequence number. */

} streamID;

 

(2)消息存儲的格式

Stream的消息內容存儲在listpack中, listpack用於存儲字符串或者整型數據,listpack中的單個元素稱為entry,下文介紹的消息存儲格式的每個字段都是一個entry,並不是將整個消息作為字符串儲存的。值得注意的是,每個listpack會存儲多個消息,具體存儲的消息個數是由stream-node-max-bytes(listpack節點最大占用的內存數,默認4096)和stream-node-max-entries(每個listpack最大存儲的元素個數,默認100)決定的。

·每個消息會占用多個listpack entry。

·每個listpack會存儲多個消息。

每個listpack在創建時,會構造該節點的master entry(根據第一個插入的消息構建),其結構如圖8-10所示。

圖8-10 listpack master entry結構

·count 為當前listpack中的所有未刪除的消息個數。

·deleted 為當前listpack中所有已經刪除的消息個數。

·num-fields 為下面的field的個數。

·field-1,…,filed-N 為當前listpack中第一個插入的消息的所有field

域。

·0 為標識位,在從后向前遍歷該listpack的所有消息時使用。

處省略了listpack每個元素存儲時的encoding以及backlen字段;

 

消息的field域與master entry的域完全相同

存儲一個消息時,如果該消息的field域與master entry的域完全相同,則不需要再次存儲field域,此時其消息存儲如圖8-11所示。

圖8-11 消息存儲

 

·flags字段為消息標志位,STREAM_ITEM_FLAG_NONE代表無特殊標識, STREAM_ITEM_FLAG_DELETED代表該消息已經被刪除, STREAM_ITEM_FLAG_SAMEFIELDS代表該消息的field域與master entry完全相同。

·streamID.ms以及streamID.seq為該消息ID減去master entry id之后的值。

·value域存儲了該消息的每個field域對應的內容。

·lp-count為該消息占用listpack的元素個數,也就是3+N。

 

消息的field域與master entry不完全相同

如果該消息的field域與master entry不完全相同,此時消息的存儲如圖8-12所示。

·flags為消息標志位,與上面一致;

·streamID.ms,streamID.seq為該消息ID減去master entry id之后的值;

·num-fields為該消息field域的個數;

·field-value存儲了消息的域值對,也就是消息的具體內容;

·lp-count為該消息占用的listpack的元素個數,也就是4+2N。

圖8-12 消息存儲

 

2.關鍵結構體介紹

  1. stream。

typedef struct stream {

  rax *rax; /* 存儲生產者生產的具體消息,以消息ID為鍵,消息內容為值存儲在rax中,值得注意的是,rax中的一個節點可能存儲多個消息*/

  uint64_t length; /*當前stream中的消息個數(不包括已經刪除的消息)。*/

  streamID last_id; /* 當前stream中最后插入的消息的ID,stream空時,設置為0。. */

  rax *cgroups; /* 存儲了當前stream相關的消費組,rax中: name -> streamCG */

} stream;

 

  1. 消費組。

每個Stream會有多個消費組,每個消費組通過組名稱進行唯一標識,同時關聯一個streamCG結構,該結構定義如下:

typedef struct streamCG {

  streamID last_id; // 該消費組已經確認的最后一個消息的ID

  rax *pel; // 該消費組尚未確認的消息,消息ID為鍵,streamNACK(一個尚未確認的消息)為值;

  rax *consumers; // 該消費組中所有的消費者,消費者的名稱為鍵,streamConsumer(代表一個消費者)為值。

} streamCG;

 

  1. 消費者。

每個消費者通過streamConsumer唯一標識,該結構如下:

typedef struct streamConsumer {

  mstime_t seen_time; /* 該消費者最后一次活躍的時間; */

  sds name; /* C消費者的名稱*/

  rax *pel; /* 消費者尚未確認的消息,以消息ID為鍵,streamNACK為值。 */

} streamConsumer;

 

  1. 未確認消息。

未確認消息(streamNACK)維護了消費組或者消費者尚未確認的消息,值得注意的是,消費組中的pel的元素與每個消費者的pel中的元素是共享的,即該消費組消費了某個消息,這個消息會同時放到消費組以及該消費者的pel隊列中,並且二者是同一個streamNACK結構。

/* Pending (yet not acknowledged) message in a consumer group. */

typedef struct streamNACK {

  mstime_t delivery_time; /* 該消息最后發送給消費方的時間 */

  uint64_t delivery_count; /*為該消息已經發送的次數(組內的成員可以通過xclaim命令獲取某個消息的處理權,該消息已經分給組內另一個消費者但其並沒有確認該消息)。*/

  streamConsumer *consumer; /* 該消息當前歸屬的消費者 */

} streamNACK;

 

5)迭代器。為了遍歷stream中的消息:

typedef struct streamIterator {

stream *stream; /*當前迭代器正在遍歷的消息流 */

streamID master_id; /* 消息內容實際存儲在listpack中,每個listpack都有一個masterentry(也就是第一個插入的消息),master_id為該消息id */

uint64_t master_fields_count; /* master entry中field域的個數. */

unsigned char *master_fields_start; /*master entry field域存儲的首地址*/

unsigned char *master_fields_ptr; /*當listpack中消息的field域與master entry的field域完全相同時,該消息會復用master entry的field域,在我們遍歷該消息時,需要記錄

當前所在的field域的具體位置,master_fields_ptr就是實現這個功能的。 */

int entry_flags; /* 當前遍歷的消息的標志位 */

int rev; /*當前迭代器的方向 */

uint64_t start_key[2]; /* 該迭代器處理的消息ID的范圍 */

uint64_t end_key[2]; /* End key as 128 bit big endian. */

raxIterator ri; /*rax迭代器,用於遍歷rax中所有的key. */

unsigned char *lp; /* 當前listpack指針*/

unsigned char *lp_ele; /* 當前正在遍歷的listpack中的元素, cursor. */

unsigned char *lp_flags; /* Current entry flags pointer.指向當前消息的flag域 */

//用於從listpack讀取數據時的緩存

unsigned char field_buf[LP_INTBUF_SIZE];

unsigned char value_buf[LP_INTBUF_SIZE];

} streamIterator;

 

8.2 Stream底層結構listpack的實現

結構查詢效率低,並且只適合於末尾增刪。考慮到消息流中,通常只需要向其末尾增加消息,故而可以采用該結構;

8.2.1 初始化

圖8-13 listpack初始化

/* Create a new, empty listpack.

* On success the new listpack is returned, otherwise an error is returned. */

unsigned char *lpNew(void) {

// LP_HDR_SIZE = 6,為listpack的頭部

unsigned char *lp = lp_malloc(LP_HDR_SIZE+1); // 申請空間

if (lp == NULL) return NULL;

lpSetTotalBytes(lp,LP_HDR_SIZE+1);

lpSetNumElements(lp,0);

lp[LP_HDR_SIZE] = LP_EOF; // LP_EOF = 0xFF

return lp;

}

 

8.2.2 增刪改操作

listpack提供了2種添加元素的方式:

一種是在任意位置插入元素,一種是在末尾插入元素。在末尾插入元素的底層實現通過調用任意位置插入元素進行,具體實現為lpInsert函數。

listpack的刪除操作被轉換為用空元素替換的操作。

listpack的替換操作(即改操作)的底層實現也是通過lpInsrt函數實現的。

 

lpInsert 函數定義:

unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp);

·lp 為當前待操作的listpack;

·ele 為待插入的新元素或者待替換的新元素,ele為空時,也就是刪除操作;

·size 為ele的長度;

·p 為待插入的位置或者帶替換的元素位置;

·where 有LP_BEFORE(前插)、LP_AFTER(后插)、LP_REPLACE(替換);

·*newp 用於返回插入的元素、替換的元素、刪除元素的下一個元素。

該函數返回null或者插入的元素,替換的元素,刪除元素的下一個元素。

 

刪除或者替換的主要過程如下:

1)計算需要插入的新元素或者替換舊元素的新元素需要的空間;

2)計算進行插入或者替換后整個listpack所需的空間,通過realloc申請空間;

3)調整新的listpack中的老的元素的位置,為待操作元素預留空間;

4)釋放舊的listpack;

5)在新的listpack中進行插入或替換的操作;

6)更新新的listpack結構頭部的統計信息。

 

8.2.3 遍歷操作

核心思想是利用每個entry的encode或者backlen字段獲取當前entry的長度;

unsigned char *lpFirst(unsigned char *lp); //獲取第一個元素位置

unsigned char *lpLast(unsigned char *lp); //獲取最后一個元素位置

unsigned char *lpNext(unsigned char *lp, unsigned char *p); //下一個元素位置

unsigned char *lpPrev(unsigned char *lp, unsigned char *p); //上一個元素位置

例如:

unsigned char *lpFirst(unsigned char *lp) {

  lp += LP_HDR_SIZE; /* Skip the header. */ LP=+6

  if (lp[0] == LP_EOF) return NULL; //0xFF

  return lp;

}

此處獲取的僅僅是某個entry首地址的指針,如果要讀取當前元素則需要使用下 lpGet接口;

 

8.2.4 讀取元素

lpGet用於獲取p指向的Listpack中真正存儲的元素:

①當元素采用字符串編碼時,返回字符串的第一個元素位置,count為元素個數;

②當采用整型編碼時,若intbuf不為空,則將整型數據轉換為字符串存儲在intbuf中,count為元素個數,並返回intbuf。若intbuf為空,直接將數據存儲在count中,返回null。

unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf)

lpGet的實現較為簡單,主要是利用了每個entryencode字段(p[0];

 

8.3 Stream底層結構Rax的實現

Stream的消息內容存儲在listpack中,但是如果將所有消息都存儲在一個listpack中,則會存在效率問題。例如,查詢某個消息時,需要遍歷整個listpack;插入消息時,需要重新申請一塊很大的空間。為了解決這些問題,Redis Stream通過Rax組織這些listpack ;

8.3.1 初始化

/* 分配一個新的rax並返回其指針。在內存不足時,函數*返回NULL. */

rax *raxNew(void) {

rax *rax = rax_malloc(sizeof(*rax)); //申請空間

if (rax == NULL) return NULL;

rax->numele = 0; //當前元素個數為0

rax->numnodes = 1; //當前節點個數為1

rax->head = raxNewNode(0,0); //構造頭節點

if (rax->head == NULL) {

  rax_free(rax);

  return NULL;

} else {

  return rax;

}

}

圖8-14 Rax初始化

8.3.2 查找元素

/* 獲取key對應的value值, */

//在rax中查找長度為len的字符串s(s為rax中的一個key), 找到返回該key對應的value

void *raxFind(rax *rax, unsigned char *s, size_t len) {

raxNode *h;

int splitpos = 0;

size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,NULL);

if (i != len || (h->iscompr && splitpos != 0) || !h->iskey)

return raxNotFound; //沒有找到這個key

return raxGetData(h); //查到key, 將key對應的value返回

}

 

raxLowWalk為查找key的核心函數 ;

static inline size_t raxLowWalk(rax *rax, unsigned char *s, size_t len, raxNode **stopnode, raxNode ***plink, int *splitpos, raxStack *ts) ;

·rax 為待查找的Rax;

·s 為待查找的key;

·len 為s的長度;

·*stopnode 為 找過程中的終止節點,也就意味着,當rax查找到該節點時,待查找的key已經匹配完成,或者當前節點無法與帶查找的key匹配;

·*plink 用於記錄父節點中指向*stopnode的指針的位置,當*stopnode變化時,也需要修改父節點指向該節點的指針;

·*splitpos 用於記錄壓縮節點的匹配位置;

·當ts 不為空時,會將查找該key的路徑寫入該變量。

該函數返回s的匹配長度,當s!=len時,表示未查找到該key;當s==len時,需要檢驗*stopnode是否為key,並且當*stopnode為壓縮節點時,還需要檢查splitpos是否為0(可能匹配到某個壓縮節點中間的某個元素)。

 

raxLowWalk函數的執行過程可以分為如下幾步。

1)初始化變量。

2)從rax根節點開始查找,知道當前待查找節點無子節點或者s查找完畢。對於每個節點來說,如果為壓縮節點,則需要與s中的字符完全匹配。如果為非壓縮節點,則查找與當前待匹配字符相同的字符。

3)如果當前待匹配節點能夠與s匹配,則移動位置到其子節點,繼續匹配。

raxNode *h = rax->head; // 從根節點開始匹配

raxNode **parentlink = &rax->head;

size_t i = 0; /*當前待匹配字符位置. */

size_t j = 0; /* 當前匹配的節點的位置*/

while(h->size && i < len) { // 當前節點有子節點且尚未走到s字符串的末尾

unsigned char *v = h->data;

if (h->iscompr) { // 壓縮節點是否能夠完全匹配s字符串

for (j = 0; j < h->size && i < len; j++, i++) {

  if (v[j] != s[i]) break;

}

  if (j != h->size) break; // 當前壓縮節點不能完全匹配或者s已經到達末尾

} else {

  /* 非壓縮節點遍歷節點元素, 查找與當前字符匹配的位置*/

  for (j = 0; j < h->size; j++) {

    if (v[j] == s[i]) break;

  }

  if (j == h->size) break; // 未在非壓縮節點找到匹配的字符

  i++; // 非壓縮節點可以匹配, 移動到s的下一個字符

}

// 當前節點能夠匹配s

if (ts) raxStackPush(ts,h); /* Save stack of parent nodes. */

raxNode **children = raxNodeFirstChildPtr(h);

if (h->iscompr) j = 0; /* Compressed node only child is at index 0. */

memcpy(&h,children+j,sizeof(h)); // 將當前節點移動到其第 j個子節點

parentlink = children+j;

j = 0;

}

if (stopnode) *stopnode = h;

if (plink) *plink = parentlink;

if (splitpos && h->iscompr) *splitpos = j;

return i;

 

8.3.3 添加元素

對於已存在的key,rax提供了2種方案,覆蓋或者不覆蓋原有的value,對應的接口分別為raxInsert、raxTryInsert,兩個接口的定義如下:

/* 覆蓋插入*/

int raxInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old) {

  return raxGenericInsert(rax,s,len,data,old,1);

}

/*非覆蓋插入函數:如果存在具有相同鍵的元素,則不更新值,並且返回0。 */

int raxTryInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old) {

  return raxGenericInsert(rax,s,len,data,old,0);

}

 

raxGenericInsert 函數

函數參數與raxInsert基本一致,只是增加overwrite用於標識key存在時是否覆蓋;

1.查找key是否存在

size_t i;

int j = 0; /* 分割位置。如果raxLowWalk()在壓縮節點中停止,則索引" j"表示我們在壓縮節點中停止的字符,即拆分該節點以進行插入的位置 */

raxNode *h, **parentlink;

i = raxLowWalk(rax,s,len,&h,&parentlink,&j,NULL);

 

2.找到key

根據raxLowWalk的返回值,如果當前key已經存在,則直接對該節點進行操作 ;

if (i == len && (!h->iscompr || j == 0 /* not in the middle if j is 0 */)) {

/*查看之前是否存儲value,沒有則申請空間 . */

if (!h->iskey || (h->isnull && overwrite)) {

  h = raxReallocForData(h,data);

  if (h) memcpy(parentlink,&h,sizeof(h));

}

if (h == NULL) {

  errno = ENOMEM;

  return 0;

}

/* 更新存在的key. */

if (h->iskey) {

  if (old) *old = raxGetData(h);

  if (overwrite) raxSetData(h,data);

  errno = 0;

  return 0; /* Element already exists. */

}

  /*否則,將節點設置為鍵set h->iskey. */

  raxSetData(h,data);

  rax->numele++;

  return 1; /* Element inserted. */

}

 

3.key不存在

1)在查找key的過程中,如果最后停留在某個壓縮節點上,此時需要對該壓縮節點進行拆分,具體拆分情況分為以下幾種,以圖8-15為例。

圖8-15 Rax節點拆分

·插入key"ciao",需要將"annibale"節點拆分為2部分:非壓縮節點,壓縮節點。

·插入key"ago",需要將"annibale"節點拆分為3部分:非壓縮節點,非壓縮節點,壓縮節點。

·插入key"annienter",需要將"annibale"節點拆分3部分:壓縮節點,非壓縮節點,壓縮節點。

·插入key"annibaie",需要將"annibale"拆成3部分:壓縮節點,非壓縮節點,非壓縮節點。

·插入key"annibali",需要將"annibale"拆成2部分:壓縮節點,非壓縮節點。

·插入key"a",將"annibale"拆分成2部分:非壓縮節點,壓縮節點。

·插入key"anni",將"annibale"拆分成2個壓縮節點。

總體而言分為2類:

新插入的key是當前節點的一部分: 將壓縮節點進行拆分后直接設置新的key-value即可

新插入的key和壓縮節點的某個位置不匹配: 對 需要在拆分后的相應位置的非壓縮節點中插入新key的相應不匹配字符,之后將新key的剩余部分插入到這個非壓縮節點的子節點中。

 

2)如果查找key完成后,不匹配節點為某個非壓縮節點,或者某個壓縮節點的某個字符不匹配,進行節點拆分后導致的不匹配位置為拆分后創建的非壓縮節點,此時僅僅需要將當前待匹配字符插入到這個非壓縮節點上(注意字符按照字典序排列),並為其創建子節點。之后,將剩余字符放入新建的子節點中即可(如果字符長度過長,需要進行分割)。

 

8.3.4 刪除元素

Rax的刪除操作主要有3個接口,可以刪除rax中的某個key,或者釋放整個rax,在釋放rax時,還可以設置釋放回調函數,在釋放rax的每個key時,都會調用這個回調函數;

// rax中刪除長度為len的s(s代表待刪除的key), *old用於返回該key對應的value

int raxRemove(rax *rax, unsigned char *s, size_t len, void **old);

// 釋放rax

void raxFree(rax *rax);

// 釋放rax,釋放每個key時,都會調用free_callback函數

void raxFreeWithCallback(rax *rax, void (*free_callback)(void*));

rax的釋放操作,采用的是深度優先算法;

 

raxRemove函數

當刪除rax中的某個key-value對時,首先查找key是否存在,不存在則直接返回,存在則需要進行刪除操作。

 

raxNode *h;

raxStack ts;

raxStackInit(&ts);

int splitpos = 0;

size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,&ts);

if (i != len || (h->iscompr && splitpos != 0) || !h->iskey) {

  raxStackFree(&ts); // 沒有找到需要刪除的key

  return 0;

}

 

如果key存在,則需要進行刪除操作,刪除操作完成后,Rax樹可能需要進行壓縮。具體可以分為下面2種情況,此處所說的壓縮是指將某個節點與其子節點壓縮成一個節點,葉子節點沒有子節點,不能進行壓縮。

1)某個節點只有一個子節點,該子節點之前是key,經刪除操作后不再是key,此時可以將該節點與其子節點壓縮,如圖8-16所示,刪除foo后,可以將Rax進行壓縮,壓縮后為"foobar"->[](iskey=1)。

圖8-16 Rax節點壓縮             圖8-17 Rax節點壓縮

2)某個節點有兩個子節點,經過刪除操作后,只剩下一個子節點,如果這個子節點不是key,則可以將該節點與這個子節點壓縮。如圖8-17所示,刪除foobar后,可以將Rax樹進行壓縮,壓縮成"footer"->[](iskey=1)。

 

刪除操作具體可以分為2個階段,刪除階段以及壓縮階段。例如,圖8-17刪除"foobar"時,需要從下向上,刪除可以刪除的節點。圖8-16在刪除"foo"時,則不需要刪除節點。這部分的實現邏輯主要是利用查找key時記錄的匹配路徑,依次向上直到無法刪除為止

 

if (h->size == 0) {

  raxNode *child = NULL;

  while(h != rax->head) {

    child = h;

    rax_free(child);

    rax->numnodes--;

    h = raxStackPop(&ts);

    /* 如果節點為key或者子節點個數不為1,則無法繼續刪除 */

    if (h->iskey || (!h->iscompr && h->size != 1)) break;

  }

  if (child) {

    raxNode *new = raxRemoveChild(h,child);

    if (new != h) {

      raxNode *parent = raxStackPeek(&ts);

      raxNode **parentlink;

      if (parent == NULL) {

        parentlink = &rax->head;

      } else {

        parentlink = raxFindParentLink(parent,h);

      }

      memcpy(parentlink,&new,sizeof(new));

    }

 

    /* 刪除后查看是否可以嘗試壓縮 node has just a single child and is not a key, */

    if (new->size == 1 && new->iskey == 0) {

      trycompress = 1;

      h = new;

    }

  }

  } else if (h->size == 1) {

  /* 可以嘗試進行壓縮. */

  trycompress = 1;

  }

 

壓縮過程可以細化為2步。

①找到可以進行壓縮的第一個元素,之后將所有可進行壓縮的節點進行壓縮。由於raxRowWalk函數已經記錄了查找key的過程,壓縮時只需從記錄棧中不斷彈出元素,即可找到可進行壓縮的第一個元素,過程如下:

raxNode *parent;

while(1) {

  parent = raxStackPop(&ts);

  if (!parent || parent->iskey ||

    (!parent->iscompr && parent->size != 1)) break;

  h = parent; //可以進行壓縮

}

raxNode *start = h; /* 可以進行壓縮的第一個節點. */

 

②找到第一個可壓縮節點后,進行數據壓縮。由於可壓縮的節點都只有一個子節點,壓縮過程只需要讀取每個節點的內容,創建新的節點,並填充新節點的內容即可,此處省略。

 

8.3.5 遍歷元素

Redis中實現的迭代器為雙向迭代器,可以向前,也可以向后,順序是按照key的字典序排列的。通過rax的結構圖可以看出,如果某個節點為key,則其子節點的key按照字典序比該節點的key大。另外,如果當前節點為非壓縮節點,則其最左側節點的key是其所有子節點的key中最小的。迭代器的主要接口有:

void raxStart(raxIterator *it, rax *rt);

int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len);

int raxNext(raxIterator *it);

int raxPrev(raxIterator *it);

void raxStop(raxIterator *it);

int raxEOF(raxIterator *it);

1.raxStart

raxStart用於初始化raxIterator結構;

void raxStart(raxIterator *it, rax *rt) {

it->flags = RAX_ITER_EOF; /*默認值為迭代結束. */

it->rt = rt;

it->key_len = 0;

it->key = it->key_static_string;

it->key_max = RAX_ITER_STATIC_LEN;

it->data = NULL;

it->node_cb = NULL;

raxStackInit(&it->stack);

}

2.raxSeek

raxStart初始化迭代器后,必須調用raxSeek函數初始化迭代器的位置

int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len);

·it為raxStart初始化的迭代器。

·op為查找操作符,可以為大於(>)、小於(<)、大於等於(>=)、小於等於(<=)、等於(=)、首個元素(^)、末尾元素($)。

·ele為待查找的key。

·len為ele的長度

 

查找末尾元素可以直接在Rax中找到最右側的葉子節點,

查找首個元素被轉換為查找大於等於空的操作。 return raxSeek(it,">=",NULL,0);

處理大於、小於、等於等操作主要分為以下幾步。

1) 在rax中查找key:

  size_t i = raxLowWalk(it->rt,ele,len,&it->node,NULL,&splitpos,&it->stack);

2)如果key找到,並且op中設置了等於,則操作完成:

if (eq && i == len && (!it->node->iscompr || splitpos == 0) &&

  it->node->iskey)

{

  /* 找到該key並且op中設置了=. */

  if (!raxIteratorAddChars(it,ele,len)) return 0; /* OOM. */

  it->data = raxGetData(it->node);

}

3)如果僅僅設置等於,並沒有找到key,則將迭代器的標志位設置為末尾。

4)如果設置了等於但沒有找到key,或者設置了大於或者小於符號,則需要繼續查找,這一步又分為2步。

①首先將查找key的路徑中所有匹配的字符,放入迭代器存儲key的數組中:

//將查找過程的最后一個節點放入路徑棧

if (!raxStackPush(&it->stack,it->node)) return 0;

for (size_t j = 1; j < it->stack.items; j++) {

  raxNode *parent = it->stack.stack[j-1];

  raxNode *child = it->stack.stack[j];

  if (parent->iscompr) {

    if (!raxIteratorAddChars(it,parent->data,parent->size))

    return 0;

  } else {

    raxNode **cp = raxNodeFirstChildPtr(parent);

    unsigned char *p = parent->data;

    while(1) {

      raxNode *aux;

      memcpy(&aux,cp,sizeof(aux));

      if (aux == child) break;

      cp++;

      p++;

    }

    if (!raxIteratorAddChars(it,p,1)) return 0;

 }

}

raxStackPop(&it->stack); //將最后一個節點從路徑棧中彈出

 

②根據key的匹配情況以及op的參數,在rax中繼續查找下一個或者上一個key,此時主要利用的是raxIteratorNextStepraxIteratorPrevStep兩個接口,這兩個接口也是raxNext以及raxPrev的核心處理函數,

 

3.raxNext&raxPrev

raxNext與raxPrev為逆操作,高度的相似,此處以raxNext為例;

int raxNext(raxIterator *it) {

if (!raxIteratorNextStep(it,0)) {

  errno = ENOMEM;

  return 0;

}

if (it->flags & RAX_ITER_EOF) {

  errno = 0;

  return 0;

}

return 1;

}

 

raxIteratorNextStep函數

int raxIteratorNextStep(raxIterator *it, int noup)

·it 為待移動的迭代器。

·noup 為標志位,可以取0或者1。在raxSeek中,我們有時需要查找比某個key大的下一個key,並且這個待查找的key可能並不存在,此時可能需要將noup設置為1。

raxNext處理過程的重點有3點:

①如果迭代器當前的節點有子節點,則沿着其最左側的節點一直向下,直到找到下一個key;

②如果當前節點沒有子節點,則利用迭代器中的路徑棧,依次彈出其父節點,查找父節點是否有其他比當前key大的子節點(迭代器中已經記錄了當前的key,通過該值可以進行查找);

③注意noup為1時,我們已經假設迭代器當前節點為上一個key的父節點,故而在路徑棧彈出時,第一次需要忽略。

 

while(1) {

int children = it->node->iscompr ? 1 : it->node->size;

if (!noup && children) {

  //當前的節點有子節點

  if (!raxStackPush(&it->stack,it->node)) return 0;

  raxNode **cp = raxNodeFirstChildPtr(it->node);

  if (!raxIteratorAddChars(it,it->node->data,

  it->node->iscompr ? it->node->size : 1)) return 0;

  memcpy(&it->node,cp,sizeof(it->node));

  /* 當前節點為key節點, 直接返回. */

  .......

} else {

  /*當前節點沒有子節點,找父節點. */

while(1) {

  int old_noup = noup;

  /* 已經迭代到rax頭部節點,結束 */

      ......

  /* 如果當前節點上沒有子節點,請嘗試父節點的下個子節點。 */

  unsigned char prevchild = it->key[it->key_len-1];

  if (!noup) { it->node = raxStackPop(&it->stack); }

       else { noup = 0; //第一次彈出父節點的操作被跳過 }

       int todel = it->node->iscompr ? it->node->size : 1;

  raxIteratorDelChars(it,todel);

 

  /* 如果至少有一個*額外的孩子,請嘗試下一個孩子 */

  if (!it->node->iscompr && it->node->size > (old_noup ? 0 : 1)) {

    raxNode **cp = raxNodeFirstChildPtr(it->node);

    int i = 0;

    while (i < it->node->size) {

         // 遍歷節點所有子節點,找到下一個比當前key大的子節點

      if (it->node->data[i] > prevchild) break;

      i++; cp++;

    }

    if (i != it->node->size) {

      // 找到了一個子節點比當前key大

      raxIteratorAddChars(it,it->node->data+i,1);

      if (!raxStackPush(&it->stack,it->node)) return 0;

      memcpy(&it->node,cp,sizeof(it->node));

      /* 當前節點為key,獲取值后返回,不是key則跳出內部while循環 */

        ......

    }

  }

}

}

}

 

4.raxStop&raxEOF

raxEOF用於標識迭代器迭代結束,raxStop用於結束迭代並釋放相關資源 ;

int raxEOF(raxIterator *it) {

  return it->flags & RAX_ITER_EOF;

}

/* Free the iterator. */

void raxStop(raxIterator *it) {

  if (it->key != it->key_static_string) rax_free(it->key);

  raxStackFree(&it->stack);

}

8.4 Stream結構的實現

Stream可以看作是一個消息鏈表。對一個消息而言,只能新增或者刪除,不能更改消息內容,故而本節主要介紹Stream相關結構的初始化以及增刪查操作。首先介紹消息流的初始化,之后講解消息的增刪查、消費組的增刪查以及消費組中消費者的增刪查,最后,介紹如何遍歷消息流中的所有消息。

8.4.1 初始化

/* Create a new stream data structure. */

stream *streamNew(void) {

  stream *s = zmalloc(sizeof(*s));

s->rax = raxNew();

s->length = 0;

s->last_id.ms = 0;

s->last_id.seq = 0;

s->cgroups = NULL; /* 按需創建以在不使用時節省內存 */

return s;

}

 

圖8-18 Stream結構初始化

 

8.4.2 添加元素

任何用戶都可以向某個消息流添加消息,或者消費某個消息流中的消息;

 

1.添加消息

Redis提供了streamAppendItem函數,用於向stream中添加一個新的消息:

int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id)

s 為待插入的數據流:

·argv 為待插入的消息內容,argv[0]為field_1,argv[1]為value_1,依此類推;

·numfields 為待插入的消息的field的總數;

·added_id 不為空,並且插入成功時,將新插入的消息id寫入added_id以供調用方使用;

·use_id 為調用方為該消息定義的消息id,該消息id應該大於s中任意一個消息的id。

 

增加消息的流程如下。

①獲取rax的最后一個key所在的節點,由於Rax樹是按照消息id的順序存儲的,所以最后一個key節點存儲了上一次插入的消息。

②查看該節點是否可以插入這條新的消息。

③如果該節點已經不能再插入新的消息(listpack為空或者已經達到設定的存儲最大值),在rax中插入新的節點(以消息id為key,新建listpack為value),並初始化新建的listpack;

如果仍然可以插入消息,則對比插入的消息與listpack中的master消息對應的fields是否完全一致,完全一致則表明該消息可以復用master的field。

④將待插入的消息內容插入到新建的listpack中或者原來的rax的最后一個key節點對應的listpack中,這一步主要取決於前2步的結果。

該函數主要是利用了listpack以及rax的相關接口。

 

2.新增消費組

通過streamCreateCG為消息流新增一個消費組,以消費組的名稱為key,該消費組的streamCG結構為value,放入rax中;

streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) {

// 如果當前消息流尚未有消費組,則新建消費組

if (s->cgroups == NULL) s->cgroups = raxNew();

// 查看是否已經有該消費組,有則新建失敗

if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound)

  return NULL;

// 新建消費組,並初始化相關變量

streamCG *cg = zmalloc(sizeof(*cg));

cg->pel = raxNew();

cg->consumers = raxNew();

cg->last_id = *id;

// 將該消費組插入到消息流的消費組樹中, 以消費組的名稱為key, 對應的streamCG為value

raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);

return cg;

}

3.新增消費者

Stream允許為某個消費組增加消費者,但沒有直接提供在某個消費組中創建消費者的接口,而是在查詢某個消費組的消費者時,發現該消費組沒有該消費者時選擇插入該消費者,該接口在8.4.4節進行介紹。

 

8.4.3 刪除元素

如何從消息流中刪除消息以及限制消息流的大小。如何釋放消費組中的消費者以及如何釋放整個消費組。

1.刪除消息

streamIteratorRemoveEntry函數用於移除某個消息,值得注意的是,該函數通常只是設置待移除消息的標志位為已刪除,並不會將該消息從所在的listpack中刪除。當消息所在的整個listpack的所有消息都已刪除時,則會從rax中釋放該節點。

 

void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {

unsigned char *lp = si->lp;

int64_t aux;

int flags = lpGetInteger(si->lp_flags);

flags |= STREAM_ITEM_FLAG_DELETED;

lp = lpReplaceInteger(lp,&si->lp_flags,flags); // 設置消息的標志位

 

/* Change the valid/deleted entries count in the master entry. */

unsigned char *p = lpFirst(lp);

aux = lpGetInteger(p);

if (aux == 1) {

  /* 當前Listpack只有待刪除消息,可以直接刪除節點. */

  lpFree(lp);

  raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL);

} else {

/* 修改listpack master enty中的統計信息 */

lp = lpReplaceInteger(lp,&p,aux-1);

p = lpNext(lp,p); /* Seek deleted field. */

aux = lpGetInteger(p);

lp = lpReplaceInteger(lp,&p,aux+1);

/* 查看listpack是否有變化(listpack中元素變化導致的擴容縮容) . */

if (si->lp != lp)

raxInsert(si->stream->rax,si->ri.key,si->ri.key_len,lp,NULL);

}

.....

}

2.裁剪消息流

就是將消息流的大小(未刪除的消息個數,不包含已經刪除的消息)裁剪到給定大小,刪除消息時,按照消息id,從小到大刪除。該接口為streamTrimByLength:

// stream為待裁剪的消息流; maxlen為消息流中最大的消息個數; approx為是否可以存在偏差

int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) ;

 

對於消息流的裁剪,主要有以下幾點。

1)消息刪除是按照消息id的順序進行刪除的,先刪除最先插入(即消息id最小的)消息。

2)從效率的角度上說,函數調用時最好加上approx標志位。

具體實現過程

1)獲取stream的Rax樹的第一個key所在的節點:

if (s->length <= maxlen) return 0; // stream中的消息個數小於maxlen,不需要刪除

raxIterator ri; // 初始化rax迭代器

raxStart(&ri,s->rax);

raxSeek(&ri,"^",NULL,0);

int64_t deleted = 0; // 統計已經刪除的消息個數

2)遍歷rax樹的節點,不斷刪除消息,直到剩余消息個數滿足要求:

while(s->length > maxlen && raxNext(&ri)) {

  // 遍歷Rax樹刪除消息直到滿足要求

}

3)具體刪除消息的部分可以分為如下幾步。

·查看是否需要刪除當前節點,如果刪除該節點存儲的全部消息后仍然未達到要求,則刪除該節點。

·不需要刪除該節點存儲的全部消息,如果函數參數中設置了"approx",則不再進行處理,可以直接返回。 // if (approx) break;

·不需要刪除該節點的全部消息,則遍歷該節點存儲的消息,將部分消息的標志位設置為已經刪除。

 

  刪除當前節點代碼

if (s->length - entries >= maxlen) { // 需要刪除該節點的全部消息

lpFree(lp);

raxRemove(s->rax,ri.key,ri.key_len,NULL); // 調用Rax的接口刪除key

raxSeek(&ri,">=",ri.key,ri.key_len);

s->length -= entries;

deleted += entries;

continue;

}

 

  遍歷當前節點的消息,將其部分消息設置為已刪除 代碼

while(p) { // 遍歷該節點存儲的全部消息,依次刪除,直到消息個數滿足要求

int flags = lpGetInteger(p);

int to_skip;

/* Mark the entry as deleted. */

if (!(flags & STREAM_ITEM_FLAG_DELETED)) {

flags |= STREAM_ITEM_FLAG_DELETED;

lp = lpReplaceInteger(lp,&p,flags);

deleted++;

s->length--;

if (s->length <= maxlen) break; /* Enough entries deleted. */

}

// 移動到下一個消息 ....

}

 

3.釋放消費組

接口為streamFreeCG,該接口主要完成2部分內容,首先釋放該消費組的pel鏈表,之后釋放消費組中的每個消費者。

/* Free a consumer group and all its associated data. */

void streamFreeCG(streamCG *cg) {

// 刪除該消費組的pel鏈表,釋放時設置回調函數用於釋放每個消息對應的streamNACK結構

raxFreeWithCallback(cg->pel,(void(*)(void*))streamFreeNACK);

// 釋放每個消費者時,需要釋放該消費者對應的streamConsumer結構

raxFreeWithCallback(cg->consumers,(void(*)(void*))streamFreeConsumer);

zfree(cg);

}

/* Free a NACK entry(未確認). */

void streamFreeNACK(streamNACK *na) {

  zfree(na);

}

 

4.釋放消費者

需要注意的是,不需要釋放該消費者的pel,因為該消費者的未確認消息結構streamNACK是與消費組的pel共享的,直接釋放相關內存即可。

void streamFreeConsumer(streamConsumer *sc) {

  raxFree(sc->pel); /*此處僅僅是將存儲streamNACK的Rax樹釋放 /

  sdsfree(sc->name);

  zfree(sc);

}

 

8.4.4 查找元素

查找消息、查找消費組、查找消費組中的消費者 ;

(1)查找消息

Stream查找消息是通過迭代器實現的,這部分內容我們將在8.4.5節進行介紹。

(2)查找消費組

Redis提供了streamLookupCG接口用於查找Stream的消費組,該接口較為簡單,主要是利用Rax的查詢接口:

streamCG *streamLookupCG(stream *s, sds groupname) {

if (s->cgroups == NULL) return NULL;

streamCG *cg = raxFind(s->cgroups,(unsigned char*)groupname,

sdslen(groupname));

return (cg == raxNotFound) ? NULL : cg;

}

 

(3)查找消息組中的消費者

streamLookupConsumer接口用於查詢某個消費組中的消費者。消費者不存在時,可以選擇是否將該消費者添加進消費組。

/*在消費組cg中查找消費者name; 如果沒有查到並且create為1時,將該消費者加入消費組 */

streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {

int create = !(flags & SLC_NOCREAT);

int refresh = !(flags & SLC_NOREFRESH);

streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,

sdslen(name));

if (consumer == raxNotFound) {

if (!create) return NULL; // 不需要插入

consumer = zmalloc(sizeof(*consumer));

consumer->name = sdsdup(name);

consumer->pel = raxNew();

raxInsert(cg->consumers,(unsigned char*)name,sdslen(name),

consumer,NULL);

}

if (refresh) consumer->seen_time = mstime(); //已經查詢到該消費者,更新時間戳

return consumer;

}

 

8.4.5 遍歷

Stream的迭代器streamIterator,用於遍歷Stream中的消息,相關的接口主要有以下4個:

// 用於初始化迭代器,值得注意的是,需要指定迭代器的方向。

void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);

//streamIteratorGetID與streamIteratorGetField配合使用,用於遍歷所有消息的所有field-value

int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);

void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);

// 釋放迭代器的相關資源。

void streamIteratorStop(streamIterator *si);

 

1)streamIteratorStart接口

接口負責初始化streamIterator。它的具體實現主要是利用Rax提供的迭代器:

void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) {

.....

/* 在基數樹中尋找正確的節點. */

raxStart(&si->ri,s->rax);

if (!rev) { // 正向迭代器

if (start && (start->ms || start->seq)) { // 設置了開始的消息id

  raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,

  sizeof(si->start_key));

  if (raxEOF(&si->ri)) raxSeek(&si->ri,"^",NULL,0);

} else {

  raxSeek(&si->ri,"^",NULL,0); // 默認情況為指向Rax樹中第一個key所在的節點

}

} else { // 逆向迭代器

if (end && (end->ms || end->seq)) {

  raxSeek(&si->ri,"<=",(unsigned char*)si->end_key,

  sizeof(si->end_key));

  if (raxEOF(&si->ri)) raxSeek(&si->ri,"$",NULL,0);

} else {

  raxSeek(&si->ri,"$",NULL,0);

}

}

si->stream = s;

si->lp = NULL; /* There is no current listpack right now. */

si->lp_ele = NULL; /* Current listpack cursor. */

si->rev = rev; /* Direction, if non-zero reversed, from end to start. */

}

 

2)streamIteratorGetID接口

該接口負責獲取迭代器當前的消息id,可以分為以下2步。

①查看當前所在的Rax樹的節點是否仍然有其他消息,沒有則根據迭代器方向調用Rax迭代器接口向前或者向后移動。

②在rax key對應的listpack中,查找尚未刪除的消息,此處需要注意streamIterator的指針移動。

3) streamIteratorGetField接口

直接使用該迭代器內部的指針,獲取當前消息的field-value對:

void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) {

if (si->entry_flags & STREAM_ITEM_FLAG_SAMEFIELDS) {

  // 當前消息的field內容與master_fields一致,讀取master_field域內容

  *fieldptr = lpGet(si->master_fields_ptr,fieldlen,si->field_buf);

  si->master_fields_ptr = lpNext(si->lp,si->master_fields_ptr);

} else { // 直接獲取當前的field, 移動lp_ele指針

  *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf);

  si->lp_ele = lpNext(si->lp,si->lp_ele);

}

// 獲取field對應的value,並將迭代器lp_ele指針向后移動

*valueptr = lpGet(si->lp_ele,valuelen,si->value_buf);

si->lp_ele = lpNext(si->lp,si->lp_ele);

}

 4) streamIteratorStop接口

主要利用raxIterator接口釋放相關資源:

void streamIteratorStop(streamIterator *si) {

  raxStop(&si->ri);

}

 

8.5 本章小結

本章主要介紹了Stream的底層實現。首先講解了Stream結構需要依賴的兩種數據結構Listpack以及Rax,並詳細介紹了這兩種結構的基本操作。之后,進一步說明了Stream是如何利用這兩種結構的。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM