REdis命令處理流程處理分析


 

分析版本:REdis-5.0.4

 

REdis命令處理流程可分解成三個獨立的流程(不包括復制和持久化):

1) 接受連接請求流程;

2) 接收請求數據和處理請求流程,在這個過程並不會發送處理結果給Client,而只是將結果數據寫入響應緩沖,將由響應請求流程來發送;

3) 響應請求流程。

 

上述三個流程均是異步化的,並且沒有直接的聯系。它們的共同點均是通過REdis的簡單事件驅動(AEA simple event-driven)觸發,對於Linux實際是epoll的包裝,對於macOSevport的包裝,對於FreeBSDkqueue的包裝,對於其它則是select的包裝。

可以把ae.h/ae.c看成是抽象基類,而ae_epoll.cae_select.cae_evport.cae_kqueue.c看成是ea的具體實現,以面向對象來看,大致如下圖所示:

 

 

從上圖可以看出,當沒有任何數據時,進程將阻塞在函數aeApiPoll(對於epoll實際為epoll_wait)處直接超時。

如果有連接請求進來,或者有連接發送數據過來,或者有響應數據還未發送完成(連接變成可寫),aeApiPoll均會立即從阻塞狀態返回。

注意,只有fd被塞進了epoll,並沒有將clientaeFileEvent塞入epoll。因此當一個連接被激活(比如有數據需要接收)時,需要通過fd來查找到aeFileEvent,而client因為在創建aeFileEvent時就被賦值給了aeFileEventclientData,因此只需要找到aeFileEvent即可。

全局對象server(類型為redisServer,定義在server.h中)維護了一個全局的aeEventLoop在,則aeEventLoop維護了一個aeFileEvent數組,並且aeFileEvent的數組下標為fd,因此很容易通過fd找到對應的aeFileEvent

之所以沒有將aeFileEvent直接注入到epoll,是為了統一事件驅動,比如select就不支持。在進程啟動執行initServer時,會調用aeCreateEventLoop初始化該數組,數組大小大於配置項maxclients指定的值(額外加128),這利用了fd作為操作系統內核資源是循環利用的特性。

1. 接受連接請求流程

接受一個連接后,為該連接創建一個client對象,並將該client注冊到epoll中,注冊事件為EPLLIN(對應於eaAE_READABLE),。

 

 

對應的偽代碼:

int main()

{

  // “ae”為“A simple event-driven”的縮寫

  void aeMain()

  {

    while (!eventLoop->stop)

    {

      // 響應從beforesleep開始,

      // 未完成部分才會走到aeApiPoll。

      if (eventLoop->beforesleep != NULL)

        eventLoop->beforesleep(eventLoop);

            

      // aeProcessEvents處理各種事件,包括:

      // 1) 接受連接請求,為每個連接創建一個client

      // 2) 接收請求數據,和處理請求

      // 3) 發送響應數據

      // 4) 處理各類定時事件(調用processTimeEvents)

      int aeProcessEvents()

      {

        // aeApiPoll實為epoll或select等

        aeApiPoll();

      

        acceptTcpHandler(int fd)

        {

          // fd為listen套接字

          // anetTcpAccept底層調用的是accept

          int cfd = anetTcpAccept(fd);

          

          acceptCommonHandler(cfd)

          {

            // createClient會將c添加server.clients中,

            // server.clients是一個鏈接。

            client *c = createClient(cfd)

            {

              aeCreateFileEvent(

                server.el,fd,AE_READABLE,

                readQueryFromClient, c)

              {

                // mask值為AE_READABLE(對應於epoll的EPOLLIN),

                // 對於epoll實際調用的是epoll_ctl。                

                aeApiAddEvent(eventLoop,fd,mask);

              }

            }

          }

        }

      }

    }

  }

}

2. 接收請求數據和處理請求流程

這一塊會調用相應命令的處理函數,比如SET命令的處理函數setCommandGET命令的處理函數getCommand。命令處理函數會修改內存數據。

並將處理的結果寫入響應緩沖區,但並不立即發送給client。同時也會將處理結果寫入AOF緩沖區,如果開啟了AOF。以及將命令寫入到復制積壓緩沖區,如果有開啟或有需要。還會將命令寫入到slaves的緩沖區,如果需要。

響應請求在另一獨立的流程中進行,本流程並不直接發送響應給client

 

 

對應的偽代碼:

// 不包括響應命令,

// 響應和接收處理是分開的兩個過程。

int main() // server.c:4003

{

  // “ae”為“A simple event-driven”的縮寫

  void aeMain() // ae.c:496

  {

    while (!eventLoop->stop)

    {

      // 發送響應先在beforesleep中進行,

      // 如果在beforesleep中沒有發送完(比如響應的數據量過大),

      // 則后續的發送會由aeApiPoll觸發。

      if (eventLoop->beforesleep != NULL)

        eventLoop->beforesleep(eventLoop);

 

      int aeProcessEvents() // ae.c:358

      {

        // aeApiPoll實為epoll或select等

        aeApiPoll();

 

        // readQueryFromClient是個回調函數,

        // 在創建client時注冊:

        // client *createClient(int fd) {

        //   aeCreateFileEvent(

        //       server.el, fd,

        //       AE_READABLE,

        //       readQueryFromClient, c);

        // }

        

        void readQueryFromClient() // networking.c:1494

        {

          // 這里調用read收數據

          // client傳過來的數據大小不能超過配置項client-query-buffer-limit指定的值。

          // 默認大小為1G,足夠覆蓋大部場景。

          // 如果超過大小,則可看到WARNING日志:

          // Closing client that reached max query buffer length

          // 實際中,一般遠小於1G,所以可能將這個值調小一點,以增加對REdis的保護。

          int nread = read(fd, c->querybuf, readlen);

          int processCommand(client*) // networking.c:2543

          {

            redisCommand* lookupCommand(name)

            {

              // REdis所有命令存儲

              // 在struct redisServer的command表中:

              // struct redisServer {

              //    dict *commands; // Command table

              // };

              // 可將redisCommand看作一個C++抽象基本,

              // 該抽象基本定義了純虛函數proc:

              // typedef void redisCommandProc(client *c);

              // struct redisCommand {

              //   redisCommandProc *proc;

              // };

              // 而command表中的每一個成員則為redisCommand的實現。

              return dictFetchValue(commands,name);

            }

 

            void call(client*,flags) // server.c:2414

            {

              // 回調具體的命令處理:

              // 如果是SET命令,

              // 實際調用的是t_string.c中的函數setCommand;

              // 如果是DEL命令,

              // 實際調用的是db.c中的函數delCommand。

              redisCommand::proc(client*);

 

              void propagate(redisCommand*) // server.c:2315

              {

                // 數據寫入到AOF文件

                feedAppendOnlyFile(); // aof.c:555

 

                // 數據復制給所有Slaves

                void replicationFeedSlaves(slaves) // replication.c:173

                {

                  // 數據寫入到復制積壓(Backlog)緩沖區,

                  // 注意積壓緩沖區是一個循環緩沖區,

                  // 如果滿了,則從頭覆蓋寫,

                  // 循環緩沖區的大小,

                  // 則配置項repl-backlog-size決定

                  feedReplicationBacklog(); // replication.c:126

                }

              }

            }

          }

        }

      }

    }

  }

}

 

// 以GET命令為列:

// 這里的list實際為server.clients_pending_write

// 所以需響應的client都添加到server.clients_pending_write鏈表中(可視為隊列)

// struct redisServer server; // Server global state

#0  listAddNodeHead (list=0x7fe88bc0f210, value=0x7fe88bc64ec0) at adlist.c:92

// 並不是所有的命令都需要WriteHandler,

// 因此有些並不會調用clientInstallWriteHandler。

#1  in clientInstallWriteHandler (c=0x7fe88bc64ec0) at networking.c:185

#2  in prepareClientToWrite (c=0x7fe88bc64ec0) at networking.c:228

#3  in addReplyString (c=0x7fe88bc64ec0, s=0x7ffdfc2e70c0 "$855\r\n", len=6) at networking.c:338

#4  in addReplyLongLongWithPrefix (c=0x7fe88bc64ec0, ll=855, prefix=36 '$') at networking.c:515

#5  in addReplyBulkLen (c=0x7fe88bc64ec0, obj=0x7fe889312840) at networking.c:557

#6  in addReplyBulk (c=0x7fe88bc64ec0, obj=0x7fe889312840) at networking.c:562

#7  in getGenericCommand (c=0x7fe88bc64ec0) at t_string.c:167

#8  in getCommand (c=0x7fe88bc64ec0) at t_string.c:173

#9  in call (c=0x7fe88bc64ec0, flags=15) at server.c:2437

#10 in processCommand (c=0x7fe88bc64ec0) at server.c:2729

#11 in processInputBuffer (c=0x7fe88bc64ec0) at networking.c:1451

#12 in processInputBufferAndReplicate (c=0x7fe88bc64ec0) at networking.c:1486

#13 in readQueryFromClient (el=0x7fe88bc30050, fd=8, privdata=0x7fe88bc64ec0, mask=1) at networking.c:1568

#14 in aeProcessEvents (eventLoop=0x7fe88bc30050, flags=11) at ae.c:443

#15 in aeMain (eventLoop=0x7fe88bc30050) at ae.c:501

#16 in main (argc=2, argv=0x7ffdfc2e75b8) at server.c:4197

3. 響應請求流程

對於每一個有響應的命令,它的響應總是首先在beforesleep中進行,但如果一次沒能發送完成,則會交給sendReplyToClient后續異步處理(以epoll為例,通過注冊epollEPOLLOUT事件)。

 

 

對應的偽代碼:

// 響應和接收處理是分開的兩個過程。

int main()

{

  // “ae”為“A simple event-driven”的縮寫

  void aeMain()

  {

    while (!eventLoop->stop)

    {

      // 調用eventLoop->beforesleep(eventLoop);

      // 但實際調用的是server.c中的beforeSleep:

      void beforeSleep(struct aeEventLoop*)

      {

        int handleClientsWithPendingWrites()

        {

          // REdis接收和處理

          // 命令流程會設置clients_pending_write,

          // clients_pending_write實為一個隊列鏈接。

          // 當處理完一個命令后,調用clientInstallWriteHandler

          // 將當前client添加到clients_pending_write中。

          // 但是有些命令並不需要響應,因此沒有這個動作。

          listRewind(server.clients_pending_write,&li);

 

          while((ln = listNext(&li)))

          {

            int writeToClient(int fd,client* c)

            {

              write(fd,c->buf);

              // 如果全部發送完了,

              // 則調用aeDeleteFileEvent

              // 將fd從epoll中移除。

              if (!clientHasPendingReplies(c))

              {

                aeDeleteFileEvent(

                  server.el, c->fd, AE_WRITABLE); // 從epoll中刪除EPOLLOUT

              }

            }

            

            // 如果一次writeToClient調用沒有發完,

            // 則將fd注冊到epoll

            if (clientHasPendingReplies(c))

            {

              // 下列動作是設置epoll的EPOLLOUT

              int ae_flags = AE_WRITABLE; // 將EPOLLOUT添加到epoll中

              aeCreateFileEvent(

                server.el, c->fd, ae_flags,

                sendReplyToClient, c);

            }

          }

        }

      }

 

      // REdis接收和處理一個命令流程

      aeProcessEvents();

    }

   }

}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM