redis學習筆記(六): processCommand


在看它的command處理之前,先說一下redis中C/S交互的流程(不知道怎么用圖來表示流程,先碼在這里):

1. 在initServer中調用aeCreateFileEvent給tcp listen socket注冊 acceptTcpHandler 做為rfileProc
2. 有客戶端連接過來時,在aeApiPoll中,listen套接字上來了可讀事件,調用其注冊的rfileProc,也就是acceptTcpHandler
3. 在acceptTcpHandler的處理當中會調用createClient,它除了分配新的redisClient結構之外,還會調用aeCreateFileEvent為新的fd注冊可讀事件上的rfileProc: readQueryFromClient
4. 當客戶端發過來請求時,aeApiPoll中,client套接字上來了可讀事件,調用rfileProc也就是readQueryFromClient讀取client的請求
5. 在處理完請求之后(一般是執行相應的命令),將需要返回給client的內容放在redisClient結構中的buf成員中,然后將它對應的fd以可寫事件加入poll中,對應的callback為sendReplyToClient
6. 下一次aeApiPoll時,可寫事件就緒,就會調用sendReplyToClient發回給客戶
所以命令的接收跟命令的執行(queue也算是執行)是按順序執行的,給client的回復是異步做的
所以對於同一個client,盡可能多地收集回復再一次性發出去能減少網絡I/O的次數

命令交互的主要入口函數是readQueryFromClient,它從套接字中讀取內容、再處理成argc/argv的格式、最后去執行:

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    int nread, readlen;
    size_t qblen;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    /* 設置server.current_client,沒有多線程的影響? */
    server.current_client = c;
    readlen = REDIS_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    /* client第一次來請求時,reqtype是0,不會進下面這個判斷
     * 后面的processInputBuffer中會把reqtype設置為REDIS_REQ_MULTIBULK,所以這個函數會進來多次?從調用流程上來看,有可能是因為內容太多,一次read沒能完全讀取完?
     */
    if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= REDIS_MBULK_BIG_ARG)
    {
        int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);

        if (remaining < readlen) readlen = remaining;
    }

    /* 初始情況長度為0 */
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    /* 接收buf的大小最大為REDIS_IOBUF_LEN */
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    /* 讀取數據 */
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            nread = 0;
        } else {
            redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        redisLog(REDIS_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }
    if (nread) {
        /* 更新querybuf的len, free字段,並在內容的最后加'\0' */
        sdsIncrLen(c->querybuf,nread);
        /* 記錄最后一次交互的時間 */
        c->lastinteraction = server.unixtime;
        /* master? */
        if (c->flags & REDIS_MASTER) c->reploff += nread;
        server.stat_net_input_bytes += nread;
    } else {
        server.current_client = NULL;
        return;
    }
    /* 如果請求內容的長度超過了最大長度的限制,記錄client info和請求長度,釋放client並返回 */
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }
    processInputBuffer(c);
    server.current_client = NULL;
}

它最后調用processInputBuffer對接收到的內容進行處理

void processInputBuffer(redisClient *c) {
    /* Keep processing while there is something in the input buffer */
    /* 從這個循環來看,querybuf里如果是'*'開頭的multi-bulk格式的內容,則一定會解析完成?不存在沒有讀取完整數據的情況? */
    while(sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        if (!(c->flags & REDIS_SLAVE) && clientsArePaused()) return;

        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & REDIS_BLOCKED) return;

        /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands). */
        if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;

        /* Determine request type when unknown. */
        /* 只要這里設置過一次reqtype,除非調用resetClient,否則不會再進入這個判斷里面 
         * 但是,不是以'*'開頭的命令格式會是什么樣的命令?
         */
        if (!c->reqtype) {            
            if (c->querybuf[0] == '*') {
                c->reqtype = REDIS_REQ_MULTIBULK;
            } else {
                c->reqtype = REDIS_REQ_INLINE;
            }
        }

        if (c->reqtype == REDIS_REQ_INLINE) {
            if (processInlineBuffer(c) != REDIS_OK) break;
        } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != REDIS_OK) break;
        } else {
            redisPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        /* */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            if (processCommand(c) == REDIS_OK)
                resetClient(c);
        }
    }
}

processMultibulkBuffer就是主要的解析過程。對於set a 1這條命令,server端收到的內容應該是: *3\r\n$3\r\nset\r\n$1\r\na\r\n$1\r\n1\r\n:

int processMultibulkBuffer(redisClient *c) {
    char *newline = NULL;
    int pos = 0, ok;
    long long ll;

    /* 第一次進來的時候,這個值應該都是0 */
    if (c->multibulklen == 0) {
        /* The client should have been reset */
        redisAssertWithInfo(c,NULL,c->argc == 0);

        /* 一定要有\r\n才能進行multi-bulk解析 */
        /* Multi bulk length cannot be read without a \r\n */
        /* 先找到\r */
        newline = strchr(c->querybuf,'\r');
        /* 如果沒有\r,就返回錯誤*/
        if (newline == NULL) {
            if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
                addReplyError(c,"Protocol error: too big mbulk count string");
                setProtocolError(c,0);
            }
            return REDIS_ERR;
        }

        /* Buffer should also contain \n */
        /* 如果第一個\r之前數據部分的長度大於整個讀取內容長度減2,就返回錯誤 
         * 為什么長度的判斷能確保有一個\n?
         */
        if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
            return REDIS_ERR;

        /* We know for sure there is a whole line since newline != NULL,
         * so go ahead and find out the multi bulk length. */
        redisAssertWithInfo(c,NULL,c->querybuf[0] == '*');
        /* 解析'*'號之后的數字,表示這一個bulk的數量 */
        ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
        if (!ok || ll > 1024*1024) {
            addReplyError(c,"Protocol error: invalid multibulk length");
            setProtocolError(c,pos);
            return REDIS_ERR;
        }

        /* 用長度記錄下一行的開始位置 */
        pos = (newline-c->querybuf)+2;
        if (ll <= 0) {
            sdsrange(c->querybuf,pos,-1);
            return REDIS_OK;
        }

        /* 這一塊bulk的數量 */
        c->multibulklen = ll;

        /* Setup argv array on client structure */
        /* 釋放之前的argv?只有multibulklen為0才一定會走到這里。 */
        if (c->argv) zfree(c->argv);
        /* 重新分配argv的空間 */
        c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
    }

    redisAssertWithInfo(c,NULL,c->multibulklen > 0);
    /* 開始解析每一個bulk的數據。*/
    while(c->multibulklen) {
        /* Read bulk length if unknown */
        /* bulklen等於-1說明解析完了一個bulk */
        if (c->bulklen == -1) {
            /* pos記錄了上一行的結尾部分,這里開始處理下一行 */
            newline = strchr(c->querybuf+pos,'\r');
            /* 如果后面沒有再找到\r,認為是處理完成,跳出循環 */
            if (newline == NULL) {
                if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
                    addReplyError(c,
                        "Protocol error: too big bulk count string");
                    setProtocolError(c,0);
                    return REDIS_ERR;
                }
                break;
            }

            /* Buffer should also contain \n */
            if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
                break;

            /* 一定是以'$'開頭 */
            if (c->querybuf[pos] != '$') {
                addReplyErrorFormat(c,
                    "Protocol error: expected '$', got '%c'",
                    c->querybuf[pos]);
                setProtocolError(c,pos);
                return REDIS_ERR;
            }

            /* 解析'$'后面跟的數字,表示后接的參數(字符串表示)的長度 */
            ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
            if (!ok || ll < 0 || ll > 512*1024*1024) {
                addReplyError(c,"Protocol error: invalid bulk length");
                setProtocolError(c,pos);
                return REDIS_ERR;
            }

            /* 記錄下一行的位置 */
            pos += newline-(c->querybuf+pos)+2;
            /* 如果這個參數的長度大於32k,則進行一些特殊處理 */
            if (ll >= REDIS_MBULK_BIG_ARG) {
                size_t qblen;

                /* If we are going to read a large object from network
                 * try to make it likely that it will start at c->querybuf
                 * boundary so that we can optimize object creation
                 * avoiding a large copy of data. */
                /* 截取下一行一直到末尾的子串 */ 
                sdsrange(c->querybuf,pos,-1);
                pos = 0;
                /* 子串的長度 */
                qblen = sdslen(c->querybuf);
                /* Hint the sds library about the amount of bytes this string is
                 * going to contain. */
                /* 如果子串長度小於'$'后面指示的長度,說明這一次沒有讀取完數據,因此在querybuf上make room,確保下一次讀取完整bulk的數據 */
                if (qblen < (size_t)ll+2)
                    c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-qblen);
            }
            /* 這一個參數的長度 */
            c->bulklen = ll;
        }

        /* Read bulk argument */
        /* 如果剩下的部分的長度小於待解析參數的長度,表明數據不完整,跳出 */
        if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) {
            /* Not enough data (+2 == trailing \r\n) */
            break;
        } else {
            /* Optimization: if the buffer contains JUST our bulk element
             * instead of creating a new object by *copying* the sds we
             * just use the current sds string. */
            /* 如果是超長的參數,並且整個querybuf都只包含這一個參數,則使用createObject,否則,使用createStringObject */
            if (pos == 0 &&
                c->bulklen >= REDIS_MBULK_BIG_ARG &&
                (signed) sdslen(c->querybuf) == c->bulklen+2)
            {
                c->argv[c->argc++] = createObject(REDIS_STRING,c->querybuf);
                sdsIncrLen(c->querybuf,-2); /* remove CRLF */
                /* createObject是直接使用了querybuf表示的空間,所以下面需要再創造出另一個相同長度的空的buffer */
                c->querybuf = sdsempty();
                /* Assume that if we saw a fat argument we'll see another one
                 * likely... */
                c->querybuf = sdsMakeRoomFor(c->querybuf,c->bulklen+2);
                pos = 0;
            } else {
                c->argv[c->argc++] = createStringObject(c->querybuf+pos,c->bulklen);
                pos += c->bulklen+2;
            }
            /* 這一塊處理完成 */
            c->bulklen = -1;
            c->multibulklen--;
        }
    }

    /* Trim to pos */
    /* 如果pos非0,截取剩下的部分 */
    if (pos) sdsrange(c->querybuf,pos,-1);

    /* We're done when c->multibulk == 0 */
    /* 所有bulk數據都處理完成 */
    if (c->multibulklen == 0) return REDIS_OK;

    /* Still not read to process the command */
    /* 如果是由於數據沒有讀取完整,也會返回err? */
    return REDIS_ERR;
}

