- 項目概述
Accelio是一套支持rdma協議的通訊框架,並且允許擴展包含client和server,同時支持用戶態和內核態。
-
功能
Simplified API for application developers
High-performance asynchronous APIs
Reliable message delivery (end to end)
Request/Replay or Send/Receive models
Connection and resource abstraction to maximize scalability and availability
Native support for service and storage clustering/scale-out
Maximize multi-threaded application performance allowing dedicated hardware resources per thread
Zero copy data delivery, with optional built-in memory management
Support for multiple transport technologies (RDMA, TCP, Shared-Memory etc.)**
Integration with common event loop mechanisms
Fast event notifications or combined models for lowest message latency
Message combining and batch message processing optimization
User space and kernel implementations
Java bindings**
-
結構
划分為三層
Application Interface
Easy-to-use primitives for fast and reliable asynchronous message queue or RPC (Remote Procedure Call)
Session and Connection Management
Provides reliable end-to-end connectivity to peer endpoints, with dynamic connection establishment, pooling, fault recovery, and migration/redirection
Pluggable Transport Layer
Allows mapping to different hardware or software transport implementations
-
主要數據結構
xio_transport
協議類型,目前只有rdma,包含協議名和該協議相關的所有操作函數 ,rdma模塊中定義了一個xio_rdma_transport全局變量,實現了xio_transport中所有函數,同時還有一個struct xio_rdma_transport結構,該類型是對底層驅動接口的封裝,同時包含了上下文,每個conn對象都包含一個單獨的,該結構實際應該是和xio_transport_base在同一層次
struct xio_transport {
const char *name;
struct xio_transport_msg_validators_cls validators_cls;
/* transport ctor/dtor called right after registration */
void (*ctor)(void);
void (*dtor)(void);
/* transport initialization */
int (*init)(struct xio_transport *self);
void (*release)(struct xio_transport *self);
/* running thread (context) is going down */
int (*context_shutdown)(struct xio_transport_base *trans_hndl,
struct xio_context *ctx);
/* observers */
void (*reg_observer)(struct xio_transport_base *trans_hndl,
struct xio_observer *observer);
void (*unreg_observer)(struct xio_transport_base *trans_hndl,
struct xio_observer *observer);
/* task pools managment */
void (*get_pools_setup_ops)(struct xio_transport_base *trans_hndl,
struct xio_tasks_pool_ops **initial_pool_ops,
struct xio_tasks_pool_ops **primary_pool_ops);
void (*set_pools_cls)(struct xio_transport_base *trans_hndl,
struct xio_tasks_pool_cls *initial_pool_cls,
struct xio_tasks_pool_cls *primary_pool_cls);
/* connection */
struct xio_transport_base *(*open)(struct xio_transport *self,
struct xio_context *ctx,
struct xio_observer *observer);
int (*connect)(struct xio_transport_base *trans_hndl,
const char *portal_uri,
const char *out_if);
int (*listen)(struct xio_transport_base *trans_hndl,
const char *portal_uri, uint16_t *src_port,
int backlog);
int (*accept)(struct xio_transport_base *trans_hndl);
int (*poll)(struct xio_transport_base *trans_hndl,
long min_nr, long nr,
struct timespec *timeout);
int (*reject)(struct xio_transport_base *trans_hndl);
void (*close)(struct xio_transport_base *trans_hndl);
int (*send)(struct xio_transport_base *trans_hndl,
struct xio_task *task);
int (*set_opt)(void *xio_obj,
int optname, const void *optval, int optlen);
int (*get_opt)(void *xio_obj,
int optname, void *optval, int *optlen);
int (*cancel_req)(struct xio_transport_base *trans_hndl,
struct xio_msg *req, uint64_t stag,
void *ulp_msg, size_t ulp_msg_len);
int (*cancel_rsp)(struct xio_transport_base *trans_hndl,
struct xio_task *task, enum xio_status result,
void *ulp_msg, size_t ulp_msg_len);
struct list_head transports_list_entry;
};
xio_transport_base
底層通信的上下文,每個conn對應一個
struct xio_transport_base {
struct xio_context *ctx;
struct xio_observable observable;
uint32_t is_client; /* client or server */
atomic_t refcnt;
char *portal_uri;
struct sockaddr_storage peer_addr;
enum xio_proto proto;
int pad;
};
xio_conn
對transport_base的簡單封裝,管理task緩存,處理框架內部定義的一些消息,握手消息之類
struct xio_conn {
struct xio_transport *transport;
struct xio_transport_base *transport_hndl;
struct xio_tasks_pool *primary_tasks_pool;
struct xio_tasks_pool_ops *primary_pool_ops;
struct xio_tasks_pool *initial_tasks_pool;
struct xio_tasks_pool_ops *initial_pool_ops;
struct xio_observer *server_observer;
struct xio_observer trans_observer;
struct xio_observer ctx_observer;
struct xio_observable observable;
struct kref kref;
int cid;
enum xio_conn_state state;
int is_first_req;
int is_listener;
int pad;
xio_ctx_timer_handle_t close_time_hndl;
struct list_head observers_htbl;
HT_ENTRY(xio_conn, xio_key_int32) conns_htbl;
};
xio_connection
對xio_conn的封裝,管理msg緩存,對外提供訪問接口
struct xio_connection {
struct xio_conn *conn;
struct xio_session *session;
struct xio_context *ctx; /* connection context */
struct xio_session_ops ses_ops;
/* server's session may have multiple connections each has
* private data assignd by bind
*/
void *cb_user_context;
int conn_idx;
int state;
int32_t send_req_toggle; // 選擇發送請求還是回應
int pad;
struct kref kref;
struct kref fin_kref;
struct xio_msg_list reqs_msgq; // 待發送的請求隊列
struct xio_msg_list rsps_msgq; // 待發送的回應隊列
struct xio_msg_list in_flight_reqs_msgq; // 已經發送的請求隊列
struct xio_msg_list in_flight_rsps_msgq; // 已經發送的回應隊列
struct xio_msg_list one_way_msg_pool;
struct xio_msg *msg_array;
struct list_head io_tasks_list;
struct list_head post_io_tasks_list;
struct list_head pre_send_list;
struct list_head connections_list_entry;
struct list_head ctx_list_entry;
};
xio_session
網絡session,一個addr對應一個session,每個session管理多個xio_connection,但是每個session中的多個connection不能由同一個context管理,多線程的使用模式是,由一個主線程創建session,每個工作線程創建context,並建立連接(Client端)
struct xio_session {
uint64_t trans_sn; /* transaction sn */
uint32_t session_id;
uint32_t peer_session_id;
uint32_t session_flags;
uint32_t connections_nr;
struct list_head sessions_list_entry;
struct list_head connections_list;
HT_ENTRY(xio_session, xio_key_int32) sessions_htbl;
struct xio_session_ops ses_ops;
struct xio_transport_msg_validators_cls *validators_cls;
struct xio_msg *setup_req;
struct xio_observer observer;
enum xio_session_type type;
volatile enum xio_session_state state;
struct xio_new_session_rsp new_ses_rsp;
char *uri;
char **portals_array;
char **services_array;
void *user_context;
void *cb_user_context;
uint16_t user_context_len;
uint16_t uri_len;
uint16_t portals_array_len;
uint16_t services_array_len;
uint16_t last_opened_portal;
uint16_t last_opened_service;
uint16_t in_notify;
uint16_t pad[3];
uint32_t reject_reason;
struct mutex lock; /* lock open connection */
spinlock_t connections_list_lock;
int disable_teardown;
struct xio_connection *lead_connection;
struct xio_connection *redir_connection;
};
xio_server
服務端,xio_bind調用時返回,處理session未建立前的一些事件
struct xio_server {
struct xio_conn *listener;
struct xio_observer observer;
char *uri;
struct xio_context *ctx;
struct xio_session_ops ops;
uint32_t session_flags;
uint32_t pad;
void *cb_private_data;
};
xio_context
主線程循環的上下文,
struct xio_context {
void *ev_loop;
int cpuid;
int nodeid;
int polling_timeout;
unsigned int flags;
uint64_t worker;
struct xio_statistics stats;
struct xio_context_params params;
struct xio_schedwork *sched_work;
struct list_head ctx_list; /* per context storage */ 用來記錄該ctx上的xio_connection
/* list of sessions using this connection */
struct xio_observable observable;
void *netlink_sock;
};
xio_msg
用戶需要發送的內容
struct xio_msg {
struct xio_vmsg in;
struct xio_vmsg out;
union {
uint64_t sn; /* unique message serial number
* returned by the library
*/
struct xio_msg *request; /* on server side - attached
* request
*/
};
enum xio_msg_type type;
int more_in_batch; /* more messages are expected */
int status;
int flags;
enum xio_receipt_result receipt_res;
uint64_t timestamp; /**< submission timestamp */
int reserved;
void *user_context; /* for user usage - not sent */
struct xio_msg_pdata pdata; /**< accelio private data */
struct xio_msg *next; /* internal use */
};
xio_task
打包msg,加入消息頭
struct xio_task {
struct list_head tasks_list_entry;
void *dd_data; // 傳輸協議擴展,記錄協議內所用到的數據,例如rdma_task
struct xio_mbuf mbuf; // 由各協議自己初始化,使用pool_init_item
struct xio_task *sender_task; /* client only on receiver */
struct xio_msg *omsg; /* pointer from user */
struct xio_session *session;
struct xio_conn *conn;
struct xio_connection *connection;
void *pool;
release_task_fn release;
enum xio_task_state state; /* task state enum */
struct kref kref;
uint64_t magic;
uint64_t stag; /* session unique tag */
uint32_t is_control;
uint32_t tlv_type;
uint32_t ltid; /* local task id */
uint32_t rtid; /* remote task id */
uint32_t omsg_flags;
uint32_t pad;
struct xio_msg imsg; /* message to the user */
};
Xio_rdma_task
struct xio_rdma_task {
struct xio_rdma_transport *rdma_hndl;
enum xio_ib_op_code ib_op;
u16 more_in_batch;
u16 sn;
u16 phantom_idx;
u16 pad[3];
//struct xio_data_buffer sdb;
/* The buffer mapped with the 3 xio_work_req
* used to transfer the headers
*/
void *buf;
unsigned long size;
struct xio_work_req txd;
struct xio_work_req rxd;
struct xio_work_req rdmad;
/* User (from vmsg) or pool buffer used for */
u32 read_num_sge;
u32 write_num_sge;
u32 recv_num_sge;
struct xio_rdma_mem_desc read_sge;
struct xio_rdma_mem_desc write_sge;
/* What this side got from the peer for RDMA R/W
* Currently limitd to 1
*/
u32 req_write_num_sge;
u32 req_read_num_sge;
u32 req_recv_num_sge;
struct xio_sge req_read_sge[XIO_MAX_IOV];
struct xio_sge req_write_sge[XIO_MAX_IOV];
/* What this side got from the peer for SEND
*/
struct xio_sge req_recv_sge[XIO_MAX_IOV];
};
Req_*_sge說明,用來記錄對端或者自己這邊的使用的存放數據的地址
如果是要發送寫請求,這種場景發生在回應response時,在收到req時,就將rdma_task中的req_read_sge設置為對端的存放數據的內存地址,那么req_write_sge就為本地存放用戶寫入數據的內存地址。
如果為讀請求,之前對端發送過來的req中必然攜帶了對方存放數據的內存地址,req_write_sge,本地的用來存放讀取過來數據的地址為req_read_sge。這時候對端發送過來的req_read_sge是沒用的,會在解析頭部的時候重置為NULL。
Read_sge和write_seq是用來存放數據的內存控制描述符。真正的內存buf控制結構。Req_*_sge只是記錄了內存地址。
xio_tasks_pool
struct xio_tasks_pool {
/* pool of tasks */
struct xio_task **array;
/* LIFO */
struct list_head stack;
/* max number of elements */
int max;
int nr;
void *dd_data; // 底層協議擴展,例如在rdma協議時,指向rdma_task_pool
void *pool_ops; // 底層協議擴展,注冊創建(初始化),回收底層pool,以及初始化pool中每個成員等函數
};
xio_observer
事件觀察者,
struct xio_observer {
void *impl;
notify_fn_t notify;
};
xio_observable
被觀察者
struct xio_observable {
void *impl;
struct list_head observers_list;
struct xio_observer_node *observer_node; /* for one observer */
};
xio_work_req
struct xio_work_req {
union {
struct ib_send_wr send_wr;
struct ib_recv_wr recv_wr;
};
struct ib_sge sge[XIO_MAX_IOV + 1];
struct scatterlist sgl[XIO_MAX_IOV + 1];
int nents; /* number of sgl entries */
int mapped; /* number of mapped entries */
};
底層ib通訊時使用的參數的封裝,ib_sge用來設置send_wr或者recv_wr的sge通過ib接口映射過的地址,sgl原始地址
rxd->recv_wr.sg_list = rxd->sge;
sg_set_page(rxd->sgl, virt_to_page(buf), size, offset_in_page(buf));
xd->sge[i].addr = ib_sg_dma_address(ib_dev, &xd->sgl[i]);
xio_map_work_req
-
機制
事件處理
Xio框架負責處理主線程的調度,在調用xio_context_create時,初始化主處理線程,並設置添加事件的回調接口add_event,線程啟動接口run,以及停止接口stop
struct xio_loop_ops {
void *ev_loop;
int (*run)(void *loop);
void (*stop)(void *loop);
int (*add_event)(void *loop, struct xio_ev_data *data);
};
struct xio_context *xio_context_create(unsigned int flags,
struct xio_loop_ops *loop_ops,
struct task_struct *worker,
int polling_timeout,
int cpu_hint);
底層協議層,如果有網絡事件產生,通過add_event接口把要處理的事件和處理函數加入到主處理線程(也可以通過定時器加入事件),add_event會將事件加入隊列中,並喚醒正在等待的主線程,主線程逐一調用事件處理函數
Rdma在初始化cq時,xio_cq_init會設置回調函數xio_cq_data_callback,當rdma事件產生時把事件處理函數xio_data_handler通過add_event加入主處理線程,xio_data_handler會依次檢查每個連接
void xio_data_handler(void *user_context)
{
struct xio_cq *tcq = (struct xio_cq *) user_context;
struct xio_rdma_transport *rdma_hndl;
xio_cq_event_handler(tcq, tcq->ctx->polling_timeout);
list_for_each_entry(rdma_hndl, &tcq->trans_list, trans_list_entry) {
xio_rdma_idle_handler(rdma_hndl);
}
return;
}
觀察者
當被觀察者上發生事件時,會通知觀察者,觀察者如果對某一對象上面的一些事件感興趣,需要注冊自己到該對象上的觀察者列表中。例如:
xio_context_reg_observer:觀察者注冊自己到ctx中,觀察者會收到ctx事件的通知
xio_conn_reg_observer:xio_connection_set_conn函數會在執行時把session注冊到conn中,而client端,會在調用xio_conn_open時把session注冊到conn中(server端同樣會調用這個函數,但是傳遞的observer是空)
tcq和transprot也會關注context的事件
client端,在執行xio_conn_open時,把conn注冊到transport中
server端,會在執行xio_conn_create,時注冊conn到transport中
並發和多線程
一個session只能有一個URI,每個session管理多個xio_connection,但是每個session中的多個connection不能由同一個context管理,多線程的使用模式是,由一個主線程創建session,每個工作線程創建context,並建立連接(Client端),同一個ctx中,URI相同的不同session可以在同一個工作線程中,此時這些session的xio_connection公用同一個xio_conn,每個context中,一個地址只能對應一個xio_conn,同一個session中,每個xio_connection對應一個xio_conn
Xio_on_setup_rsp_recv里面的兩個分支代表兩個場景,portals_array為空,但是session->connections_nr > 1的場景是:多個線程同時對一個session建立連接,這時候已經產生了多個xio_connection,但是只生成了一個xio_conn去發送建立session的信息,其他的xio_connection在session建立之后再進行連接,另外一個場景,portals_array不為空,對端有多個網絡線程工作線程,一個監聽線程,但session建立之后,對端又指定了多個地址,這個時候就需要重新創建xio_connection,並建立連接。
內存管理
Accelio使用兩類buf,一種用於消息頭部或者小數據量,每個大小為4-8k,另外一類用於rdma的大數據io,第二類buf,可以由庫或者程序自己申請。
l 程序管理的buf
需要提前通過reg_mr接口注冊
l 庫管理的buf
要使用程序自己申請的內存,需要傳入mr信息,如果沒傳入,那么會把傳入的內存拷貝到庫維護的緩存中,多一次內存拷貝。
消息合並和批處理
消息的批量發送,可以發送消息時,遍歷待發送隊列,把send_wr用next指針連接起來,使用ibv_post_send接口批量發送消息,ibv_poll_cq接口,批量獲取收到的請求
Has_more標記,在幾個地方會使用,在底層有事件發生時,如果has_more標記為0,那么表示當前網絡空閑,可以把待發送的消息發送給對方,還有就是在rdma_send_req和rdma_send_rsp中,作用都是相同的。
On_msg接口中,會帶有has_more_batch參數,來標記是否還有其他消息需要處理,應用程序可以根據這個參數來做一些合並工作。
訪問接口
幾個重要標記
l XIO_IB_RDMA_READ的使用場景,當請求方的數據大小,超過了設置的緩存大小時,請求方發送IBV_WR_SEND請求,並設置ib_op類型為XIO_IB_RDMA_READ。接收方在收到該請求后,發送IBV_WR_RDMA_READ請求給發送方,並設置ib_op為XIO_IB_RDMA_READ,直接讀取遠處內存中的數據,不在處理recv消息。
l XIO_IB_RDMA_WRITE的使用場景,當接收方處理完成發送回應時,如果需要回應的數據大小超過了設置的緩存大小時,接收方發送IB_WR_RDMA_WRITE請求,並設置ib_op類型為XIO_IB_RDMA_WRITE
l XIO_IB_RECV的使用場景,接收方用來接收請求的task都設置該標記,有一種使用場景檢查了這個標記,當接收方收到了一個XIO_IB_RDMA_READ請求之后,發送請求去發送方讀取數據,這時會重新生成一個tmp_task,它的ib_op為XIO_IB_RDMA_READ,與原始的task請求時不同的,這時候需要把接收方收到的原始task加入rdma_rd_list中,等待讀取數據完成。接收方在發送task請求時,根據這個標記位來進去判斷哪些讀請求時要發送給發送方的。哪些只是等待讀取完成。
l XIO_IB_SEND的使用場景,發送方的task都設置該標記
另外需要說明的是,為了保證消息的到達順序,接收方在等待接收消息時,也就是rdma_rd_list非空時,是不處理收到的POST消息的,否則就可能出現后POST的消息,先處理,而之前的消息,還在讀取過程中。
回調函數
l on_msg_send_complete
該函數由請求的接收方調用,當發送回應完成時調用該回調函數,這里說的發送完成是指ib層認為數據發送完畢,不涵蓋其他邏輯。
l on_msg_delivered
該函數由請求的發送方調用,當發送方收到了接收方的回應時,調用該函數,接收方在收到請求時,會調用xio_connection_send_read_receipt,發送回應,one_way_msg必須設置該標記,request可選擇是否設置,通過檢查task狀態是否為XIO_TASK_STATE_DELIVERED確保只發送一次確認消息
xio_on_req_recv-- connection->ses_ops.on_msg-- xio_connection_send_read_receipt
接收方處理完成之后發送receipt
l on_msg
在收到消息時
Request/reply
發送方
l xio_send_request
檢查session狀態
檢查參數合法性
統計數據
獲取序列號xio_session_get_sn
把消息加入待發送隊列中,xio_msg_list_insert_tail
連接在線,則執行xio_connection_xmit
l xio_connection_xmit
根據標記選擇要發送的隊列
遍歷隊列,逐一發送每個msg xio_connection_send
發送失敗,重試或者返回錯誤
發送成功,把消息加入已發送隊列中
l xio_connection_send
a) 發送的是response,並且消息的標記僅設置XIO_MSG_RSP_FLAG_FIRST轉下一步,否則轉d
b) 從primary_task_pool中申請一個xio_task
c) 從該response對應的請求msg中獲取請求task,並設置xio_task的sender_task成員,轉f
d) 發送的是request,轉下一步,否則轉h
e) 從primary_task_pool中申請一個xio_task
f) 設置序列號
g) 把新申請的xio_task加入pre_send_list中,轉i
h) 把原始task加入pre_send_list中
i) 修改消息頭,設置序列號,task類型
j) 重置task的mbuf在xio_rdma_initial_pool_init_task生成task的時候從kmem_cache中分配一段內存,后面用該內存初始化mbuf
k) 把頭部寫入mbuf
l) xio_conn_send
m) xio_rdma_send
l xio_rdma_send_req
如果已經發送的請求數超過了最大限制,返回EAGIN的錯誤
准備接受回應的緩沖區,xio_rdma_prep_req_in_data
准備發送請求的緩存沖,之前的緩沖區只填充了消息頭,沒有寫入msg內容,xio_rdma_prep_req_out_data
修改buf長度xio_mbuf_tlv_payload_len
最好要完整填充txd->send_wr
最后把task加入底層傳輸的待發送隊列中rdma_hndl->tx_ready_list
判斷待發送列大小,超過的警戒值,則批量發送xio_rdma_xmit
l xio_rdma_prep_req_out_data
a) 判斷要發送的數據量是否超過了發送緩存的大小,沒有超過轉下一步,超過轉d
b) xio_rdma_prep_req_header 寫入頭部到mbuf中,用戶寫入的數據非空轉下一步,否則轉h
c) xio_rdma_write_send_data,用傳入的buf地址設置sge[1]起始的數組,或者沒有mr則把用戶數據拷貝到mbuf中,轉g
d) 超過緩存大小,rdma_task->ib_op = XIO_IB_RDMA_READ;由對方來主動讀取數據
e) 檢查用戶是否設置mr,設置轉下一步,沒有設置轉g
f) 逐一設置addr,rdma_task->write_sge[i].addr = vmsg->data_iov[i].iov_base;轉h
g) 用戶沒有設置mr,需要逐一為write_sge分配內存,並拷貝數據
h) 返回
之前在初始化txd時,通過sg_set_page(rxd->sgl, virt_to_page(buf), size, offset_in_page(buf)),把buf和sgl做過映射,后面再執行xio_rdma_xmit時,會用sgl數組元素的addr來設置sge數組元素的addr,如果用戶寫入的數據直接設置了sge,那么sgl數組應該只有元素0存放消息頭部
l xio_rdma_xmit
接收方
l xio_handle_wc
xio_rdma_rx_handler
xio_rdma_on_recv_req
xio_rdma_notify_observer(xio_rdma_datapath.c)
xio_on_new_message(xio_conn.c)
xio_conn_on_recv_req(xio_conn.c)
xio_observable_notify_any_observer
參數說明
Req.in.header或者req.in.iov[i].iov_base如果設置為NULL,那么當收到消息時,iov_base會設置為task->mbuf的地址,參看函數xio_rdma_on_recv_req
Send/recv
流程和上面的類似,除了沒有response,接收方收到消息后,會觸發一個回調,可選的確認通知
連接管理
狀態變遷
Session狀態變遷
XIO_SESSION_STATE_INIT 初始創建
XIO_SESSION_STATE_CONNECT 嘗試建立連接(調用xio_connect接口之后,立即設置)
XIO_SESSION_STATE_ONLINE (該session上的所有connection都已經建立連接)
XIO_SESSION_STATE_ACCEPTED (該session上的部分connection已經建立連接)
處於INIT狀態時,client調用xio_connect,會創建xio_connection,同時也會創建xio_conn
Xio_connect調用之后,狀態變為CONNECT,此時有另外的線程再調用xio_connect,只會創建xio_connection,不會創建xio_conn,
enum xio_transport_state {
XIO_STATE_INIT,
XIO_STATE_CONNECTED,
XIO_STATE_DISCONNECTED,
XIO_STATE_CLOSED,
XIO_STATE_DESTROYED,
};
連接建立
l Xio_conn建立
Client端:
1.xio_on_connection_established -- xio_conn_initial_pool_setup
xio_conn_send_setup_req
2.xio_conn_on_recv_setup_rsp -- xio_conn_primary_pool_setup
3.xio_on_client_conn_established
如果SESSION狀態不為ACCEPTED或ONLINE轉到下面的session建立,否則轉xio_connection建立
Server端:
1.xio_on_new_connection—xio_conn_create(listener創建新連接)-- xio_conn_initial_pool_setup
xio_on_new_conn--xio_conn_accept(accept新創建的連接)
2.xio_conn_on_recv_setup_req-- xio_conn_primary_pool_setup
xio_conn_write_setup_rsp
3.xio_conn_on_send_setup_rsp_comp
4.xio_on_server_conn_established
轉到下面的session建立
l Xio_connection建立
Client端:
xio_connection_send_hello_req
Server端:
xio_on_new_message
xio_session_alloc_connection
xio_session_assign_conn(xio_conn.c)
xio_on_connection_hello_req_recv(xio_session)
xio_connection_send_hello_rsp(xio_session.c)
xio_session_notify_new_connection(xio_session.c)
l Xio_session建立:
Client端:
xio_session_write_setup_req
xio_conn_on_recv_rsp(xio_conn.c)
xio_on_setup_rsp_recv(xio_session.c)
session->state = XIO_SESSION_STATE_ACCEPTED;或者XIO_SESSION_STATE_ONLINE
Server端:
xio_conn_on_recv_req(xio_conn.c)
xio_on_new_message(xio_server.c)
xio_session_init(xio_server.c)
xio_session_alloc_connection(xio_server.c)
xio_session_assign_conn(xio_server.c)
xio_on_setup_req_recv(xio_session_server.c)
on_new_session或者(xio_accept)(xio_session_server.c)
xio_session_write_accept_rsp(XIO_SESSION_SETUP_RSP)
xio_on_setup_rsp_send_comp
xio_session_notify_new_connection(xio_session.c)
疑問:
xio_conn_connect 中為什么要判斷XIO_CONN_STATE_CONNECTED這個狀態
同一個ctx中,不同session的xio_connection公用同一個xio_conn,每個ctx中,一個地址只能對應一個xio_conn
連接銷毀
l 檢測掉線
a) Transport層有事件產生,通知觀察者
b) xio_on_connection_disconnected(xio_conn.c)
c) 如果xio_conn沒有注冊任何觀察者,轉i,否則轉c
d) xio_on_conn_event_client(xio_on_conn_event_server)(xio_session_client.c)
e) xio_on_conn_disconnected(xio_session.c)
f) 還存在xio_connection和xio_conn關聯,xio_session_notify_connection_disconnected,狀態發生變換,通知session做相應處理,不存和xio_connection關聯的xio_conn轉f
g) xio_session_disconnect-- xio_session_notify_connection_teardown-- session->ses_ops.on_session_event --xio_connection_destroy,執行xio_conn_close,轉h,xio_connection_close,並判斷是否需要銷毀session,xio_session_notify_teardown-- xio_session_destroy-- xio_session_post_teardown
h) 沒有xio_connection和xio_conn關聯(listener的情況),直接調用xio_conn_close關閉連接,計算引用數,不為1時轉o
i) xio_conn_release
j) transport->close
k) xio_rdma_close
l) xio_rdma_notify_observer
m) xio_on_conn_closed
n) kfree(conn)
o) Session是客戶端,針對session中的其他xio_connection逐一調用xio_session_notify_connection_disconnected
l 主動關閉一
a) xio_disconnect-- xio_send_fin_req
xio_session_notify_connection_closed
b) xio_on_fin_rsp_recv-- xio_connection_fin_put-- xio_fin_complete-- xio_session_disconnect-- xio_session_notify_connection_teardown,其他部分同上
l 主動關閉二
c) xio_connection_destroy-- xio_connection_flush_tasks
xio_conn_close
xio_connection_close
l 銷毀session
a) 釋放資源,
說明,只釋放資源,因此需要等連接都已經關閉之后才能調用,
Rdma_hndl的釋放,主動關閉時,調用xio_conn_close最終會調用xio_rdma_close,這個函數會根據當前conn的狀態進行處理,如果是connected狀態則調用disconnect函數,其他情況,這里還有init狀態和disconnected狀態,則通過notify調用xio_on_conn_closed釋放資源最后修改狀態為destroy狀態,,
當斷線時,disconnected處理函數最終會調用xio_conn_release來釋放conn,會調用到函數xio_rdma_close,這時候就不需要再調用disconnect函數,直接釋放資源,
- 資料
代碼:https://github.com/accelio/
Accelio技術白皮書
聲明:本文所用圖片和部分內容來自accelio官網和白皮書。轉載本博文章須在文章明顯處注明作者及附上原文鏈接,便於讀者找到原文的更新版。