epoll(2) 源碼分析


epoll(2) 源碼分析

文本內核代碼取自 5.0.18 版本,和上一篇文章中的版本不同是因為另一個電腦出了問題,但是總體差異不大。

引子留下的問題

上一篇文章中留下了幾個問題,本文將針對這幾個問題進行分析:

  1. epoll(2) 得到就緒事件的復雜度為何是 \(O(1)\)
  2. epoll(2) 和普通的文件相比的區別在哪里,比如和 eventfd(2) 比較
  3. epoll(2) 相對 poll(2)/select(2) 多提供了 EPOLLET 的觸發模式,現象在上面可以看到區別,實現是如何做到的。
  4. epoll(2) 相互關注時,有就緒事件到來會產生相互喚醒的問題,為何會出現這樣的問題
  5. 對於問題 4,內核是如何解決這種相互喚醒的問題。

解答在文末.

關鍵的數據結構

在第一次閱讀代碼時,優先掌握該功能的核心數據結構有利於對於全局的把控。

struct eventpoll

struct eventpoll 對應一個 epoll 實例的結構,包含所有的文件事件,作為 epoll 的接口使用。

/*
 * This structure is stored inside the "private_data" member of the file
 * structure and represents the main data structure for the eventpoll
 * interface.
 */
struct eventpoll {
        spinlock_t lock;              // 保護整個數據結構
        struct mutex mtx;             // 保護正在操作的文件

        wait_queue_head_t wq;         // sys_epoll_wait() 使用的等待隊列
        wait_queue_head_t poll_wait;  // epoll 作為被監視文件時 file->poll() 使用的等待隊列,使用較少
                                      // poll_wait 隊列作用和 eventfd 文件中的 wqh 隊列相同

        struct list_head rdllist;     // 就緒的文件鏈表,連接 epitem 上的 rdllink
        struct epitem *ovflist;       // 也是用來串聯就緒的事件,作為 rdlist 的備胎使用

        struct rb_root_cached rbr;    // 所有關注的文件事件的紅黑樹,在內核空間維護

        /* wakeup_source used when ep_scan_ready_list is running */
        struct wakeup_source *ws;     // 不分析該功能,只知道為喚醒源就行
        struct user_struct *user;     // epoll 創建操作所屬的用戶
        struct file *file;            // epoll 關聯的文件結構

        /* used to optimize loop detection check */
        int visited;
        struct list_head visited_list_link;

#ifdef CONFIG_NET_RX_BUSY_POLL
        /* used to track busy poll napi_id */
        unsigned int napi_id;
#endif
};

struct epitem

struct epitem 每個文件描述符添加到 eventpoll 接口將產生一個 epitem項 被鏈接到 eventpoll 中的紅黑樹上。

/*
 * Each file descriptor added to the eventpoll interface will
 * have an entry of this type linked to the "rbr" RB tree.
 * Avoid increasing the size of this struct, there can be many thousands
 * of these on a server and we do not want this to take another cache line.
 */
struct epitem {
        union {
                /* RB tree node links this structure to the eventpoll RB tree */
                struct rb_node rbn;
                /* Used to free the struct epitem */
                struct rcu_head rcu;
        };

        struct list_head rdllink; // 用於連接到 eventpoll->rdllist 的鏈表,和 rdllist 一起使用
        struct epitem *next;      // 連接到 eventpoll->ovflist 的指針,和 ovflist 一起使用
        struct epoll_filefd ffd;  // 文件 file 結構 + fd,作為紅黑樹的節點

        int nwait;                // 附加在 poll 操作上活躍的等待隊列的數量
        /* List containing poll wait queues */
        struct list_head pwqlist; // 注釋是包含輪詢等待隊列的鏈表,但是實際上個人更傾向為這個鏈表只是為了連接 eppoll_entry 結構。
                                  // 和上面那個 nwait 一樣,這兩個變量的添加操作只會發生一次,就是調用 ep_insert() 的時候,但是 epitem 在一個 epoll 實例中只會調用一次。

        struct eventpoll *ep;     // 當前 epitem 的所有者
        struct list_head fllink;  // 連接文件結構的鏈表

        /* wakeup_source used when EPOLLWAKEUP is set */
        struct wakeup_source __rcu *ws;  // 喚醒源,不考慮

        /* The structure that describe the interested events and the source fd */
        struct epoll_event event;  // 用戶傳入的 event 結構
};

struct eppoll_entry

struct eppoll_entry 為文件的等待隊列項回調和epoll相關聯的結構. 類似為poll(2) 中的 poll_table_entry

/* Wait structure used by the poll hooks */
struct eppoll_entry {
        struct list_head llink;   // 連接至 epitem 中的 pwqlist 鏈表中
        struct epitem *base;      // epitem 所屬者

        wait_queue_entry_t wait;  // 等待隊列項
        wait_queue_head_t *whead; // 等待隊列頭,關注文件的等待隊列,如 eventfd->pwh
};

epoll(2) 相關的系統調用

整個 fs/eventpoll.c 的代碼量較多(2000+), 所以這里節選部分主要的代碼進行分析, 一些對於參數的合法性的校驗就不放出來了.

epoll 的實現做了兩種區分: 關注的文件是否為 epoll 類型, 我們先對非epoll文件進行分析, 這個部分代碼比較直觀易懂, 對epoll文件的處理考慮了多種情況, 留作之后分析.

epoll_create(2)

創建一個新的文件描述符, 對應一個 epoll 實例.

  1. 為 eventpoll 結構分配內存並且初始化
  2. 獲取一個新的文件並且與 eventpoll 結構相關聯.
/*
 * Open an eventpoll file descriptor.
 */
static int do_epoll_create(int flags)
{
        int error, fd;
        struct eventpoll *ep = NULL;
        struct file *file;

        error = ep_alloc(&ep);  // 分配內存並初始化, 代碼較直觀, 不做分析

        fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
        if (fd < 0) {
                error = fd;
                goto out_free_ep;
        }
        file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
                                 O_RDWR | (flags & O_CLOEXEC));

        ep->file = file;
        fd_install(fd, file);
}

epoll_ctl(2)

epoll_ctl 為epoll的控制函數, 根據函數的 @op 入參分發需要進行的操作.

函數的功能主體比較清晰, 也分為兩部分:

  1. 對監視文件為epoll經行循環檢測
  2. 根據操作類型分發具體執行的函數
/*
 * The following function implements the controller interface for
 * the eventpoll file that enables the insertion/removal/change of
 * file descriptors inside the interest set.
 */
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
                struct epoll_event __user *, event)
{
        // 加鎖部分為對監視的文件是epoll時候進行的循環檢測, 這部分后面分析, 這里只看非 epoll 文件的處理
        mutex_lock_nested(&ep->mtx, 0);
        if (op == EPOLL_CTL_ADD) {
                if (!list_empty(&f.file->f_ep_links) ||
                                                is_file_epoll(tf.file)) {
                        full_check = 1;
                        mutex_unlock(&ep->mtx);
                        mutex_lock(&epmutex);
                        if (is_file_epoll(tf.file)) {
                                error = -ELOOP;
                                if (ep_loop_check(ep, tf.file) != 0) {
                                        clear_tfile_check_list();
                                        goto error_tgt_fput;
                                }
                        } else
                                list_add(&tf.file->f_tfile_llink,
                                                        &tfile_check_list);
                        mutex_lock_nested(&ep->mtx, 0);
                        if (is_file_epoll(tf.file)) {
                                tep = tf.file->private_data;
                                mutex_lock_nested(&tep->mtx, 1);
                        }
                }
        }

        /*
         * Try to lookup the file inside our RB tree, Since we grabbed "mtx"
         * above, we can be sure to be able to use the item looked up by
         * ep_find() till we release the mutex.
         */
        epi = ep_find(ep, tf.file, fd);  // 從紅黑樹中尋找操作的文件

        error = -EINVAL;
        switch (op) {
        case EPOLL_CTL_ADD:
                if (!epi) {  // 不存在就插入到eventpoll中
                        epds.events |= EPOLLERR | EPOLLHUP;
                        error = ep_insert(ep, &epds, tf.file, fd, full_check);
                } else
                        error = -EEXIST;
                if (full_check)
                        clear_tfile_check_list();
                break;
        case EPOLL_CTL_DEL:
                if (epi)
                        error = ep_remove(ep, epi);
                else
                        error = -ENOENT;
                break;
        case EPOLL_CTL_MOD:
                if (epi) {
                        if (!(epi->event.events & EPOLLEXCLUSIVE)) {
                                epds.events |= EPOLLERR | EPOLLHUP;
                                error = ep_modify(ep, epi, &epds);
                        }
                } else
                        error = -ENOENT;
                break;
        }
        if (tep != NULL)
                mutex_unlock(&tep->mtx);
        mutex_unlock(&ep->mtx);

        return error;
}

ep_insert()

分配一個 epitem 的內存並初始化, 再將該 epitem 添加到 eventpoll 中的紅黑樹上.

初始化過程也包含了幾個部分:

  1. 對 epitem 結構進行初始化, 設置各成員變量的值.
  2. 調用目標文件的file->f_op->poll() 函數設置等待隊列項回調函數, 這個是實現 epoll_wait(2) 復雜度為 \(O(1)\) 最重要的一步, 關注的文件產生就緒事件就會調用該回調函數 ep_ptable_queue_proc
  3. 返回就緒事件掩碼, 將當前 epitem 添加到 eventpoll->rdllist 中, 喚醒 epoll_wait(2) 線程
/*
 * Must be called with "mtx" held.
 */
static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
                     struct file *tfile, int fd, int full_check)
{
        int error, pwake = 0;
        __poll_t revents;
        struct epitem *epi;

        if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
                return -ENOMEM;

        /* Item initialization follow here ... */


        /* Initialize the poll table using the queue callback */
        epq.epi = epi;
        init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

        /*
         * Attach the item to the poll hooks and get current event bits.
         * We can safely use the file* here because its usage count has
         * been increased by the caller of this function. Note that after
         * this operation completes, the poll callback can start hitting
         * the new item.
         */
        revents = ep_item_poll(epi, &epq.pt, 1);

        /* Add the current item to the list of active epoll hook for this file */
        spin_lock(&tfile->f_lock);
        list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);  // 將當前 epitem 添加到監視文件的 f_ep_links 鏈表上.
        spin_unlock(&tfile->f_lock);

        /*
         * Add the current item to the RB tree. All RB tree operations are
         * protected by "mtx", and ep_insert() is called with "mtx" held.
         */
        ep_rbtree_insert(ep, epi); // 將當前 epitem 添加到eventpoll的紅黑樹中

        /* If the file is already "ready" we drop it inside the ready list */
        if (revents && !ep_is_linked(epi)) { // 產生就緒事件並且當前 epitem 未添加進 eventpoll 中(這個有點兒明顯)
                list_add_tail(&epi->rdllink, &ep->rdllist); // 添加至 ep->rdllist, 留意這兩個鏈表是一起出現的

                /* Notify waiting tasks that events are available */
                if (waitqueue_active(&ep->wq))  // wq 隊列是 epoll_wait(2) 使用的, 這里喚醒調用 epoll_wait(2) 進入阻塞狀態的線程.
                        wake_up_locked(&ep->wq);
                if (waitqueue_active(&ep->poll_wait))  // 這里不直接喚醒是加鎖的原因, poll_wait 隊列屬於被監視文件使用, 不應該在epoll實例中喚醒
                        pwake++;
        }

        spin_unlock_irq(&ep->wq.lock);

        /* We have to call this outside the lock */
        if (pwake)
                ep_poll_safewake(&ep->poll_wait);

}

ep_remove()

作用和 ep_insert() 相反, 釋放內存, 刪除與其它資源相關聯的連接, 在互斥量 eventpoll->mtx 加鎖下進行.

/*
 * Removes a "struct epitem" from the eventpoll RB tree and deallocates
 * all the associated resources. Must be called with "mtx" held.
 */
static int ep_remove(struct eventpoll *ep, struct epitem *epi)
{
        struct file *file = epi->ffd.file;

        lockdep_assert_irqs_enabled();

        /*
         * Removes poll wait queue hooks.
         */
        ep_unregister_pollwait(ep, epi);  // 刪除 epitem->pwqlist 關聯的等待項鏈表

        /* Remove the current item from the list of epoll hooks */
        spin_lock(&file->f_lock);
        list_del_rcu(&epi->fllink);  // 從監視文件中的 file->f_ep_links 鏈表中刪除當前 epitem
        spin_unlock(&file->f_lock);

        rb_erase_cached(&epi->rbn, &ep->rbr);  // 從 eventpoll 中的紅黑樹中刪除當前 epitem 節點

        spin_lock_irq(&ep->wq.lock);
        if (ep_is_linked(epi))
                list_del_init(&epi->rdllink);  // 從 eventpoll 中的就緒隊列 rdllist 中刪除當前 epitem 節點
        spin_unlock_irq(&ep->wq.lock);

        /*
         * At this point it is safe to free the eventpoll item. Use the union
         * field epi->rcu, since we are trying to minimize the size of
         * 'struct epitem'. The 'rbn' field is no longer in use. Protected by
         * ep->mtx. The rcu read side, reverse_path_check_proc(), does not make
         * use of the rbn field.
         */
        call_rcu(&epi->rcu, epi_rcu_free);  // 釋放當前 epitem 的內存

        atomic_long_dec(&ep->user->epoll_watches);  // eventpoll 所屬用戶監視的 epitem數量減一

        return 0;
}

ep_modify()

調整關注文件的事件.

/*
 * Modify the interest event mask by dropping an event if the new mask
 * has a match in the current file status. Must be called with "mtx" held.
 */
static int ep_modify(struct eventpoll *ep, struct epitem *epi,
                     const struct epoll_event *event)
{
        int pwake = 0;
        poll_table pt;

        lockdep_assert_irqs_enabled();

        // 設置 file->f_op->poll 的回調函數為NULL, 因為在insert中已經設置了文件等待隊列項的回調函數
        init_poll_funcptr(&pt, NULL);

        /*
         * Get current event bits. We can safely use the file* here because
         * its usage count has been increased by the caller of this function.
         * If the item is "hot" and it is not registered inside the ready
         * list, push it inside.
         */
        if (ep_item_poll(epi, &pt, 1)) {  // 調用f_op->poll() 獲取文件的就緒事件
                spin_lock_irq(&ep->wq.lock);
                if (!ep_is_linked(epi)) {  // 未添加至 eventpoll 接口的就緒隊列中
                        list_add_tail(&epi->rdllink, &ep->rdllist); // 添加
                        ep_pm_stay_awake(epi);  // 電源管理的函數, 不看

                        /* Notify waiting tasks that events are available */
                        if (waitqueue_active(&ep->wq))
                                wake_up_locked(&ep->wq); // 喚醒調用 epoll_wait(2) 的線程
                        if (waitqueue_active(&ep->poll_wait))  // 分析同 ep_insert()
                                pwake++;
                }
                spin_unlock_irq(&ep->wq.lock);
        }

        /* We have to call this outside the lock */
        if (pwake)
                ep_poll_safewake(&ep->poll_wait);

        return 0;
}

epoll_wait(2)

等待就緒的事件。

ep_events_available 為檢查是否存在就緒事件,其實就是檢查 rdllist 和 ovflist 是否有被修改過,復雜度為 \(O(1)\).

static inline int ep_events_available(struct eventpoll *ep)
{
        return !list_empty_careful(&ep->rdllist) ||
                READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR;
}

epoll_wait(2) 的主要邏輯由 ep_poll() 實現,核心邏輯分為兩部分

  1. 檢查就緒事件是否存在,存在執行 2,不存在根據超時時間進入阻塞狀態和直接返回。
  2. 將就緒事件復制到用戶空間,若是復制失敗,在條件(見代碼分析)滿足的情況下執行 1,成功則返回。
/*
 * Implement the event wait interface for the eventpoll file. It is the kernel
 * part of the user space epoll_wait(2).
 */
static int do_epoll_wait(int epfd, struct epoll_event __user *events,
                         int maxevents, int timeout)
{
        /*
         * At this point it is safe to assume that the "private_data" contains
         * our own data structure.
         */
        ep = f.file->private_data;

        /* Time to fish for events ... */
        error = ep_poll(ep, events, maxevents, timeout);
}

/*
 * ep_poll - Retrieves ready events, and delivers them to the caller supplied
 *           event buffer.
 */
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                   int maxevents, long timeout)
{
        int res = 0, eavail, timed_out = 0;
        u64 slack = 0;
        bool waiter = false;
        wait_queue_entry_t wait;
        ktime_t expires, *to = NULL;

        lockdep_assert_irqs_enabled();

        // 時間處理,略過

fetch_events:  // 這為一整個獲取就緒事件邏輯的開端

        if (!ep_events_available(ep))  // 無就緒事件
                ep_busy_loop(ep, timed_out);  // 中斷緩解技術對 中斷頻繁的設置

        eavail = ep_events_available(ep);  // 有就緒事件
        if (eavail)
                goto send_events;  // 直接goto發送數據


        /*
         * We don't have any available event to return to the caller.  We need
         * to sleep here, and we will be woken by ep_poll_callback() when events
         * become available.
         */
        if (!waiter) {  // 無數據,需要等待
                waiter = true;  // 設置等待標識
                init_waitqueue_entry(&wait, current); // 初始化等待隊列項

                spin_lock_irq(&ep->wq.lock);
                __add_wait_queue_exclusive(&ep->wq, &wait); // 投入到 ep->wq 的等待隊列中
                spin_unlock_irq(&ep->wq.lock);
        }

        for (;;) {  // 進入無限循環
                /*
                 * We don't want to sleep if the ep_poll_callback() sends us
                 * a wakeup in between. That's why we set the task state
                 * to TASK_INTERRUPTIBLE before doing the checks.
                 */
                set_current_state(TASK_INTERRUPTIBLE);  // 設置可中斷運行狀態
                /*
                 * Always short-circuit for fatal signals to allow
                 * threads to make a timely exit without the chance of
                 * finding more events available and fetching
                 * repeatedly.
                 */
                if (fatal_signal_pending(current)) {  // 先判斷致命錯誤信號
                        res = -EINTR;
                        break;
                }

                eavail = ep_events_available(ep);  // 再判斷是否有就緒事件的產生,有的話推出循環
                if (eavail)
                        break;
                if (signal_pending(current)) {  // 非致命錯誤信號產生,中斷去處理該中斷
                        res = -EINTR;
                        break;
                }

                // 超時調度
                if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) {
                        timed_out = 1;
                        break;
                }
        }

        __set_current_state(TASK_RUNNING);

send_events:  // 將就緒事件復制到用戶空間邏輯開端
        // 1.沒有錯誤產生 2.有就緒事件 3.事件復制到用戶空間失敗 4.未超時
        // 滿足以上4個條件的情況下重新進行獲取就緒事件邏輯
        if (!res && eavail &&
            !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
                goto fetch_events;

        // 在等待標志設置的情況下,需要把已添加等待隊列節點刪除。
        if (waiter) {
                spin_lock_irq(&ep->wq.lock);
                __remove_wait_queue(&ep->wq, &wait);
                spin_unlock_irq(&ep->wq.lock);
        }

        return res;
}

ep_send_events 將就緒的事件復制至用戶空間,ep_send_events_proc 為實際的執行函數,ep_scan_ready_list 為輔助函數,這個函數放在后面具體說明,這里只看 ep_send_events_proc 的實現。

static int ep_send_events(struct eventpoll *ep,
                          struct epoll_event __user *events, int maxevents)
{
        struct ep_send_events_data esed;

        esed.maxevents = maxevents;
        esed.events = events;

        ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
        return esed.res;
}


static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
                               void *priv)
{
        struct ep_send_events_data *esed = priv;
        __poll_t revents;
        struct epitem *epi, *tmp;
        struct epoll_event __user *uevent = esed->events;
        struct wakeup_source *ws;
        poll_table pt;

        init_poll_funcptr(&pt, NULL);  // 初始化poll_table, 但是並不設置 file->f_op->poll 的回調函數
        esed->res = 0;

        /*
         * We can loop without lock because we are passed a task private list.
         * Items cannot vanish during the loop because ep_scan_ready_list() is
         * holding "mtx" during this call.
         */
        lockdep_assert_held(&ep->mtx);

        // head 實際上為 rdllist,遍歷就緒文件鏈表
        list_for_each_entry_safe(epi, tmp, head, rdllink) {
                if (esed->res >= esed->maxevents) // 超過用戶的提供的緩沖區大小,maxevents 為 epoll_wait(2) 的第3個參數
                        break;

                // __pm_stay_awake(ep->ws);
                // 為電源保持喚醒狀態的處理,略過這部分邏輯

                list_del_init(&epi->rdllink);  // 從就緒文件鏈表中刪除當前事件

                /*
                 * If the event mask intersect the caller-requested one,
                 * deliver the event to userspace. Again, ep_scan_ready_list()
                 * is holding ep->mtx, so no operations coming from userspace
                 * can change the item.
                 */
                revents = ep_item_poll(epi, &pt, 1);  // 調用 file->f_op->poll() 獲取就緒事件的掩碼
                if (!revents)  // 無關注的就緒事件,抬走下一個就緒文件
                        continue;

                // 復制就緒事件至用戶空間
                if (__put_user(revents, &uevent->events) ||
                    __put_user(epi->event.data, &uevent->data)) {
                        list_add(&epi->rdllink, head);  // 復制失敗,將當前就緒文件重新鏈接至就緒文件鏈表中
                        ep_pm_stay_awake(epi);
                        if (!esed->res)  // 如果一個事件都沒有復制,就產生致命錯誤,畢竟連個毛都沒有撈着有點氣
                                esed->res = -EFAULT;
                        return 0;
                }
                esed->res++;  // 成功復制的數量
                uevent++;     // 用戶空間的緩沖區增長一下
                if (epi->event.events & EPOLLONESHOT)  // 用戶設置了 EPOLLONESHOT的情況下
                        epi->event.events &= EP_PRIVATE_BITS; // 重新設置關注的事件,見 ep_poll_callback 分析
                else if (!(epi->event.events & EPOLLET)) {
                        // 未設置邊緣觸發模式,則將當前就緒文件添加回就緒文件鏈表中
                        // 這里就區分了邊緣觸發和水平觸發,水平觸發必須每次epoll_wait(2)調用都檢查就緒文件的事件
                        list_add_tail(&epi->rdllink, &ep->rdllist);
                        ep_pm_stay_awake(epi);
                }
        }

        return 0;
}

相關就緒事件邏輯

ep_scan_ready_list 用來掃描就緒文件鏈表上的就緒文件並處理;ep_poll_callback 用來向就緒文件鏈表中添加就緒文件。

ep_scan_ready_list

就緒文件是否存在判斷了兩個變量,rdllist 上面分析過了,還有一個備胎的就緒文件鏈表 ovflist,備胎 ovflist 是在 rdllist 被占用的情況下使用的。

ep_scan_ready_list 對兩個就緒文件列表進行掃描找出就緒的文件,rdllist 是單獨放在回調函數中進行處理的,ovflist 是直接在 ep_scan_ready_list 中處理的。

  1. 在對 rdllist 鏈表處理的回調函數執行前,改變 ovflist 的狀態,使得在回調函數執行期間ovflist可以串聯起就緒的文件。
  2. 執行回調函數。
  3. 遍歷ovflist,將鏈表上的就緒文件對應的 epitem 插入到 rdllist 上,並且喚醒調用 epoll_wait(2) 線程。
/**
 * ep_scan_ready_list - Scans the ready list in a way that makes possible for
 *                      the scan code, to call f_op->poll(). Also allows for
 *                      O(NumReady) performance.
 * Returns: The same integer error code returned by the @sproc callback.
 */
static __poll_t ep_scan_ready_list(struct eventpoll *ep,
                              __poll_t (*sproc)(struct eventpoll *,
                                           struct list_head *, void *),
                              void *priv, int depth, bool ep_locked)
{
        __poll_t res;
        int pwake = 0;
        unsigned long flags;
        struct epitem *epi, *nepi;
        LIST_HEAD(txlist);  // 初始化一個鏈表,將 eventpoll->rdllist 轉移到該鏈表上

        /*
         * Steal the ready list, and re-init the original one to the
         * empty list. Also, set ep->ovflist to NULL so that events
         * happening while looping w/out locks, are not lost. We cannot
         * have the poll callback to queue directly on ep->rdllist,
         * because we want the "sproc" callback to be able to do it
         * in a lockless way.
         */
        spin_lock_irqsave(&ep->lock, flags);
        list_splice_init(&ep->rdllist, &txlist);  // txlist = rdllist; init(rdllist)
        // 這里做一個狀態的轉換,在文件的等待列隊項回調函數(ep_poll_callback)執行時會判斷ovflist是否發生改變,
        // 而NULL則為本函數后面的遍歷操作使用
        ep->ovflist = NULL;
        spin_unlock_irqrestore(&ep->lock, flags);

        /*
         * Now call the callback function.
         */
        res = (*sproc)(ep, &txlist, priv);

        spin_lock_irqsave(&ep->lock, flags);
        /*
         * During the time we spent inside the "sproc" callback, some
         * other events might have been queued by the poll callback.
         * We re-insert them inside the main ready-list here.
         */
        for (nepi = ep->ovflist; (epi = nepi) != NULL;
             nepi = epi->next, epi->next = EP_UNACTIVE_PTR) { // 每次訪問的 epi->next 回到最初狀態
                /*
                 * We need to check if the item is already in the list.
                 * During the "sproc" callback execution time, items are
                 * queued into ->ovflist but the "txlist" might already
                 * contain them, and the list_splice() below takes care of them.
                 */
                // 保留這段代碼的注釋
                if (!ep_is_linked(&epi->rdllink)) { // 未被包含進 txlist(rdllist)
                        list_add_tail(&epi->rdllink, &ep->rdllist);  // 添加至正統就緒文件鏈表 rdllist 中
                        ep_pm_stay_awake(epi);
                }
        }
        /*
         * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
         * releasing the lock, events will be queued in the normal way inside
         * ep->rdllist.
         */
        ep->ovflist = EP_UNACTIVE_PTR;  // 恢復 ovflist的狀態, 備胎的壽命到此結束,投下一次胎(下次調用 ep_scan_ready_list)

        /*
         * Quickly re-inject items left on "txlist".
         */
        list_splice(&txlist, &ep->rdllist);  // 把對txlist 處理的結果,加入到rdllist,
        __pm_relax(ep->ws);

        if (!list_empty(&ep->rdllist)) {  // 如果還有就緒事件,則喚醒epoll_wait(2)線程
                /*
                 * Wake up (if active) both the eventpoll wait list and
                 * the ->poll() wait list (delayed after we release the lock).
                 */
                if (waitqueue_active(&ep->wq))
                        wake_up_locked(&ep->wq);
                if (waitqueue_active(&ep->poll_wait))
                        pwake++;
        }
        spin_unlock_irqrestore(&ep->lock, flags);

        if (!ep_locked)
                mutex_unlock(&ep->mtx);

        /* We have to call this outside the lock */
        if (pwake)
                ep_poll_safewake(&ep->poll_wait);

        return res;
}

ep_poll_callback

以上為對就緒鏈表有數據的處理,ep_poll_callback 為向就緒鏈表添加數據的回調函數。

ep_poll_callback() 在 ep_insert() 中被設置為目標文件的等待隊列項回調函數,當文件的狀態發生改變時,會調用 ep_poll_callback() 把目標文件對應的 epitem 添加至就緒鏈表中。

ewake 為獨占喚醒標志,ep_poll_callback 在 __wake_up_comm 中被調用

/*
 * This is the callback that is passed to the wait queue wakeup
 * mechanism. It is called by the stored file descriptors when they
 * have events to report.
 */
static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
        int pwake = 0;
        unsigned long flags;
        struct epitem *epi = ep_item_from_wait(wait);
        struct eventpoll *ep = epi->ep;
        __poll_t pollflags = key_to_poll(key);
        int ewake = 0;

        spin_lock_irqsave(&ep->lock, flags);

        if (!(epi->event.events & ~EP_PRIVATE_BITS))  // 對EPOLLONESHOT標志的處理
                goto out_unlock;

        // ep->ovflist 的狀態修改后作為備胎使用
        if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
                if (epi->next == EP_UNACTIVE_PTR) {  // 未連接到 ovflist 中
                        epi->next = ep->ovflist;     // 頭插法插入鏈表中
                        ep->ovflist = epi;
                }
                goto out_unlock;  // 不能進入正統就緒鏈表 rdllist的處理邏輯中,其實這里可用不用goto的
        }

        /* If this file is already in the ready list we exit soon */
        if (!ep_is_linked(&epi->rdllink)) {  // 當前epitem未連接至就緒列表中時,添加至就緒文件鏈表 rdllist 中
                list_add_tail(&epi->rdllink, &ep->rdllist);
                ep_pm_stay_awake_rcu(epi);
        }

        // EPOLLEXCLUSIVE 標志的處理,決定返回的參數
        // pollflags 為目標文件的就緒的事件掩碼,可閱讀 eventfd_read 的源碼
        // 1.關注的事件中出現了讀寫就緒的事件 2.就緒的事件不為讀寫事件,滿足其中之一則獨占喚醒的標志置 1
        if (waitqueue_active(&ep->wq)) {
                if ((epi->event.events & EPOLLEXCLUSIVE) &&
                                        !(pollflags & POLLFREE)) {
                        switch (pollflags & EPOLLINOUT_BITS) {
                        case EPOLLIN:
                                if (epi->event.events & EPOLLIN)
                                        ewake = 1;
                                break;
                        case EPOLLOUT:
                                if (epi->event.events & EPOLLOUT)
                                        ewake = 1;
                                break;
                        case 0:   
                                ewake = 1;
                                break;
                        }
                }
                wake_up_locked(&ep->wq);
        }
        if (waitqueue_active(&ep->poll_wait))
                pwake++;

out_unlock:
        spin_unlock_irqrestore(&ep->lock, flags);

        /* We have to call this outside the lock */
        if (pwake)
                ep_poll_safewake(&ep->poll_wait);  // 分析同上

        if (!(epi->event.events & EPOLLEXCLUSIVE))  // 未設置獨占喚醒標志
                ewake = 1;

        return ewake;
}

文件狀態發生改變調用文件的等待隊列項上的回調函數 func,在epoll中也就是 ep_poll_callback,ret 為回調函數的獨占喚醒標志。
當等待項設置了WQ_FLAG_EXCLUSIVE標志,只要回調函數返回了獨占喚醒標志,在獨占喚醒計數消耗完時,退出循環。

如果當前文件被多個 epoll 監視,那么這個文件的等待隊列上就有多個 ep_poll_callback 回調函數,現在設置了等待項的標志 wait_entry->flags |= WQ_FLAG_EXCLUSIVE 后,只會執行 nr_exclusive 個回調函數,而 nr_exclusive 在epoll中為 1,所以就只會把就緒的文件添加到一個 epoll 中,這樣避免了驚群效應.

代碼位置 kernel/sched/wait.c, __wake_up_comm()

list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {
        unsigned flags = curr->flags;
        ret = curr->func(curr, mode, wake_flags, key);
        if (ret < 0)
                break;
        if (ret && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
                break;
}

epoll 間的相互影響及處理

產生這個最根本的原因就是 epoll作為一個文件既可以監視其他文件,也可以被其他epoll監視。這樣就產生了一個監視的有向圖

ep_eventpoll_poll

ep_eventpoll_poll 文件的poll操作,也就是file->f_op->poll(). 調用該函數可以獲取就緒文件的事件掩碼,但是 epoll 文件只提供讀就緒事件,並且讀就緒事件是由非epoll文件的就緒事件決定的。也就是說當一個epoll文件被 select(2)/poll(2)/epoll(2) 監聽時,必須該epoll已經監聽了其他的非epoll文件(如eventfd), 在調用 該epoll file->f_op->poll() 時才可能返回可讀的就緒事件。

static __poll_t ep_eventpoll_poll(struct file *file, poll_table *wait)
{
        struct eventpoll *ep = file->private_data;
        int depth = 0;

        /* Insert inside our poll wait queue */
        poll_wait(file, &ep->poll_wait, wait);

        /*
         * Proceed to find out if wanted events are really available inside
         * the ready list.
         */
        return ep_scan_ready_list(ep, ep_read_events_proc,
                                  &depth, depth, false);
}

對單獨的 epitem 執行 poll 操作,獲取就緒的文件事件掩碼。

  1. 如果是非 epoll 文件,則執行 file->f_op->poll 操作。
  2. 如果是 epoll 文件,則掃描該 epoll 中就緒文件鏈表上的 epitem 是否就緒,這里產生了一個遞歸。

1 是遞歸的基准情況,而 ep_scan_ready_list 負責為向前推進


static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt,
                                 int depth)
{
        struct eventpoll *ep;
        bool locked;

        pt->_key = epi->event.events;
        if (!is_file_epoll(epi->ffd.file))
                return epi->ffd.file->f_op->poll(epi->ffd.file, pt) &
                       epi->event.events;

        ep = epi->ffd.file->private_data;
        poll_wait(epi->ffd.file, &ep->poll_wait, pt);
        locked = pt && (pt->_qproc == ep_ptable_queue_proc);

        return ep_scan_ready_list(epi->ffd.file->private_data,
                                  ep_read_events_proc, &depth, depth,
                                  locked) & epi->event.events;
}


// 實際執行讀取事件的函數
static __poll_t ep_read_events_proc(struct eventpoll *ep, struct list_head *head,
                               void *priv)
{
        struct epitem *epi, *tmp;
        poll_table pt;
        int depth = *(int *)priv;

        init_poll_funcptr(&pt, NULL);
        depth++;

        // 遍歷該 epoll 中的 rdllist(參數為head)
        list_for_each_entry_safe(epi, tmp, head, rdllink) {
                if (ep_item_poll(epi, &pt, depth)) {  // 獲取該 epitem 的就緒事件,若就緒,則返回為可讀的就緒事件掩碼
                        return EPOLLIN | EPOLLRDNORM;
                } else {  // 未就緒抬走下一個 epitem
                        /*
                         * Item has been dropped into the ready list by the poll
                         * callback, but it's not actually ready, as far as
                         * caller requested events goes. We can remove it here.
                         */
                        __pm_relax(ep_wakeup_source(epi));
                        list_del_init(&epi->rdllink);  // 無就緒事件,將當前epitem 從所有者 eventpoll 的就緒鏈表中刪除
                }
        }

        return 0;  // 當前這個 epitem 沒有就緒的事件產生。
}

深度遞歸調用及死循環的檢測

現在有三個文件

  1. eventfd efd
  2. epoll epfd1
  3. epoll epfd2

操作如下:

  • epoll_ctl(epfd1, EPOLL_CTL_ADD, efd, IN | OUT);
  • epoll_ctl(epfd2, EPOLL_CTL_ADD, epfd1, IN | OUT);

現在這三者的關系如下:

  • epfd1 \(\in\) struct_ctx(efd).wqh
  • epitem(efd) \(\in\) eventpoll(epfd1) , epfd2 \(\in\) eventpoll(epfd1).poll_wait

現在efd就緒,產生了 IN | OUT 事件,這個時候調用 ep_poll_callback(epfd1) 將 epitem(efd) 添加到 eventpoll(epfd1).rdllist 上,喚醒 epoll_wait(2) 和 eventpoll(epfd1).poll_wait 上的等待項, 這里再調用 ep_poll_callback(epfd2) 將 epitem(epfd1) 添加至 eventpoll(epfd2).rdllist 上,喚醒 epoll_wait(2).

關鍵點到了,如果現在epfd1 也監視 epfd2

  • 操作 epoll_ctl(epfd1, EPOLL_CTL_ADD, epfd2, IN | OUT).
  • 那么 ep_poll_callback(epfd1) \(\in\) eventpoll(epfd2).poll_wait.

在 ep_poll_callback(epfd2) 執行時,又會喚醒 eventpoll(epfd2).poll_wait 上的等待項,也就是 ep_poll_callback(epfd1). 所以就有可能出現死循環遞歸。

ep_call_nested 函數用來檢測嵌套調用,就是針對 epitem 為 epoll 文件的處理。
我們可以將 epoll 比作一個樹里面的一個節點,eventfd 這種文件只能作為葉節點使用,而 epoll 可以不是葉節點。現在我們對這棵樹(如果出現相互監視,就變成了圖)進行遍歷,用 visited 作為已訪問標志,檢測到其結構中的epitem的文件類型只要是epoll 文件就繼續向前推進(訪問其子epitem),每次向前推進的時候進行檢測,判斷是否出現死循環或者遞歸深度超出范圍。

和上面的 ep_scan_ready_list 處理邏輯有一點相近,就是遍歷這些 epoll 文件形成圖

static int ep_loop_check_proc(void *priv, void *cookie, int call_nests)
{
        int error = 0;
        struct file *file = priv;
        struct eventpoll *ep = file->private_data;
        struct eventpoll *ep_tovisit;
        struct rb_node *rbp;
        struct epitem *epi;

        mutex_lock_nested(&ep->mtx, call_nests + 1);
        ep->visited = 1;  // 優化處理,已訪問標志
        list_add(&ep->visited_list_link, &visited_list);
        for (rbp = rb_first_cached(&ep->rbr); rbp; rbp = rb_next(rbp)) {
                epi = rb_entry(rbp, struct epitem, rbn);
                if (unlikely(is_file_epoll(epi->ffd.file))) {  // epoll 文件
                        ep_tovisit = epi->ffd.file->private_data;
                        if (ep_tovisit->visited)
                                continue;
                        // 繼續向前推進,遞歸檢測
                        error = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
                                        ep_loop_check_proc, epi->ffd.file,
                                        ep_tovisit, current);
                        if (error != 0)
                                break;
                } else {
                        // 該item未添加至文件檢測鏈表中(喚醒風暴檢測使用),是的 epoll 雖然叫文件,可是這里並不是一等公民。
                        if (list_empty(&epi->ffd.file->f_tfile_llink))
                                list_add(&epi->ffd.file->f_tfile_llink,
                                         &tfile_check_list);
                }
        }
        mutex_unlock(&ep->mtx);

        return error;
}

static int ep_call_nested(struct nested_calls *ncalls, int max_nests,
                          int (*nproc)(void *, void *, int), void *priv,
                          void *cookie, void *ctx)
{
        int error, call_nests = 0;
        unsigned long flags;
        struct list_head *lsthead = &ncalls->tasks_call_list;
        struct nested_call_node *tncur;
        struct nested_call_node tnode;

        spin_lock_irqsave(&ncalls->lock, flags);

        list_for_each_entry(tncur, lsthead, llink) {
                // call_nests 為嵌套的調用深度,cookie 為 eventpoll 結構,ctx 為當前的任務 struct_task,不懂為何呀用當前任務做限定。
                if (tncur->ctx == ctx &&
                    (tncur->cookie == cookie || ++call_nests > max_nests)) {
                        error = -1;
                        goto out_unlock;
                }
        }

        /* Add the current task and cookie to the list */
        tnode.ctx = ctx;
        tnode.cookie = cookie;
        list_add(&tnode.llink, lsthead);  // 滿足條件就添加到靜態鏈表中

        spin_unlock_irqrestore(&ncalls->lock, flags);

        /* Call the nested function */
        error = (*nproc)(priv, cookie, call_nests);  // 繼續調用向前推進

        /* Remove the current task from the list */
        spin_lock_irqsave(&ncalls->lock, flags);
        list_del(&tnode.llink);
out_unlock:
        spin_unlock_irqrestore(&ncalls->lock, flags);

        return error;
}

喚醒風暴的處理

在進行插入的時候調用該檢測操作,作為預防使用;而 EPOLLEXCLUSIVE 產生就緒事件后的處理,兩者作用不能混淆。

喚醒風暴的檢查為設定一個限制,epoll 允許喚醒的最大深度為 5,一個文件最多喚醒 path_limits[深度] 的epoll描述符。牽扯到遞歸的深度,自然是少不了 ep_call_nested 這個檢測函數了。
先看 reverse_path_check 的返回,只有兩種情況:

  1. -1 超出最該深度允許喚醒的epoll描述符
  2. 0 在正常范圍內
#define PATH_ARR_SIZE 5
static const int path_limits[PATH_ARR_SIZE] = { 1000, 500, 100, 50, 10 };
static int path_count[PATH_ARR_SIZE];

static int path_count_inc(int nests)
{
        /* Allow an arbitrary number of depth 1 paths */
        if (nests == 0)
                return 0;

        if (++path_count[nests] > path_limits[nests])
                return -1;
        return 0;
}

static void path_count_init(void)
{
        int i;

        for (i = 0; i < PATH_ARR_SIZE; i++)
                path_count[i] = 0;
}

static int reverse_path_check_proc(void *priv, void *cookie, int call_nests)
{
        int error = 0;
        struct file *file = priv;
        struct file *child_file;
        struct epitem *epi;

        /* CTL_DEL can remove links here, but that can't increase our count */
        rcu_read_lock();
        // 遍歷該文件上的 epoll 節點 epitem
        list_for_each_entry_rcu(epi, &file->f_ep_links, fllink) {
                child_file = epi->ep->file;  // 該 epitem 所屬 epoll 實例
                if (is_file_epoll(child_file)) {  // 文件應該必為 epoll 的
                        if (list_empty(&child_file->f_ep_links)) {  // epoll 未被監視
                                if (path_count_inc(call_nests)) {  // 判斷是否滿足調用深度的條件
                                        error = -1;
                                        break;  // 不滿足直接返回
                                }
                        } else {  // 被監視,那就繼續調用,往前推進
                                error = ep_call_nested(&poll_loop_ncalls,
                                                        EP_MAX_NESTS,
                                                        reverse_path_check_proc,
                                                        child_file, child_file,
                                                        current);
                        }
                        if (error != 0)
                                break;
                } else {
                        printk(KERN_ERR "reverse_path_check_proc: "
                                "file is not an ep!\n");
                }
        }
        rcu_read_unlock();
        return error;
}

static int reverse_path_check(void)
{
        int error = 0;
        struct file *current_file;

        /* let's call this for all tfiles */
        // 遍歷監視的文件
        list_for_each_entry(current_file, &tfile_check_list, f_tfile_llink) {
                path_count_init();  // 初始化調用深度的次數為0
                // 檢驗可能發生遞歸的調用
                error = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
                                        reverse_path_check_proc, current_file,
                                        current_file, current);
                if (error)
                        break;
        }
        return error;
}

引子的解答

  1. epoll(2) 得到就緒事件的復雜度為何是 \(O(1)\)
    - epoll_wait(2) 只掃描就緒文件隊列,不用對所有的文件進行判斷,見 epoll_wait(2) 的分析。
  2. epoll(2) 和普通的文件相比的區別在哪里,比如和 eventfd(2) 比較
    - 少了 read(2)/write(2) 等文件操作
    - epoll 作為被監視文件只有可讀就緒事件,eventfd 擁有讀寫就緒事件。
    - eventfd 的就緒事件來自文件自身的狀態(計數)變化,而epoll的就緒來自監視文件的狀態的變化
  3. epoll(2) 相對 poll(2)/select(2) 多提供了 EPOLLET 的觸發模式,實現是如何做到的。
    - 區別在於每次調用 epoll_wait(2)在復制就緒事件至用戶空間后,水平觸發模式會將該文件添加回就緒鏈表。
  4. epoll(2) 相互關注時,有就緒事件到來會產生相互喚醒的問題,為何會出現這樣的問題
    - 見 epoll 間的相互影響及處理
  5. 對於問題 4,內核是如何解決這種相互喚醒的問題。
    - 同 4 解答

新的問題

循環檢測的時候為何需要限定單個線程(任務)間的 epoll 不同,這個猜測可能和喚醒的機制有關,作為一個問題留下。

參考

  1. epoll: add EPOLLEXCLUSIVE flagEPOLLEXCLUSIVE 標志的提交代碼。
  2. linux 內核poll/select/epoll實現剖析 ,對epoll很好的分析,代碼稍微有點舊了,不過還是非常值得一看。
  3. eventfd 源碼分析,上一篇對eventfd的分析。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM