http://blog.sina.com.cn/s/blog_56dee71a0100qx4s.html
很多時候,除了響應事件之外,應用還希望做一定的數據緩沖。比如說,寫入數據的時候,通常的運行模式是:
- 決定要向連接寫入一些數據,把數據放入到緩沖區中
- 等待連接可以寫入
- 寫入盡量多的數據
- 記住寫入了多少數據,如果還有更多數據要寫入,等待連接再次可以寫入
這種緩沖IO模式很通用,libevent為此提供了一種通用機制,即bufferevent。bufferevent由一個底層的傳輸端口(如套接字),一個讀取緩沖區和一個寫入緩沖區組成。與通常的事件在底層傳輸端口已經就緒,可以讀取或者寫入的時候執行回調不同的是,bufferevent在讀取或者寫入了足夠量的數據之后調用用戶提供的回調。
bufferevent 的簡單范例
這里選取了 Libevent 的一個范例程序 hello-world.c 來看看 Libevent 的用法:
#include <string.h>
#include <errno.h> #include <stdio.h> #include <signal.h> #ifndef WIN32 #include <netinet/in.h> # ifdef _XOPEN_SOURCE_EXTENDED # include <arpa/inet.h> # endif #include <sys/socket.h> #endif // bufferevent #include <event2/bufferevent.h> // bufferevent 使用的 buffer #include <event2/buffer.h> // 連接監聽器 #include <event2/listener.h> #include <event2/util.h> #include <event2/event.h> static const char MESSAGE[] = "Hello, World!\n"; static const int PORT = 9995; // 新連接到來時的回調 static void listener_cb(struct evconnlistener *, evutil_socket_t, struct sockaddr *, int socklen, void *); // 讀取回調 static void conn_writecb(struct bufferevent *, void *); // 事件回調 static void conn_eventcb(struct bufferevent *, short, void *); // 信號回調 static void signal_cb(evutil_socket_t, short, void *); int main(int argc, char **argv) { struct event_base *base; struct evconnlistener *listener; struct event *signal_event; struct sockaddr_in sin; #ifdef WIN32 WSADATA wsa_data; WSAStartup(0x0201, &wsa_data); #endif // 首先構建 base base = event_base_new(); if (!base) { fprintf(stderr, "Could not initialize libevent!\n"); return 1; } memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(PORT); // 創建監聽器 listener = evconnlistener_new_bind(base, listener_cb, (void *)base, LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1, (struct sockaddr*)&sin, sizeof(sin)); if (!listener) { fprintf(stderr, "Could not create a listener!\n"); return 1; } // 中斷信號 signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base); if (!signal_event || event_add(signal_event, NULL)<0) { fprintf(stderr, "Could not create/add a signal event!\n"); return 1; } event_base_dispatch(base); evconnlistener_free(listener); event_free(signal_event); event_base_free(base); printf("done\n"); return 0; } static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sa, int socklen, void *user_data) { struct event_base *base = user_data; struct bufferevent *bev; // 得到一個新的連接,通過連接 fd 構建一個 bufferevent 
bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); if (!bev) { fprintf(stderr, "Error constructing bufferevent!"); event_base_loopbreak(base); return; } // 設置創建的 bufferevent 的回調函數 bufferevent_setcb(bev, NULL, conn_writecb, conn_eventcb, NULL); bufferevent_enable(bev, EV_WRITE); bufferevent_disable(bev, EV_READ); // 寫入數據到 bufferevent 中 bufferevent_write(bev, MESSAGE, strlen(MESSAGE)); } static void conn_writecb(struct bufferevent *bev, void *user_data) { struct evbuffer *output = bufferevent_get_output(bev); if (evbuffer_get_length(output) == 0) { printf("flushed answer\n"); bufferevent_free(bev); } } static void conn_eventcb(struct bufferevent *bev, short events, void *user_data) { if (events & BEV_EVENT_EOF) { printf("Connection closed.\n"); } else if (events & BEV_EVENT_ERROR) { printf("Got an error on the connection: %s\n", strerror(errno));/*XXX win32*/ } /* None of the other events can happen here, since we haven't enabled * timeouts */ bufferevent_free(bev); } static void signal_cb(evutil_socket_t sig, short events, void *user_data) { struct event_base *base = user_data; struct timeval delay = { 2, 0 }; printf("Caught an interrupt signal; exiting cleanly in two seconds.\n"); // 停止事件循環 event_base_loopexit(base, &delay); }
研究 bufferevent 的關鍵代碼
這里只研究基於 socket 的 bufferevent。從上面 bufferevent 的使用可以看出,有幾個關鍵函數:
- 開始需要調用 bufferevent_socket_new 創建一個 bufferevent
- 調用 bufferevent_setcb 設置回調函數
- 調用 bufferevent_write 寫入數據
- 調用 bufferevent_free 釋放 bufferevent
bufferevent_socket_new 的源碼以及分析如下:
// base --- 新創建的 bufferevent 關聯的 base
// fd --- bufferevent 關聯的文件描述符 struct bufferevent * bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options) { // bufferevent_private 結構體持有 bufferevent 的數據 struct bufferevent_private *bufev_p; // bufev == &(bufev_p->bev); // struct bufferevent 中存放的是不同類型的 bufferevent 所共有的部分 // struct bufferevent 是 struct bufferevent_private 的子集 struct bufferevent *bufev; // windows 下如果啟用 IOCP 則構建異步 IO bufferevent #ifdef WIN32 if (base && event_base_get_iocp(base)) // 細節略 return bufferevent_async_new(base, fd, options); #endif if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL) return NULL; // 初始化 bufferevent_private // 由於 bufferevent 有不同類型,所以這里設計了 bufferevent_ops_socket // 對於不同類型的 bufferevent 有不同的 bufferevent_ops_socket 對象 // bufferevent_ops_socket 包括函數指針和一些信息 if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket, options) < 0) { mm_free(bufev_p); return NULL; } bufev = &bufev_p->bev; // 設置 EVBUFFER_FLAG_DRAINS_TO_FD,此選項和 evbuffer_add_file() 函數有關(詳見文檔) evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD); // 初始化 read 和 write event // 一個 bufferevent(一個 fd)關聯兩個 event 對象 ev_read 和 ev_write // ev_read --- socket 可讀或者超時 // ev_write --- socket 可寫或者超時 // 它們都未使用 Edge triggered 方式 event_assign(&bufev->ev_read, bufev->ev_base, fd, EV_READ|EV_PERSIST, bufferevent_readcb, bufev); event_assign(&bufev->ev_write, bufev->ev_base, fd, EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); // 為輸出緩沖區設置回調 // 當輸出緩沖區被修改時調用 bufferevent_socket_outbuf_cb 回調函數 evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev); // 防止輸入緩沖區和輸出緩沖區被意外修改 evbuffer_freeze(bufev->input, 0); evbuffer_freeze(bufev->output, 1); return bufev; }
其中 bufferevent_init_common 函數實現為:
int
bufferevent_init_common(struct bufferevent_private *bufev_private, struct event_base *base, const struct bufferevent_ops *ops, enum bufferevent_options options) { struct bufferevent *bufev = &bufev_private->bev; // 創建輸入緩沖區 if (!bufev->input) { if ((bufev->input = evbuffer_new()) == NULL) return -1; } // 創建輸出緩沖區 if (!bufev->output) { if ((bufev->output = evbuffer_new()) == NULL) { evbuffer_free(bufev->input); return -1; } } // 初始化 bufferevent 的引用計數 bufev_private->refcnt = 1; bufev->ev_base = base; /* Disable timeouts. */ // 清理超時時間 evutil_timerclear(&bufev->timeout_read); evutil_timerclear(&bufev->timeout_write); bufev->be_ops = ops; /* * Set to EV_WRITE so that using bufferevent_write is going to * trigger a callback. Reading needs to be explicitly enabled * because otherwise no data will be available. */ // enabled 被 bufferevent_get_enabled 函數返回 // enabled 的值可以為 EV_WRITE EV_READ bufev->enabled = EV_WRITE; // bufferevent 相關線程初始化 #ifndef _EVENT_DISABLE_THREAD_SUPPORT if (options & BEV_OPT_THREADSAFE) { if (bufferevent_enable_locking(bufev, NULL) < 0) { /* cleanup */ evbuffer_free(bufev->input); evbuffer_free(bufev->output); bufev->input = NULL; bufev->output = NULL; return -1; } } #endif // 選項正確性檢查 if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS)) == BEV_OPT_UNLOCK_CALLBACKS) { event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS"); return -1; } // defer callbacks 初始化 if (options & BEV_OPT_DEFER_CALLBACKS) { if (options & BEV_OPT_UNLOCK_CALLBACKS) event_deferred_cb_init(&bufev_private->deferred, bufferevent_run_deferred_callbacks_unlocked, bufev_private); else event_deferred_cb_init(&bufev_private->deferred, bufferevent_run_deferred_callbacks_locked, bufev_private); } bufev_private->options = options; // 關聯 bufferevent 和 buffer evbuffer_set_parent(bufev->input, bufev); evbuffer_set_parent(bufev->output, bufev); return 0; }
bufferevent 創建成功之后,fd 上存在數據可讀則調用 bufferevent_readcb
/*
 * Callback for ev_read: invoked when fd is readable or the read event
 * timed out. Reads from fd into the input buffer, then runs the user's
 * read callback, or the event callback on timeout / EOF / error.
 *
 * arg is the struct bufferevent this event belongs to.
 */
static void
bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    struct evbuffer *input;
    short flags = BEV_EVENT_READING;
    ev_ssize_t want = -1;   /* -1 means "no limit chosen yet" */
    ev_ssize_t limit = -1;
    int nread = 0;

    _bufferevent_incref_and_lock(bufev);

    /* Note that we only check for event==EV_TIMEOUT. If
     * event==EV_TIMEOUT|EV_READ, we can safely ignore the
     * timeout, since a read has occurred */
    if (event == EV_TIMEOUT) {
        flags |= BEV_EVENT_TIMEOUT;
        goto error;
    }

    input = bufev->input;

    /*
     * If a high watermark is configured, never read more than would
     * bring the input buffer up to it.
     */
    if (bufev->wm_read.high != 0) {
        want = bufev->wm_read.high - evbuffer_get_length(input);
        if (want <= 0) {
            /* Already at or above the watermark: suspend reading
             * (per the original note, by removing ev_read). */
            bufferevent_wm_suspend_read(bufev);
            goto done;
        }
    }

    /* Upper bound for this read; per the original note this is tied to
     * rate limiting and is MAX_TO_READ_EVER (16384) when unlimited. */
    limit = _bufferevent_get_read_max(bufev_p);
    if (want < 0 || want > limit)
        want = limit;

    /* Reading is currently suspended: nothing to do. */
    if (bufev_p->read_suspended)
        goto done;

    /* Unfreeze the input buffer for the duration of the read only. */
    evbuffer_unfreeze(input, 0);
    nread = evbuffer_read(input, fd, (int)want);
    evbuffer_freeze(input, 0);

    if (nread > 0) {
        _bufferevent_decrement_read_buckets(bufev_p, nread);

        /* Invoke the user callback - must always be called last.
         * Only run it once the low watermark has been reached. */
        if (evbuffer_get_length(input) >= bufev->wm_read.low)
            _bufferevent_run_readcb(bufev);
        goto done;
    }

    if (nread == -1) {
        int err = evutil_socket_geterror(fd);
        /* Retriable errors (per the original note: EINTR/EAGAIN, or
         * WSAEWOULDBLOCK/WSAEINTR on Windows) are silently ignored. */
        if (EVUTIL_ERR_RW_RETRIABLE(err))
            goto done;
        /* Non-retriable: report an error. */
        flags |= BEV_EVENT_ERROR;
    } else if (nread == 0) {
        /* eof case */
        flags |= BEV_EVENT_EOF;
    }

error:
    /* Stop reading and notify the user through the event callback. */
    bufferevent_disable(bufev, EV_READ);
    _bufferevent_run_eventcb(bufev, flags);

done:
    _bufferevent_decref_and_unlock(bufev);
}
這里比較關鍵的函數為 evbuffer_read:
int
evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch) { struct evbuffer_chain **chainp; int n; int result; #ifdef USE_IOVEC_IMPL int nvecs, i, remaining; #else struct evbuffer_chain *chain; unsigned char *p; #endif EVBUFFER_LOCK(buf); // buffer 是否可讀 if (buf->freeze_end) { result = -1; goto done; } // 獲取當前 socket 可讀的字節數 n = get_n_bytes_readable_on_socket(fd); if (n <= 0 || n > EVBUFFER_MAX_READ) n = EVBUFFER_MAX_READ; // 盡可能多的讀取 if (howmuch < 0 || howmuch > n) howmuch = n; // 一種實現 #ifdef USE_IOVEC_IMPL /* Since we can use iovecs, we're willing to use the last * NUM_READ_IOVEC chains. */ // 調整緩沖區(細節略) if (_evbuffer_expand_fast(buf, howmuch, NUM_READ_IOVEC) == -1) { result = -1; goto done; } else { IOV_TYPE vecs[NUM_READ_IOVEC]; #ifdef _EVBUFFER_IOVEC_IS_NATIVE nvecs = _evbuffer_read_setup_vecs(buf, howmuch, vecs, NUM_READ_IOVEC, &chainp, 1); #else /* We aren't using the native struct iovec. Therefore, we are on win32. */ struct evbuffer_iovec ev_vecs[NUM_READ_IOVEC]; nvecs = _evbuffer_read_setup_vecs(buf, howmuch, ev_vecs, 2, &chainp, 1); for (i=0; i < nvecs; ++i) WSABUF_FROM_EVBUFFER_IOV(&vecs[i], &ev_vecs[i]); #endif #ifdef WIN32 { // 讀取到的數據字節數 DWORD bytesRead; DWORD flags=0; // windows 下進行 recv if (WSARecv(fd, vecs, nvecs, &bytesRead, &flags, NULL, NULL)) { /* The read failed. It might be a close, * or it might be an error. */ // 這里使用 WSARecv 時需要注意 WSAECONNABORTED 表示了連接關閉了 if (WSAGetLastError() == WSAECONNABORTED) n = 0; else n = -1; } else n = bytesRead; } #else // 非 windows 平台 read n = readv(fd, vecs, nvecs); #endif } // 使用另外一種實現 #else /*!USE_IOVEC_IMPL*/ /* If we don't have FIONREAD, we might waste some space here */ /* XXX we _will_ waste some space here if there is any space left * over on buf->last. 
*/ if ((chain = evbuffer_expand_singlechain(buf, howmuch)) == NULL) { result = -1; goto done; } /* We can append new data at this point */ p = chain->buffer + chain->misalign + chain->off; // read #ifndef WIN32 n = read(fd, p, howmuch); #else n = recv(fd, p, howmuch, 0); #endif #endif /* USE_IOVEC_IMPL */ if (n == -1) { result = -1; goto done; } if (n == 0) { result = 0; goto done; } #ifdef USE_IOVEC_IMPL remaining = n; for (i=0; i < nvecs; ++i) { ev_ssize_t space = (ev_ssize_t) CHAIN_SPACE_LEN(*chainp); if (space < remaining) { (*chainp)->off += space; remaining -= (int)space;