select(2),同步的 I/O 復用
直接看 epoll 的源碼把自己繞暈了,先整個簡單點的下手。
select(2) 提供的用戶接口
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
- 第 1 個參數為最大的文件描述符加 1
- 第 2 3 4 個參數依次為讀寫異常需要檢查的結構體
- 第 5 個參數為超時時間,struct timeval 是一個精確到微秒的時間結構
man(2) 給出的用例
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int main(void) {
fd_set rfds;
struct timeval tv;
int retval;
/* Watch stdin (fd 0) to see when it has input. */
FD_ZERO(&rfds);
FD_SET(0, &rfds);
/* Wait up to five seconds. */
tv.tv_sec = 5;
tv.tv_usec = 0;
retval = select(1, &rfds, NULL, NULL, &tv);
/* Don't rely on the value of tv now! */
if (retval == -1)
perror("select()");
else if (retval)
printf("Data is available now.\n");
/* FD_ISSET(0, &rfds) will be true. */
else
printf("No data within five seconds.\n");
exit(EXIT_SUCCESS);
}
源碼分析
select(2) 依賴於文件結構 f_op->poll 方法,
- 在select中 循環 調用 f_op->poll() 回調 __pollwait(),在 __pollwait() 中設置好喚醒回調函數 pollwake() 再將其投入到文件的等待隊列中
- 獲得就緒事件掩碼,根據掩碼設置就緒的 fd,當文件狀態發生會主動調用喚醒函數再遍歷等待隊列執行隊列中設置的喚醒回調函數 pollwake()
// fs/kernfs/file.c
// 普通文件中的 poll 操作
const struct file_operations kernfs_file_fops = {
.poll = kernfs_fop_poll,
};
static __poll_t kernfs_fop_poll(struct file *filp, poll_table *wait)
{
// 最終調用 poll_wait 函數,轉而調用 _qproc
poll_wait(filp, &on->poll, wait);
}
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p->_qproc && wait_address)
p->_qproc(filp, wait_address, p);
}
關鍵的數據結構
// 文件的 poll 操作的數據結構
typedef struct poll_table_struct {
poll_queue_proc _qproc; // 文件 poll 的回調函數
__poll_t _key; // 關注的事件掩碼 pollwake() 比較使用
} poll_table;
// 所有可以投入的等待隊列項集合,(可以觀察初始化位置)
struct poll_wqueues {
poll_table pt;
struct poll_table_page *table; // 如果 inline_entries 空間不足,則從 poll_table_page 中獲取
struct task_struct *polling_task; // 當前運行的任務
int triggered; // 觸發標志
int error; // 錯誤代碼
int inline_index; // inline_entries 中第一個未分配的下標
struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES]; // 固定空間
};
// poll_table_entry 的動態內存結構, poll_get_entry()
struct poll_table_page {
struct poll_table_page * next; // 指向舊的 poll_table_entry 內存
struct poll_table_entry * entry; // 指向空閑的 poll_table_entry 節點
struct poll_table_entry entries[0]; // 作指示,初始 entry 指向
};
// 添加到文件的poll等待隊列中的結構項,一個文件對應一個poll_table_entry
struct poll_table_entry {
struct file *filp; // 當前文件
__poll_t key; // 對應 poll_table 中的 key
wait_queue_entry_t wait; // 等待隊列中的節點
wait_queue_head_t *wait_address; // 等待隊列頭
};
/*
* A single wait-queue entry structure:
*/
struct wait_queue_entry {
unsigned int flags;
void *private;
wait_queue_func_t func; // 等待隊列上節點的回調函數
struct list_head entry;
};
struct wait_queue_head {
spinlock_t lock;
struct list_head head;
};
typedef struct wait_queue_entry wait_queue_entry_t;
typedef struct wait_queue_head wait_queue_head_t;
typedef int (*wait_queue_func_t)(struct wait_queue_entry *wq_entry, unsigned mode, int flags, void *key);
1. 先對超時時間做處理
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
fd_set __user *, exp, struct timeval __user *, tvp)
{
return kern_select(n, inp, outp, exp, tvp);
}
static int kern_select(int n, fd_set __user *inp, fd_set __user *outp,
fd_set __user *exp, struct timeval __user *tvp)
{
struct timespec64 end_time, *to = NULL;
struct timeval tv;
int ret;
if (tvp) {
if (copy_from_user(&tv, tvp, sizeof(tv)))
return -EFAULT;
to = &end_time;
if (poll_select_set_timeout(to,
tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
(tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
return -EINVAL;
}
ret = core_sys_select(n, inp, outp, exp, to);
ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);
return ret;
}
struct timespec64 是一個包含秒和納秒的時間結構,struct timeval 的精度為微秒。
如果超時時間存在,則對時間做處理:將超時時間的精度安全的從微秒擴展到納秒。
2. 輪詢之前的准備工作
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
fd_set __user *exp, struct timespec64 *end_time)
{
fd_set_bits fds;
void *bits;
int ret, max_fds;
size_t size, alloc_size;
struct fdtable *fdt;
/* Allocate small arguments on the stack to save memory and be faster */
long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];
ret = -EINVAL;
if (n < 0)
goto out_nofds;
/* max_fds can increase, so grab it once to avoid race */
rcu_read_lock();
fdt = files_fdtable(current->files);
max_fds = fdt->max_fds; // 當前進程上的最大 fd
rcu_read_unlock();
if (n > max_fds)
n = max_fds;
/*
* We need 6 bitmaps (in/out/ex for both incoming and outgoing),
* since we used fdset we need to allocate memory in units of
* long-words.
*/
size = FDS_BYTES(n);
bits = stack_fds; // 優先使用棧上內存,若是內存不夠再申請空間
if (size > sizeof(stack_fds) / 6) {
/* Not enough space in on-stack array; must use kmalloc */
ret = -ENOMEM;
if (size > (SIZE_MAX / 6))
goto out_nofds;
alloc_size = 6 * size;
bits = kvmalloc(alloc_size, GFP_KERNEL);
if (!bits)
goto out_nofds;
}
fds.in = bits;
fds.out = bits + size;
fds.ex = bits + 2*size;
fds.res_in = bits + 3*size;
fds.res_out = bits + 4*size;
fds.res_ex = bits + 5*size;
// 將用戶態的 rfds wfds efds 復制到內核空間的 fds 上
if ((ret = get_fd_set(n, inp, fds.in)) ||
(ret = get_fd_set(n, outp, fds.out)) ||
(ret = get_fd_set(n, exp, fds.ex)))
goto out;
zero_fd_set(n, fds.res_in); // 在輪詢前對 fds 的結果置空
zero_fd_set(n, fds.res_out);
zero_fd_set(n, fds.res_ex);
ret = do_select(n, &fds, end_time); // 核心邏輯
if (ret < 0)
goto out;
if (!ret) {
ret = -ERESTARTNOHAND;
if (signal_pending(current)) // 若當前進程有信號未處理
goto out;
ret = 0;
}
// 將輪詢后的結果復制到用戶空間
if (set_fd_set(n, inp, fds.res_in) ||
set_fd_set(n, outp, fds.res_out) ||
set_fd_set(n, exp, fds.res_ex))
ret = -EFAULT;
out:
if (bits != stack_fds)
kvfree(bits);
out_nofds:
return ret;
}
select 核心實現
static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time)
{
ktime_t expire, *to = NULL;
struct poll_wqueues table;
poll_table *wait;
int retval, i, timed_out = 0;
u64 slack = 0;
__poll_t busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
unsigned long busy_start = 0;
rcu_read_lock();
retval = max_select_fd(n, fds); // 檢查 fds 中fd的有效性,並獲得最大的 fd
rcu_read_unlock();
if (retval < 0)
return retval;
n = retval;
poll_initwait(&table); // 初始化 poll_wqueue,見下,實現
wait = &table.pt;
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) { // 超時的判斷
wait->_qproc = NULL;
timed_out = 1;
}
if (end_time && !timed_out) // 未超時,計算剩下的時間
slack = select_estimate_accuracy(end_time);
retval = 0;
for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
bool can_busy_loop = false;
inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
__poll_t mask;
in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex;
if (all_bits == 0) { // 沒有關注的文件
i += BITS_PER_LONG; // 一個 long 可以描述 32 個fd
continue; // 直接下一輪
}
// 這里就開始對每一個 fd 進行判斷了
for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
struct fd f;
if (i >= n)
break;
if (!(bit & all_bits)) // 該fd未被關注
continue;
f = fdget(i); // 獲取文件
if (f.file) {
const struct file_operations *f_op;
f_op = f.file->f_op;
mask = DEFAULT_POLLMASK;
if (f_op->poll) {
// 設置回調函數的關注的文件事件掩碼
wait_key_set(wait, in, out,
bit, busy_flag);
// 執行文件的 poll 方法,調用 __pollwait(見下實現),設置等待隊列回調函數,返回就緒文件事件掩碼
mask = (*f_op->poll)(f.file, wait);
}
fdput(f); // 釋放文件
// 接下來就是根據就緒事件掩碼將就緒的fd寫入對應的 fds 中
if ((mask & POLLIN_SET) && (in & bit)) {
res_in |= bit;
retval++;
wait->_qproc = NULL; // 已經添加過 喚醒函數了,所以這里直接置空
}
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
wait->_qproc = NULL;
}
/* got something, stop busy polling */
if (retval) {
can_busy_loop = false;
busy_flag = 0;
/*
* only remember a returned
* POLL_BUSY_LOOP if we asked for it
*/
} else if (busy_flag & mask)
can_busy_loop = true;
}
}
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
cond_resched();
}
wait->_qproc = NULL;
// 如果有就緒事件 或 超時 或 有信號,退出循環
if (retval || timed_out || signal_pending(current))
break;
if (table.error) {
retval = table.error;
break;
}
/* only if found POLL_BUSY_LOOP sockets && not out of time */
if (can_busy_loop && !need_resched()) {
if (!busy_start) {
busy_start = busy_loop_current_time();
continue;
}
if (!busy_loop_timeout(busy_start))
continue;
}
busy_flag = 0;
/*
* If this is the first loop and we have a timeout
* given, then we convert to ktime_t and set the to
* pointer to the expiry value.
*/
if (end_time && !to) {
expire = timespec64_to_ktime(*end_time);
to = &expire;
}
// 等待超時,並且將超時標志置 1,將再次經過一次循環
if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
to, slack))
timed_out = 1;
}
poll_freewait(&table);
return retval;
}
先看 poll_initwait() 實現
void poll_initwait(struct poll_wqueues *pwq)
{
// init_poll_funcptr(&pwq->pt, __pollwait);
pwq->pt->_qproc = __pollwait; // 設置 poll_table 的回調函數為 __pollwait()
pwq->pt->_key = ~(__poll_t)0; /* all events enabled */
pwq->polling_task = current; // 指向當前的任務
pwq->triggered = 0; // 未觸發
pwq->error = 0; // 無錯誤
pwq->table = NULL; // 無動態內存分配
pwq->inline_index = 0; // table_entry 未使用
}
再看 __pollwait() 實現
/* Add a new entry */
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
poll_table *p)
{
struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
struct poll_table_entry *entry = poll_get_entry(pwq);
if (!entry)
return;
entry->filp = get_file(filp); // 設置文件
entry->wait_address = wait_address; // 設置等待隊列頭
entry->key = p->_key; // 設置關注的事件
init_waitqueue_func_entry(&entry->wait, pollwake); // 設置對待隊列節點的回調函數為 pollwake()
entry->wait.private = pwq; // 私有數據 poll_wqueues
add_wait_queue(wait_address, &entry->wait); // 將 poll_table_entry 添加到文件的等待隊列上
}
poll_get_entry() 的實現就不貼了,就是數組空間夠的話取數組空間,數組空間不夠申請內存頁,新的內存頁采用頭插法插入到 poll_wqueues->table 中
add_wait_queue() 就是一個將等待節點條目 wait_queue_entry_t 添加到 wait_queue_head_t 中,這里的 head 是 file->f_op->poll() 傳來的poll頭節點
再看 pollwake(), 最主要的工作是喚醒線程
static int pollwake(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
struct poll_table_entry *entry;
entry = container_of(wait, struct poll_table_entry, wait);
if (key && !(key_to_poll(key) & entry->key))
return 0;
return __pollwake(wait, mode, sync, key);
}
static int __pollwake(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
struct poll_wqueues *pwq = wait->private;
DECLARE_WAITQUEUE(dummy_wait, pwq->polling_task);
/*
* Although this function is called under waitqueue lock, LOCK
* doesn't imply write barrier and the users expect write
* barrier semantics on wakeup functions. The following
* smp_wmb() is equivalent to smp_wmb() in try_to_wake_up()
* and is paired with smp_store_mb() in poll_schedule_timeout.
*/
smp_wmb();
pwq->triggered = 1; // 設置觸發的標志
/*
* Perform the default wake up operation using a dummy
* waitqueue.
*
* TODO: This is hacky but there currently is no interface to
* pass in @sync. @sync is scheduled to be removed and once
* that happens, wake_up_process() can be used directly.
*/
return default_wake_function(&dummy_wait, mode, sync, key); // 喚醒,key 未使用
}
現在只有這些等待節點回調的實現,文件狀態發生變化時會調用等待隊列節點的回調函數 也就是 pollwake (wait_queue_func_t), 看實現
// 直接看喚醒的核心邏輯
// kernel/sched/wait.c
static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,
int nr_exclusive, int wake_flags, void *key,
wait_queue_entry_t *bookmark)
{
wait_queue_entry_t *curr;
// 遍歷文件 wait_queue_head 執行回調函數
list_for_each_entry_safe_from(curr, next, &wq_head->head, entry)
ret = curr->func(curr, mode, wake_flags, key);
}
雜項
- 為何 select 最大只能關注 1024 個文件描述符
/* fd_set for select and pselect. */
#define __FD_SETSIZE 1024
/* The fd_set member is required to be an array of longs. */
typedef long int __fd_mask;
/* It's easier to assume 8-bit bytes than to get CHAR_BIT. */
#define __NFDBITS (8 * (int)sizeof(__fd_mask)) // 每 long int 的位數 32
#define __FD_ELT(d) ((d) / __NFDBITS)
#define __FD_MASK(d) ((__fd_mask)1 << ((d) % __NFDBITS)) // 取模進行左移
typedef struct {
__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
#define __FDS_BITS(set) ((set)->__fds_bits)
} fd_set;
#define __FD_SET(d, set) ((void)(__FDS_BITS(set)[__FD_ELT(d)] |= __FD_MASK(d)))
#define __FD_CLR(d, set) ((void)(__FDS_BITS(set)[__FD_ELT(d)] &= ~__FD_MASK(d)))
#define __FD_ISSET(d, set) ((__FDS_BITS(set)[__FD_ELT(d)] & __FD_MASK(d)) != 0)
如上所示,fd_set 是一個整形數組,大小為32,觀察以上的宏實現,一個整形可以存 32 個 fd,每個fd對應一個 bit,故宏__FD_MASK() 是找到該fd對應哪一位,而__FD_ELT()對應fd處於哪一個 long 單位中,相當於一個二維數組了。
__FDS_BITS(set)[__FD_ELT(d)] 就是對該fd的訪問了。
- 為何 select 的第一個參數要在最大的 fd 中加 1.
這是select設計的問題,將第一個參數即當作fd又作為長度來分配空間,且輪詢的fd是從 0(stdin) 開始的,所以要加 1
