twemproxy是twitter開源的redis/memcached 代理,數據分片提供取模,一致性哈希等手段,維護和后端server的長連接,自動踢除server,恢復server,提供專門的狀態監控端口供外部工具獲取狀態監控信息。代碼寫的比較漂亮,學習了一些Nginx的東西,比如每個請求的處理分為多個階段,IO模型方面,采用單線程收發包,基於epoll事件驅動模型。文檔中提到的Zero Copy技術,通過將消息指針在3個隊列之間流轉實現比較巧妙。本文主要分析twemproxy的核心部分,即一個請求的從接收到最后發送響應給客戶端的流程。
一、大體流程
twemproxy后端支持多個server pool,為每個server pool分配一個監聽端口用於接收客戶端的連接。客戶端和proxy建立連接記作client_conn,發起請求,proxy讀取數據,放入req_msg中,設置msg的owner為client_conn,proxy根據策略從server pool中選取一個server並且建立連接記作server_conn,然后轉發req_forward,將req_msg指針放入client_conn的output隊列中,同時放入server_conn的input隊列,然后觸發server_conn的寫事件,server_conn的寫回調函數會從input隊列中取出req_msg發送給對應的后端server,發送完成后將req_msg放入server_conn的output隊列,當req_msg的響應rsp_msg回來后,調用rsp_filter(用於判斷消息是否為空,是否消息可以不用回復等)和rsp_forward,將req_msg從server_conn的output隊列中取出,建立req_msg和rsp_msg的對應關系(通過msg的peer字段),通過req_msg的owner找到client_conn,然后啟動client_conn的寫事件,client_conn的寫回調函數從client_conn的output隊列中取出req_msg,然后通過peer字段拿到對應的rsp_msg,將其發出去。至此,一次請求從被proxy接收到最后響應給client結束。
可以看出,整個流程,req_msg內容只有一份,req_msg指針在三個隊列中的順序是:
1. req_msg => client_conn.outputq
2. req_msg => server_conn.inputq
3. server_conn.inputq => req_msg
4. req_msg => server_conn.outputq
5. server_conn.outputq => req_msg
6. client_conn.outputq => req_msg
總體來說,proxy既需要接收客戶端的連接,也需要維護和后端server的長連接,根據從客戶端收到的req根據特定策略選擇后端一台server進行轉發,同一個客戶端的連接上的不同的請求可能會轉發到后端不同的server。
二、基礎數據結構:
后端的每個redis server對應一個server結構:
struct server { uint32_t idx; /* server index */ struct server_pool *owner; /* owner pool */ // 每個server屬於一個server pool struct string pname; /* name:port:weight (ref in conf_server) */ struct string name; /* name (ref in conf_server) */ uint16_t port; /* port */ uint32_t weight; /* weight */ int family; /* socket family */ socklen_t addrlen; /* socket length */ struct sockaddr *addr; /* socket address (ref in conf_server) */ uint32_t ns_conn_q; /* # server connection */ // 下面隊列中conn的個數 struct conn_tqh s_conn_q; /* server connection q */ // proxy和這個redis server之間維護的連接隊列 int64_t next_retry; /* next retry time in usec */ // proxy有踢除后端server機制,當proxy給某台server轉發請求出錯次數達到server_failure_limit次,則next_retry微妙內不會請求該server。可配。 uint32_t failure_count; /* # consecutive failures */ };
twemproxy中使用一堆宏來定義隊列等數據結構,如上面struct conn_tqh,nc_connection.h中有定義TAILQ_HEAD(conn_tqh, conn),TAILQ_HEAD宏定義如下:
/* * Tail queue declarations. */ #define TAILQ_HEAD(name, type) \ struct name { \ struct type *tqh_first; /* first element */ \ struct type **tqh_last; /* addr of last next element */ \ TRACEBUF \ }
可以看出結構體是通過宏來定義的,非常惡心,看代碼ctags 找不到。conn_tqh是一個隊列頭部結構體,嵌入到一個server中,鏈表中每個元素是一個conn結構體,內嵌入一個TAILQ_ENTRY,用於將conn串入server的隊列中。宏定義如下:
#define TAILQ_ENTRY(type) \ struct { \ struct type *tqe_next; /* next element */ \ struct type **tqe_prev; /* address of previous next element */ \ TRACEBUF \ }
各個field的關系如下圖所示:
看一個很重要的結構conn,可以表示client和proxy之間的connection,也可以表示proxy和redis server之間的connection:
struct conn { TAILQ_ENTRY(conn) conn_tqe; /* link in server_pool / server / free q */ // 隊列entry字段,用於和其他的conn串起來 void *owner; /* connection owner - server_pool / server */ // 每個連接屬於一個server int sd; /* socket descriptor */ int family; /* socket address family */ socklen_t addrlen; /* socket length */ struct sockaddr *addr; /* socket address (ref in server or server_pool) */ struct msg_tqh imsg_q; /* incoming request Q */ //從名字看出,和conn_tqh類似,這里也是一個消息隊列,從連接讀入的數據會組織成msg,push到這個消息隊列,msg由mbuf隊列組成用來存儲具體的數據。 struct msg_tqh omsg_q; /* outstanding request Q */ // 需要往這個連接中寫的msg push到這個消息隊列 struct msg *rmsg; /* current message being rcvd */ //從連接上讀到的數據往rmsg指向的msg里面填 struct msg *smsg; /* current message being sent */ //當前正在寫的msg指針 conn_recv_t recv; /* recv (read) handler */ //讀事件觸發時回調 conn_recv_next_t recv_next; /* recv next message handler */ //實際讀數據之前,調這個函數來得到當前正在使用的msg conn_recv_done_t recv_done; /* read done handler */ // 每次接收到一個完整的消息后,回調 conn_send_t send; /* send (write) handler */ //寫事件觸發時回調 conn_send_next_t send_next; /* write next message handler */ 實際寫數據之前,定位當前要寫的msg conn_send_done_t send_done; /* write done handler */ //發送完一個msg則回調一次 conn_close_t close; /* close handler */ // conn_active_t active; /* active? handler */ conn_ref_t ref; /* connection reference handler */ // 得到一個連接后,將連接加入相應的隊列 conn_unref_t unref; /* connection unreference handler */ conn_msgq_t enqueue_inq; /* connection inq msg enqueue handler */ //這四個隊列用於存放msg的指針,和Zero Copy密切相關,后續詳述 conn_msgq_t dequeue_inq; /* connection inq msg dequeue handler */ conn_msgq_t enqueue_outq; /* connection outq msg enqueue handler */ conn_msgq_t dequeue_outq; /* connection outq msg dequeue handler */ size_t recv_bytes; /* received (read) bytes */ //該連接上讀了多少數據 size_t send_bytes; /* sent (written) bytes */ uint32_t events; /* connection io events */ err_t err; /* connection errno */ unsigned recv_active:1; /* recv active? */ unsigned recv_ready:1; /* recv ready? */ unsigned send_active:1; /* send active? */ unsigned send_ready:1; /* send ready? */ unsigned client:1; /* client? or server? */ //連接屬於proxy和client之間時,client為1,連接屬於proxy和后端server之間時,client為0 unsigned proxy:1; /* proxy? */ // listen fd封裝在conn中時,proxy置1,響應的recv回調函數accept連接 unsigned connecting:1; /* connecting? */ unsigned connected:1; /* connected? */ unsigned eof:1; /* eof? aka passive close? */ unsigned done:1; /* done? aka close? */ unsigned redis:1; /* redis? */ //后端server是redis還是memcached };
二、
不管是proxy accept了Client的連接從而分配一個conn結構,還是proxy主動和后端server建立連接從而分配一個conn結構,都調用conn_get()函數,如下:
struct conn *conn_get(void *owner, bool client, bool redis) { struct conn *conn; conn = _conn_get(); if (conn == NULL) { return NULL; } /* connection either handles redis or memcache messages */ conn->redis = redis ? 1 : 0; conn->client = client ? 1 : 0; if (conn->client) { /* * client receives a request, possibly parsing it, and sends a * response downstream. */ conn->recv = msg_recv; // 從conn讀數據 conn->recv_next = req_recv_next; // 在真正從conn讀數據之前,需要分配一個req_msg,用於承載讀進來的數據 conn->recv_done = req_recv_done; //每次讀完一個完整的消req_msg被調用 conn->send = msg_send; // 將從server收到的響應rsp_msg發給客戶端 conn->send_next = rsp_send_next; // 每次發送rsp_msg之前需要首先確定從哪個開始發 conn->send_done = rsp_send_done; // 每次發送完成一個rsp_msg給客戶端,調一次 conn->close = client_close; //用於proxy斷開和client的半連接 conn->active = client_active; conn->ref = client_ref; //獲取conn后將conn丟進客戶端連接隊列 conn->unref = client_unref; conn->enqueue_inq = NULL; conn->dequeue_inq = NULL; conn->enqueue_outq = req_client_enqueue_omsgq; // proxy每次接收到一個client發過來的req_msg,將req_msg入conn的output 隊列 conn->dequeue_outq = req_client_dequeue_omsgq; // 給客戶端發送完rsp_msg后將其對應的req_msg從conn的output隊列中刪除 } else { /* * server receives a response, possibly parsing it, and sends a * request upstream. */ conn->recv = msg_recv; conn->recv_next = rsp_recv_next; //從后端server接收數據之前需要先得到一個rsp_msg,用於承載讀到的數據 conn->recv_done = rsp_recv_done; // 每次讀完一個完整的rsp_msg,則回調 conn->send = msg_send; // 將req_msg往后端server發 conn->send_next = req_send_next; // 確定發哪個req_msg conn->send_done = req_send_done; // 每轉發完一個即回調 conn->close = server_close; conn->active = server_active; conn->ref = server_ref; conn->unref = server_unref; conn->enqueue_inq = req_server_enqueue_imsgq; // proxy將需要轉發的req_msg放入對應后端server連接的input隊列 conn->dequeue_inq = req_server_dequeue_imsgq; //proxy從input隊列中取出req_msg發送給后端server完成后,需要將req_msg從這個后端連接的input隊列中刪除 conn->enqueue_outq = req_server_enqueue_omsgq; // 繼上一步,需要將req_msg放入到后端連接的output隊列 conn->dequeue_outq = req_server_dequeue_omsgq; // 收到后端server的rsp_msg后,將rsp_msg對應的req_msg從連接的output隊列刪除 } conn->ref(conn, owner); log_debug(LOG_VVERB, "get conn %p client %d", conn, conn->client); return conn; }
如果該連接是proxy和后端server建立的,則client為false,否則為true,如果后端server是redis,則redis為true,如果為memcached,則為false,連接上的多個讀寫回調函數根據傳入的標記不同而不同。
三、 請求具體處理流程
從前面可以看出,當proxy和Client之間有數據可讀時,會調用msg_recv(),如下:
rstatus_t msg_recv(struct context *ctx, struct conn *conn) { rstatus_t status; struct msg *msg; ASSERT(conn->recv_active); conn->recv_ready = 1; do { msg = conn->recv_next(ctx, conn, true); //req_recv_next() if (msg == NULL) { return NC_OK; } status = msg_recv_chain(ctx, conn, msg); if (status != NC_OK) { return status; } } while (conn->recv_ready); return NC_OK; }
代碼很短,其實就是反復的做兩件事:req_recv_next和msg_recv_chain
req_recv_next獲取當前用於接收數據的msg,設置到conn->rmsg中,並且返回rmsg,然后傳給msg_recv_chain,每次重新接收一個完整的請求時,rmsg為空,如果某次讀取只讀取了請求的一部分,則rmsg不為空,下次讀取時數據繼續追加到上一次的msg中。
重點看msg_recv_chain()做的事情:
1. 從conn->rmsg中拿出最后一個mbuf(一個msg的數據實際上存在mbuf中,一個msg可以包含多個mbuf,同樣通過隊列組織),下一次讀最多讀最后一個mbuf的剩余空間大小,如果最后一個mbuf滿了,分配一個新的,插入到rmsg的mbuf隊列尾部
2. 循環從conn讀數據,如果讀取到的數據小於參入的buf大小,設置conn->recv_ready為0,表示后續沒有數據要讀了,外部的while(conn->recv_ready)循環退出。同樣,如果讀操作返回0表示客戶端主動斷開連接,將conn的eof標記置位,同時recv_ready也清0
3. 調用msg_parse()解析rmsg數據,如果成功解析到一條完整的命令,則繼續調用msg_parsed(msg),由於msg由多個mbuf組成,並且TCP是流式協議,所以一次讀可能接收到了多條完整的命令,甚至是部分命令。這時,msg_parsed(msg)會將后面這些多余的數據拷貝到一個新的mbuf中,並且產生一個新的msg,作為conn的rmsg。由於一次讀可能會讀到多條命令,這就是為什么msg_recv_chain()中有下面這個循環:
for (;;) { status = msg_parse(ctx, conn, msg); //每次解析一條命令 if (status != NC_OK) { return status; } /* get next message to parse */ nmsg = conn->recv_next(ctx, conn, false); if (nmsg == NULL || nmsg == msg) { /* no more data to parse */ break; } msg = nmsg; }
4. 同時,每次調msg_parse(ctx,conn,msg)解析出一條新的命令后,都會回調req_recv_done()方法。這個方法對請求進行過濾(req_filter)和轉發(req_forward)給后端server。
5. msg_recv_chain()完成。
回到msg_recv,根據conn->recv_ready來判斷是否連接中還有未讀數據,有則繼續讀,parse。。
下面看請求轉發給后端函數 req_forward():
1. 將解析出來的msg(以后將從客戶端發過來的msg叫做req_msg)push進conn的output隊列
2. 根據key和策略從server pool中選擇一個server,並且獲得和server的連接,記作server_conn,將req_msg push到server_conn的輸入隊列,起server_conn的寫事件
至此,req_msg從client接收到解析到轉發結束。下面看發消息給后端server,從后端server接收響應,將響應回復給客戶端的過程
起server_conn寫事件后,回調函數msg_send(struct context *ctx, struct conn *conn):
rstatus_t msg_send(struct context *ctx, struct conn *conn) { rstatus_t status; struct msg *msg; ASSERT(conn->send_active); conn->send_ready = 1; do { msg = conn->send_next(ctx, conn); if (msg == NULL) { /* nothing to send */ return NC_OK; } status = msg_send_chain(ctx, conn, msg); if (status != NC_OK) { return status; } } while (conn->send_ready);
可以看出和接收函數msg_recv類似。req_send_next()從server_conn的input隊列中拿出一個待發消息賦值給conn->smsg。然后調用msg_send_chain()對消息進行實際的發送。
msg_send_chain()流程如下:
1. 准備NC_IOV_MAX個iov,遍歷smsg的mbuf隊列,每個mbuf的數據用一個iov指向。
2. 循環調用req_send_next()不斷的取出待發送的smsg,將其加入到一個局部消息隊列send_msgq,同時遍歷smsg的mbuf隊列,每個mbuf用一個iov指向,直到NC_IOV_MAX個iov被裝滿,或者沒有消息需要發送了,記錄下裝進去的消息總大小,記作nsend。
3. 循環往fd上寫,返回成功寫的大小nsent,遍歷send_msgq中的msg,和msg中的mbuf隊列,將已發送成功的mbuf置為空,並且將發送了部分msg的pos指向第一個未發送的字節。並且,如果一個msg的mbuf隊列中的所有的mbuf都發送完成了,則將調用req_send_done(),將這個msg(指針)從這個連接的input隊列中刪除,並且放入到連接的output隊列中。從這里可以看出,只有一個msg的所有的mbuf都被發送出去了才會從input隊列中刪除,如果只發送了部分mbuf,這些mbuf會被標記為空,下次繼續發送這個msg時,會略過空的msg,實現一個msg過大或者網絡阻塞導致需要多次發送才能發出一個msg的情況。
4. 至此,發送流程分析完成。
Redis接收到消息,處理返回,proxy接收響應,和proxy接收client的數據類似,同樣調用msg_recv()
rstatus_t msg_recv(struct context *ctx, struct conn *conn) { rstatus_t status; struct msg *msg; ASSERT(conn->recv_active); conn->recv_ready = 1; do { msg = conn->recv_next(ctx, conn, true); //rsp_recv_next() if (msg == NULL) { return NC_OK; } status = msg_recv_chain(ctx, conn, msg); if (status != NC_OK) { return status; } } while (conn->recv_ready); return NC_OK; }
只是,在這里,conn->recv_next指向的函數是rsp_recv_next(),不是req_recv_next()。同理,接收回消息的處理函數是rsp_recv_done(),不是req_recv_done()
rsp_recv_next():
1. 如果當前conn的rmsg為空,則分配一個新的返回,否則返回當前這個rmsg
2. 將上一步返回的rmsg傳給msg_recv_chain()處理。這個函數之前在proxy接收client請求時分析了,不再贅述。
收到一個完整的響應rsp_msg后,調rsp_recv_done():
1. 同樣和前面類似,在這里調用rsp_filter()和rsp_forward(),而不是req_filter()和req_forward()
重點說rsp_forward():
1. 從連接的output隊列中彈出第一個元素,記作req_msg,這個msg即是目前收到的這個msg(rsp_msg)相對應的請求msg
2. 將req_msg的標記done置為1,表示這個請求已完成。
3. 通過以下兩個語句將這兩個msg建立對應關系:
pmsg->peer = msg; //pmsg為req_msg,msg為rsp_msg
msg->peer = pmsg
4. 從pmsg的owner可以找到所屬的client conn,然后啟client conn的寫事件。
5. 至此,proxy從后端server接收響應分析完成。
最后,看一下proxy將響應寫給client的流程。
類似,調用msg_send():
rstatus_t msg_send(struct context *ctx, struct conn *conn) { rstatus_t status; struct msg *msg; ASSERT(conn->send_active); conn->send_ready = 1; do { msg = conn->send_next(ctx, conn); // rsp_send_next() if (msg == NULL) { /* nothing to send */ return NC_OK; } status = msg_send_chain(ctx, conn, msg); if (status != NC_OK) { return status; } } while (conn->send_ready); return NC_OK; }
同理,在這里,conn->send_next函數指針指向rsp_send_next():
1. 從client conn的output隊列中拿出msg,記作req_msg,從req_msg的peer字段將相應的rsp_msg拿出來,放在conn的smsg上待發送
發出完成后,調用rsp_send_done(),主要做的事就是將req_msg從client conn的output隊列中刪除。