關鍵詞:fasync_helper、kill_async、sigsuspend、sigaction、fcntl、F_SETOWN_EX、F_SETSIG、select()、poll()、poll_wait()等。
《Linux/UNIX系統編程手冊》第63章主要介紹了select()/poll()、信號驅動IO、epoll三方面,以及他們之間異同、優劣點。
這里准備結合項目中遇到的問題,分兩個方向進行歸納總結。一是一個IO模型從測試程序、API、內核實現進行縱向分析;二是橫向不同IO模型的優缺點對比。
IO多路復用允許進程同時檢查多個文件描述符以找出它們中的任何一個是否可執行IO操作。系統調用select()和poll()用來執行IO多路復用。
信號驅動IO是指當有輸入或者數據可以寫到指定的文件描述符上時,內核向請求數據的進程發送一個信號。進程可以處理其他的任務,當IO操作可執行時通過接收信號來獲得通知。當同時檢查大量的文件描述符時,信號驅動IO相比select()和poll()有顯著的性能提升。
epoll API是Linux專有的特性,首次出現在Linux 2.6中。同IO多路復用API一樣,epoll API允許進程同時檢查多個文件描述符,看其中任意一個是否能執行IO操作。通信號驅動IO一樣,當同時檢查大量文件描述符時,epoll能提供更好的性能。
在實際應用中使用那種技術,下面是一些要點:
- 系統調用select()和poll()在UNIX系統中已經存在了很長時間。優勢在於可移植性強;缺點在於當同時檢查大量文件描述符時性能延展性不佳。
- epoll API的關鍵優勢在於它能讓應用程序高效地檢查大量的文件描述符。主要缺點在於它是Linux專用API。
- 同epoll一樣,信號驅動IO可以讓應用程序高效地檢查大量的文件描述符。但是epoll有一些信號驅動IO所沒有的優點。
- 避免了處理信號的復雜性。
- 可以指定想要檢查的事件類型,比如讀就緒或寫就緒。
- 可以選擇水平觸發或邊緣出發形式來通知進程。
1. 水平觸發和邊沿觸發
討論多種IO機制之前,首先區分兩種文件描述符准備就緒的通知模式。
水平觸發通知:如果文件描述符上可以非阻塞地執行IO系統調用,此時認為它已經就緒。
邊緣觸發通知:如果文件描述符自上次狀態檢查以來有了新的IO活動,此時需要觸發通知。
IO模式 | 水平觸發 | 邊緣觸發 |
select()/poll() | √ | |
信號驅動IO | √ | |
epoll | √ | √ |
當采用水平觸發通知時,可以在任意時刻檢查文件描述符的就緒狀態。表示當文件描述符處於就緒態時,就可以對其執行一些IO操作;然后重復檢查文件描述符。看看是否仍然處於就緒態,此時可以執行更多的IO。由於水平觸發模式允許我們在任意時刻重復檢查IO狀態,沒有必要每次當文件描述符就緒后需要盡可能多地址性IO。
當采用邊緣觸發時,只有當IO事件發生時才會收到通知,因此:
- 在接收到一個IO事件通知后,程序應該在相應的文件描述符上盡可能多地執行IO。如果不那么做,可能失去執行IO的機會,導致數據丟失或者程序出現阻塞。
- 如果程序采用循環來對文件描述符執行盡可能多的IO,而文件描述符又被設置為可阻塞的,那么最終當沒有更多的IO可執行時,IO系統調用就會阻塞。因此,每個被檢查的文件描述符都應該設置為非阻塞模式,在得到IO事件通知后重復執行IO操作。
2. 信號驅動IO
信號驅動IO中,當文件描述符上可執行IO操作時,進程請求內核為自己發送一個信號。
進程可以執行其他任何任務直到IO就緒位置,此時內核會發送信號給進程。
2.1 信號驅動IO步驟及相關API
信號驅動IO的使用遵循一定的步驟:
1.為內核發送通知信號安裝一個信號處理例程;通常是SIGIO,但在多線程環境下可以指定自定義的實時信號。
2.設定文件描述符屬主,也就是當文件描述符上可執行IO操作時會接收通知信號的進程或進程組。通過fcntl()命令F_SETOWN或者F_SETOWN_EX來制定。
3.通過設定O_NONBLOCK標志是能非阻塞IO。
4.通過O_ASYNC標志是能信號驅動IO。
5.然后進程可以處理其他任務,當有信號到達時對應的信號處理函數就會被調用
6.通常情況下,如果需要等待IO操作,可以通過sigsuspend()進行等待,此時進程會放棄
2.2 async IO測試分析
首先構造內核驅動和用戶空間測試程序,然后針對測試程序進行詳細的分析。
2.2.1 async IO測試程序
創建signal_kernel.c作為模組插入內核,signal_user.c作為用戶空間測試程序。
創建/dev/signal0設備,通過wirte可以讓內核發送信號到用戶空間;wirte之后立即進入sigsuspend()等待。
#include <linux/kernel.h> #include <linux/module.h> #include <linux/init.h> #include <linux/cdev.h> #include <linux/device.h> #include <linux/fs.h> #include <linux/uaccess.h> static struct class *signal_class; static int signal_major; #define SIGNAL_NAME "signal" struct timer_list signal_timer; static struct fasync_struct *signal_async; int signal_count = 0; static unsigned char signal_text[30]; static int signal_release(struct inode *inode, struct file *file) { return 0; } static void dummy_timer(unsigned long data) { //printk("Send SIGIO signal.\n"); kill_fasync (&signal_async, SIGIO, POLL_IN);----------------------------------------發送異步信號給fa綁定的進程。 signal_count++; } static int signal_open(struct inode *inode, struct file *file) { return 0; } static ssize_t signal_read(struct file * file, char __user * buf, size_t count, loff_t *ppos) { int size; size = sprintf(signal_text, "%d", signal_count); copy_to_user(buf, signal_text, size); return size; } static ssize_t signal_write(struct file * file, const char __user * buf, size_t count, loff_t *ppos) { char str[30]; copy_from_user(str, buf,count); printk("Receive %s from user.\n",str); dummy_timer(0); return 0; } static int signal_fasync (int fd, struct file *filp, int on) { return fasync_helper(fd, filp, on, &signal_async);----------------------------------創建fasync_struct並插入到當前fasync鏈表中。 } static const struct file_operations signal_fops = { .owner = THIS_MODULE, .open = signal_open, .release = signal_release, .read = signal_read, .write = signal_write, .fasync = signal_fasync,----------------------------------------------------------當用戶空間設置FASYNC的時候調用fasync函數。 }; static int __init signal_test_init(void) { struct device *signal_device; signal_major = register_chrdev(0, SIGNAL_NAME, &signal_fops); if (signal_major < 0) { pr_err("register_chrdev failed\n"); goto err; } signal_class = class_create(THIS_MODULE, SIGNAL_NAME); if (IS_ERR(signal_class)) { pr_err("device class file already in use\n"); goto err_class; } signal_device = device_create(signal_class, NULL, MKDEV(signal_major, 0), NULL, "%s%d", SIGNAL_NAME, 0); if (IS_ERR(signal_device)) { pr_err("failed to create device\n"); goto err_device; } return 0; err_device: class_destroy(signal_class); err_class: unregister_chrdev(signal_major, SIGNAL_NAME); err: return 0; } static void __exit signal_test_exit(void) { del_timer(&signal_timer); device_destroy(signal_class, MKDEV(signal_major, 0)); class_destroy(signal_class); unregister_chrdev(signal_major, SIGNAL_NAME); } module_init(signal_test_init); module_exit(signal_test_exit); MODULE_LICENSE("GPL");
對應的Makefile如下:
CONFIG_MODULE_SIG=n EXTRA_CFLAGS += -D_GNU_SOURCE---------------------------------------------針對F_SETOWN_EX等擴展命令需要定義此宏。 obj-m := signal_kernel.o KERN_DIR := /lib/modules/$(shell uname -r)/build PWD := $(shell pwd) all: $(MAKE) -C $(KERN_DIR) M=$(PWD) modules gcc $(EXTRA_CFLAGS) signal_user.c -o signal_user -pthread clean: $(MAKE) -C $(KERN_DIR) M=$(PWD) modules clean rm signal_user modules_install: $(MAKE) -C $(KERN_DIR) M=$(PWD) modules_install
用戶空間測試程序signal_user.c如下。
測試程序創建一個sigio線程專門處理信號,重點是進行信號處理前進行一系列設置;然后調用sigsuspend()進入睡眠,等待內核信號喚醒。
在實際應用中,有可能是中斷調用kill_fasync()發送信號。
#include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdio.h> #include <poll.h> #include <signal.h> #include <sys/types.h> #include <unistd.h> #include <fcntl.h> #include <time.h> #include <pthread.h> #include <sys/prctl.h> #include <sys/syscall.h> #include <memory.h> int fd; char *filename = "/dev/signal0"; char signal_response[] = "OK"; unsigned int handle_count = 0; #define LOOP_COUNT 10 #define SIGTEST (SIGRTMIN+1) void sig_handler(int sig, siginfo_t *si, void *uc) { unsigned char signal_count[30]; memset(signal_count, 30, 0); read(fd, &signal_count, 0); handle_count ++; printf(">>>>>PID %ld Receive sig=%d count=%d.\n", syscall(SYS_gettid), sig, handle_count); //Trigger a SIGTEST signal. if(handle_count < LOOP_COUNT) write(fd, signal_response, sizeof(signal_response));------------------重復觸發內核發送SIGTEST信號。 //if(atoi(signal_count) != handle_count) //printf("%s: receive_count=%d, handle_count=%d\n", __func__, atoi(signal_count), handle_count); } void *pthread_func(void *arg) { int Oflags; struct timespec time1, time2; struct sigaction sa; sigset_t set, oldset; unsigned long int duration; struct f_owner_ex owner_ex; //Set thread name. prctl(PR_SET_NAME,"sigio"); printf("main=%d, sigio=%ld.\n", getpid(), syscall(SYS_gettid)); //Set SIGTEST actiong. memset(&sa, 0, sizeof(sa)); sa.sa_sigaction = sig_handler; sa.sa_flags |= SA_RESTART | SA_SIGINFO;------------------------------------這里必須使用sa_sigaction和SA_SIGINFO。 sigaction(SIGTEST, &sa, NULL);---------------------------------------------設置SIGTEST信號處理函數。 //Set proc mask. sigemptyset(&set); sigprocmask(SIG_SETMASK, &set, NULL); sigaddset(&set, SIGTEST); fd = open(filename, O_RDWR); if (fd < 0) { printf("Can't open %s!\n", filename); } owner_ex.pid = syscall(SYS_gettid); owner_ex.type = F_OWNER_TID; fcntl(fd, F_SETOWN_EX, &owner_ex);------------------------------------------將fd文件句柄和當前線程綁定;如果設置F_SETOWN則是和線程組綁定,這兩者區別后續重點介紹。 Oflags = fcntl(fd, F_GETFL); fcntl(fd, F_SETFL, Oflags | FASYNC);----------------------------------------對應內核驅動file_operations的fasync成員,給當前fd創建一個fasync_struct。 fcntl(fd, F_SETSIG, SIGTEST);-----------------------------------------------設置SIGTEST替代SIGIO作為信號發送。 clock_gettime(CLOCK_REALTIME, &time1); write(fd, signal_response, sizeof(signal_response));------------------------觸發內核發送SIGTEST信號。 //pthread_sigmask(SIG_BLOCK, &set, &oldset); sigprocmask(SIG_BLOCK, &set, &oldset); while(handle_count < LOOP_COUNT) { //Will suspend here, and later code will be executed after SIGTEST handler. sigsuspend(&oldset);----------------------------------------------------在此處睡眠,等待信號來喚醒。 } //pthread_sigmask(SIG_UNBLOCK, &set, NULL); sigprocmask(SIG_UNBLOCK, &set, NULL); clock_gettime(CLOCK_REALTIME, &time2); duration =(time2.tv_sec-time1.tv_sec)*1000000000 + (time2.tv_nsec-time1.tv_nsec); printf("End time %ld.%ld\n", duration/1000000000, duration%1000000000); close(fd); pthread_exit(0); } void main(int argc, char **argv) { pthread_t tidp; sigset_t set; sigemptyset(&set); sigaddset(&set, SIGTEST); sigprocmask(SIG_BLOCK, &set, NULL); if(pthread_create(&tidp, NULL, pthread_func, NULL) == -1) { printf("Create pthread error.\n"); return; } if(pthread_join(tidp, NULL)) { printf("Join pthread error.\n"); return; } printf("Main exit.\n"); return; }
2.2.2 測試結果
將編譯的module signal_kernel.ko插入內核,然后用戶空間執行sudo ./signal_user結果如下。
main=10485, sigio=10486. >>>>>PID 10486 Receive sig=35 count=1. >>>>>PID 10486 Receive sig=35 count=2. >>>>>PID 10486 Receive sig=35 count=3. >>>>>PID 10486 Receive sig=35 count=4. >>>>>PID 10486 Receive sig=35 count=5. >>>>>PID 10486 Receive sig=35 count=6. >>>>>PID 10486 Receive sig=35 count=7. >>>>>PID 10486 Receive sig=35 count=8. >>>>>PID 10486 Receive sig=35 count=9. >>>>>PID 10486 Receive sig=35 count=10. End time 0.103711 Main exit.
2.3 信號驅動IO流程解讀
首先整個流程在用戶空間配置;然后由內核發起,用戶空間處理。
2.3.1 sigaction()設置信號處置
除了signal(),sigaction()是另一種設置信號處置的選擇。sigaction()更加復雜,但也根據靈活性。
#include <signal.h> int sigaction(int sig, const struct sigaction *act, struct sigaction *oldact);
sig參數標識想要獲取或改變的信號編號,可以使處SIGKILL和SIGSTOP之外任何信號。
act指向描述信號新處置數據結構,oldact用於返回之前信號處置相關信息。兩者都可以設置為NULL。
struct sigaction { void (*sa_handler)(int); /* Address of handler */ sigset_t sa_mask; /* Signals blocked during handler invocation */ int sa_flags; /* Flags controlling handler invocation */ void (*sa_restorer)(void); /* Not for application use */ };
sa_handler指向sig對應的處理函數。
sa_mask定義一組新號,不允許它們中斷此處理程序的執行。調用信號處理程序之前,將該組信號中其他還沒有處於進程掩碼之列的信號添加到進程掩碼中;在信號處理返回之后再將,之前添加的掩碼移除。這就保證,在處理程序執行期間,sa_mask所有信號不會中斷此處理程序。
引發處理程序自身的信號將自動添加到進程掩碼中,這意味着信號處理程序不允許嵌套。當正在執行處理程序時,如果同一信號第二次抵達,將不會遞歸中斷自己。
由於標准信號(sig < SIGRTMIN)信號處理程序不會被隊列化,所以標准信號處理程序執行期間第二次達到的同一信號將會被忽略。
但是事實信號不會存在這種現象。
struct sighand_struct { atomic_t count; struct k_sigaction action[_NSIG];------------------------------------------------一共_NSIG,即64個信號1-31是標准信號,32-64是實時信號。 spinlock_t siglock; wait_queue_head_t signalfd_wqh; }; int do_sigaction(int sig, struct k_sigaction *act, struct k_sigaction *oact) { struct task_struct *p = current, *t; struct k_sigaction *k; sigset_t mask; if (!valid_signal(sig) || sig < 1 || (act && sig_kernel_only(sig))) return -EINVAL; k = &p->sighand->action[sig-1];------------------------------------------------------將k指向當前進程sig對應的k_sigaction結構。 spin_lock_irq(&p->sighand->siglock); if (oact) *oact = *k; sigaction_compat_abi(act, oact); if (act) { sigdelsetmask(&act->sa.sa_mask, sigmask(SIGKILL) | sigmask(SIGSTOP)); *k = *act;-----------------------------------------------------------------------將act和k關聯。 ... } spin_unlock_irq(&p->sighand->siglock); return 0; }
2.3.2 F_SETOWN_EX和F_SETOWN
F_SETOWN是標准fcntl;F_SETOWN_EX是Linux擴展命令,如果要使用需要加上_D_GNU_SOURCE。
這兩個命令都通過fcntl()系統調用,設置文件控制操作。
struct fown_struct是內核中關聯文件句柄和其擁有者信息的結構體:
struct fown_struct { rwlock_t lock; /* protects pid, uid, euid fields */ struct pid *pid; /* pid or -pgrp where SIGIO should be sent */----------------------擁有此文件SIGIO信號的進程。 enum pid_type pid_type; /* Kind of process group SIGIO should be sent to */---------進程類型。 kuid_t uid, euid; /* uid/euid of process setting the owner */ int signum; /* posix.1b rt signal to be delivered on IO */----------------------kill_fasync()發送的信號。 };
下面就來分析這兩命令的區別,以及他們是如何影響信號處理行為的。
enum pid_type { PIDTYPE_PID, PIDTYPE_PGID, PIDTYPE_SID, PIDTYPE_MAX, /* only valid to __task_pid_nr_ns() */ __PIDTYPE_TGID }; static long do_fcntl(int fd, unsigned int cmd, unsigned long arg, struct file *filp) { long err = -EINVAL; switch (cmd) { ... case F_GETFL:------------------------------------------------獲取、設置文件句柄的f_flags。 err = filp->f_flags; break; case F_SETFL: err = setfl(fd, filp, arg); break; ...
case F_GETOWN:-----------------------------------------------獲取、設置文件句柄的擁有者進程pid。 err = f_getown(filp); force_successful_syscall_return(); break; case F_SETOWN: f_setown(filp, arg, 1); err = 0; break; case F_GETOWN_EX:--------------------------------------------進階獲取、設置文件句柄擁有者進程pid。 err = f_getown_ex(filp, arg); break; case F_SETOWN_EX: err = f_setown_ex(filp, arg); break; ... case F_GETSIG:-----------------------------------------------設置文件句柄和其擁有者之間異步通信的信號。 err = filp->f_owner.signum; break; case F_SETSIG: if (!valid_signal(arg)) { break; } err = 0; filp->f_owner.signum = arg; break; ... } return err; } pid_t f_getown(struct file *filp) { pid_t pid; read_lock(&filp->f_owner.lock); pid = pid_vnr(filp->f_owner.pid); if (filp->f_owner.pid_type == PIDTYPE_PGID)-----------------如果是進程組返回其負值。 pid = -pid; read_unlock(&filp->f_owner.lock); return pid; } void f_setown(struct file *filp, unsigned long arg, int force) { enum pid_type type; struct pid *pid; int who = arg; type = PIDTYPE_PID; if (who < 0) { type = PIDTYPE_PGID;-----------------------------------默認類型是進程,如果who為負,則是進程組。 who = -who; } rcu_read_lock(); pid = find_vpid(who); __f_setown(filp, pid, type, force); rcu_read_unlock(); } static int f_getown_ex(struct file *filp, unsigned long arg) { struct f_owner_ex __user *owner_p = (void __user *)arg; struct f_owner_ex owner; int ret = 0; read_lock(&filp->f_owner.lock); owner.pid = pid_vnr(filp->f_owner.pid); switch (filp->f_owner.pid_type) {-------------------------相對於F_SETOWN,多了pid_type類型設置。其中PIDTYPE_MAX表示該文件句柄被一個線程所擁有。而不是進程或者進程組。這點對於信號究竟發給誰,有着非常重要的作用。 case PIDTYPE_MAX: owner.type = F_OWNER_TID; break; case PIDTYPE_PID: owner.type = F_OWNER_PID; break; case PIDTYPE_PGID: owner.type = F_OWNER_PGRP; break; default: WARN_ON(1); ret = -EINVAL; break; } read_unlock(&filp->f_owner.lock); if (!ret) { ret = copy_to_user(owner_p, &owner, sizeof(owner)); if (ret) ret = -EFAULT; } return ret; } static int f_setown_ex(struct file *filp, unsigned long arg) { struct f_owner_ex __user *owner_p = (void __user *)arg; struct f_owner_ex owner; struct pid *pid; int type; int ret; ret = copy_from_user(&owner, owner_p, sizeof(owner)); if (ret) return -EFAULT; switch (owner.type) { case F_OWNER_TID: type = PIDTYPE_MAX; break; case F_OWNER_PID: type = PIDTYPE_PID; break; case F_OWNER_PGRP: type = PIDTYPE_PGID; break; default: return -EINVAL; } rcu_read_lock(); pid = find_vpid(owner.pid); if (owner.pid && !pid) ret = -ESRCH; else __f_setown(filp, pid, type, 1); rcu_read_unlock(); return ret; } static void f_modown(struct file *filp, struct pid *pid, enum pid_type type, int force) { write_lock_irq(&filp->f_owner.lock); if (force || !filp->f_owner.pid) { put_pid(filp->f_owner.pid); filp->f_owner.pid = get_pid(pid); filp->f_owner.pid_type = type; if (pid) { const struct cred *cred = current_cred(); filp->f_owner.uid = cred->uid; filp->f_owner.euid = cred->euid; } } write_unlock_irq(&filp->f_owner.lock); } void __f_setown(struct file *filp, struct pid *pid, enum pid_type type, int force)----------------------------------------------------------------無論是F_SETOWN還是F_SETOWN_EX兩者的force都是1。 { security_file_set_fowner(filp); f_modown(filp, pid, type, force); }
所以F_SETOWN和F_SETOWN_EX主要區別就在於,F_SETOWN_EX可以設置進程的類型。這對於后續信號的發送,有重要作用。
F_SETOWN_EX更加細致,可以指定只發送給某個線程;而F_SETOWN優先發送給線程。如果線程被阻塞,則選擇同一進程中的其他線程接收。
相關問題調試詳見:《sigsuspend()阻塞:異步信號SIGIO為什么會被截胡?》。
2.3..3 F_SETSIG和SA_SIGINFO
如果要使用實時信號替代SIGIO作為kill_fasync()作為信號發送,可以設置F_SETSIG。
參照do_fcntl(),也即通過設置fown_struct的signum值。
如果需要在信號處理函數中獲取更多信息,還需要在sa.sa_flags增加SA_SIGINFO標志。
struct sigaction { union { void (*sa_handler)(int); void (*sa_sigaction)(int, siginfo_t *, void *); } __sigaction_handler; sigset_t sa_mask; int sa_flags; void (*sa_restorer)(void); };
在定義sa_handler是就需要定義如下類型函數:
void handler(int sig, siginfo_t *siginfo, void *ucontext);
其中第二個參數siginfo包含的字段標識出了在哪個文件描述上發生了事件。
使用F_SETSIG有兩個優點:
1.指定不同信號作為信號驅動IO通知信號,解決了在同一進程范圍內,多線程使用信號驅動IO沖突問題。因為默認都是發送SIGIO,而信號處理是進程范圍的。
2.默認的SIGIO是非排隊信號,如果有對個IO時間發送了信號,而SIGIO被阻塞了,除了第一個通知外,其他后續的通知都會丟失。使用實時信號就不會存在這個問題,信號的處理會被排隊;除非信號隊列溢出。
2.3.4 FASYNC以及kill_fasync()發送信號
此表示是通過fcntl進行設置的,所以看一些setfl()。
static int setfl(int fd, struct file * filp, unsigned long arg) { struct inode * inode = file_inode(filp); int error = 0; ... /* * ->fasync() is responsible for setting the FASYNC bit. */ if (((arg ^ filp->f_flags) & FASYNC) && filp->f_op->fasync) {----------------------可以看出主要當前文件具有FASYNC標志位,並且f_op->fasync()定義了就會執行相關函數。 error = filp->f_op->fasync(fd, filp, (arg & FASYNC) != 0); if (error < 0) goto out; if (error > 0) error = 0; } spin_lock(&filp->f_lock); filp->f_flags = (arg & SETFL_MASK) | (filp->f_flags & ~SETFL_MASK); spin_unlock(&filp->f_lock); out: return error; }
我們看到對應的驅動中調用fasync_helper()進行fasync_struct創建,並插入到fasync列表中。
int fasync_helper(int fd, struct file * filp, int on, struct fasync_struct **fapp) { if (!on) return fasync_remove_entry(filp, fapp); return fasync_add_entry(fd, filp, fapp); } static int fasync_add_entry(int fd, struct file *filp, struct fasync_struct **fapp) { struct fasync_struct *new; new = fasync_alloc(); if (!new) return -ENOMEM; if (fasync_insert_entry(fd, filp, fapp, new)) { fasync_free(new); return 0; } return 1; }
當設備IO就需后,通過kill_fasync()發送信號。
void kill_fasync(struct fasync_struct **fp, int sig, int band) { if (*fp) { rcu_read_lock(); kill_fasync_rcu(rcu_dereference(*fp), sig, band); rcu_read_unlock(); } } static void kill_fasync_rcu(struct fasync_struct *fa, int sig, int band) { while (fa) {-------------------------------------------------------------------遍歷當前文件句柄的所有fasync列表,並進行信號發送操作。 struct fown_struct *fown; unsigned long flags; if (fa->magic != FASYNC_MAGIC) { printk(KERN_ERR "kill_fasync: bad magic number in " "fasync_struct!\n"); return; } spin_lock_irqsave(&fa->fa_lock, flags); if (fa->fa_file) { fown = &fa->fa_file->f_owner; if (!(sig == SIGURG && fown->signum == 0))-----------------------------只有在signum為0,並sig等於SIGURQ放棄發送信號,因為SIGURQ有自己的特殊處理。 send_sigio(fown, fa->fa_fd, band); } spin_unlock_irqrestore(&fa->fa_lock, flags); fa = rcu_dereference(fa->fa_next); } } void send_sigio(struct fown_struct *fown, int fd, int band) { struct task_struct *p; enum pid_type type; struct pid *pid; int group = 1; read_lock(&fown->lock); type = fown->pid_type; if (type == PIDTYPE_MAX) {----------------------------------------------------這里體現了F_SETOWN_EX由於F_SETOWN的地方,如果設置為PIDTYPE_MAX,那么group則為0,只發送給線程,不會選擇其他線程作為替代。 group = 0; type = PIDTYPE_PID; } pid = fown->pid; if (!pid) goto out_unlock_fown; read_lock(&tasklist_lock); do_each_pid_task(pid, type, p) { send_sigio_to_task(p, fown, fd, band, group); } while_each_pid_task(pid, type, p); read_unlock(&tasklist_lock); out_unlock_fown: read_unlock(&fown->lock); } static void send_sigio_to_task(struct task_struct *p, struct fown_struct *fown, int fd, int reason, int group) { /* * F_SETSIG can change ->signum lockless in parallel, make * sure we read it once and use the same value throughout. */ int signum = ACCESS_ONCE(fown->signum);--------------------------------------------------------在使用F_SETSIG設置信號后最好不好改變,否則可能造成發送和信號處理函數不匹配。 if (!sigio_perm(p, fown, signum)) return; switch (signum) { siginfo_t si; default:-----------------------------------------------------------------------------------其他情況發送一個RT信號,提供更加豐富的返回信息。 si.si_signo = signum; si.si_errno = 0; si.si_code = reason; BUG_ON((reason & __SI_MASK) != __SI_POLL); if (reason - POLL_IN >= NSIGPOLL) si.si_band = ~0L; else si.si_band = band_table[reason - POLL_IN]; si.si_fd = fd; if (!do_send_sig_info(signum, &si, p, group))------------------------------------------使用signum代提SIGIO作為信號發送。 break; case 0:------------------------------------------------------------------------------------在沒有通過F_SETSIG設置signum的情況下,默認發送SIGIO信號。 do_send_sig_info(SIGIO, SEND_SIG_PRIV, p, group); } } int do_send_sig_info(int sig, struct siginfo *info, struct task_struct *p, bool group) { unsigned long flags; int ret = -ESRCH; if (lock_task_sighand(p, &flags)) { ret = send_signal(sig, info, p, group); unlock_task_sighand(p, &flags); } return ret; } static inline struct sighand_struct *lock_task_sighand(struct task_struct *tsk, unsigned long *flags) { struct sighand_struct *ret; ret = __lock_task_sighand(tsk, flags); (void)__cond_lock(&tsk->sighand->siglock, ret); return ret; } static inline void unlock_task_sighand(struct task_struct *tsk, unsigned long *flags) { spin_unlock_irqrestore(&tsk->sighand->siglock, *flags); }
2.3.5 標准信號和實時信號區別
實時信號相對於標准信號有如下優勢:
- 實時信號擴大了自定義信號的范圍,從SIGRTMIN~SIGRTMAX。
- 對實時信號采取隊列化管理。將一個實時信號多個實例發送給一個進程,那么將會多次傳遞信號。如果一個標准信號已經在等待某一進程,即使再次向該進程發送信號,信號也只會被傳遞一次。
- 發送一個實時信號可以傳遞更多信息。
- 不同實時信號傳遞順序得到保障,如果多個實時信號處於等待狀態,那么將優先傳遞具有最小編號的信號。信號編號越小,其優先級越高。同一類型的多個信號在排隊,那么傳遞順序與信號發送來的順序保持一致。
標准信號和實時信號都通過seng_signal()發送。下面看一下兩者是如何被區別對待的。
static int send_signal(int sig, struct siginfo *info, struct task_struct *t, int group) { int from_ancestor_ns = 0; #ifdef CONFIG_PID_NS from_ancestor_ns = si_fromuser(info) && !task_pid_nr_ns(current, task_active_pid_ns(t)); #endif return __send_signal(sig, info, t, group, from_ancestor_ns); } static int __send_signal(int sig, struct siginfo *info, struct task_struct *t, int group, int from_ancestor_ns) { struct sigpending *pending; struct sigqueue *q; int override_rlimit; int ret = 0, result; assert_spin_locked(&t->sighand->siglock); result = TRACE_SIGNAL_IGNORED; if (!prepare_signal(sig, t, from_ancestor_ns || (info == SEND_SIG_FORCED))) goto ret; pending = group ? &t->signal->shared_pending : &t->pending;----------------------------group起作用的地方,當group為0,則將信號放在當前線程私有的pending列表上;如果group為1,則將信號放在線程組共享的shared_pending列表上。 result = TRACE_SIGNAL_ALREADY_PENDING; if (legacy_queue(pending, sig))--------------------------------------------------------如果sig<SIGRTMIN,並且sig已經被處於pending中。也即標准信號sig已經在pending隊列中,則返回表示當前sig信號已經放入到隊列中。這也說明標准信號沒有被隊列化。 goto ret; result = TRACE_SIGNAL_DELIVERED; if (info == SEND_SIG_FORCED) goto out_set; if (sig < SIGRTMIN) override_rlimit = (is_si_special(info) || info->si_code >= 0);--------------------標准信號存在override_rlimit為1情況,那么即使當前信號隊列達到了RLIMIT_SIGPENDING。仍然可以創建信號隊列告訴緩存。 else override_rlimit = 0;--------------------------------------------------------------實時信號是不允許超過RLIMIT_SIGPENDING限制的。 q = __sigqueue_alloc(sig, t, GFP_ATOMIC | __GFP_NOTRACK_FALSE_POSITIVE, override_rlimit);-----------------------------------------------------------------創建一個高速緩存sigqueue_cachep。 if (q) { list_add_tail(&q->list, &pending->list);------------------------------------------將新創建的信號加入到隊列中。 switch ((unsigned long) info) { case (unsigned long) SEND_SIG_NOINFO: q->info.si_signo = sig; q->info.si_errno = 0; q->info.si_code = SI_USER; q->info.si_pid = task_tgid_nr_ns(current, task_active_pid_ns(t)); q->info.si_uid = from_kuid_munged(current_user_ns(), current_uid()); break; case (unsigned long) SEND_SIG_PRIV: q->info.si_signo = sig; q->info.si_errno = 0; q->info.si_code = SI_KERNEL; q->info.si_pid = 0; q->info.si_uid = 0; break; default: copy_siginfo(&q->info, info); if (from_ancestor_ns) q->info.si_pid = 0; break; } userns_fixup_signal_uid(&q->info, t); } else if (!is_si_special(info)) { if (sig >= SIGRTMIN && info->si_code != SI_USER) { result = TRACE_SIGNAL_OVERFLOW_FAIL; ret = -EAGAIN; goto ret; } else { result = TRACE_SIGNAL_LOSE_INFO; } } out_set: signalfd_notify(t, sig); sigaddset(&pending->signal, sig); complete_signal(sig, t, group); ret: trace_signal_generate(sig, info, t, group, result); return ret; }
static struct sigqueue *
__sigqueue_alloc(int sig, struct task_struct *t, gfp_t flags, int override_rlimit)
{
struct sigqueue *q = NULL;
struct user_struct *user;
rcu_read_lock();
user = get_uid(__task_cred(t)->user);
atomic_inc(&user->sigpending);
rcu_read_unlock();
if (override_rlimit ||
atomic_read(&user->sigpending) <=
task_rlimit(t, RLIMIT_SIGPENDING)) {----------------------------------這里面說明了override_rlimit為1情況可以超出RLIMIT_SIGPENDING限制。實時信號在隊列超過RLIMIT_SIGPENDING后則不允許創建信號隊列。
q = kmem_cache_alloc(sigqueue_cachep, flags);
} else {
print_dropped_signal(sig);---------------------------------------------------當出現丟棄信號的情況,打印信息。
}
if (unlikely(q == NULL)) {
atomic_dec(&user->sigpending);
free_uid(user);
} else {
INIT_LIST_HEAD(&q->list);
q->flags = 0;
q->user = user;
}
return q;
}
這里詳細解釋了信號驅動IO的流程;但信號產生放入pending隊列之后,何時被處理呢?可以參考《信號何時被處理》。
3. IO多路復用select()/poll()
IO多路復用允許我們同時檢查多個文件描述符,看其中任意一個是否可執行IO操作。可以在普通文件、終端、管道等字符型設備上使用select()/poll()來檢查文件描述符。
下面分別對select()/poll()的API、內核流程、測試程序進行分析,然后對兩者進行對比。
3.1 select()詳解
3.1.1 select() API
select()會一直阻塞,直到一個或多個文件描述符集合成為就緒態。
#include <sys/time.h> /* For portability */ #include <sys/select.h> int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,struct timeval *timeout);
參數nfds、readfds、writefds、exceptfds指定了select()要檢查的文件描述符集合。
參數timeout用來設定select()阻塞的時間上限。
文件描述符集合操作
#include <sys/select.h> void FD_ZERO(fd_set *fdset);--------------------將fdset指向的集合初始化為空。 void FD_SET(int fd, fd_set *fdset);-------------將描述符fd添加到fdset所指向的集合中。 void FD_CLR(int fd, fd_set *fdset);-------------將描述符fd從fdset指向的集合中移除。 int FD_ISSET(int fd, fd_set *fdset);------------判斷描述符fd是否在fdset結合中設置。
文件描述符集合有一個最大容量限制,由常量FD_SETSIZE來決定。Linux通常為1024。
timeout參數
參數timeout控制着select()阻塞行為,NULL時select()會一直阻塞。或者指向一個timeval結構體。
struct timeval { time_t tv_sec; /* Seconds */ suseconds_t tv_usec; /* Microseconds (long int) */ };
當timeout為NULL或者指向結構體字段非0時,select()阻塞直到下列事件發生:
- readfds、writefds、exceptfds中指定的文件描述符中至少有一個成為就緒態。
- 該調用被信號處理例程中斷。
- timeout中指定的時間上限已超時。
如果timeout非空,且一個或多個文件描述符就緒返回時,timeout所指向的結構體表示剩余超時時間。
select()返回值
-1表示有錯誤發生。EBADF表示有一個文件描述符是非法的;EINTR表示該調用被信號處理例程中斷了。
0表示在任何文件描述符成為就緒態前select()已經超時。
正整數表示一個或多個文件描述符已達到就緒態。返回值表示就緒態的文件描述符個數。
3.1.2 select()內核詳解
select系統調用路徑是select()->core_sys_select()->do_select(),最終是遍歷每個文件句柄的poll成員,根據poll()返回的參數判斷當前文件狀態。
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp, fd_set __user *, exp, struct timeval __user *, tvp) { struct timespec64 end_time, *to = NULL; struct timeval tv; int ret; if (tvp) { if (copy_from_user(&tv, tvp, sizeof(tv))) return -EFAULT; to = &end_time; if (poll_select_set_timeout(to, tv.tv_sec + (tv.tv_usec / USEC_PER_SEC), (tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))------------------------------------從內核拷貝信息並轉換成內核數據結構struct timespec64。 return -EINVAL; } ret = core_sys_select(n, inp, outp, exp, to);------------------------------------------------和系統調用類似,只是to已經轉變成內核時間。 ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);------------------------------------計算end_time和當前時間的差值,並轉化成tvp返回給用戶空間。 return ret; } int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timespec64 *end_time) { fd_set_bits fds; void *bits; int ret, max_fds; size_t size, alloc_size; struct fdtable *fdt; /* Allocate small arguments on the stack to save memory and be faster */ long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];----------------------------------------------SELECT_STACK_ALLOC默認是256,32位系統stack_fds一共64成員。 ret = -EINVAL; if (n < 0) goto out_nofds; /* max_fds can increase, so grab it once to avoid race */ rcu_read_lock(); fdt = files_fdtable(current->files); max_fds = fdt->max_fds; rcu_read_unlock(); if (n > max_fds) n = max_fds; size = FDS_BYTES(n);------------------------------------------------------------------------一個文件描述符占用一bit,size表示這些fd_set需要用掉多少個字節。 bits = stack_fds; if (size > sizeof(stack_fds) / 6) {---------------------------------------------------------如果stack空間足夠當前存放當前strcut fd_set_bits fds的時候,優先使用stack內存,好處是更加快並且節省內存。不然就需要kmalloc去申請內存。 /* Not enough space in on-stack array; must use kmalloc */------------------------------stack_fds大小為256字節,所以要使用stack的最大size=256/6/sizeof(long)=10。 ret = -ENOMEM; if (size > (SIZE_MAX / 6)) goto out_nofds; alloc_size = 6 * size; bits = kmalloc(alloc_size, GFP_KERNEL|__GFP_NOWARN);-------------------------------------通過kmalloc來分配6個size大小的內存;大於一個頁面使用vmalloc。 if (!bits && alloc_size > PAGE_SIZE) bits = vmalloc(alloc_size); if (!bits) goto out_nofds; } fds.in = bits;--------------------------------------------------------------------------此時bits指向的內存都已經分配完畢,並且是6個同樣size大小的。 fds.out = bits + size; fds.ex = bits + 2*size; fds.res_in = bits + 3*size; fds.res_out = bits + 4*size; fds.res_ex = bits + 5*size; if ((ret = get_fd_set(n, inp, fds.in)) || (ret = get_fd_set(n, outp, fds.out)) || (ret = get_fd_set(n, exp, fds.ex))) goto out; zero_fd_set(n, fds.res_in); zero_fd_set(n, fds.res_out); zero_fd_set(n, fds.res_ex);------------------------------------------------------------------將用戶空間傳入的fds拷貝到fds中,並清空res_in、res_out、res_ex。 ret = do_select(n, &fds, end_time); if (ret < 0) goto out; if (!ret) { ret = -ERESTARTNOHAND; if (signal_pending(current)) goto out; ret = 0; } if (set_fd_set(n, inp, fds.res_in) || set_fd_set(n, outp, fds.res_out) || set_fd_set(n, exp, fds.res_ex))----------------------------------------------------------res_int/res_out/res_ex中包含了狀態變化的文件句柄,拷貝到in/out/ex返回給用戶空間。 ret = -EFAULT; out: if (bits != stack_fds) kvfree(bits); out_nofds: return ret; } int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time) { ktime_t expire, *to = NULL; struct poll_wqueues table; poll_table *wait; int retval, i, timed_out = 0; u64 slack = 0; unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;---------------------------這個參數影響下面一輪查詢所有文件描述符之后,才做busy loop還是睡眠。這個很影響CPU占用率,busy loop不會主動放棄CPU,睡眠則主動放棄CPU。 unsigned long busy_end = 0; rcu_read_lock(); retval = max_select_fd(n, fds); rcu_read_unlock(); if (retval < 0) return retval; n = retval; poll_initwait(&table);-----------------------------------------------------------------------將當前進程放入自己的等待隊列table,並將該等待隊列加入到測試表wait。 wait = &table.pt; if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {-----------------------------------這也是timeout參數為0時候的特殊情況,直接timed_out置1。 wait->_qproc = NULL; timed_out = 1; } if (end_time && !timed_out) slack = select_estimate_accuracy(end_time); retval = 0; for (;;) { unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp; bool can_busy_loop = false; inp = fds->in; outp = fds->out; exp = fds->ex; rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex; for (i = 0; i < n; ++rinp, ++routp, ++rexp) {--------------------------------------------遍歷所有的fd。 unsigned long in, out, ex, all_bits, bit = 1, mask, j; unsigned long res_in = 0, res_out = 0, res_ex = 0; in = *inp++; out = *outp++; ex = *exp++;---------------------------------------------先取出當前循環周期中的32個文件描述符對應的bitmaps。 all_bits = in | out | ex;------------------------------------------------------------in/out/ex三者組合,有的fd可能監測讀或寫或異常,或者都檢測。 if (all_bits == 0) {-----------------------------------------------------------------32個文件描述符不檢測任何狀態,調到下一個循環。 i += BITS_PER_LONG; continue; } for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) { struct fd f; if (i >= n) break; if (!(bit & all_bits))-----------------------------------------------------------bit每次循環左移一位,如果沒有監測當前為則跳過進入下一次循環。 continue; f = fdget(i); if (f.file) { const struct file_operations *f_op; f_op = f.file->f_op; mask = DEFAULT_POLLMASK; if (f_op->poll) { wait_key_set(wait, in, out, bit, busy_flag);-----------------------------------------------設置當前fd待檢測的事件掩碼。 mask = (*f_op->poll)(f.file, wait);-------------------------------------調用每個文件句柄的poll成員,返回文件的狀態mask。下面分別檢查POLLIN_SET、POLLOUT_SET、POLLEX_SET三種狀態。 } fdput(f); if ((mask & POLLIN_SET) && (in & bit)) { res_in |= bit; retval++; wait->_qproc = NULL; } if ((mask & POLLOUT_SET) && (out & bit)) { res_out |= bit; retval++; wait->_qproc = NULL; } if ((mask & POLLEX_SET) && (ex & bit)) { res_ex |= bit; retval++; wait->_qproc = NULL; } /* got something, stop busy polling */ if (retval) { can_busy_loop = false; busy_flag = 0; /* * only remember a returned * POLL_BUSY_LOOP if we asked for it */ } else if (busy_flag & mask) can_busy_loop = true; } } if (res_in)--------------------------------------------------------------------------根據poll結果寫回到輸出位圖里。 *rinp = res_in; if (res_out) *routp = res_out; if (res_ex) *rexp = res_ex; cond_resched(); } wait->_qproc = NULL; if (retval || timed_out || signal_pending(current))-------------------------------------三種返回情況,retval表示有文件句柄滿足條件;timed_out表示在有timeout參數的情況下超時;signal_pending()表示被信號中斷。 break; if (table.error) { retval = table.error; break; } if (can_busy_loop && !need_resched()) {--------------------------------------------------這里面會一直占用CPU。 if (!busy_end) { busy_end = busy_loop_end_time(); continue; } if (!busy_loop_timeout(busy_end)) continue; } busy_flag = 0; if (end_time && !to) { expire = timespec64_to_ktime(*end_time); to = &expire; } if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack))-------------------------------------------------------------進程進入睡眠等待喚醒,如果poll_schedule_timeout()返回0表示超時,下面timed_out被置1。to是根據end_time計算的超時時間。 timed_out = 1; } poll_freewait(&table); return retval; }
回顧一下select()大致流程如下:
1.進入系統調用。
2.進行時間轉換;數據准備,分成in、out、exception三部分。
3.對所有文件句柄進行循環,調用對應文件句柄的poll()函數;分別查詢是那種類型文件句柄有狀態變化。
4.如果一輪循環下來沒有變化,則進入休眠等待,直到超時。
5.如果有數據則喚醒進程,將變化句柄數返回給用戶空間。返回值是三種狀態的綜合,具體哪種狀態哪個文件句柄變化,需要查看分別查看返回的文件句柄。
3.1.3 select()測試程序
select()測試程序和poll()使用同樣的內核驅動,區別就是設置文件句柄和API參數。
#include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdio.h> #include <poll.h> #include <memory.h> #include <unistd.h> #include <stdlib.h> #define POLLTEST_NAME "/dev/polltest0" #define LOOP_COUNT 3 int main(int argc, char **argv) { int fd, loop_count = 0; unsigned char polltest_count[30]; int ret; fd_set rds; memset(polltest_count, 30, 0); fd = open(POLLTEST_NAME, O_RDWR); if (fd < 0) { printf("can't open!\n"); } FD_ZERO(&rds); FD_SET(fd, &rds);--------------------------------------------------------------將fd加入到rds句柄集中,在select()中對其進行監控。 while (1) { ret = select(fd+1, &rds, NULL, NULL, NULL); if (ret == 0) { printf("time out\n"); } else { if(FD_ISSET(fd, &rds))-------------------------------------------------判斷句柄fd是否有POLLIN事件發生。 { read(fd, &polltest_count, 0); loop_count = atoi(polltest_count); printf("key_val = %d\n", loop_count); if(loop_count >= LOOP_COUNT) break; } } } return 0; }
3.2 poll()詳解
同樣的從認識poll() API開始;然后詳細分析poll系統調用內核是如何實現的;最后對poll()測試程序流程結合內核進行詳細分析。
3.2.1 poll() API
poll()提供一列文件描述符,在每個文件描述符上標明感興趣的事件。
#include <poll.h> int poll(struct pollfd fds[], nfds_t nfds, int timeout);
nfds指定了fds中的元素個數;參數fds列出了需要poll()來檢查的文件描述符:
struct pollfd {
int fd; /* File descriptor */
short events; /* Requested events bit mask */-------指定需要為描述符fd做檢查的事件。
short revents; /* Returned events bit mask */-------表示fd上實際發生的事件。 };
timeout參數
timeout參數的單位是毫秒。
-1,poll()會一直阻塞直到fds數組中列出的有一個達到就緒態或者捕獲到一個信號。
0,poll()不會阻塞,只是執行一次檢查看看哪個文件描述符處於就緒態。
>0,poll()至多阻塞timeout毫秒,直到fds列出的文件描述符中有一個達到就緒態,或者捕獲到一個信號為止。
poll()返回值
-1標識有錯誤發生,一種可能是被信號中斷返回EINTR。
0表示調用在任意一個文件描述符就緒之前就超時了。
正整數表示一個或多個文件描述符處於就緒態。返回值表示數組fds中擁有非零revents字段的pollfd結構體數量。
3.2.2 poll()內核流程
SYSCALL_DEFINE3(poll, struct pollfd __user *, ufds, unsigned int, nfds, int, timeout_msecs) { struct timespec64 end_time, *to = NULL; int ret; if (timeout_msecs >= 0) {-------------------------------------------從poll()參數timeout可知小於0表示poll()會一直阻塞;等於0表示只檢查一次;大於0表示等待一個超時時間。 to = &end_time; poll_select_set_timeout(to, timeout_msecs / MSEC_PER_SEC, NSEC_PER_MSEC * (timeout_msecs % MSEC_PER_SEC));------------根據timeout_msecs轉換成內核超時數據結構to。 } ret = do_sys_poll(ufds, nfds, to); if (ret == -EINTR) { struct restart_block *restart_block; restart_block = ¤t->restart_block; restart_block->fn = do_restart_poll; restart_block->poll.ufds = ufds; restart_block->poll.nfds = nfds; if (timeout_msecs >= 0) { restart_block->poll.tv_sec = end_time.tv_sec; restart_block->poll.tv_nsec = end_time.tv_nsec; restart_block->poll.has_timeout = 1; } else restart_block->poll.has_timeout = 0; ret = -ERESTART_RESTARTBLOCK; } return ret; } int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds, struct timespec64 *end_time) { struct poll_wqueues table; int err = -EFAULT, fdcount, len, size; long stack_pps[POLL_STACK_ALLOC/sizeof(long)]; struct poll_list *const head = (struct poll_list *)stack_pps; struct poll_list *walk = head; unsigned long todo = nfds; if (nfds > rlimit(RLIMIT_NOFILE))-----------------------------------nfds不能超過系統對打開文件數目限制。 return -EINVAL; len = min_t(unsigned int, nfds, N_STACK_PPS);-----------------------取nfds和棧能容納的fds最小值。 for (;;) {----------------------------------------------------------遍歷所有的ufds[],分配對應的struct poll_list,並且鏈接起來;每個struct poll_list里面又包含了若干個struct pollfd,他的大小通過len確定。 walk->next = NULL; walk->len = len; if (!len) break; if (copy_from_user(walk->entries, ufds + nfds-todo, sizeof(struct pollfd) * walk->len))-----------------首先將使用棧提供的空間,將用戶空間struct pollfd拷貝到內核空間。 goto out_fds; todo -= walk->len;----------------------------------------------假設len==nfds,todo也等於walk->len;所以此處退出for(;;)。 if (!todo) break; len = min(todo, POLLFD_PER_PAGE); size = sizeof(struct poll_list) + sizeof(struct pollfd) * len; walk = walk->next = kmalloc(size, GFP_KERNEL);------------------如果占空間不夠存放struct pollfd,那么kmalloc()申請內存。 if (!walk) { err = -ENOMEM; goto out_fds; } } poll_initwait(&table);----------------------------------------------初始化一個struct poll_wqueues。 fdcount = do_poll(head, &table, end_time); poll_freewait(&table); for (walk = head; walk; walk = walk->next) { struct pollfd *fds = walk->entries; int j; for (j = 0; j < walk->len; j++, ufds++) if (__put_user(fds[j].revents, &ufds->revents))-------------將所有內核處理的revents返回給用戶空間。 goto out_fds; } err = fdcount; out_fds: walk = head->next; while (walk) { struct poll_list *pos = walk; walk = walk->next; kfree(pos);-----------------------------------------------------釋放申請的內存。 } return err; } static int do_poll(struct poll_list *list, struct poll_wqueues *wait, struct timespec64 *end_time) { poll_table* pt = &wait->pt; ktime_t expire, *to = NULL; int timed_out = 0, count = 0; u64 slack = 0; unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;-----------------------決定下面是循環還是睡眠等待超時。 unsigned long busy_end = 0; /* Optimise the no-wait case */ if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {------------------------------對timeout為0特殊情況的處理,只查詢一次就退出。 pt->_qproc = NULL; timed_out = 1; } if (end_time && !timed_out) slack = select_estimate_accuracy(end_time); for (;;) { struct poll_list *walk; bool can_busy_loop = false; for (walk = list; walk != NULL; walk = walk->next) {---------------------------------遍歷所有的struct poll_list。 struct pollfd * pfd, * pfd_end; pfd = walk->entries; pfd_end = pfd + walk->len; for (; pfd != pfd_end; pfd++) { if (do_pollfd(pfd, pt, &can_busy_loop, busy_flag)) { count++; pt->_qproc = NULL; /* found something, stop busy polling */ busy_flag = 0; can_busy_loop = false; } } } pt->_qproc = NULL; if (!count) { count = wait->error; if (signal_pending(current)) count = -EINTR; } if (count || timed_out)-----------------------------------------------------------------count是所有的有狀態變化的描述符數量;timed_out表示是否超時。 break; /* only if found POLL_BUSY_LOOP sockets && not out of time */ if (can_busy_loop && !need_resched()) { if (!busy_end) { busy_end = busy_loop_end_time(); continue; } if (!busy_loop_timeout(busy_end)) continue; } busy_flag = 0; if (end_time && !to) { expire = timespec64_to_ktime(*end_time); to = &expire; } if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack)) timed_out = 1; } return count; } static inline unsigned int do_pollfd(struct pollfd *pollfd, poll_table *pwait, bool *can_busy_poll, unsigned int busy_flag) { unsigned int mask; int fd; mask = 0; fd = pollfd->fd;------------------------------------------------------------------------------根據struct pollfd找到對應的fd,然后調用所屬的poll()函數。 if (fd >= 0) { struct fd f = fdget(fd); mask = POLLNVAL; if (f.file) { mask = DEFAULT_POLLMASK; if (f.file->f_op->poll) { pwait->_key = pollfd->events|POLLERR|POLLHUP; pwait->_key |= busy_flag; mask = f.file->f_op->poll(f.file, pwait);----------------------------------------調用具體文件的poll()函數。 if (mask & busy_flag) *can_busy_poll = true; } /* Mask out unneeded events. */ mask &= pollfd->events | POLLERR | POLLHUP; fdput(f); } } pollfd->revents = mask; return mask; }
回顧一下,poll系統調用大致流程是:
1.進入poll系統調用。
2.進行timeout時間轉換;准備每個文件句柄對應的數據,並將這些數據串聯起來。
3.對所有的struct pollfd進行循環,調用do_pollfd()查詢狀態。
4.do_pollfd()調用具體文件的file->f_op->poll()查詢狀態。
5.一次輪詢之后,將當前線程掛起等待超時獲喚醒。
6.有數據到達后退出循環,將數據返回給用戶空間。
3.2.3 poll()測試程序
首先創建polltest_kernel.c和Makefile文件。
#define DEBUG #include <linux/kernel.h> #include <linux/module.h> #include <linux/init.h> #include <linux/cdev.h> #include <linux/device.h> #include <linux/fs.h> #include <linux/uaccess.h> #include <linux/poll.h> #define POLL_EXPRIES 1 #define POLLTEST_NAME "polltest" static struct class *polltest_class; static int polltest_major; static struct hrtimer polltest_hrtimer; static volatile int ev_press = 0; int polltest_count = 0; static unsigned char polltest_text[30]; DECLARE_WAIT_QUEUE_HEAD(polltest_waitq); static enum hrtimer_restart hrtimer_func(struct hrtimer *timer) { wake_up_interruptible(&polltest_waitq);--------------------------------------------------調用隊列上所有等待項wait_queue_t->func()函數,這個函數主要用來喚醒等待的進程。 printk("%s line=%d\n", __func__, __LINE__); ev_press = 1;----------------------------------------------------------------------------只有ev_press置位,poll()才會返回POLLIN狀態。其他情況就會阻塞。 polltest_count++; hrtimer_forward_now(&polltest_hrtimer, ktime_set(0, 10000000)); return HRTIMER_RESTART; } static int polltest_open(struct inode *inode, struct file *file) { hrtimer_init(&polltest_hrtimer, CLOCK_MONOTONIC, HRTIMER_MODE_REL); polltest_hrtimer.function = hrtimer_func; hrtimer_start(&polltest_hrtimer, ktime_set(0, 10000000), HRTIMER_MODE_REL); printk("%s line=%d\n", __func__, __LINE__); return 0; } static int polltest_release(struct inode *inode, struct file *file) { hrtimer_cancel(&polltest_hrtimer); printk("%s line=%d\n", __func__, __LINE__); return 0; } static ssize_t polltest_read(struct file * file, char __user * buf, size_t count, loff_t *ppos) { int size; size = snprintf(polltest_text, 30, "%d\n", polltest_count); copy_to_user(buf, polltest_text, size); return size; } static ssize_t polltest_write(struct file * file, const char __user * buf, size_t count, loff_t *ppos) { char str[30]; copy_from_user(str, buf,count); return 0; } static unsigned int polltest_poll(struct file *file, struct poll_table_struct *wait) { unsigned int mask = 0; poll_wait(file, &polltest_waitq, wait);-------------------------------------------------將當前struct poll_table_struct的wait加入到polltest_waitq隊列頭中。那么這個wait合適被從polltest_waitq隊列頭移出呢?在poll_freewait()中。 printk("%s line=%d\n", __func__, __LINE__); if(ev_press)----------------------------------------------------------------------------在hrtimer超時之后,ev_press置位,才會發送POLLIN狀態。
{
mask |= POLLIN | POLLRDNORM;--------------------------------------------------------如果返回POLLIN狀態,poll系統調用才會返回;否則會進入睡眠狀態及poll_schedule_timeout()。
ev_press = 0;-----------------------------------------------------------------------恢復為0,避免下次poll()返回POLLIN。
} return mask; } static const struct file_operations polltest_fops = { .owner = THIS_MODULE, .open = polltest_open, .release = polltest_release, .read = polltest_read, .write = polltest_write, .poll = polltest_poll, }; static int __init polltest_test_init(void) { struct device *polltest_device; polltest_major = register_chrdev(0, POLLTEST_NAME, &polltest_fops); if (polltest_major < 0) { pr_err("register_chrdev failed\n"); goto err; } polltest_class = class_create(THIS_MODULE, POLLTEST_NAME); if (IS_ERR(polltest_class)) { pr_err("device class file already in use\n"); goto err_class; } polltest_device = device_create(polltest_class, NULL, MKDEV(polltest_major, 0), NULL, "%s%d", POLLTEST_NAME, 0); if (IS_ERR(polltest_device)) { pr_err("failed to create device\n"); goto err_device; } return 0; err_device: class_destroy(polltest_class); err_class: unregister_chrdev(polltest_major, POLLTEST_NAME); err: return 0; } static void __exit polltest_test_exit(void) { device_destroy(polltest_class, MKDEV(polltest_major, 0)); class_destroy(polltest_class); unregister_chrdev(polltest_major, POLLTEST_NAME); } module_init(polltest_test_init); module_exit(polltest_test_exit); MODULE_LICENSE("GPL");
Makefile:
CONFIG_MODULE_SIG=n obj-m := polltest_kernel.o KERN_DIR := /lib/modules/$(shell uname -r)/build PWD := $(shell pwd) all: $(MAKE) -C $(KERN_DIR) M=$(PWD) modules gcc polltest_user.c -o polltest_user gcc polltest_select.c -o polltest_select clean: $(MAKE) -C $(KERN_DIR) M=$(PWD) modules clean rm polltest_user rm polltest_select modules_install: $(MAKE) -C $(KERN_DIR) M=$(PWD) modules_install
然后創建用戶空間測試程序:
#include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdio.h> #include <poll.h> #include <memory.h> #include <unistd.h> #include <stdlib.h> #define POLLTEST_NAME "/dev/polltest0" #define LOOP_COUNT 3 int main(int argc, char **argv) { int fd, loop_count = 0; unsigned char polltest_count[30]; int ret; struct pollfd fds[1]; memset(polltest_count, 30, 0); fd = open(POLLTEST_NAME, O_RDWR);--------------------------------打開/dev/polltest0此時啟動一個10ms的hrtimer。 if (fd < 0) { printf("can't open!\n"); } fds[0].fd = fd; fds[0].events = POLLIN; while (1) { ret = poll(fds, 1, 5000);------------------------------------調用/dev/polltest0的poll()函數。如果返回狀態有POLLIN則ret表示滿足狀態的句柄數,應該為1;如果返回0表示超時。 if (ret == 0) { printf("time out\n"); } else { if(fds[0].revents == POLLIN) { read(fd, &polltest_count, 0); loop_count = atoi(polltest_count); printf("key_val = %d\n", loop_count); if(loop_count >= LOOP_COUNT) break; } } } return 0; }
結果如下:
[ 2366.904469] polltest_open line=41---------------------------------打開/dev/polltest0,啟動10ms的hrtimer。 [ 2366.904475] polltest_poll line=78---------------------------------緊接着poll()系統調用,由於ev_press為0,沒有滿足POLLIN條件;所以進程進入睡眠狀態。 [ 2366.914893] hrtimer_func line=29----------------------------------hrtimer 10ms超時,喚醒進程並且ev_press為1。 [ 2366.914951] polltest_poll line=78---------------------------------進程被喚醒之后,重新調用/dev/polltest0的poll()檢查狀態;因為此時ev_press為1,所以返回POLLIN。滿足條件,poll()系統調用退出,用戶空間read()且置ev_press為0。 [ 2366.915197] polltest_poll line=78---------------------------------再次調用poll()系統調用,此時ev_press為0,所以進入睡眠狀態。 [ 2366.924575] hrtimer_func line=29 [ 2366.924584] polltest_poll line=78 [ 2366.924641] polltest_poll line=78 [ 2366.934632] hrtimer_func line=29 [ 2366.934683] polltest_poll line=78 [ 2366.934980] polltest_release line=49
3.3 select()/poll()流程詳解
從上面的分析可以看出select()/poll()的內核實現其實大同小異,最終都是調用具體文見的poll()函數查詢狀態。
select()和poll()系統調用有如下幾個重要的結構體:struct poll_wqueues、struct poll_table_struct、struct poll_table_page、struct poll_table_entry,其中poll()還有兩個文件句柄相關結構體:struct poll_list、struct pollfd。
其中struct poll_wqueues是核心,用來統一輔助實現這個進程中所有監測fd的輪訓工作。
struct poll_list {-------------------------------------------------一次poll()的poll_list可能有多個,通過next鏈接起來。一個poll_list里可以有多個pollfd。 struct poll_list *next; int len;-------------------------------------------------------下面entries[]的數目。 struct pollfd entries[0]; }; struct pollfd {---------------------------------------------------和用戶空間一致的數據結構,表示一個poll句柄。 int fd; short events; short revents; }; struct poll_wqueues { poll_table pt; struct poll_table_page *table;----------------------------------指向poll_table_page類型頁面,多個頁面可以互相鏈接起來。 struct task_struct *polling_task;-------------------------------保存當前調用select()/poll()的進程struct task_struct結構體。 int triggered;--------------------------------------------------當前用戶進程被喚醒后置位,以免該進程接着睡眠。 int error;------------------------------------------------------錯誤碼。 int inline_index;-----------------------------------------------數組inline_entries[]的下標。 struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES]; };
struct poll_table_page {------------------------------------------申請的物理頁面都會將起始地址強制轉換成該結構體指針。
struct poll_table_page * next;---------------------------------指向下一個申請的物理頁面地址。
struct poll_table_entry * entry;-------------------------------指向entries[]中首個待分配poll_table_entry地址。
struct poll_table_entry entries[0];----------------------------該頁后面剩余的空間都是待分配的poll_table_entry結構體。
};
typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *); typedef struct poll_table_struct { poll_queue_proc _qproc; unsigned long _key; } poll_table; struct poll_table_entry { struct file *filp;--------------------------------------------指向特定fd對應的file結構體。 unsigned long key;--------------------------------------------等待特定fd對應硬件設備的事件掩碼,如POLLIN、POLLOUT等。 wait_queue_t wait;--------------------------------------------代表調用select()/poll()的應用程序,等待在fd對應設備的特定時間的等待隊列頭上的等待隊列項。 wait_queue_head_t *wait_address;------------------------------設備驅動中特定時間的等待隊列頭。 };
一次select()/poll()調用對應一個strcut poll_wqueue結構體;struct poll_table_struct就是一個函數和一個key值。
一次select()/poll()可能包含一個或者多個struct poll_table_page,這個可能根據文件句柄的數量而變化。
一個fd對應一個struct poll_table_entry。
所以fd、struct poll_fd、strcut poll_table_entry是一一對應的;一次select()/poll()和struct poll_wqueues、strcut poll_table_strcut是一一對應的。
這兩個結構體都通過poll_initwait()初始化,然后在poll_wait()的時候調用poll_queue_proc函數。
poll_initwait()是select()/poll()中對相關結構體進行初始化的入口;poll_wait()是驅動中實現file_operations成員poll()函數的主要功能,它將此次調用轉換成一個wait_queue_t放入到驅動的等待隊列中。
然后wake_up_interruptible()是喚醒所有wait_queue_head_t上的等待項,這些等待項調用對應的func()函數;默認是用來喚醒default_wake_function()。
void poll_initwait(struct poll_wqueues *pwq) { init_poll_funcptr(&pwq->pt, __pollwait);-----------------------------------------------------------------初始化struct poll_table,函數是__pollwait。在后面poll_wait()會被調用。 pwq->polling_task = current;-----------------------------------------------------------------------------polling_task指向當前進程結構體。 pwq->triggered = 0; pwq->error = 0; pwq->table = NULL; pwq->inline_index = 0; } static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc) { pt->_qproc = qproc; pt->_key = ~0UL; /* all events enabled */ } static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)-------------wait_address是驅動程序提供的等待隊列頭,來容納后需等待該硬件設備就緒的進程對應的等待隊列項。p是系統調用傳下來的struct poll_table_strcut。 { if (p && p->_qproc && wait_address) p->_qproc(filp, wait_address, p);-----------------------------------------------------------------------調用__pollwait()函數。 } /* Add a new entry */ static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p) { struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt); struct poll_table_entry *entry = poll_get_entry(pwq);-------------------------------------------------------首先從struct poll_wqueues->inline_entries[]中獲取poll_table_entry;如果不夠用則通過盛情一個頁面轉換成struct poll_table_page類型指針。 if (!entry) return; entry->filp = get_file(filp); entry->wait_address = wait_address; entry->key = p->_key; init_waitqueue_func_entry(&entry->wait, pollwake);----------------------------------------------------------等待隊列項的操作函數指定為pollwake(),這個函數作用就是喚醒polling_task對應的進程。 entry->wait.private = pwq;----------------------------------------------------------------------------------私有變量指向pwq,在__pollwake()中會使用到pwq->polling_task來喚醒對應進程。 add_wait_queue(wait_address, &entry->wait);-----------------------------------------------------------------將poll_table_entry對應的wait加入到驅動的wait_queue_head_t上。 }
static struct poll_table_entry *poll_get_entry(struct poll_wqueues *p)
{
struct poll_table_page *table = p->table;
if (p->inline_index < N_INLINE_POLL_ENTRIES)
return p->inline_entries + p->inline_index++;
if (!table || POLL_TABLE_FULL(table)) {
struct poll_table_page *new_table;
new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);--------------------------------------申請一個空閑頁面,轉換成struct poll_table_page類型指針;在poll_freewait()中被釋放。
if (!new_table) {
p->error = -ENOMEM;
return NULL;
}
new_table->entry = new_table->entries;
new_table->next = table;
p->table = new_table;
table = new_table;
}
return table->entry++;
}
static int pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key) { struct poll_table_entry *entry; entry = container_of(wait, struct poll_table_entry, wait);--------------------------------------------------根據wait找到struct poll_table_entry,進而獲得關注的事件值key。 if (key && !((unsigned long)key & entry->key)) return 0; return __pollwake(wait, mode, sync, key); } static int __pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key) { struct poll_wqueues *pwq = wait->private;------------------------------------------------------------------在__pollwait()中設置變量,此處使用其polling_task成員。 DECLARE_WAITQUEUE(dummy_wait, pwq->polling_task);----------------------------------------------------------定義一個臨時的wait_queue_t,給下面的default_wake_functio做參數。 smp_wmb(); pwq->triggered = 1; return default_wake_function(&dummy_wait, mode, sync, key);------------------------------------------------喚醒pwq->polling_task對應的線程。 } int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags, void *key) { return try_to_wake_up(curr->private, mode, wake_flags); }
釋放poll_wqueues申請的內存,主要是struct poll_table_page對應的頁面。
poll_schedule_timeout()實現超時功能。
void poll_freewait(struct poll_wqueues *pwq) { struct poll_table_page * p = pwq->table; int i; for (i = 0; i < pwq->inline_index; i++) free_poll_entry(pwq->inline_entries + i);------------------------------------------這里將poll_table_entry->wait從其所在隊列頭移出。在poll_wait()中被添加進去。 while (p) { struct poll_table_entry * entry; struct poll_table_page *old; entry = p->entry; do { entry--; free_poll_entry(entry); } while (entry > p->entries); old = p; p = p->next; free_page((unsigned long) old); } }
static void free_poll_entry(struct poll_table_entry *entry)
{
remove_wait_queue(entry->wait_address, &entry->wait);
fput(entry->filp);
}
int poll_schedule_timeout(struct poll_wqueues *pwq, int state, ktime_t *expires, unsigned long slack) { int rc = -EINTR; set_current_state(state); if (!pwq->triggered) rc = schedule_hrtimeout_range(expires, slack, HRTIMER_MODE_ABS);-------------------調用schedule_hrtimeout_range()來完成timeout功能。 __set_current_state(TASK_RUNNING); smp_store_mb(pwq->triggered, 0); return rc; }
wake_up_interruptible()用於執行等待隊列wake_queue_head_t上的所有wait_queue_t對應func函數,這里對應的就是pollwake(),即喚醒對應的進程。
#define wake_up_interruptible(x) __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL) void __wake_up(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, void *key) { unsigned long flags; spin_lock_irqsave(&q->lock, flags); __wake_up_common(q, mode, nr_exclusive, 0, key); spin_unlock_irqrestore(&q->lock, flags); } static void __wake_up_common(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, int wake_flags, void *key) { wait_queue_t *curr, *next; list_for_each_entry_safe(curr, next, &q->task_list, task_list) {------------------------q->task_list是所有wait_queue_head_t上wait_queue_t對應的進程鏈表,遍歷鏈表上進程結構體,進而找到對應的wait_queue_t。然后調用wait_queue_t->func函數。 unsigned flags = curr->flags; if (curr->func(curr, mode, wake_flags, key) && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)------------------------------ break; } }
void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)------------------------------將wait_queue_t加入到wait_queue_head_t的時候通過將wait_queue_t->task_list加入到wait_queue_head_t->task_list鏈表上。
{
unsigned long flags;
wait->flags &= ~WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&q->lock, flags);
__add_wait_queue(q, wait);
spin_unlock_irqrestore(&q->lock, flags);
}
static inline void __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new)
{
list_add(&new->task_list, &head->task_list);
}
3.4 select()和poll()比較
select()和poll()基於同樣的內核機制poll_wqueues,不同點在於select()區分讀寫異常句柄等。
3.4.1 select()和poll()共同點
select()和poll()使用了相同的內核poll例程集合,
3.4.2 select()和poll()的區別
- select()檢查的文件描述符數量有一個限制(FD_SETSIZE),默認是1024;而poll()對於被檢查的文件描述符則沒有限制。
- select()參數fd_set同時也保存結果,所以每次重復使用前必須重新初始化fd_set;poll()通過獨立字段events和revents來區分,則不需要每次都重新初始化參數。
- select()提供微秒級的超時精度,poll()只提供毫秒級的超時精度。
- 歷史上,select()比poll()移植性更好。
- 當檢查范圍較小時或者待檢查大量文件描述符分布很密集,select()和poll()兩者性能相似。
- 如果被檢查文件描述符分布很稀疏,且只有一個或幾個要被檢查,poll()性能表現優於select()。
3.4.3 select()和poll()的局限性
當select()和poll()用來檢查大量文件描述符時,可能會遇到一些問題。
- 檢查所有文件描述符耗時大:每次調用select()或poll()內核都必須檢查所有被指定的文件描述符,當檢查大量處於密集范圍內的文件描述符時,該操作耗費的時間將大大超過接下來的操作。
- 用戶<->內核數據來回拷貝耗時大:每次調用select()或poll()程序都必須傳遞一個表示所有需要被檢查文件描述符的數據結構到內核,內核檢查過后再傳回給程序。從用戶空間和內核空間來回拷貝這些數據將占用大量的CPU時間。
- 返回數據檢查所有數據:select()或poll()調用完成后,必須檢查返回數據結構中的每個元素,以此查明哪個文件描述符處於就緒態。
- 程序重復使用這些文件描述符集合,但是內核並不會在每次調用成功后記錄他們。
如上種種特性造成select()或poll()在性能延展性上要低於信號驅動IO和epoll()。
4. epoll
epoll API有以下優點:
- 當檢查大量文件描述符時,epoll性能延展性比select()和poll()高很多。
- epoll API既支持水平觸發也支持邊緣觸發。select/poll只支持水平觸發,而信號驅動IO只支持邊緣觸發。
性能表現上,epoll通信號驅動IO相似。但epoll有一些勝過信號驅動IO:
- 可以避免復雜的信號處理流程。
- 靈活性高,可以指定我們希望檢查的事件類型。
4.1 epoll APIs
epoll API由以下三個系統調用組成:
epoll_create()創建一個epoll實例,返回代表該實例的文件描述符。
epoll_ctl()操作同epoll實例相關聯的興趣列表。通過epoll_ctl()可以增加到新的描述符到列表中,將已有的文件描述符從該列表中移除,以及修改代表文件描述符上時間類型的掩碼。
epoll_wait()返回與epoll實例相關聯的就緒列表中的成員。
#include <sys/epoll.h> int epoll_create(int size);
epoll_create()創建了一個新的epoll實例,其對應的興趣列表初始化為空。
參數size指定了想要通過epoll實例老檢查的文件描述符個數。Linux 2.6.8以來,size參數被忽略不用。
返回值代表新創建的epoll實例的文件描述符,這個文件描述符在其他幾個epoll系統調用中用來表示epoll實例。
當文件描述符不再需要時,可以通過close()關閉。
struct epoll_event { uint32_t events; /* epoll events (bit mask) */ epoll_data_t data; /* User data */ }; typedef union epoll_data { void *ptr; /* Pointer to user-defined data */ int fd; /* File descriptor */ uint32_t u32; /* 32-bit integer */ uint64_t u64; /* 64-bit integer */ } epoll_data_t; #include <sys/epoll.h> int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);
參數epfd是epoll_create()創建的文件描述符。
參數fd指明了要修改興趣列表中的哪一個文件描述符的設定。
參數op用來指定需要執行的操作,可以是:
- EPOLL_CTL_ADD:將描述符fd添加到epoll實例中的興趣列表中去。
- EPOLL_CTL_MOD:修改描述符fd上設定的事件,需要用到由ev所指向的結構體中的信息。
- EPOLL_CTL_DEL:將描述符fd從epfd的興趣列表中移除。
參數ev為文件描述符fd所做的設置如下:
- 結構體epoll_event中的events字段是一個位掩碼,指定了我們為待檢查的描述符fd上所感興趣的事件集合。
- data字段是一個聯合體,當描述符fd稍后成為就緒態時,聯合體成員可用來指定船會給調用進程的信息。
max_user_watches上限
每個注冊到epoll實例上的文件描述符都占用一小段不能被交換的內核交換空間。
內核提供了一個接口用來定義每個用戶可以注冊到epoll實例上的文件描述符總數,這個上限值可以通過max_user_watches來修改,這個文件在/proc/sys/fs/epoll/max_user_watches。
#include <sys/epoll.h> int epoll_wait(int epfd, struct epoll_event *evlist, int maxevents, int timeout);
參數evlist指向的結構體數組中返回的是有關就緒態文件描述符的信息。數組evlist的空間由調用者負責申請,所包含的元素個數在參數maxevents中指定。
數組evlist中,每個元素返回的都是單個就緒態文件描述符的信息。events字段返回了在該描述符上已經發生的事件掩碼。data字段返回的是使用epoll_ctl()注冊感興趣的事件時在ev.data中所指定的值。
參數timeout用來確定epoll_wait()的阻塞行為:
- 如果timeout為-1,調用將一直阻塞,直到興趣列表中的文件描述符有事件產生,或者知道捕獲到一個信號為止。
- 如果timeout為0,執行一次非阻塞式檢查,看興趣列表中文件描述符產生了哪個事件。
- 如果timeout大於0,調用將阻塞至多timeout毫秒,直到文件描述符上有事件發生,或者直到捕獲到一個信號為止。
調用成功后epoll_wait()返回數組evlist中的元素個數,如果在timeout超時間隔內沒有任何文件描述符處於就緒態,返回0.
出錯時返回-1,並在errno總設定錯誤碼以表示錯誤原因。
epoll事件
調用epoll_ctl()時可以在ev.events中指定位掩碼以及由epoll_wait()返回的evlist[].events中給出。
位掩碼 | epoll_ctl()輸入 | epoll_wait()返回 | 描述 |
EPOLLIN | √ | √ | 可讀取非高優先級的數據 |
EPOLLPRI | √ | √ | 可讀取高優先級的數據 |
EPOLLRDHUP | √ | √ | |
EPOLLOUT | √ | √ | 普通數據可寫 |
EPOLLET | √ | 采用邊緣觸發事件通知 | |
EPOLLONESHOT | √ | 在完成事件通知之后禁用檢查 | |
EPOLLERR | √ | 有錯誤發生 | |
EPOLLHUP | √ | 出現掛斷 |
EPOLLONESHOT標志
一旦通過epoll_ctl()的EPOLL_CTL_ADD將文件描述符添加到epoll實例興趣列表中,它會保持激活狀態,直到顯式地通過EPOLL_CTL_DEL操作將其從列表中移除。
如果我們希望在某個特定的文件描述符上只得到一次通知,那么可以在ev.events中指定EPOLLONESHOT標志。
4.2 epoll測試程序
epoll API測試程序內核部分可以和select/poll共用,用戶空間測試程序如下。
#include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdio.h> #include <memory.h> #include <unistd.h> #include <stdlib.h> #include <sys/epoll.h> #include <errno.h> #define POLLTEST_NAME "/dev/polltest0" #define LOOP_COUNT 3 #define MAX_EVENTS 5 void main(void) { int epfd, i; struct epoll_event ev; struct epoll_event evlist[MAX_EVENTS]; int fd, loop_count = 0; unsigned char polltest_count[30]; int ret; fd = open(POLLTEST_NAME, O_RDWR); if (fd < 0) { printf("%s() %s %s\n", __func__, strerror(errno), POLLTEST_NAME); return; } epfd = epoll_create(MAX_EVENTS);---------------------------------------------里面的size其實已經不重要。 if(epfd == -1) { printf("%s() %s\n", __func__, strerror(errno)); return; } ev.data.fd = fd; ev.events = EPOLLIN; if(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == -1)----------------------------增加監控fd文件句柄上EPOLLIN事件。 { printf("%s() %s\n", __func__, strerror(errno)); return; } while(1) { ret = epoll_wait(epfd, evlist, MAX_EVENTS, -1);--------------------------在epfd上等待,直到fd上有事件發生。 if (ret == 0) { printf("time out\n"); } else if(ret == -1) { printf("%s() %s\n", __func__, strerror(errno)); } else { for(i = 0; i < ret; i++) { if(evlist[i].events & EPOLLIN)-----------------------------------檢查事件類型,並從fd中讀取信息。 { read(fd, &polltest_count, 0); loop_count = atoi(polltest_count); printf("epoll key_val = %d\n", loop_count); if(loop_count >= LOOP_COUNT) return; } } } } }
4.3 epoll API內核解析
4.3.1 epoll_create()
struct eventpoll { spinlock_t lock; struct mutex mtx; /* Wait queue used by sys_epoll_wait() */ wait_queue_head_t wq;-----------------------------后面epoll_wait()使用的等待隊列頭。 /* Wait queue used by file->poll() */ wait_queue_head_t poll_wait;----------------------file->poll()使用的等待隊列頭。 /* List of ready file descriptors */ struct list_head rdllist;-------------------------准備就緒的文件描述符鏈表,在epoll_wait()中返回給用戶空間。 /* RB tree root used to store monitored fd structs */ struct rb_root rbr; struct epitem *ovflist; struct wakeup_source *ws; struct user_struct *user; struct file *file; int visited; struct list_head visited_list_link; }; struct epitem { union { struct rb_node rbn; struct rcu_head rcu; }; /* List header used to link this structure to the eventpoll ready list */ struct list_head rdllink; struct epitem *next; struct epoll_filefd ffd; int nwait; struct list_head pwqlist; struct eventpoll *ep; struct list_head fllink; struct wakeup_source __rcu *ws; /* The structure that describe the interested events and the source fd */ struct epoll_event event; }; struct epoll_filefd { struct file *file; int fd; } __packed;
SYSCALL_DEFINE1(epoll_create1, int, flags) { int error, fd; struct eventpoll *ep = NULL; struct file *file; /* Check the EPOLL_* constant for consistency. */ BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC); if (flags & ~EPOLL_CLOEXEC) return -EINVAL; /* * Create the internal data structure ("struct eventpoll"). */ error = ep_alloc(&ep);----------------------------------------------分配一個struct eventpoll結構體,並且初始化。 if (error < 0) return error; /* * Creates all the items needed to setup an eventpoll file. That is, * a file structure and a free file descriptor. */ fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));-------------在當前進程中,尋找一個空閑的文件描述符並返回;同時對文件設置O_RDWR等標志。 if (fd < 0) { error = fd; goto out_free_ep; } file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC));-------------------------創建一個匿名inode文件。 if (IS_ERR(file)) { error = PTR_ERR(file); goto out_free_fd; } ep->file = file;----------------------------------------------------將struct eventpoll和struct file關聯起來。 fd_install(fd, file);-----------------------------------------------將fd和file在current->files->fdt中關聯起來。 return fd; out_free_fd: put_unused_fd(fd); out_free_ep: ep_free(ep); return error; } SYSCALL_DEFINE1(epoll_create, int, size) { if (size <= 0) return -EINVAL; return sys_epoll_create1(0); } static int ep_alloc(struct eventpoll **pep) { int error; struct user_struct *user; struct eventpoll *ep; user = get_current_user(); error = -ENOMEM; ep = kzalloc(sizeof(*ep), GFP_KERNEL);-----------------------分配一個struct eventpoll結構體。 if (unlikely(!ep)) goto free_uid; spin_lock_init(&ep->lock);-----------------------------------初始化自旋鎖、等待隊列、紅黑樹等。 mutex_init(&ep->mtx); init_waitqueue_head(&ep->wq); init_waitqueue_head(&ep->poll_wait); INIT_LIST_HEAD(&ep->rdllist); ep->rbr = RB_ROOT; ep->ovflist = EP_UNACTIVE_PTR; ep->user = user; *pep = ep; return 0; free_uid: free_uid(user); return error; } int get_unused_fd_flags(unsigned flags) { return __alloc_fd(current->files, 0, rlimit(RLIMIT_NOFILE), flags);----------current->files表示當前進程打開文件相關信息,通過RLIMIT_NOFILE可以獲取系統最大打開文件數目。 } int __alloc_fd(struct files_struct *files, unsigned start, unsigned end, unsigned flags)------------------------__alloc_fd()在start和end之間,尋找一個合適的fd返回。 { unsigned int fd; int error; struct fdtable *fdt; spin_lock(&files->file_lock); repeat: fdt = files_fdtable(files); fd = start; if (fd < files->next_fd) fd = files->next_fd; if (fd < fdt->max_fds) fd = find_next_fd(fdt, fd); error = -EMFILE; if (fd >= end) goto out; error = expand_files(files, fd); if (error < 0) goto out; if (error) goto repeat; if (start <= files->next_fd) files->next_fd = fd + 1; __set_open_fd(fd, fdt);-----------------------------------------------------將fd設置為打開狀態。 if (flags & O_CLOEXEC) __set_close_on_exec(fd, fdt); else __clear_close_on_exec(fd, fdt); error = fd; ... out: spin_unlock(&files->file_lock); return error; }
系統調用epoll_create()就是進程在內核中創建了一個從epoll文件描述符到struct event結構的通道;首先調用ep_alloc()分配一個struct eventpoll數據結構,並初始化;然后尋找一個空閑的文件描述符,並創建一個匿名文件inode;最后將兩者在進程task_struct結構體中關聯起來。
最終返回打開的文件描述符。
后面的epoll_ctl()和epoll_wait()都可以通過此文件描述符關聯到內核中的struct eventpoll結構體。
4.3.2 epoll_ctl()
struct ep_pqueue { poll_table pt; struct epitem *epi; }; typedef struct poll_table_struct { poll_queue_proc _qproc; unsigned long _key; } poll_table; typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, struct epoll_event __user *, event) { int error; int full_check = 0; struct fd f, tf; struct eventpoll *ep; struct epitem *epi; struct epoll_event epds; struct eventpoll *tep = NULL; error = -EFAULT; if (ep_op_has_event(op) && copy_from_user(&epds, event, sizeof(struct epoll_event)))--------------------將用戶空間的event拷貝到內核的epds中。 goto error_return; error = -EBADF; f = fdget(epfd);-----------------------------------------------------------------根據epfd找到對應的struct file結構體。 if (!f.file) goto error_return; tf = fdget(fd);------------------------------------------------------------------獲取目標文件的struct file結構體。 if (!tf.file) goto error_fput; /* The target file descriptor must support poll */ error = -EPERM; if (!tf.file->f_op->poll) goto error_tgt_fput; /* Check if EPOLLWAKEUP is allowed */ if (ep_op_has_event(op)) ep_take_care_of_epollwakeup(&epds); error = -EINVAL; if (f.file == tf.file || !is_file_epoll(f.file)) goto error_tgt_fput; if (epds.events & EPOLLEXCLUSIVE) { if (op == EPOLL_CTL_MOD) goto error_tgt_fput; if (op == EPOLL_CTL_ADD && (is_file_epoll(tf.file) || (epds.events & ~EPOLLEXCLUSIVE_OK_BITS))) goto error_tgt_fput; } ep = f.file->private_data; mutex_lock_nested(&ep->mtx, 0); if (op == EPOLL_CTL_ADD) { if (!list_empty(&f.file->f_ep_links) || is_file_epoll(tf.file)) { full_check = 1; mutex_unlock(&ep->mtx); mutex_lock(&epmutex); if (is_file_epoll(tf.file)) { error = -ELOOP; if (ep_loop_check(ep, tf.file) != 0) { clear_tfile_check_list(); goto error_tgt_fput; } } else list_add(&tf.file->f_tfile_llink, &tfile_check_list); mutex_lock_nested(&ep->mtx, 0); if (is_file_epoll(tf.file)) { tep = tf.file->private_data; mutex_lock_nested(&tep->mtx, 1); } } } epi = ep_find(ep, tf.file, fd);-------------------------------------------------在ep->rbr紅黑樹中查找file對應的文件,如果沒有找到返回NULL。 error = -EINVAL; switch (op) { case EPOLL_CTL_ADD: if (!epi) {-----------------------------------------------------------------如果之前ep_find()沒有找到對應的epi,那么此處插入到ep->rbr紅黑樹中。 epds.events |= POLLERR | POLLHUP; error = ep_insert(ep, &epds, tf.file, fd, full_check); } else error = -EEXIST; if (full_check) clear_tfile_check_list(); break; case EPOLL_CTL_DEL: if (epi) error = ep_remove(ep, epi);---------------------------------------------在ep_find()找到epi的情況下,將其從ep->rbr中移除。 else error = -ENOENT; break; case EPOLL_CTL_MOD: if (epi) { if (!(epi->event.events & EPOLLEXCLUSIVE)) { epds.events |= POLLERR | POLLHUP; error = ep_modify(ep, epi, &epds);----------------------------------修改epi的事件類型。 } } else error = -ENOENT; break; } if (tep != NULL) mutex_unlock(&tep->mtx); mutex_unlock(&ep->mtx); ... return error; } static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd, int full_check) { int error, revents, pwake = 0; unsigned long flags; long user_watches; struct epitem *epi; struct ep_pqueue epq; user_watches = atomic_long_read(&ep->user->epoll_watches); if (unlikely(user_watches >= max_user_watches)) return -ENOSPC; if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))-------------------------從slab中分配緩存struct epitem結構體空間。 return -ENOMEM; /* Item initialization follow here ... */ INIT_LIST_HEAD(&epi->rdllink);------------------------------------------------初始化epi數據結構。 INIT_LIST_HEAD(&epi->fllink); INIT_LIST_HEAD(&epi->pwqlist); epi->ep = ep; ep_set_ffd(&epi->ffd, tfile, fd); epi->event = *event; epi->nwait = 0; epi->next = EP_UNACTIVE_PTR; if (epi->event.events & EPOLLWAKEUP) { error = ep_create_wakeup_source(epi); if (error) goto error_create_wakeup_source; } else { RCU_INIT_POINTER(epi->ws, NULL); } epq.epi = epi; init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);-----------------------------將epq.pt->_qproc指向ep_ptable_queue_proc()。設置后面poll()等待隊列被喚醒的時候,將要調用到的函數。 revents = ep_item_poll(epi, &epq.pt);-----------------------------------------調用epi對應文件的poll()函數,插入到poll()的等待隊列中。;返回的事件類型時用戶空間關心事件類型的交集。 error = -ENOMEM; if (epi->nwait < 0) goto error_unregister; spin_lock(&tfile->f_lock); list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links); spin_unlock(&tfile->f_lock); ep_rbtree_insert(ep, epi);----------------------------------------------------將新的epi插入到ep->rbr的紅黑樹中。 ... if (pwake) ep_poll_safewake(&ep->poll_wait); return 0; ... return error; } static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt) { struct epitem *epi = ep_item_from_epqueue(pt); struct eppoll_entry *pwq;---------------------------------------------------------struct epoll_entry主要完成struct epitem和和epitem事件發生時的callback函數之間的關聯。 if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {---------申請一個struct epoll_entry結構體緩存。 init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);----------------------初始化等待隊列函數的入口,也就是poll醒來時要調用的回調函數。 pwq->whead = whead; pwq->base = epi; if (epi->event.events & EPOLLEXCLUSIVE) add_wait_queue_exclusive(whead, &pwq->wait); else add_wait_queue(whead, &pwq->wait);---------------------------------------將pwq->wait加入到等待隊列中。 list_add_tail(&pwq->llink, &epi->pwqlist); epi->nwait++; } else { /* We have to signal that an error occurred */ epi->nwait = -1; } } static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)---主要功能試在被監視的文件等待時間就緒時,將文件對應的epitem實例添加到就緒列表中,當用戶調用epoll_wait()時,內核會將就緒隊列中的時間報告給用戶。 { int pwake = 0; unsigned long flags; struct epitem *epi = ep_item_from_wait(wait); struct eventpoll *ep = epi->ep; int ewake = 0; if ((unsigned long)key & POLLFREE) { ep_pwq_from_wait(wait)->whead = NULL; list_del_init(&wait->task_list); } spin_lock_irqsave(&ep->lock, flags); if (!(epi->event.events & ~EP_PRIVATE_BITS)) goto out_unlock; if (key && !((unsigned long) key & epi->event.events)) goto out_unlock; if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) { if (epi->next == EP_UNACTIVE_PTR) { epi->next = ep->ovflist; ep->ovflist = epi; if (epi->ws) { __pm_stay_awake(ep->ws); } } goto out_unlock; } if (!ep_is_linked(&epi->rdllink)) { list_add_tail(&epi->rdllink, &ep->rdllist); ep_pm_stay_awake_rcu(epi); } if (waitqueue_active(&ep->wq)) { if ((epi->event.events & EPOLLEXCLUSIVE) && !((unsigned long)key & POLLFREE)) { switch ((unsigned long)key & EPOLLINOUT_BITS) { case POLLIN: if (epi->event.events & POLLIN) ewake = 1; break; case POLLOUT: if (epi->event.events & POLLOUT) ewake = 1; break; case 0: ewake = 1; break; } } wake_up_locked(&ep->wq); } if (waitqueue_active(&ep->poll_wait)) pwake++; ... return 1; } static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_event *event) { int pwake = 0; unsigned int revents; poll_table pt; init_poll_funcptr(&pt, NULL); epi->event.events = event->events; /* need barrier below */ epi->event.data = event->data; /* protected by mtx */ if (epi->event.events & EPOLLWAKEUP) { if (!ep_has_wakeup_source(epi)) ep_create_wakeup_source(epi); } else if (ep_has_wakeup_source(epi)) { ep_destroy_wakeup_source(epi); } smp_mb(); revents = ep_item_poll(epi, &pt); if (revents & event->events) { spin_lock_irq(&ep->lock); if (!ep_is_linked(&epi->rdllink)) { list_add_tail(&epi->rdllink, &ep->rdllist); ep_pm_stay_awake(epi); /* Notify waiting tasks that events are available */ if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; } spin_unlock_irq(&ep->lock); } /* We have to call this outside the lock */ if (pwake) ep_poll_safewake(&ep->poll_wait); return 0; } static int ep_remove(struct eventpoll *ep, struct epitem *epi)-------------------------------------將epi從當前ep中移除。 { unsigned long flags; struct file *file = epi->ffd.file; ep_unregister_pollwait(ep, epi); spin_lock(&file->f_lock); list_del_rcu(&epi->fllink); spin_unlock(&file->f_lock); rb_erase(&epi->rbn, &ep->rbr); spin_lock_irqsave(&ep->lock, flags); if (ep_is_linked(&epi->rdllink)) list_del_init(&epi->rdllink); spin_unlock_irqrestore(&ep->lock, flags); wakeup_source_unregister(ep_wakeup_source(epi)); call_rcu(&epi->rcu, epi_rcu_free); atomic_long_dec(&ep->user->epoll_watches); return 0; }
epoll_ctl() 函數首先就分配空間, 將結構從用戶空間復制到內核空間中, 在進行方法(op)判斷之前, 先采用ep_find函數進行查找, 以確保該數據已經設置好回調函數了, 然后使用fget函數獲取該epoll的匿名文件的文件描述符, 最后進行方法(op)判斷, 確定是EPOLL_CTL_ADD, EPOLL_CTL_MOD還是 EPOLL_CTL_DEL。
這里主要講的是EPOLL_CTL_ADD, 所以當是選擇加入時, 就調用ep_insert函數, 將回調函數設置為ep_ptable_queue_proc函數, 也就是將消息到達后, 需要自動啟動ep_ptable_proc函數, 進而調用ep_poll_callback函數, 該函數就是把來的消息所對應的結構和文件信息加入到就緒鏈表中, 以便之后調用 epoll_wait 可以直接從就緒隊列鏈表中奪得就緒的文件. 也正是這樣, epoll的回調函數使epoll不用每次都輪詢遍歷數據, 而是自動喚醒回調, 更加的高效. 並且回調函數也只是在進程加入的時侯才設置, 而且只設置一次.
4.3.3 epoll_wait()
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events, int, maxevents, int, timeout) { int error; struct fd f; struct eventpoll *ep; if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)----------------------------------判斷maxevents合法性。 return -EINVAL; if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) return -EFAULT; f = fdget(epfd);------------------------------------------------------------------獲取epoll_create()函數創建的匿名文件描述符。 if (!f.file) return -EBADF; error = -EINVAL; if (!is_file_epoll(f.file))-------------------------------------------------------通過判斷f.file->f_op是否為eventpoll_fops來確定是否為epoll文件。 goto error_fput; ep = f.file->private_data;--------------------------------------------------------私有數據指向struct eventpoll數據結構。 error = ep_poll(ep, events, maxevents, timeout);----------------------------------等待消息到來;沒有消息到來就阻塞自己。 error_fput: fdput(f); return error; } static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout) { int res = 0, eavail, timed_out = 0; unsigned long flags; u64 slack = 0; wait_queue_t wait; ktime_t expires, *to = NULL; if (timeout > 0) {----------------------------------------------------------------將用戶空間傳入的timeout事件轉換成內核超時時間。 struct timespec64 end_time = ep_set_mstimeout(timeout); slack = select_estimate_accuracy(&end_time); to = &expires; *to = timespec64_to_ktime(end_time); } else if (timeout == 0) { timed_out = 1; spin_lock_irqsave(&ep->lock, flags); goto check_events; } fetch_events: spin_lock_irqsave(&ep->lock, flags); if (!ep_events_available(ep)) { init_waitqueue_entry(&wait, current);-----------------------------------------初始化一個等待隊列入口,並將其添加到等待隊列上。 __add_wait_queue_exclusive(&ep->wq, &wait); for (;;) { set_current_state(TASK_INTERRUPTIBLE);------------------------------------進程設置為可中斷的。 if (ep_events_available(ep) || timed_out)---------------------------------退出的條件是是否有ready事件、是否超時、是否有信號中斷,三者任一則退出循環。 break; if (signal_pending(current)) { res = -EINTR; break; } spin_unlock_irqrestore(&ep->lock, flags); if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))---------------時間達到之前,讓出cpu調用其他進程;超時后,重新通過中斷回調該進程。 timed_out = 1; spin_lock_irqsave(&ep->lock, flags); } __remove_wait_queue(&ep->wq, &wait); __set_current_state(TASK_RUNNING); } check_events: eavail = ep_events_available(ep); spin_unlock_irqrestore(&ep->lock, flags); if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) && !timed_out)-----------------如果中間沒有被信號中斷,並且ep->rdlist不為空,則調用ep_send_events()給用戶空間發送消息。 goto fetch_events; return res; } static int ep_send_events(struct eventpoll *ep, struct epoll_event __user *events, int maxevents) { struct ep_send_events_data esed; esed.maxevents = maxevents; esed.events = events; return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false); } static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv) { struct ep_send_events_data *esed = priv; int eventcnt; unsigned int revents; struct epitem *epi; struct epoll_event __user *uevent; struct wakeup_source *ws; poll_table pt; init_poll_funcptr(&pt, NULL); for (eventcnt = 0, uevent = esed->events; !list_empty(head) && eventcnt < esed->maxevents;) {---------------------------從就緒隊列去除一個個epi,進行處理;並將相關事件返回給用戶空間。 epi = list_first_entry(head, struct epitem, rdllink);--------------------------取出第一個消息數據對應的struct epitem結構,然后從就緒列表上移除。 ws = ep_wakeup_source(epi); if (ws) { if (ws->active) __pm_stay_awake(ep->ws); __pm_relax(ws); } list_del_init(&epi->rdllink); revents = ep_item_poll(epi, &pt);----------------------------------------------調用epi對應文件描述符的poll()函數,返回值是。 if (revents) { if (__put_user(revents, &uevent->events) || __put_user(epi->event.data, &uevent->data)) {--------------------------將revents和event.data發送到用戶空間。 list_add(&epi->rdllink, head); ep_pm_stay_awake(epi); return eventcnt ? eventcnt : -EFAULT; } eventcnt++;----------------------------------------------------------------返回給用戶空間的參數eventcnt遞增,uevent也遞增。 uevent++; if (epi->event.events & EPOLLONESHOT) epi->event.events &= EP_PRIVATE_BITS; else if (!(epi->event.events & EPOLLET)) { list_add_tail(&epi->rdllink, &ep->rdllist); ep_pm_stay_awake(epi); } } } return eventcnt; }
獲得匿名文件的文件指針, 在通過調用ep_poll函數, 進行時間片的設置, 在時間片結束后就緒隊列為空, 就退出等待; 如果時間設置的是負數, ep_poll函數會調用schedule_timeout, 執行進程調度, 當設置的時間片結束后又繼續回到ep_poll進程 查看就緒隊列是否為空, 為空的話就繼續cinching調度, 此時wait又變成阻塞態; 當就緒隊列准備好后, 就退出進程調度, 執行ep_send_events函數, 主要是為了將就緒隊列的鏈表從內核空間發送給用戶空間。
ep_send_events函數, 先將就緒鏈表rdllist復制到另一個新的鏈表, 重新將就緒鏈表清零, 然后程序調用__put_user將新鏈表的數據發送給用戶空間, 同時, 發送的個數吉拉路下來, 以便函數的返回 , 但是, 如果在發送的時候又有就緒信號到來, 就將來的就緒信號保存在ovflist鏈表中, 最后又重新數據拷貝給rdllist中, 再重復執行ep_send_events函數。
從對epoll的分析也可以看出,為什么性能要優於select()/poll()。
- epoll使用了三個API來達到select()/poll()同樣的功能,epoll將每次查詢不必要重復的部分和需要重復的部分區分開來。降低了不必要的開銷。
- 每次調用select()/poll()時,內核必須檢查所有在調用中指定的文件描述符。相反,通過epoll_ctl()制定了需要監視的文件描述符時,內核會在與打開的文件描述符上下文相關聯的列表中記錄該描述符。之后每當執行IO操作使得文件描述符成為就緒態時,內核就在epoll描述就緒列表中添加一個元素。之后的epoll_wait()就從就緒列表中簡單地去除這些元素。
- 每次調用select()/poll()時,傳遞一個標記了所有待監視的文件描述符的數據結構給內核,調用返回時,內核將所有標記為就緒態的文件描述符的數據結構在傳回到用戶空間。相反,在epoll中使用epoll_ctl()在內核空間中建立一個數據結構,該數據結構會將待監視的文件描述符都記錄下來。一旦這個數據機構建立完成,稍后每次調用epoll_wait()時就不需要再傳遞任何與文件描述符有關的信息給內核了,而調用返回的信息中只包含那些已經處於就緒態的描述符。
參考文檔:
《select(poll)系統調用實現解析(一)》《select(poll)系統調用實現解析(二)》《select(poll)系統調用實現解析(三)》《epoll源碼分析(一)》《epoll源碼分析(二)》《epoll源碼分析(三)》《Linux epoll模型詳解及源碼分析》《epoll源碼實現分析[整理]》《Linux下的I/O復用與epoll詳解》《epoll源碼分析(全)》《epoll內核源碼詳解+自己總結的流程》