分析版本:REdis-5.0.4。
REdis命令處理流程可分解成三個獨立的流程(不包括復制和持久化):
1) 接受連接請求流程;
2) 接收請求數據和處理請求流程,在這個過程並不會發送處理結果給Client,而只是將結果數據寫入響應緩沖,將由響應請求流程來發送;
3) 響應請求流程。
上述三個流程均是異步化的,並且沒有直接的聯系。它們的共同點均是通過REdis的簡單事件驅動(AE,A simple event-driven)觸發,對於Linux實際是epoll的包裝,對於macOS為evport的包裝,對於FreeBSD對kqueue的包裝,對於其它則是select的包裝。
可以把ae.h/ae.c看成是抽象基類,而ae_epoll.c、ae_select.c、ae_evport.c、ae_kqueue.c看成是ea的具體實現,以面向對象來看,大致如下圖所示:
從上圖可以看出,當沒有任何數據時,進程將阻塞在函數aeApiPoll(對於epoll實際為epoll_wait)處直接超時。
如果有連接請求進來,或者有連接發送數據過來,或者有響應數據還未發送完成(連接變成可寫),aeApiPoll均會立即從阻塞狀態返回。
注意,只有fd被塞進了epoll,並沒有將client或aeFileEvent塞入epoll。因此當一個連接被激活(比如有數據需要接收)時,需要通過fd來查找到aeFileEvent,而client因為在創建aeFileEvent時就被賦值給了aeFileEvent的clientData,因此只需要找到aeFileEvent即可。
全局對象server(類型為redisServer,定義在server.h中)維護了一個全局的aeEventLoop在,則aeEventLoop維護了一個aeFileEvent數組,並且aeFileEvent的數組下標為fd,因此很容易通過fd找到對應的aeFileEvent。
之所以沒有將aeFileEvent直接注入到epoll,是為了統一事件驅動,比如select就不支持。在進程啟動執行initServer時,會調用aeCreateEventLoop初始化該數組,數組大小大於配置項maxclients指定的值(額外加128),這利用了fd作為操作系統內核資源是循環利用的特性。
1. 接受連接請求流程
接受一個連接后,為該連接創建一個client對象,並將該client注冊到epoll中,注冊事件為EPLLIN(對應於ea的AE_READABLE),。
對應的偽代碼:
int main() { // “ae”為“A simple event-driven”的縮寫 void aeMain() { while (!eventLoop->stop) { // 響應從beforesleep開始, // 未完成部分才會走到aeApiPoll。 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop);
// aeProcessEvents處理各種事件,包括: // 1) 接受連接請求,為每個連接創建一個client // 2) 接收請求數據,和處理請求 // 3) 發送響應數據 // 4) 處理各類定時事件(調用processTimeEvents) int aeProcessEvents() { // aeApiPoll實為epoll或select等 aeApiPoll();
acceptTcpHandler(int fd) { // fd為listen套接字 // anetTcpAccept底層調用的是accept int cfd = anetTcpAccept(fd);
acceptCommonHandler(cfd) { // createClient會將c添加server.clients中, // server.clients是一個鏈接。 client *c = createClient(cfd) { aeCreateFileEvent( server.el,fd,AE_READABLE, readQueryFromClient, c) { // mask值為AE_READABLE(對應於epoll的EPOLLIN), // 對於epoll實際調用的是epoll_ctl。 aeApiAddEvent(eventLoop,fd,mask); } } } } } } } } |
2. 接收請求數據和處理請求流程
這一塊會調用相應命令的處理函數,比如SET命令的處理函數setCommand,GET命令的處理函數getCommand。命令處理函數會修改內存數據。
並將處理的結果寫入響應緩沖區,但並不立即發送給client。同時也會將處理結果寫入AOF緩沖區,如果開啟了AOF。以及將命令寫入到復制積壓緩沖區,如果有開啟或有需要。還會將命令寫入到slaves的緩沖區,如果需要。
響應請求在另一獨立的流程中進行,本流程並不直接發送響應給client。
對應的偽代碼:
// 不包括響應命令, // 響應和接收處理是分開的兩個過程。 int main() // server.c:4003 { // “ae”為“A simple event-driven”的縮寫 void aeMain() // ae.c:496 { while (!eventLoop->stop) { // 發送響應先在beforesleep中進行, // 如果在beforesleep中沒有發送完(比如響應的數據量過大), // 則后續的發送會由aeApiPoll觸發。 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop);
int aeProcessEvents() // ae.c:358 { // aeApiPoll實為epoll或select等 aeApiPoll();
// readQueryFromClient是個回調函數, // 在創建client時注冊: // client *createClient(int fd) { // aeCreateFileEvent( // server.el, fd, // AE_READABLE, // readQueryFromClient, c); // }
void readQueryFromClient() // networking.c:1494 { // 這里調用read收數據 // client傳過來的數據大小不能超過配置項client-query-buffer-limit指定的值。 // 默認大小為1G,足夠覆蓋大部場景。 // 如果超過大小,則可看到WARNING日志: // Closing client that reached max query buffer length // 實際中,一般遠小於1G,所以可能將這個值調小一點,以增加對REdis的保護。 int nread = read(fd, c->querybuf, readlen); int processCommand(client*) // networking.c:2543 { redisCommand* lookupCommand(name) { // REdis所有命令存儲 // 在struct redisServer的command表中: // struct redisServer { // dict *commands; // Command table // }; // 可將redisCommand看作一個C++抽象基本, // 該抽象基本定義了純虛函數proc: // typedef void redisCommandProc(client *c); // struct redisCommand { // redisCommandProc *proc; // }; // 而command表中的每一個成員則為redisCommand的實現。 return dictFetchValue(commands,name); }
void call(client*,flags) // server.c:2414 { // 回調具體的命令處理: // 如果是SET命令, // 實際調用的是t_string.c中的函數setCommand; // 如果是DEL命令, // 實際調用的是db.c中的函數delCommand。 redisCommand::proc(client*);
void propagate(redisCommand*) // server.c:2315 { // 數據寫入到AOF文件 feedAppendOnlyFile(); // aof.c:555
// 數據復制給所有Slaves void replicationFeedSlaves(slaves) // replication.c:173 { // 數據寫入到復制積壓(Backlog)緩沖區, // 注意積壓緩沖區是一個循環緩沖區, // 如果滿了,則從頭覆蓋寫, // 循環緩沖區的大小, // 則配置項repl-backlog-size決定 feedReplicationBacklog(); // replication.c:126 } } } } } } } } }
// 以GET命令為列: // 這里的list實際為server.clients_pending_write // 所以需響應的client都添加到server.clients_pending_write鏈表中(可視為隊列) // struct redisServer server; // Server global state #0 listAddNodeHead (list=0x7fe88bc0f210, value=0x7fe88bc64ec0) at adlist.c:92 // 並不是所有的命令都需要WriteHandler, // 因此有些並不會調用clientInstallWriteHandler。 #1 in clientInstallWriteHandler (c=0x7fe88bc64ec0) at networking.c:185 #2 in prepareClientToWrite (c=0x7fe88bc64ec0) at networking.c:228 #3 in addReplyString (c=0x7fe88bc64ec0, s=0x7ffdfc2e70c0 "$855\r\n", len=6) at networking.c:338 #4 in addReplyLongLongWithPrefix (c=0x7fe88bc64ec0, ll=855, prefix=36 '$') at networking.c:515 #5 in addReplyBulkLen (c=0x7fe88bc64ec0, obj=0x7fe889312840) at networking.c:557 #6 in addReplyBulk (c=0x7fe88bc64ec0, obj=0x7fe889312840) at networking.c:562 #7 in getGenericCommand (c=0x7fe88bc64ec0) at t_string.c:167 #8 in getCommand (c=0x7fe88bc64ec0) at t_string.c:173 #9 in call (c=0x7fe88bc64ec0, flags=15) at server.c:2437 #10 in processCommand (c=0x7fe88bc64ec0) at server.c:2729 #11 in processInputBuffer (c=0x7fe88bc64ec0) at networking.c:1451 #12 in processInputBufferAndReplicate (c=0x7fe88bc64ec0) at networking.c:1486 #13 in readQueryFromClient (el=0x7fe88bc30050, fd=8, privdata=0x7fe88bc64ec0, mask=1) at networking.c:1568 #14 in aeProcessEvents (eventLoop=0x7fe88bc30050, flags=11) at ae.c:443 #15 in aeMain (eventLoop=0x7fe88bc30050) at ae.c:501 #16 in main (argc=2, argv=0x7ffdfc2e75b8) at server.c:4197 |
3. 響應請求流程
對於每一個有響應的命令,它的響應總是首先在beforesleep中進行,但如果一次沒能發送完成,則會交給sendReplyToClient后續異步處理(以epoll為例,通過注冊epoll的EPOLLOUT事件)。
對應的偽代碼:
// 響應和接收處理是分開的兩個過程。 int main() { // “ae”為“A simple event-driven”的縮寫 void aeMain() { while (!eventLoop->stop) { // 調用eventLoop->beforesleep(eventLoop); // 但實際調用的是server.c中的beforeSleep: void beforeSleep(struct aeEventLoop*) { int handleClientsWithPendingWrites() { // REdis接收和處理 // 命令流程會設置clients_pending_write, // clients_pending_write實為一個隊列鏈接。 // 當處理完一個命令后,調用clientInstallWriteHandler // 將當前client添加到clients_pending_write中。 // 但是有些命令並不需要響應,因此沒有這個動作。 listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) { int writeToClient(int fd,client* c) { write(fd,c->buf); // 如果全部發送完了, // 則調用aeDeleteFileEvent // 將fd從epoll中移除。 if (!clientHasPendingReplies(c)) { aeDeleteFileEvent( server.el, c->fd, AE_WRITABLE); // 從epoll中刪除EPOLLOUT } }
// 如果一次writeToClient調用沒有發完, // 則將fd注冊到epoll if (clientHasPendingReplies(c)) { // 下列動作是設置epoll的EPOLLOUT int ae_flags = AE_WRITABLE; // 將EPOLLOUT添加到epoll中 aeCreateFileEvent( server.el, c->fd, ae_flags, sendReplyToClient, c); } } } }
// REdis接收和處理一個命令流程 aeProcessEvents(); } } } |