epoll源碼分析


epoll源碼分析

最近在使用libev過程中遇到一個場景:一個fd從一個ev_loop遷移到另一個ev_loop,會出現這個fd同時存在兩個epoll的瞬間。
不禁要問了,一個fd同時被兩個epoll監視的行為是怎樣的,epoll嵌套使用是怎樣實現的?為此,整理了以前讀的epoll源碼。

概述

epoll的擴展性和性能關鍵在於兩個數據結構: 0) 一個rbtree; 1) 一個ready list.
epoll是有狀態的, 內核中維護了一個數據結構用來管理所要監視的fd,這個數據結構是eventpoll.
在eventpoll中有一顆紅黑樹, 用來快速的查找和修改要監視的fd,每個節點被封裝成epitem結構.
在eventpoll中有一個列表, 用來收集已經發生事件的epitem, 這個list叫ready list.

epoll系統的初始化

eventpoll_init()
{
    eventpoll_mnt = kern_mount(&eventpoll_fs_type);

    epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
        0, SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|SLAB_PANIC,
    NULL, NULL);

    pwq_cache = kmem_cache_create("eventpoll_pwq",
    sizeof(struct eppoll_entry), 0,
    EPI_SLAB_DEBUG|SLAB_PANIC, NULL, NULL);

    error = register_filesystem(&eventpoll_fs_type);
    eventpoll_mnt = kern_mount(&eventpoll_fs_type);
}

init初始化代碼很簡單:
1. 申請epitem的緩沖;
2. 申請eppoll_entry的緩沖;
3. 把epoll和文件系統關聯起來.

下圖是 epoll和VFS的關聯:
epoll_

epoll創建 - epoll_create

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
    int error, fd;
    struct eventpoll *ep = NULL;
    struct file *file;

    error = ep_alloc(&ep);
    fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
    file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, (flags & O_CLOEXEC));
    fd_install(fd, file);
    ep->file = file;
    return fd;
}
  1. error = ep_alloc(&ep); 分配一個epollevent結構體;
  2. 把ep和文件系統的inode, file關聯起來。

epoll添加事件 - epoll_ctl

asmlinkage long
sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event)
{
    struct file *file, *tfile;
    struct eventpoll *ep;
    struct epitem *epi;
    struct epoll_event epds;

    tfile = fget(fd);
    // 判斷epfd是否是一個epoll
    if (file == tfile || !is_file_epoll(file))
        goto eexit_3;

    // 從private_data中取出eventpoll指針
    // 並且上鎖, 因此一個epoll_ctl是線程安全的
    ep = file->private_data;
    down_write(&ep->sem);

    // 嘗試着從紅黑樹ep->rbr上找到tfile對應的一個epitem
    epi = ep_find(ep, tfile, fd);

    error = -EINVAL;
    switch (op) {
        case EPOLL_CTL_ADD:
        if (!epi) { // 如果是ADD操作,並且這個fd不在eventpoll里,則執行插入操作,注意:內核會主動加上POLLERR和POLLHUP事件
            epds.events |= POLLERR | POLLHUP;
        error = ep_insert(ep, &epds, tfile, fd);
        } else // 否則設置error
        error = -EEXIST;
        clear_tfile_check_list();
        break;
        case EPOLL_CTL_DEL:
        if (epi)
            error = ep_remove(ep, epi);
        else
            error = -ENOENT;
        break;
    case EPOLL_CTL_MOD:
       if (epi) {
           epds.events |= POLLERR | POLLHUP;
           error = ep_modify(ep, epi, &epds);
       } else
           error = -ENOENT;
           break;
       }
    }
}

ep_insert插入事件

下圖是epoll的數據結構。root指向紅黑樹的樹根;rdlist指向待收割事件的列表ready list:
epoll__tree_readylist

static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
    int error, revents, pwake = 0;
    unsigned long flags;
    struct epitem *epi;
    struct ep_pqueue epq;

    // 從slab中分配一個epitem
    if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
        return -ENOMEM;

    // 初始化epi
    INIT_LIST_HEAD(&epi->rdllink);
    INIT_LIST_HEAD(&epi->fllink);
    INIT_LIST_HEAD(&epi->pwqlist);
    epi->ep = ep;
    ep_set_ffd(&epi->ffd, tfile, fd);
    epi->event = *event;
    epi->nwait = 0;
    epi->next = EP_UNACTIVE_PTR;

    // 調用tcp_poll
    // 在tcp_sock->sk_sleep中插入一個等待者
    epq.epi = epi;
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

    // 0) 向fd添加一個回調讓其有事件發生時通知epoll;
    // 1) 同時, 可能此時已經有事件存在了, revents返回這個事件
    revents = tfile->f_op->poll(tfile, &epq.pt);


    // 把這個epi添加到紅黑樹中
    ep_rbtree_insert(ep, epi);

    error = -EINVAL;
    if (reverse_path_check())
        goto error_remove_epi;

    spin_lock_irqsave(&ep->lock, flags);


    // 如果此時有事件到來,並且沒有把epi添加到就緒隊列,則添加到epoll的就緒隊列
    if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
        list_add_tail(&epi->rdllink, &ep->rdllist);

    // 並且喚醒一個正在等在這個epoll管理的fd的進程
    if (waitqueue_active(&ep->wq))
        wake_up_locked(&ep->wq);
    // 並且喚醒一個正在等在這個epoll本身的進程
    if (waitqueue_active(&ep->poll_wait))
        pwake++;
    }

    spin_unlock_irqrestore(&ep->lock, flags);

    atomic_inc(&ep->user->epoll_watches);

    // 在ep->lock鎖的外面喚醒嵌套epoll
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);
    return 0;
}

在插入一個fd到epoll中會顯示調用一次poll, 對於tcp來說是tcp_poll.
來看看poll是如何初始化和被調用的.

tcp_poll

下圖是網卡硬件中斷觸發epoll_wait返回的調用路徑:
_

// tcp 協議初始化
static struct inet_protosw inetsw_array[] =
{
  {
    .type =       SOCK_STREAM,
    .protocol =   IPPROTO_TCP,
    .prot =       &tcp_prot,
    .ops =        &inet_stream_ops,
    .capability = -1,
    .no_check =   0,
    .flags =      INET_PROTOSW_PERMANENT |
    INET_PROTOSW_ICSK,
  },
...
...
...
}
// tcp_poll才是最終的調用函數
const struct proto_ops inet_stream_ops = {
    .family                       = PF_INET,
    .owner                = THIS_MODULE,
    .bind                 = inet_bind,
    .accept               = inet_accept,
    .poll                 = tcp_poll,
    .listen                           = inet_listen
}

tcp_poll的邏輯

static unsigned int sock_poll(struct file *file, poll_table * wait)
{
    struct socket *sock;
    sock = file->private_data;
    return sock->ops->poll(file, sock, wait);
}

// 0) 注冊事件到tcp中;
// 1) 返回此時已經發生的事件.
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
    unsigned int mask;
    struct sock *sk = sock->sk;
    struct tcp_sock *tp = tcp_sk(sk);

    // 注冊一個回調到sk->sk_sleep中
    // 注意, wait為空時忽略注冊動作
    poll_wait(file, sk->sk_sleep, wait);

    // 如果是監聽套接字,則inet_csk_listen_poll
    if (sk->sk_state == TCP_LISTEN)
        return inet_csk_listen_poll(sk);

    mask = 0;
    if (sk->sk_err)
        mask = POLLERR;

    // copied_seq 和 rcv_nxt 不相等,則說明有未讀數據出現了
    if ((1 << sk->sk_state) & ~(TCPF_SYN_SENT | TCPF_SYN_RECV)) {
        if ((tp->rcv_nxt != tp->copied_seq) && (tp->urg_seq != tp->copied_seq || tp->rcv_nxt != tp->copied_seq + 1 || sock_flag(sk, SOCK_URGINLINE) || !tp->urg_data))
        mask |= POLLIN | POLLRDNORM;

        if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
        mask |= POLLOUT | POLLWRNORM;
        }
    }
}

看看如何注冊回調到tcp socket中

// 反向調用poll_table->qproc,注冊一個poll_callback
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
    if (p && wait_address)
        p->qproc(filp, wait_address, p);
}


// 注冊poll_callback到sock->sk_sleep上
// 0) file是sock對應的file句柄;
// 1) whead是sock->sk_sleep
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt)
{
    struct epitem *epi = ep_item_from_epqueue(pt);
    struct eppoll_entry *pwq;

    if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, SLAB_KERNEL))) {
        init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
    pwq->whead = whead;
    pwq->base = epi;
    add_wait_queue(whead, &pwq->wait);
    list_add_tail(&pwq->llink, &epi->pwqlist);
    epi->nwait++;
    } else {
        /* We have to signal that an error occurred */
    epi->nwait = -1;
    }
}

// 只要socket上有事件發生就會回調上面注冊的回調

poll_callback的回調

數據包到達:
PKT Arrive INT
--> Driver
--> 0) alloc_skb; 1) netif_rx
--> RX_SOFTIRQ
--> net_rx_action軟中斷處理函數 (dev->poll)
--> process_backlog
--> netif_receive_skb
--> tcp_v4_rcv()
--> tcp_v4_do_rcv
--> tcp_rcv_state_process
--> sock_def_wakeup
--> ep_poll_callback

回調

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{

    int pwake = 0;
    unsigned long flags;

    // 通過wait找到epoll_entry
    // 通過epoll_entry->base找到epitem
    struct epitem *epi = ep_item_from_wait(wait);
    struct eventpoll *ep = epi->ep;

    spin_lock_irqsave(&ep->lock, flags);

    if (!(epi->event.events & ~EP_PRIVATE_BITS))
        goto out_unlock;

    if (key && !((unsigned long) key & epi->event.events))
        goto out_unlock;

    // 把當前epitem添加到ready list中,等待收割
    if (!ep_is_linked(&epi->rdllink))
        list_add_tail(&epi->rdllink, &ep->rdllist);

    // 在收到數據包的回調中喚醒等待在epll上的進程    
    if (waitqueue_active(&ep->wq))
        wake_up_locked(&ep->wq);
    // 喚醒嵌套epoll的進程
    if (waitqueue_active(&ep->poll_wait))
        pwake++;

    pin_unlock_irqrestore(&ep->lock, flags);

    if (pwake)
        ep_poll_safewake(&ep->poll_wait);

return 1;
}

下面看看用戶態如何收割事件.

epoll事件收割 - epoll_wait

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events, int, maxevents, int, timeout)
{
    error = ep_poll(ep, events, maxevents, timeout);                 
}

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout)
{
    int res = 0, eavail, timed_out = 0;

    if (timeout > 0) {
        struct timespec end_time = ep_set_mstimeout(timeout);

    slack = select_estimate_accuracy(&end_time);
    to = &expires;
    *to = timespec_to_ktime(end_time);
    } else if (timeout == 0) {
        timed_out = 1;
    spin_lock_irqsave(&ep->lock, flags);
    goto check_events;
    }

    fetch_events:
    spin_lock_irqsave(&ep->lock, flags);

     // 如果ready list為空
     if (!ep_events_available(ep)) {
         init_waitqueue_entry(&wait, current);
     wait.flags |= WQ_FLAG_EXCLUSIVE;
     // 把當前進程添加到等待隊列中
     __add_wait_queue(&ep->wq, &wait);

     for (;;) {
         // 設置進程的狀態為TASK_INTERRUPTIBLE,以便在ep_poll_callback將其喚醒
         set_current_state(TASK_INTERRUPTIBLE);
         // ready list非空
         if (ep_events_available(ep) || timed_out)
             break;

            // 有信號返回EINTR 
        if (signal_pending(current)) {
            res = -EINTR;
        break;
            }

        // 解鎖准備調度出去
        spin_unlock_irqrestore(&ep->lock, flags);
        if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
            timed_out = 1;
            // 再次運行后,第一件事就是獲取鎖    
        spin_lock_irqsave(&ep->lock, flags);
        }
    __remove_wait_queue(&ep->wq, &wait);

    set_current_state(TASK_RUNNING);
    }

    eavail = ep_events_available(ep);
    // 開始收割事件
    ep_send_events(ep, events, maxevents);
}


static int ep_send_events(struct eventpoll *ep, struct epoll_event __user *events, int maxevents)
{
    struct ep_send_events_data esed;

    esed.maxevents = maxevents;
    esed.events = events;

    return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
}

static int ep_scan_ready_list(struct eventpoll *ep, int (*sproc)(struct eventpoll *, struct list_head *, void *), void *priv)
{
    int error, pwake = 0;
    unsigned long flags;
    struct epitem *epi, *nepi;
    LIST_HEAD(txlist);

    // 上鎖,和epoll_ctl, epoll_wait互斥
    mutex_lock(&ep->mtx);

    // 原子的置換readlist 到 txlist中
    // 並且開啟ovflist, 使得在sproc執行過程中產生的事件存入其中, 是一個事件的臨時停靠點
    spin_lock_irqsave(&ep->lock, flags);
    list_splice_init(&ep->rdllist, &txlist);
    ep->ovflist = NULL;
    spin_unlock_irqrestore(&ep->lock, flags);

    // 開始調用sproc組織事件到用戶空間的數組中
    error = (*sproc)(ep, &txlist, priv);

    spin_lock_irqsave(&ep->lock, flags);
    for (nepi = ep->ovflist; (epi = nepi) != NULL;
        nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
        // 把sproc執行期間產生的事件加入到ready list中, 但是有可能這些新誕生的事件到目前為止還在txlist中
    // 也就是, 有可能sproc並沒有消耗完本次的ready list,那么剩下的事件要等到下次epoll_wait來收割
    // 所以,
    //     0) 需要去重, 這是通過ep_is_linked(&epi->rdllink)來做到的, 因為如果這個epi在txlist中, 它的rdllikn非空;
    //     1) 需要把還沒有被收割到用戶空間的事件再次的放入ready list中, 並且要保證這些事件在新誕生的事件的前面, 這是通過list_splice做到的.
    if (!ep_is_linked(&epi->rdllink))
        list_add_tail(&epi->rdllink, &ep->rdllist);
    }

    // 關閉ovflist 
    ep->ovflist = EP_UNACTIVE_PTR;

    list_splice(&txlist, &ep->rdllist);

    // 喚醒
    if (!list_empty(&ep->rdllist)) {
        if (waitqueue_active(&ep->wq))
        wake_up_locked(&ep->wq);
    if (waitqueue_active(&ep->poll_wait))
        pwake++;
    }
    spin_unlock_irqrestore(&ep->lock, flags);

    mutex_unlock(&ep->mtx);

    if (pwake)
        ep_poll_safewake(&ep->poll_wait);

    return error;
}

事件是怎么被收割到用戶空間的

static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv)
{
    struct ep_send_events_data *esed = priv;
    int eventcnt;
    unsigned int revents;
    struct epitem *epi;
    struct epoll_event __user *uevent;

    // 這個函數不需要再上鎖了
    // 收割事件的個數上限是esed->maxevents
    for (eventcnt = 0, uevent = esed->events; !list_empty(head) && eventcnt < esed->maxevents;) {
        epi = list_first_entry(head, struct epitem, rdllink);

    // 已經被收割的事件要從txlist中移除掉, 很重要.
    // 因為,並不是txlist上的所有的事件都會被收割到用戶空間
    // 剩下的未收割的事件要再次的放回到ready list
    list_del_init(&epi->rdllink);

    // 顯示的tcp_poll一次事件, 看看這個fd上發生了什么事情, 並和自己關心的事件做交集
    revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) & epi->event.events;

    if (revents) {
        // 回傳到用戶空間            
        if (__put_user(revents, &uevent->events) ||__put_user(epi->event.data, &uevent->data)) {
            list_add(&epi->rdllink, head);
        return eventcnt ? eventcnt : -EFAULT;
        }
        eventcnt++;
        uevent++;
        if (epi->event.events & EPOLLONESHOT)
            epi->event.events &= EP_PRIVATE_BITS;
        else if (!(epi->event.events & EPOLLET)) {
            // 如果是LT模式要再次放入到ready list中
        // 難道這個事件就一直在ready list中了? 用戶態的epoll_wait豈不是每次都會收割到事件?什么時候會被剔除掉?
        // 非也(以讀事件為例):
        //     0) 如果用戶態在epoll_wait中獲取到了一個epi事件, 並沒有處理, 那么這個事件是一直存在在fd上的(舉個例子: 可讀狀態會一直處於可讀, rcv_nxt>copied_seq)
        //     1) 用戶態代碼不讀取數據或僅僅讀取了部分數據, 為了保證LT語義, 下次epoll_wait時候能夠再次獲取到改epi, 這個epi必須要保存到ready list中;
        //     2) 用戶態代碼一直讀取這個fd上的數據直到EGAIN, 下次epoll_wait的時候任然會從ready list中碰到這個事件, 但此時tcp_poll不會返回可讀事件了, 所以此后會從ready list中剔除掉.
        //     3) 也就是, epoll事件的剔除是發生在下一次epoll_wait中
        list_add_tail(&epi->rdllink, &ep->rdllist);
        }
        }
    }
    return eventcnt;
}

自問自答

2_epoll
問: 一個fd加入到多個epoll行為如何?
答: 統一個fd通過epoll_ctl添加到兩個epoll中,在epoll_ctl流程中會通過tcp_poll調用在fd->sock->sk_sleep中插入一個回調。也就是說,兩次epoll_ctl就會往同一個fd的sk_sleep中插入兩個回調。在有事件到來時會遍歷sk_sleep上所有的回調。所以,會觸發兩次epoll_wait返回。
問:epoll的LT和ET如何實現的?和具體的poll()有關嗎?
答:具體的poll()函數是無感知LT和ET的。tcp_poll在state change時候會回調sk_sleep上的回調。epoll在收割事件的時候會判斷是ET還是LT,如果是ET則把epi從ready list移除掉,並且加入到用戶態的events數組中,所以下次epoll_wait就不會收割到這個事件了,除非state change又發生了變化觸發了回調;如果是LT除了把epi加入到用戶態的events數組中,還會再次加入到ready list之后,下次epoll_wait會再次返回,但是並不會始終返回。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM