前面幾篇我們已經完全理解了redis的基本功能的實現了。
但單靠基本功能實現,往往還是稱不上優秀的項目的。畢竟,我們現在面對的都是復雜的環境,高並發的場景,大數據量的可能。
簡而言之,現在的系統一般都需要支持分布式部署,不存在單點問題,才算是一個合格的系統。
而redis作為一個存儲系統,單點問題肯定是不行的。
最簡單的,就是起碼得支持讀寫分離功能,因為我們面臨的許多問題,一般是面對大量的查詢問題。而要做到讀寫分離功能,就是要把主節點的數據同步到從節點上。從而可以讓從節點接受讀請求,以減輕主節點的讀壓力。
就讓我們來分析下 Redis 是如何進行主從同步數據的吧!主從同步,換個名稱也就是數據復制。
0. 主從復制的作用
數據冗余:主從復制實現了數據的熱備份,是持久化之外的一種數據冗余方式。
故障恢復:當主節點出現問題時,可以由從節點提供服務,實現快速的故障恢復;實際上是一種服務的冗余。
負載均衡:在主從復制的基礎上,配合讀寫分離,可以由主節點提供寫服務,由從節點提供讀服務(即寫Redis數據時應用連接主節點,讀Redis數據時應用連接從節點),分擔服務器負載;尤其是在寫少讀多的場景下,通過多個從節點分擔讀負載,可以大大提高Redis服務器的並發量。
讀寫分離:可以用於實現讀寫分離,主庫寫、從庫讀,讀寫分離不僅可以提高服務器的負載能力,同時可根據需求的變化,改變從庫的數量;
高可用基石:除了上述作用以外,主從復制還是哨兵和集群能夠實施的基礎,因此說主從復制是Redis高可用的基礎。
1. Redis 主從復制簡介
在主從復制中,數據庫分為兩類,一類是主庫(master),另一類是同步主庫數據的從庫(slave)。主庫可以進行讀寫操作,當寫操作導致數據變化時會自動同步到從庫。而從庫一般是只讀的(特定情況也可以寫,通過參數slave-read-only指定),並接受來自主庫的數據,一個主庫可擁有多個從庫,而一個從庫只能有一個主庫。這樣就使得redis的主從架構有了兩種模式:一類是一主多從如下圖1,二類是“鏈式主從復制”--主->從->主-從如下圖2。
2. Redis 主從復制的操作步驟簡略說明
1. 首先,你得有至少2個redis server 實例,單機多實例或者多機多實例皆可。
2. 配置主從關系,使用 slaveof master_host master_port; (config rewrite 可直接寫入配置文件,避免每次都重新寫)
3. 驗證主從配置,使用 info Replication;
上面的操作步驟是進行實時操作的,也可以直接將 master/slave 配置放到 redis.conf 中,啟動時直接加載。
當master需要使用密碼進行訪問時,可以使用命令 masterauth 進行授權。
masterauth 123456 # 寫到redis.conf配置文件中 config set masterauth 123456 # 通過命令行進行授權
3. 主要同步的實現原理
主從復制大致流程為:
1. slaveof 是我們的開啟方法,它會將master信息寫入到從節點;
2. 然后與master進行建立連接;
3. 然后master決定復制方式是全量同步還是部分同步;
4. master進行數據准備;
5. 將需要同步的發送給slave節點;
6. 從節點執行發送過來的數據;
但是,我們需要進行深入理解。
3.1. slaveof 命令源碼解析
slaveof 為我們操作開啟主從復制開啟了入口,其接口定義如下:
{"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
// 用法 slaveof <master_host> <master_port> 建立主從關系 // slaveof no one 取消主從同步 // replication.c void slaveofCommand(client *c) { /* SLAVEOF is not allowed in cluster mode as replication is automatically * configured using the current address of the master node. */ if (server.cluster_enabled) { addReplyError(c,"SLAVEOF not allowed in cluster mode."); return; } /* The special host/port combination "NO" "ONE" turns the instance * into a master. Otherwise the new master address is set. */ // slaveof no one, 取消主從同步 if (!strcasecmp(c->argv[1]->ptr,"no") && !strcasecmp(c->argv[2]->ptr,"one")) { if (server.masterhost) { // 取消當前的master關聯,返回客戶端目前狀態信息,結束 replicationUnsetMaster(); sds client = catClientInfoString(sdsempty(),c); serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')", client); sdsfree(client); } } else { long port; if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK)) return; /* Check if we are already attached to the specified slave */ // 只能和一個 master 建立主從關系 if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) && server.masterport == port) { serverLog(LL_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed."); addReplySds(c,sdsnew("+OK Already connected to specified master\r\n")); return; } /* There was no previous master or the user specified a different one, * we can continue. */ // 設置master信息 replicationSetMaster(c->argv[1]->ptr, port); // 輸出client狀態信息 sds client = catClientInfoString(sdsempty(),c); serverLog(LL_NOTICE,"SLAVE OF %s:%d enabled (user request from '%s')", server.masterhost, server.masterport, client); sdsfree(client); } addReply(c,shared.ok); } // 綁定新的master關聯 /* Set replication to the specified master address and port. */ void replicationSetMaster(char *ip, int port) { sdsfree(server.masterhost); server.masterhost = sdsnew(ip); server.masterport = port; if (server.master) freeClient(server.master); // slave 不進行阻塞客戶端 disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */ // 斷開所有 slave 連接 disconnectSlaves(); /* Force our slaves to resync with us as well. */ // cacheMaster 丟棄 replicationDiscardCachedMaster(); /* Don't try a PSYNC. */ // 鏈式主從復制刪除 freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ // 斷開正在連接slave請求 cancelReplicationHandshake(); server.repl_state = REPL_STATE_CONNECT; server.master_repl_offset = 0; server.repl_down_since = 0; } // 取消master關聯 /* Cancel replication, setting the instance as a master itself. */ void replicationUnsetMaster(void) { if (server.masterhost == NULL) return; /* Nothing to do. */ sdsfree(server.masterhost); server.masterhost = NULL; if (server.master) { if (listLength(server.slaves) == 0) { /* If this instance is turned into a master and there are no * slaves, it inherits the replication offset from the master. * Under certain conditions this makes replicas comparable by * replication offset to understand what is the most updated. */ server.master_repl_offset = server.master->reploff; freeReplicationBacklog(); } freeClient(server.master); } replicationDiscardCachedMaster(); cancelReplicationHandshake(); server.repl_state = REPL_STATE_NONE; } // blocked.c, 解除所有的阻塞客戶端 /* Mass-unblock clients because something changed in the instance that makes * blocking no longer safe. For example clients blocked in list operations * in an instance which turns from master to slave is unsafe, so this function * is called when a master turns into a slave. * * The semantics is to send an -UNBLOCKED error to the client, disconnecting * it at the same time. */ void disconnectAllBlockedClients(void) { listNode *ln; listIter li; listRewind(server.clients,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); if (c->flags & CLIENT_BLOCKED) { addReplySds(c,sdsnew( "-UNBLOCKED force unblock from blocking operation, " "instance state changed (master -> slave?)\r\n")); unblockClient(c); c->flags |= CLIENT_CLOSE_AFTER_REPLY; } } } // networking.c, 斷開所有的 slave 連接 /* Close all the slaves connections. This is useful in chained replication * when we resync with our own master and want to force all our slaves to * resync with us as well. */ void disconnectSlaves(void) { while (listLength(server.slaves)) { listNode *ln = listFirst(server.slaves); freeClient((client*)ln->value); } } // replication.c /* Free a cached master, called when there are no longer the conditions for * a partial resync on reconnection. */ void replicationDiscardCachedMaster(void) { if (server.cached_master == NULL) return; serverLog(LL_NOTICE,"Discarding previously cached master state."); server.cached_master->flags &= ~CLIENT_MASTER; freeClient(server.cached_master); server.cached_master = NULL; } // replication.c void freeReplicationBacklog(void) { serverAssert(listLength(server.slaves) == 0); zfree(server.repl_backlog); server.repl_backlog = NULL; } // replication.c /* This function aborts a non blocking replication attempt if there is one * in progress, by canceling the non-blocking connect attempt or * the initial bulk transfer. * * If there was a replication handshake in progress 1 is returned and * the replication state (server.repl_state) set to REPL_STATE_CONNECT. * * Otherwise zero is returned and no operation is perforemd at all. */ int cancelReplicationHandshake(void) { if (server.repl_state == REPL_STATE_TRANSFER) { replicationAbortSyncTransfer(); server.repl_state = REPL_STATE_CONNECT; } else if (server.repl_state == REPL_STATE_CONNECTING || slaveIsInHandshakeState()) { undoConnectWithMaster(); server.repl_state = REPL_STATE_CONNECT; } else { return 0; } return 1; } // networking.c /* Concatenate a string representing the state of a client in an human * readable format, into the sds string 's'. */ sds catClientInfoString(sds s, client *client) { char flags[16], events[3], *p; int emask; p = flags; if (client->flags & CLIENT_SLAVE) { if (client->flags & CLIENT_MONITOR) *p++ = 'O'; else *p++ = 'S'; } if (client->flags & CLIENT_MASTER) *p++ = 'M'; if (client->flags & CLIENT_MULTI) *p++ = 'x'; if (client->flags & CLIENT_BLOCKED) *p++ = 'b'; if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd'; if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c'; if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u'; if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A'; if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U'; if (client->flags & CLIENT_READONLY) *p++ = 'r'; if (p == flags) *p++ = 'N'; *p++ = '\0'; emask = client->fd == -1 ? 0 : aeGetFileEvents(server.el,client->fd); p = events; if (emask & AE_READABLE) *p++ = 'r'; if (emask & AE_WRITABLE) *p++ = 'w'; *p = '\0'; // 可變參數定義: sds sdscatfmt(sds s, char const *fmt, ...) return sdscatfmt(s, "id=%U addr=%s fd=%i name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U obl=%U oll=%U omem=%U events=%s cmd=%s", (unsigned long long) client->id, getClientPeerId(client), client->fd, client->name ? (char*)client->name->ptr : "", (long long)(server.unixtime - client->ctime), (long long)(server.unixtime - client->lastinteraction), flags, client->db->id, (int) dictSize(client->pubsub_channels), (int) listLength(client->pubsub_patterns), (client->flags & CLIENT_MULTI) ? client->mstate.count : -1, (unsigned long long) sdslen(client->querybuf), (unsigned long long) sdsavail(client->querybuf), (unsigned long long) client->bufpos, (unsigned long long) listLength(client->reply), (unsigned long long) getClientOutputBufferMemoryUsage(client), events, client->lastcmd ? client->lastcmd->name : "NULL"); }
所以,slaveof 只是做簡單的驗證,然后設置了下 master 信息,然后就返回了。那么是誰在做同步的工作呢?
其實同步任務是由 cron 任務運行的。
3.2. 如何執行同步任務?
因為復制是比較耗性能的東西,如果和用戶線程共享處理過程的話,將可能引起並發性能的。所以,redis使用異步 cron 任務的形式實現主從復制功能。
// server.c, 初始化server,注冊 cron void initServer(void) { ... /* Create out timers, that's our main way to process background * operations. */ // 添加 serverCron 到 eventLoop 中,以便后續可以執行定時腳本 if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { serverPanic("Can't create event loop timers."); exit(1); } ... } // ae.c, 添加時間事件 long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { long long id = eventLoop->timeEventNextId++; aeTimeEvent *te; te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; te->id = id; aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms); te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; te->next = eventLoop->timeEventHead; eventLoop->timeEventHead = te; return id; } // server.c, 主腳本運行入口, 每1秒運行1次 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { ... /* Replication cron function -- used to reconnect to master and * to detect transfer failures. */ // 主從復制,連接 master,我們的入口 run_with_period(1000) replicationCron(); ... server.cronloops++; return 1000/server.hz; } // 重點入口: replicationCron() // replication.c, 主從復制定時腳本 /* Replication cron function, called 1 time per second. */ void replicationCron(void) { static long long replication_cron_loops = 0; /* Non blocking connection timeout? */ // 連接超時處理,取消重連 if (server.masterhost && (server.repl_state == REPL_STATE_CONNECTING || slaveIsInHandshakeState()) && (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) { serverLog(LL_WARNING,"Timeout connecting to the MASTER..."); cancelReplicationHandshake(); } /* Bulk transfer I/O timeout? */ // 傳輸數據超時,取消重連 if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER && (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) { serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value."); cancelReplicationHandshake(); } /* Timed out master when we are an already connected slave? */ // slave 會話超時 if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED && (time(NULL)-server.master->lastinteraction) > server.repl_timeout) { serverLog(LL_WARNING,"MASTER timeout: no data nor PING received..."); freeClient(server.master); } /* Check if we should connect to a MASTER */ // 3.2.1. 初次設置master時,一定會進行連接處理 if (server.repl_state == REPL_STATE_CONNECT) { serverLog(LL_NOTICE,"Connecting to MASTER %s:%d", server.masterhost, server.masterport); if (connectWithMaster() == C_OK) { serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started"); } } /* Send ACK to master from time to time. * Note that we do not send periodic acks to masters that don't * support PSYNC and replication offsets. */ // 3.2.2. 每次定時任務執行,都會發生 ACK 給master if (server.masterhost && server.master && !(server.master->flags & CLIENT_PRE_PSYNC)) replicationSendAck(); /* If we have attached slaves, PING them from time to time. * So slaves can implement an explicit timeout to masters, and will * be able to detect a link disconnection even if the TCP connection * will not actually go down. */ listIter li; listNode *ln; robj *ping_argv[1]; /* First, send PING according to ping_slave_period. */ // 3.2.3. 發送 PING 請求 // 默認 repl_ping_slave_period: 10 if ((replication_cron_loops % server.repl_ping_slave_period) == 0) { ping_argv[0] = createStringObject("PING",4); replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1); decrRefCount(ping_argv[0]); } /* Second, send a newline to all the slaves in pre-synchronization * stage, that is, slaves waiting for the master to create the RDB file. * The newline will be ignored by the slave but will refresh the * last-io timer preventing a timeout. In this case we ignore the * ping period and refresh the connection once per second since certain * timeouts are set at a few seconds (example: PSYNC response). */ // 3.2.4. 向以當前節點為master的slaves 發送空行數據 listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START || (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && server.rdb_child_type != RDB_CHILD_TYPE_SOCKET)) { if (write(slave->fd, "\n", 1) == -1) { /* Don't worry, it's just a ping. */ } } } /* Disconnect timedout slaves. */ // 斷開連接超時的 slaves if (listLength(server.slaves)) { listIter li; listNode *ln; listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (slave->replstate != SLAVE_STATE_ONLINE) continue; if (slave->flags & CLIENT_PRE_PSYNC) continue; if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) { serverLog(LL_WARNING, "Disconnecting timedout slave: %s", replicationGetSlaveName(slave)); freeClient(slave); } } } /* If we have no attached slaves and there is a replication backlog * using memory, free it after some (configured) time. */ // 如果沒有slave 跟隨當前節點,一段時間后將backlog 釋放掉 if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit && server.repl_backlog) { time_t idle = server.unixtime - server.repl_no_slaves_since; if (idle > server.repl_backlog_time_limit) { freeReplicationBacklog(); serverLog(LL_NOTICE, "Replication backlog freed after %d seconds " "without connected slaves.", (int) server.repl_backlog_time_limit); } } /* If AOF is disabled and we no longer have attached slaves, we can * free our Replication Script Cache as there is no need to propagate * EVALSHA at all. */ if (listLength(server.slaves) == 0 && server.aof_state == AOF_OFF && listLength(server.repl_scriptcache_fifo) != 0) { replicationScriptCacheFlush(); } /* If we are using diskless replication and there are slaves waiting * in WAIT_BGSAVE_START state, check if enough seconds elapsed and * start a BGSAVE. * * This code is also useful to trigger a BGSAVE if the diskless * replication was turned off with CONFIG SET, while there were already * slaves in WAIT_BGSAVE_START state. */ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { time_t idle, max_idle = 0; int slaves_waiting = 0; int mincapa = -1; listNode *ln; listIter li; listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { idle = server.unixtime - slave->lastinteraction; if (idle > max_idle) max_idle = idle; slaves_waiting++; mincapa = (mincapa == -1) ? slave->slave_capa : (mincapa & slave->slave_capa); } } // 3.2.5. 如果有等待同步的slave, 且等待時間超過 server.repl_diskless_sync_delay, 默認是: 5s if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) { /* Start a BGSAVE. Usually with socket target, or with disk target * if there was a recent socket -> disk config change. */ startBgsaveForReplication(mincapa); } } /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ // 刷新本節點的 從健康節點 數量,以便在需要確保多少節點時才進行寫入的場景判定 refreshGoodSlavesCount(); replication_cron_loops++; /* Incremented with frequency 1 HZ. */ }
以上,就是整個主從復制的主體框架了。且以上代碼包含了兩種角色的運行機制。1: master 的運行; 2. slave 的運行;
slave 的運行過程如下:
1. 從節點每秒運行一次定時任務;
2. 當定時任務發現存在新的主節點后,會調用 connectWithMaster() 嘗試與master節點建立網絡連接;
3. 建立連接后,由 syncWithMaster() 進行處理后續同步事務;
4. 各種連接超時釋放處理;
master 的運行過程如下:
1. 各種連接超時釋放處理;
2. 定期進行 PING slave 操作;
3. 向slave寫入一個空行,相當於ping操作與slave續租期;
4. 清理連接超時的slaves, 如果一個slave也沒有, 則直接把backlog釋放掉;
5. 如果未開啟磁盤持久化操作,且有等待同步的slaves, 則主動開啟一個 bgsave;
從上面的框架中,可以說大部分時候都是在處理各種異常問題和續期問題,但是實際最重要的一個連接master操作卻只有一行代碼。那么slave連接master之后,是如何進行后續的同步的呢?好像這個定時任務的運行並沒有太大的作用呢!
3.3. 從節點如何處理同步操作?
從節點是整個同步操作的操控者,整個同步可以說都是其主導的。從上一節的過程,我們可以看到,只有一個連接master的只剩,所以必定許多工作要這里完成。
實際上,slave連接到master的請求實現,基於 epoll 模型的異步操作,所以,在主框架中,我們只看到一個連接操作。因為連接完成后的操作,是異步執行的。 先總覽一個時序圖,然后再細分源碼:
可以看到,epoll 模型在這其中起到了很大作用,將許多同步工作轉換為了異步,避免了阻塞。
// replication.c, 連接請求到 master 節點 int connectWithMaster(void) { int fd; // 創建socket fd fd = anetTcpNonBlockBestEffortBindConnect(NULL, server.masterhost,server.masterport,NET_FIRST_BIND_ADDR); if (fd == -1) { serverLog(LL_WARNING,"Unable to connect to MASTER: %s", strerror(errno)); return C_ERR; } // 使用epoll模型進行異步連接 // 連接成功后,由 syncWithMaster 進行事件處理 // 關注 讀寫事件 if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == AE_ERR) { close(fd); serverLog(LL_WARNING,"Can't create readable event for SYNC"); return C_ERR; } server.repl_transfer_lastio = server.unixtime; server.repl_transfer_s = fd; // 狀態變更,以便下次不會再進行連接 server.repl_state = REPL_STATE_CONNECTING; return C_OK; } // anet.c, 建立一個非阻塞的socket連接 int anetTcpNonBlockBestEffortBindConnect(char *err, char *addr, int port, char *source_addr) { // ANET_CONNECT_BE_BINDING 代表將進行重試盡可能建立連接 return anetTcpGenericConnect(err,addr,port,source_addr, ANET_CONNECT_NONBLOCK|ANET_CONNECT_BE_BINDING); } // 與master連接成功后,由 syncWithMaster 進行處理后續事務 // replication.c void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { char tmpfile[256], *err = NULL; int dfd, maxtries = 5; int sockerr = 0, psync_result; socklen_t errlen = sizeof(sockerr); UNUSED(el); UNUSED(privdata); UNUSED(mask); /* If this event fired after the user turned the instance into a master * with SLAVEOF NO ONE we must just return ASAP. */ if (server.repl_state == REPL_STATE_NONE) { close(fd); return; } /* Check for errors in the socket. */ if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) sockerr = errno; if (sockerr) { serverLog(LL_WARNING,"Error condition on socket for SYNC: %s", strerror(sockerr)); goto error; } /* Send a PING to check the master is able to reply without errors. */ if (server.repl_state == REPL_STATE_CONNECTING) { serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event."); /* Delete the writable event so that the readable event remains * registered and we can wait for the PONG reply. */ aeDeleteFileEvent(server.el,fd,AE_WRITABLE); server.repl_state = REPL_STATE_RECEIVE_PONG; /* Send the PING, don't check for errors at all, we have the timeout * that will take care about this. */ // 發送一個 PING 出去,檢查 master 是否可以響應 err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL); if (err) goto write_error; return; } /* Receive the PONG command. */ if (server.repl_state == REPL_STATE_RECEIVE_PONG) { // 同步讀取PING結果 err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); /* We accept only two replies as valid, a positive +PONG reply * (we just check for "+") or an authentication error. * Note that older versions of Redis replied with "operation not * permitted" instead of using a proper error code, so we test * both. */ // 沒有權限且提示不是請授權類的提示,則發生錯誤 // 沒有調用 auth 前 // -NOAUTH, 代表未授權, 可以進入下一步授權操作 if (err[0] != '+' && strncmp(err,"-NOAUTH",7) != 0 && strncmp(err,"-ERR operation not permitted",28) != 0) { serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err); sdsfree(err); goto error; } else { serverLog(LL_NOTICE, "Master replied to PING, replication can continue..."); } sdsfree(err); server.repl_state = REPL_STATE_SEND_AUTH; } /* AUTH with the master if required. */ // 需要輸入master密碼狀態 if (server.repl_state == REPL_STATE_SEND_AUTH) { if (server.masterauth) // 發送授權命令 // AUTH master_password err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL); if (err) goto write_error; server.repl_state = REPL_STATE_RECEIVE_AUTH; return; } else { server.repl_state = REPL_STATE_SEND_PORT; } } /* Receive AUTH reply. */ if (server.repl_state == REPL_STATE_RECEIVE_AUTH) { // 授權響應,讀取結果 // 授權成功響應 +OK, 其他授權失敗 err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); if (err[0] == '-') { serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err); sdsfree(err); goto error; } sdsfree(err); server.repl_state = REPL_STATE_SEND_PORT; } /* Set the slave port, so that Master's INFO command can list the * slave listening port correctly. */ // 發送端口號給master, 以便master可以列舉出所有slave的端口號 if (server.repl_state == REPL_STATE_SEND_PORT) { sds port = sdsfromlonglong(server.port); // 發送本節點的端口給 master // 命令: REPLCONF listening-port port err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", "listening-port",port, NULL); sdsfree(port); if (err) goto write_error; sdsfree(err); server.repl_state = REPL_STATE_RECEIVE_PORT; return; } /* Receive REPLCONF listening-port reply. */ if (server.repl_state == REPL_STATE_RECEIVE_PORT) { err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); /* Ignore the error if any, not all the Redis versions support * REPLCONF listening-port. */ // 忽略失敗情況,影響不大,只是個展示問題,且並非所有版本都支持該命令 if (err[0] == '-') { serverLog(LL_NOTICE,"(Non critical) Master does not understand " "REPLCONF listening-port: %s", err); } sdsfree(err); server.repl_state = REPL_STATE_SEND_CAPA; } /* Inform the master of our capabilities. While we currently send * just one capability, it is possible to chain new capabilities here * in the form of REPLCONF capa X capa Y capa Z ... * The master will ignore capabilities it does not understand. */ if (server.repl_state == REPL_STATE_SEND_CAPA) { // 發送命令: REPLCONF capa eof err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", "capa","eof",NULL); if (err) goto write_error; sdsfree(err); server.repl_state = REPL_STATE_RECEIVE_CAPA; return; } /* Receive CAPA reply. */ if (server.repl_state == REPL_STATE_RECEIVE_CAPA) { err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); /* Ignore the error if any, not all the Redis versions support * REPLCONF capa. */ if (err[0] == '-') { serverLog(LL_NOTICE,"(Non critical) Master does not understand " "REPLCONF capa: %s", err); } sdsfree(err); // 可以進行數據同步了 PSYNC server.repl_state = REPL_STATE_SEND_PSYNC; } /* Try a partial resynchonization. If we don't have a cached master * slaveTryPartialResynchronization() will at least try to use PSYNC * to start a full resynchronization so that we get the master run id * and the global offset, to try a partial resync at the next * reconnection attempt. */ if (server.repl_state == REPL_STATE_SEND_PSYNC) { // 嘗試進行部分同步, 可能為 全量同步、部分同步、或者命令不支持 // PSYNC_WAIT_REPLY, PSYNC_CONTINUE, PSYNC_FULLRESYNC, PSYNC_NOT_SUPPORTED if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) { err = sdsnew("Write error sending the PSYNC command."); goto write_error; } server.repl_state = REPL_STATE_RECEIVE_PSYNC; return; } /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */ if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) { serverLog(LL_WARNING,"syncWithMaster(): state machine error, " "state should be RECEIVE_PSYNC but is %d", server.repl_state); goto error; } // 讀取 PSYNC 結果 // PSYNC_WAIT_REPLY, PSYNC_CONTINUE, PSYNC_FULLRESYNC, PSYNC_NOT_SUPPORTED psync_result = slaveTryPartialResynchronization(fd,1); if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ /* Note: if PSYNC does not return WAIT_REPLY, it will take care of * uninstalling the read handler from the file descriptor. */ if (psync_result == PSYNC_CONTINUE) { serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization."); return; } /* PSYNC failed or is not supported: we want our slaves to resync with us * as well, if we have any (chained replication case). The mater may * transfer us an entirely different data set and we have no way to * incrementally feed our slaves after that. */ // 不能使用 PSYNC 進行同步,斷開當前節點的 slaves // 不允許鏈式主從 disconnectSlaves(); /* Force our slaves to resync with us as well. */ freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC * and the server.repl_master_runid and repl_master_initial_offset are * already populated. */ if (psync_result == PSYNC_NOT_SUPPORTED) { serverLog(LL_NOTICE,"Retrying with SYNC..."); // 不支持 PSYNC, 降級為 SYNC if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { serverLog(LL_WARNING,"I/O error writing to MASTER: %s", strerror(errno)); goto error; } } /* Prepare a suitable temp file for bulk transfer */ // 准備從rdb文件中讀取數據,最多重試5次(共5s) // 臨時文件名: temp-<1560888xxx>.<pid>.rdb while(maxtries--) { snprintf(tmpfile,256, "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); if (dfd != -1) break; sleep(1); } if (dfd == -1) { serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno)); goto error; } /* Setup the non blocking download of the bulk file. */ // 使用 epoll 模型進行異步接收master傳送過來的rdb文件 // 由 readSyncBulkPayload 函數進行結果處理 if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL) == AE_ERR) { serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (fd=%d)", strerror(errno),fd); goto error; } // 保存同步狀態 server.repl_state = REPL_STATE_TRANSFER; server.repl_transfer_size = -1; server.repl_transfer_read = 0; server.repl_transfer_last_fsync_off = 0; server.repl_transfer_fd = dfd; server.repl_transfer_lastio = server.unixtime; server.repl_transfer_tmpfile = zstrdup(tmpfile); return; error: aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); close(fd); server.repl_transfer_s = -1; server.repl_state = REPL_STATE_CONNECT; return; write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */ serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err); sdsfree(err); goto error; }
整個連接成功之后的處理過程還是比較繁雜的,主要邏輯就在 syncWithMaster,主要是在各個狀態之間的轉換,尤其頭疼,不過幸好都是流水式的一步步下來。
1. REPL_STATE_CONNECTING: 待連接狀態. slave 發送 PING命令進行主動連接, 然后將狀態置為 REPL_STATE_RECEIVE_PONG;
2. REPL_STATE_RECEIVE_PONG: 待master響應狀態. slave同步等待結果(其實一般會立即獲取到,因為epoll已經准備好,才會調用此狀態),判斷是否PING正常后, 將狀態置為 REPL_STATE_SEND_AUTH;
3. REPL_STATE_SEND_AUTH: 等待授權狀態. slave 發送 auth passwd 給master后, 將狀態置為 REPL_STATE_RECEIVE_AUTH;
4. REPL_STATE_RECEIVE_AUTH: 等待授權響應狀態. slave同步等待結果, 判斷授權通過后, 將狀態置為 REPL_STATE_SEND_PORT;
5. REPL_STATE_SEND_PORT: 待發送端口狀態. slave發送自身的服務端口給master以便master展示使用, 然后將狀態置為 REPL_STATE_RECEIVE_PORT;
6. REPL_STATE_RECEIVE_PORT: 等待端口發送結果. 不論結果如何, 直接將狀態置為 REPL_STATE_SEND_CAPA;
7. REPL_STATE_SEND_CAPA: 等待發送capa命令狀態. 發送 REPLCONF capa eof 后, 將狀態置為 REPL_STATE_RECEIVE_CAPA;
8. REPL_STATE_RECEIVE_CAPA: 等待capa命令發送結果. 不論結果如何, 將狀態置為 REPL_STATE_SEND_PSYNC;
9. REPL_STATE_SEND_PSYNC: 等待PSYNC同步命令狀態. 嘗試使用PSYNC進行部分復制,結果可能是全量復制或部分復制,也可能使用其他版本命令執行, 將狀態置為 REPL_STATE_RECEIVE_PSYNC;
10. REPL_STATE_RECEIVE_PSYNC: 等待PSYNC結果. 這是真正接收數據的時候, 是終態, 根據上一次命令的請求方式,接收相應結果進一步處理;
11. 重新注冊一個 epoll 事件,用於接收master傳輸過來的數據,處理方法為 readSyncBulkPayload();
接下來,我們先看看嘗試部分時都做了哪些事,因為這決定了是使用全量復制還是部分復制:
// 嘗試進行部分同步 // replication.c int slaveTryPartialResynchronization(int fd, int read_reply) { char *psync_runid; char psync_offset[32]; sds reply; /* Writing half */ // 第一次調用時, read_reply=0, 即是寫動作 // 向 master 寫入 PSYNC psync_runid psync_offset // 即是每次都拉取一部分數據吧 if (!read_reply) { /* Initially set repl_master_initial_offset to -1 to mark the current * master run_id and offset as not valid. Later if we'll be able to do * a FULL resync using the PSYNC command we'll set the offset at the * right value, so that this information will be propagated to the * client structure representing the master into server.master. */ server.repl_master_initial_offset = -1; // 如果已經建立了連接,則 psync_runid, psync_offset 都是可預知的 // 否則 psync_runid = "?", psync_offset="-1"; if (server.cached_master) { psync_runid = server.cached_master->replrunid; snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset); } else { serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)"); psync_runid = "?"; memcpy(psync_offset,"-1",3); } /* Issue the PSYNC command */ // 首次發送命令 PSYNC ? -1 // 后續使用實際的信息 PSYNC psync_runid psync_offset reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_runid,psync_offset,NULL); if (reply != NULL) { serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply); sdsfree(reply); aeDeleteFileEvent(server.el,fd,AE_READABLE); return PSYNC_WRITE_ERROR; } return PSYNC_WAIT_REPLY; } /* Reading half */ // 讀取 PSYNC 的結果 reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); if (sdslen(reply) == 0) { /* The master may send empty newlines after it receives PSYNC * and before to reply, just to keep the connection alive. */ sdsfree(reply); return PSYNC_WAIT_REPLY; } aeDeleteFileEvent(server.el,fd,AE_READABLE); // +FULLRESYNC 代表需要進行全量復制,否則進行部分復制 // +FULLRESYNC runid offset if (!strncmp(reply,"+FULLRESYNC",11)) { char *runid = NULL, *offset = NULL; /* FULL RESYNC, parse the reply in order to extract the run id * and the replication offset. */ runid = strchr(reply,' '); if (runid) { runid++; offset = strchr(runid,' '); if (offset) offset++; } // runid 長度為 40 if (!runid || !offset || (offset-runid-1) != CONFIG_RUN_ID_SIZE) { serverLog(LL_WARNING, "Master replied with wrong +FULLRESYNC syntax."); /* This is an unexpected condition, actually the +FULLRESYNC * reply means that the master supports PSYNC, but the reply * format seems wrong. To stay safe we blank the master * runid to make sure next PSYNCs will fail. */ memset(server.repl_master_runid,0,CONFIG_RUN_ID_SIZE+1); } else { memcpy(server.repl_master_runid, runid, offset-runid-1); server.repl_master_runid[CONFIG_RUN_ID_SIZE] = '\0'; server.repl_master_initial_offset = strtoll(offset,NULL,10); serverLog(LL_NOTICE,"Full resync from master: %s:%lld", server.repl_master_runid, server.repl_master_initial_offset); } /* We are going to full resync, discard the cached master structure. */ // 全量同步,重置master緩存 replicationDiscardCachedMaster(); sdsfree(reply); return PSYNC_FULLRESYNC; } // 部分復制的情況下,只會返回 +CONTINUE if (!strncmp(reply,"+CONTINUE",9)) { /* Partial resync was accepted, set the replication state accordingly */ serverLog(LL_NOTICE, "Successful partial resynchronization with master."); // 立即將結果釋放,那什么時候處理結果呢? sdsfree(reply); // 實際上通過該方法同步數據的 replicationResurrectCachedMaster(fd); // 繼續使用 部分同步 return PSYNC_CONTINUE; } /* If we reach this point we received either an error since the master does * not understand PSYNC, or an unexpected reply from the master. * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */ // PSYNC 不支持,因處理為降級版本 if (strncmp(reply,"-ERR",4)) { /* If it's not an error, log the unexpected event. */ serverLog(LL_WARNING, "Unexpected reply to PSYNC from master: %s", reply); } else { serverLog(LL_NOTICE, "Master does not support PSYNC or is in " "error state (reply: %s)", reply); } sdsfree(reply); replicationDiscardCachedMaster(); return PSYNC_NOT_SUPPORTED; }
通過上面的過程,我們可以看清了整個與master是如何協調進行同步的,主要依賴於 PSYNC 的返回值決定。也可以看到,全量同步功能時,注冊了一個可讀事件的監聽,具體處理使用 readSyncBulkPayload 進行承載。
3.4. 全量同步數據的實現方式
通過前面的分析,我們看到全量同時時,注冊了一個FileEvent事件,依賴於epoll實現異步操作。具體處理由 readSyncBulkPayload() 進行處理。它負責異步讀取master 同步過來的數據,寫入aof文件,加載到slave的數據庫中。具體如下:
// replication.c /* Asynchronously read the SYNC payload we receive from a master */ #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { char buf[4096]; ssize_t nread, readlen; off_t left; UNUSED(el); UNUSED(privdata); UNUSED(mask); /* Static vars used to hold the EOF mark, and the last bytes received * form the server: when they match, we reached the end of the transfer. */ static char eofmark[CONFIG_RUN_ID_SIZE]; static char lastbytes[CONFIG_RUN_ID_SIZE]; static int usemark = 0; /* If repl_transfer_size == -1 we still have to read the bulk length * from the master reply. */ // 先讀取數據長度 if (server.repl_transfer_size == -1) { if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) { serverLog(LL_WARNING, "I/O error reading bulk count from MASTER: %s", strerror(errno)); goto error; } if (buf[0] == '-') { serverLog(LL_WARNING, "MASTER aborted replication with an error: %s", buf+1); goto error; } else if (buf[0] == '\0') { /* At this stage just a newline works as a PING in order to take * the connection live. So we refresh our last interaction * timestamp. */ server.repl_transfer_lastio = server.unixtime; return; } else if (buf[0] != '$') { serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); goto error; } /* There are two possible forms for the bulk payload. One is the * usual $<count> bulk format. The other is used for diskless transfers * when the master does not know beforehand the size of the file to * transfer. In the latter case, the following format is used: * * $EOF:<40 bytes delimiter> * * At the end of the file the announced delimiter is transmitted. The * delimiter is long and random enough that the probability of a * collision with the actual file content can be ignored. */ if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) { usemark = 1; memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE); memset(lastbytes,0,CONFIG_RUN_ID_SIZE); /* Set any repl_transfer_size to avoid entering this code path * at the next call. */ server.repl_transfer_size = 0; serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: receiving streamed RDB from master"); } else { usemark = 0; // 讀取數據長度, 寫入 server.repl_transfer_size, 后續判斷是否取完整數據 server.repl_transfer_size = strtol(buf+1,NULL,10); serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: receiving %lld bytes from master", (long long) server.repl_transfer_size); } return; } /* Read bulk data */ if (usemark) { readlen = sizeof(buf); } else { left = server.repl_transfer_size - server.repl_transfer_read; readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); } nread = read(fd,buf,readlen); if (nread <= 0) { serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", (nread == -1) ? strerror(errno) : "connection lost"); cancelReplicationHandshake(); return; } server.stat_net_input_bytes += nread; /* When a mark is used, we want to detect EOF asap in order to avoid * writing the EOF mark into the file... */ int eof_reached = 0; if (usemark) { /* Update the last bytes array, and check if it matches our delimiter.*/ // 更新 最后幾個字符 if (nread >= CONFIG_RUN_ID_SIZE) { memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE); } else { int rem = CONFIG_RUN_ID_SIZE-nread; memmove(lastbytes,lastbytes+nread,rem); memcpy(lastbytes+rem,buf,nread); } if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1; } server.repl_transfer_lastio = server.unixtime; // 將數據寫入到 temp rdb 文件中 if (write(server.repl_transfer_fd,buf,nread) != nread) { serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno)); goto error; } server.repl_transfer_read += nread; /* Delete the last 40 bytes from the file if we reached EOF. */ if (usemark && eof_reached) { if (ftruncate(server.repl_transfer_fd, server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) { serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); goto error; } } /* Sync data on disk from time to time, otherwise at the end of the transfer * we may suffer a big delay as the memory buffers are copied into the * actual disk. */ // 緩沖達到一定值后,直接刷盤 // REPL_MAX_WRITTEN_BEFORE_FSYNC: 8M if (server.repl_transfer_read >= server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) { off_t sync_size = server.repl_transfer_read - server.repl_transfer_last_fsync_off; rdb_fsync_range(server.repl_transfer_fd, server.repl_transfer_last_fsync_off, sync_size); server.repl_transfer_last_fsync_off += sync_size; } /* Check if the transfer is now complete */ // 傳輸完成 if (!usemark) { if (server.repl_transfer_read == server.repl_transfer_size) eof_reached = 1; } if (eof_reached) { // 直接將臨時 rdb 文件改名為正式的 rdb 文件,從而實現數據替換 if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno)); cancelReplicationHandshake(); return; } serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Flushing old data"); // 清空原來的數據,刷入新數據 signalFlushedDb(-1); emptyDb( -1, server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, replicationEmptyDbCallback); /* Before loading the DB into memory we need to delete the readable * handler, otherwise it will get called recursively since * rdbLoad() will call the event loop to process events from time to * time for non blocking loading. */ aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory"); // 重新載入 rdb 文件,從而完成同步操作 if (rdbLoad(server.rdb_filename) != C_OK) { serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); cancelReplicationHandshake(); return; } /* Final setup of the connected slave <- master link */ zfree(server.repl_transfer_tmpfile); close(server.repl_transfer_fd); // 設置 master 信息,以便下次直接使用 replicationCreateMasterClient(server.repl_transfer_s); serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Finished with success"); /* Restart the AOF subsystem now that we finished the sync. This * will trigger an AOF rewrite, and when done will start appending * to the new file. */ if (server.aof_state != AOF_OFF) { int retry = 10; // 重新關聯 aof 文件,以便后續寫入aof正常 stopAppendOnly(); while (retry-- && startAppendOnly() == C_ERR) { serverLog(LL_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second."); sleep(1); } if (!retry) { serverLog(LL_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now."); exit(1); } } } return; error: cancelReplicationHandshake(); return; }
以上就是全量復制功能實現了,大體步驟為:
1. 先讀取整體數據長度;(肯定是master發來的數據了)
2. 依次讀取就緒數據,將其定入臨時aof文件 temp-<unixtime>.<pid>.aof;
3. 達到一定緩沖數量后,強制刷盤;
4. master 傳輸完成后,slave將臨時aof文件重命名為正式的aof文件;
5. slave 清空原來db數據;
6. 禁用aof文件的監聽,載入新的aof數據,重新開啟監聽;
7. aof 先停止再啟動,重新關聯新文件;
3.5. 部分復制的實現
前面我們看到有個 slaveTryPartialResynchronization(), 是做部分同步檢測的,但是它只會返回幾個狀態,好像返回后都沒有做什么后續處理。只有全量同步時,我們看到了如上邏輯。那么部分同步是如何實現的呢?其中有個 +CONTINUE 的狀態值得我們注意:
... // 部分復制的情況下,只會返回 +CONTINUE if (!strncmp(reply,"+CONTINUE",9)) { /* Partial resync was accepted, set the replication state accordingly */ serverLog(LL_NOTICE, "Successful partial resynchronization with master."); // 立即將結果釋放,那什么時候處理結果呢? sdsfree(reply); // 實際上通過該方法同步數據的 replicationResurrectCachedMaster(fd); // 繼續使用 部分同步 return PSYNC_CONTINUE; } ...
就這上面這個,返回 CONTINUE 后,外部邏輯只是返回,所以肯定是 replicationResurrectCachedMaster() 做了處理。而這個處理,應該是讀取后續的數據沒錯了!
// replication.c, 使用 cacheMaster 做 PSYNC 處理復制數據 /* Turn the cached master into the current master, using the file descriptor * passed as argument as the socket for the new master. * * This function is called when successfully setup a partial resynchronization * so the stream of data that we'll receive will start from were this * master left. */ void replicationResurrectCachedMaster(int newfd) { server.master = server.cached_master; server.cached_master = NULL; server.master->fd = newfd; server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP); server.master->authenticated = 1; server.master->lastinteraction = server.unixtime; server.repl_state = REPL_STATE_CONNECTED; /* Re-add to the list of clients. */ listAddNodeTail(server.clients,server.master); // 添加file事件,epoll事件, 由 readQueryFromClient 進行事件處理 if (aeCreateFileEvent(server.el, newfd, AE_READABLE, readQueryFromClient, server.master)) { serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno)); freeClientAsync(server.master); /* Close ASAP. */ } /* We may also need to install the write handler as well if there is * pending data in the write buffers. */ // 如果有待發送數據,建立一個 寫的 fileEvent 事件 if (clientHasPendingReplies(server.master)) { if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE, sendReplyToClient, server.master)) { serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); freeClientAsync(server.master); /* Close ASAP. */ } } } // 接下來,我們查看下 當master發送數據過來時,部分復制是如何實現的 // networking.c, 從 master 中讀取數據, privdata = server.master void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { client *c = (client*) privdata; int nread, readlen; size_t qblen; UNUSED(el); UNUSED(mask); // PROTO_IOBUF_LEN: 1024*16 // PROTO_MBULK_BIG_ARG: 1024*32 readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query * buffer contains exactly the SDS string representing the object, even * at the risk of requiring more read(2) calls. This way the function * processMultiBulkBuffer() can avoid copying buffers to create the * Redis Object representing the argument. */ if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf); if (remaining < readlen) readlen = remaining; } qblen = sdslen(c->querybuf); if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); // 讀取請求命令 nread = read(fd, c->querybuf+qblen, readlen); if (nread == -1) { if (errno == EAGAIN) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); freeClient(c); return; } } else if (nread == 0) { serverLog(LL_VERBOSE, "Client closed connection"); freeClient(c); return; } sdsIncrLen(c->querybuf,nread); c->lastinteraction = server.unixtime; if (c->flags & CLIENT_MASTER) c->reploff += nread; server.stat_net_input_bytes += nread; // 超出最大限制,不處理 if (sdslen(c->querybuf) > server.client_max_querybuf_len) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); bytes = sdscatrepr(bytes,c->querybuf,64); serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); freeClient(c); return; } // 處理 querybuf 數據, 其實就和普通的客戶端寫請求一樣的處理方式 processInputBuffer(c); }
處理master 部分同步過來的數據,重新在 slave 執行一次即可,基於epoll的事件監聽,可以持續處理同步數據。
所以,部分復制,其實就是重新在slave端執行與master相同的請求就好了。這個processInputBuffer()過程在前面的文章已經介紹過。
3.6. PSYNC 命令實現原理
從上面可以看出,PSYNC是整個主從復制過程的重要操作,那么 PSYNC 都是怎么實現的呢?大體上應該是一個范圍查找響應的過程,但是細節必然很多。我們可以先自己想想,要處理的點大概有哪些呢?
1. 第一次調用時,即 PSYNC ? -1 如何處理?
2. 后續調用時 即 PSYNC psync_runid psync_offset 如何處理?
3. 響應結構是如何的?比如如何響應+CONTINUE?
我們就通過源碼來解答這些問題吧!
首先是 PSYNC 的定義: 可以看到,sync 和 psync 居然是一樣的實現?
// 差別是 sync 的參數只有一個,而 psync 的參數是3個 {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0}, {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},
具體實現:
// 用法: PSYNC run_id offset // replication.c /* SYNC and PSYNC command implemenation. */ void syncCommand(client *c) { /* ignore SYNC if already slave or in monitor mode */ // SYNC 命令只能調用成功一次,后續就直接忽略了 if (c->flags & CLIENT_SLAVE) return; /* Refuse SYNC requests if we are a slave but the link with our master * is not ok... */ if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { addReplyError(c,"Can't SYNC while not connected with my master"); return; } /* SYNC can't be issued when the server has pending data to send to * the client about already issued commands. We need a fresh reply * buffer registering the differences between the BGSAVE and the current * dataset, so that we can copy to other slaves if needed. */ // 還有輸出未完成時不能再進行處理 if (clientHasPendingReplies(c)) { addReplyError(c,"SYNC and PSYNC are invalid with pending output"); return; } serverLog(LL_NOTICE,"Slave %s asks for synchronization", replicationGetSlaveName(c)); /* Try a partial resynchronization if this is a PSYNC command. * If it fails, we continue with usual full resynchronization, however * when this happens masterTryPartialResynchronization() already * replied with: * * +FULLRESYNC <runid> <offset> * * So the slave knows the new runid and offset to try a PSYNC later * if the connection with the master is lost. */ // 事實上,psync 和 sync 的實現還是區別對待的 // psync 將會優先嘗試部分復制 if (!strcasecmp(c->argv[0]->ptr,"psync")) { // 部分復制將不會重置 flags, 即每次 psync 都會成功運行 if (masterTryPartialResynchronization(c) == C_OK) { server.stat_sync_partial_ok++; return; /* No full resync needed, return. */ } else { char *master_runid = c->argv[1]->ptr; /* Increment stats for failed PSYNCs, but only if the * runid is not "?", as this is used by slaves to force a full * resync on purpose when they are not albe to partially * resync. */ if (master_runid[0] != '?') server.stat_sync_partial_err++; } } else { /* If a slave uses SYNC, we are dealing with an old implementation * of the replication protocol (like redis-cli --slave). Flag the client * so that we don't expect to receive REPLCONF ACK feedbacks. */ c->flags |= CLIENT_PRE_PSYNC; } // 以下為全量復制 /* Full resynchronization. */ server.stat_sync_full++; /* Setup the slave as one waiting for BGSAVE to start. The following code * paths will change the state if we handle the slave differently. */ c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; if (server.repl_disable_tcp_nodelay) anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */ c->repldbfd = -1; // 添加slave 到master的從節點集合中, 設置 SLAVE 標識,表示已執行過 SYNC 操作 c->flags |= CLIENT_SLAVE; listAddNodeTail(server.slaves,c); /* CASE 1: BGSAVE is in progress, with disk target. */ // 如果 rdb 存儲已在進行中,即 BGSAVE 已經在運行 // 此種是對於后來進行主從同步的客戶端,只需告知正在運行 BGSAVE 即可 if (server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_DISK) { /* Ok a background save is in progress. Let's check if it is a good * one for replication, i.e. if there is another slave that is * registering differences since the server forked to save. */ client *slave; listNode *ln; listIter li; listRewind(server.slaves,&li); while((ln = listNext(&li))) { slave = ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break; } /* To attach this slave, we check that it has at least all the * capabilities of the slave that triggered the current BGSAVE. */ if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) { /* Perfect, the server is already registering differences for * another slave. Set the right state, and copy the buffer. */ copyClientOutputBuffer(c,slave); replicationSetupSlaveForFullResync(c,slave->psync_initial_offset); serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { /* No way, we need to wait for the next BGSAVE in order to * register differences. */ serverLog(LL_NOTICE,"Waiting for next BGSAVE for SYNC"); } /* CASE 2: BGSAVE is in progress, with socket target. */ } else if (server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) { /* There is an RDB child process but it is writing directly to * children sockets. We need to wait for the next BGSAVE * in order to synchronize. */ serverLog(LL_NOTICE,"Waiting for next BGSAVE for SYNC"); /* CASE 3: There is no BGSAVE is progress. */ } else { // master 不持久化方式下,不啟動 bgsave if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) { /* Diskless replication RDB child is created inside * replicationCron() since we want to delay its start a * few seconds to wait for more slaves to arrive. */ if (server.repl_diskless_sync_delay) serverLog(LL_NOTICE,"Delay next BGSAVE for SYNC"); } else { /* Target is disk (or the slave is not capable of supporting * diskless replication) and we don't have a BGSAVE in progress, * let's start one. */ // 主動開啟一個后台 BGSAVE if (startBgsaveForReplication(c->slave_capa) != C_OK) return; } } // 如果是第一個 slave, 則創建backlog if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) createReplicationBacklog(); // 最后,直接return, 說明響應沒有一個統一的格式,各自情況各自判斷就好 return; } // 3.6.1. 后台 BGSAVE 的觸發 // replication.c /* Start a BGSAVE for replication goals, which is, selecting the disk or * socket target depending on the configuration, and making sure that * the script cache is flushed before to start. * * The mincapa argument is the bitwise AND among all the slaves capabilities * of the slaves waiting for this BGSAVE, so represents the slave capabilities * all the slaves support. Can be tested via SLAVE_CAPA_* macros. * * Side effects, other than starting a BGSAVE: * * 1) Handle the slaves in WAIT_START state, by preparing them for a full * sync if the BGSAVE was succesfully started, or sending them an error * and dropping them from the list of slaves. * * 2) Flush the Lua scripting script cache if the BGSAVE was actually * started. * * Returns C_OK on success or C_ERR otherwise. */ int startBgsaveForReplication(int mincapa) { int retval; int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF); listIter li; listNode *ln; serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s", socket_target ? "slaves sockets" : "disk"); if (socket_target) // 直接向socket中寫入數據同步 retval = rdbSaveToSlavesSockets(); else // 存儲到磁盤rdb 文件中 retval = rdbSaveBackground(server.rdb_filename); /* If we failed to BGSAVE, remove the slaves waiting for a full * resynchorinization from the list of salves, inform them with * an error about what happened, close the connection ASAP. */ if (retval == C_ERR) { serverLog(LL_WARNING,"BGSAVE for replication failed"); listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { slave->flags &= ~CLIENT_SLAVE; listDelNode(server.slaves,ln); addReplyError(slave, "BGSAVE failed, replication can't continue"); slave->flags |= CLIENT_CLOSE_AFTER_REPLY; } } return retval; } /* If the target is socket, rdbSaveToSlavesSockets() already setup * the salves for a full resync. Otherwise for disk target do it now.*/ if (!socket_target) { listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; // 依次響應 slave 端 +FULLRESYNC <master_runid> <master_offset> if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset()); } } } /* Flush the script cache, since we need that slave differences are * accumulated without requiring slaves to match our cached scripts. */ // lua 腳本相關,略 if (retval == C_OK) replicationScriptCacheFlush(); return retval; } // rdb.c, 后台保存數據到 filename 中 int rdbSaveBackground(char *filename) { pid_t childpid; long long start; if (server.rdb_child_pid != -1) return C_ERR; server.dirty_before_bgsave = server.dirty; server.lastbgsave_try = time(NULL); start = ustime(); // 使用fork() 創建子進程進行 bgsave // 所以,bgsave 應該是個很耗內存的事 if ((childpid = fork()) == 0) { int retval; /* Child */ // fork() 出的子進程執行此代碼區域 closeListeningSockets(0); redisSetProcTitle("redis-rdb-bgsave"); // 所以,整個耗時的操作都在 rdbSave() 中了 retval = rdbSave(filename); if (retval == C_OK) { size_t private_dirty = zmalloc_get_private_dirty(); if (private_dirty) { serverLog(LL_NOTICE, "RDB: %zu MB of memory used by copy-on-write", private_dirty/(1024*1024)); } } // 執行完rdbSave()后,直接退出子進程 // 此處的退出操作,並不會清理進程 I/O 緩沖,以便將來方便使用 exitFromChild((retval == C_OK) ? 0 : 1); } else { /* Parent */ // 父進程執行此代碼區域 server.stat_fork_time = ustime()-start; server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */ latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000); if (childpid == -1) { server.lastbgsave_status = C_ERR; serverLog(LL_WARNING,"Can't save in background: fork: %s", strerror(errno)); return C_ERR; } // 記錄子進程信息 serverLog(LL_NOTICE,"Background saving started by pid %d",childpid); server.rdb_save_time_start = time(NULL); server.rdb_child_pid = childpid; server.rdb_child_type = RDB_CHILD_TYPE_DISK; // bgsave 期間禁止dict進行擴容 updateDictResizePolicy(); return C_OK; } return C_OK; /* unreached */ } // replication.c, 響應客戶端需要進行全量復制 /* Send a FULLRESYNC reply in the specific case of a full resynchronization, * as a side effect setup the slave for a full sync in different ways: * * 1) Remember, into the slave client structure, the offset we sent * here, so that if new slaves will later attach to the same * background RDB saving process (by duplicating this client output * buffer), we can get the right offset from this slave. * 2) Set the replication state of the slave to WAIT_BGSAVE_END so that * we start accumulating differences from this point. * 3) Force the replication stream to re-emit a SELECT statement so * the new slave incremental differences will start selecting the * right database number. * * Normally this function should be called immediately after a successful * BGSAVE for replication was started, or when there is one already in * progress that we attached our slave to. */ int replicationSetupSlaveForFullResync(client *slave, long long offset) { char buf[128]; int buflen; slave->psync_initial_offset = offset; slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END; /* We are going to accumulate the incremental changes for this * slave as well. Set slaveseldb to -1 in order to force to re-emit * a SLEECT statement in the replication stream. */ server.slaveseldb = -1; /* Don't send this reply to slaves that approached us with * the old SYNC command. */ if (!(slave->flags & CLIENT_PRE_PSYNC)) { buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", server.runid,offset); if (write(slave->fd,buf,buflen) != buflen) { freeClientAsync(slave); return C_ERR; } } return C_OK; } // rdb.c, 子進程bgsave 數據過程 /* Save the DB on disk. Return C_ERR on error, C_OK on success. */ int rdbSave(char *filename) { char tmpfile[256]; FILE *fp; rio rdb; int error = 0; // 先使用臨時文件寫數據,然后再更名為 rdb正式文件 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); fp = fopen(tmpfile,"w"); if (!fp) { serverLog(LL_WARNING, "Failed opening .rdb for saving: %s", strerror(errno)); return C_ERR; } rioInitWithFile(&rdb,fp); // rdbSaveRio 主dump數據的關鍵實現 if (rdbSaveRio(&rdb,&error) == C_ERR) { errno = error; goto werr; } /* Make sure data will not remain on the OS's output buffers */ if (fflush(fp) == EOF) goto werr; if (fsync(fileno(fp)) == -1) goto werr; if (fclose(fp) == EOF) goto werr; /* Use RENAME to make sure the DB file is changed atomically only * if the generate DB file is ok. */ if (rename(tmpfile,filename) == -1) { serverLog(LL_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno)); unlink(tmpfile); return C_ERR; } serverLog(LL_NOTICE,"DB saved on disk"); server.dirty = 0; server.lastsave = time(NULL); server.lastbgsave_status = C_OK; return C_OK; werr: serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno)); fclose(fp); unlink(tmpfile); return C_ERR; } // replication.c, 針對第一個進行主從復制的 slave, 需要觸發 backlog 的初始化 void createReplicationBacklog(void) { serverAssert(server.repl_backlog == NULL); server.repl_backlog = zmalloc(server.repl_backlog_size); server.repl_backlog_histlen = 0; server.repl_backlog_idx = 0; /* When a new backlog buffer is created, we increment the replication * offset by one to make sure we'll not be able to PSYNC with any * previous slave. This is needed because we avoid incrementing the * master_repl_offset if no backlog exists nor slaves are attached. */ server.master_repl_offset++; /* We don't have any data inside our buffer, but virtually the first * byte we have is the next byte that will be generated for the * replication stream. */ server.repl_backlog_off = server.master_repl_offset+1; } // 3.6.2. 部分復制時的處理方式 // replication.c, 部分復制嘗試 /* This function handles the PSYNC command from the point of view of a * master receiving a request for partial resynchronization. * * On success return C_OK, otherwise C_ERR is returned and we proceed * with the usual full resync. */ int masterTryPartialResynchronization(client *c) { long long psync_offset, psync_len; char *master_runid = c->argv[1]->ptr; char buf[128]; int buflen; /* Is the runid of this master the same advertised by the wannabe slave * via PSYNC? If runid changed this master is a different instance and * there is no way to continue. */ // run_id 發生了變化,則需要重新同步 if (strcasecmp(master_runid, server.runid)) { /* Run id "?" is used by slaves that want to force a full resync. */ if (master_runid[0] != '?') { serverLog(LL_NOTICE,"Partial resynchronization not accepted: " "Runid mismatch (Client asked for runid '%s', my runid is '%s')", master_runid, server.runid); } else { serverLog(LL_NOTICE,"Full resync requested by slave %s", replicationGetSlaveName(c)); } goto need_full_resync; } /* We still have the data our slave is asking for? */ if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != C_OK) goto need_full_resync; // offset 超出范圍,使用全量同步 if (!server.repl_backlog || psync_offset < server.repl_backlog_off || psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) { serverLog(LL_NOTICE, "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset); if (psync_offset > server.master_repl_offset) { serverLog(LL_WARNING, "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c)); } goto need_full_resync; } /* If we reached this point, we are able to perform a partial resync: * 1) Set client state to make it a slave. * 2) Inform the client we can continue with +CONTINUE * 3) Send the backlog data (from the offset to the end) to the slave. */ c->flags |= CLIENT_SLAVE; c->replstate = SLAVE_STATE_ONLINE; c->repl_ack_time = server.unixtime; c->repl_put_online_on_ack = 0; listAddNodeTail(server.slaves,c); /* We can't use the connection buffers since they are used to accumulate * new commands at this stage. But we are sure the socket send buffer is * empty so this write will never fail actually. */ // 響應客戶端 +CONTINUE buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); if (write(c->fd,buf,buflen) != buflen) { freeClientAsync(c); return C_OK; } // 輸出部分同步的數據 psync_len = addReplyReplicationBacklog(c,psync_offset); serverLog(LL_NOTICE, "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.", replicationGetSlaveName(c), psync_len, psync_offset); /* Note that we don't need to set the selected DB at server.slaveseldb * to -1 to force the master to emit SELECT, since the slave already * has this state from the previous connection with the master. */ refreshGoodSlavesCount(); return C_OK; /* The caller can return, no full resync needed. */ need_full_resync: /* We need a full resync for some reason... Note that we can't * reply to PSYNC right now if a full SYNC is needed. The reply * must include the master offset at the time the RDB file we transfer * is generated, so we need to delay the reply to that moment. */ return C_ERR; } // replication.c, 根據偏移量響應從節點數據 /* Feed the slave 'c' with the replication backlog starting from the * specified 'offset' up to the end of the backlog. */ long long addReplyReplicationBacklog(client *c, long long offset) { long long j, skip, len; serverLog(LL_DEBUG, "[PSYNC] Slave request offset: %lld", offset); if (server.repl_backlog_histlen == 0) { serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero"); return 0; } serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld", server.repl_backlog_size); serverLog(LL_DEBUG, "[PSYNC] First byte: %lld", server.repl_backlog_off); serverLog(LL_DEBUG, "[PSYNC] History len: %lld", server.repl_backlog_histlen); serverLog(LL_DEBUG, "[PSYNC] Current index: %lld", server.repl_backlog_idx); /* Compute the amount of bytes we need to discard. */ // 重點就是 計算出需要同步的點 skip = offset - server.repl_backlog_off; serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip); /* Point j to the oldest byte, that is actaully our * server.repl_backlog_off byte. */ j = (server.repl_backlog_idx + (server.repl_backlog_size-server.repl_backlog_histlen)) % server.repl_backlog_size; serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j); /* Discard the amount of data to seek to the specified 'offset'. */ j = (j + skip) % server.repl_backlog_size; /* Feed slave with data. Since it is a circular buffer we have to * split the reply in two parts if we are cross-boundary. */ len = server.repl_backlog_histlen - skip; serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); while(len) { long long thislen = ((server.repl_backlog_size - j) < len) ? (server.repl_backlog_size - j) : len; serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen); addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen)); len -= thislen; j = 0; } return server.repl_backlog_histlen - skip; } //3.6.3. 全量復制時如何響應客戶端 // 因為前面我們看到只是響應了一個 FULLRESYNC <master_runid> <master_offset> 的標識而已 // 實際上,這也是一個后台腳本在運行時處理的 // replication.c, /* This function is called at the end of every background saving, * or when the replication RDB transfer strategy is modified from * disk to socket or the other way around. * * The goal of this function is to handle slaves waiting for a successful * background saving in order to perform non-blocking synchronization, and * to schedule a new BGSAVE if there are slaves that attached while a * BGSAVE was in progress, but it was not a good one for replication (no * other slave was accumulating differences). * * The argument bgsaveerr is C_OK if the background saving succeeded * otherwise C_ERR is passed to the function. * The 'type' argument is the type of the child that terminated * (if it had a disk or socket target). */ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { listNode *ln; int startbgsave = 0; int mincapa = -1; listIter li; listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { startbgsave = 1; mincapa = (mincapa == -1) ? slave->slave_capa : (mincapa & slave->slave_capa); } // 當bgsave 完成后, replstate 將變為 SLAVE_STATE_WAIT_BGSAVE_END // 代表可以進行發送 rdb 文件了 // 同樣,基於epoll io模型,進行高效發送文件 else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) { struct redis_stat buf; /* If this was an RDB on disk save, we have to prepare to send * the RDB from disk to the slave socket. Otherwise if this was * already an RDB -> Slaves socket transfer, used in the case of * diskless replication, our work is trivial, we can just put * the slave online. */ if (type == RDB_CHILD_TYPE_SOCKET) { serverLog(LL_NOTICE, "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming", replicationGetSlaveName(slave)); /* Note: we wait for a REPLCONF ACK message from slave in * order to really put it online (install the write handler * so that the accumulated data can be transfered). However * we change the replication state ASAP, since our slave * is technically online now. */ slave->replstate = SLAVE_STATE_ONLINE; slave->repl_put_online_on_ack = 1; slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */ } else { if (bgsaveerr != C_OK) { freeClient(slave); serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error"); continue; } if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || redis_fstat(slave->repldbfd,&buf) == -1) { freeClient(slave); serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); continue; } slave->repldboff = 0; slave->repldbsize = buf.st_size; slave->replstate = SLAVE_STATE_SEND_BULK; slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", (unsigned long long) slave->repldbsize); aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); // 注冊一個寫事件到 epoll 中,由 sendBulkToSlave 進行具體的發送邏輯 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { freeClient(slave); continue; } } } } if (startbgsave) startBgsaveForReplication(mincapa); } // replication.c, 發送 rdb 文件到從節點 void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { client *slave = privdata; UNUSED(el); UNUSED(mask); char buf[PROTO_IOBUF_LEN]; ssize_t nwritten, buflen; /* Before sending the RDB file, we send the preamble as configured by the * replication process. Currently the preamble is just the bulk count of * the file in the form "$<length>\r\n". */ if (slave->replpreamble) { nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble)); if (nwritten == -1) { serverLog(LL_VERBOSE,"Write error sending RDB preamble to slave: %s", strerror(errno)); freeClient(slave); return; } server.stat_net_output_bytes += nwritten; sdsrange(slave->replpreamble,nwritten,-1); if (sdslen(slave->replpreamble) == 0) { sdsfree(slave->replpreamble); slave->replpreamble = NULL; /* fall through sending data. */ } else { return; } } /* If the preamble was already transfered, send the RDB bulk data. */ lseek(slave->repldbfd,slave->repldboff,SEEK_SET); buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN); if (buflen <= 0) { serverLog(LL_WARNING,"Read error sending DB to slave: %s", (buflen == 0) ? "premature EOF" : strerror(errno)); freeClient(slave); return; } if ((nwritten = write(fd,buf,buflen)) == -1) { if (errno != EAGAIN) { serverLog(LL_WARNING,"Write error sending DB to slave: %s", strerror(errno)); freeClient(slave); } return; } slave->repldboff += nwritten; server.stat_net_output_bytes += nwritten; // 一次次地寫入socket中,直到傳輸完成 if (slave->repldboff == slave->repldbsize) { close(slave->repldbfd); slave->repldbfd = -1; aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); putSlaveOnline(slave); } }
PSYNC 也是主從同步的重要命令,它決定是全量復制還是部分復制。全量復制時,得決定是否開啟 BGSAVE 操作;而部分復制時則只需把offset后的數據發送回slave即可完成數據同步。
4. 如何持續同步?
也叫增量同步。前面我們看這么多東西,其實也只做到了初次的全量復制和部分復制功能。那么第一次復制之后呢,后續又是如何持續同步的呢?
想想前面,既然有一個定時任務一直在運行,由它來實現可能是個不錯的想法。從節點一直向其發送ping命令,而master節點則一直將自身的數據寫入slave中,從而完成持續同步。
事實上,每個寫動作,都會有一個事件傳播的操作。而這個操作里,就會有一個檢測 slave 情況的設定,而非cron去處理。就是 replicationFeedSlaves():
// 將命令傳播給slaves // 觸發的場景如: 很多寫操作, 特別的:某個key過期, // replication.c void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listNode *ln; listIter li; int j, len; char llstr[LONG_STR_SIZE]; /* If there aren't slaves, and there is no backlog buffer to populate, * we can return ASAP. */ if (server.repl_backlog == NULL && listLength(slaves) == 0) return; /* We can't have slaves attached and no backlog. */ serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); /* Send SELECT command to every slave if needed. */ if (server.slaveseldb != dictid) { robj *selectcmd; /* For a few DBs we have pre-computed SELECT command. */ if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) { selectcmd = shared.select[dictid]; } else { int dictid_len; dictid_len = ll2string(llstr,sizeof(llstr),dictid); selectcmd = createObject(OBJ_STRING, sdscatprintf(sdsempty(), "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", dictid_len, llstr)); } /* Add the SELECT command into the backlog. */ if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); /* Send it to slaves. */ listRewind(slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; addReply(slave,selectcmd); } if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd); } server.slaveseldb = dictid; /* Write the command to the replication backlog if any. */ if (server.repl_backlog) { char aux[LONG_STR_SIZE+3]; /* Add the multi bulk reply length. */ aux[0] = '*'; len = ll2string(aux+1,sizeof(aux)-1,argc); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); for (j = 0; j < argc; j++) { long objlen = stringObjectLen(argv[j]); /* We need to feed the buffer with the object as a bulk reply * not just as a plain string, so create the $..CRLF payload len * and add the final CRLF */ aux[0] = '$'; len = ll2string(aux+1,sizeof(aux)-1,objlen); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); feedReplicationBacklogWithObject(argv[j]); feedReplicationBacklog(aux+len+1,2); } } /* Write the command to every slave. */ listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; /* Don't feed slaves that are still waiting for BGSAVE to start */ // 只有初始化完成后的從節點,才會推送同步寫操作 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; /* Feed slaves that are waiting for the initial SYNC (so these commands * are queued in the output buffer until the initial SYNC completes), * or are already in sync with the master. */ /* Add the multi bulk length. */ addReplyMultiBulkLen(slave,argc); /* Finally any additional argument that was not stored inside the * static buffer if any (from j to argc). */ for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]); } }
寫操作的命令傳播,是在 call() 調用實際的數據操作里統一封裝的,避免了到處寫相同的代碼。
// server.c, 執行命令核心方法包裝 // 調用如: processCommand().call(c,CMD_CALL_FULL); 會以最大能力處理命令 /* Call() is the core of Redis execution of a command. * * The following flags can be passed: * CMD_CALL_NONE No flags. * CMD_CALL_SLOWLOG Check command speed and log in the slow log if needed. * CMD_CALL_STATS Populate command stats. * CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset * or if the client flags are forcing propagation. * CMD_CALL_PROPAGATE_REPL Send command to salves if it modified the dataset * or if the client flags are forcing propagation. * CMD_CALL_PROPAGATE Alias for PROPAGATE_AOF|PROPAGATE_REPL. * CMD_CALL_FULL Alias for SLOWLOG|STATS|PROPAGATE. * * The exact propagation behavior depends on the client flags. * Specifically: * * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set * and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set * in the call flags, then the command is propagated even if the * dataset was not affected by the command. * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP * are set, the propagation into AOF or to slaves is not performed even * if the command modified the dataset. * * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or * slaves propagation will never occur. * * Client flags are modified by the implementation of a given command * using the following API: * * forceCommandPropagation(client *c, int flags); * preventCommandPropagation(client *c); * preventCommandAOF(client *c); * preventCommandReplication(client *c); * */ void call(client *c, int flags) { long long dirty, start, duration; int client_old_flags = c->flags; ... /* Call the command. */ dirty = server.dirty; start = ustime(); c->cmd->proc(c); duration = ustime()-start; dirty = server.dirty-dirty; if (dirty < 0) dirty = 0; ... // 此處將需要傳播的命令傳播到 slave /* Propagate the command into the AOF and replication link */ if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP) { int propagate_flags = PROPAGATE_NONE; /* Check if the command operated changes in the data set. If so * set for replication / AOF propagation. */ if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL); /* If the client forced AOF / replication of the command, set * the flags regardless of the command effects on the data set. */ if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL; if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF; /* However prevent AOF / replication propagation if the command * implementatino called preventCommandPropagation() or similar, * or if we don't have the call() flags to do so. */ if (c->flags & CLIENT_PREVENT_REPL_PROP || !(flags & CMD_CALL_PROPAGATE_REPL)) propagate_flags &= ~PROPAGATE_REPL; if (c->flags & CLIENT_PREVENT_AOF_PROP || !(flags & CMD_CALL_PROPAGATE_AOF)) propagate_flags &= ~PROPAGATE_AOF; /* Call propagate() only if at least one of AOF / replication * propagation is needed. */ // 如果需要傳播命令,則調用 propagate(), propagate 會決定寫 AOF 或者 slaves if (propagate_flags != PROPAGATE_NONE) propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags); } /* Restore the old replication flags, since call() can be executed * recursively. */ c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); c->flags |= client_old_flags & (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); ... server.stat_numcommands++; } /* Propagate the specified command (in the context of the specified database id) * to AOF and Slaves. * * flags are an xor between: * + PROPAGATE_NONE (no propagation of command at all) * + PROPAGATE_AOF (propagate into the AOF file if is enabled) * + PROPAGATE_REPL (propagate into the replication link) * * This should not be used inside commands implementation. Use instead * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation(). */ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) { // 寫 AOF 文件 if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) feedAppendOnlyFile(cmd,dbid,argv,argc); // 寫slave if (flags & PROPAGATE_REPL) replicationFeedSlaves(server.slaves,dbid,argv,argc); }
其實整個同步過程並不太復雜,大體就是建立連接然后復制數據然后恢復數據的過程,只是要實現的時候,代碼還是不會太少。
當然,這里面會有很多要注意的點:
1. 如何不影響性能?
2. 如何保證低延遲?
3. 如何安全地復制?
4. 如何檢測異常?
5. 如何保證高可用性?