Redis 列表阻塞命令的實現


前言 

  在 Redis 的 列表(list) 命令中,有一些命令是阻塞模式的,比如:BRPOP,  BLPOP, BRPOPLPUSH, 這些命令都有可能造成客戶端的阻塞。下面總結一下 Redis 實現阻塞和取消阻塞的過程。

阻塞過程

  當一個阻塞原語的處理目標為空鍵時, 執行該阻塞原語的客戶端就會被阻塞。有以下步驟:

1:將客戶端的狀態設為“正在阻塞”, 並記錄阻塞這個客戶端的各個鍵,以及阻塞的最長時限(timeout) 等數據;

2:將客戶端的信息記錄到 server.db[i]->blocking_keys 中(其中 i 為客戶端所使用的數據庫號碼);

3:繼續維持客戶端和服務器之間的網絡連接,但不再向客戶端傳送任何信息,造成客戶端阻塞;

note: step2 中 service.db[i]->blocking_keys 是一個字典,鍵是那些造成客戶端阻塞的鍵, 值是一個鏈表,鏈表里保存了所有因這個鍵而被阻塞的客戶端,如下圖所示:

 

 

 阻塞的取消過程

  阻塞的取消有三種方法:

    【1】被動脫離:有其它客戶端為造成阻塞的鍵推入了新元素;

    【2】主動脫離:到達執行阻塞原語時設定的最大阻塞時間(timeout);

    【3】強制脫離:客戶端強制終止和服務端的連接,或者服務器停機;

 

被動脫離

    阻塞因 LPUSH, RPUSH, LINSERT 等添加命令而被取消,這三個添加新元素的命令,在底層都有一個 pushGenericCommand 的函數實現(在下方源碼部分增加的 TODO 標志標識關鍵步驟):

void lpushCommand(redisClient *c) {
    pushGenericCommand(c,REDIS_HEAD);
}

void rpushCommand(redisClient *c) {
    pushGenericCommand(c,REDIS_TAIL);
}
調用 pushGenericCommand

  下面是 pushGenericCommand  函數的源碼實現(在下方源碼部分增加的 TODO 標志標識關鍵步驟):

/*-----------------------------------------------------------------------------
 * List Commands
 *----------------------------------------------------------------------------*/

void pushGenericCommand(redisClient *c, int where) {

    int j, waiting = 0, pushed = 0;

    // 取出列表對象
    robj *lobj = lookupKeyWrite(c->db,c->argv[1]);

    // 如果列表對象不存在,那么可能有客戶端在等待這個鍵的出現
    int may_have_waiting_clients = (lobj == NULL);

    if (lobj && lobj->type != REDIS_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    // 將列表狀態設置為就緒
    if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]);  //TODO 1: 如果有 client 可能被阻塞,則新加 readyList 到 service.ready_Keys 的字典中的相應鏈表中

    // 遍歷所有輸入值,並將它們添加到列表中
    for (j = 2; j < c->argc; j++) {               //TODO 2: 此處把新 push 的值加入到對應 key 的列表中

        // 編碼值
        c->argv[j] = tryObjectEncoding(c->argv[j]);

        // 如果列表對象不存在,那么創建一個,並關聯到數據庫
        if (!lobj) {
            lobj = createZiplistObject();
            dbAdd(c->db,c->argv[1],lobj);
        }

        // 將值推入到列表
        listTypePush(lobj,c->argv[j],where);

        pushed++;
    }

    // 返回添加的節點數量
    addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));

    // 如果至少有一個元素被成功推入,那么執行以下代碼
    if (pushed) {
        char *event = (where == REDIS_HEAD) ? "lpush" : "rpush";

        // 發送鍵修改信號
        signalModifiedKey(c->db,c->argv[1]);

        // 發送事件通知
        notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id);
    }

    server.dirty += pushed;
}
pushGenericCommand

pushGenericCommand 函數主要做了兩件事情:

【1】向 readyList 添加到服務器;

【2】將新元素 value 添加到該 key 中

到此處為止,被該 key 阻塞的客戶端還沒有任何一個被解除阻塞狀態,為了做到這一點,Redis 的主進程在執行完 pushGenericCommand 函數后,會繼續調用 handleClientsBlockedOnLists  函數,該函數的源碼如下(在下方源碼部分增加的 TODO 標志標識關鍵步驟):

/* This function should be called by Redis every time a single command,
 * a MULTI/EXEC block, or a Lua script, terminated its execution after
 * being called by a client.
 *
 * 這個函數會在 Redis 每次執行完單個命令、事務塊或 Lua 腳本之后調用。  //TODO 0:  NOTICE
 *
 * All the keys with at least one client blocked that received at least
 * one new element via some PUSH operation are accumulated into
 * the server.ready_keys list. This function will run the list and will
 * serve clients accordingly. Note that the function will iterate again and
 * again as a result of serving BRPOPLPUSH we can have new blocking clients
 * to serve because of the PUSH side of BRPOPLPUSH. 
 *
 * 對所有被阻塞在某個客戶端的 key 來說,只要這個 key 被執行了某種 PUSH 操作
 * 那么這個 key 就會被放到 serve.ready_keys 去。
 * 
 * 這個函數會遍歷整個 serve.ready_keys 鏈表,
 * 並將里面的 key 的元素彈出給被阻塞客戶端,
 * 從而解除客戶端的阻塞狀態。
 *
 * 函數會一次又一次地進行迭代,
 * 因此它在執行 BRPOPLPUSH 命令的情況下也可以正常獲取到正確的新被阻塞客戶端。
 */
void handleClientsBlockedOnLists(void) {

    // 遍歷整個 ready_keys 鏈表
    while(listLength(server.ready_keys) != 0) {
        list *l;

        /* Point server.ready_keys to a fresh list and save the current one
         * locally. This way as we run the old list we are free to call
         * signalListAsReady() that may push new elements in server.ready_keys
         * when handling clients blocked into BRPOPLPUSH. */
        // 備份舊的 ready_keys ,再給服務器端賦值一個新的
        l = server.ready_keys;
        server.ready_keys = listCreate();

        while(listLength(l) != 0) {                 //TODO 1: 不斷取出 server.ready_keys 的所有元素(可能對應多個不同的阻塞 Key)

            // 取出 ready_keys 中的首個鏈表節點
            listNode *ln = listFirst(l);

            // 指向 readyList 結構
            readyList *rl = ln->value;

            /* First of all remove this key from db->ready_keys so that
             * we can safely call signalListAsReady() against this key. */
            // 從 ready_keys 中移除就緒的 key
            dictDelete(rl->db->ready_keys,rl->key);

            /* If the key exists and it's a list, serve blocked clients
             * with data. */
            // 獲取鍵對象,這個對象應該是非空的,並且是列表
            robj *o = lookupKeyWrite(rl->db,rl->key);
            if (o != NULL && o->type == REDIS_LIST) {
                dictEntry *de;

                /* We serve clients in the same order they blocked for
                 * this key, from the first blocked to the last. */
                // 取出所有被這個 key 阻塞的客戶端
                de = dictFind(rl->db->blocking_keys,rl->key);
                if (de) {
                    list *clients = dictGetVal(de);
                    int numclients = listLength(clients);

                    while(numclients--) {         //TODO 2: 不斷取出因為等待該 key 被阻塞的客戶端
                        // 取出客戶端
                        listNode *clientnode = listFirst(clients);
                        redisClient *receiver = clientnode->value;

                        // 設置彈出的目標對象(只在 BRPOPLPUSH 時使用)
                        robj *dstkey = receiver->bpop.target;

                        // 從列表中彈出元素
                        // 彈出的位置取決於是執行 BLPOP 還是 BRPOP 或者 BRPOPLPUSH
                        int where = (receiver->lastcmd &&
                                     receiver->lastcmd->proc == blpopCommand) ?
                                    REDIS_HEAD : REDIS_TAIL;
                        robj *value = listTypePop(o,where);    //TODO 3: 從該 key 已添加的元素中 pop 出第一個元素,並用於阻塞客戶端的返回值

                        // 還有元素可彈出(非 NULL)
                        if (value) {
                            /* Protect receiver->bpop.target, that will be
                             * freed by the next unblockClient()
                             * call. */
                            if (dstkey) incrRefCount(dstkey);

                            // 取消客戶端的阻塞狀態
                            unblockClient(receiver);        //TODO 4: 從 service.blocking_keys 中移除對應阻塞的客戶端

                            // 將值 value 推入到造成客戶度 receiver 阻塞的 key 上
                            if (serveClientBlockedOnList(receiver,
                                rl->key,dstkey,rl->db,value,
                                where) == REDIS_ERR)
                            {
                                /* If we failed serving the client we need
                                 * to also undo the POP operation. */
                                    listTypePush(o,value,where);
                            }

                            if (dstkey) decrRefCount(dstkey);
                            decrRefCount(value);
                        } else {
                            // 如果執行到這里,表示還有至少一個客戶端被鍵所阻塞
                            // 這些客戶端要等待對鍵的下次 PUSH
                            break;
                        }
                    }
                }
                
                // 如果列表元素已經為空,那么從數據庫中將它刪除
                if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
                /* We don't call signalModifiedKey() as it was already called
                 * when an element was pushed on the list. */
            }

            /* Free this item. */
            decrRefCount(rl->key);
            zfree(rl);
            listDelNode(l,ln);
        }
        listRelease(l); /* We have the new list on place at this point. */
    }
}
handleClientsBlockedOnLists

