1、前言
本文介紹了Redis復制的主要流程和設計思想。通過本文的閱讀,您大致能理解復制在軟件架構方面的通用思想。在閱讀本文之前,希望讀者首先對Redis有一定的認識,對Redis的事件類型、和事件處理器有個基本的了解。因為本文主要講復制的流程,所以很多額外的知識點只是一筆帶過、想要更多的了解,自行參考網上資料。話不多說、進入主題。
2、復制的主要流程
在redis復制的過程中,參與者主要就是redis的主從架構。復制是從一方復制數據到另一方,所以兩台Redis機器是必不可少的參與對象。一台主機、一台從機!參考Redis復制的主要流程,我將它分為以下幾個小模塊來分析。
- 配置階段
- 握手、探測階段
- 同步階段
- 命令傳播階段
Redis使用狀態機的策略來把以上流程給串接起來。即在每個階段都配置一個狀態碼、及每個狀態碼下執行的代碼流程!
2.1 配置階段
主從機是通過TCP協議來進行數據傳輸。所以它們首先就要建立一個安全的鏈接通道,以便可以通信!那么我們就要在從機啟動的時候配置個,它要向誰要數據,認哪個主機為自己的Master! 配置有以下幾種方法
1、 通過配置文件配置
在Redis.conf時添加要建立鏈接的主機信息、
echo slaveof masterIp masterPort >> redis.conf
2、通過客戶端
我們可以通過終端鏈接到從機
//鏈接到從機
redis-cli -p <從機port>
//執行
slaveof masterIp masterPort
3、通過啟動時指定參數
也可以在啟動從機的時候帶上指定參數
redis-server redis.conf --slaveof masterIp masterPort
那么以上三種方法都可以讓當前啟動的從機保存既然要鏈接到主機的地址、和端口號!這三種方法有一定的區別,通過配置文件保存的啟動方式比較靠普一些。當配置好主機信息后,那么接下來就要鏈接到主機!
經過以上三種方式的配置,狀態機里的狀態碼配置成REPL_STATE_CONNECT
/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
int was_master = server.masterhost == NULL;
//其他代碼.....
//配置狀態機為 REPL_STATE_CONNECT
server.repl_state = REPL_STATE_CONNECT;
}
2.2 握手、探測階段
2.2.1 鏈接Master
上面說到從機配置好了主機的地址和端口,那么如何觸發鏈接呢?這就是Redis的時間事件函數serverCron
, 它做了很多事情。其中它做了一件事就是:維護主從機數據同步。
/* Replication cron function -- used to reconnect to master,
* detect transfer failures, start background RDB transfers and so forth. */
/* 1000ms執行一次 replicationCron這個函數 */
run_with_period(1000) replicationCron();
這個replicationCron
函數會去檢測狀態機的狀態碼、上回我們的狀態碼是REPL_STATE_CONNECT
void replicationCron(void) {
//.....
/* Check if we should connect to a MASTER */
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
if (connectWithMaster() == C_OK) {
serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
}
}
//....
}
小學英文水准也能看的懂是吧、檢測是否去鏈接Master!!!判斷條件很簡單、就是那個狀態碼!!接下來看下connectWithMaster
鏈接主機的函數
int connectWithMaster(void) {
int fd;
//采用了NonBlock的方式 可以參考《UNIX網絡編程》-卷1(16節)的非阻塞I/O部分
fd = anetTcpNonBlockBestEffortBindConnect(NULL,
server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
if (fd == -1) {
serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
strerror(errno));
return C_ERR;
}
//文件事件、大致思想參考《UNIX網絡編程》-卷1(6節)I/O復用
if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
AE_ERR)
{
close(fd);
serverLog(LL_WARNING,"Can't create readable event for SYNC");
return C_ERR;
}
server.repl_transfer_lastio = server.unixtime;
//保存鏈接套節字
server.repl_transfer_s = fd;
server.repl_state = REPL_STATE_CONNECTING;
return C_OK;
}
這就和Master建立TCP鏈接、使得狀態變成REPL_STATE_CONNECTING
模式。
2.2.2 相互認證信息、檢測同步環境
當我們從機鏈接到主機后、也不是立馬進行數據發送,進行同步。它和那一樣、也要做足了前戲!過程相當的多,但都很簡單!沒有什么重點可講的,我們大致過下
- 從機給主機發送Ping,來探測網絡狀況、網絡狀態不好的情況下,重新建立鏈接!這步有點那個味道,先互相了解認識下、牽個手啥的!如果特殊時期,對不起您!
- 然后身份驗證、就是身份識別,總不能什么人來鏈接我 我都要給你同步吧!
- 發送當前從機的IP信息、及監聽的端口號啥的、這步不知道啥用?
- 探測支持的同步協議類型、和支持同步能力(EOF/PSYNC2/CAPA)。Redis在初期的時候,只支持全量的同步,就是你只要來,我都給你!經過后期作者的優化又支持、部分同步(即,同步過的數據不會再同步給你)。這步呢,就是來看看到底支持哪種同步協議類型的、以方便后續操作。
if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
//讀取回復
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF capa. */
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF capa: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_PSYNC;
}
經過上面那一系列的"互相認識"階段,最終讓狀態變成REPL_STATE_SEND_PSYNC
2.3 同步階段
那么到這里就可以真正的同步數據了,萬事具備了!上回說狀態到了REPL_STATE_SEND_PSYNC
,且看源碼:
if (server.repl_state == REPL_STATE_SEND_PSYNC) {
if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
err = sdsnew("Write error sending the PSYNC command.");
goto write_error;
}
server.repl_state = REPL_STATE_RECEIVE_PSYNC;
return;
}
主要函數slaveTryPartialResynchronization
小學水平翻譯下:“從機嘗試部分同步”。這里為為什么要嘗試部分同步呢?之前咱們說到:Redis早期的版本不支持部分同步,后來才支持的。函數名我估計是:如果當前這台機器同步過數據,那么走部分同步,如果沒有就走全部同步,所以起了個slaveTryPartialResynchronization
這也是我的猜想啊、看源碼很累,有時候猜也能幫助你順着往下看,如果你每個函數都看一下,會累死的!猜函數的大致用法也是看源碼的方法之一!
既然來了還是帶看下源碼吧!
/**
* fd :鏈接套節字,你就認為中 socekt里 socket_accpet 那返回的玩意兒,用於相互同信的!
* read_reply: 這個函數分為兩個部分、用這個值來區分,通俗了說也就是、傳遞1干什么、傳遞0干什么、
* 我們看下源碼里
*/
int slaveTryPartialResynchronization(int fd, int read_reply) {
/* Writing half */
if (!read_reply) {
/*
* 當不可讀的時候,取就是寫的時候,即往fd里寫數據。即向對方發送數據!
*/
return 狀態常量
}
/* Reading half */
/**
* 上面是發送數據,下面就是讀取數據的源碼咯!
*/
}
很顯示、上面這函數分了兩大塊,由read_reply
參數來決定,是發送數據,還是讀取數據!就兩件事
0:發送數據
1:讀取數據
接下來看一張圖吧、單獨用文字來解釋有點繞
上面這張圖大致的來表示了一個干凈的從機,第一次向主機同步數據的過程,下面解釋下這張圖
- 向發送指令
psync ? -1
我們還是來看下源碼、回到 slaveTryPartialResynchronization
函數:
int slaveTryPartialResynchronization(int fd, int read_reply) {
char *psync_replid;
char psync_offset[32];
sds reply;
/* Writing half */
if (!read_reply) {
server.master_initial_offset = -1;
//如果有主機的數據、
if (server.cached_master) {
psync_replid = server.cached_master->replid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
} else {
//沒有情況
serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
psync_replid = "?";
memcpy(psync_offset,"-1",3);
}
//發送 PSYNC 指令
/* Issue the PSYNC command */
reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL);
if (reply != NULL) {
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
sdsfree(reply);
aeDeleteFileEvent(server.el,fd,AE_READABLE);
return PSYNC_WRITE_ERROR;
}
return PSYNC_WAIT_REPLY;
}
// 讀的部分...省略
}
PSYNC
指令有兩個參數、
- psync_replid
- psync_offset
從上面的邏輯可以看出來、當有同步過的時候,psync_replid
和psync_offset
會取出相對就的值、如果沒有則用"?"和“-1”來給值。通常情況下,我們是一個新機器,所以沒有同步過主機信息,即cached_master
為false
所以:
// psync_replid = ?
// psync_offset = -1
sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC", "?", -1, NULL);
接下來把狀態變更為PSYNC_WAIT_REPLY
等待主機的回復!
主機在接收到從機發來的PSYNC
命令時大致的流程是會去fock一個子進程出來做bgSave
的事情、有關於Redis持久化的過程不在本文描述、可以自尋資料觀看。當主機接收到PSYNC
指令的時候,解析指令,我們轉到主機視角看如何解析!我們主要分析下關鍵代碼~
void syncCommand(client *c) {
// 一系列的判斷代碼、略過
//因為我們是第一次同步、所以 嘗試部分同步會失敗、 走到下面的 else 里 stat_sync_partial_err++
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
if (masterTryPartialResynchronization(c) == C_OK) {
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
char *master_replid = c->argv[1]->ptr;
/* Increment stats for failed PSYNCs, but only if the
* replid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not albe to partially
* resync. */
if (master_replid[0] != '?') server.stat_sync_partial_err++;
}
} else {
/* If a slave uses SYNC, we are dealing with an old implementation
* of the replication protocol (like redis-cli --slave). Flag the client
* so that we don't expect to receive REPLCONF ACK feedbacks. */
c->flags |= CLIENT_PRE_PSYNC;
}
//以主機的視角來看的話,這里很多代碼是做一些 數據保存、主要把從機的信息保存下來、
/* Setup the slave as one waiting for BGSAVE to start. The following code
* paths will change the state if we handle the slave differently. */
//標識當前這個從機的同步狀態、標識從機為CLIENT_SLAVE身份、加入從機列表、
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;//這個打個flag ,下面將會用到。先標識狀態
if (server.repl_disable_tcp_nodelay)
anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves,c);
//這里創建一個復制積壓緩沖區,用於部分同步,稍后講到、這里打個flag
/* Create the replication backlog if needed. */
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
/* When we create the backlog from scratch, we always use a new
* replication ID and clear the ID2, since there is no valid
* past history. */
changeReplicationId();
clearReplicationId2();
createReplicationBacklog();
}
//下面有三個case 因為我們是第一次請求主機同步。所以沒有任務bgsave progress(這里假設,方便我們閱讀代碼,和順應場景)
/* CASE 1: BGSAVE is in progress, with disk target. */
if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
{
/* CASE 2: BGSAVE is in progress, with socket target. */
} else if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
/* There is an RDB child process but it is writing directly to
* children sockets. We need to wait for the next BGSAVE
* in order to synchronize. */
serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
/* CASE 3: There is no BGSAVE is progress. */
} else {
if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
/* Diskless replication RDB child is created inside
* replicationCron() since we want to delay its start a
* few seconds to wait for more slaves to arrive. */
if (server.repl_diskless_sync_delay)
serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
} else {
/* Target is disk (or the slave is not capable of supporting
* diskless replication) and we don't have a BGSAVE in progress,
* let's start one. */
if (server.aof_child_pid == -1) {
//開始為同步進行Bgsave操作
startBgsaveForReplication(c->slave_capa);
} else {
serverLog(LL_NOTICE,
"No BGSAVE in progress, but an AOF rewrite is active. "
"BGSAVE for replication delayed");
}
}
}
return;
}
如果都沒出錯的知識(主機默認支持無磁化同步),那么開始startBgsaveForReplication
再接着往下看
int startBgsaveForReplication(int mincapa) {
int retval;
int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
listIter li;
listNode *ln;
//開始bgsave為同步准備, socket_target 為復制到socket還是磁盤的判斷
serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
socket_target ? "replicas sockets" : "disk");
//准備RDB文件 開始
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
/* Only do rdbSave* when rsiptr is not NULL,
* otherwise slave will miss repl-stream-db. */
if (rsiptr) {
if (socket_target)
//保存到socket
retval = rdbSaveToSlavesSockets(rsiptr);
else
///保存到disk
retval = rdbSaveBackground(server.rdb_filename,rsiptr);
} else {
serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
retval = C_ERR;
}
//准備RDB文件 結束
//如果錯誤 那么采取的應對方法、找到等待同步的從機,
if (retval == C_ERR) {
serverLog(LL_WARNING,"BGSAVE for replication failed");
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
//找到等待同步的從機,
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
slave->replstate = REPL_STATE_NONE;
slave->flags &= ~CLIENT_SLAVE;
//刪除節點、看樣子從列表中刪除當前這個從機
listDelNode(server.slaves,ln);
//向從機發送日志、
addReplyError(slave,
"BGSAVE failed, replication can't continue");
//標識為close狀態
slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
}
return retval;
}
/* If the target is socket, rdbSaveToSlavesSockets() already setup
* the salves for a full resync. Otherwise for disk target do it now.*/
//走到這里、socket_tartget = false
if (!socket_target) {
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
//找在等待同步開始的從機、這個狀態在上面設置過的,
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
replicationSetupSlaveForFullResync(slave,
getPsyncInitialOffset());
}
}
}
/* Flush the script cache, since we need that slave differences are
* accumulated without requiring slaves to match our cached scripts. */
if (retval == C_OK) replicationScriptCacheFlush();
return retval;
}
經過上面很流程,終於走到了函數replicationSetupSlaveForFullResync
、這里再次提醒下大家在看源碼的時候,不要多看,順着主流程往下看,每個函數的分支比較多,看多了容易看不回來。切記!
int replicationSetupSlaveForFullResync(client *slave, long long offset) {
char buf[128];
int buflen;
//這個函數很簡單了、配置從機和復制偏移量、配置從機的復制狀態、
slave->psync_initial_offset = offset;
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
/* We are going to accumulate the incremental changes for this
* slave as well. Set slaveseldb to -1 in order to force to re-emit
* a SELECT statement in the replication stream. */
server.slaveseldb = -1;
/* Don't send this reply to slaves that approached us with
* the old SYNC command. */
if (!(slave->flags & CLIENT_PRE_PSYNC)) {
buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
server.replid,offset);
//寫入數據、 FULLRESYNC replid offset
if (write(slave->fd,buf,buflen) != buflen) {
freeClientAsync(slave);
return C_ERR;
}
}
return C_OK;
}
好了、到這里從機發送 psync ? -1
的流程就完整了,最后主機把自己的replid
和offset
發送給了從機!至於replid
和offset
的作用和含意我們下文說到! 源碼鏈路不算太長,沿着主線看就行,拋開那些不是很重要的代碼!
接着我們回到從機的視角
因為有數據回來,Redis的文件事件會自動觸發syncWithMaster
回到slaveTryPartialResynchronization
函數、參數是1
psync_result = slaveTryPartialResynchronization(fd,1);
if (psync_result == PSYNC_CONTINUE) {
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
return;
}
int slaveTryPartialResynchronization(int fd, int read_reply) {
/* Reading half */
//讀取數據
reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
if (sdslen(reply) == 0) {
/* The master may send empty newlines after it receives PSYNC
* and before to reply, just to keep the connection alive. */
sdsfree(reply);
return PSYNC_WAIT_REPLY;
}
aeDeleteFileEvent(server.el,fd,AE_READABLE);
//如果是FULLRESYNC
if (!strncmp(reply,"+FULLRESYNC",11)) {
char *replid = NULL, *offset = NULL;
/* FULL RESYNC, parse the reply in order to extract the run id
* and the replication offset. */
replid = strchr(reply,' ');
if (replid) {
replid++;
offset = strchr(replid,' ');
if (offset) offset++;
}
if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
serverLog(LL_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
/* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master
* replid to make sure next PSYNCs will fail. */
memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
} else {
//記錄返回來的replid 和 offset
memcpy(server.master_replid, replid, offset-replid-1);
server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
server.master_initial_offset = strtoll(offset,NULL,10);
serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
server.master_replid,
server.master_initial_offset);
}
/* We are going to full resync, discard the cached master structure. */
//清空已經存在的主機信息
replicationDiscardCachedMaster();
sdsfree(reply);
return PSYNC_FULLRESYNC;
}
}
從機讀取到了從主機發來的FULLRESYNC
信息后、保存了一些返回來的信息、接下來回到syncWithMaster
下面的代碼
/* Prepare a suitable temp file for bulk transfer */
//准備一個臨時文件、來放主機傳遞過來的RDB文件、
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
if (dfd != -1) break;
sleep(1);
}
if (dfd == -1) {
serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
goto error;
}
/* Setup the non blocking download of the bulk file. */
//監聽事件、回調函數 readSyncBulkPayload、 用來接收RDB文件!!
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
== AE_ERR)
{
serverLog(LL_WARNING,
"Can't create readable event for SYNC: %s (fd=%d)",
strerror(errno),fd);
goto error;
}
//配置狀態為等着接收RDB、初始一些初始化數據
server.repl_state = REPL_STATE_TRANSFER;
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_tmpfile = zstrdup(tmpfile);
return;
到這里、syncWithMaster
這個函數就結束了最終把狀態機變更為REPL_STATE_TRANSFER
,配置回調函數為readSyncBulkPayload
來處理RDB文件!!!
readSyncBulkPayload
這個函數就不分析了,太長了、主要就是把接收到RDB文件寫到臨時文件、清空數據、然后加載到數據庫、釋放各種資源!
當主機和從機建立鏈接后,其實就可以正常的復制數據了,當主機准備RDB的時候,也會有正常的命令打進來,這時候因為從機狀態是等待同步,所以這些命令會被打入到緩存區,等RDB文件同步完,主機會把緩沖區的數據打到從機,更新數據、這部分就不說了。到這里,同步就算完成了!!!
2.5 命令傳播
結過上面那些流程,總算能讓主從機達到數據的一致性、但是我們服務器是一直運行的,所以我們需要把主機的命令及時的同步到從機上面、但總不是每次都是同步RDB文件、那代價也太大了!
一個需求產生就有一個相對應的應對方法!命令傳播程序!主機在接收到數據的,經過命令傳播程序會把數據發送給自己的小從機們、有達到數據的一致性!因為考慮到Redis的高效性,命令傳播是異步進行的,所以在數據一致性上還是有點差異的,魚和熊掌不可兼得。作者也做了很多的彌補工作、后面再說!關於命令傳播的源碼就不放出來了replicationFeedSlaves
可以自行觀看、就是把數據寫到從機的 replcation buffer
里。同時也寫到backLog buffer(下文說)
自此主從整個復制流程已經結束!主機機器已經能夠正常的同步數據了!
等一等!!
從機也有斷網斷電的時候啊、不能我再次鏈接上來的時候又准備RDB文件吧。所以作者又再一次進行了優化,我也支持你批量同步!
2.6 部分同步
部分同步的三個點~
2.6.1 runId 運行ID
這個很容易理解,圖上已經說明、不再描述。
2.6.2 offset 復制偏移量
2.6.3 backlogbuffer 復制積壓緩沖區
那么通過下面這個圖來理解下offset
和backlogbuffer
。在命令傳播階段、主機不僅把命令傳遞到從機、還把接收到命令按字節數寫到backlogbuffer
區,就是為了怕從機沒有接收到傳遞的數據,備份一下! 每次接收到新數據,主機和從機都會更新自己的 offset值,以達到兩邊保持一致!
首先backlogbuffer
什么時候是創建的呢?
void syncCommand(client *c) {
//主機在解析從機發來 PSYNC 命令時、
//當有一個從機的時候、並且 back_log為null的時候
/* Create the replication backlog if needed. */
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
/* When we create the backlog from scratch, we always use a new
* replication ID and clear the ID2, since there is no valid
* past history. */
changeReplicationId();
clearReplicationId2();
//創建 backlogBuffer
createReplicationBacklog();
}
}
//創建復制積壓緩沖區
void createReplicationBacklog(void) {
serverAssert(server.repl_backlog == NULL);
//配置文件配置大小、defult 1M
server.repl_backlog = zmalloc(server.repl_backlog_size);
//實際數據長度
server.repl_backlog_histlen = 0;
//下一次寫入命令的的位置
server.repl_backlog_idx = 0;
/* We don't have any data inside our buffer, but virtually the first
* byte we have is the next byte that will be generated for the
* replication stream. */
server.repl_backlog_off = server.master_repl_offset+1;
}
即當有一個從機鏈接到主機的時候,並且發送PSYNC
的時候。並且是所有從機共享的一個數據構建。
那么有了這三個要素,再來看下,如果斷網了從機是怎么批量同步的、注意這里是斷網即斷開了和主機的鏈接、但是數據已然還在!
當從機再次鏈接上來的時候、發現自己是有主機的信息的、所以發送命令帶上runId
和offset
上面的代碼也是有的!可以回頭看下從機發送PSYNC
命令的那個代碼!
那么主機是如何判斷支持部分同步的呢?
回到syncCommand
void syncCommand(client *c) {
//其他代碼...
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
// 在這里、這個函數看函數名!
if (masterTryPartialResynchronization(c) == C_OK) {
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
char *master_replid = c->argv[1]->ptr;
/* Increment stats for failed PSYNCs, but only if the
* replid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not albe to partially
* resync. */
if (master_replid[0] != '?') server.stat_sync_partial_err++;
}
}
//其他代碼...
}
int masterTryPartialResynchronization(client *c) {
//其他代碼...
/* We still have the data our slave is asking for? */
//這個翻譯就是 。我們有從機要的數據!。。。。意思就是我有備份、不用生成RDB!
//backlogBuffer不存在
//或者、傳遞過來的偏移量 < 當前主機的偏移量 (意思是主機跑的太快了、覆蓋了一些數據)
//或者是當前從機比主機跑的還數據還要多
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
serverLog(LL_NOTICE,
"Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
if (psync_offset > server.master_repl_offset) {
serverLog(LL_WARNING,
"Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
}
//否則就走全量同步
goto need_full_resync;
}
//其他代碼...
//發送在緩沖區的數據
psync_len = addReplyReplicationBacklog(c,psync_offset);
}
我們只看重點代碼、一些其他干擾的就去掉了!上面代碼可以看了,只在數據還在復制積壓緩沖區就不用走全量同步!從機等着接收數據更新Offset
即可。
上面修復了斷網的情況下~現在又有新情況下!
2.7 特殊情況下的批量同步
2.7.1 從機重啟
也沒關系~后來作者也針對這種情況做了優化!看圖
當從機關機重新的時候、會把當前同步的信息保存到RDB文件中、持久化到磁盤中。等下次重新啟動的時候,再給拿回去!這樣保證了cached_master
數據不會丟失。
2.7.2 換主的情況下
當主機變為從機的時候、從機(6380)會做了一件下面有意義的事情即”把原來的repl_id 和 offset 保存到 備份、到replid2和scond_offset“
當之前下線的主機(6379)上線后,變為(6380)的從機。當6379再向6380請求同步數據的時候,帶上自己的(原來當主機時候的數據)repliid和offset
,再看是主機是如果支持指同步的.再回到函數masterTryPartialResynchronization
void masterTryPartialResynchronization(client *c) {
//其他代碼
/* Is the replication ID of this master the same advertised by the wannabe
* slave via PSYNC? If the replication ID changed this master has a
* different replication history, and there is no way to continue.
*
* Note that there are two potentially valid replication IDs: the ID1
* and the ID2. The ID2 however is only valid up to a specific offset. */
//我們說了,當6380被選為主機的時候,備份了原來的 replid 和 offset! 這一點至關重要
if (strcasecmp(master_replid, server.replid) &&
// 因為這里支持了對 replid2的判斷,即我還記得你,你之前是我爸爸,但是,你不能比我跑的快、
//不然不好意思、去全量同步我的!
(strcasecmp(master_replid, server.replid2) ||
psync_offset > server.second_replid_offset))
{
/* Run id "?" is used by slaves that want to force a full resync. */
if (master_replid[0] != '?') {
if (strcasecmp(master_replid, server.replid) &&
strcasecmp(master_replid, server.replid2))
{
serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
"Replication ID mismatch (Replica asked for '%s', my "
"replication IDs are '%s' and '%s')",
master_replid, server.replid, server.replid2);
} else {
serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
"Requested offset for second ID was %lld, but I can reply "
"up to %lld", psync_offset, server.second_replid_offset);
}
} else {
serverLog(LL_NOTICE,"Full resync requested by replica %s",
replicationGetSlaveName(c));
}
goto need_full_resync;
}
}
所以通過以上代碼可以得知,從機變主機,為了不忘記原來的主機,保留主機的信息!以便下次來的時候,還能認得你!
所以到這里同步就真的完了!!沒有然后了!從上看下來,主機同步做的工作真的很多,每一步走錯了都是致命的。本次分析的只是部分代碼,還有60%的代碼沒有分析到!最后還有一個心跳機制要說下!
3 心跳探測
在命令傳播階段呢,從服務器會以1秒的頻率向主機發送 Replication ACK <offset>
作者設計這個目的也是會了增強主從之間的數據一致性、Redis被稱為高可用、高性能的服務器,那么對它的加強措施是一點兒也不能松懈!換名話說:如果你能和我一塊友好的工作,那么就OK,否則Kill掉你,如果你的網絡狀態不好,那么不好意思,主機也拒絕寫命令!那么這樣看來:數據的一致性在這塊設計還是贏了一回!
4 總結
就總結兩個點吧、其他還有很多,網上都能找到。那么通過本文的學習我們可以一起考慮以下兩點。
- 數據的丟失
- 數據的一致性
4.1 數據丟失
Reids盡最大可能保持數據不要丟失。比如:持久化。但在我們剛才講的換主的情況下 、如果主機執行一個數據,因為命令傳播民異步的,那么就有可能失敗!如果從機真的失敗了,剛好主機又下線了!當失敗的從機被選為主機,下線的主機又被配置為從機,那么在同步的時候剛才那條命令就會丟失!因為Reids在保持數據的一致性!
所以Redis最好只能用來做緩存,不要當作真的數據庫來用
4.2數據不一致性
從上面可以看出來,數據同步是異步的,所以就有可能讀寫不一致!
那么避免這種情況,網絡要好、機器要好、同時Redis的的配置項也能配置的6
本文參考資料redis源碼5.0.9