亂談服務器編程
第一部分 編程模型
1、服務器編程模型
關於server編程模型,大師stevens在他的《UNP》一書中已經做了詳細論述,這里不再重復,這里主要講一下我的一些理解。
從線程的角度,可以分為兩類,一是單線程,一是多線程。先來看單線程模型。
1.1、單線程模型
整個進程只有一個線程,由於只有一個線程,所以要實現高性能,必須與“non-blocking IO + IO multiplexing”相結合。程序基本模型如下:
struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, nfds, epollfd;
/* Set up listening socket, 'listen_sock' (socket(),bind(), listen()) */
/* 1. 創建一個epoll描述符 */
epollfd = epoll_create(10);
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
/* 2. 注冊監聽事件 */
epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev);
for (;;) {
/* 3. 監聽事件 */
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
/* 4. 處理事件 */
for (n = 0; n < nfds; ++n) {
if (events[i].data.fd == listener_fd) {
HandleAccept(events[i].data.fd);
continue;
}
if (events[i].events & EPOLLIN) {
HandleRead(events[i].data.fd);
}
if (events[i].events & EPOLLOUT) {
HandleWrite(events[i].data.fd);
}
}
}
該模型的代表主要有haproxy、nginx等,另外libevent本身也是單線程的。相對於多線程,單線程server沒有線程切換以及加鎖的開銷,劣勢是不能充分利用CPU的多核優勢,不過,這可以通過多個進程來解決。
另外,這種模型編程也很簡單,因為簡單,所以是編寫高性能server的首選。公司內部的網絡框架Srvframework、GNP都屬於這類范疇。 Alan Cox曾說:“線程是為不懂狀態機的人准備的”[9]。的確,單線程+狀態機可以做很多事情。
1.2、多線程模型
進程有多個線程,一般來說,可以將server的線程分成兩類:I/O線程和工作線程。由此又可以將多線程模型大體分成兩類:單一I/O線程+多個工作線程、多個I/O線程(工作線程)。另外,不論是單線程,還是多線程,non-blocking IO + IO multiplexing都是必選的。
(1)單一I/O線程+多個工作線程
這種模型下,I/O線程負責event loop、I/O操作,工作線程負責實際的業務邏輯處理,I/O線程與工作線程可以通過阻塞隊列/共享內存等方式進行數據交換,隊列/共享內存的訪問需要加鎖。實際上,這種模型本質上與1.2中單線程是類似的,只不過這里的業務邏輯交由單獨的工作處理,它有大家都很熟悉的名字——半同步/半異步模型(HS/HA)。Taf屬於這類。
(2)多個I/O線程(工作線程)
這種模型下,每個I/O線程都有一個event loop,另外,這里的工作線程可有可無,而且通常是沒有,即I/O線程既處理I/O,又進行業務邏輯計算。大家熟悉的leader/follower(L/F)以及muti-reactor(M-R)模型都屬於這類,這方面的資料很多,見參考文獻,不再討論。Memcached使用的M-R,ICE使用的L/F。
1.3、小結
個人認為:(1)單線程模型實現簡單,如果業務邏輯不復雜,是實現高性能server的首選,比如proxy之類的server。(2)HS/HA很清晰,如果業務邏輯很復雜,比如database,可以考慮這種模型。(3)如果你想充分利用多CPU,當然可以考慮L/F或者M-R。但是值得一提的是,L/F中會有鎖的開銷,而M-R中沒有鎖的開銷,但M-R的線程切換的開銷要高於L/F。根據同事的一些測試,對於短連接L/F的結果好於M-R,而對於長連接,M-R要好於L/F。
第二部分 理解epoll
2、epoll的實現
2.1、核心數據結構
2.1.1、epoll實例
當用系統調用函數epoll_create創建一個epoll環境時,用戶態得到epoll fd,內核態創建一個eventpoll:
//fs/eventpoll.c
struct eventpoll {
/* Protect the access to this structure */
spinlock_t lock;
/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/
struct mutex mtx; ///主要用於epoll_ctl的並發
/* Wait queue used by sys_epoll_wait() */
wait_queue_head_t wq; ///sys_epoll_wait()使用的等待隊列, process list
/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait; ///file->poll()使用的等待隊列
/* List of ready file descriptors */
struct list_head rdllist; ///ready list
/* RB tree root used to store monitored fd structs */
struct rb_root rbr;
/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transferring ready events to userspace w/out
* holding ->lock.
*/
struct epitem *ovflist;
/* The user that created the eventpoll descriptor */
struct user_struct *user;
}
理解這個結構是理解epoll的開始,所以這里有必要解釋一下。
我們知道在Unix/Linux中,一切都是文件,對於epoll實例的fd,eventpoll通常保存在file. private_data字段中。
lock:自旋鎖,用於保護該數據結構。
mtx:互斥量,主要用於多個epoll_ctl之間的並發,epoll以紅黑樹組織關注的fd,epoll_ctl會修改紅黑樹,參見epoll_ctl的實現。
為什么有了一個自旋鎖,還要搞一個互斥量?見最后一小結。
wq:epoll_wait使用的等待隊列,在多線程程序中,我們可以在多個線程中對同一個epoll實例調用epoll_wait。
poll_wait:這個域是比較讓人費解的。這里說說我的理解:對於socket fd,會將fd對應的epitem加入到sock的等待隊列,但是,對於epfd,沒有sock對象,用poll_wait做等待隊列,參見ep_eventpoll_poll。
ovflist: 主要是解決當內核在傳輸數據給用戶空間(ep_send_events_proc)時的鎖(eventpoll->mtx),此時epoll就是將這個時候傳遞上來的事件保存到ovflist中。
2.1.2、epitem
在epoll內部,每個關注的fd都對應一個epitem:
struct epitem {
/* RB tree node used to link this structure to the eventpoll RB tree */
struct rb_node rbn; ///RB tree
/* List header used to link this structure to the eventpoll ready list */
struct list_head rdllink; ///ready list
/*
* Works together "struct eventpoll"->ovflist in keeping the
* single linked chain of items.
*/
struct epitem *next;
/* The file descriptor information this item refers to */
struct epoll_filefd ffd; ///文件描述符信息
/* Number of active wait queue attached to poll operations */
int nwait;
/* List containing poll wait queues */
struct list_head pwqlist;
/* The "container" of this item */
struct eventpoll *ep;
/* List header used to link this item to the "struct file" items list */
struct list_head fllink;
/* The structure that describe the interested events and the source fd */
struct epoll_event event; ///關注的事件
};
這個結構比較簡單,沒什么好說的。
2.2、核心函數
2.2.1、epoll_ctl
當我們調用epoll_create創建一個epoll實例后,就可以調用epoll_ctl向epoll添加關注的fd了。
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
…
ep = file->private_data;
//對互斥量加鎖
mutex_lock(&ep->mtx);
/*
* Try to lookup the file inside our RB tree, Since we grabbed "mtx"
* above, we can be sure to be able to use the item looked up by
* ep_find() till we release the mutex.
*/
epi = ep_find(ep, tfile, fd);
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds, tfile, fd);
} else
error = -EEXIST;
break;
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else
error = -ENOENT;
break;
}
mutex_unlock(&ep->mtx);
…
}
ep_insert:
/* @tfile :target file,即fd對應的file */
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
…
/* Initialize the poll table using the queue callback */
epq.epi = epi;
//設置poll中調用的回調函數ep_ptable_queue_proc
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
/* 調用底層文件系統的poll,對於tcp socket,為sock_poll,后者調用具體協議(protocol-specific)的poll,如tcp_poll( ) */
revents = tfile->f_op->poll(tfile, &epq.pt); ///參見sock_poll
//加入紅黑樹
ep_rbtree_insert(ep, epi);
/* We have to drop the new item inside our item list to keep track of it */
spin_lock_irqsave(&ep->lock, flags);
/* 如果文件已經ready,則加入到ready list */
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq); ///喚醒等待進程,調用epoll_wait的進程
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
…
}
ep_insert主要將fd添加到紅黑樹,然后在具體協議的poll,比如tcp_poll中,調用回調函數ep_ptable_queue_proc,最后檢查fd是否ready,如果ready,則將fd加入到就緒隊列,並喚醒等待進程。另外,值得一提的是在epoll_ctl和epoll_wait中,對epoll的就緒隊列的訪問都是由自旋鎖lock保護。
/* @file : target file
** @whead: fd對應的sock的等待隊列
*/
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); ///注冊喚醒回調函數
pwq->whead = whead;
pwq->base = epi;
add_wait_queue(whead, &pwq->wait); ///將epitem加到sock的等待隊列
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}
ep_ptable_queue_proc主要將epoll_ctl傳進來的fd封裝成的epitem添加到target file對應的sock的等待隊列。
當socket收到數據時,內核協議棧會將其放到sock的接收隊列sk_receive_queue,並調用sk_data_ready回調函數(如果用標准函數sock_init_data來初始化sock實例,通常是sock_def_readable),喚醒等待隊列,內核喚醒原語最終會調用這里注冊的回調函數ep_poll_callback。
//net/core/sock.c
static void sock_def_readable(struct sock *sk, int len)
{
struct socket_wq *wq;
rcu_read_lock();
wq = rcu_dereference(sk->sk_wq);
if (wq_has_sleeper(wq)) ///檢查sock的等待隊列
wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
POLLRDNORM | POLLRDBAND); ///喚醒
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
rcu_read_unlock();
}
下面來看看ep_poll_callback:
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
…
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;
spin_lock_irqsave(&ep->lock, flags);
//如果只有ET/ONESHOT位設置,則不加入ready list
if (!(epi->event.events & ~EP_PRIVATE_BITS))
goto out_unlock;
///check event
if (key && !((unsigned long) key & epi->event.events))
goto out_unlock;
/* If this file is already in the ready list we exit soon */
if (!ep_is_linked(&epi->rdllink)) ///加入ready鏈表
list_add_tail(&epi->rdllink, &ep->rdllist);
/*
* Wake up ( if active ) both the eventpoll wait list and the ->poll()
* wait list.
*/
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq); ///喚醒等待進程
if (waitqueue_active(&ep->poll_wait))
pwake++;
out_unlock:
spin_unlock_irqrestore(&ep->lock, flags);
…
}
該函數將fd加到epoll實例的ready list,然后喚醒epoll實例的等待進程隊列。注意紅色部分,后面討論epoll線程安全性的時候會再提到。
2.2.2、epoll_wait
當我們把關注的fd添加到epoll環境后,就可以調用epoll_wait等待fd上的事件發生了。
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
struct file *file; ///epfd file
struct eventpoll *ep;
file = fget(epfd);
ep = file->private_data; ///epoll環境
/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
…
}
很簡單,主要邏輯由ep_poll完成:
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
…
fetch_events:
spin_lock_irqsave(&ep->lock, flags);
if (!ep_events_available(ep)) { ///沒有ready event, sleep current process
/*
* We don't have any available event to return to the caller.
* We need to sleep here, and we will be wake up by
* ep_poll_callback() when events will become available.
*/
init_waitqueue_entry(&wait, current);
///將進程加入到epoll環境的等待隊列
__add_wait_queue_exclusive(&ep->wq, &wait);
for (;;) {
/*
* We don't want to sleep if the ep_poll_callback() sends us
* a wakeup in between. That's why we set the task state
* to TASK_INTERRUPTIBLE before doing the checks.
*/
set_current_state(TASK_INTERRUPTIBLE);
if (ep_events_available(ep) || timed_out) ///timeout==0, timed_out==1,則break
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) ///schedule
timed_out = 1;
spin_lock_irqsave(&ep->lock, flags);
}
__remove_wait_queue(&ep->wq, &wait);
set_current_state(TASK_RUNNING);
}
check_events:
/* Is it worth to try to dig for events ? */
eavail = ep_events_available(ep);
spin_unlock_irqrestore(&ep->lock, flags);
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/
if (!res && eavail && ///將事件拷貝到用戶空間
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;
…
}
ep_poll的邏輯也非常簡單,就是不斷檢查epoll的ready list,如果有ready fd,則將其拷貝到用戶空間。
注意ep_poll是調用__add_wait_queue_exclusive將當前進程加入到epoll的等待隊列的,所以,即使多個線程對同一個epoll調用epoll_wait,也不會出現thundering herd問題。
最后來看一下ep_send_events函數,因為它與epoll的幾種模式:LT、ET和ONESHOT相關,理解了其實現,也就理解了LT、ET與ONESHOT。
2.2.3、ep_send_events
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;
esed.maxevents = maxevents;
esed.events = events;
///處理ready list
return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
}
主要邏輯是在回調函數ep_send_events_proc中完成的。
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv) { struct ep_send_events_data *esed = priv; int eventcnt; unsigned int revents; struct epitem *epi; struct epoll_event __user *uevent;
/* * We can loop without lock because we are passed a task private list. * Items cannot vanish during the loop because ep_scan_ready_list() is * holding "mtx" during this call. */ for (eventcnt = 0, uevent = esed->events; !list_empty(head) && eventcnt < esed->maxevents;) { epi = list_first_entry(head, struct epitem, rdllink);
list_del_init(&epi->rdllink); ///刪除一個epitem ///檢查事件 revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) & epi->event.events;
/* * If the event mask intersect the caller-requested one, * deliver the event to userspace. Again, ep_scan_ready_list() * is holding "mtx", so no operations coming from userspace * can change the item. */ if (revents) { if (__put_user(revents, &uevent->events) || __put_user(epi->event.data, &uevent->data)) { list_add(&epi->rdllink, head); ///加入到輸出隊列 return eventcnt ? eventcnt : -EFAULT; } eventcnt++; uevent++; if (epi->event.events & EPOLLONESHOT) ///take care,只設置EPOLLET/EPOLLONESHOT,參見ep_poll_callback epi->event.events &= EP_PRIVATE_BITS; else if (!(epi->event.events & EPOLLET)) { //LT模式 /* * If this file has been added with Level * Trigger mode, we need to insert back inside * the ready list, so that the next call to * epoll_wait() will check again the events * availability. At this point, no one can insert * into ep->rdllist besides us. The epoll_ctl() * callers are locked out by * ep_scan_ready_list() holding "mtx" and the * poll callback will queue them in ep->ovflist. */ list_add_tail(&epi->rdllink, &ep->rdllist); //LT模式,重新加入到ready list } } }
return eventcnt; } |
LT:Level Triggered,水平觸發是epoll的默認工作模式,當fd上有事件發生,內核除了把事件上報給用戶,還把fd重新加到就緒隊列中,所以直到收集事件時沒有事件發生,該fd才從epoll的就緒隊列中移除。
例如:socket fd可讀,如果數據並沒有讀完,則接下來每次epoll_wait都會返回該fd可讀,直到有一次收集事件失敗,即socket不可讀。
ET: Edge Triggered,邊緣觸發,相比LT,ET收集完事件后不會把fd重新加入就緒隊列。
如果fd可讀,epoll上報有事件發生,該fd也從就緒隊列中移除了,無論數據有沒有讀完。該fd只有在下次事件發生並在回調函數ep_poll_callback中被加入就緒隊列。
ONESHOT: 顧名思義,如果fd有事件發生,只會觸發一次。從ep_send_events_pro的實現可以看到,對於EPOLLONESHOT,會將其它事件位全部清掉。這樣,以后ep_poll_callback(參見其實現)將不會將該fd加入ready list,即使有事件發生,直到用戶再一次通過EPOLL_CTL_MOD重新設置fd。所以,對於ONESHOT的fd,如果有事件發生,每次EPOLL_CTL_MOD只會上報一次。
第三部分 問題
3、epoll為什么比poll高效
先看看poll的實現:
//fs/select.c
SYSCALL_DEFINE3(poll, struct pollfd __user *, ufds, unsigned int, nfds,
long, timeout_msecs)
{
…
ret = do_sys_poll(ufds, nfds, to);
…
}
主要邏輯在do_sys_poll完成:
int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds,
struct timespec *end_time)
{
struct poll_wqueues table;
int err = -EFAULT, fdcount, len, size;
/* Allocate small arguments on the stack to save memory and be
faster - use long to make sure the buffer is aligned properly
on 64 bit archs to avoid unaligned access */
long stack_pps[POLL_STACK_ALLOC/sizeof(long)];
struct poll_list *const head = (struct poll_list *)stack_pps; ///先使用棧上的空間
struct poll_list *walk = head;
unsigned long todo = nfds;
if (nfds > rlimit(RLIMIT_NOFILE))
return -EINVAL;
len = min_t(unsigned int, nfds, N_STACK_PPS);
for (;;) {
walk->next = NULL;
walk->len = len;
if (!len)
break;
///1.將用戶空間的描述符拷貝到內核
if (copy_from_user(walk->entries, ufds + nfds-todo,
sizeof(struct pollfd) * walk->len))
goto out_fds;
todo -= walk->len;
if (!todo)
break;
///如果棧上空間不夠,則調用kmalloc分配空間存儲描述符信息
len = min(todo, POLLFD_PER_PAGE);
size = sizeof(struct poll_list) + sizeof(struct pollfd) * len;
walk = walk->next = kmalloc(size, GFP_KERNEL);
if (!walk) {
err = -ENOMEM;
goto out_fds;
}
}
///設置poll的回調函數,與epoll類似
poll_initwait(&table);
///2.poll所有描述符,是否有事件發生
fdcount = do_poll(nfds, head, &table, end_time);
poll_freewait(&table);
for (walk = head; walk; walk = walk->next) {
struct pollfd *fds = walk->entries;
int j;
///3.設置相應文件描述發生的事件
for (j = 0; j < walk->len; j++, ufds++)
if (__put_user(fds[j].revents, &ufds->revents))
goto out_fds;
}
…
}
從上面的代碼可以看出,poll至少有三個原因導致它比epoll效率低:
(1)每次調用都要將用戶空間的所有描述符信息拷貝到內核;
(2)與epoll不同,poll內部沒一個ready list,所以,每次都需要檢查所有的描述符;
(3)遍歷所有的描述符,設置發生的事件。
當fd數量較多時,比如支持上萬連接的高並發的server,這些遍歷操作會成為性能的致命殺手。
4、epoll線程安全性
考慮兩種情況:一是兩個線程對同一個epoll調用epoll_ctl;二是A線程對epoll調用epoll_wait,同時線程B調用epoll_ctl。
(1) 從第2節的epoll實現可以看到,epoll_ctl會修改內部紅黑樹,而且同時涉及到內存分配,所以它們之間通過互斥量保證線程性安全性。
(2) 另外,epoll_wait與epoll_wait,或者epoll_wait與epoll_ctl之間,涉及到對epoll內部ready list的訪問,而且時間很短,沒有其它復雜邏輯。所以用自旋鎖保護。
綜上,epoll是線程安全的。
在Memcached中,每個線程都有一個epoll loop,主線程(處理accept)收到一個連接,會丟到工作線程的epoll loop中,它先將連接對象CQ_ITEM加入工作線程的隊列,然后通過管道來通知工作線程。但是,由於兩個線程需要訪問連接對象隊列,所以,使用了鎖來保護。實際上,如果在主線程中直接對工作線程調用epoll_ctl,是沒有問題的,這樣,我們就可以做一些工作(比如利用epoll_event數據結構),從而做到多個線程間無鎖編程(這里的無鎖是指用戶態無鎖,但內核仍然會有鎖)。注意,這里只針對epoll,不適用於其它模型。
主要參考
1. Kernel 3.0 sourcode
2. Libevent 1.4.13 sourcecode
3. Memcached 1.4.5 sourcecode
4. Haproxy 1.4.8 sourcecode
5. Nginx 0.8.28 sourcecode
6. 《Half Sync/Half Async》
7. 《Leader/Followers:A Design Pattern for Efficient Multi-threaded I/O Demultiplexing and Dispatching》
8. 《Understanding Linux Network Internals》
9. http://lkml.indiana.edu/hypermail/linux/kernel/0106.2/0405.html
作者:MrDB
出處:http://www.cnblogs.com/hustcat/
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。