select的內核實現原理


http://blog.csdn.net/shuxiaogd/article/details/50366039

0 前言

在學習網絡編程時,我們總是從最簡單的Server程序寫起:

socket -> bind -> listen -> accept -> read -> write -> return

再接下來,就是學習如何處理客戶端的並發請求。主要思路有:

  1. 使用多線程/多進程模型
  2. 使用IO多路復用模型
  3. 使用多線程 + IO多路復用模型

其中,使用IO多路復用模型,我們總是從select系統調用開始學起。但是,我們也總是聽到,select效率太低了,大型項目中也從來不使用select/poll,而是使用的epoll

那么,為什么select系統調用效率低下呢?而epoll又是作了哪方面改進,從而要高效很多呢?

本篇文章試圖通過跟蹤select的內核實現源碼,來解答為什么select不適用於高並發量的大型項目中。

1 阻塞調用與非阻塞調用

使用多路復用技術時,一般都是要求設置待監聽的文件為非阻塞模式。那么我們先來看一下什么是阻塞模式,什么是非阻塞模式。

概念描述

當read或write一個文件[注]時,如果read/write不能立即返回,那么調用者就會進入睡眠狀態,直到該文件變成可讀/可寫。這種模式就是阻塞模式。

注: 在UNIX文化中,一切皆文件。無論是普通磁盤文件,設備,還是網絡連接套接字,在虛擬文件系統中都是以文件的形式存在。用戶使用統一的read/write函數去讀寫文件,而不管文件在底層到底是如何存在的。因此,在Linux中,總是可以看到各種形式的“文件”,比如說eventfd,可以實現事件通知,等等。

以TCP連接為例,用戶空間read/write是與TCP緩沖區交互的,而不是直接同網卡驅動交互。當接收緩沖區內無數據時,read操作就無法立即返回,因此就會阻塞住調用進程,直到對方發送數據過來並由網卡驅動拷貝到接收緩沖區;同理,當網絡延時過大或接收方問題導致本端(發送方)發送緩沖區滿時,調用write無法將數據寫入到發送緩沖區(一個字節都沒法寫入),調用進程也會阻塞。

這里需要注意的是,阻塞與否,是取決於該文件的類型的,換句話說,是取決於該文件對應的設備驅動程序的。

比方說,對於普通磁盤文件,read/write總是可以立即返回的,除非磁盤出現故障。而對於網絡連接,或者管道等類型的文件,read/write是有可能發送阻塞的,而且是一種常態模式。

內核中的實現

阻塞調用在內核中的實現包含兩方面:

  • 阻塞方如何進入睡眠狀態
  • 驅動程序如何喚醒睡眠進程

先來明確一下,這兩個操作在哪里實現,或者說,由誰實現。

進程進入睡眠狀態,肯定是在調用read/write時由於某個條件未滿足(比如說read時接收緩沖區空),因此首先需要執行條件判斷。這個工作肯定是而且是只能由驅動程序來完成的。而在VFS中,read/write系統調用最終會調用驅動程序的內部實現(函數指針保存在struct file 的struct file_operations中)。因此,進程進入睡眠的工作,在驅動程序的read/write中實現。

同理,也是只有在特定的條件滿足了,才會去喚醒進程。當read阻塞時,是因為沒有數據可讀。那何時有新數據進來呢?對於本地文件,當然是有進程寫入數據了(調用write);而對於網絡連接,當然是由於對方發送數據過來。這兩個操作也都是在驅動程序的write中完成。

因此,進程的睡眠和喚醒動作都是在文件對應的設備驅動程序的read和write中實現的。

接下來,就描述一下進程如何睡眠,以及如何喚醒。其核心是一個叫做“等待隊列”的數據結構。

等待隊列

等待隊列基於struct list_head實現,包含一個頭結點和若干個隊列元素。

struct __wait_queue_head { spinlock_t lock; struct list_head task_list; };
  • 1
  • 2
  • 3
  • 4

頭結點包含一個指向第一個隊列元素的指針和一個保護隊列的自旋鎖:當插入(add_wait_queue)新元素或刪除(remove_wait_queue)舊元素時,均使用自旋鎖保證同步。

struct __wait_queue { unsigned int flags; void *private; wait_queue_func_t func; struct list_head task_list; };
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

等待隊列中的每個元素包含一個回調函數func,在喚醒該元素時調用。其中,private指針指向的是進程的task_struct結構,這樣在喚醒時才知道喚醒哪個進程。元素與元素間使用list_head連成雙向鏈表。

睡眠

調用以下任一函數,將進程睡眠在某個等待隊列上。

wait_event(wq, condition);
wait_event_timeout(wq, condition, timeout);

wait_event_interruptible(wq, condition);
wait_event_interruptible(wq, condition, timeout);
  • 1
  • 2
  • 3
  • 4
  • 5

上面兩組函數的區別,在於調用進程睡眠時的狀態:前者是TASK_UNINTERRUPTIBLE狀態,后者是TASK_INTERRUPTIBLE狀態,可以被信號打斷睡眠。

睡眠的過程分為四步:

  1. 判斷條件是否滿足,若滿足則無需睡眠;
  2. 否則,定義一個新的wait_queue_t隊列元素,插入到wq表示的等待隊列中去;
  3. 設置進程狀態;
  4. 調用schedule(),讓出CPU,調度其它進程。

需要注意的是,一個等待隊列上可能有多個元素,也就表示有多個進程在同時等待。因此,當條件滿足時,所有進程都會被喚醒。而且,第一個被喚醒后得到調度的進程可能會“消費”掉條件,因此所有被喚醒的進程在繼續執行后首先需要再次判斷條件是否滿足。

這也是很多代碼將睡眠操作放在while循環中的原因。

    /* wait event */ while(!condition) { /* 如果是非阻塞調用,那么直接返回 */ if (filp->f_flags & O_NONBLOCK) return -EAGAIN; if (wait_event_interruptible(wq, condition)) { /* 被信號中斷了 */ return -ERESTARTSYS; } } /* handle event */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

如果設置了文件的O_NONBLOCK標志,也就是非阻塞模式,那么在條件不滿足時,直接返回-EAGAIN,不會進入睡眠狀態。

喚醒

與睡眠函數相對應,調用以下任一函數,將隊列上的進程喚醒

void wake_up(wait_event_head_t *queue); void wake_up_interruptible(wait_event_head_t *queue);
  • 1
  • 2

這兩個函數會喚醒某個隊列上睡眠着的所有進程,分為以下步驟:

  1. 對隊列中的每一個元素,調用其回調函數func;
  2. 如果回調函數返回非0值,那么看隊列元素的flag值,如果設置了WQ_FLAG_EXCLUSIVE標志,那么停止遍歷,不繼續喚醒其它進程;否則繼續喚醒下一個進程。

那么這個等待隊列的回調函數和flag是什么呢?在wait_event中定義的wait_event_t結構體的值如下所示:

wait_queue_t name = {
    .private = current, .func = autoremove_wake_function, .task_list = LIST_HEAD_INIT((name).task_list), };
  • 1
  • 2
  • 3
  • 4
  • 5

autoremove_wake_function:

  • 調用default_wake_function喚醒睡眠的進程,
  • 將其從等待隊列的list中刪除掉,成功會返回1。

而flag參數則在prepare_to_wait函數中被清除掉了WQ_FLAG_EXCLUSIVE標志。函數調用順序如下:

wait_event_interruptible -> DEFINE_WAIT -> prepare_to_wait -> schedule -> finish_wait

其中,prepare_to_wait完成睡眠的前3個步驟。

因此,wake_up_interruptible和wake_up會喚醒所有進程。

2 select系統調用

/* * @nfds: 待監聽的最大fd值+1 * @readfds: 待監聽的可讀文件fd集合 * @writefds: 待監聽的可寫文件fd集合 * @exceptfds: 待監聽的異常文件fd集合 * @timeout: 超時設置,在等待指定時間后返回超時 * return:返回滿足條件的fd數量和,如果出錯返回-1,如果是超時返回0 int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

正常返回之后,readfds/writefds/exceptfds會設置相應滿足條件的fd。

先來看一下fd_set到底是什么東西。

#define __NFDBITS (8 * sizeof(unsigned long)) #define __FD_SETSIZE 1024 #define __FDSET_LONGS (__FD_SETSIZE/__NFDBITS) typedef struct { unsigned long fds_bits [__FDSET_LONGS]; } __kernel_fd_set;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在x86機器上,select最大支持1024個文件(__FD_SETSIZE)。在fd_set中每個文件使用一個bit表示,因此共需要__FDSET_LONGS個long型整數,放在一個數組里。

以下一組函數專門用來操作fd_set:

void FD_SET(int fd, fd_set *set); void FD_ZERO(fd_set *set); void FD_CLR(int fd, fd_set *set); int FD_ISSET(int fd, fd_set *set);
  • 1
  • 2
  • 3
  • 4

超時設置

如果要求select不是無止境地等待,可以指定在若干時間后直接返回0.超時時間由參數timeout指定,是一個timeval結構。

struct timeval { long tv_sec; /* seconds */ long tv_usec; /* microseconds */ };
  • 1
  • 2
  • 3
  • 4

但是,在內核里,確是用的timespec結構。

struct timespec { long tv_sec; /* seconds */ long tv_nsec; /* nanoseconds */ };
  • 1
  • 2
  • 3
  • 4

3 select的實現

接下來我們就可以研究一下select是如何實現的了。

假設讓我們來設計select,我們該如何實現呢?

3.1 一個簡單的select實現

我們先把問題簡化一下,假定select只監控文件可讀的情況,那么一個簡單的算法(偽代碼)如下所示:

count=0 FD_ZERO(&res_rset) for fd in read_set if( readable(fd) ) count++ FDSET(fd, &res_rset) break else add_to_wait_queue if count > 0 return count else wait_any_event return count
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

上面的偽代碼只演示了判斷文件可讀的條件。算法也很朴素,遍歷所有文件,如果有文件可讀,那么select無需阻塞,直接返回即可,並標記該文件可讀;否則,將調用進程加入到每個文件對應設備驅動的讀等待隊列中,並進入睡眠狀態。

該算法有幾個待解決的問題:

  1. 如何判斷文件當前是否可讀(或可寫),即如何實現readable(fd)函數?
  2. 如何將進程加入到驅動的讀等待隊列?如果調用wait_event_interruptible,那么進程在遇到第一個不可讀的fd后會立即進入睡眠,而無法繼續監聽其它文件。

仔細想一下,這兩個問題的解決方法都需要依賴具體的設備驅動才行。因此,為了方便select/poll函數的實現,在Linux中規定每一個支持select/poll監聽的文件所屬設備驅動必須實現struct file_operations中的poll函數:

3.2 poll文件操作

/* * @filp: 指向打開文件的指針 * @p: 傳入的poll_table指針 * return: 一個MASK,標志該文件當前狀態,例如可讀(POLLIN),可寫(POLLOUT) * 或出錯(POLLERR),或到達文件尾(POLLHUP) */ unsigned int (*poll) (struct file *filp, struct poll_table_struct *p);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

poll函數的工作有兩部分:

  • 判斷當前文件狀態,並在返回值中標記。
  • 對本驅動程序的等待隊列調用poll_wait函數。

poll_wait函數有什么用呢?

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p) { if (p && wait_address) p->qproc(filp, wait_address, p); }
  • 1
  • 2
  • 3
  • 4
  • 5

其實就是調用了poll_table的一個函數指針。這個poll_table是在select調用文件的poll函數時傳入的。

所以,回到本小節一開始提的2個問題。第1個問題已經解決了,而第2個問題可以在這個p->qproc中實現。

看一下Linux內核中select是如何實現這個函數的:

/* 1. poll_table存放在一個poll_wqueues結構體中 */ struct poll_wqueues { poll_table pt; struct poll_table_page *table; struct task_struct *polling_task;/* 指向睡眠進程的task_struct */ int triggered; int error; int inline_index; struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES]; }; /* 2. 在do_select中調用poll_initwait初始化poll_wqueues */ void poll_initwait(struct poll_wqueues *pwq) { init_poll_funcptr(&pwq->pt, __pollwait); /* 初始化poll_table */ pwq->polling_task = current; pwq->triggered = 0; pwq->error = 0; pwq->table = NULL; pwq->inline_index = 0; }; /* 3. p->qproc指向的就是__pollwait函數 */ static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p) { struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt); struct poll_table_entry *entry = poll_get_entry(pwq); if (!entry) return; get_file(filp); entry->filp = filp; entry->wait_address = wait_address; entry->key = p->key; init_waitqueue_func_entry(&entry->wait, pollwake); entry->wait.private = pwq; add_wait_queue(wait_address, &entry->wait); }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

__pollwait中將進程加入到文件的等待隊列中。select為管理所有監聽的文件,為每個文件分配了一個poll_table_entry結構。poll_table_entry包含文件信息、等待隊列頭以及等待隊列元素。

poll_table_entry的內存管理

poll_wqueues如何為新的poll_table_entry對象分配內存呢?

它采取的是靜態分配 與 動態分配相結合的方式。

poll_table_entry_mm

如果只有少量文件,那么將文件的poll_table_entry放在poll_wqueues結構體里面的數組里面,避免了額外的內存分配工作;如果有很多文件,那么就分配額外的poll_table_page。

每個poll_table_page占據一個內存頁的大小。所有的poll_table_page連成一條鏈表。

struct poll_table_page { struct poll_table_page * next; struct poll_table_entry * entry; struct poll_table_entry entries[0]; };
  • 1
  • 2
  • 3
  • 4
  • 5

喚醒時的回調函數pollwake

注意到,在__pollwait中重新設置wait_queue_t的回調函數指針,而不是使用wait_event_interruptible中設置的autoremove_wake_function。

pollwake調用了__pollwake,而后者在設置了pwq->triggered為1后,直接調用default_wake_function,以喚醒pwq->polling_task進程。也就是調用select系統調用從而睡眠的進程。

因此,任何一個文件可讀/可寫/出錯時,均會觸發pollwake被調用,從而睡眠的進程繼續執行。

最后一個問題

在進程被喚醒后,如何知道是哪個文件的驅動喚醒的呢?因為在

3.3 內核中select的實現

跟蹤sys_select的內核實現,其大致過程如下:

  1. 拷貝用戶空間的timeout對象到內核空間end_time,並重新設定時間值(標准化處理)
  2. 調用core_sys_select。過程如下: 
    1. 根據傳入的maxfd值,計算保存所有fd需要多少字節(每fd占1bit),然后判斷是在棧上分配內存還是在堆中分配內存。共需要分配6個fdset:用戶傳入的in, out, exception以及要返回給用戶的res_in,res_out和res_exception。
    2. 將3個輸入fdset從用戶空間拷貝到內核空間,並初始化輸出的fdset為0;
    3. 調用do_select,獲得返回值ret。do_select的工作就是初始化poll_wqueues對象,並調用驅動程序的poll函數。類似於我們寫的簡單的select。過程如下所示: 
      1. 調用poll_initwait初始化poll_wqueues對象table,包括其成員poll_table;
      2. 如果用戶傳入的timeout不為NULL,但是設定的時間為0,那么設置poll_table指針wait(即 &table.pt)為NULL;
      3. 將in,out和exception進行或運算,得到all_bits,然后遍歷all_bits中bit為1的fd,根據進程的fd_table查找到file指針filp,然后設置wait的key值(POLLEX_SET, POLLIN_SET,POLLIN_SET三者的或運算,取決於用戶輸入),並調用filp->poll(filp, wait),獲得返回值mask。 再根據mask值檢查該文件是否立即滿足條件,如果滿足,設置res_in/res_out/res_exception的值,執行retval++, 並設置wait為NULL。
      4. 在每遍歷32(取決於long型整數的位數)個文件后,調用1次cond_resched(),主動尋求調度,可以等待已經遍歷過的文件是否有喚醒的;
      5. 在遍歷完所有文件之后,設置wait為NULL,並檢查是否有滿足條件的文件(retval值是否為0),或者是否超時,或者是否有未決信號,如果有那么直接跳出循環,進入步驟7;
      6. 否則調用poll_schedule_timeout,使進程進入睡眠,直到超時(如果未設置超時,那么是直接調用的schedule())。如果是超時后進程繼續執行,那么設置pwq->triggered為0;如果是被文件對應的驅動程序喚醒的,那么pwq->triggered被設置為1(見第2節).
      7. 最終,函數調用poll_freewait,將本進程從所有文件的等待隊列中刪掉,並刪除分配的poll_table_page對象,回收內存,並返回retval值。
    4. 拷貝res_in, res_out和res_exception到傳入的in, out, exception,並返回ret。
  3. 調用poll_select_copy_remaining,將剩余的timeout時間拷貝回用戶空間。

以上就是select的全部過程了,其實邏輯是很簡單的,非常類似於我們自己實現的簡單的select。

其中它利用了一個重要的特性,就是以NULL作為poll_table指針去調用驅動程序的poll函數。這會使得驅動不會將進程添加到等待隊列上。在上面的過程中,有兩次使用了這個特性:

  1. 如果傳入了非NULL的timeout,但超時時間為0,不會睡眠在任何文件上;
  2. 任何一個文件是立即可讀/可寫的,不會繼續睡眠在剩下的文件上。

可以看出,這種實現效率之所以低下,有以下幾方面原因:

  1. 可以同時監聽的文件數量有限,最多1024個。這對要處理幾十萬並發請求的服務器來說,太少了!
  2. 每次調用select,都需要從0bit一直遍歷到最大的fd,並且每隔32個fd還有調度一次(2次上下文切換)。試想,如果我要監聽的fd是1000,那么該是多么的慢啊!而且在有多個fd的情況下,如果小的fd一直可讀,那就會導致大的fd一直不會被監聽。
  3. 內存復制開銷。需要在用戶空間和內核空間來回拷貝fd_set,並且要為每個fd分配一個poll_table_entry對象。

4 總結

select雖然效率低下,且基本上不會再被用於大型項目中。但是其內核實現相比較epoll則更為簡單,便於理解。

在此基礎上,我們可以進一步地去研究epoll的實現,兩相比較,更好地理解,為什么epoll會更加高效。這也是我下一步要研究的東西。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM