mark 引用:http://janfan.cn/chinese/2015/01/05/select-poll-impl-inside-the-kernel.html 文章
select()/poll() 的內核實現
05 Jan 2015同時對多個文件設備進行I/O事件監聽的時候(I/O multiplexing),我們經常會用到系統調用函數select()
poll()
,甚至是為大規模成百上千個文件設備進行並發讀寫而設計的epoll()
。
I/O multiplexing: When an application needs to handle multiple I/O descriptors at the same time, and I/O on any one descriptor can result in blocking. E.g. file and socket descriptors, multiple socket descriptors
一旦某些文件設備准備好了,可以讀寫了,或者是我們自己設置的timeout時間到了,這些函數就會返回,根據返回結果主程序繼續運行。
用了這些函數有什么好處? 我們自己本來就可以實現這種I/O Multiplexing啊,比如說:
- 創建多個進程或線程來監聽
- Non-blocking讀寫監聽的輪詢(polling)
- 異步I/O(Asynchronous I/O)與Unix Signal事件觸發
想要和我們自己的實現手段做比較,那么首先我們就得知道這些函數在背后是怎么實現的。 本文以Linux(v3.9-rc8)源碼為例,探索select()
poll()
的內核實現。
select()
源碼概述
首先看看select()
函數的函數原型,具體用法請自行輸入命令行$ man 2 select
查閱吧 : )
int select(int nfds, fd_set *restrict readfds, fd_set *restrict writefds, fd_set *restrict errorfds, struct timeval *restrict timeout);
下文將按照這個結構來講解select()
在Linux的實現機制。
select()
內核入口do_select()
的循環體struct file_operations
設備驅動的操作函數scull
驅動實例poll_wait
與設備的等待隊列- 其它相關細節
- 最后
好,讓我們開始吧 : )
select()
內核入口
我們首先把目光放到文件fs/select.c
文件上。
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp, fd_set __user *, exp, struct timeval __user *, tvp) { // … ret = core_sys_select(n, inp, outp, exp, to); ret = poll_select_copy_remaining(&end_time, tvp, 1, ret); return ret; }
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timespec *end_time) { fd_set_bits fds; // … if ((ret = get_fd_set(n, inp, fds.in)) || (ret = get_fd_set(n, outp, fds.out)) || (ret = get_fd_set(n, exp, fds.ex))) goto out; zero_fd_set(n, fds.res_in); zero_fd_set(n, fds.res_out); zero_fd_set(n, fds.res_ex); // … ret = do_select(n, &fds, end_time); // … }
很好,我們找到了一個宏定義的select()
函數的入口,繼續深入,可以看到其中最重要的就是do_select()
這個內核函數。
do_select()
的循環體
do_select()
實質上是一個大的循環體,對每一個主程序要求監聽的設備fd(File Descriptor)做一次struct file_operations
結構體里的poll
操作。
int do_select(int n, fd_set_bits *fds, struct timespec *end_time) { // … for (;;) { // … for (i = 0; i < n; ++rinp, ++routp, ++rexp) { // … struct fd f; f = fdget(i); if (f.file) { const struct file_operations *f_op; f_op = f.file->f_op; mask = DEFAULT_POLLMASK; if (f_op->poll) { wait_key_set(wait, in, out, bit, busy_flag); // 對每個fd進行I/O事件檢測 mask = (*f_op->poll)(f.file, wait); } fdput(f); // … } } // 退出循環體 if (retval || timed_out || signal_pending(current)) break; // 進入休眠 if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack)) timed_out = 1; } }
(*f_op->poll)
會返回當前設備fd的狀態(比如是否可讀可寫),根據這個狀態,do_select()
接着做出不同的動作
- 如果設備fd的狀態與主程序的感興趣的I/O事件匹配,則記錄下來,
do_select()
退出循環體,並把結果返回給上層主程序。 - 如果不匹配,
do_select()
發現timeout已經到了或者進程有signal信號打斷,也會退出循環,只是返回空的結果給上層應用。
但如果do_select()
發現當前沒有事件發生,又還沒到timeout,更沒signal打擾,內核會在這個循環體里面永遠地輪詢下去嗎?
select()
把全部fd檢測一輪之后如果沒有可用I/O事件,會讓當前進程去休眠一段時間,等待fd設備或定時器來喚醒自己,然后再繼續循環體看看哪些fd可用,以此提高效率。
int poll_schedule_timeout(struct poll_wqueues *pwq, int state, ktime_t *expires, unsigned long slack) { int rc = -EINTR; // 休眠 set_current_state(state); if (!pwq->triggered) rc = schedule_hrtimeout_range(expires, slack, HRTIMER_MODE_ABS); __set_current_state(TASK_RUNNING); /* * Prepare for the next iteration. * * The following set_mb() serves two purposes. First, it's * the counterpart rmb of the wmb in pollwake() such that data * written before wake up is always visible after wake up. * Second, the full barrier guarantees that triggered clearing * doesn't pass event check of the next iteration. Note that * this problem doesn't exist for the first iteration as * add_wait_queue() has full barrier semantics. */ set_mb(pwq->triggered, 0); return rc; } EXPORT_SYMBOL(poll_schedule_timeout);
struct file_operations
設備驅動的操作函數
設備發現I/O事件時會喚醒主程序進程? 每個設備fd的等待隊列在哪?我們什么時候把當前進程添加到它們的等待隊列里去了?
mask = (*f_op->poll)(f.file, wait);
就是上面這行代碼干的好事。 不過在此之前,我們得先了解一下系統內核與文件設備的驅動程序之間耦合框架的設計。
上文對每個設備的操作f_op->poll
,是一個針對每個文件設備特定的內核函數,區別於我們平時用的系統調用poll()
。 並且,這個操作是select()
poll()
epoll()
背后實現的共同基礎。
Support for any of these calls requires support from the device driver. This support (for all three calls,
select()
poll()
andepoll()
) is provided through the driver’s poll method.
Linux的設計很靈活,它並不知道每個具體的文件設備是怎么操作的(怎么打開,怎么讀寫),但內核讓每個設備擁有一個struct file_operations
結構體,這個結構體里定義了各種用於操作設備的函數指針,指向操作每個文件設備的驅動程序實現的具體操作函數,即設備驅動的回調函數(callback)。
struct file { struct path f_path; struct inode *f_inode; /* cached value */ const struct file_operations *f_op; // … } __attribute__((aligned(4))); /* lest something weird decides that 2 is OK */
struct file_operations { struct module *owner; loff_t (*llseek) (struct file *, loff_t, int); ssize_t (*read) (struct file *, char __user *, size_t, loff_t *); ssize_t (*write) (struct file *, const char __user *, size_t, loff_t *); ssize_t (*aio_read) (struct kiocb *, const struct iovec *, unsigned long, loff_t); ssize_t (*aio_write) (struct kiocb *, const struct iovec *, unsigned long, loff_t); ssize_t (*read_iter) (struct kiocb *, struct iov_iter *); ssize_t (*write_iter) (struct kiocb *, struct iov_iter *); int (*iterate) (struct file *, struct dir_context *); // select()輪詢設備fd的操作函數 unsigned int (*poll) (struct file *, struct poll_table_struct *); // … };
這個f_op->poll
對文件設備做了什么事情呢? 一是調用poll_wait()
函數(在include/linux/poll.h
文件); 二是檢測文件設備的當前狀態。
unsigned int (*poll) (struct file *filp, struct poll_table_struct *pwait);
The device method is in charge of these two steps:
- Call
poll_wait()
on one or more wait queues that could indicate a change in the poll status. If no file descriptors are currently available for I/O, the kernel causes the process to wait on the wait queues for all file descriptors passed to the system call.- Return a bit mask describing the operations (if any) that could be immediately performed without blocking.
或者來看另一個版本的說法:
For every file descriptor, it calls that fd’s
poll()
method, which will add the caller to that fd’s wait queue, and return which events (readable, writeable, exception) currently apply to that fd.
下一節里我們會結合驅動實例程序來理解。
scull
驅動實例
由於Linux設備驅動的耦合設計,對設備的操作函數都是驅動程序自定義的,我們必須要結合一個具體的實例來看看,才能知道f_op->poll
里面弄得是什么鬼。
在這里我們以Linux Device Drivers, Third Edition一書中的例子——scull
設備的驅動程序為例。
scull
(Simple Character Utility for Loading Localities). scull is a char driver that acts on a memory area as though it were a device.
scull
設備不同於硬件設備,它是模擬出來的一塊內存,因此對它的讀寫更快速更自由,內存支持你順着讀倒着讀點着讀怎么讀都可以。 我們以書中“管道”(pipe)式,即FIFO的讀寫驅動程序為例。
首先是scull_pipe
的結構體,注意wait_queue_head_t
這個隊列類型,它就是用來記錄等待設備I/O事件的進程的。
struct scull_pipe { wait_queue_head_t inq, outq; /* read and write queues */ char *buffer, *end; /* begin of buf, end of buf */ int buffersize; /* used in pointer arithmetic */ char *rp, *wp; /* where to read, where to write */ int nreaders, nwriters; /* number of openings for r/w */ struct fasync_struct *async_queue; /* asynchronous readers */ struct mutex mutex; /* mutual exclusion semaphore */ struct cdev cdev; /* Char device structure */ };
scull
設備的輪詢操作函數scull_p_poll
,驅動模塊加載后,這個函數就被掛到(*poll)
函數指針上去了。
我們可以看到它的確是返回了當前設備的I/O狀態,並且調用了內核的poll_wait()
函數,這里注意,它把自己的wait_queue_head_t
隊列也當作參數傳進去了。
static unsigned int scull_p_poll(struct file *filp, poll_table *wait) { struct scull_pipe *dev = filp->private_data; unsigned int mask = 0; /* * The buffer is circular; it is considered full * if "wp" is right behind "rp" and empty if the * two are equal. */ mutex_lock(&dev->mutex); poll_wait(filp, &dev->inq, wait); poll_wait(filp, &dev->outq, wait); if (dev->rp != dev->wp) mask |= POLLIN | POLLRDNORM; /* readable */ if (spacefree(dev)) mask |= POLLOUT | POLLWRNORM; /* writable */ mutex_unlock(&dev->mutex); return mask; }
當scull
有數據寫入時,它會把wait_queue_head_t
隊列里等待的進程給喚醒。
static ssize_t scull_p_write(struct file *filp, const char __user *buf, size_t count, loff_t *f_pos) { // … /* Make sure there's space to write */ // … /* ok, space is there, accept something */ // … /* finally, awake any reader */ wake_up_interruptible(&dev->inq); /* blocked in read() and select() */ // … }
可是wait_queue_head_t
隊列里的進程是什么時候裝進去的? 肯定是poll_wait
搞的鬼! 我們又得回到該死的Linux內核去了。
poll_wait
與設備的等待隊列
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p) { if (p && p->_qproc && wait_address) p->_qproc(filp, wait_address, p); } /* * Do not touch the structure directly, use the access functions * poll_does_not_wait() and poll_requested_events() instead. */ typedef struct poll_table_struct { poll_queue_proc _qproc; unsigned long _key; } poll_table; /* * structures and helpers for f_op->poll implementations */ typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);
可以看到,poll_wait()
其實就是只是直接調用了struct poll_table_struct
結構里綁定的函數指針。 我們得找到struct poll_table_struct
初始化的地方。
The
poll_table
structure is just a wrapper around a function that builds the actual data structure. That structure, forpoll
andselect
, is a linked list of memory pages containingpoll_table_entry
structures.
struct poll_table_struct
里的函數指針,是在do_select()
初始化的。
int do_select(int n, fd_set_bits *fds, struct timespec *end_time) { struct poll_wqueues table; poll_table *wait; poll_initwait(&table); wait = &table.pt; // … } void poll_initwait(struct poll_wqueues *pwq) { // 初始化poll_table里的函數指針 init_poll_funcptr(&pwq->pt, __pollwait); pwq->polling_task = current; pwq->triggered = 0; pwq->error = 0; pwq->table = NULL; pwq->inline_index = 0; } EXPORT_SYMBOL(poll_initwait); static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc) { pt->_qproc = qproc; pt->_key = ~0UL; /* all events enabled */ }
我們現在終於知道,__pollwait()
函數,就是poll_wait()
幕后的真凶。
add_wait_queue()
把當前進程添加到設備的等待隊列wait_queue_head_t
中去。
/* Add a new entry */ static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p) { struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt); struct poll_table_entry *entry = poll_get_entry(pwq); if (!entry) return; entry->filp = get_file(filp); entry->wait_address = wait_address; entry->key = p->_key; init_waitqueue_func_entry(&entry->wait, pollwake); entry->wait.private = pwq; // 把當前進程裝到設備的等待隊列 add_wait_queue(wait_address, &entry->wait); } void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait) { unsigned long flags; wait->flags &= ~WQ_FLAG_EXCLUSIVE; spin_lock_irqsave(&q->lock, flags); __add_wait_queue(q, wait); spin_unlock_irqrestore(&q->lock, flags); } EXPORT_SYMBOL(add_wait_queue); static inline void __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new) { list_add(&new->task_list, &head->task_list); } /** * Insert a new element after the given list head. The new element does not * need to be initialised as empty list. * The list changes from: * head → some element → ... * to * head → new element → older element → ... * * Example: * struct foo *newfoo = malloc(...); * list_add(&newfoo->entry, &bar->list_of_foos); * * @param entry The new element to prepend to the list. * @param head The existing list. */ static inline void list_add(struct list_head *entry, struct list_head *head) { __list_add(entry, head, head->next); }
其它相關細節
fd_set
實質上是一個unsigned long
數組,里面的每一個long
整值的每一位都代表一個文件,其中置為1的位表示用戶要求監聽的文件。 可以看到,select()
能同時監聽的fd好少,只有1024個。
#define __FD_SETSIZE 1024 typedef struct { unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(long))]; } __kernel_fd_set; typedef __kernel_fd_set fd_set;
- 所謂的文件描述符fd (File Descriptor),大家也知道它其實只是一個表意的整數值,更深入地說,它是每個進程的file數組的下標。
struct fd { struct file *file; unsigned int flags; };
select()
系統調用會創建一個poll_wqueues
結構體,用來記錄相關I/O設備的等待隊列;當select()
退出循環體返回時,它要把當前進程從全部等待隊列中移除——這些設備再也不用着去喚醒當前隊列了。
The call to
poll_wait
sometimes also adds the process to the given wait queue. The whole structure must be maintained by the kernel so that the process can be removed from all of those queues before poll or select returns.
/* * Structures and helpers for select/poll syscall */ struct poll_wqueues { poll_table pt; struct poll_table_page *table; struct task_struct *polling_task; int triggered; int error; int inline_index; struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES]; }; struct poll_table_entry { struct file *filp; unsigned long key; wait_queue_t wait; wait_queue_head_t *wait_address; };
wait_queue_head_t
就是一個進程(task)的隊列。
struct __wait_queue_head { spinlock_t lock; struct list_head task_list; }; typedef struct __wait_queue_head wait_queue_head_t;
select()
與epoll()
的比較
- select,poll實現需要自己不斷輪詢所有fd集合,直到設備就緒,期間可能要睡眠和喚醒多次交替。而epoll其實也需要調用
epoll_wait
不斷輪詢就緒鏈表,期間也可能多次睡眠和喚醒交替,但是它是設備就緒時,調用回調函數,把就緒fd放入就緒鏈表中,並喚醒在epoll_wait中進入睡眠的進程。雖然都要睡眠和交替,但是select和poll在“醒着”的時候要遍歷整個fd集合,而epoll在“醒着”的時候只要判斷一下就緒鏈表是否為空就行了,這節省了大量的CPU時間。這就是回調機制帶來的性能提升。- epoll所支持的FD上限是最大可以打開文件的數目,這個數字一般遠大於2048,舉個例子,在1GB內存的機器上大約是10萬左右,具體數目可以
cat /proc/sys/fs/file-max
察看,一般來說這個數目和系統內存關系很大。
更具體的比較可以參見這篇文章。
最后
非常艱難的,我們終於來到了這里(T^T)
總結一下select()
的大概流程(poll
同理,只是用於存放fd的數據結構不同而已)。
- 先把全部fd掃一遍
- 如果發現有可用的fd,跳到5
- 如果沒有,當前進程去睡覺xx秒
- xx秒后自己醒了,或者狀態變化的fd喚醒了自己,跳到1
- 結束循環體,返回
我相信,你肯定還沒懂,這代碼實在是亂得一逼,被我剪輯之后再是亂得沒法看了(嘆氣)。 所以看官請務必親自去看Linux源碼,在這里我已經給出了大致的方向,等你看完源碼回來,這篇文章你肯定也就明白了。 當然別忘了下面的參考資料,它們可幫大忙了 :P