twemproxy源碼分析


  twemproxy是twitter開源的redis/memcached 代理,數據分片提供取模,一致性哈希等手段,維護和后端server的長連接,自動踢除server,恢復server,提供專門的狀態監控端口供外部工具獲取狀態監控信息。代碼寫的比較漂亮,學習了一些Nginx的東西,比如每個請求的處理分為多個階段,IO模型方面,采用單線程收發包,基於epoll事件驅動模型。文檔中提到的Zero Copy技術,通過將消息指針在3個隊列之間流轉實現比較巧妙。本文主要分析twemproxy的核心部分,即一個請求的從接收到最后發送響應給客戶端的流程。

一、大體流程

   twemproxy后端支持多個server pool,為每個server pool分配一個監聽端口用於接收客戶端的連接。客戶端和proxy建立連接記作client_conn,發起請求,proxy讀取數據,放入req_msg中,設置msg的owner為client_conn,proxy根據策略從server pool中選取一個server並且建立連接記作server_conn,然后轉發req_forward,將req_msg指針放入client_conn的output隊列中,同時放入server_conn的input隊列,然后觸發server_conn的寫事件,server_conn的寫回調函數會從input隊列中取出req_msg發送給對應的后端server,發送完成后將req_msg放入server_conn的output隊列,當req_msg的響應rsp_msg回來后,調用rsp_filter(用於判斷消息是否為空,是否消息可以不用回復等)和rsp_forward,將req_msg從server_conn的output隊列中取出,建立req_msg和rsp_msg的對應關系(通過msg的peer字段),通過req_msg的owner找到client_conn,然后啟動client_conn的寫事件,client_conn的寫回調函數從client_conn的output隊列中取出req_msg,然后通過peer字段拿到對應的rsp_msg,將其發出去。至此,一次請求從被proxy接收到最后響應給client結束。

   可以看出,整個流程,req_msg內容只有一份,req_msg指針在三個隊列中的順序是:

        1. req_msg => client_conn.outputq

        2. req_msg => server_conn.inputq

        3. server_conn.inputq => req_msg

        4. req_msg => server_conn.outputq

        5. server_conn.outputq => req_msg

        6. client_conn.outputq => req_msg

   總體來說,proxy既需要接收客戶端的連接,也需要維護和后端server的長連接,根據從客戶端收到的req根據特定策略選擇后端一台server進行轉發,同一個客戶端的連接上的不同的請求可能會轉發到后端不同的server。

   

  

二、基礎數據結構:

  后端的每個redis server對應一個server結構:

struct server {
      uint32_t           idx;           /* server index */
      struct server_pool *owner;        /* owner pool */      // 每個server屬於一個server pool                                                                                                                                  
      struct string      pname;         /* name:port:weight (ref in conf_server) */
      struct string      name;          /* name (ref in conf_server) */
      uint16_t           port;          /* port */      
      uint32_t           weight;        /* weight */    
      int                family;        /* socket family */
      socklen_t          addrlen;       /* socket length */
      struct sockaddr    *addr;         /* socket address (ref in conf_server) */                                                                                                                            
  
      uint32_t           ns_conn_q;     /* # server connection */  // 下面隊列中conn的個數
      struct conn_tqh    s_conn_q;      /* server connection q */ // proxy和這個redis server之間維護的連接隊列  
  
      int64_t            next_retry;    /* next retry time in usec */ // proxy有踢除后端server機制,當proxy給某台server轉發請求出錯次數達到server_failure_limit次,則next_retry微妙內不會請求該server。可配。
      uint32_t           failure_count; /* # consecutive failures */
  };  

  twemproxy中使用一堆宏來定義隊列等數據結構,如上面struct conn_tqh,nc_connection.h中有定義TAILQ_HEAD(conn_tqh, conn),TAILQ_HEAD宏定義如下:

 /*   
   * Tail queue declarations. 
   */  
  #define TAILQ_HEAD(name, type)                                          \                                                                                                                                  
  struct name {                                                           \
      struct type *tqh_first; /* first element */                         \
      struct type **tqh_last; /* addr of last next element */             \
      TRACEBUF                                                            \
  }

可以看出結構體是通過宏來定義的,非常惡心,看代碼ctags 找不到。conn_tqh是一個隊列頭部結構體,嵌入到一個server中,鏈表中每個元素是一個conn結構體,內嵌入一個TAILQ_ENTRY,用於將conn串入server的隊列中。宏定義如下:

 #define TAILQ_ENTRY(type)                                               \
  struct {                                                                \
      struct type *tqe_next;  /* next element */                          \
      struct type **tqe_prev; /* address of previous next element */      \
      TRACEBUF                                                            \
  } 

各個field的關系如下圖所示:

 

看一個很重要的結構conn,可以表示client和proxy之間的connection,也可以表示proxy和redis server之間的connection:

struct conn {
      TAILQ_ENTRY(conn)  conn_tqe;      /* link in server_pool / server / free q */ // 隊列entry字段,用於和其他的conn串起來
      void               *owner;        /* connection owner - server_pool / server */ // 每個連接屬於一個server
  
      int                sd;            /* socket descriptor */
      int                family;        /* socket address family */
      socklen_t          addrlen;       /* socket length */
      struct sockaddr    *addr;         /* socket address (ref in server or server_pool) */
  
      struct msg_tqh     imsg_q;        /* incoming request Q */  //從名字看出,和conn_tqh類似,這里也是一個消息隊列,從連接讀入的數據會組織成msg,push到這個消息隊列,msg由mbuf隊列組成用來存儲具體的數據。
      struct msg_tqh     omsg_q;        /* outstanding request Q */ // 需要往這個連接中寫的msg push到這個消息隊列
      struct msg         *rmsg;         /* current message being rcvd */ //從連接上讀到的數據往rmsg指向的msg里面填
      struct msg         *smsg;         /* current message being sent */ //當前正在寫的msg指針
  
      conn_recv_t        recv;          /* recv (read) handler */ //讀事件觸發時回調
      conn_recv_next_t   recv_next;     /* recv next message handler */ //實際讀數據之前,調這個函數來得到當前正在使用的msg
      conn_recv_done_t   recv_done;     /* read done handler */ // 每次接收到一個完整的消息后,回調
      conn_send_t        send;          /* send (write) handler */ //寫事件觸發時回調
      conn_send_next_t   send_next;     /* write next message handler */ 實際寫數據之前,定位當前要寫的msg
      conn_send_done_t   send_done;     /* write done handler */ //發送完一個msg則回調一次
      conn_close_t       close;         /* close handler */ //
      conn_active_t      active;        /* active? handler */
  
      conn_ref_t         ref;           /* connection reference handler */ // 得到一個連接后,將連接加入相應的隊列
      conn_unref_t       unref;         /* connection unreference handler */
  
      conn_msgq_t        enqueue_inq;   /* connection inq msg enqueue handler */  //這四個隊列用於存放msg的指針,和Zero Copy密切相關,后續詳述
      conn_msgq_t        dequeue_inq;   /* connection inq msg dequeue handler */
      conn_msgq_t        enqueue_outq;  /* connection outq msg enqueue handler */
      conn_msgq_t        dequeue_outq;  /* connection outq msg dequeue handler */
  
      size_t             recv_bytes;    /* received (read) bytes */ //該連接上讀了多少數據
      size_t             send_bytes;    /* sent (written) bytes */  
  
      uint32_t           events;        /* connection io events */
      err_t              err;           /* connection errno */
      unsigned           recv_active:1; /* recv active? */                                                                                                                                                   
      unsigned           recv_ready:1;  /* recv ready? */
      unsigned           send_active:1; /* send active? */
      unsigned           send_ready:1;  /* send ready? */
  
      unsigned           client:1;      /* client? or server? */ //連接屬於proxy和client之間時,client為1,連接屬於proxy和后端server之間時,client為0
      unsigned           proxy:1;       /* proxy? */ // listen fd封裝在conn中時,proxy置1,響應的recv回調函數accept連接
      unsigned           connecting:1;  /* connecting? */
      unsigned           connected:1;   /* connected? */
      unsigned           eof:1;         /* eof? aka passive close? */
      unsigned           done:1;        /* done? aka close? */
      unsigned           redis:1;       /* redis? */ //后端server是redis還是memcached
  };

 二、

不管是proxy accept了Client的連接從而分配一個conn結構,還是proxy主動和后端server建立連接從而分配一個conn結構,都調用conn_get()函數,如下:

struct conn *conn_get(void *owner, bool client, bool redis)                                                                                                                                                             
{
      struct conn *conn;
  
      conn = _conn_get();
      if (conn == NULL) {
          return NULL;
      }
  
      /* connection either handles redis or memcache messages */
      conn->redis = redis ? 1 : 0;
  
      conn->client = client ? 1 : 0;
  
      if (conn->client) {
          /*
           * client receives a request, possibly parsing it, and sends a
           * response downstream.
           */
          conn->recv = msg_recv;  // 從conn讀數據
          conn->recv_next = req_recv_next; // 在真正從conn讀數據之前,需要分配一個req_msg,用於承載讀進來的數據
          conn->recv_done = req_recv_done; //每次讀完一個完整的消req_msg被調用
  
          conn->send = msg_send; // 將從server收到的響應rsp_msg發給客戶端
          conn->send_next = rsp_send_next; // 每次發送rsp_msg之前需要首先確定從哪個開始發
          conn->send_done = rsp_send_done; // 每次發送完成一個rsp_msg給客戶端,調一次
  
          conn->close = client_close; //用於proxy斷開和client的半連接
          conn->active = client_active;
  
          conn->ref = client_ref; //獲取conn后將conn丟進客戶端連接隊列
          conn->unref = client_unref;
  
          conn->enqueue_inq = NULL;
          conn->dequeue_inq = NULL;
          conn->enqueue_outq = req_client_enqueue_omsgq; // proxy每次接收到一個client發過來的req_msg,將req_msg入conn的output 隊列
          conn->dequeue_outq = req_client_dequeue_omsgq; // 給客戶端發送完rsp_msg后將其對應的req_msg從conn的output隊列中刪除
      } else {
          /*
           * server receives a response, possibly parsing it, and sends a
           * request upstream.
           */
          conn->recv = msg_recv; 
          conn->recv_next = rsp_recv_next; //從后端server接收數據之前需要先得到一個rsp_msg,用於承載讀到的數據
          conn->recv_done = rsp_recv_done; // 每次讀完一個完整的rsp_msg,則回調
          conn->send = msg_send; // 將req_msg往后端server發
          conn->send_next = req_send_next; // 確定發哪個req_msg
          conn->send_done = req_send_done; // 每轉發完一個即回調
  
          conn->close = server_close;
          conn->active = server_active;
  
          conn->ref = server_ref;
          conn->unref = server_unref;
  
          conn->enqueue_inq = req_server_enqueue_imsgq; // proxy將需要轉發的req_msg放入對應后端server連接的input隊列
          conn->dequeue_inq = req_server_dequeue_imsgq; //proxy從input隊列中取出req_msg發送給后端server完成后,需要將req_msg從這個后端連接的input隊列中刪除
          conn->enqueue_outq = req_server_enqueue_omsgq; // 繼上一步,需要將req_msg放入到后端連接的output隊列
          conn->dequeue_outq = req_server_dequeue_omsgq; // 收到后端server的rsp_msg后,將rsp_msg對應的req_msg從連接的output隊列刪除
      }
  
      conn->ref(conn, owner);
  
      log_debug(LOG_VVERB, "get conn %p client %d", conn, conn->client);
  
      return conn;
  }

 如果該連接是proxy和后端server建立的,則client為false,否則為true,如果后端server是redis,則redis為true,如果為memcached,則為false,連接上的多個讀寫回調函數根據傳入的標記不同而不同。

三、 請求具體處理流程

 從前面可以看出,當proxy和Client之間有數據可讀時,會調用msg_recv(),如下:

rstatus_t
  msg_recv(struct context *ctx, struct conn *conn)                                                                                                                                                           
  {    
      rstatus_t status;       
      struct msg *msg;        
  
      ASSERT(conn->recv_active);
  
      conn->recv_ready = 1;   
      do {
          msg = conn->recv_next(ctx, conn, true); //req_recv_next()
          if (msg == NULL) {  
              return NC_OK;
          }
  
          status = msg_recv_chain(ctx, conn, msg);
          if (status != NC_OK) {
              return status;
          }
      } while (conn->recv_ready);    
  
      return NC_OK;
  }

代碼很短,其實就是反復的做兩件事:req_recv_next和msg_recv_chain

req_recv_next獲取當前用於接收數據的msg,設置到conn->rmsg中,並且返回rmsg,然后傳給msg_recv_chain,每次重新接收一個完整的請求時,rmsg為空,如果某次讀取只讀取了請求的一部分,則rmsg不為空,下次讀取時數據繼續追加到上一次的msg中。

重點看msg_recv_chain()做的事情:

1. 從conn->rmsg中拿出最后一個mbuf(一個msg的數據實際上存在mbuf中,一個msg可以包含多個mbuf,同樣通過隊列組織),下一次讀最多讀最后一個mbuf的剩余空間大小,如果最后一個mbuf滿了,分配一個新的,插入到rmsg的mbuf隊列尾部

2. 循環從conn讀數據,如果讀取到的數據小於參入的buf大小,設置conn->recv_ready為0,表示后續沒有數據要讀了,外部的while(conn->recv_ready)循環退出。同樣,如果讀操作返回0表示客戶端主動斷開連接,將conn的eof標記置位,同時recv_ready也清0

3. 調用msg_parse()解析rmsg數據,如果成功解析到一條完整的命令,則繼續調用msg_parsed(msg),由於msg由多個mbuf組成,並且TCP是流式協議,所以一次讀可能接收到了多條完整的命令,甚至是部分命令。這時,msg_parsed(msg)會將后面這些多余的數據拷貝到一個新的mbuf中,並且產生一個新的msg,作為conn的rmsg。由於一次讀可能會讀到多條命令,這就是為什么msg_recv_chain()中有下面這個循環:

for (;;) {
          status = msg_parse(ctx, conn, msg); //每次解析一條命令                                                                                                                                                     
          if (status != NC_OK) {
               return status;
           }
  
          /* get next message to parse */
          nmsg = conn->recv_next(ctx, conn, false);
          if (nmsg == NULL || nmsg == msg) {
             /* no more data to parse */
             break;
          }
          msg = nmsg;
      }

 4. 同時,每次調msg_parse(ctx,conn,msg)解析出一條新的命令后,都會回調req_recv_done()方法。這個方法對請求進行過濾(req_filter)和轉發(req_forward)給后端server。

 5. msg_recv_chain()完成。

回到msg_recv,根據conn->recv_ready來判斷是否連接中還有未讀數據,有則繼續讀,parse。。

 

下面看請求轉發給后端函數 req_forward():

 1. 將解析出來的msg(以后將從客戶端發過來的msg叫做req_msg)push進conn的output隊列

 2. 根據key和策略從server pool中選擇一個server,並且獲得和server的連接,記作server_conn,將req_msg push到server_conn的輸入隊列,起server_conn的寫事件

至此,req_msg從client接收到解析到轉發結束。下面看發消息給后端server,從后端server接收響應,將響應回復給客戶端的過程

 

起server_conn寫事件后,回調函數msg_send(struct context *ctx, struct conn *conn):

  rstatus_t
  msg_send(struct context *ctx, struct conn *conn)                                                                                                                                                           
  {    
      rstatus_t status;       
      struct msg *msg;        
  
      ASSERT(conn->send_active);
  
      conn->send_ready = 1;   
      do {
          msg = conn->send_next(ctx, conn); 
          if (msg == NULL) {  
              /* nothing to send */          
              return NC_OK;   
          }
  
          status = msg_send_chain(ctx, conn, msg);
          if (status != NC_OK) {
              return status;
          }
  
      } while (conn->send_ready);

可以看出和接收函數msg_recv類似。req_send_next()從server_conn的input隊列中拿出一個待發消息賦值給conn->smsg。然后調用msg_send_chain()對消息進行實際的發送。

msg_send_chain()流程如下:

1. 准備NC_IOV_MAX個iov,遍歷smsg的mbuf隊列,每個mbuf的數據用一個iov指向。

2. 循環調用req_send_next()不斷的取出待發送的smsg,將其加入到一個局部消息隊列send_msgq,同時遍歷smsg的mbuf隊列,每個mbuf用一個iov指向,直到NC_IOV_MAX個iov被裝滿,或者沒有消息需要發送了,記錄下裝進去的消息總大小,記作nsend。

3. 循環往fd上寫,返回成功寫的大小nsent,遍歷send_msgq中的msg,和msg中的mbuf隊列,將已發送成功的mbuf置為空,並且將發送了部分msg的pos指向第一個未發送的字節。並且,如果一個msg的mbuf隊列中的所有的mbuf都發送完成了,則將調用req_send_done(),將這個msg(指針)從這個連接的input隊列中刪除,並且放入到連接的output隊列中。從這里可以看出,只有一個msg的所有的mbuf都被發送出去了才會從input隊列中刪除,如果只發送了部分mbuf,這些mbuf會被標記為空,下次繼續發送這個msg時,會略過空的msg,實現一個msg過大或者網絡阻塞導致需要多次發送才能發出一個msg的情況。

4. 至此,發送流程分析完成。

 

Redis接收到消息,處理返回,proxy接收響應,和proxy接收client的數據類似,同樣調用msg_recv()

  rstatus_t
  msg_recv(struct context *ctx, struct conn *conn)                                                                                                                                                           
  {    
      rstatus_t status;       
      struct msg *msg;        
  
      ASSERT(conn->recv_active);
  
      conn->recv_ready = 1;   
      do {
          msg = conn->recv_next(ctx, conn, true); //rsp_recv_next()
          if (msg == NULL) {  
              return NC_OK;   
          }
  
          status = msg_recv_chain(ctx, conn, msg);
          if (status != NC_OK) {
              return status;
          }
      } while (conn->recv_ready);    
  
      return NC_OK;
  }

只是,在這里,conn->recv_next指向的函數是rsp_recv_next(),不是req_recv_next()。同理,接收回消息的處理函數是rsp_recv_done(),不是req_recv_done()

rsp_recv_next():

1. 如果當前conn的rmsg為空,則分配一個新的返回,否則返回當前這個rmsg

2. 將上一步返回的rmsg傳給msg_recv_chain()處理。這個函數之前在proxy接收client請求時分析了,不再贅述。

收到一個完整的響應rsp_msg后,調rsp_recv_done():

1. 同樣和前面類似,在這里調用rsp_filter()和rsp_forward(),而不是req_filter()和req_forward()

重點說rsp_forward():

1. 從連接的output隊列中彈出第一個元素,記作req_msg,這個msg即是目前收到的這個msg(rsp_msg)相對應的請求msg

2. 將req_msg的標記done置為1,表示這個請求已完成。

3. 通過以下兩個語句將這兩個msg建立對應關系:

pmsg->peer = msg; //pmsg為req_msg,msg為rsp_msg
msg->peer = pmsg

4. 從pmsg的owner可以找到所屬的client conn,然后啟client conn的寫事件。

5. 至此,proxy從后端server接收響應分析完成。

 

最后,看一下proxy將響應寫給client的流程。

類似,調用msg_send():

rstatus_t
  msg_send(struct context *ctx, struct conn *conn)
  {
      rstatus_t status;
      struct msg *msg;
  
      ASSERT(conn->send_active);
  
      conn->send_ready = 1;
      do {
          msg = conn->send_next(ctx, conn); // rsp_send_next()
          if (msg == NULL) {
              /* nothing to send */
              return NC_OK;
          }
  
          status = msg_send_chain(ctx, conn, msg);
          if (status != NC_OK) {
              return status;
          }
  
      } while (conn->send_ready);
  
      return NC_OK;
  }               

同理,在這里,conn->send_next函數指針指向rsp_send_next():

1. 從client conn的output隊列中拿出msg,記作req_msg,從req_msg的peer字段將相應的rsp_msg拿出來,放在conn的smsg上待發送

發出完成后,調用rsp_send_done(),主要做的事就是將req_msg從client conn的output隊列中刪除。

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM