REPLICAOF同步數據 根據注釋,版本5以后使用REPLICAOF代替SLAVEOF { "SLAVEOF", "host port", "Make the server a replica of another instance, or promote it as master. Deprecated starting with Redis 5. Use REPLICAOF instead.", 9, "1.0.0" }, 當執行這個命令的時候 REPLICAOF host port,主機會開始做數據同步,主要邏輯如下 調用如下同步函數 /* SYNC and PSYNC command implemenation. */ SYNC 同步 和 PSYNC 增量同步 命令實現 void syncCommand(client *c) { /* ignore SYNC if already slave or in monitor mode */ 忽略同步命令,如果已經是從機或者處於監控模式 if (c->flags & CLIENT_SLAVE) return; /* Refuse SYNC requests if we are a slave but the link with our master * is not ok... */ 拒絕同步請求,如果我們是一個從機,而且連接的主機當下斷開連接了。 if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n")); return; } /* SYNC can't be issued when the server has pending data to send to * the client about already issued commands. We need a fresh reply * buffer registering the differences between the BGSAVE and the current * dataset, so that we can copy to other slaves if needed. */ 當服務器還有掛着的之前執行命令的數據需要發送給客戶端時,不執行同步命令。 我們需要一個新的應答緩沖區,來寄存BGSAVE和當前數據集的區別,從而在需要的時候可以復制給從機 if (clientHasPendingReplies(c)) { addReplyError(c,"SYNC and PSYNC are invalid with pending output"); return; } serverLog(LL_NOTICE,"Replica %s asks for synchronization", replicationGetSlaveName(c)); 記錄從機的名字 有ip地址用IP地址,沒有用客戶唯一編號 /* Try a partial resynchronization if this is a PSYNC command. * If it fails, we continue with usual full resynchronization, however * when this happens masterTryPartialResynchronization() already * replied with: 如果是PSYNC命令,嘗試部分再同步,失敗的情況下,我們繼續使用全量再同步, 實際上,當這種情況發生時候,函數masterTryPartialResynchronization已經回復客戶端+FULLRESYNC <replid> <offset> * +FULLRESYNC <replid> <offset> * * So the slave knows the new replid and offset to try a PSYNC later * if the connection with the master is lost. */ 所以從機知道新的復制id和偏移量后面去嘗試部分同步,如果主機的連接丟失了。 if (!strcasecmp(c->argv[0]->ptr,"psync")) { if (masterTryPartialResynchronization(c) == C_OK) { 主機嘗試進行增量同步,可以就返回成功進行增量同步,不行就返回失敗需要全量同步 server.stat_sync_partial_ok++; 增量同步計數++ return; /* No full resync needed, return. */ 不需要全量同步 } else { 需要全量同步的情況 char *master_replid = c->argv[1]->ptr; /* Increment stats for failed PSYNCs, but only if the * replid is not "?", as this is used by slaves to force a full * resync on purpose when they are not albe to partially * resync. */ 對失敗的增量同步計數,但是只針對同步id不是問號時, 因為這種情況是當不能進行增量同步時而強制進行全量同步 if (master_replid[0] != '?') server.stat_sync_partial_err++; } } else { /* If a slave uses SYNC, we are dealing with an old implementation * of the replication protocol (like redis-cli --slave). Flag the client * so that we don't expect to receive REPLCONF ACK feedbacks. */ 如果從機使用的命令時SYNC,那么我們使用老的復制協議(例如 redis-cli --slave ). 標記客戶端,這樣我們就不會企鵝昂收到復制回復反饋 c->flags |= CLIENT_PRE_PSYNC; } /* Full resynchronization. */ 全量同步加1 server.stat_sync_full++; /* Setup the slave as one waiting for BGSAVE to start. The following code * paths will change the state if we handle the slave differently. */ 設置從機的狀態為等待BGSAVE啟動。如果我們處理從機不同,那么接下來的代碼路徑將改變狀態 c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; if (server.repl_disable_tcp_nodelay) 是否禁用TCP_NODELAY,禁用則會使用Nagle算法,方便大數據包傳播 connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */ 失敗也無所謂 c->repldbfd = -1; c->flags |= CLIENT_SLAVE; listAddNodeTail(server.slaves,c); /* Create the replication backlog if needed. */ 創建后台賦值日志 if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) { /* When we create the backlog from scratch, we always use a new * replication ID and clear the ID2, since there is no valid * past history. */ 當我們從頭創建后台日志的時候,我們總是使用新的復制id和清理ID2,因為過去的歷史已經沒有價值(我們使用了新的全量同步) changeReplicationId(); 使用新的復制id,注意到上面的條件,只有一個slave時且全量同步時候才這么做 clearReplicationId2(); 清空第二復制id2 createReplicationBacklog();創建后台日志,用於記錄執行過的命令 serverLog(LL_NOTICE,"Replication backlog created, my new " "replication IDs are '%s' and '%s'", server.replid, server.replid2); } /* CASE 1: BGSAVE is in progress, with disk target. */ 情況1:正在進行BGSAVE,目標為磁盤 if (server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_DISK) { /* Ok a background save is in progress. Let's check if it is a good * one for replication, i.e. if there is another slave that is * registering differences since the server forked to save. */ 一個后台保存進程正在執行。讓我們檢查這個是否可以用於復制。 舉例來說,如果有一個另外的從機自從服務器開啟后台進程之后一直在保存累積的新命令 client *slave; listNode *ln; listIter li; listRewind(server.slaves,&li); while((ln = listNext(&li))) { slave = ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break; 如果存在另外的后台保存數據庫的進程 } /* To attach this slave, we check that it has at least all the * capabilities of the slave that triggered the current BGSAVE. */ 想要連接這個從機,我們檢查從機至少擁有觸發當前BGSAVE的從機的同步能力 if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) { 檢測同步能力(capabilities) /* Perfect, the server is already registering differences for * another slave. Set the right state, and copy the buffer. */ 完美,主機已經生成了為另外一個從機增量命令的積累,設置正確的狀態,復制另外從機的緩存 copyClientOutputBuffer(c,slave); 復制另外一個正在緩存BGSAVE的累積命令緩沖區 replicationSetupSlaveForFullResync(c,slave->psync_initial_offset); serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { /* No way, we need to wait for the next BGSAVE in order to * register differences. */ 否則我們需要需要等待下一個BGSAVE,保存不同的緩存區 serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC"); } /* CASE 2: BGSAVE is in progress, with socket target. */ 情況2: bgsave正在進行,目標是套接字 } else if (server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) { /* There is an RDB child process but it is writing directly to * children sockets. We need to wait for the next BGSAVE * in order to synchronize. */ 這是一個RDB的子進程,但是它是直接寫向子客戶端的套接字。我們需要等待下個BGSAVE來實現同步 serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC"); /* CASE 3: There is no BGSAVE is progress. */ 情況3: 沒有BGSAVE在執行 } else { if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) { /* Diskless replication RDB child is created inside * replicationCron() since we want to delay its start a * few seconds to wait for more slaves to arrive. */ 在replicationCron中創建無盤復制的RDB子子線程,因此我們想要延遲幾秒,用來等待更多的從機到來(大家合在一起只需要一次即可,節約時間) if (server.repl_diskless_sync_delay) serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC"); } else { /* Target is disk (or the slave is not capable of supporting * diskless replication) and we don't have a BGSAVE in progress, * let's start one. */ 目標是磁盤(或者從機不支持無盤復制),當前沒有DBSAVE,那么讓我們開始一個bgsave if (!hasActiveChildProcess()) { 沒有活躍的下載子進程 startBgsaveForReplication(c->slave_capa); 開啟bgsave } else { serverLog(LL_NOTICE, "No BGSAVE in progress, but another BG operation is active. " "BGSAVE for replication delayed"); } } } return; } ********************************************************* 函數 masterTryPartialResynchronization 的邏輯如下: 是否合適做部分同步,不合適就需要做全量同步 /* This function handles the PSYNC command from the point of view of a * master receiving a request for partial resynchronization. * * On success return C_OK, otherwise C_ERR is returned and we proceed * with the usual full resync. */ 這個函數處PSYNC命令,從主機接收一個增量同步的命令請求 成功的情況下返回C_OK,否則犯返回C_ERR並且我們繼續使用全量同步操作 int masterTryPartialResynchronization(client *c) { long long psync_offset, psync_len; char *master_replid = c->argv[1]->ptr; char buf[128]; int buflen; /* Parse the replication offset asked by the slave. Go to full sync * on parse error: this should never happen but we try to handle * it in a robust way compared to aborting. */ 分析從機請求的賦值偏移量,如果失敗則轉移到全量同步: 這種情況正常不會發生, 但是我們嘗試用更加穩健的方式處理這個問題,從而程序不會終止 if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != C_OK) goto need_full_resync; /* Is the replication ID of this master the same advertised by the wannabe * slave via PSYNC? If the replication ID changed this master has a * different replication history, and there is no way to continue. * * Note that there are two potentially valid replication IDs: the ID1 * and the ID2. The ID2 however is only valid up to a specific offset. */ 當前的復制ID是否是從機PSYNC請求命令的id?如果復制ID不同,而且這個主機擁有不同的復制歷史,那么就不能部分增量復制。 注意到潛在的兩個有效復制ID: ID1和ID2. ID2只在特定偏移量下有效。 這里的意思是 如果請求的復制ID和當前主機的復制ID不同,那么表明之前不是從該主機復制的數據, 這里需要判斷ID2,看看是否和當前主機有過共同的主機,有的話,查看復制的進度(這里就是偏移量)是否能覆蓋從機目前的進度, 不能的話,就需要從新主機全量復制 if (strcasecmp(master_replid, server.replid) && 是否不同於當前主機的id (strcasecmp(master_replid, server.replid2) || 是否和當前主機有曾經共同的主機 或者 psync_offset > server.second_replid_offset)) 進度比當前主機在曾經共同主機的復制進度快 { /* Run id "?" is used by slaves that want to force a full resync. */ 如果使用?代替40位的運行ID,表示從機需要強制執行一個全量同步 if (master_replid[0] != '?') { 如果不是強制執行全量同步 if (strcasecmp(master_replid, server.replid) && 請求ID是否等等同於當前主機ID strcasecmp(master_replid, server.replid2)) 請求ID是否等同於曾經的主機ID { 如果都不同,那必須是全量同步 serverLog(LL_NOTICE,"Partial resynchronization not accepted: " "Replication ID mismatch (Replica asked for '%s', my " "replication IDs are '%s' and '%s')", master_replid, server.replid, server.replid2); } else { 在都有曾經共同主機的情況下,進度太靠前(超過當前主機,當前主機無法提供有效信息),也需要全量同步 serverLog(LL_NOTICE,"Partial resynchronization not accepted: " "Requested offset for second ID was %lld, but I can reply " "up to %lld", psync_offset, server.second_replid_offset); } } else { 強制進行全量同步 serverLog(LL_NOTICE,"Full resync requested by replica %s", replicationGetSlaveName(c)); } goto need_full_resync; 需要全量同步 } /* We still have the data our slave is asking for? */ 我們是否還擁有從機請求的數據 if (!server.repl_backlog || 存在復制后台日志 psync_offset < server.repl_backlog_off || 請求進度小於當前最小緩存數據地址 psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) 請求進度超過當前最大緩存數據地址 { serverLog(LL_NOTICE, "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset); if (psync_offset > server.master_repl_offset) { 超過當前主機的同步偏移量 serverLog(LL_WARNING, "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c)); } goto need_full_resync; 沒有請求數據的情況下,需要全量同步 } /* If we reached this point, we are able to perform a partial resync: * 1) Set client state to make it a slave. * 2) Inform the client we can continue with +CONTINUE * 3) Send the backlog data (from the offset to the end) to the slave. */ 如果程序執行到這里,那么意味着我們能夠執行增量同步: 1設置客戶端為一個從機 2通知客戶端我們能夠繼續增量同步+CONTINUE 3發送后端日志數據給從機(從偏移量開始到結束位置的數據) c->flags |= CLIENT_SLAVE; c->replstate = SLAVE_STATE_ONLINE; c->repl_ack_time = server.unixtime; c->repl_put_online_on_ack = 0; listAddNodeTail(server.slaves,c); 添加到從機隊列 /* We can't use the connection buffers since they are used to accumulate * new commands at this stage. But we are sure the socket send buffer is * empty so this write will never fail actually. */ 我們不能使用連接緩存,因為它這個時候正在被累積新的命令使用。但是我們確認這時發送套接字緩存是空的, 所以這個寫不會實際的失敗 if (c->slave_capa & SLAVE_CAPA_PSYNC2) { 是否支持ps2的協議 buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid); } else { buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); } if (connWrite(c->conn,buf,buflen) != buflen) { freeClientAsync(c); 異步釋放客戶端 return C_OK; } psync_len = addReplyReplicationBacklog(c,psync_offset); 將數據發送給客戶端 serverLog(LL_NOTICE, "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.", replicationGetSlaveName(c), psync_len, psync_offset); /* Note that we don't need to set the selected DB at server.slaveseldb * to -1 to force the master to emit SELECT, since the slave already * has this state from the previous connection with the master. */ 注意到我們不需要在主機上設置選擇 數據庫為-1,強制主機發出select命令。 因為從機從上次和主機交互的過程中已經設置了這個狀態 refreshGoodSlavesCount(); /* Fire the replica change modules event. */ 觸發復制改變的模塊事件,即如果有注冊對於復制改變的模塊事件,就會執行 moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE, NULL); return C_OK; /* The caller can return, no full resync needed. */ 返回給調用者,不需要全量同步 need_full_resync: /* We need a full resync for some reason... Note that we can't * reply to PSYNC right now if a full SYNC is needed. The reply * must include the master offset at the time the RDB file we transfer * is generated, so we need to delay the reply to that moment. */ 因為實際情況,我們需要進行全量同步。。。注意到我們現在不能回復增量同步,因為需要全量同步。 回復必須包括主機准備傳輸的rdb文件生成時刻的偏移量,因此我們需要延遲到那個時刻(生成RDB文件的時刻) 這個理解為 全量傳輸除去生成的全量RDB文件,還需要額外的新累積數據, 所以需要記住生成RDB時候的位置,一遍傳輸新增的命令數據 return C_ERR; } **************************************************** 開啟全量同步 /* Start a BGSAVE for replication goals, which is, selecting the disk or * socket target depending on the configuration, and making sure that * the script cache is flushed before to start. 為復制開始BGSAVE,根據配置選擇磁盤或者套接字為目標地址,確保在開始之前清空腳本緩存 * The mincapa argument is the bitwise AND among all the slaves capabilities * of the slaves waiting for this BGSAVE, so represents the slave capabilities * all the slaves support. Can be tested via SLAVE_CAPA_* macros. 參數mincapa是按位與的包含當前等待BGSAVE的所有該從機的能力,因此代表了從機支持的所有能力。 可以通過宏定義SLAVE_CAPA_*來測試 * Side effects, other than starting a BGSAVE: 除去開啟gbsave的伴隨效應 * 1) Handle the slaves in WAIT_START state, by preparing them for a full * sync if the BGSAVE was successfully started, or sending them an error * and dropping them from the list of slaves. 1如果BGSAVE開始成功啟動,開始全量同步,那么設置從機狀態為WAIT_START, 如果失敗,那么就給從機發送錯誤並且將它們從從機列表移除。 * 2) Flush the Lua scripting script cache if the BGSAVE was actually * started. 如果BGSAVE成功開啟,清空lua腳本緩存 * Returns C_OK on success or C_ERR otherwise. */ 成功返回C_OK,失敗返回C_ERR int startBgsaveForReplication(int mincapa) { int retval; int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF); 目標地址是否為套接字,通過是否設置無盤標識 和 從機是否支持無盤接受 判斷 listIter li; listNode *ln; serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s", socket_target ? "replicas sockets" : "disk"); rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); 填充保存相關信息 /* Only do rdbSave* when rsiptr is not NULL, * otherwise slave will miss repl-stream-db. */ 只有當rsiptr非空時候才調用函數rdbSave*保存信息,空的時候從機就沒有主機的復制流db信息 if (rsiptr) { if (socket_target) retval = rdbSaveToSlavesSockets(rsiptr); 開啟套接字保存(流保存) else retval = rdbSaveBackground(server.rdb_filename,rsiptr); 開始本地磁盤保存 } else { serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later."); retval = C_ERR; } /* If we succeeded to start a BGSAVE with disk target, let's remember * this fact, so that we can later delete the file if needed. Note * that we don't set the flag to 1 if the feature is disabled, otherwise * it would never be cleared: the file is not deleted. This way if * the user enables it later with CONFIG SET, we are fine. */ 如果我們成功開始目標為磁盤的BGSAVE,讓我們記住這個事實,因此我們后面如果有需要就可以刪除這個保存的文件。 注意到我們不設置這個刪除標志rdb_del_sync_files為1,那么文件就不會被刪除。 但是如果用戶后面通過配置設置,使得刪除標志有效,那么就可以刪除了 if (retval == C_OK && !socket_target && server.rdb_del_sync_files) RDBGeneratedByReplication = 1; /* If we failed to BGSAVE, remove the slaves waiting for a full * resynchronization from the list of slaves, inform them with * an error about what happened, close the connection ASAP. */ 如果我們開始BGSAVE失敗,那么從從機列表中刪除等待全量同步的從機, 告知它們發生的錯誤信息,盡快關閉連接 if (retval == C_ERR) { serverLog(LL_WARNING,"BGSAVE for replication failed"); listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { 處於等待BGSAVE狀態的從機 slave->replstate = REPL_STATE_NONE; slave->flags &= ~CLIENT_SLAVE; listDelNode(server.slaves,ln); addReplyError(slave, "BGSAVE failed, replication can't continue"); slave->flags |= CLIENT_CLOSE_AFTER_REPLY; } } return retval; } /* If the target is socket, rdbSaveToSlavesSockets() already setup * the slaves for a full resync. Otherwise for disk target do it now.*/ 如果目標是套接字,那么函數rdbSaveToSlavesSockets早已設置從機為全量同步。 這里是對於磁盤為目標的設置 if (!socket_target) { listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset()); 設置從機開啟全量同步的狀態,並且通知從機全量同步的信息 } } } /* Flush the script cache, since we need that slave differences are * accumulated without requiring slaves to match our cached scripts. */ 刷新緩存腳本,因為我們需要累積的從機差異,但是不需要匹配緩存的腳本 if (retval == C_OK) replicationScriptCacheFlush(); return retval; }