對於並發請求很高的生產環境,單個Redis滿足不了性能要求,通常都會配置Redis集群來提高服務性能。3.0之后的Redis支持了集群模式。
Redis官方提供的集群功能是無中心的,命令請求可以發送到任意一個Redis節點,如果該請求的key不是由該節點負責處理,則會返回給客戶端MOVED錯誤,提示客戶端需要轉向到該key對應的處理節點上。支持集群模式的redis客戶端會自動進行轉向,普通模式客戶端則只返回MOVED錯誤。
先看下常見的Redis集群結構:

節點兩兩之間都有連接,只有主節點可以處理客戶端的命令請求;從節點復制主節點數據,並在主節點下線后,升級為主節點。每個主節點可以掛多個從節點,在主節點下線后從節點需要競爭,只有一個從節點會被選舉為主節點。
考慮以下幾個關鍵點:
- 節點是如何互發現的,請求又是如何分配到各個節點的?
- 其中部分節點出現故障,其他節點是如何發現的又是怎樣恢復的?
- 主節點下線后從節點是如何競爭的?
- 是否可以不中斷Redis服務進行動態的擴容?
接下來幾篇會從這幾個關鍵問題入手來分析Redis集群源碼;首先先看集群的基本數據結構,以及節點之間是如何建立連接的
1. 數據結構
Redis集群是無中心的,每個節點會存儲整個集群各個節點的信息。我們看下Redis源碼中存儲集群節點信息的數據結構:
struct clusterNode { //clusterState->nodes結構 集群數據交互接收的地方在clusterProcessPacket
mstime_t ctime; /* Node object creation time. */
char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
int flags; /* REDIS_NODE_... */ //取值可以參考clusterGenNodeDescription
uint64_t configEpoch; /* Last configEpoch observed for this node */
unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */
int numslots; /* Number of slots handled by this node */
int numslaves; /* Number of slave nodes, if this is a master */
struct clusterNode **slaves; /* pointers to slave nodes */
struct clusterNode *slaveof; /* pointer to the master node */ //注意ClusterNode.slaveof與clusterMsg.slaveof的關聯
mstime_t ping_sent; /* Unix time we sent latest ping */
mstime_t pong_received; /* Unix time we received the pong */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t voted_time; /* Last time we voted for a slave of this master */
mstime_t repl_offset_time; /* Unix time we received offset for this node */
long long repl_offset; /* Last known repl offset for this node. */
char ip[REDIS_IP_STR_LEN]; /* Latest known IP address of this node */
int port; /* Latest known port of this node */
A節點 B節點
clusterNode-B(link1) ---> link2(該link不屬於任何clusterNode) (A發起meet到B) 步驟1
link4 <---- clusterNode-A(link3) (該link不屬於任何clusterNode) (B收到meet后,再下一個clusterCron中向A發起連接) 步驟2
*/
//clusterCron如果節點的link為NULL,則需要進行重連,在freeClusterLink中如果和集群中某個節點異常掛掉,則本節點通過讀寫事件而感知到,
//然后在freeClusterLink置為NULL
clusterLink *link; /* TCP/IP link with this node */ //還有個賦值的地方在clusterCron,當主動和對端建立連接的時候賦值
list *fail_reports; /* List of nodes signaling this as failing */ //鏈表中成員類型為clusterNodeFailReport
};
typedef struct clusterNode clusterNode;
clusterNode結構體存儲了一個節點的基本信息,包括節點的IP,port,連接信息等;Redis節點每次和其他節點建立連接都會創建一個clusterNode用來記錄其他節點的信息, 這些clusterNode都會存儲到clusterState結構中,每個節點自身只擁有一個clusterState,用來存儲整個集群系統的狀態和信息。
typedef struct clusterState { //數據源頭在server.cluster //集群相關配置加載在clusterLoadConfig
clusterNode *myself; /* This node */
uint64_t currentEpoch;
int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */
int size; /* Num of master nodes with at least one slot */ //默認從1開始,而不是從0開始
dict *nodes; /* Hash table of name -> clusterNode structures */
......
// 例如 slots[i] = clusterNode_A 表示槽 i 由節點 A 處理
clusterNode *slots[REDIS_CLUSTER_SLOTS];
zskiplist *slots_to_keys;
/* The following fields are used to take the slave state on elections. */
......
} clusterState;
clusterState結構中還有很多是故障遷移時需要用到的成員,與集群連接初始化關系不大,可以先不關注,后面再分析。nodes* 存儲的就是本節點所知的集群所有節點的信息。
2 連接建立
集群節點在初始化前都是孤立的Redis服務節點,還沒有連成一個整體。其他節點的信息是如何被該節點獲取的,整個集群是如何連接起來的呢?
這里有兩種途徑:
1)人為干預指定讓節點和其他節點連接,也就是通過cluster meet命令來指定要連入的其他節點;
2)集群自發傳播,靠集群內部的gossip協議自發擴散其他節點的信息。想象下如果沒有集群內部的自發傳播,任意兩個節點間的連接都需要人為輸入命令來建立;節點數如果為n, 整個集群建立的總連接數量會達到n*(n-1);要想建立起整個集群,讓每個節點都知道完整的集群信息,需要的cluster meet指令數量是O(n2),節點多起來的話初始化的成本會很高。所以說內部自發的傳播是很有必要的。
下面來看兩種方式的源碼實現:
Meet指令
CLUSTER MEET <ip> <port>
該指令會指定另一個節點的ip和port,讓接收到MEET命令的Redis節點去和該ip和端口建立連接;
struct redisCommand redisCommandTable[] = { //sentinelcmds redisCommandTable 配置文件加載見loadServerConfigFromString 所有配置文件加載見loadServerConfigFromStringsentinel
{"get",getCommand,2,"r",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
{"setnx",setnxCommand,3,"wm",0,NULL,1,1,1,0,0},
......
{"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0},
......
}
可以看出Redis服務處理cluster meet指令的函數是clusterCommand。
//CLUSTER 命令的實現
void clusterCommand(redisClient *c) {
// 不能在非集群模式下使用該命令
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}
if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
/* CLUSTER MEET <ip> <port> */
// 將給定地址的節點添加到當前節點所處的集群里面
long long port;
// 檢查 port 參數的合法性
if (getLongLongFromObject(c->argv[3], &port) != REDIS_OK) {
addReplyErrorFormat(c,"Invalid TCP port specified: %s",
(char*)c->argv[3]->ptr);
return;
}
//A通過cluster meet bip bport B后,B端在clusterAcceptHandler接收連接,A端通過clusterCommand->clusterStartHandshake連接服務器
// 嘗試與給定地址的節點進行連接
if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 &&
errno == EINVAL)
{
// 連接失敗
addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
(char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
} else {
// 連接成功
addReply(c,shared.ok);
}
......
}
A節點收到cluster meet B指令后,A進入處理函數clusterCommand,並在該函數中調用clusterStartHandshake連接B服務器。這個函數實質上也只是創建一個記錄了B節點信息的clusterNode(B),並將clusterNode(B)的link置為空。真正發起連接的是集群的時間事件處理函數clusterCron。clusterCron會遍歷A節點上所有的nodes,並向link為空的節點發起連接。這里的連接又用到前面介紹的文件事件機制,不再贅述。
Gossip消息擴散
Gossip消息的擴散是利用節點之間的ping消息,在通過meet建立連接之后為了對節點在線狀態進行檢測,每個節點都要對自己已知集群節點發送ping消息,如果在超時時間內返回了pong則認為節點正常在線。
假定對於A、B、C三個節點,初始只向A節點發送了如下兩條meet指令:
Cluster meet B
Cluster meet C
對於A來講,B和C都是已知的節點信息;A會向B、C分別發送ping消息;在A發送ping消息給B時,發送方A會在gossip消息體中隨機帶上已知的節點信息(假設包含C節點);接收到ping消息的B節點會解析這gossip消息體中的節點信息,發現C節點是未知節點,那么就會向C節點進行握手,並建立連接。那么對B來講,C也成為了已知節點。
看下接收gossip消息並處理未知節點的函數實現:
*/ //解釋 MEET 、 PING 或 PONG 消息中和 gossip 協議有關的信息。
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
// 記錄這條消息中包含了多少個節點的信息
uint16_t count = ntohs(hdr->count);
// 指向第一個節點的信息
clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
// 取出發送者
clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
// 遍歷所有節點的信息
while(count--) {
sds ci = sdsempty();
// 分析節點的 flag
uint16_t flags = ntohs(g->flags);
// 信息節點
clusterNode *node;
// 取出節點的 flag
if (flags == 0) ci = sdscat(ci,"noflags,");
if (flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
if (flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
if (flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
if (flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,");
if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
g->nodename,
g->ip,
ntohs(g->port),
ci);
sdsfree(ci);
/* Update our state accordingly to the gossip sections */
// 使用消息中的信息對節點進行更新
node = clusterLookupNode(g->nodename);
// 節點已經存在於當前節點
if (node) {
/* We already know this node.
Handle failure reports, only when the sender is a master. */
if (sender && nodeIsMaster(sender) && node != myself) {
if (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)) {//發送端每隔1s會從集群挑選一個節點來發送PING,參考CLUSTERMSG_TYPE_PING
// 添加 sender 對 node 的下線報告
if (clusterNodeAddFailureReport(node,sender)) {
//clusterProcessGossipSection->clusterNodeAddFailureReport把接收的fail或者pfail添加到本地fail_reports
redisLog(REDIS_VERBOSE,
"Node %.40s reported node %.40s as not reachable.",
sender->name, node->name); //sender節點告訴本節點node節點異常了
}
// 嘗試將 node 標記為 FAIL
markNodeAsFailingIfNeeded(node);
// 節點處於正常狀態
} else {
// 如果 sender 曾經發送過對 node 的下線報告
// 那么清除該報告
if (clusterNodeDelFailureReport(node,sender)) {
redisLog(REDIS_VERBOSE,
"Node %.40s reported node %.40s is back online.",
sender->name, node->name);
}
}
}
/* If we already know this node, but it is not reachable, and
* we see a different address in the gossip section, start an
* handshake with the (possibly) new address: this will result
* into a node address update if the handshake will be
* successful. */
// 如果節點之前處於 PFAIL 或者 FAIL 狀態
// 並且該節點的 IP 或者端口號已經發生變化
// 那么可能是節點換了新地址,嘗試對它進行握手
if (node->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL) &&
(strcasecmp(node->ip,g->ip) || node->port != ntohs(g->port)))
{
clusterStartHandshake(g->ip,ntohs(g->port));
}
// 當前節點不認識 node
} else {
if (sender &&
!(flags & REDIS_NODE_NOADDR) &&
!clusterBlacklistExists(g->nodename))
//如果本節點通過cluster forget把某個節點刪除本節點集群的話,那么這個被刪的節點需要等黑名單過期后本節點才能發送handshark
{
clusterStartHandshake(g->ip,ntohs(g->port)); //這樣本地就會創建這個不存在的node節點了,本地也就有了sender里面有,本地沒有的節點了
}
}
/* Next node */
// 處理下個節點的信息
g++;
}
}
Gossip協議的原理通俗來講就是一傳十,十傳百;互相之間傳遞集群節點信息,最終可以達到系統中所有節點都能獲取到完整的集群節點。在ping消息中附加集群節點信息,帶來的額外負擔就是每次接收到ping消息都要預先遍歷下gossip消息中所有節點信息,並判斷是否有包含自身未知的節點,還要建立連接。為了減輕接收方的負擔,gossip消息可以不附帶所有節點信息,附帶隨機節點也可以最終達到所有節點都去到完整集群信息的目的。
http://www.cgpwyj.cn/
http://news.cgpwyj.cn/
http://item.cgpwyj.cn/
http://www.peacemind.com.cn/
http://news.peacemind.com.cn/
http://item.peacemind.com.cn/
http://www.tasknet.com.cn/
http://news.tasknet.com.cn/
http://item.tasknet.com.cn/
http://www.ownbar.cn/
http://news.ownbar.cn/
http://item.ownbar.cn
http://www.shtarchao.net.cn/
http://news.shtarchao.net.cn/
http://item.shtarchao.net.cn/
http://www.metroworld.com.cn/
http://news.metroworld.com.cn/
http://www.cngodo.cn/
http://news.cngodo.cn/
http://item.cngodo.cn/
http://www.gzrdbp.cn/
http://news.gzrdbp.cn/
http://item.gzrdbp.cn/
http://www.dnapt.cn/
http://news.dnapt.cn/
http://item.dnapt.cn/
http://www.ncxlk.cn/
http://news.ncxlk.cn/
http://item.ncxlk.cn/
http://www.zgxxyp.cn/
http://news.zgxxyp.cn/
http://item.zgxxyp.cn/
http://www.sjjdvr.cn/
http://news.sjjdvr.cn/
http://item.sjjdvr.cn/
http://www.sujinkeji.cn/
http://news.sujinkeji.cn/
http://item.sujinkeji.cn/
http://www.zsjxbd.cn/
http://news.zsjxbd.cn/
http://item.zsjxbd.cn/
http://www.yesgas.cn/
http://news.yesgas.cn/
http://item.yesgas.cn/
http://www.quickpass.sh.cn/
http://news.quickpass.sh.cn/
http://item.quickpass.sh.cn/
http://www.jspcrm.cn/
http://news.jspcrm.cn/
http://item.jspcrm.cn/
http://www.yjdwpt.cn/
http://news.yjdwpt.cn/
http://item.yjdwpt.cn/
http://www.henanwulian.cn/
http://news.henanwulian.cn/
http://item.henanwulian.cn/
http://www.hhrshh.cn/
http://news.hhrshh.cn/
http://item.hhrshh.cn/
http://www.gpgold.cn/
http://news.gpgold.cn/
http://item.gpgold.cn/
http://www.jingzhuiyou.cn/
http://news.jingzhuiyou.cn/
http://item.jingzhuiyou.cn/

