libeasy是個網絡框架,這個網絡框架基於事件驅動模型,libeasy可以有多個網絡I/O線程,每個網絡I/O線程一個event loop,事件驅動模型基於開源的libev實現。
我認為,libeasy不同於其它的網絡框架比如tbnet,muduo。tbnet,muduo等網絡框架的目的就是向應用層暴露出簡單的發包和收包的接口,讓應用層從底層發包和收包的處理細節中解放出來,使得應用層能更加專注於業務邏輯的實現,為了做到這些,網絡框架幫助應用程序管理連接,管理輸入輸出緩沖區,處理具體的發包收包等細節和錯誤的處理,處理流控,並且允許應用層注入封包,解包,新建連接時處理,斷開連接時處理,收到包后處理包的邏輯等。libeasy與這些網絡框架稍不同,從高層次看,libeasy中的線程分為業務邏輯線程和網絡I/O線程,無論哪種線程,線程都有唯一的一個event loop,工作的時候,I/O線程和網絡I/O線程有對應關系,他們之間通過pipe來實現線程的喚醒,原理就是業務邏輯線程的event loop監聽pipe的讀端,當I/O線程A接收到包的時候,往pipe的寫端寫入數據,業務邏輯線程event loop返回,從任務鏈表中取任務執行。可以說,libeasy已經超出了網絡框架的范疇,但是libeasy也支持和普通的網絡框架一樣,僅僅使用網絡 I/O線程部分。
下面分析libeasy的原理和實現,由於libeasy基於libev,對libev不了解的參見 這
這篇主要分析OceanBase 0.4的mergeserver使用libeasy作為服務器端的模式,客戶端自然就是MySQL客戶端。OceanBase僅僅使用了libeasy的IO線程部分,工作線程是使用了我們自己的線程池。
主要說如下幾個方面:
一、OceanBase啟動時的使用模式
二、 基礎數據結構
2.1 easy_list_t
2.2 easy_pool_t
2.3 easy_buf_t
2.4 easy_connection_t
三、 連接建立
四、 同步處理(OceanBase少量使用這種模式)
五、 異步處理(OceanBase大量采用這種模式)
六、 資源管理
一、OceanBase啟動時的使用模式
libeasy內與OceanBase使用模式相關的各個對象之間關系如下圖所示:
OceanBase主要用到的是libeasy的IO線程池部分,沒有用到libeasy內置的工作線程池。libeasy采用one event loop per thread的模式,即每個線程一個event loop,
內存資源每連接自管理,連接之間的資源互不干涉。每個連接(easy_connection_t)上有可以有多個消息(easy_message_t),通過鏈表連起來,每個消息可以由多個請求組成(easy_request_t),也通過鏈表連起來。
OceanBase 0.4的ObMySQLServer啟動的時候,會初始化libeasy相關的東西,主要步驟如下:
//設置一堆給libeasy的回調函數 memset(&handler_, 0, sizeof(easy_io_handler_pt)); //以下都是對於OceanBase 0.4 mergeserver的obmysql端口來說的 // 將mergeserver需要回復給mysql客戶端的結果以easy_buf_t(libeasy用來管理輸入輸出緩沖區的數據結構)的形式加到請求所屬於的easy_connection_t(TCP連接)的輸出緩沖區鏈表中 handler_.encode = ObMySQLCallback::encode; // libeasy回調這個函數用於從該連接的輸入緩沖區中反序列化出一個符合MySQL協議的包,然后吐給上層使用 handler_.decode = ObMySQLCallback::decode; // 對於每個decode出來的包進行實際的處理,在OceanBase的實現中,我們大多數時候僅僅是將包push到我們自己的工作隊列中,在這種情況下返回的不是EASY_OK這個錯誤碼,因為目前我們還沒有對 // 這個包進行實質上的處理,還沒有為這個包產生結果。少數情況下,當我們接收到的包的大小超過2MB的時候,process這個函數會返回EASY_OK,並且會為這個請求產生一個響應結果,一個MySQL的 // error packet,將其掛在request->opacket上,當libeasy看到了返回EASY_OK后,就會調用encode方法將opacket給掛在連接的輸出緩沖區鏈表中,隨后將其發送出去 handler_.process = ObMySQLCallback::process; handler_.get_packet_id = ObMySQLCallback::get_packet_id; handler_.on_disconnect = ObMySQLCallback::on_disconnect; // 登錄邏輯,在libeasy發現listenfd上有讀事件時,會將連接接下來,然后給MySQL客戶端發送握手包,同時接受客戶端發送過來的用戶名密碼等信息,最后進行服務器端的驗證,這幾次 //交互過程是不經過libeasy網絡框架的 handler_.on_connect = ObMySQLCallback::on_connect;
// 用於當請求處理完畢后,告訴工作線程不要再發包了 handler_.cleanup = ObMySQLCallback::clean_up; eio_ = easy_eio_create(eio_, io_thread_count_); eio_->tcp_defer_accept = 0; easy_listen_t* listen = easy_connection_add_listen(eio_, NULL, port_, &handler_); rc = easy_eio_start(eio_); easy_eio_wait(eio_);
easy_eio_create(eio_, io_thread_count_)做了如下幾件事:
1. 分配一個easy_pool_t, 用來存放easy_io_t對象,io_thread_count_個io線程(easy_io_thread_t),初始化針對每個eio(一般系統就只有一個)的統計信息結構(easy_summary_t)
2. 設置一些tcp參數,比如tcp_defer_accept,tcp_nodelay,設置一些負載保護參數,比如EASY_CONN_DOING_REQ_CNT,表示每個連接同時正在處理的請求數不能
超過EASY_CONN_DOING_REQ_CNT
3. 初始化每個io線程:
3.1 初始化其各個鏈表節點成員,比如conn_list(已建立連接但是讀寫事件還沒有監聽的連接鏈表), connected_list(連接已建立並且事件已監聽的連接鏈表),request_list(已處理完成但是還沒有將結果發送出去的請求鏈表)等
3.2 統計信息初始化,例如io線程同時正在處理的請求數,已經處理的請求數
3.3 初始化成員變量listen_watcher, 每100ms觸發一次對於listen的切換(回調函數easy_connection_on_listen),實際上,在剛啟動的時候,是100ms,當有IO線程搶到listen的權利后,這個timer會被改成60s,隨后,每60s進行一次listen的切換,而之前擁有listen權利的IO線程則會停掉它的read_watcher
3.4 設定io線程的執行體函數easy_io_on_thread_start
3.5 設定io線程被喚醒時的回調函數easy_connection_on_wakeup(設置為成員變量ev_async thread_watcher的cb上,當io線程被喚醒時,這個thread_watcher被觸發,從而回調).
3.6 計算出io線程在io線程池中的下標放在io線程的成員變量idx中
3.7 為這個線程分配一個event loop
easy_listen_t * listen = easy_connection_add_listen(eio_, NULL, port_, &handler_) : 增加一個listen地址,並且設置回調函數
easy_listen_t定義如下:
struct easy_listen_t { int fd;
// read_watcher的下標 int8_t cur, old; int8_t hidden_sum;
//如果為1,則所有線程可以監聽同一個地址 uint8_t reuseport : 1; // 監聽地址 easy_addr_t addr; //各種回調函數的集合 easy_io_handler_pt *handler; // 多個io線程競爭listen的鎖 easy_atomic_t listen_lock; //當前listen權利被哪個IO線程擁有 easy_io_thread_t *curr_ioth; easy_io_thread_t *old_ioth; easy_listen_t *next; //有多少個IO線程就有多少個watcher, 每個watcher都監聽fd上的EV_READ和EV_CLEANUP事件 ev_io read_watcher[0]; };
easy_connection_add_listen:
從eio->pool中為easy_listen_t和io線程個數個ev_io分配空間,開始監聽某個地址,初始化每個read_watcher,關注listen fd上的讀事件,設置其回調函數 為easy_connection_on_accept,在這里僅僅是初始化read_watcher, 還沒有激活,激活在每個IO線程啟動(easy_io_on_thread_start)的時候做。一旦激活后,當有連接到來的時候,觸發easy_connection_on_accept
easy_eio_start(eio_):
將eio_ 上掛着的所有的線程池中的所有線程全部啟動,每個IO線程的執行函數體easy_io_on_thread_start做如下幾件事:
1. 可以選擇設置是否屏蔽信號,可以設置CPU親緣性
2. 選擇一個線程listen:通過listen_watcher的方式,或者如果只有一個線程或者設置了socket的SO_REUSEPORT標記,則所有線程一起監聽,同時將listen的線程的read_watcher激活,從而下次來新連接的時候,就可以調用回調函數easy_connection_on_accept來接收新連接了。
二、基礎數據結構
在libeasy中,有如下一些重要的數據結構,分別為
2.1 easy_list_t:鏈表結構,只有兩個指針next,prev,可以用於所有的元素類型,如同內核中的鏈表,假設有一個easy_request_t的元素的鏈表,遍歷它的代碼如下:
// 假設有一個指針easy_list_t *request_list指向的是一個easy_request_t鏈表的頭 easy_request_t *r, *rn; // request_list_node 它是easy_request_t結構體中的一個成員:easy_list_t *request_list_node,用於將easy_request_t串起來 easy_list_for_each_entry_safe(r, rn, request_list, request_list_node) { //r指向鏈表中第一個easy_request_t元素 //rn指向鏈表中第二個easy_request_t元素 }
這里用到easy_list_for_each_entry_safe和easy_list_entry兩個宏,就以這個例子來解釋一下這兩個宏:
宏easy_list_for_each_entry_safe:
#define easy_list_for_each_entry_safe(pos, n, head, member) \ for (pos = easy_list_entry((head)->next, typeof(*pos), member), \ n = easy_list_entry(pos->member.next, typeof(*pos), member); \ &pos->member != (head); \ pos = n, n = easy_list_entry(n->member.next, typeof(*n), member))
宏easy_list_entry:這個宏的作用就是根據鏈表節點指針找到對應元素的首地址
#define easy_list_entry(ptr, type, member) ({ \ const typeof( ((type *)0)->member ) *__mptr = (ptr); \ // 得到easy_request_t中成員request_list_node的指針 (type *)( (char *)__mptr - offsetof(type,member) );}) // 指針減去 request_list_node成員在easy_request_t中的偏移就得到了easy_request_t的首地址
其中offsetof(type,member)是得到member這個這個成員在結構type中的偏移量(man offsetof),typeof(((type*)0)->member)得到參數的類型。
2.2 easy_pool_t :內存池,和nginx的內存池實現幾乎一樣,見http://www.alidata.org/archives/1390 它不是一個全局的內存池,libeasy中可以有很多個,比如對於每個新的連接產生一個easy_pool_t
2.3 easy_buf_t : 用於管理連接的輸入輸出緩沖區
#define EASY_BUF_DEFINE \ easy_list_t node; \ int flags; \ //當easy_buf_t不再使用時調用cleanup easy_buf_cleanup_pt *cleanup; \ void *args; struct easy_buf_t { EASY_BUF_DEFINE; //buf開始處 char *pos; // 下次從這開始寫,或者從已經讀到這 char *last; //buf結束處 char *end; };
2.4 easy_connection_t:封裝一個TCP連接,在libeasy中,一個easy_message_t可以包含一個或者多個easy_request_t,easy_request_t就相當於應用層的一個具體的包,例如
在OceanBase 0.4中,一個 easy_request_t 對應於一個mysql客戶端發過來的mysql協議包
struct easy_connection_t { //這個event loop監聽這個連接的事件 struct ev_loop *loop; //該連接是在這個內存池上分配的 easy_pool_t *pool; // 該連接所屬的io線程 easy_io_thread_t *ioth; //用於鏈表 easy_connection_t *next; // 用於串鏈表,比如連接建立后,通過這個鏈表節點,將其串到io線程的已連接鏈表中 easy_list_t conn_list_node; // file description // default_message_len 默認是16, 大小是8KB, first_message_len默認是2, 大小是1KB // libeasy對於每個連接收數據的時候,如果上次對於這個連接已經收到了1個或者n個完整的包(easy_request_t對應於一個完整的包,在OB 0.4,對應一個mysql協議的包), // 那么會重新分配一個8KB大小的easy_pool_t,然后再上面easy_message_t所需要的內存,最主要就是easy_message_t的輸入緩沖區easy_buf_t *input所需要的內存, // 這里,會為easy_buf_t分配1KB的內存,一個數據包所占用的內存是不會跨easy_buf_t的邊界的,讀連接的數據的時候,如果easy_buf_t里空間不夠,會分配一個足夠大的 // easy_buf_t, 然后將原來的數據拷貝過來 uint16_t first_message_len, default_message_len; int reconn_time, reconn_fail; int idle_time; // fd: socket fd, // seq: 這是系統accept的第seq個連接 int fd, seq; easy_addr_t addr; // 該連接fd的讀事件的watcher ev_io read_watcher; // 該連接fd的寫事件的watcher ev_io write_watcher; // 該連接fd的超時事件的watcher ev_timer timeout_watcher; // 將該連接接收到的所有的easy_message_t串在一起 easy_list_t message_list; // 輸出緩沖區鏈表,實際上就是easy_buf_t的鏈表 easy_list_t output; //應用層注入的各種回調函數的集合,比如decode,在OceanBase 0.4中,就是解析mysql協議 //on_connect,在OceanBase 0.4中,on_connect會驗證用戶名和密碼 easy_io_handler_pt *handler; // 對連接數據的讀,就是簡單封裝recv系統調用 easy_read_pt *read; // 將output寫出 easy_write_pt *write;
//用於libeasy作為客戶端的時候,再下一篇中講述 easy_client_t *client; easy_list_t session_list; easy_hash_t *send_queue; void *user_data; easy_uthread_t *uthread; //user thread //一堆TCP的參數 uint32_t status : 4; uint32_t event_status : 4; uint32_t type : 1; uint32_t async_conn : 1; uint32_t conn_has_error : 1; uint32_t tcp_cork_flag : 1; uint32_t wait_close : 1; uint32_t need_redispatch : 1; uint32_t read_eof : 1; uint32_t auto_reconn : 1; uint32_t life_idle : 1; //當前connection上同時正在處理的easy_request_t的個數 uint32_t doing_request_count; ev_tstamp start_time, last_time; ev_tstamp wait_client_time, wcs; // 統計信息,比如連接上走的流量 easy_summary_node_t *con_summary; //add for summary // 不關注 easy_ssl_connection_t *sc; };
三、連接建立
如前所述,當listen fd上有可讀事件時,IO線程相應的read_watcher會被觸發,從而回調easy_connection_on_accept函數接受新連接。
easy_connection_on_accept主要做如下幾件事:
1. accept將連接接下來
2. 調用easy_connection_new為返回的fd新建一個easy_connection_t
2.1 分配一個easy_pool_t,專門用來存easy_connection_t, 並且設置其各個成員:
// 表示這個連接是在pool上分配的,對於連接的引用計數也是記在pool上的
c->pool = pool;
// 用於重連的一個參數,100ms,用於libeasy作為客戶端,現不關注 c->reconn_time = 100;
c->idle_time = 60000;
// 在前面easy_connection_t中有解釋 c->first_message_len = 2; // 1Kbyte c->default_message_len = (EASY_IO_BUFFER_SIZE >> 9); // 8Kbyte
//往連接上讀寫的函數 c->read = easy_socket_read; c->write = easy_socket_write;
//連接上message的鏈表 easy_list_init(&c->message_list);
// 用於libeasy IO線程喚醒工作線程(對於OceanBase 0.4來說,就是ObPacketQueueThread)
// 當OceanBase同步寫數據給MySQL客戶端時,工作線程每發一個包就阻塞住,直到IO線程將其喚醒再發下一個包,session_list就存放這些工作線程正在處理的request easy_list_init(&c->session_list);
// 用於串連接到鏈表中 easy_list_init(&c->conn_list_node);
// 該連接上的輸出緩沖區鏈表,每個元素是一個easy_buf_t easy_list_init(&c->output);
3. 設置返回的socket fd為非阻塞
4.將回調函數集合easy_io_handler_pt, 調用回調函數on_connect(),在OB中,用於驗證用戶名密碼。
5.初始化該連接的read_watcher, write_watcher,和timeout_watcher,並且設置其回調函數分別為easy_connection_on_readable,easy_connection_on_writable和easy_connection_on_timeout_conn(注意,僅僅是初始化,還沒有激活)
6. 激活該連接的read_watcher(即讓該連接所屬的IO線程的event loop監聽這個連接的讀事件), 設置其回調函數為easy_connection_on_readable(這里有一些細節,例如如果設置了tcp_defer_accept參數,則如果連接上沒有數據,則該連接不會返回,IO線程阻塞住)
7. 將該連接加入了所屬IO線程的已連接鏈表中(connected_list)
至此連接建立了。
四、 同步處理
這里的同步是指libeasy的IO線程回調應用層的process函數后對這個包的業務邏輯處理即結束。不需要應用層的工作線程的參與。這種模式下,process函數應該直接返回EASY_OK. 以OceanBase 0.4的obmysql接口為例,當用戶輸入的包的大小大於2MB的時候,應用層process函數直接構造一個MySQL的error packet,掛在連接的輸出緩沖區鏈表中(r->retcode默認為EASY_OK)。隨后輸出緩沖區中的數據被寫出,請求結束。
下面就以從MySQL客戶端接收到一個大於2MB的包為例:
顯然每個連接上有可讀事件的時候都會回調easy_connection_on_readable函數:該函數流程如下:
1. 檢查當前IO線程同時正在處理的請求是否超過EASY_IOTH_DOING_REQ_CNT(8192),當前連接上的請求數是否超過EASY_CONN_DOING_REQ_CNT(1024),如果超過,則調用easy_connection_destroy(c)將連接銷毀掉, 提供了一種負載保護機制
2. 檢查上一次收到的message(easy_message_t)是不是完整的,即收到了一條或者多條(easy_request_t),一個easy_request_t相當於一個請求包,貫穿着請求的輸入,處理和輸出整個流程。如果收到的是一個完整的message(判斷message的狀態status, status == EASY_MESG_READ_AGAIN說明不完整),那么就調用easy_message_create函數創建一個8KB的easy_pool_t,然后在其上分配一個easy_message_t結構,再分配一個1KB大小的easy_buf_t作為輸入緩沖區將其掛在easy_message_t的input成員上。然后設置message的pool,並將其引用計數初始化為1,並將next_read_len設置為1KB
#define EASY_MESSAGE_SESSION_HEADER \ easy_connection_t *c; \ easy_pool_t *pool; \ int8_t type; \ int8_t async; \ int8_t status; \ int8_t error; // 用於接收, 一個或多個easy_request_t struct easy_message_t { EASY_MESSAGE_SESSION_HEADER int recycle_cnt; //該連接的輸入緩沖區 easy_buf_t *input; //用於將一個連接的所有的message串起來 easy_list_t message_list_node; easy_list_t request_list; easy_list_t all_list; //該message上request的個數 int request_list_count; //下次需要讀取的數據的長度 int next_read_len; void *user_data; };
3. 調用easy_buf_check_read_space檢查連接的輸入緩沖區input中是否有next_read_len個字節的空間,如果沒有,則繼續在message的pool上分配next_read_len個字節的空間,並且將原來的輸入緩沖區的數據拷貝過來,將這個next_read_len字節大小的緩沖區作為新的輸入緩沖區
4. 調用 c->read從連接讀數據,實際上調用的是函數easy_socket_read,該函數只是簡單的封裝了recv()。設置連接的成員read_eof,如果讀到的數據小於next_read_len,則將其設置為1,主要用於異常情況下關閉連接的情形
5. 更新連接的統計信息con_summary
6. 如果是作為服務器端,則調用easy_connection_do_request(m),否則調用 easy_connection_do_response(m),目前僅關注服務器端。
easy_connection_do_request(m)流程如下:
// ipacket放進來的包, opacket及出去的包 struct easy_request_t { //所屬的message easy_message_session_t *ms; easy_list_t request_list_node; easy_list_t all_node; int16_t retcode, status; int reserved; //請求開始時間 ev_tstamp start_time; void *ipacket; void *opacket; void *args; void *user_data; // waitobj,用於IO線程喚醒工作線程,封裝pthread_mutex_t和pthread_cond_t easy_client_wait_t *client_wait; };
1. 回調應用層實現的decode函數,然后在message的pool上分配一個easy_request_t,再將decode出來的packet(對於OceanBase 0.4來說,就是ObMySQLCommandPacket)掛在request的ipacket上. 然后將該請求加入到message的request_list中
2. 修改一些統計信息
3. 設置message的status,當輸入緩沖區中還有空閑空間時,說明沒有收到一個完整的request,則將message的status設置為EASY_MESG_READ_AGAIN,這正是前面用來判斷是否接收到了一個完整的message的判斷標記
4. 調用easy_connection_process_request函數對剛才decode出來的所有的request進行處理
easy_connection_process_request是處理請求的函數,主要流程如下:
1. 對於每個request,
1.1 將這個request從所在的message中摘下來(request_list_node)
1.2. 回調應用層傳入的process:
process函數返回EASY_OK, 則說明應用已經處理完了這個請求了(這種情況出現在當OceanBase 0.4接收到了一個大於2MB的包的時候),接着調用easy_connection_request_done函數。
easy_connection_request_done函數處理:
1.2.1 回調應用層定義的encode,對於OceanBase 0.4來說,比如在create table這種回復只需要回復一個包的情形來說,就是將封裝了MySQL回復包的easy_buf_t(r->opacket)給掛在該請求所在連接的輸出緩沖區鏈表(c->output)后面。
1.2.2 調用easy_request_set_cleanup(r, &c->output), 將message的引用計數加1,以防結果還沒有輸出message就被析構了。然后設置一個cleanup(實際上是函數easy_request_cleanup)函數掛在輸出緩沖區鏈表上的easy_buf_t(也就是剛剛掛上去的那個buf)上,當buf被寫出去后不再使用的時候,會回調cleanup。
1.2.3 將request的status設置成EASY_REQUEST_DONE
1.3. 當連接的輸出緩沖區鏈表中積累了128個沒發出去,或者不使用tcp nagle算法的時候,則調用easy_connection_write_socket(一次寫不完會再次啟寫事件)主動往連接寫一次數據
1.4. 檢查request所在的message上是否還有request,如果沒有請求需要處理並且沒有接收到了一半的請求,則調用easy_message_destroy(m, 1),將這個message從m所屬的連接上的message鏈表中摘除,把m的status設置為EASY_MESG_DESTROY,最后將message的引用計數減1.如果message的引用計數變成了0,則將和message關聯的easy_pool_t整個給釋放掉
2. 調用easy_connection_write_socket將連接的輸出緩沖區鏈表中的數據寫出,這個函數調用c->write(實際調用easy_socket_write)將數據寫出,easy_socket_write使用writev系統調用將數據寫出,同時,對於已經寫出去的easy_buf_t,調用easy_buf_destroy函數,它會回調buf的cleanup函數,如前所屬,實際上是easy_request_cleanup函數.easy_request_cleanup函數做了如下幾件事:
2.1 調用easy_request_server_done,修改一些統計信息,並且回調應用層定義的cleanup函數。對於我們這種同步處理的場景來說,什么都沒做。
2.2 調用easy_message_destroy((easy_message_t *)r->ms, 0),將message的引用計數減1,到此message的引用計數變為0,將這個message的easy_pool_t銷毀,從而內存被釋放。
請求結束。
五、異步處理
異步處理流程:IO線程將數據接下來后,調用應用層定義的process方法,應用層process方法不返回EASY_OK,返回EASY_AGAIN,將packet丟到工作線程隊列,同時IO線程將這個異步請求丟到連接的session鏈表中,並且啟動寫(start write watcher)。隨后,應用層的工作線程從工作隊列里面拿出packet,進行處理,檢索出結果包,將其掛在r->opacket上,然后調用easy_request_wakeup(req), 將請求掛在IO線程的已完成的請求鏈表中,並且喚醒IO線程,隨后工作線程阻塞在一個信號量上。IO線程被喚醒后,將所有的輸出buf都寫出,然后再次回調process函數,process函數看到請求的retcode等於EASY_OK,則signal信號量,從而工作線程被喚醒。
典型的,select請求這種需要回復多個包給MySQL客戶端的場景使用的都是這種模式
其它諸如只需要回復一個包給MySQL客戶端的DML操作,例如INSERT,UPDATE等也使用這種模式,只是工作線程由於只需要發一個包,所以不會阻塞在信號量上,直接就返回了
詳細分析如下:
message的引用計數初始化為1
前面讀取輸入,decode過程都類似,不同的是easy_connection_process_request函數:
對於這種需要工作線程處理的request,
1.1 將這個request從所在的message中摘下來(request_list_node)
1.2 回調應用層定義的process,process將包放入到工作隊列,然后process(message引用計數加1)返回EASY_AGAIN。
隨后,工作線程從工作隊列中拿到了請求包,處理,將結果包封裝成一個easy_buf_t,掛在請求r的opacket上,隨后調用easy_request_wakeup(r)將請求r掛在IO線程的已完成請求鏈表,並且喚醒IO線程,然后調用wait_client_obj(*wait_obj),wait在easy_client_wait_t的一個信號量上。
IO線程被喚醒:首先會做一些其他的事情(比如從自己的conn_list隊列中取出從其它的IO線程遷移過來的連接,然后監聽這個連接原有的讀寫事件后將其加到本IO線程的已連接鏈表中(connected_list))。然后調用easy_connection_send_response。
easy_connection_send_response:
遍歷IO線程的已完成請求鏈表:將請求所在message的引用計數減1(此時引用計數為1),然后執行easy_connection_request_done
easy_connection_request_done:
和前面一樣:回調應用層定義的encode方法將r->opacket掛在連接的輸出緩沖區鏈表output中,然后調easy_request_set_cleanup,給剛才加入的輸出buf設置cleanup回調函數,並且將message的引用計數加1(此時引用計數為2),由於此時請求r的retcode等於EASY_AGAIN,所以和同步處理不同的是,這里會將該請求加入到該連接的session_list鏈表中,然后激活該連接的寫(ev_io_start(c->loop, &c->write_watcher))。函數結束。
隨后調用easy_connection_write_socket,繼而調用c->write(實際是easy_socket_write)將數據寫入連接,同時回調buf的cleanup,message引用計數減1(此時引用計數等於1)由於fd可寫,隨后下次event loop會直接返回,回調
該連接write_watcher的回調函數easy_connection_on_writable方法:
1. 調用easy_connection_write_socket繼續寫數據,如果沒有數據可寫,將write_watcher停掉
2. 將連接中的session_list拿出來,對於每個請求調用應用層process方法,這是由於請求r的返回值retcode是EASY_AGAIN,所以在proces中,會調用easy_client_wait_wakeup_request(message引用計數加1,此時引用計數為2),從而signal信號量,將工作線程喚醒。這是工作線程發第一個包的情形,隨后發第二個包,第三個包……如此反復。
當發最后一個包時,message的引用計數為2,process函數返回EASY_OK,將最后一個包掛在請求r的opacket上,然后執行easy_request_wakeup(r)將請求掛在IO線程的已完成鏈表中,然后喚醒IO線程,IO線程執行回調函數easy_connection_on_wakeup,在函數最后調用easy_connection_send_response處理這個IO線程所有已完成請求,處理流程如下:
對於每個請求,在這里只關注request的最后一個回復包:
1. 將請求r所在的message的引用計數減1(此時引用計數為1)
2. 執行easy_connection_request_done,將請求r的opacket掛在連接的輸出緩沖區鏈表output上。
3. 調用easy_request_set_cleanup,設置buf的cleanup,並且將message引用計數加1(此時引用計數為2)
4. 由於請求r的返回碼retcode為EASY_OK,則更新一些統計信息,將r的status狀態值為EASY_REQUEST_DONE
5. 判斷是否message上還有請求,這里已經沒有了,調用easy_message_destroy(m, 1),將message從連接的message鏈表中刪除,並將message的引用計數減1,,此時message的引用計數為1.
6. 調用easy_connection_write_socket將數據寫出,然后調用easy_buf_destroy(buf),從而會調buf的cleanup函數easy_request_cleanup,將message的引用計數減1,此時引用計數變成0,將message的pool銷毀,從而內存釋放。
六、資源管理
可以看出,libeasy管理內存是以連接為單位的,更具體說,是基於message的。當新建一個連接的時候,會分配1+N個內存池,N指連接上message的個數。
1:用來存儲連接本身的元信息(easy_connection_t)
N:每個完整的message會分配一個對應的內存池,這個內存池用來存儲請求元信息(easy_request_t),這個message的輸入緩沖區,輸出緩沖區的內存可以應用層進行分配,也可以由這個內存池進行分配。
message的銷毀和連接的銷毀都是基於引用計數,當引用計數為0時,相應的內存池被銷毀。
message的引用計數加1的場景:
1. 將結果包掛在連接的輸出緩沖區時引用計數加1,因為結果包的元信息easy_buf_t是在message的內存池中分配的,結果沒有返回,內存池不能銷毀。
2. 異步處理模式下process時引用計數也要加1,因為請求顯然還沒有處理完成,而請求也是在message的內存池中分配的,內存池不能銷毀。
3. 工作線程被IO線程喚醒,工作線程繼續處理,相當於請求還沒有結束,理由同2,message的引用計數繼續+1,內存池不能被銷毀
message的引用計數減1的場景:
1. 當結果buf被發出去后,調用cleanup函數將message的引用計數減1,與上面的1對應。
2. 當IO線程被喚醒后,處理被工作線程標記為已完成(easy_request_wakeup)的請求(easy_connection_send_response函數中)時,將message的引用計數減1,與上面的2對應。
3. IO線程處理完請求,確認是否message上面的請求是否都已經處理完成(ret == EASY_OK && m->request_list_count == 0 && m->status != EASY_MESG_READ_AGAIN),如果處理完成則將message的引用計數減1。需要注意的是,message的引用計數初始狀態為1。這次的引用計數減1正是對應這個情況。
相對與message的引用計數,connection的引用計數簡單很多,因為本身connection對應於的pool只存儲connection相關的元數據。
connection的引用計數加1的場景:
1. 應用層的process函數中將連接的引用計數加1。
2. 當IO線程通過easy_client_wait_wakeup_request 喚醒工作線程時,將連接的引用計數加1。
connection的引用計數減1的場景:
1. 每次IO線程調用easy_connection_send_response給所有的已完成請求發應答后,將連接的引用計數減1
connection的銷毀:
每次對連接進行讀寫,都會更新其last_time,如果連接的引用計數大於0的時候(異常情況下)執行了easy_connection_destroy(),則會將連接的讀寫watcher關掉,但是連接沒有關掉,導致OS實際上還在接包,但是libeasy沒有對其進行處理,導致超時,在這種情況下,會起一個周期性的timeout watcher,每0.5秒檢查一下:對於狀態不等於EASY_CONN_OK的連接,判斷現在距離c->last_time是否超過了時間force_destroy_second,如果超過了,則將連接的引用計數強行置為0,隨后close連接並且銷毀。