目錄
目錄 1
1. 前言 1
2. 配置項 1
3. redisServer 2
4. feedReplicationBacklog-寫repl_backlog 3
5. addReplyReplicationBacklog-讀repl_backlog 4
1. 前言
注意,repl_backlog只針對部分復制(Partial Replication),而非全量復制。
本文內容基於redis-5.0.5(截至2019/6/6的最新版本),本文深入介紹REdis主從復制的部分復制核心要素repl_backlog,與其相關的配置直接影響主從間的穩定性,對提升集群的穩定性十分重要。
注意REdis的主節點把所有從節點也當作一個Client看待,正常的數據同步並不涉及repl_backlog。當從節點斷開重連,這個時候repl_backlog的作用就體現出來了。截至到5.0.5版本,從節點重啟用不上repl_backlog,原因是從節點沒有保存repl_backlog的信息,無法實現部分同步,但可少量改動REdis源代碼,實現從節點重啟后的部分復制。
正常情況下,主節點會往從節點連接緩沖區寫一份數據,同時往repl_backlog也寫一份數據,所有從節點共享同一份repl_backlog,因此可以考慮repl_backlog配置大一點,以容忍從節點更長時間失聯。
從節點向主節點發送命令PSYNC,觸發部分復制。有關REdis主從復制的細節,請參見《REdis復制研究》。
2. 配置項
REdis的復制分全量復制和部分復制,全量復制是個很重的過程,而部分復制則是輕量的,部分復制實際是一個增量復制。
REdis的主節點創建和維護一個環形緩沖復制隊列(即repl_backlog),從節點部分復制(增量復制)的數據均來自於repl_backlog。
主節點只有一個repl_backlog,所有從節點共享,直接相關的配置項有兩個:
配置項名 |
配置項說明 |
repl-backlog-size |
環形緩沖復制隊列大小,可不帶單位,但同時支持單位:b、k、kb、m、mb、g、gb,單位不區分大小寫,其中k、m、g間的計算倍數是1000,而kb、mb和gb的計算倍數是1024。 |
repl-backlog-ttl |
環形緩沖復制隊列存活時長(所有slaves不可用時,保留repl_backlog多長時間,單位:秒) |
3. redisServer
結構體redisServer是REdis的第一核心結構,repl_backlog是它的組成成員。
struct redisServer { /* My current replication offset */ long long master_repl_offset; /* Accept offsets up to this for replid2. */ long long second_replid_offset; /* Replication backlog for partial syncs */ char *repl_backlog; // 環形緩沖復制隊列 /* Backlog circular buffer size */ long long repl_backlog_size; // 環形緩沖復制隊列容量 /* Backlog actual data length */ long long repl_backlog_histlen; // 環形緩沖復制隊列已用大小(影響是否能部分復制) /* Backlog circular buffer current offset, that is the next byte will'll write to.*/ // 實際上談不上空閑,因為總是環繞覆蓋寫, // 理解為最新數據的截止位置更為合適,更新的數據總是從這里開始寫入到repl_backlog中。 long long repl_backlog_idx; // 環形緩沖復制隊列空閑起始位置(寫從這里開始) /* Replication "master offset" of first byte in the replication backlog buffer.*/ long long repl_backlog_off; // 數據在環形緩沖復制隊列的起始位置(讀從這里開始) /* Time without slaves after the backlog gets released. */ time_t repl_backlog_time_limit; // 環形緩沖復制隊列生存時長 /* We have no slaves since that time. Only valid if server.slaves len is 0. */ time_t repl_no_slaves_since; // 無可用從節點的發生時間 /* Min number of slaves to write. */ int repl_min_slaves_to_write; // 最小需寫的從節點數 /* Max lag of <count> slaves to write. */ int repl_min_slaves_max_lag; /* Number of slaves with lag <= max_lag. */ int repl_good_slaves_count; /* Send RDB to slaves sockets directly. */ int repl_diskless_sync; // 不落磁盤(無盤)往從節點發送RDB(全量復制) /* Delay to start a diskless repl BGSAVE. */ // 無盤復制時,延遲指定的時長,以等待更多的從節點 int repl_diskless_sync_delay; }; |
4. feedReplicationBacklog-寫repl_backlog
/* Add data to the replication backlog. * This function also increments the global replication offset stored at * server.master_repl_offset, because there is no case where we want to feed * the backlog without incrementing the offset. */ // 主要被replicationFeedSlaves調用 // 寫len長的數據ptr到repl_backlog中 // repl_backlog是一個環形buffer,不存在溢出的問題,策略是最新數據覆蓋最老數據。 // 如果參數len大於repl_backlog_size, // 則repl_backlog沒有實際意義, // 因為無法存儲一份完整數據。 void feedReplicationBacklog(void *ptr, size_t len) { unsigned char *p = ptr; server.master_repl_offset += len;
/* This is a circular buffer, so write as much data we can at every * iteration and rewind the "idx" index if we reach the limit. */ // 因為repl_backlog是環形buffer, // 剩余的空間可能容納不了len長的數據, // 當不夠時,就需要環繞從頭開始寫, // 因此這里需while循環。 while(len) { // repl_backlog_size為repl_backlog的容量大小, // 由配置項決定repl_backlog_size值決定, // repl_backlog_idx是repl_backlog空閑區域的起始位置, // 這兩個值相減得到repl_backlog可用大小。 size_t thislen = server.repl_backlog_size - server.repl_backlog_idx; // 如果thislen大於len,則表示足夠容納 if (thislen > len) thislen = len; memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen); // 空閑位置往后挪動 server.repl_backlog_idx += thislen; // 如果空閑位置達到容量大小,則環繞回去從0開始 if (server.repl_backlog_idx == server.repl_backlog_size) server.repl_backlog_idx = 0; len -= thislen; p += thislen; // repl_backlog_histlen記錄了repl_backlog中的數據大小 server.repl_backlog_histlen += thislen; } // 修正存儲在repl_backlog中的數據大小, // 它不可能超過repl_backlog_size值。 if (server.repl_backlog_histlen > server.repl_backlog_size) server.repl_backlog_histlen = server.repl_backlog_size; /* Set the offset of the first byte we have in the backlog. */ server.repl_backlog_off = server.master_repl_offset - server.repl_backlog_histlen + 1; } |
5. addReplyReplicationBacklog-讀repl_backlog
當主節點判斷可部分復制時,會記錄如下日志:
Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld. |
給從節點的響應頭為“+CONTINUE replid\r\n”或“+CONTINUE\r\n”。函數addReplyReplicationBacklog的實現:
/* Feed the slave 'c' with the replication backlog starting from the * specified 'offset' up to the end of the backlog. */ // 被masterTryPartialResynchronization調用 // 而masterTryPartialResynchronization被syncCommand調用(對應命令PSYNC)。 // 從repl_backlog取數據發給slave, // 數據的開始位置由offset指定。 long long addReplyReplicationBacklog(client *c, long long offset) { long long j, skip, len;
serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset); // repl_backlog_histlen為0, // 表示repl_backlog中無數據。 if (server.repl_backlog_histlen == 0) { serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero"); return 0; }
serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld", server.repl_backlog_size); serverLog(LL_DEBUG, "[PSYNC] First byte: %lld", server.repl_backlog_off); serverLog(LL_DEBUG, "[PSYNC] History len: %lld", server.repl_backlog_histlen); serverLog(LL_DEBUG, "[PSYNC] Current index: %lld", server.repl_backlog_idx);
/* Compute the amount of bytes we need to discard. */ skip = offset - server.repl_backlog_off; serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
/* Point j to the oldest byte, that is actually our * server.repl_backlog_off byte. */ j = (server.repl_backlog_idx + (server.repl_backlog_size-server.repl_backlog_histlen)) % server.repl_backlog_size; serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);
/* Discard the amount of data to seek to the specified 'offset'. */ j = (j + skip) % server.repl_backlog_size;
/* Feed slave with data. Since it is a circular buffer we have to * split the reply in two parts if we are cross-boundary. */ len = server.repl_backlog_histlen - skip; serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); while(len) { long long thislen = ((server.repl_backlog_size - j) < len) ? (server.repl_backlog_size - j) : len;
serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen); addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen)); len -= thislen; j = 0; } return server.repl_backlog_histlen - skip; } |