processMultibulkBuffer解析完成之后,redisClient中argc和argv就已經有了所有參數的信息了。

解析完了命令內容之后,接下來,processInputBuffer會調用processCommand進行處理。processCommand的大部分工作是做一些檢查工作,以確保當前的命令是可以被執行的。

每一步檢查工作的目的,代碼中的注釋也寫的比較詳細了,其實現如下:

/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() execute the command or prepare the
 * server for a bulk read from the client.
 *
 * If 1 is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). */
int processCommand(redisClient *c) {
    /* The QUIT command is handled separately. Normal command procs will
     * go through checking for replication and QUIT will cause trouble
     * when FORCE_REPLICATION is enabled and would be implemented in
     * a regular command proc. */
    /* 在這個調用流程上,如果processCommand返回了REDIS_OK,client會被reset掉,所以這里只是打上REDIS_CLOSE_AFTER_REPLY的標記並返回REDIS_ERR */
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= REDIS_CLOSE_AFTER_REPLY;
        return REDIS_ERR;
    }

    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    /* 由於一次只會處理一條命令,所以c->cmd->arity大於0的話則一定會等於c->argc 
     * 如果arity小於0,說明該命令的參數個數至少是-arity個
     */
    if (!c->cmd) {
        flagTransaction(c);
        addReplyErrorFormat(c,"unknown command '%s'",
            (char*)c->argv[0]->ptr);
        return REDIS_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return REDIS_OK;
    }

    /* Check if the user is authenticated */
    /* 要求認證 */
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
    {
        flagTransaction(c);
        addReply(c,shared.noautherr);
        return REDIS_OK;
    }

    /* If cluster is enabled perform the cluster redirection here.
     * However we don't perform the redirection if:
     * 1) The sender of this command is our master.
     * 2) The command has no key arguments. */
    if (server.cluster_enabled &&
        !(c->flags & REDIS_MASTER) &&
        !(c->flags & REDIS_LUA_CLIENT &&
          server.lua_caller->flags & REDIS_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
        int hashslot;

        if (server.cluster->state != REDIS_CLUSTER_OK) {
            flagTransaction(c);
            clusterRedirectClient(c,NULL,0,REDIS_CLUSTER_REDIR_DOWN_STATE);
            return REDIS_OK;
        } else {
            int error_code;
            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
            if (n == NULL || n != server.cluster->myself) {
                flagTransaction(c);
                clusterRedirectClient(c,n,hashslot,error_code);
                return REDIS_OK;
            }
        }
    }

    /* Handle the maxmemory directive.
     *
     * First we try to free some memory if possible (if there are volatile
     * keys in the dataset). If there are not the only thing we can do
     * is returning an error. */
    if (server.maxmemory) {
        int retval = freeMemoryIfNeeded();
        /* freeMemoryIfNeeded may flush slave output buffers. This may result
         * into a slave, that may be the active client, to be freed. */
        if (server.current_client == NULL) return REDIS_ERR;

        /* It was impossible to free enough memory, and the command the client
         * is trying to execute is denied during OOM conditions? Error. */
        if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
            flagTransaction(c);
            addReply(c, shared.oomerr);
            return REDIS_OK;
        }
    }

    /* Don't accept write commands if there are problems persisting on disk
     * and if this is a master instance. */
    if (((server.stop_writes_on_bgsave_err &&
          server.saveparamslen > 0 &&
          server.lastbgsave_status == REDIS_ERR) ||
          server.aof_last_write_status == REDIS_ERR) &&
        server.masterhost == NULL &&
        (c->cmd->flags & REDIS_CMD_WRITE ||
         c->cmd->proc == pingCommand))
    {
        flagTransaction(c);
        if (server.aof_last_write_status == REDIS_OK)
            addReply(c, shared.bgsaveerr);
        else
            addReplySds(c,
                sdscatprintf(sdsempty(),
                "-MISCONF Errors writing to the AOF file: %s\r\n",
                strerror(server.aof_last_write_errno)));
        return REDIS_OK;
    }

    /* Don't accept write commands if there are not enough good slaves and
     * user configured the min-slaves-to-write option. */
    if (server.masterhost == NULL &&
        server.repl_min_slaves_to_write &&
        server.repl_min_slaves_max_lag &&
        c->cmd->flags & REDIS_CMD_WRITE &&
        server.repl_good_slaves_count < server.repl_min_slaves_to_write)
    {
        flagTransaction(c);
        addReply(c, shared.noreplicaserr);
        return REDIS_OK;
    }

    /* Don't accept write commands if this is a read only slave. But
     * accept write commands if this is our master. */
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & REDIS_MASTER) &&
        c->cmd->flags & REDIS_CMD_WRITE)
    {
        addReply(c, shared.roslaveerr);
        return REDIS_OK;
    }

    /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
    if (c->flags & REDIS_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
        return REDIS_OK;
    }

    /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
     * we are a slave with a broken link with master. */
    if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        !(c->cmd->flags & REDIS_CMD_STALE))
    {
        flagTransaction(c);
        addReply(c, shared.masterdownerr);
        return REDIS_OK;
    }

    /* Loading DB? Return an error if the command has not the
     * REDIS_CMD_LOADING flag. */
    if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) {
        addReply(c, shared.loadingerr);
        return REDIS_OK;
    }

    /* Lua script too slow? Only allow a limited number of commands. */
    if (server.lua_timedout &&
          c->cmd->proc != authCommand &&
          c->cmd->proc != replconfCommand &&
        !(c->cmd->proc == shutdownCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
        !(c->cmd->proc == scriptCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
    {
        flagTransaction(c);
        addReply(c, shared.slowscripterr);
        return REDIS_OK;
    }

    /* Exec the command */
    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        call(c,REDIS_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
    }
    return REDIS_OK;
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM