前言
在 Redis 的 列表(list) 命令中,有一些命令是阻塞模式的,比如:BRPOP, BLPOP, BRPOPLPUSH, 這些命令都有可能造成客戶端的阻塞。下面總結一下 Redis 實現阻塞和取消阻塞的過程。
阻塞過程
當一個阻塞原語的處理目標為空鍵時, 執行該阻塞原語的客戶端就會被阻塞。有以下步驟:
1:將客戶端的狀態設為“正在阻塞”, 並記錄阻塞這個客戶端的各個鍵,以及阻塞的最長時限(timeout) 等數據;
2:將客戶端的信息記錄到 server.db[i]->blocking_keys 中(其中 i 為客戶端所使用的數據庫號碼);
3:繼續維持客戶端和服務器之間的網絡連接,但不再向客戶端傳送任何信息,造成客戶端阻塞;
note: step2 中 service.db[i]->blocking_keys 是一個字典,鍵是那些造成客戶端阻塞的鍵, 值是一個鏈表,鏈表里保存了所有因這個鍵而被阻塞的客戶端,如下圖所示:
阻塞的取消過程
阻塞的取消有三種方法:
【1】被動脫離:有其它客戶端為造成阻塞的鍵推入了新元素;
【2】主動脫離:到達執行阻塞原語時設定的最大阻塞時間(timeout);
【3】強制脫離:客戶端強制終止和服務端的連接,或者服務器停機;
被動脫離
阻塞因 LPUSH, RPUSH, LINSERT 等添加命令而被取消,這三個添加新元素的命令,在底層都有一個 pushGenericCommand 的函數實現(在下方源碼部分增加的 TODO 標志標識關鍵步驟):

void lpushCommand(redisClient *c) { pushGenericCommand(c,REDIS_HEAD); } void rpushCommand(redisClient *c) { pushGenericCommand(c,REDIS_TAIL); }
下面是 pushGenericCommand 函數的源碼實現(在下方源碼部分增加的 TODO 標志標識關鍵步驟):

/*----------------------------------------------------------------------------- * List Commands *----------------------------------------------------------------------------*/ void pushGenericCommand(redisClient *c, int where) { int j, waiting = 0, pushed = 0; // 取出列表對象 robj *lobj = lookupKeyWrite(c->db,c->argv[1]); // 如果列表對象不存在,那么可能有客戶端在等待這個鍵的出現 int may_have_waiting_clients = (lobj == NULL); if (lobj && lobj->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); return; } // 將列表狀態設置為就緒 if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]); //TODO 1: 如果有 client 可能被阻塞,則新加 readyList 到 service.ready_Keys 的字典中的相應鏈表中 // 遍歷所有輸入值,並將它們添加到列表中 for (j = 2; j < c->argc; j++) { //TODO 2: 此處把新 push 的值加入到對應 key 的列表中 // 編碼值 c->argv[j] = tryObjectEncoding(c->argv[j]); // 如果列表對象不存在,那么創建一個,並關聯到數據庫 if (!lobj) { lobj = createZiplistObject(); dbAdd(c->db,c->argv[1],lobj); } // 將值推入到列表 listTypePush(lobj,c->argv[j],where); pushed++; } // 返回添加的節點數量 addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0)); // 如果至少有一個元素被成功推入,那么執行以下代碼 if (pushed) { char *event = (where == REDIS_HEAD) ? "lpush" : "rpush"; // 發送鍵修改信號 signalModifiedKey(c->db,c->argv[1]); // 發送事件通知 notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id); } server.dirty += pushed; }
pushGenericCommand 函數主要做了兩件事情:
【1】向 readyList 添加到服務器;
【2】將新元素 value 添加到該 key 中
到此處為止,被該 key 阻塞的客戶端還沒有任何一個被解除阻塞狀態,為了做到這一點,Redis 的主進程在執行完 pushGenericCommand 函數后,會繼續調用 handleClientsBlockedOnLists 函數,該函數的源碼如下(在下方源碼部分增加的 TODO 標志標識關鍵步驟):

/* This function should be called by Redis every time a single command, * a MULTI/EXEC block, or a Lua script, terminated its execution after * being called by a client. * * 這個函數會在 Redis 每次執行完單個命令、事務塊或 Lua 腳本之后調用。 //TODO 0: NOTICE * * All the keys with at least one client blocked that received at least * one new element via some PUSH operation are accumulated into * the server.ready_keys list. This function will run the list and will * serve clients accordingly. Note that the function will iterate again and * again as a result of serving BRPOPLPUSH we can have new blocking clients * to serve because of the PUSH side of BRPOPLPUSH. * * 對所有被阻塞在某個客戶端的 key 來說,只要這個 key 被執行了某種 PUSH 操作 * 那么這個 key 就會被放到 serve.ready_keys 去。 * * 這個函數會遍歷整個 serve.ready_keys 鏈表, * 並將里面的 key 的元素彈出給被阻塞客戶端, * 從而解除客戶端的阻塞狀態。 * * 函數會一次又一次地進行迭代, * 因此它在執行 BRPOPLPUSH 命令的情況下也可以正常獲取到正確的新被阻塞客戶端。 */ void handleClientsBlockedOnLists(void) { // 遍歷整個 ready_keys 鏈表 while(listLength(server.ready_keys) != 0) { list *l; /* Point server.ready_keys to a fresh list and save the current one * locally. This way as we run the old list we are free to call * signalListAsReady() that may push new elements in server.ready_keys * when handling clients blocked into BRPOPLPUSH. */ // 備份舊的 ready_keys ,再給服務器端賦值一個新的 l = server.ready_keys; server.ready_keys = listCreate(); while(listLength(l) != 0) { //TODO 1: 不斷取出 server.ready_keys 的所有元素(可能對應多個不同的阻塞 Key) // 取出 ready_keys 中的首個鏈表節點 listNode *ln = listFirst(l); // 指向 readyList 結構 readyList *rl = ln->value; /* First of all remove this key from db->ready_keys so that * we can safely call signalListAsReady() against this key. */ // 從 ready_keys 中移除就緒的 key dictDelete(rl->db->ready_keys,rl->key); /* If the key exists and it's a list, serve blocked clients * with data. */ // 獲取鍵對象,這個對象應該是非空的,並且是列表 robj *o = lookupKeyWrite(rl->db,rl->key); if (o != NULL && o->type == REDIS_LIST) { dictEntry *de; /* We serve clients in the same order they blocked for * this key, from the first blocked to the last. */ // 取出所有被這個 key 阻塞的客戶端 de = dictFind(rl->db->blocking_keys,rl->key); if (de) { list *clients = dictGetVal(de); int numclients = listLength(clients); while(numclients--) { //TODO 2: 不斷取出因為等待該 key 被阻塞的客戶端 // 取出客戶端 listNode *clientnode = listFirst(clients); redisClient *receiver = clientnode->value; // 設置彈出的目標對象(只在 BRPOPLPUSH 時使用) robj *dstkey = receiver->bpop.target; // 從列表中彈出元素 // 彈出的位置取決於是執行 BLPOP 還是 BRPOP 或者 BRPOPLPUSH int where = (receiver->lastcmd && receiver->lastcmd->proc == blpopCommand) ? REDIS_HEAD : REDIS_TAIL; robj *value = listTypePop(o,where); //TODO 3: 從該 key 已添加的元素中 pop 出第一個元素,並用於阻塞客戶端的返回值 // 還有元素可彈出(非 NULL) if (value) { /* Protect receiver->bpop.target, that will be * freed by the next unblockClient() * call. */ if (dstkey) incrRefCount(dstkey); // 取消客戶端的阻塞狀態 unblockClient(receiver); //TODO 4: 從 service.blocking_keys 中移除對應阻塞的客戶端 // 將值 value 推入到造成客戶度 receiver 阻塞的 key 上 if (serveClientBlockedOnList(receiver, rl->key,dstkey,rl->db,value, where) == REDIS_ERR) { /* If we failed serving the client we need * to also undo the POP operation. */ listTypePush(o,value,where); } if (dstkey) decrRefCount(dstkey); decrRefCount(value); } else { // 如果執行到這里,表示還有至少一個客戶端被鍵所阻塞 // 這些客戶端要等待對鍵的下次 PUSH break; } } } // 如果列表元素已經為空,那么從數據庫中將它刪除 if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key); /* We don't call signalModifiedKey() as it was already called * when an element was pushed on the list. */ } /* Free this item. */ decrRefCount(rl->key); zfree(rl); listDelNode(l,ln); } listRelease(l); /* We have the new list on place at this point. */ } }
handleClientsBlockedOnLists 函數主要執行如下操作:
【1】如果 service.ready_keys 不為空,那么彈出該鏈表的表頭元素,並取出其中的 readyList 值;
【2】根據 readyList 值中保存的 key 和 db, 在 service.blocking_keys 中查找所有因為該 key 而被阻塞的客戶端(以鏈表形式保存);
【3】如果 Key 不為空,那么從 Key 的列表中彈出一個元素,並獲取客戶端鏈表的第一個客戶端,然后將被彈出元素作為被阻塞的客戶端的返回值;
【4】根據 readyList 結構的屬性,刪除 service.blocking_keys 中相應的客戶端數據,取消客戶端的阻塞狀態;
【5】繼續執行步驟 【3】和 【4】,知道 key 沒有元素可彈出,或者因為 key 而阻塞的客戶端都取消阻塞為止;
【6】繼續執行步驟 【1】,直到 ready_keys 字典所有鏈表里的所有 readyList 結構都被處理完為止;
主動脫離
阻塞因超過最大等待時間而被取消。當客戶端被阻塞時,所有造成它阻塞的鍵,以及阻塞的最長時限都會被記錄在客戶端里面,並且蓋客戶端的狀態會被設置為”正在阻塞“。每次 Redis 服務器常規操作函數(redis.c/serverCron) 執行時,程序都會檢查所有連接到服務器的客戶端,查看哪些處於”正在阻塞“狀態的客戶端時限是否已經過期,如果是的話,就給客戶端返回一個空白回復,然后撤銷對客戶端的阻塞。下面是相關源碼:

void clientsCron(void) { /* Make sure to process at least 1/(server.hz*10) of clients per call. * * 這個函數每次執行都會處理至少 1/server.hz*10 個客戶端。 * * Since this function is called server.hz times per second we are sure that * in the worst case we process all the clients in 10 seconds. * * 因為這個函數每秒鍾會調用 server.hz 次, * 所以在最壞情況下,服務器需要使用 10 秒鍾來遍歷所有客戶端。 * * In normal conditions (a reasonable number of clients) we process * all the clients in a shorter time. * * 在一般情況下,遍歷所有客戶端所需的時間會比實際中短很多。 */ // 客戶端數量 int numclients = listLength(server.clients); // 要處理的客戶端數量 int iterations = numclients / (server.hz * 10); // 至少要處理 50 個客戶端 if (iterations < 50) iterations = (numclients < 50) ? numclients : 50; while (listLength(server.clients) && iterations--) { redisClient *c; listNode *head; /* Rotate the list, take the current head, process. * This way if the client must be removed from the list it's the * first element and we don't incur into O(N) computation. */ // 翻轉列表,然后取出表頭元素,這樣一來上一個被處理的客戶端會被放到表頭 // 另外,如果程序要刪除當前客戶端,那么只要刪除表頭元素就可以了 listRotate(server.clients); head = listFirst(server.clients); c = listNodeValue(head); /* The following functions do different service checks on the client. * The protocol is that they return non-zero if the client was * terminated. */ // 檢查客戶端,並在客戶端超時時關閉它 if (clientsCronHandleTimeout(c)) continue; // 根據情況,縮小客戶端查詢緩沖區的大小 if (clientsCronResizeQueryBuffer(c)) continue; } }
阻塞的取消策略
當程序添加一個新的被阻塞客戶端到 server.blocking_keys 字典的鏈表中時,他將客戶端放在鏈表的最后,而當 handleClientsBlockedOnLists 取消客戶端的阻塞時候,它從鏈表的最前面開始取消阻塞;這個鏈表形成了一個 FIFO 隊列,最先被阻塞的客戶端總是最先脫離阻塞狀態,Redis 文檔稱這種模式為先阻塞先服務(FBFS, first-block-first-server)。
參考內容:
[1]:The Design and Implementation of Redis 黃健宏