handleClientsBlockedOnLists  函數主要執行如下操作:

【1】如果 service.ready_keys 不為空,那么彈出該鏈表的表頭元素,並取出其中的 readyList 值;

【2】根據 readyList 值中保存的 key 和 db, 在 service.blocking_keys 中查找所有因為該 key 而被阻塞的客戶端(以鏈表形式保存);

【3】如果 Key 不為空,那么從 Key 的列表中彈出一個元素,並獲取客戶端鏈表的第一個客戶端,然后將被彈出元素作為被阻塞的客戶端的返回值;

【4】根據 readyList 結構的屬性,刪除 service.blocking_keys 中相應的客戶端數據,取消客戶端的阻塞狀態;

【5】繼續執行步驟 【3】和 【4】,知道 key 沒有元素可彈出,或者因為 key 而阻塞的客戶端都取消阻塞為止;

【6】繼續執行步驟 【1】,直到 ready_keys 字典所有鏈表里的所有 readyList 結構都被處理完為止;

 

主動脫離

    阻塞因超過最大等待時間而被取消。當客戶端被阻塞時,所有造成它阻塞的鍵,以及阻塞的最長時限都會被記錄在客戶端里面,並且蓋客戶端的狀態會被設置為”正在阻塞“。每次 Redis 服務器常規操作函數(redis.c/serverCron) 執行時,程序都會檢查所有連接到服務器的客戶端,查看哪些處於”正在阻塞“狀態的客戶端時限是否已經過期,如果是的話,就給客戶端返回一個空白回復,然后撤銷對客戶端的阻塞。下面是相關源碼:

void clientsCron(void) {
    /* Make sure to process at least 1/(server.hz*10) of clients per call.
     *
     * 這個函數每次執行都會處理至少 1/server.hz*10 個客戶端。
     *
     * Since this function is called server.hz times per second we are sure that
     * in the worst case we process all the clients in 10 seconds.
     *
     * 因為這個函數每秒鍾會調用 server.hz 次,
     * 所以在最壞情況下,服務器需要使用 10 秒鍾來遍歷所有客戶端。
     *
     * In normal conditions (a reasonable number of clients) we process
     * all the clients in a shorter time. 
     *
     * 在一般情況下,遍歷所有客戶端所需的時間會比實際中短很多。
     */

    // 客戶端數量
    int numclients = listLength(server.clients);

    // 要處理的客戶端數量
    int iterations = numclients / (server.hz * 10);

    // 至少要處理 50 個客戶端
    if (iterations < 50)
        iterations = (numclients < 50) ? numclients : 50;

    while (listLength(server.clients) && iterations--) {
        redisClient *c;
        listNode *head;

        /* Rotate the list, take the current head, process.
         * This way if the client must be removed from the list it's the
         * first element and we don't incur into O(N) computation. */
        // 翻轉列表,然后取出表頭元素,這樣一來上一個被處理的客戶端會被放到表頭
        // 另外,如果程序要刪除當前客戶端,那么只要刪除表頭元素就可以了
        listRotate(server.clients);
        head = listFirst(server.clients);
        c = listNodeValue(head);
        /* The following functions do different service checks on the client.
         * The protocol is that they return non-zero if the client was
         * terminated. */
        // 檢查客戶端,並在客戶端超時時關閉它
        if (clientsCronHandleTimeout(c)) continue;
        // 根據情況,縮小客戶端查詢緩沖區的大小
        if (clientsCronResizeQueryBuffer(c)) continue;
    }
}
clientsCron

 

阻塞的取消策略

    當程序添加一個新的被阻塞客戶端到 server.blocking_keys 字典的鏈表中時,他將客戶端放在鏈表的最后,而當 handleClientsBlockedOnLists  取消客戶端的阻塞時候,它從鏈表的最前面開始取消阻塞;這個鏈表形成了一個 FIFO 隊列,最先被阻塞的客戶端總是最先脫離阻塞狀態,Redis 文檔稱這種模式為先阻塞先服務(FBFS, first-block-first-server)。

 

參考內容:

  [1]:The Design and Implementation of Redis  黃健宏

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM