An In-Depth Look at Redis Master-Slave Replication


Master-Slave Overview

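Redis supports a master-slave mode: one Redis server can be configured as the master (or the slave) of another Redis server, and a slave obtains its data from its master. A slave can in turn act as the master of another Redis server, so the master-slave topology looks like a directed acyclic graph (DAG); since each slave has exactly one master, it is in practice a tree. The servers linked this way form a group of Redis servers, and every one of them, master or slave, is a full Redis server that can serve clients.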

[Figure: master_slave]

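Once configured, the master serves both reads and writes, while slaves serve reads only. Redis provides this setup to support weak consistency, that is, eventual consistency. Whether to choose strong or weak consistency depends on the business requirements: a service like Weibo can comfortably use a weak consistency model, whereas a service like Taobao may call for strong consistency.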

Redis master-slave replication is implemented mainly in replication.c.

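This article contains a fair amount of code, which I have trimmed as far as possible while keeping what is needed to explain the essentials. To stay close to the original source and let readers see its original comments, the articles in this series keep the English comments from the Redis code and add my own annotations alongside them.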

The Replication Backlog

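In "An In-Depth Look at the Redis AOF Persistence Strategy" I introduced the concept of an update cache. For example, when a client sends the command set name Jhon, the update is recorded as *3\r\n$3\r\nSET\r\n$4\r\nname\r\n$4\r\nJhon\r\n and stored in the update cache.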
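The encoding above can be sketched in a few lines of C. This is a minimal illustration, not code from the Redis source: resp_encode is a hypothetical helper name, and it assumes NUL-terminated string arguments. Each length prefix counts the bytes of the argument that follows, so a four-byte value such as Jhon is prefixed with $4.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encode argc arguments as a multi-bulk request into out.
 * Returns the number of bytes written (excluding the NUL terminator),
 * or -1 if out is too small. resp_encode is a hypothetical helper,
 * not a function from the Redis source. */
int resp_encode(char *out, size_t outlen, int argc, const char **argv) {
    size_t used = 0;
    int n;

    assert(out != NULL && argv != NULL && argc >= 0);

    /* Header: "*<number of arguments>\r\n" */
    n = snprintf(out, outlen, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= outlen) return -1;
    used = (size_t)n;

    for (int j = 0; j < argc; j++) {
        /* Each argument: "$<byte length>\r\n<bytes>\r\n" */
        n = snprintf(out + used, outlen - used, "$%zu\r\n%s\r\n",
                     strlen(argv[j]), argv[j]);
        if (n < 0 || (size_t)n >= outlen - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

Calling resp_encode with the three arguments SET, name and Jhon fills the buffer with one header line followed by three length-prefixed arguments.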

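Master-slave replication has a similar update cache. The two differ only in purpose: the former is written to the local disk, the latter is written to slaves. Here we call it the replication backlog.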

The backlog is stored in server.repl_backlog. Redis treats it as a circular buffer, which bounds its memory use and avoids reallocating memory.

struct redisServer {
    /* Replication (master) */
    // DB most recently SELECTed in the replication stream
    int slaveseldb;                 /* Last SELECTed DB in replication output */

    // Global replication offset
    long long master_repl_offset;   /* Global replication offset */

    // Heartbeat period of the master-slave link
    int repl_ping_slave_period;     /* Master pings the slave every N seconds */

    // Pointer to the backlog
    char *repl_backlog;             /* Replication backlog for partial syncs */

    // Size of the backlog
    long long repl_backlog_size;    /* Backlog circular buffer size */

    // Amount of valid data currently held in the backlog
    long long repl_backlog_histlen; /* Backlog actual data length */

    // Position at which the next write into the backlog starts
    long long repl_backlog_idx;     /* Backlog circular buffer current offset */

    // Replication offset of the first byte in the backlog
    // (a global offset, not an index into the buffer)
    long long repl_backlog_off;     /* Replication offset of first byte in the
                                       backlog buffer. */

    // How long the backlog is kept once no slave is attached
    time_t repl_backlog_time_limit; /* Time without slaves after the backlog
                                       gets released. */
};
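How these fields cooperate is easiest to see in a small standalone model. The sketch below is an assumption-laden simplification, not the Redis implementation: the backlog struct and backlog_feed are hypothetical names that stand in for the repl_backlog_* fields and for the function that appends data to the backlog.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, simplified stand-in for the repl_backlog_* fields. */
typedef struct {
    char *buf;            /* circular buffer storage */
    long long size;       /* capacity of buf in bytes */
    long long idx;        /* next write position inside buf */
    long long histlen;    /* number of valid bytes currently stored */
    long long off;        /* replication offset of the oldest stored byte */
    long long master_off; /* global replication offset (last byte written) */
} backlog;

/* Append len bytes, wrapping around and overwriting the oldest data. */
void backlog_feed(backlog *b, const void *ptr, size_t len) {
    const char *p = ptr;

    assert(b != NULL && b->buf != NULL && b->size > 0);
    b->master_off += (long long)len;

    while (len > 0) {
        /* Copy at most up to the physical end of the buffer. */
        size_t chunk = (size_t)(b->size - b->idx);
        if (chunk > len) chunk = len;
        memcpy(b->buf + b->idx, p, chunk);
        b->idx += (long long)chunk;
        if (b->idx == b->size) b->idx = 0; /* wrap around */
        p += chunk;
        len -= chunk;
        b->histlen += (long long)chunk;
    }
    /* The buffer can never hold more than size bytes of history. */
    if (b->histlen > b->size) b->histlen = b->size;
    /* First offset still available for a partial resynchronization. */
    b->off = b->master_off - b->histlen + 1;
}
```

With an 8-byte buffer, feeding 6 bytes and then 4 more wraps the write position to index 2, and the oldest offset still available moves from 1 to 3: the first two bytes have been overwritten and can no longer be served to a reconnecting slave.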

When is a change record written to the backlog? Whenever executing a Redis command modifies data (a write), the change record is propagated. In the Redis source the call chain is: call()->propagate()->replicationFeedSlaves()

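Note: commands are actually executed in call(). If call() finds that data was modified (dirty), it invokes propagate(), and replicationFeedSlaves() writes the change record to the backlog and to all connected slaves.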

A question may arise here: why add the data to the backlog and also distribute it to every slave? Why not simply send it to the slaves?

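Because some slaves may get disconnected from the master for some reason (a network timeout, for example). Note that a slave caches the master's state before disconnecting. A disconnected slave therefore misses the updates made in the meantime. So that such a slave can catch up after reconnecting, Redis also appends the updates to the backlog. As the implementation of replicationFeedSlaves() shows, online slaves receive each update record immediately, whereas a slave that was temporarily disconnected has to recover from the backlog the update records it missed. If the disconnection lasts so long that the records it needs have already been overwritten in the backlog, the master rejects the slave's partial resynchronization request and the slave can only perform a full resynchronization.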

The annotated source follows:

// call() is the core function for command execution; this is where commands are actually run
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
    ......
    /* Call the command. */
    c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
    redisOpArrayInit(&server.also_propagate);

    // Snapshot the dirty counter to detect whether the command modifies data
    dirty = server.dirty;

    // Run the function that implements the command
    c->cmd->proc(c);

    dirty = server.dirty-dirty;
    duration = ustime()-start;

    ......

    // Propagate the client's data modification to the AOF and to the slaves
    /* Propagate the command into the AOF and replication link */
    if (flags & REDIS_CALL_PROPAGATE) {
        int flags = REDIS_PROPAGATE_NONE;

        // Force replication
        if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;

        // Force AOF persistence
        if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;

        // The data set was modified
        if (dirty)
            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);

        // Propagate the change record
        if (flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
    }
    ......
}

// Publish a data update to the AOF and to the slaves
/* Propagate the specified command (in the context of the specified database id)
 * to AOF and Slaves.
 *
 * flags are an xor between:
 * + REDIS_PROPAGATE_NONE (no propagation of command at all)
 * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
 * + REDIS_PROPAGATE_REPL (propagate into the replication link)
 */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    // AOF must be enabled and the AOF propagation flag set: write the update to the local file
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);

    // The replication flag is set: send the update to the slaves
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

// Send data to the backlog and to the slaves
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j, len;
    char llstr[REDIS_LONGSTR_SIZE];

    // No backlog and no slaves: return immediately
    /* If there aren't slaves, and there is no backlog buffer to populate,
     * we can return ASAP. */
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

    /* We can't have slaves attached and no backlog. */
    redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

    /* Send SELECT command to every slave if needed. */
    if (server.slaveseldb != dictid) {
        robj *selectcmd;

        // DB ids below REDIS_SHARED_SELECT_CMDS can use a shared object
        /* For a few DBs we have pre-computed SELECT command. */
        if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
            selectcmd = shared.select[dictid];
        } else {
        // No shared object available: build the redis object for the SELECT command
            int dictid_len;

            dictid_len = ll2string(llstr,sizeof(llstr),dictid);
            selectcmd = createObject(REDIS_STRING,
                sdscatprintf(sdsempty(),
                "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                dictid_len, llstr));
        }

        // Why feed the backlog as well as the slaves? A slave that gets
        // disconnected misses updates; it caches the master's state before
        // disconnecting, and the backlog lets it fetch the missed updates
        // after it reconnects.

        // Append the redis object for the SELECT command to the backlog
        /* Add the SELECT command into the backlog. */
        if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);

        // Send it to all the slaves
        /* Send it to slaves. */
        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;
            addReply(slave,selectcmd);
        }

        // Release the object if it was not a shared one
        if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
            decrRefCount(selectcmd);
    }

    // Remember the DB most recently SELECTed in the replication stream
    server.slaveseldb = dictid;

    // Write the command to the backlog
    /* Write the command to the replication backlog if any. */
    if (server.repl_backlog) {
        char aux[REDIS_LONGSTR_SIZE+3];

        // Number of arguments
        /* Add the multi bulk reply length. */
        aux[0] = '*';
        len = ll2string(aux+1,sizeof(aux)-1,argc);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);

        // Write the arguments one by one
        for (j = 0; j < argc; j++) {
            long objlen = stringObjectLen(argv[j]);

            /* We need to feed the buffer with the object as a bulk reply
             * not just as a plain string, so create the $..CRLF payload len
             * ad add the final CRLF */
            aux[0] = '$';
            len = ll2string(aux+1,sizeof(aux)-1,objlen);
            aux[len+1] = '\r';
            aux[len+2] = '\n';

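            /* Each argument is written as "$<len>\r\n<arg>\r\n", so
               "set name Jhon" becomes:
            $3\r\nSET\r\n
            $4\r\nname\r\n
            $4\r\nJhon\r\n */

            // Argument length
            feedReplicationBacklog(aux,len+3);
            // Argument body
            feedReplicationBacklogWithObject(argv[j]);
            // Trailing CRLF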
            feedReplicationBacklog(aux+len+1,2);
        }
    }

    // Send the command to every slave right away
    /* Write the command to every slave. */
    listRewind(slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        // Skip slaves that are still waiting for a BGSAVE to start
        /* Don't feed slaves that are still waiting for BGSAVE to start */
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;

        /* Feed slaves that are waiting for the initial SYNC (so these commands
         * are queued in the output buffer until the initial SYNC completes),
         * or are already in sync with the master. */

        // Send the number of arguments to the slave
        /* Add the multi bulk length. */
        addReplyMultiBulkLen(slave,argc);

        // Send each argument to the slave
        /* Finally any additional argument that was not stored inside the
         * static buffer if any (from j to argc). */
        for (j = 0; j < argc; j++)
            addReplyBulk(slave,argv[j]);
    }
}

Overview of the Master-Slave Synchronization Mechanism

Redis master-slave synchronization comes in two forms (or two phases): full resynchronization and partial resynchronization.

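When master and slave first connect, a full resynchronization is performed; once it completes, only incremental updates need to be synchronized. A slave can of course request a full resynchronization at any time if needed. Redis's strategy is to always attempt a partial resynchronization first. If that fails, the master tells the slave to perform a full resynchronization and starts a BGSAVE; when the BGSAVE finishes, the master transfers the RDB file. If it succeeds, the master lets the slave resynchronize partially and sends it the data from the backlog.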

The figure below summarizes the synchronization mechanism:

[Figure: how_redis_replication_sync_works]

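To turn a server into a slave, a client sends it the command SLAVEOF hostname port. On receiving it, the slave connects to the master automatically and registers the corresponding read/write event handler (syncWithMaster()).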

// Change the master
void slaveofCommand(redisClient *c) {
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        // slaveof no one: disconnect from the master
        if (server.masterhost) {
            replicationUnsetMaster();
            redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
        }
    } else {
        long port;

        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
            return;

        // We may already be connected to the requested master
        /* Check if we are already attached to the specified slave */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }

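        // Drop the connection to the previous master and switch to the new one.
        // replicationSetMaster() does not actually connect to the master; it only
        // updates the master-related settings in struct redisServer. The real
        // connection is made in replicationCron()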
        /* There was no previous master or the user specified a different one,
         * we can continue. */
        replicationSetMaster(c->argv[1]->ptr, port);
        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
            server.masterhost, server.masterport);
    }
    addReply(c,shared.ok);
}

// Set the new master
/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
    sdsfree(server.masterhost);
    server.masterhost = sdsdup(ip);
    server.masterport = port;

    // Drop the connection to the previous master
    if (server.master) freeClient(server.master);
    disconnectSlaves(); /* Force our slaves to resync with us as well. */

    // Discard the cached master
    replicationDiscardCachedMaster(); /* Don't try a PSYNC. */

    // Free the backlog
    freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */

    // cancelReplicationHandshake() aborts any in-progress transfer or connection to the master
    cancelReplicationHandshake();
    server.repl_state = REDIS_REPL_CONNECT;
    server.master_repl_offset = 0;
}

// Timer routine that manages the master-slave link, run once per second
// Called from serverCron()
/* --------------------------- REPLICATION CRON  ----------------------------- */

/* Replication cron funciton, called 1 time per second. */
void replicationCron(void) {
    ......
    // If needed (REDIS_REPL_CONNECT), try to connect to the master; this is where the connection is really made
    /* Check if we should connect to a MASTER */
    if (server.repl_state == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }
    ......
}

Full Resynchronization

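The slave then automatically issues PSYNC to ask the master for synchronization. Redis always tries a partial resynchronization first and falls back to a full resynchronization only if that fails; a master-slave pair that has just been connected always needs a full resynchronization.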

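After connecting to the master, the slave issues the PSYNC command on its own initiative, supplying master_runid and offset, and the master checks whether they are valid. master_runid acts as the master's identity token and identifies the master the slave was connected to last time; offset is the global offset into the replication stream.
If validation fails, a full resynchronization takes place: the master replies +FULLRESYNC master_runid offset (the slave receives and records master_runid and offset and prepares to receive the RDB file), then starts a BGSAVE to generate the RDB file; when the BGSAVE finishes, the master transfers the file to the slave, completing the full resynchronization.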
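How the slave might take such a reply apart can be sketched as follows. This is an illustrative helper, not the Redis function: parse_fullresync and RUN_ID_SIZE are hypothetical names, and the sketch assumes a run id of exactly 40 characters, mirroring REDIS_RUN_ID_SIZE.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define RUN_ID_SIZE 40 /* assumed run id length, mirrors REDIS_RUN_ID_SIZE */

/* Parse "+FULLRESYNC <runid> <offset>". On success copy the run id into
 * runid_out (RUN_ID_SIZE+1 bytes), store the offset and return 0.
 * Return -1 on malformed input. Hypothetical helper for illustration. */
int parse_fullresync(const char *reply, char *runid_out,
                     long long *offset_out) {
    const char *runid, *offset;

    assert(reply != NULL && runid_out != NULL && offset_out != NULL);
    if (strncmp(reply, "+FULLRESYNC", 11) != 0) return -1;

    runid = strchr(reply, ' ');
    if (runid == NULL) return -1;
    runid++;
    offset = strchr(runid, ' ');
    if (offset == NULL) return -1;
    offset++;

    /* The run id must be exactly RUN_ID_SIZE characters long. */
    if ((offset - runid - 1) != RUN_ID_SIZE) return -1;

    memcpy(runid_out, runid, RUN_ID_SIZE);
    runid_out[RUN_ID_SIZE] = '\0';
    *offset_out = strtoll(offset, NULL, 10);
    return 0;
}
```

The slave keeps both values: the run id identifies the master on the next PSYNC attempt, and the offset is the starting point from which it counts the replication stream.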

// Registered as the callback when connectWithMaster() connects to the master
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);

    ......

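    // Ask the master for a partial resynchronization; the master replies to accept or refuse.
    // If it refuses, it replies +FULLRESYNC master_runid offset and the slave prepares for a full resynchronization
    psync_result = slaveTryPartialResynchronization(fd);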
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    // Perform a full resynchronization
    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */

    // The master does not understand PSYNC: fall back to the old SYNC command
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    // Try up to 5 times (maxtries) to create the temp file, sleeping 1 second between attempts
    /* Prepare a suitable temp file for bulk transfer */
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    // Register a read event with callback readSyncBulkPayload() to get ready to read the RDB file
    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        redisLog(REDIS_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }

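    // Set up the state for the RDB file transfer
    // Replication state
    server.repl_state = REDIS_REPL_TRANSFER;
    // RDB file size (not yet known)
    server.repl_transfer_size = -1;
    // Bytes received so far
    server.repl_transfer_read = 0;
    // Offset at the last fsync, used to flush to disk periodically
    server.repl_transfer_last_fsync_off = 0;
    // File descriptor of the local temp RDB file
    server.repl_transfer_fd = dfd;
    // Time of the last I/O on the transfer
    server.repl_transfer_lastio = server.unixtime;
    // Temp file name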
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;

error:
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}

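A full resynchronization transfers the RDB data file plus the updates buffered in the meantime. For the RDB file, see "An In-Depth Look at the Redis RDB Persistence Strategy". If no background BGSAVE process is running, a BGSAVE is triggered; otherwise the slave requesting the full resynchronization is marked as waiting for the running BGSAVE to end or, if that BGSAVE cannot be reused, for the next one to start. When the BGSAVE finishes, the master immediately starts sending the RDB file to all waiting slaves.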

// Master-side handler for the SYNC and PSYNC commands; tries a partial resynchronization, then a full one
/* SYNC ad PSYNC command implemenation. */
void syncCommand(redisClient *c) {
    ......
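    // The master tries a partial resynchronization; on failure it sends +FULLRESYNC master_runid offset to the slave and then starts a BGSAVE

    // Perform a full resynchronization: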
    /* Full resynchronization. */
    server.stat_sync_full++;

    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    if (server.rdb_child_pid != -1) {
    /*  A background BGSAVE process exists.
        1. If any slave currently attached to the master is in state REDIS_REPL_WAIT_BGSAVE_END, set slave c to REDIS_REPL_WAIT_BGSAVE_END;
        2. otherwise, set it to REDIS_REPL_WAIT_BGSAVE_START */

        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save */
        redisClient *slave;
        listNode *ln;
        listIter li;

        // Check whether some slave has already requested a full resynchronization
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }

        if (ln) {
        // A slave in state REDIS_REPL_WAIT_BGSAVE_END exists:
        // set slave c to REDIS_REPL_WAIT_BGSAVE_END as well,
        // so that the RDB file can be sent once the BGSAVE finishes,
        // and copy that slave's pending updates to slave c.

            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */

            // Copy the other slave's pending output buffer to slave c
            copyClientOutputBuffer(c,slave);

            // Set slave c to "waiting for the BGSAVE process to end"
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
        // No slave is in state REDIS_REPL_WAIT_BGSAVE_END: set slave c to REDIS_REPL_WAIT_BGSAVE_START, i.e. wait for a new BGSAVE process to start.

            // Set the state to "waiting for a BGSAVE process to start"
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences */
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else {
    // No BGSAVE process is running: start a new one

        /* Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplyError(c,"Unable to perform background save");
            return;
        }

        // Set slave c to REDIS_REPL_WAIT_BGSAVE_END so that the RDB file can be sent once the BGSAVE finishes.
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;

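        // Flush the script cache: the new slave knows none of the cached scripts, so they must be propagated again as EVAL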
        /* Flush the script cache for the new slave. */
        replicationScriptCacheFlush();
    }

    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
    listAddNodeTail(server.slaves,c);
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
        createReplicationBacklog();
    return;
}
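The branching in the full-resynchronization path above boils down to a small decision table. The sketch below restates it in isolation; slave_state, sync_decision and decide_full_sync are hypothetical names introduced for illustration and do not exist in the Redis source.

```c
#include <assert.h>

/* Hypothetical mirror of the master-side slave states used above. */
typedef enum {
    WAIT_BGSAVE_START, /* must wait for the next BGSAVE to begin */
    WAIT_BGSAVE_END    /* will receive the RDB file of the running BGSAVE */
} slave_state;

typedef struct {
    slave_state state;
    int started_bgsave; /* 1 if a new BGSAVE had to be started */
} sync_decision;

/* bgsave_running: a background save child already exists.
 * peer_waiting_end: some attached slave is already in WAIT_BGSAVE_END,
 * so its buffered updates can be copied to the new slave. */
sync_decision decide_full_sync(int bgsave_running, int peer_waiting_end) {
    sync_decision d = { WAIT_BGSAVE_END, 0 };

    /* A peer can only be waiting for a BGSAVE to end if one is running. */
    assert(bgsave_running || !peer_waiting_end);

    if (bgsave_running) {
        /* Reuse the running BGSAVE only if its changes are being recorded. */
        d.state = peer_waiting_end ? WAIT_BGSAVE_END : WAIT_BGSAVE_START;
    } else {
        d.started_bgsave = 1;
    }
    return d;
}
```

The only case in which the new slave has to wait for a later BGSAVE is a save that is already running with no other slave recording the changes made since the fork.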

// Called when a BGSAVE finishes
/* A background saving child (BGSAVE) terminated its work. Handle this. */
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
    // Other work
    ......
    // Some slaves may be waiting for the BGSAVE process to terminate
    /* Possibly there are slaves waiting for a BGSAVE in order to be served
     * (the first stage of SYNC is a bulk transfer of dump.rdb) */
    updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}

// Called when RDB persistence finishes (from backgroundSaveDoneHandler())
// The RDB file is ready: send it to all the waiting slaves
/* This function is called at the end of every background saving.
* The argument bgsaveerr is REDIS_OK if the background saving succeeded
* otherwise REDIS_ERR is passed to the function.
*
* The goal of this function is to handle slaves waiting for a successful
* background saving in order to perform non-blocking synchronization. */
void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        // Waiting for a BGSAVE to start: switch to waiting for the next BGSAVE process to end
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            startbgsave = 1;

            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;

        // Waiting for the BGSAVE to end: get ready to send the RDB file to the slave
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
            struct redis_stat buf;

            // If RDB persistence failed, bgsaveerr is REDIS_ERR
            if (bgsaveerr != REDIS_OK) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                continue;
            }

            // Open the RDB file
            if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                continue;
            }

            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            slave->replstate = REDIS_REPL_SEND_BULK;

            // Remove any previously registered write event
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);

            // Register a new write event; sendBulkToSlave() transfers the RDB file
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }

    // startbgsave is set when some slave was waiting for a BGSAVE to start: launch a new BGSAVE for those slaves
    if (startbgsave) {
        /* Since we are starting a new background save for one or more slaves,
         * we flush the Replication Script Cache to use EVAL to propagate every
         * new EVALSHA for the first time, since all the new slaves don't know
         * about previous scripts. */
        replicationScriptCacheFlush();

        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
        /* The BGSAVE may fail to fork, in which case every slave waiting for it is disconnected. This is a self-protection measure: a failed fork most likely means memory is tight */

            listIter li;

            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;

                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                    freeClient(slave);
            }
        }
    }
}

Partial Resynchronization

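As noted above, Redis always tries a partial resynchronization first. A partial resynchronization sends the slave the data cached in the backlog, that is, the update records.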

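After connecting to the master, the slave issues the PSYNC command on its own initiative, supplying master_runid and offset, and the master checks whether they are valid.
If validation succeeds, a partial resynchronization takes place: the master replies +CONTINUE (on receiving it the slave registers the event handler that receives the backlog data) and then sends the backlog data.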
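The validation the master performs can be condensed into one predicate. The following is a sketch under stated assumptions rather than the Redis code: can_partial_resync and its parameter names are hypothetical, and run ids are compared with plain strcmp for portability.

```c
#include <assert.h>
#include <string.h>

/* Return 1 if a partial resynchronization can be served, 0 otherwise.
 * All parameter names are illustrative. */
int can_partial_resync(const char *slave_runid, const char *my_runid,
                       long long psync_offset, int have_backlog,
                       long long backlog_off, long long backlog_histlen) {
    assert(slave_runid != NULL && my_runid != NULL);

    /* The slave must have been replicating from this very instance. */
    if (strcmp(slave_runid, my_runid) != 0) return 0;
    if (!have_backlog) return 0;
    /* Too old: the requested data was already overwritten. */
    if (psync_offset < backlog_off) return 0;
    /* Beyond the end of what the master has produced. */
    if (psync_offset > backlog_off + backlog_histlen) return 0;
    return 1;
}
```

As a worked example, suppose the backlog starts at offset 1001 and holds 500 bytes. Requests for offsets 1001 through 1501 are accepted (1501 means the slave is fully up to date and nothing needs sending), while 1000 is too old and 1502 lies beyond the master's stream, so both lead to a full resynchronization.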

// Registered as the callback when connectWithMaster() connects to the master
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);

    ......

    // Try a partial resynchronization; if the master allows it, it replies +CONTINUE and the slave registers the corresponding event handler

    /* Try a partial resynchonization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */

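    // The function returns one of three values:
    // PSYNC_CONTINUE: a partial resynchronization will take place; the callback
                     // readQueryFromClient() was already installed in slaveTryPartialResynchronization()
    // PSYNC_FULLRESYNC: full resynchronization; the RDB file will be downloaded
    // PSYNC_NOT_SUPPORTED: the master does not support PSYNC, or replied with an error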
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    // Perform a full resynchronization
    ......
}

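// The function returns one of three values:
// PSYNC_CONTINUE: a partial resynchronization will take place; the callback is already installed
// PSYNC_FULLRESYNC: full resynchronization; the RDB file will be downloaded
// PSYNC_NOT_SUPPORTED: the master does not support PSYNC, or replied with an error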
#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2
int slaveTryPartialResynchronization(int fd) {
    char *psync_runid;
    char psync_offset[32];
    sds reply;

    /* Initially set repl_master_initial_offset to -1 to mark the current
     * master run_id and offset as not valid. Later if we'll be able to do
     * a FULL resync using the PSYNC command we'll set the offset at the
     * right value, so that this information will be propagated to the
     * client structure representing the master into server.master. */
    server.repl_master_initial_offset = -1;

    if (server.cached_master) {
    // State from the previous connection to the master is cached: try a partial resynchronization to reduce the data transferred
        psync_runid = server.cached_master->replrunid;
        snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
        redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
    } else {
    // No state from a previous connection is cached: do a full resynchronization
    // "PSYNC ? -1" still lets the slave learn the master's master_runid
        redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
        psync_runid = "?";
        memcpy(psync_offset,"-1",3);
    }

    // Send the command to the master and read its reply
    /* Issue the PSYNC command */
    reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);

    // Full resynchronization
    if (!strncmp(reply,"+FULLRESYNC",11)) {
        char *runid = NULL, *offset = NULL;

        /* FULL RESYNC, parse the reply in order to extract the run id
         * and the replication offset. */
        runid = strchr(reply,' ');
        if (runid) {
            runid++;
            offset = strchr(runid,' ');
            if (offset) offset++;
        }
        if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
            redisLog(REDIS_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            /* This is an unexpected condition, actually the +FULLRESYNC
             * reply means that the master supports PSYNC, but the reply
             * format seems wrong. To stay safe we blank the master
             * runid to make sure next PSYNCs will fail. */
            memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
        } else {
            // Copy the runid
            memcpy(server.repl_master_runid, runid, offset-runid-1);
            server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
            server.repl_master_initial_offset = strtoll(offset,NULL,10);
            redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
                server.repl_master_runid,
                server.repl_master_initial_offset);
        }
        /* We are going to full resync, discard the cached master structure. */
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }

    // Partial resynchronization
    if (!strncmp(reply,"+CONTINUE",9)) {
        /* Partial resync was accepted, set the replication state accordingly */
        redisLog(REDIS_NOTICE,
            "Successful partial resynchronization with master.");
        sdsfree(reply);

        // Promote the cached master back to the current master and get ready for PSYNC (partial resynchronization)
        replicationResurrectCachedMaster(fd);

        return PSYNC_CONTINUE;
    }

    /* If we reach this point we receied either an error since the master does
     * not understand PSYNC, or an unexpected reply from the master.
     * Reply with PSYNC_NOT_SUPPORTED in both cases. */

    // Either an error reply or an unexpected reply from the master
    if (strncmp(reply,"-ERR",4)) {
        /* If it's not an error, log the unexpected event. */
        redisLog(REDIS_WARNING,
            "Unexpected reply to PSYNC from master: %s", reply);
    } else {
        redisLog(REDIS_NOTICE,
            "Master does not support PSYNC or is in "
            "error state (reply: %s)", reply);
    }
    sdsfree(reply);
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED;
}
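The two ways the slave fills in the PSYNC arguments can be isolated in a short sketch. build_psync is a hypothetical helper, not a Redis function; it produces the space-separated text of the request and deliberately leaves out how the line is actually written to the socket.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Build the text of the PSYNC request a slave would send.
 * cached_runid is NULL when no master state is cached; cached_reploff is
 * the last replication offset processed from that master.
 * Returns the length written, or -1 if out is too small. */
int build_psync(char *out, size_t outlen,
                const char *cached_runid, long long cached_reploff) {
    int n;

    assert(out != NULL);
    if (cached_runid != NULL) {
        /* Ask for the first byte we have not seen yet. */
        n = snprintf(out, outlen, "PSYNC %s %lld", cached_runid,
                     cached_reploff + 1);
    } else {
        /* No cached master: "?" and -1 force a full resynchronization. */
        n = snprintf(out, outlen, "PSYNC ? -1");
    }
    if (n < 0 || (size_t)n >= outlen) return -1;
    return n;
}
```

Note the +1 in the cached case: the slave has already applied everything up to cached_reploff, so the first byte it still needs is the one after it.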

// Master-side handler for the SYNC and PSYNC commands; tries a partial resynchronization, then a full one
/* SYNC ad PSYNC command implemenation. */
void syncCommand(redisClient *c) {
    ......

    // The master tries a partial resynchronization; if allowed, it replies +CONTINUE and then sends the backlog data

    /* Try a partial resynchronization if this is a PSYNC command.
     * If it fails, we continue with usual full resynchronization, however
     * when this happens masterTryPartialResynchronization() already
     * replied with:
     *
     * +FULLRESYNC <runid> <offset>
     *
     * So the slave knows the new runid and offset to try a PSYNC later
     * if the connection with the master is lost. */
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        // Partial resynchronization
        if (masterTryPartialResynchronization(c) == REDIS_OK) {
            server.stat_sync_partial_ok++;
            return; /* No full resync needed, return. */
        } else {
        // Partial resync failed, a full resync follows; argv[1] is the runid sent by the slave
            char *master_runid = c->argv[1]->ptr;

            /* Increment stats for failed PSYNCs, but only if the
             * runid is not "?", as this is used by slaves to force a full
             * resync on purpose when they are not albe to partially
             * resync. */
            if (master_runid[0] != '?') server.stat_sync_partial_err++;
        }
    } else {
        /* If a slave uses SYNC, we are dealing with an old implementation
         * of the replication protocol (like redis-cli --slave). Flag the client
         * so that we don't expect to receive REPLCONF ACK feedbacks. */
        c->flags |= REDIS_PRE_PSYNC_SLAVE;
    }

    // Perform a full resynchronization:
    ......
}

// The master checks whether a partial resynchronization is possible
/* This function handles the PSYNC command from the point of view of a
* master receiving a request for partial resynchronization.
*
* On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed
* with the usual full resync. */
int masterTryPartialResynchronization(redisClient *c) {
    long long psync_offset, psync_len;
    char *master_runid = c->argv[1]->ptr;
    char buf[128];
    int buflen;

    /* Is the runid of this master the same advertised by the wannabe slave
     * via PSYNC? If runid changed this master is a different instance and
     * there is no way to continue. */
    if (strcasecmp(master_runid, server.runid)) {
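    // When the link to the master is lost because of an error, the slave caches
    // the master's state for the next partial resynchronization.
    // 1) master_runid is the runid of the cached master, supplied by the slave;
    // 2) server.runid is the runid of this server (the master).
    // A mismatch means this server is not the master the slave has cached, so
    // a partial resynchronization is impossible and only a full one can be done

        // "?" means the slave is asking for a full resynchronization,
        // which it does when it has no cached master state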
        /* Run id "?" is used by slaves that want to force a full resync. */
        if (master_runid[0] != '?') {
            redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
                "Runid mismatch (Client asked for '%s', I'm '%s')",
                master_runid, server.runid);
        } else {
            redisLog(REDIS_NOTICE,"Full resync requested by slave.");
        }
        goto need_full_resync;
    }

    // Parse the integer argument: the offset requested by the slave
    /* We still have the data our slave is asking for? */
    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
       REDIS_OK) goto need_full_resync;

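    // Cases in which a partial resynchronization is not possible
    if (!server.repl_backlog || /* there is no backlog */
        psync_offset < server.repl_backlog_off ||  /* psync_offset is too small:
                                                      the slave missed more updates
                                                      than the backlog still holds,
                                                      so a full resync is required */
                                                   /* psync_offset is out of range */
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    // The conditions for a partial resync are not met: fall back to a full resync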
    {
        redisLog(REDIS_NOTICE,
            "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
        if (psync_offset > server.master_repl_offset) {
            redisLog(REDIS_WARNING,
                "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
        }
        goto need_full_resync;
    }

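    // Perform the partial resynchronization:
    // 1) mark the client as a slave
    // 2) tell the slave to get ready to receive data; it prepares on receiving +CONTINUE
    // 3) start sending the data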
    /* If we reached this point, we are able to perform a partial resync:
     * 1) Set client state to make it a slave.
     * 2) Inform the client we can continue with +CONTINUE
     * 3) Send the backlog data (from the offset to the end) to the slave. */

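    // Mark the connected client as a slave
    c->flags |= REDIS_SLAVE;

    // Mark the slave as online, i.e. partial resynchronization
    // #define REDIS_REPL_ONLINE 9 /* RDB file transmitted, sending just
    // updates. */
    c->replstate = REDIS_REPL_ONLINE;

    // Refresh the time of the last ack
    c->repl_ack_time = server.unixtime;

    // Add it to the list of slaves
    listAddNodeTail(server.slaves,c);

    // Tell the slave it may resynchronize partially; on receipt the slave prepares accordingly (registers callbacks)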
    /* We can't use the connection buffers since they are used to accumulate
     * new commands at this stage. But we are sure the socket send buffer is
     * emtpy so this write will never fail actually. */
    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }

    // Write the backlog data, which holds the "update cache", to the slave
    psync_len = addReplyReplicationBacklog(c,psync_offset);

    redisLog(REDIS_NOTICE,
        "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);
    /* Note that we don't need to set the selected DB at server.slaveseldb
     * to -1 to force the master to emit SELECT, since the slave already
     * has this state from the previous connection with the master. */

    refreshGoodSlavesCount();
    return REDIS_OK; /* The caller can return, no full resync needed. */

need_full_resync:
    ......
    // Send +FULLRESYNC runid repl_offset to the slave
}
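Sending the backlog "from the offset to the end" means translating a global replication offset into a position inside the circular buffer. The sketch below shows one way to do that arithmetic; backlog_copy_from and its parameters are hypothetical, and it copies into a plain destination array instead of a client's output buffer.

```c
#include <assert.h>
#include <string.h>

/* Copy the backlog bytes from replication offset `offset` to the end of
 * the history into dst. buf/size/idx/histlen/off describe the circular
 * buffer (illustrative parameters). Returns the number of bytes copied. */
long long backlog_copy_from(const char *buf, long long size, long long idx,
                            long long histlen, long long off,
                            long long offset, char *dst) {
    long long skip, j, len, copied = 0;

    assert(buf != NULL && dst != NULL && size > 0);
    assert(offset >= off && offset <= off + histlen);
    if (histlen == 0) return 0;

    skip = offset - off;                 /* bytes the slave already has */
    j = (idx + (size - histlen)) % size; /* position of the oldest byte */
    j = (j + skip) % size;               /* position of the first byte to send */
    len = histlen - skip;

    while (len > 0) {
        /* Copy up to the physical end of the buffer, then wrap. */
        long long chunk = (size - j < len) ? size - j : len;
        memcpy(dst + copied, buf + j, (size_t)chunk);
        copied += chunk;
        len -= chunk;
        j = 0;
    }
    return copied;
}
```

For an 8-byte buffer containing ijcdefgh with the next write position at index 2 and the oldest byte at offset 3, a request for offset 7 skips four bytes and yields ghij in two chunks, one before and one after the wrap-around.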

Caching the Master

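A slave may lose its connection to the master for reasons such as network latency (PING timeout, ACK timeout and so on). When that happens, the slave tries to save the state of its connection to the master, such as the global replication offset, for use in the next partial resynchronization, and then tries to reconnect to the master. Note that if the disconnection lasts long enough, the partial resynchronization is bound to fail.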

void freeClient(redisClient *c) {
    listNode *ln;

    /* If this is marked as current client unset it */
    if (server.current_client == c) server.current_client = NULL;

    // If this server is a slave connected to a master, the master's state may need to be saved for a later PSYNC
    /* If it is our master that's beging disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    if (server.master && c->flags & REDIS_MASTER) {
        redisLog(REDIS_WARNING,"Connection with master lost.");
        if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|
                          REDIS_CLOSE_ASAP|
                          REDIS_BLOCKED|
                          REDIS_UNBLOCKED)))
        {
            replicationCacheMaster(c);
            return;
        }
    }
    ......
}

// To make partial resynchronization possible, the slave saves the master's state
// in server.cached_master before dropping the connection.
// Called from freeClient() to save the connection state for a later PSYNC
void replicationCacheMaster(redisClient *c) {
    listNode *ln;

    redisAssert(server.master != NULL && server.cached_master == NULL);
    redisLog(REDIS_NOTICE,"Caching the disconnected master state.");

    // Remove the master from the list of clients
    /* Remove from the list of clients, we don't want this client to be
     * listed by CLIENT LIST or processed in any way by batch operations. */
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);

    // Save the master's state
    /* Save the master. Server.master will be set to null later by
     * replicationHandleMasterDisconnection(). */
    server.cached_master = server.master;

    // Unregister the events and close the connection
    /* Remove the event handlers and close the socket. We'll later reuse
     * the socket of the new connection with the master during PSYNC. */
    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    close(c->fd);

    /* Set fd to -1 so that we can safely call freeClient(c) later. */
    c->fd = -1;

    // Adjust the replication state; this also sets server.master = NULL
    /* Caching the master happens instead of the actual freeClient() call,
     * so make sure to adjust the replication state. This function will
     * also set server.master to NULL. */
    replicationHandleMasterDisconnection();
}

Summary

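Put simply, master-slave synchronization is the upload and download of an RDB file; after that, whenever the master modifies a small amount of data, it propagates the change records to every slave. This article described the internal protocol and mechanisms of Redis master-slave replication. The next few Redis articles will mainly cover its internal data structures.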

搗亂 2014-4-22

http://daoluan.net

