Linux內核poll/select機制簡析


0、 I/O多路復用機制

I/O多路復用 (I/O multiplexing),提供了同時監測若干個文件描述符是否可以執行IO操作的能力。 select/poll/epoll 函數都提供了這樣的機制,能夠同時監控多個描述符,當某個描述符就緒(讀或寫就緒),則立刻通知相應程序進行讀或寫操作。本文將從內核源碼(v5.2.14)入手,嘗試簡述 poll/select 機制的實現原理。

 

1、poll/select函數

介紹內核源碼前,先來簡單介紹 poll/select 函數的調用方式。

1.1 select函數

#include <poll.h>
int select (int maxfd, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, 
            struct timeval *tvptr);
  •  maxfd :代表要監控的最大文件描述符fd+1
  •  writefds :監控可寫的文件描述符fd集合
  •  readfds :監控可讀的文件描述符fd集合
  •  exceptfds :監控異常事件的文件描述符fd集合
  •  timeout :超時時長

 select 將監聽的文件描述符分為三組,每一組監聽不同的I/O操作。 readfds/writefds/exceptfds 分別表示可寫、可讀、異常事件的文件描述符集合,這三個參數可以用 NULL 來表示對應的事件不需要監聽。對信號集合的操作可以利用如下幾個函數完成:

void FD_CLR(int fd, fd_set *set);
int  FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);

 select 的調用會阻塞到有文件描述符可以進行IO操作或被信號打斷或者超時才會返回。 timeout 參數用來指定超時時間,含義如下:

  •  NULL: 表示不設置超時,調用會一直阻塞直到文件描述符上的事件觸發
  • 0:     表示不等待,立即返回,用於檢測文件描述符狀態
  • 正整數: 表示指定時間內沒有事件觸發,則超時返回

值得注意的是, select 調用返回時,每個文件描述符集合均會被過濾,只保留得到事件響應的文件描述符。在下一次調用 select 時,描述符集合均需要重新設置。

1.2 poll函數

#include <poll.h>
int poll (struct pollfd *fds, unsigned int nfds, int timeout);

struct pollfd {
    int fd;             /* file descriptor to check  */
    short events;       /* events of interest on fd  */
    short revents;      /* events that occured on fd */
};

 poll 函數與 select 不同,不需要為三種事件分別設置文件描述符集,而是構造了 pollfd 結構的數組,每個數組元素指定一個描述符fd以及對該描述符感興趣的條件(events)。 poll 調用返回時,每個描述符fd上產生的事件均被保存在 revents 成員內。

和 select 類似, timeout 參數用來指定超時時間(ms)。

 

2、poll函數機制

pollselect均屬於系統調用的方式,先就 poll 函數在Linux中實現的機制進行分析。
pollselect函數在內核源碼中的定義均位於 fs/select.c 文件中,poll 函數的原型定義如下:

SYSCALL_DEFINE3(poll, struct pollfd __user *, ufds, unsigned int, nfds, int, timeout_msecs)

首先,會調用 poll_select_set_timeout 函數將超時時間轉換為 timespec64 結構變量,注意超時時間將會以當前時間(monotonic clock)為基礎,轉換為未來的一個超時時間點(絕對時間)。

struct timespec64 end_time, *to = NULL;
if (timeout_msecs >= 0) {
    to = &end_time;
    poll_select_set_timeout(to, timeout_msecs / MSEC_PER_SEC, 
                            NSEC_PER_MSEC * (timeout_msecs % MSEC_PER_SEC));
}

2.1 do_sys_poll

static int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds, 
                       struct timespec64 *end_time)
{
    struct poll_wqueues table;
    int err = -EFAULT, fdcount, len, size;
    /* 在棧上分配小段空間提高速度,通過`poll_list`鏈表保存所有的`pollfd` */
    long stack_pps[POLL_STACK_ALLOC/sizeof(long)];
    struct poll_list *const head = (struct poll_list *)stack_pps;
    struct poll_list *walk = head;
    unsigned long todo = nfds;
    . . .
    len = min_t(unsigned int, nfds, N_STACK_PPS);
    for (;;) {
        walk->next = NULL;
        walk->len = len;
        . . .
        /* 1. 將pollfd從用戶空間拷貝到內核空間 */
        if (copy_from_user(walk->entries, ufds + nfds-todo, sizeof(struct pollfd) * walk->len))
            goto out_fds;

        todo -= walk->len;
        if (!todo)
            break;

        len = min(todo, POLLFD_PER_PAGE);
        size = sizeof(struct poll_list) + sizeof(struct pollfd) * len;
        walk = walk->next = kmalloc(size, GFP_KERNEL);
        . . .
    }

    poll_initwait(&table);
    /* 2. 調用do_poll完成poll的實際調用處理 */
    fdcount = do_poll(head, &table, end_time);
    poll_freewait(&table);
    /* 3. 將每個fd上產生的事件revents再從內核空間拷貝到用戶空間 */
    for (walk = head; walk; walk = walk->next) {
        struct pollfd *fds = walk->entries;
        . . .
        for (j = 0; j < walk->len; j++, ufds++)
            if (__put_user(fds[j].revents, &ufds->revents))
                goto out_fds;
    }
    err = fdcount;

out_fds:
    . . .
    return err;
}

 do_sys_poll 函數首先將 pollfd 結構體數組從用戶空間拷貝至內核空間,同時用名為 poll_list 的鏈表存儲(一部分存儲在棧空間上,一部分存儲在堆空間),形如:
poll_list  poll_initwait(&table) 對 poll_wqueues 結構體變量 table 進行初始化:

struct poll_wqueues table = {
    poll_table pt = {
        ._qproc = __pollwait;
        ._key   = ~(__poll_t)0;  /* all events enabled */
    };

    struct poll_table_page  *table = NULL;
    struct task_struct      *polling_task = current;
    int                     triggered = 0;
    int                     error = 0;
    int                     inline_index = 0;
    struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
};

函數指針 table.pt._qproc 被初始化指向 __pollwait 函數,這個和 poll 調用過程中阻塞與喚醒機制相關,后面將介紹。
隨后即調用 do_poll 函數完成 poll 操作,最后將每個文件描述符fd產生的事件再拷貝到內核空間。

2.2 do_poll

static int do_poll(struct poll_list *list, struct poll_wqueues *wait, 
                   struct timespec64 *end_time)
{
    poll_table* pt = &wait->pt;
    ktime_t expire, *to = NULL;
    int timed_out = 0, count = 0;
    u64 slack = 0;
    __poll_t busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
    unsigned long busy_start = 0;

    /* timeout設置為0時,將pt->_qproc設置為NULL,同時不阻塞,相當於退化為輪詢操作 */
    if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
        pt->_qproc = NULL;
        timed_out = 1;
    }

    /* 超時時間設置並有效的情況下,才設置slack */
    if (end_time && !timed_out)
        slack = select_estimate_accuracy(end_time);

    for (;;) {
        struct poll_list *walk;
        bool can_busy_loop = false;

        /* 對每一項pollfd進行遍歷,調用do_pollfd */
        for (walk = list; walk != NULL; walk = walk->next) {
            struct pollfd * pfd, * pfd_end;

            pfd = walk->entries;
            pfd_end = pfd + walk->len;
            for (; pfd != pfd_end; pfd++) {
                /* do_pollfd返回非負值,表示發現事件觸發,此時無需再將當前進程加入到相應的等待隊列 */
                if (do_pollfd(pfd, pt, &can_busy_loop, busy_flag)) {
                    count++;
                    pt->_qproc = NULL;
                    /* found something, stop busy polling */
                    busy_flag = 0;
                    can_busy_loop = false;
                }
            }
        }
        /* 當前進程已經在上述的遍歷中被加入到各個fd對應驅動的等待隊列,無需重復加入 */
        pt->_qproc = NULL;
        if (!count) {
            count = wait->error;
            /* 被信號中斷,后面將返回 */
            if (signal_pending(current))
                count = -EINTR;
        }
        /* 發現事件觸發,或者timed_out == 1 提前退出循環 */
        if (count || timed_out)
            break;

        if (can_busy_loop && !need_resched()) {
            if (!busy_start) {
                busy_start = busy_loop_current_time();
                continue;
            }
            if (!busy_loop_timeout(busy_start))
                continue;
        }
        busy_flag = 0;

        /* 超時時間end_time有效時,將timespec64格式的end_time轉換為ktime_t格式 */
        if (end_time && !to) {
            expire = timespec64_to_ktime(*end_time);
            to = &expire;
        }

        /* 進行poll進程的休眠工作,讓出CPU
         * 超時時間到達時返回,設置timed_out=1,在下一個輪詢后返回上層調用
         */
        if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack))
            timed_out = 1;
    }
    return count;
}

 do_poll 函數首先從頭部到尾部遍歷鏈表 poll_list ,對每一項 pollfd 調用 do_pollfd 函數。 do_pollfd 函數主要將當前 poll 調用進程加入到每個 pollfd 對應fd所關聯的底層驅動等待隊列中,將在下文詳細介紹這一點。
 do_pollfd 調用后,如果某個fd已經產生事件,那么后續遍歷其他fd時,無需再將當前進程加入到對應的等待隊列中, poll 調用也將返回而不是睡眠(schedule)。

2.3 do_pollfd

static inline __poll_t do_pollfd(struct pollfd *pollfd, poll_table *pwait, 
        bool *can_busy_poll, __poll_t busy_flag)
{
    int fd = pollfd->fd;
    __poll_t mask = 0, filter;
    struct fd f;

    if (fd < 0)
        goto out;
    mask = EPOLLNVAL;
    f = fdget(fd);
    if (!f.file)
        goto out;

    /* userland u16 ->events contains POLL... bitmap */
    filter = demangle_poll(pollfd->events) | EPOLLERR | EPOLLHUP;
    pwait->_key = filter | busy_flag;
    mask = vfs_poll(f.file, pwait);
    if (mask & busy_flag)
        *can_busy_poll = true;
    mask &= filter;     /* Mask out unneeded events. */
    fdput(f);

out:
    /* ... and so does ->revents */
    pollfd->revents = mangle_poll(mask);
    return mask;
}

 do_pollfd 主要完成與底層VFS中的驅動程序 f_op->poll 的調用,和對事件的過濾(只過濾出對每個文件描述符感興趣的事件)。最后會把過濾出的事件放入 revents 中,作為結果返回。

static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt)
{
    if (unlikely(!file->f_op->poll))
        return DEFAULT_POLLMASK;
    return file->f_op->poll(file, pt);
}

 vfs_poll 將調用 file->f_op->poll 函數,而這個函數是在設備驅動程序中定義的。這里通過一個模擬的字符驅動globalfifo程序中定義的 xxx_poll 函數來分析調用過程。
 globalfifo_poll 主要完成了如下幾方面的工作:

  • 鎖定設備自定義的互斥量
  • 調用 poll_wait 將 poll 進程加入到設備自定義的等待隊列中,下文將詳細介紹 poll_wait 
  • 判斷等待事件條件是否發生
  • 對互斥量解鎖
static unsigned int globalfifo_poll(struct file *flip, 
                                    struct poll_table_struct *poll_table)
{
    struct globalfifo_dev *dev = flip->private_data;
    unsigned int mask = 0;

    mutex_lock(&dev->globalfifo_mutex);
    /* Add poll_table to r_wait/w_wait queue of device driver
     * then, device driver could wake up poll function of upper layer (do_poll)
     */

    poll_wait(flip, &dev->r_wait, poll_table);
    poll_wait(flip, &dev->w_wait, poll_table);

    if (dev->current_len != 0) {
        mask |= POLLIN | POLLRDNORM;
    }

    if (dev->current_len != GLOBALFIFO_SIZE) {
        mask |= POLLOUT | POLLWRNORM;
    }
    mutex_unlock(&dev->globalfifo_mutex);

    return mask;
}

2.4 poll_wait

void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
    if (p && p->_qproc && wait_address)
        p->_qproc(filp, wait_address, p);
}

 poll_wait 進而調用到 poll_table p->_qproc ,而后者在2.1節中通過 poll_initwait(&table) 被初始化為 __pollwait 。

static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
{
    struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
    struct poll_table_entry *entry = poll_get_entry(pwq);
    if (!entry)
        return;
    entry->filp = get_file(filp);
    entry->wait_address = wait_address;
    entry->key = p->_key;
    init_waitqueue_func_entry(&entry->wait, pollwake);
    entry->wait.private = pwq;
    add_wait_queue(wait_address, &entry->wait);
}

 __pollwait 初始化等待隊列項(關聯到當前 poll 進程),最后將等待隊列項加入到設備驅動中定義的等待隊列中。關於等待隊列的介紹,可以參考:Linux等待隊列(Wait Queue)

2.5  do_poll (continued)

static int do_poll(struct poll_list *list, struct poll_wqueues *wait, 
                   struct timespec64 *end_time)
{
    . . .
    for (;;) {
        struct poll_list *walk;
        bool can_busy_loop = false;

        for (walk = list; walk != NULL; walk = walk->next) {
            struct pollfd * pfd, * pfd_end;
            pfd = walk->entries;
            pfd_end = pfd + walk->len;
            for (; pfd != pfd_end; pfd++) {
                /* do_pollfd返回非負值,表示發現事件觸發,此時無需再將當前進程加入到相應的等待隊列 */
                if (do_pollfd(pfd, pt, &can_busy_loop, busy_flag)) {
                    count++;
                    pt->_qproc = NULL;
                    /* found something, stop busy polling */
                    busy_flag = 0;
                    can_busy_loop = false;
                }
            }
        }
        /* 已經通過遍歷處理完所有pollfd,無需再次進行等待隊列的處理 */
        pt->_qproc = NULL;
        if (!count) {
            count = wait->error;
            /* 被信號中斷,將直接返回 */
            if (signal_pending(current))
                count = -EINTR;
        }
        /* 發現事件觸發,或者timed_out == 1 提前退出循環 */
        if (count || timed_out)
            break;
        . . .
        busy_flag = 0;

        /* 超時時間end_time有效時,將timespec64格式的end_time轉換為ktime_t格式 */
        if (end_time && !to) {
            expire = timespec64_to_ktime(*end_time);
            to = &expire;
        }

        /* 進行poll進程的休眠工作,讓出CPU
         * 超時時間到達時返回,設置timed_out=1,在下一個輪詢后返回上層調用
         */
        if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack))
            timed_out = 1;
    }
    return count;
}

前面幾節介紹了do_poll會依次遍歷每一個pollfd,調用do_pollfd將當前poll進程加入到文件描述符對應驅動的等待隊列中,此過程中會判斷等待條件是否已產生。如若任意一個fd的事件event符合要求,對於后面的fd不會把當前進程加入到等待隊列中(pt->_qproc = NULL)。這一輪將poll進程都放進等待隊列后,在下一個loop就不用重復存放。
存在(產生)下一個loop的條件:

  • timeout超時條件發生, 在下一個loop會先判斷是否有event滿足,滿足剛好能夠作為結果被返回
  • 等待隊列被喚醒,在下一個loop對每個fd進行判斷,對相應文件描述符更新revents 

 do_poll 阻塞條件終止返回上層調用 do_sys_poll 后,如2.1節所述,會依次遍歷每一項 pollfd ,將最終產生的事件 revents 從內核空間拷貝到用戶空間。
至此,poll調用過程基本結束。

 

3、select函數機制

 select 函數和 poll 函數的實現機制大體一致,同樣存在用戶空間到內核空間的拷貝過程,以及在等待隊列上睡眠和喚醒的過程,關於 select 調用的詳細過程分析在此不贅述了。與 poll 調用不同的一點是, select 調用對目標文件描述符的數量受到最大文件描述符 max_fds 的限制,在如下 compat_core_sys_select 函數中可以清晰得看到這點。

static int compat_core_sys_select(int n, compat_ulong_t __user *inp, 
        compat_ulong_t __user *outp, compat_ulong_t __user *exp, struct timespec64 *end_time)
{
    fd_set_bits fds;
    void *bits;
    int size, max_fds, ret = -EINVAL;
    struct fdtable *fdt;
    long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];

    if (n < 0)
        goto out_nofds;

    /* max_fds can increase, so grab it once to avoid race */
    rcu_read_lock();
    fdt = files_fdtable(current->files);
    max_fds = fdt->max_fds;
    rcu_read_unlock();
    if (n > max_fds)
        n = max_fds;
    . . .
}
static int compat_core_sys_select(int n, compat_ulong_t __user *inp, 
        compat_ulong_t __user *outp, compat_ulong_t __user *exp, struct timespec64 *end_time)
{
    fd_set_bits fds;
    void *bits;
    int size, max_fds, ret = -EINVAL;
    struct fdtable *fdt;
    long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];

    if (n < 0)
        goto out_nofds;

    /* max_fds can increase, so grab it once to avoid race */
    rcu_read_lock();
    fdt = files_fdtable(current->files);
    max_fds = fdt->max_fds;
    rcu_read_unlock();
    if (n > max_fds)
        n = max_fds;
    . . .
}

 

4、總結

前面幾節內容,介紹了 poll 和 select 調用的使用方法,並利用源碼重點分析了Linux內核 poll 機制的實現原理。
 poll 系統調用的整體過程可以概括為下圖:
poll_syscall

 

 

參考資料

[1] select/poll/epoll對比分析:(http://gityuan.com/2015/12/06/linux_epoll/)
[2] 源碼解讀poll/select內核機制:(http://gityuan.com/2019/01/05/linux-poll-select/)

[3] 一文看懂IO多路復用:(https://zhuanlan.zhihu.com/p/115220699)

[4] POLL機制:(https://www.cnblogs.com/liuqing520/p/12706187.html)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM