OceanBase使用libeasy原理源碼分析:服務器端


libeasy是個網絡框架,這個網絡框架基於事件驅動模型,libeasy可以有多個網絡I/O線程,每個網絡I/O線程一個event loop,事件驅動模型基於開源的libev實現。

我認為,libeasy不同於其它的網絡框架比如tbnet,muduo。tbnet,muduo等網絡框架的目的就是向應用層暴露出簡單的發包和收包的接口,讓應用層從底層發包和收包的處理細節中解放出來,使得應用層能更加專注於業務邏輯的實現,為了做到這些,網絡框架幫助應用程序管理連接,管理輸入輸出緩沖區,處理具體的發包收包等細節和錯誤的處理,處理流控,並且允許應用層注入封包,解包,新建連接時處理,斷開連接時處理,收到包后處理包的邏輯等。libeasy與這些網絡框架稍不同,從高層次看,libeasy中的線程分為業務邏輯線程和網絡I/O線程,無論哪種線程,線程都有唯一的一個event loop,工作的時候,I/O線程和網絡I/O線程有對應關系,他們之間通過pipe來實現線程的喚醒,原理就是業務邏輯線程的event loop監聽pipe的讀端,當I/O線程A接收到包的時候,往pipe的寫端寫入數據,業務邏輯線程event loop返回,從任務鏈表中取任務執行。可以說,libeasy已經超出了網絡框架的范疇,但是libeasy也支持和普通的網絡框架一樣,僅僅使用網絡 I/O線程部分。

 下面分析libeasy的原理和實現,由於libeasy基於libev,對libev不了解的參見  

這篇主要分析OceanBase 0.4的mergeserver使用libeasy作為服務器端的模式,客戶端自然就是MySQL客戶端。OceanBase僅僅使用了libeasy的IO線程部分,工作線程是使用了我們自己的線程池。

主要說如下幾個方面:

一、OceanBase啟動時的使用模式

二、 基礎數據結構

  2.1 easy_list_t

  2.2 easy_pool_t

  2.3 easy_buf_t

  2.4 easy_connection_t

三、 連接建立

四、 同步處理(OceanBase少量使用這種模式)

五、 異步處理(OceanBase大量采用這種模式)

六、 資源管理

 

一、OceanBase啟動時的使用模式 

libeasy內與OceanBase使用模式相關的各個對象之間關系如下圖所示:

 OceanBase主要用到的是libeasy的IO線程池部分,沒有用到libeasy內置的工作線程池。libeasy采用one event loop per thread的模式,即每個線程一個event loop,

內存資源每連接自管理,連接之間的資源互不干涉。每個連接(easy_connection_t)上有可以有多個消息(easy_message_t),通過鏈表連起來,每個消息可以由多個請求組成(easy_request_t),也通過鏈表連起來。

OceanBase 0.4的ObMySQLServer啟動的時候,會初始化libeasy相關的東西,主要步驟如下:

//設置一堆給libeasy的回調函數
memset(&handler_, 0, sizeof(easy_io_handler_pt));   
//以下都是對於OceanBase 0.4 mergeserver的obmysql端口來說的
// 將mergeserver需要回復給mysql客戶端的結果以easy_buf_t(libeasy用來管理輸入輸出緩沖區的數據結構)的形式加到請求所屬於的easy_connection_t(TCP連接)的輸出緩沖區鏈表中
handler_.encode = ObMySQLCallback::encode;
// libeasy回調這個函數用於從該連接的輸入緩沖區中反序列化出一個符合MySQL協議的包,然后吐給上層使用
handler_.decode = ObMySQLCallback::decode;
// 對於每個decode出來的包進行實際的處理,在OceanBase的實現中,我們大多數時候僅僅是將包push到我們自己的工作隊列中,在這種情況下返回的不是EASY_OK這個錯誤碼,因為目前我們還沒有對
// 這個包進行實質上的處理,還沒有為這個包產生結果。少數情況下,當我們接收到的包的大小超過2MB的時候,process這個函數會返回EASY_OK,並且會為這個請求產生一個響應結果,一個MySQL的
// error packet,將其掛在request->opacket上,當libeasy看到了返回EASY_OK后,就會調用encode方法將opacket給掛在連接的輸出緩沖區鏈表中,隨后將其發送出去
handler_.process = ObMySQLCallback::process;
handler_.get_packet_id = ObMySQLCallback::get_packet_id;
handler_.on_disconnect = ObMySQLCallback::on_disconnect;
// 登錄邏輯,在libeasy發現listenfd上有讀事件時,會將連接接下來,然后給MySQL客戶端發送握手包,同時接受客戶端發送過來的用戶名密碼等信息,最后進行服務器端的驗證,這幾次
//交互過程是不經過libeasy網絡框架的
handler_.on_connect = ObMySQLCallback::on_connect;
// 用於當請求處理完畢后,告訴工作線程不要再發包了 handler_.cleanup = ObMySQLCallback::clean_up; eio_ = easy_eio_create(eio_, io_thread_count_); eio_->tcp_defer_accept = 0; easy_listen_t* listen = easy_connection_add_listen(eio_, NULL, port_, &handler_); rc = easy_eio_start(eio_); easy_eio_wait(eio_);

  

easy_eio_create(eio_, io_thread_count_)做了如下幾件事:

1. 分配一個easy_pool_t, 用來存放easy_io_t對象,io_thread_count_個io線程(easy_io_thread_t),初始化針對每個eio(一般系統就只有一個)的統計信息結構(easy_summary_t)

2. 設置一些tcp參數,比如tcp_defer_accept,tcp_nodelay,設置一些負載保護參數,比如EASY_CONN_DOING_REQ_CNT,表示每個連接同時正在處理的請求數不能

超過EASY_CONN_DOING_REQ_CNT

3. 初始化每個io線程:

    3.1 初始化其各個鏈表節點成員,比如conn_list(已建立連接但是讀寫事件還沒有監聽的連接鏈表), connected_list(連接已建立並且事件已監聽的連接鏈表),request_list(已處理完成但是還沒有將結果發送出去的請求鏈表)等

    3.2 統計信息初始化,例如io線程同時正在處理的請求數,已經處理的請求數

    3.3 初始化成員變量listen_watcher, 每100ms觸發一次對於listen的切換(回調函數easy_connection_on_listen),實際上,在剛啟動的時候,是100ms,當有IO線程搶到listen的權利后,這個timer會被改成60s,隨后,每60s進行一次listen的切換,而之前擁有listen權利的IO線程則會停掉它的read_watcher

    3.4 設定io線程的執行體函數easy_io_on_thread_start

    3.5 設定io線程被喚醒時的回調函數easy_connection_on_wakeup(設置為成員變量ev_async thread_watcher的cb上,當io線程被喚醒時,這個thread_watcher被觸發,從而回調).

    3.6 計算出io線程在io線程池中的下標放在io線程的成員變量idx中

    3.7 為這個線程分配一個event loop

easy_listen_t * listen = easy_connection_add_listen(eio_, NULL, port_, &handler_) : 增加一個listen地址,並且設置回調函數

easy_listen_t定義如下:

 struct easy_listen_t {
     int                     fd;
// read_watcher的下標 int8_t cur, old; int8_t hidden_sum;
//如果為1,則所有線程可以監聽同一個地址 uint8_t reuseport : 1; // 監聽地址 easy_addr_t addr; //各種回調函數的集合 easy_io_handler_pt *handler; // 多個io線程競爭listen的鎖 easy_atomic_t listen_lock; //當前listen權利被哪個IO線程擁有 easy_io_thread_t *curr_ioth; easy_io_thread_t *old_ioth; easy_listen_t *next; //有多少個IO線程就有多少個watcher, 每個watcher都監聽fd上的EV_READ和EV_CLEANUP事件 ev_io read_watcher[0]; };

easy_connection_add_listen:

從eio->pool中為easy_listen_t和io線程個數個ev_io分配空間,開始監聽某個地址,初始化每個read_watcher,關注listen fd上的讀事件,設置其回調函數   為easy_connection_on_accept,在這里僅僅是初始化read_watcher, 還沒有激活,激活在每個IO線程啟動(easy_io_on_thread_start)的時候做。一旦激活后,當有連接到來的時候,觸發easy_connection_on_accept

easy_eio_start(eio_):

    將eio_ 上掛着的所有的線程池中的所有線程全部啟動,每個IO線程的執行函數體easy_io_on_thread_start做如下幾件事:

    1. 可以選擇設置是否屏蔽信號,可以設置CPU親緣性

    2. 選擇一個線程listen:通過listen_watcher的方式,或者如果只有一個線程或者設置了socket的SO_REUSEPORT標記,則所有線程一起監聽,同時將listen的線程的read_watcher激活,從而下次來新連接的時候,就可以調用回調函數easy_connection_on_accept來接收新連接了。

二、基礎數據結構 

在libeasy中,有如下一些重要的數據結構,分別為

2.1 easy_list_t:鏈表結構,只有兩個指針next,prev,可以用於所有的元素類型,如同內核中的鏈表,假設有一個easy_request_t的元素的鏈表,遍歷它的代碼如下:

// 假設有一個指針easy_list_t *request_list指向的是一個easy_request_t鏈表的頭
easy_request_t              *r, *rn;
// request_list_node 它是easy_request_t結構體中的一個成員:easy_list_t *request_list_node,用於將easy_request_t串起來
easy_list_for_each_entry_safe(r, rn, request_list, request_list_node)
{
  //r指向鏈表中第一個easy_request_t元素
  //rn指向鏈表中第二個easy_request_t元素
}

  

這里用到easy_list_for_each_entry_safe和easy_list_entry兩個宏,就以這個例子來解釋一下這兩個宏:

宏easy_list_for_each_entry_safe:

#define easy_list_for_each_entry_safe(pos, n, head, member)                 \                                                                             
      for (pos = easy_list_entry((head)->next, typeof(*pos), member),       \
              n = easy_list_entry(pos->member.next, typeof(*pos), member);  \
              &pos->member != (head);                                       \
              pos = n, n = easy_list_entry(n->member.next, typeof(*n), member))

  

宏easy_list_entry:這個宏的作用就是根據鏈表節點指針找到對應元素的首地址

 #define easy_list_entry(ptr, type, member) ({                               \
       const typeof( ((type *)0)->member ) *__mptr = (ptr);                  \  // 得到easy_request_t中成員request_list_node的指針
       (type *)( (char *)__mptr - offsetof(type,member) );})                    // 指針減去 request_list_node成員在easy_request_t中的偏移就得到了easy_request_t的首地址

  

其中offsetof(type,member)是得到member這個這個成員在結構type中的偏移量(man offsetof),typeof(((type*)0)->member)得到參數的類型。

2.2 easy_pool_t :內存池,和nginx的內存池實現幾乎一樣,見http://www.alidata.org/archives/1390  它不是一個全局的內存池,libeasy中可以有很多個,比如對於每個新的連接產生一個easy_pool_t

2.3 easy_buf_t : 用於管理連接的輸入輸出緩沖區

  #define EASY_BUF_DEFINE             \
      easy_list_t         node;       \
      int                 flags;      \
      //當easy_buf_t不再使用時調用cleanup
      easy_buf_cleanup_pt *cleanup;   \
      void                *args;
  struct easy_buf_t {                                                                                                                                     
      EASY_BUF_DEFINE;
      //buf開始處
      char                *pos;
      // 下次從這開始寫,或者從已經讀到這
      char                *last;
      //buf結束處
      char                *end;
  };

2.4 easy_connection_t:封裝一個TCP連接,在libeasy中,一個easy_message_t可以包含一個或者多個easy_request_t,easy_request_t就相當於應用層的一個具體的包,例如

在OceanBase 0.4中,一個 easy_request_t 對應於一個mysql客戶端發過來的mysql協議包

struct easy_connection_t {
    //這個event loop監聽這個連接的事件
      struct ev_loop          *loop;
      //該連接是在這個內存池上分配的
      easy_pool_t             *pool;
      // 該連接所屬的io線程
      easy_io_thread_t        *ioth;
      //用於鏈表
      easy_connection_t       *next;
      // 用於串鏈表,比如連接建立后,通過這個鏈表節點,將其串到io線程的已連接鏈表中
      easy_list_t             conn_list_node;
  
      // file description
      // default_message_len 默認是16, 大小是8KB, first_message_len默認是2, 大小是1KB
      // libeasy對於每個連接收數據的時候,如果上次對於這個連接已經收到了1個或者n個完整的包(easy_request_t對應於一個完整的包,在OB 0.4,對應一個mysql協議的包),
      // 那么會重新分配一個8KB大小的easy_pool_t,然后再上面easy_message_t所需要的內存,最主要就是easy_message_t的輸入緩沖區easy_buf_t *input所需要的內存,
      // 這里,會為easy_buf_t分配1KB的內存,一個數據包所占用的內存是不會跨easy_buf_t的邊界的,讀連接的數據的時候,如果easy_buf_t里空間不夠,會分配一個足夠大的
      // easy_buf_t, 然后將原來的數據拷貝過來
      uint16_t                first_message_len, default_message_len;
      int                     reconn_time, reconn_fail;
      int                     idle_time;
      // fd: socket fd,
      // seq: 這是系統accept的第seq個連接
      int                     fd, seq;
      easy_addr_t             addr;
      // 該連接fd的讀事件的watcher
      ev_io                   read_watcher;
      // 該連接fd的寫事件的watcher
      ev_io                   write_watcher;
      // 該連接fd的超時事件的watcher
      ev_timer                timeout_watcher;
      // 將該連接接收到的所有的easy_message_t串在一起
      easy_list_t             message_list;
      // 輸出緩沖區鏈表,實際上就是easy_buf_t的鏈表
      easy_list_t             output;
      //應用層注入的各種回調函數的集合,比如decode,在OceanBase 0.4中,就是解析mysql協議
      //on_connect,在OceanBase 0.4中,on_connect會驗證用戶名和密碼
      easy_io_handler_pt      *handler;
      // 對連接數據的讀,就是簡單封裝recv系統調用
      easy_read_pt            *read;
      // 將output寫出
      easy_write_pt           *write;
//用於libeasy作為客戶端的時候,再下一篇中講述 easy_client_t *client; easy_list_t session_list; easy_hash_t *send_queue; void *user_data; easy_uthread_t *uthread; //user thread //一堆TCP的參數 uint32_t status : 4; uint32_t event_status : 4; uint32_t type : 1; uint32_t async_conn : 1; uint32_t conn_has_error : 1; uint32_t tcp_cork_flag : 1; uint32_t wait_close : 1; uint32_t need_redispatch : 1; uint32_t read_eof : 1; uint32_t auto_reconn : 1; uint32_t life_idle : 1; //當前connection上同時正在處理的easy_request_t的個數 uint32_t doing_request_count; ev_tstamp start_time, last_time; ev_tstamp wait_client_time, wcs; // 統計信息,比如連接上走的流量 easy_summary_node_t *con_summary; //add for summary // 不關注 easy_ssl_connection_t *sc; };

三、連接建立

如前所述,當listen fd上有可讀事件時,IO線程相應的read_watcher會被觸發,從而回調easy_connection_on_accept函數接受新連接。

easy_connection_on_accept主要做如下幾件事:

1. accept將連接接下來

2. 調用easy_connection_new為返回的fd新建一個easy_connection_t

    2.1 分配一個easy_pool_t,專門用來存easy_connection_t, 並且設置其各個成員:

             

 // 表示這個連接是在pool上分配的,對於連接的引用計數也是記在pool上的
c->pool = pool;
// 用於重連的一個參數,100ms,用於libeasy作為客戶端,現不關注 c->reconn_time = 100;
c->idle_time = 60000;
// 在前面easy_connection_t中有解釋 c->first_message_len = 2; // 1Kbyte c->default_message_len = (EASY_IO_BUFFER_SIZE >> 9); // 8Kbyte
//往連接上讀寫的函數 c->read = easy_socket_read; c->write = easy_socket_write;
//連接上message的鏈表 easy_list_init(&c->message_list);
// 用於libeasy IO線程喚醒工作線程(對於OceanBase 0.4來說,就是ObPacketQueueThread)
// 當OceanBase同步寫數據給MySQL客戶端時,工作線程每發一個包就阻塞住,直到IO線程將其喚醒再發下一個包,session_list就存放這些工作線程正在處理的request easy_list_init(&c->session_list);
// 用於串連接到鏈表中 easy_list_init(&c->conn_list_node);
// 該連接上的輸出緩沖區鏈表,每個元素是一個easy_buf_t easy_list_init(&c->output);

3. 設置返回的socket fd為非阻塞

4.將回調函數集合easy_io_handler_pt, 調用回調函數on_connect(),在OB中,用於驗證用戶名密碼。

5.初始化該連接的read_watcher, write_watcher,和timeout_watcher,並且設置其回調函數分別為easy_connection_on_readable,easy_connection_on_writable和easy_connection_on_timeout_conn(注意,僅僅是初始化,還沒有激活)

6. 激活該連接的read_watcher(即讓該連接所屬的IO線程的event loop監聽這個連接的讀事件), 設置其回調函數為easy_connection_on_readable(這里有一些細節,例如如果設置了tcp_defer_accept參數,則如果連接上沒有數據,則該連接不會返回,IO線程阻塞住)

7. 將該連接加入了所屬IO線程的已連接鏈表中(connected_list)

至此連接建立了。

四、 同步處理

這里的同步是指libeasy的IO線程回調應用層的process函數后對這個包的業務邏輯處理即結束。不需要應用層的工作線程的參與。這種模式下,process函數應該直接返回EASY_OK. 以OceanBase 0.4的obmysql接口為例,當用戶輸入的包的大小大於2MB的時候,應用層process函數直接構造一個MySQL的error packet,掛在連接的輸出緩沖區鏈表中(r->retcode默認為EASY_OK)。隨后輸出緩沖區中的數據被寫出,請求結束。

 

下面就以從MySQL客戶端接收到一個大於2MB的包為例:

顯然每個連接上有可讀事件的時候都會回調easy_connection_on_readable函數:該函數流程如下:

1. 檢查當前IO線程同時正在處理的請求是否超過EASY_IOTH_DOING_REQ_CNT(8192),當前連接上的請求數是否超過EASY_CONN_DOING_REQ_CNT(1024),如果超過,則調用easy_connection_destroy(c)將連接銷毀掉, 提供了一種負載保護機制

2. 檢查上一次收到的message(easy_message_t)是不是完整的,即收到了一條或者多條(easy_request_t),一個easy_request_t相當於一個請求包,貫穿着請求的輸入,處理和輸出整個流程。如果收到的是一個完整的message(判斷message的狀態status, status == EASY_MESG_READ_AGAIN說明不完整),那么就調用easy_message_create函數創建一個8KB的easy_pool_t,然后在其上分配一個easy_message_t結構,再分配一個1KB大小的easy_buf_t作為輸入緩沖區將其掛在easy_message_t的input成員上。然后設置message的pool,並將其引用計數初始化為1,並將next_read_len設置為1KB

#define EASY_MESSAGE_SESSION_HEADER \
   easy_connection_t       *c;     \
   easy_pool_t             *pool;  \
   int8_t                  type;   \
   int8_t                  async;  \
   int8_t                  status; \
   int8_t                  error;
// 用於接收, 一個或多個easy_request_t
struct easy_message_t {
  EASY_MESSAGE_SESSION_HEADER
  int                     recycle_cnt;
  //該連接的輸入緩沖區
  easy_buf_t              *input;
  //用於將一個連接的所有的message串起來
  easy_list_t             message_list_node;
  easy_list_t             request_list;
  easy_list_t             all_list;
  //該message上request的個數
  int                     request_list_count;
  //下次需要讀取的數據的長度
  int                     next_read_len;
 
  void                    *user_data;
 };

3. 調用easy_buf_check_read_space檢查連接的輸入緩沖區input中是否有next_read_len個字節的空間,如果沒有,則繼續在message的pool上分配next_read_len個字節的空間,並且將原來的輸入緩沖區的數據拷貝過來,將這個next_read_len字節大小的緩沖區作為新的輸入緩沖區

4. 調用 c->read從連接讀數據,實際上調用的是函數easy_socket_read,該函數只是簡單的封裝了recv()。設置連接的成員read_eof,如果讀到的數據小於next_read_len,則將其設置為1,主要用於異常情況下關閉連接的情形

5. 更新連接的統計信息con_summary

6. 如果是作為服務器端,則調用easy_connection_do_request(m),否則調用 easy_connection_do_response(m),目前僅關注服務器端。

easy_connection_do_request(m)流程如下:

 

  // ipacket放進來的包, opacket及出去的包
  struct easy_request_t {          
      //所屬的message                                                                                                                       
      easy_message_session_t  *ms;
      easy_list_t             request_list_node;
      easy_list_t             all_node;

      int16_t                 retcode, status;
      int                     reserved;
      //請求開始時間
      ev_tstamp               start_time;
      void                    *ipacket;
      void                    *opacket;
      void                    *args;
      void                    *user_data;
      // waitobj,用於IO線程喚醒工作線程,封裝pthread_mutex_t和pthread_cond_t
      easy_client_wait_t      *client_wait;
  };

1. 回調應用層實現的decode函數,然后在message的pool上分配一個easy_request_t,再將decode出來的packet(對於OceanBase 0.4來說,就是ObMySQLCommandPacket)掛在request的ipacket上. 然后將該請求加入到message的request_list中

2. 修改一些統計信息

3. 設置message的status,當輸入緩沖區中還有空閑空間時,說明沒有收到一個完整的request,則將message的status設置為EASY_MESG_READ_AGAIN,這正是前面用來判斷是否接收到了一個完整的message的判斷標記

4. 調用easy_connection_process_request函數對剛才decode出來的所有的request進行處理

easy_connection_process_request是處理請求的函數,主要流程如下:

1. 對於每個request, 

   1.1 將這個request從所在的message中摘下來(request_list_node)

   1.2. 回調應用層傳入的process:

        process函數返回EASY_OK, 則說明應用已經處理完了這個請求了(這種情況出現在當OceanBase 0.4接收到了一個大於2MB的包的時候),接着調用easy_connection_request_done函數。

        easy_connection_request_done函數處理:

          1.2.1 回調應用層定義的encode,對於OceanBase 0.4來說,比如在create  table這種回復只需要回復一個包的情形來說,就是將封裝了MySQL回復包的easy_buf_t(r->opacket)給掛在該請求所在連接的輸出緩沖區鏈表(c->output)后面。

          1.2.2 調用easy_request_set_cleanup(r, &c->output), 將message的引用計數加1,以防結果還沒有輸出message就被析構了。然后設置一個cleanup(實際上是函數easy_request_cleanup)函數掛在輸出緩沖區鏈表上的easy_buf_t(也就是剛剛掛上去的那個buf)上,當buf被寫出去后不再使用的時候,會回調cleanup。

          1.2.3 將request的status設置成EASY_REQUEST_DONE

   1.3. 當連接的輸出緩沖區鏈表中積累了128個沒發出去,或者不使用tcp nagle算法的時候,則調用easy_connection_write_socket(一次寫不完會再次啟寫事件)主動往連接寫一次數據

   1.4. 檢查request所在的message上是否還有request,如果沒有請求需要處理並且沒有接收到了一半的請求,則調用easy_message_destroy(m, 1),將這個message從m所屬的連接上的message鏈表中摘除,把m的status設置為EASY_MESG_DESTROY,最后將message的引用計數減1.如果message的引用計數變成了0,則將和message關聯的easy_pool_t整個給釋放掉

2. 調用easy_connection_write_socket將連接的輸出緩沖區鏈表中的數據寫出,這個函數調用c->write(實際調用easy_socket_write)將數據寫出,easy_socket_write使用writev系統調用將數據寫出,同時,對於已經寫出去的easy_buf_t,調用easy_buf_destroy函數,它會回調buf的cleanup函數,如前所屬,實際上是easy_request_cleanup函數.easy_request_cleanup函數做了如下幾件事:

         2.1 調用easy_request_server_done,修改一些統計信息,並且回調應用層定義的cleanup函數。對於我們這種同步處理的場景來說,什么都沒做。

         2.2 調用easy_message_destroy((easy_message_t *)r->ms, 0),將message的引用計數減1,到此message的引用計數變為0,將這個message的easy_pool_t銷毀,從而內存被釋放。

 

請求結束。

五、異步處理

 異步處理流程:IO線程將數據接下來后,調用應用層定義的process方法,應用層process方法不返回EASY_OK,返回EASY_AGAIN,將packet丟到工作線程隊列,同時IO線程將這個異步請求丟到連接的session鏈表中,並且啟動寫(start write watcher)。隨后,應用層的工作線程從工作隊列里面拿出packet,進行處理,檢索出結果包,將其掛在r->opacket上,然后調用easy_request_wakeup(req), 將請求掛在IO線程的已完成的請求鏈表中,並且喚醒IO線程,隨后工作線程阻塞在一個信號量上。IO線程被喚醒后,將所有的輸出buf都寫出,然后再次回調process函數,process函數看到請求的retcode等於EASY_OK,則signal信號量,從而工作線程被喚醒。

典型的,select請求這種需要回復多個包給MySQL客戶端的場景使用的都是這種模式

其它諸如只需要回復一個包給MySQL客戶端的DML操作,例如INSERT,UPDATE等也使用這種模式,只是工作線程由於只需要發一個包,所以不會阻塞在信號量上,直接就返回了

 詳細分析如下:

message的引用計數初始化為1

前面讀取輸入,decode過程都類似,不同的是easy_connection_process_request函數:

對於這種需要工作線程處理的request, 

   1.1 將這個request從所在的message中摘下來(request_list_node)

   1.2 回調應用層定義的process,process將包放入到工作隊列,然后process(message引用計數加1)返回EASY_AGAIN。

隨后,工作線程從工作隊列中拿到了請求包,處理,將結果包封裝成一個easy_buf_t,掛在請求r的opacket上,隨后調用easy_request_wakeup(r)將請求r掛在IO線程的已完成請求鏈表,並且喚醒IO線程,然后調用wait_client_obj(*wait_obj),wait在easy_client_wait_t的一個信號量上。

IO線程被喚醒:首先會做一些其他的事情(比如從自己的conn_list隊列中取出從其它的IO線程遷移過來的連接,然后監聽這個連接原有的讀寫事件后將其加到本IO線程的已連接鏈表中(connected_list))。然后調用easy_connection_send_response。

easy_connection_send_response

遍歷IO線程的已完成請求鏈表:將請求所在message的引用計數減1(此時引用計數為1),然后執行easy_connection_request_done

easy_connection_request_done:

和前面一樣:回調應用層定義的encode方法將r->opacket掛在連接的輸出緩沖區鏈表output中,然后調easy_request_set_cleanup,給剛才加入的輸出buf設置cleanup回調函數,並且將message的引用計數加1(此時引用計數為2),由於此時請求r的retcode等於EASY_AGAIN,所以和同步處理不同的是,這里會將該請求加入到該連接的session_list鏈表中,然后激活該連接的寫(ev_io_start(c->loop, &c->write_watcher))。函數結束。

隨后調用easy_connection_write_socket,繼而調用c->write(實際是easy_socket_write)將數據寫入連接,同時回調buf的cleanup,message引用計數減1(此時引用計數等於1)由於fd可寫,隨后下次event loop會直接返回,回調

該連接write_watcher的回調函數easy_connection_on_writable方法:

1. 調用easy_connection_write_socket繼續寫數據,如果沒有數據可寫,將write_watcher停掉

2. 將連接中的session_list拿出來,對於每個請求調用應用層process方法,這是由於請求r的返回值retcode是EASY_AGAIN,所以在proces中,會調用easy_client_wait_wakeup_request(message引用計數加1,此時引用計數為2),從而signal信號量,將工作線程喚醒。這是工作線程發第一個包的情形,隨后發第二個包,第三個包……如此反復。

當發最后一個包時,message的引用計數為2,process函數返回EASY_OK,將最后一個包掛在請求r的opacket上,然后執行easy_request_wakeup(r)將請求掛在IO線程的已完成鏈表中,然后喚醒IO線程,IO線程執行回調函數easy_connection_on_wakeup,在函數最后調用easy_connection_send_response處理這個IO線程所有已完成請求,處理流程如下:

對於每個請求,在這里只關注request的最后一個回復包:

1. 將請求r所在的message的引用計數減1(此時引用計數為1)

2. 執行easy_connection_request_done,將請求r的opacket掛在連接的輸出緩沖區鏈表output上。

3. 調用easy_request_set_cleanup,設置buf的cleanup,並且將message引用計數加1(此時引用計數為2)

4. 由於請求r的返回碼retcode為EASY_OK,則更新一些統計信息,將r的status狀態值為EASY_REQUEST_DONE

5. 判斷是否message上還有請求,這里已經沒有了,調用easy_message_destroy(m, 1),將message從連接的message鏈表中刪除,並將message的引用計數減1,,此時message的引用計數為1.

6. 調用easy_connection_write_socket將數據寫出,然后調用easy_buf_destroy(buf),從而會調buf的cleanup函數easy_request_cleanup,將message的引用計數減1,此時引用計數變成0,將message的pool銷毀,從而內存釋放。

 六、資源管理

可以看出,libeasy管理內存是以連接為單位的,更具體說,是基於message的。當新建一個連接的時候,會分配1+N個內存池,N指連接上message的個數。

1:用來存儲連接本身的元信息(easy_connection_t)

N:每個完整的message會分配一個對應的內存池,這個內存池用來存儲請求元信息(easy_request_t),這個message的輸入緩沖區,輸出緩沖區的內存可以應用層進行分配,也可以由這個內存池進行分配。

message的銷毀和連接的銷毀都是基於引用計數,當引用計數為0時,相應的內存池被銷毀。

message的引用計數加1的場景

1. 將結果包掛在連接的輸出緩沖區時引用計數加1,因為結果包的元信息easy_buf_t是在message的內存池中分配的,結果沒有返回,內存池不能銷毀。

2. 異步處理模式下process時引用計數也要加1,因為請求顯然還沒有處理完成,而請求也是在message的內存池中分配的,內存池不能銷毀。

3. 工作線程被IO線程喚醒,工作線程繼續處理,相當於請求還沒有結束,理由同2,message的引用計數繼續+1,內存池不能被銷毀

message的引用計數減1的場景:

1. 當結果buf被發出去后,調用cleanup函數將message的引用計數減1,與上面的1對應。

2. 當IO線程被喚醒后,處理被工作線程標記為已完成(easy_request_wakeup)的請求(easy_connection_send_response函數中)時,將message的引用計數減1,與上面的2對應。

3. IO線程處理完請求,確認是否message上面的請求是否都已經處理完成(ret == EASY_OK && m->request_list_count == 0 && m->status != EASY_MESG_READ_AGAIN),如果處理完成則將message的引用計數減1。需要注意的是,message的引用計數初始狀態為1。這次的引用計數減1正是對應這個情況。

相對與message的引用計數,connection的引用計數簡單很多,因為本身connection對應於的pool只存儲connection相關的元數據。

connection的引用計數加1的場景:

1. 應用層的process函數中將連接的引用計數加1。

2. 當IO線程通過easy_client_wait_wakeup_request 喚醒工作線程時,將連接的引用計數加1。

connection的引用計數減1的場景:

1. 每次IO線程調用easy_connection_send_response給所有的已完成請求發應答后,將連接的引用計數減1

connection的銷毀:

每次對連接進行讀寫,都會更新其last_time,如果連接的引用計數大於0的時候(異常情況下)執行了easy_connection_destroy(),則會將連接的讀寫watcher關掉,但是連接沒有關掉,導致OS實際上還在接包,但是libeasy沒有對其進行處理,導致超時,在這種情況下,會起一個周期性的timeout watcher,每0.5秒檢查一下:對於狀態不等於EASY_CONN_OK的連接,判斷現在距離c->last_time是否超過了時間force_destroy_second,如果超過了,則將連接的引用計數強行置為0,隨后close連接並且銷毀。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM