0、 I/O多路復用機制
I/O多路復用 (I/O multiplexing),提供了同時監測若干個文件描述符是否可以執行IO操作的能力。 select/poll/epoll 函數都提供了這樣的機制,能夠同時監控多個描述符,當某個描述符就緒(讀或寫就緒),則立刻通知相應程序進行讀或寫操作。本文將從內核源碼(v5.2.14)入手,嘗試簡述 poll/select 機制的實現原理。
1、poll/select函數
介紹內核源碼前,先來簡單介紹 poll/select 函數的調用方式。
1.1 select函數
#include <poll.h> int select (int maxfd, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *tvptr);
- maxfd :代表要監控的最大文件描述符
fd+1
- writefds :監控可寫的文件描述符
fd
集合 - readfds :監控可讀的文件描述符
fd
集合 - exceptfds :監控異常事件的文件描述符
fd
集合 - timeout :超時時長
select 將監聽的文件描述符分為三組,每一組監聽不同的I/O操作。 readfds/writefds/exceptfds 分別表示可寫、可讀、異常事件的文件描述符集合,這三個參數可以用 NULL 來表示對應的事件不需要監聽。對信號集合的操作可以利用如下幾個函數完成:
void FD_CLR(int fd, fd_set *set); int FD_ISSET(int fd, fd_set *set); void FD_SET(int fd, fd_set *set); void FD_ZERO(fd_set *set);
select 的調用會阻塞到有文件描述符可以進行IO操作或被信號打斷或者超時才會返回。 timeout 參數用來指定超時時間,含義如下:
- NULL: 表示不設置超時,調用會一直阻塞直到文件描述符上的事件觸發
- 0: 表示不等待,立即返回,用於檢測文件描述符狀態
- 正整數: 表示指定時間內沒有事件觸發,則超時返回
值得注意的是, select 調用返回時,每個文件描述符集合均會被過濾,只保留得到事件響應的文件描述符。在下一次調用 select 時,描述符集合均需要重新設置。
1.2 poll函數
#include <poll.h> int poll (struct pollfd *fds, unsigned int nfds, int timeout); struct pollfd { int fd; /* file descriptor to check */ short events; /* events of interest on fd */ short revents; /* events that occured on fd */ };
poll 函數與 select 不同,不需要為三種事件分別設置文件描述符集,而是構造了 pollfd 結構的數組,每個數組元素指定一個描述符fd
以及對該描述符感興趣的條件(events)。 poll 調用返回時,每個描述符fd
上產生的事件均被保存在 revents 成員內。
和 select 類似, timeout 參數用來指定超時時間(ms)。
2、poll函數機制
poll和select均屬於系統調用的方式,先就 poll 函數在Linux中實現的機制進行分析。
poll和select函數在內核源碼中的定義均位於 fs/select.c 文件中,poll 函數的原型定義如下:
SYSCALL_DEFINE3(poll, struct pollfd __user *, ufds, unsigned int, nfds, int, timeout_msecs)
首先,會調用 poll_select_set_timeout 函數將超時時間轉換為 timespec64 結構變量,注意超時時間將會以當前時間(monotonic clock)為基礎,轉換為未來的一個超時時間點(絕對時間)。
struct timespec64 end_time, *to = NULL; if (timeout_msecs >= 0) { to = &end_time; poll_select_set_timeout(to, timeout_msecs / MSEC_PER_SEC, NSEC_PER_MSEC * (timeout_msecs % MSEC_PER_SEC)); }
2.1 do_sys_poll
static int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds, struct timespec64 *end_time) { struct poll_wqueues table; int err = -EFAULT, fdcount, len, size; /* 在棧上分配小段空間提高速度,通過`poll_list`鏈表保存所有的`pollfd` */ long stack_pps[POLL_STACK_ALLOC/sizeof(long)]; struct poll_list *const head = (struct poll_list *)stack_pps; struct poll_list *walk = head; unsigned long todo = nfds; . . . len = min_t(unsigned int, nfds, N_STACK_PPS); for (;;) { walk->next = NULL; walk->len = len; . . . /* 1. 將pollfd從用戶空間拷貝到內核空間 */ if (copy_from_user(walk->entries, ufds + nfds-todo, sizeof(struct pollfd) * walk->len)) goto out_fds; todo -= walk->len; if (!todo) break; len = min(todo, POLLFD_PER_PAGE); size = sizeof(struct poll_list) + sizeof(struct pollfd) * len; walk = walk->next = kmalloc(size, GFP_KERNEL); . . . } poll_initwait(&table); /* 2. 調用do_poll完成poll的實際調用處理 */ fdcount = do_poll(head, &table, end_time); poll_freewait(&table); /* 3. 將每個fd上產生的事件revents再從內核空間拷貝到用戶空間 */ for (walk = head; walk; walk = walk->next) { struct pollfd *fds = walk->entries; . . . for (j = 0; j < walk->len; j++, ufds++) if (__put_user(fds[j].revents, &ufds->revents)) goto out_fds; } err = fdcount; out_fds: . . . return err; }
do_sys_poll 函數首先將 pollfd 結構體數組從用戶空間拷貝至內核空間,同時用名為 poll_list 的鏈表存儲(一部分存儲在棧空間上,一部分存儲在堆空間),形如:
poll_initwait(&table) 對 poll_wqueues 結構體變量 table 進行初始化:
struct poll_wqueues table = { poll_table pt = { ._qproc = __pollwait; ._key = ~(__poll_t)0; /* all events enabled */ }; struct poll_table_page *table = NULL; struct task_struct *polling_task = current; int triggered = 0; int error = 0; int inline_index = 0; struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES]; };
函數指針 table.pt._qproc 被初始化指向 __pollwait 函數,這個和 poll 調用過程中阻塞與喚醒機制相關,后面將介紹。
隨后即調用 do_poll 函數完成 poll 操作,最后將每個文件描述符fd
產生的事件再拷貝到內核空間。
2.2 do_poll
static int do_poll(struct poll_list *list, struct poll_wqueues *wait, struct timespec64 *end_time) { poll_table* pt = &wait->pt; ktime_t expire, *to = NULL; int timed_out = 0, count = 0; u64 slack = 0; __poll_t busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0; unsigned long busy_start = 0; /* timeout設置為0時,將pt->_qproc設置為NULL,同時不阻塞,相當於退化為輪詢操作 */ if (end_time && !end_time->tv_sec && !end_time->tv_nsec) { pt->_qproc = NULL; timed_out = 1; } /* 超時時間設置並有效的情況下,才設置slack */ if (end_time && !timed_out) slack = select_estimate_accuracy(end_time); for (;;) { struct poll_list *walk; bool can_busy_loop = false; /* 對每一項pollfd進行遍歷,調用do_pollfd */ for (walk = list; walk != NULL; walk = walk->next) { struct pollfd * pfd, * pfd_end; pfd = walk->entries; pfd_end = pfd + walk->len; for (; pfd != pfd_end; pfd++) { /* do_pollfd返回非負值,表示發現事件觸發,此時無需再將當前進程加入到相應的等待隊列 */ if (do_pollfd(pfd, pt, &can_busy_loop, busy_flag)) { count++; pt->_qproc = NULL; /* found something, stop busy polling */ busy_flag = 0; can_busy_loop = false; } } } /* 當前進程已經在上述的遍歷中被加入到各個fd對應驅動的等待隊列,無需重復加入 */ pt->_qproc = NULL; if (!count) { count = wait->error; /* 被信號中斷,后面將返回 */ if (signal_pending(current)) count = -EINTR; } /* 發現事件觸發,或者timed_out == 1 提前退出循環 */ if (count || timed_out) break; if (can_busy_loop && !need_resched()) { if (!busy_start) { busy_start = busy_loop_current_time(); continue; } if (!busy_loop_timeout(busy_start)) continue; } busy_flag = 0; /* 超時時間end_time有效時,將timespec64格式的end_time轉換為ktime_t格式 */ if (end_time && !to) { expire = timespec64_to_ktime(*end_time); to = &expire; } /* 進行poll進程的休眠工作,讓出CPU * 超時時間到達時返回,設置timed_out=1,在下一個輪詢后返回上層調用 */ if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack)) timed_out = 1; } return count; }
do_poll 函數首先從頭部到尾部遍歷鏈表 poll_list ,對每一項 pollfd 調用 do_pollfd 函數。 do_pollfd 函數主要將當前 poll 調用進程加入到每個 pollfd 對應fd
所關聯的底層驅動等待隊列中,將在下文詳細介紹這一點。
do_pollfd 調用后,如果某個fd
已經產生事件,那么后續遍歷其他fd
時,無需再將當前進程加入到對應的等待隊列中, poll 調用也將返回而不是睡眠(schedule)。
2.3 do_pollfd
static inline __poll_t do_pollfd(struct pollfd *pollfd, poll_table *pwait, bool *can_busy_poll, __poll_t busy_flag) { int fd = pollfd->fd; __poll_t mask = 0, filter; struct fd f; if (fd < 0) goto out; mask = EPOLLNVAL; f = fdget(fd); if (!f.file) goto out; /* userland u16 ->events contains POLL... bitmap */ filter = demangle_poll(pollfd->events) | EPOLLERR | EPOLLHUP; pwait->_key = filter | busy_flag; mask = vfs_poll(f.file, pwait); if (mask & busy_flag) *can_busy_poll = true; mask &= filter; /* Mask out unneeded events. */ fdput(f); out: /* ... and so does ->revents */ pollfd->revents = mangle_poll(mask); return mask; }
do_pollfd 主要完成與底層VFS中的驅動程序 f_op->poll 的調用,和對事件的過濾(只過濾出對每個文件描述符感興趣的事件)。最后會把過濾出的事件放入 revents 中,作為結果返回。
static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt) { if (unlikely(!file->f_op->poll)) return DEFAULT_POLLMASK; return file->f_op->poll(file, pt); }
vfs_poll 將調用 file->f_op->poll 函數,而這個函數是在設備驅動程序中定義的。這里通過一個模擬的字符驅動globalfifo
程序中定義的 xxx_poll 函數來分析調用過程。
globalfifo_poll 主要完成了如下幾方面的工作:
- 鎖定設備自定義的互斥量
- 調用 poll_wait 將 poll 進程加入到設備自定義的等待隊列中,下文將詳細介紹 poll_wait
- 判斷等待事件條件是否發生
- 對互斥量解鎖
static unsigned int globalfifo_poll(struct file *flip, struct poll_table_struct *poll_table) { struct globalfifo_dev *dev = flip->private_data; unsigned int mask = 0; mutex_lock(&dev->globalfifo_mutex); /* Add poll_table to r_wait/w_wait queue of device driver * then, device driver could wake up poll function of upper layer (do_poll) */ poll_wait(flip, &dev->r_wait, poll_table); poll_wait(flip, &dev->w_wait, poll_table); if (dev->current_len != 0) { mask |= POLLIN | POLLRDNORM; } if (dev->current_len != GLOBALFIFO_SIZE) { mask |= POLLOUT | POLLWRNORM; } mutex_unlock(&dev->globalfifo_mutex); return mask; }
2.4 poll_wait
void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p) { if (p && p->_qproc && wait_address) p->_qproc(filp, wait_address, p); }
poll_wait 進而調用到 poll_table p->_qproc ,而后者在2.1節中通過 poll_initwait(&table) 被初始化為 __pollwait 。
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p) { struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt); struct poll_table_entry *entry = poll_get_entry(pwq); if (!entry) return; entry->filp = get_file(filp); entry->wait_address = wait_address; entry->key = p->_key; init_waitqueue_func_entry(&entry->wait, pollwake); entry->wait.private = pwq; add_wait_queue(wait_address, &entry->wait); }
__pollwait 初始化等待隊列項(關聯到當前 poll 進程),最后將等待隊列項加入到設備驅動中定義的等待隊列中。關於等待隊列的介紹,可以參考:Linux等待隊列(Wait Queue)。
2.5 do_poll (continued)
static int do_poll(struct poll_list *list, struct poll_wqueues *wait, struct timespec64 *end_time) { . . . for (;;) { struct poll_list *walk; bool can_busy_loop = false; for (walk = list; walk != NULL; walk = walk->next) { struct pollfd * pfd, * pfd_end; pfd = walk->entries; pfd_end = pfd + walk->len; for (; pfd != pfd_end; pfd++) { /* do_pollfd返回非負值,表示發現事件觸發,此時無需再將當前進程加入到相應的等待隊列 */ if (do_pollfd(pfd, pt, &can_busy_loop, busy_flag)) { count++; pt->_qproc = NULL; /* found something, stop busy polling */ busy_flag = 0; can_busy_loop = false; } } } /* 已經通過遍歷處理完所有pollfd,無需再次進行等待隊列的處理 */ pt->_qproc = NULL; if (!count) { count = wait->error; /* 被信號中斷,將直接返回 */ if (signal_pending(current)) count = -EINTR; } /* 發現事件觸發,或者timed_out == 1 提前退出循環 */ if (count || timed_out) break; . . . busy_flag = 0; /* 超時時間end_time有效時,將timespec64格式的end_time轉換為ktime_t格式 */ if (end_time && !to) { expire = timespec64_to_ktime(*end_time); to = &expire; } /* 進行poll進程的休眠工作,讓出CPU * 超時時間到達時返回,設置timed_out=1,在下一個輪詢后返回上層調用 */ if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack)) timed_out = 1; } return count; }
前面幾節介紹了do_poll會依次遍歷每一個pollfd,調用do_pollfd將當前poll進程加入到文件描述符對應驅動的等待隊列中,此過程中會判斷等待條件是否已產生。如若任意一個fd
的事件event符合要求,對於后面的fd
不會把當前進程加入到等待隊列中(pt->_qproc = NULL)。這一輪將poll進程都放進等待隊列后,在下一個loop就不用重復存放。
存在(產生)下一個loop的條件:
- timeout超時條件發生, 在下一個loop會先判斷是否有event滿足,滿足剛好能夠作為結果被返回
- 等待隊列被喚醒,在下一個loop對每個
fd
進行判斷,對相應文件描述符更新revents
do_poll 阻塞條件終止返回上層調用 do_sys_poll 后,如2.1節所述,會依次遍歷每一項 pollfd ,將最終產生的事件 revents 從內核空間拷貝到用戶空間。
至此,poll
調用過程基本結束。
3、select函數機制
select 函數和 poll 函數的實現機制大體一致,同樣存在用戶空間到內核空間的拷貝過程,以及在等待隊列上睡眠和喚醒的過程,關於 select 調用的詳細過程分析在此不贅述了。與 poll 調用不同的一點是, select 調用對目標文件描述符的數量受到最大文件描述符 max_fds 的限制,在如下 compat_core_sys_select 函數中可以清晰得看到這點。
static int compat_core_sys_select(int n, compat_ulong_t __user *inp, compat_ulong_t __user *outp, compat_ulong_t __user *exp, struct timespec64 *end_time) { fd_set_bits fds; void *bits; int size, max_fds, ret = -EINVAL; struct fdtable *fdt; long stack_fds[SELECT_STACK_ALLOC/sizeof(long)]; if (n < 0) goto out_nofds; /* max_fds can increase, so grab it once to avoid race */ rcu_read_lock(); fdt = files_fdtable(current->files); max_fds = fdt->max_fds; rcu_read_unlock(); if (n > max_fds) n = max_fds; . . . }
static int compat_core_sys_select(int n, compat_ulong_t __user *inp, compat_ulong_t __user *outp, compat_ulong_t __user *exp, struct timespec64 *end_time) { fd_set_bits fds; void *bits; int size, max_fds, ret = -EINVAL; struct fdtable *fdt; long stack_fds[SELECT_STACK_ALLOC/sizeof(long)]; if (n < 0) goto out_nofds; /* max_fds can increase, so grab it once to avoid race */ rcu_read_lock(); fdt = files_fdtable(current->files); max_fds = fdt->max_fds; rcu_read_unlock(); if (n > max_fds) n = max_fds; . . . }
4、總結
前面幾節內容,介紹了 poll 和 select 調用的使用方法,並利用源碼重點分析了Linux內核 poll 機制的實現原理。
poll 系統調用的整體過程可以概括為下圖:
參考資料
[1] select/poll/epoll對比分析:(http://gityuan.com/2015/12/06/linux_epoll/)
[2] 源碼解讀poll/select內核機制:(http://gityuan.com/2019/01/05/linux-poll-select/)
[3] 一文看懂IO多路復用:(https://zhuanlan.zhihu.com/p/115220699)
[4] POLL機制:(https://www.cnblogs.com/liuqing520/p/12706187.html)