1、什么是I/O多路復用
關於什么是I/O多路復用,在知乎上有個很好的回答,可以參考羅志宇前輩的回答。
這里記錄一下自己的理解。我認為要理解這個術語得從兩方面去出發,一是:多路是個什么概念?二是:復用的什么東西?先說第一個問題。多路指的是多條獨立的i/o流,i/o流可以這么理解:讀是一條流(稱之為讀流,比如輸入流),寫是一條流(稱之為寫流,比如輸出流),異常也是一條流(稱之為異常流),每條流用一個文件描述符來表示,同一個文件描述符可以同時表示讀流和寫流。再來看第二個方面,復用的是什么東西?復用的是線程,復用線程來跟蹤每路io的狀態,然后用一個線程就可以處理所有的io。
當然,不提什么I/O多路復用也能在一個線程就處理完所有的io流,用個while循環挨個處理一次不就解決了嘛?那為什么還要提出這個技術呢?原因就是剛才我們想的方法(輪詢)效率太低了,資源利用率也不高。試想一下,如果某個io被設置成了阻塞io,那么其他的io將被卡死,也就浪費掉了其他的io資源。另一方面,假設所有io被設置成非阻塞,那cpu一天到晚也不用干別的事了,就在這不停的問,現在可以進行io操作了嗎,直到有一個設備准備好環境才能進行io,也就是在設備准備io環境的這一段時間,cpu是沒必要瞎問的,問了也沒結果。
隨后硬件發展起來了,有了多核的概念,也就有了多線程。這個時候可以這樣做,來一條io我開一個線程,這樣的話再也不用輪詢了。然而,管理線程是要耗費系統資源的,程序員也開始頭疼了,線程之間的交互是十分麻煩的。這樣一來程序的復雜性蹭蹭蹭地往上漲,io效率是可能提高了,但是軟件的開發效率卻可能減低了。
所以也就有了I/O多路復用這一技術。簡單來說,就是一個線程追蹤多條io流(讀,寫,異常),但不使用輪詢,而是由設備本身告知程序哪條流可用了,這樣一來就解放了cpu,也充分利用io資源,下文主要講解如何實現這一技術,linux下這一技術有三個實現,select,poll,epoll。今天主要記錄自己對select的理解,從接口到原理再到實現。
2、select接口
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
- readfds,讀流集合,也就是程序員希望從這些描述符中讀內容
- writefds,寫流集合,也就是程序員希望向這些描述符中寫內容
- exceptfds,異常流集合,也就是中間過程發送了異常
- nfds,上面三種事件中,最大的文件描述符+1
- timeout,程序員的容忍度,可等待的時間
struct timeval{
long tv_sec;//second
long tv_usec;//minisecond
}
timeout有三種取值:
- NULL,select一直阻塞,知道readfds、writefds、exceptfds集合中至少一個文件描述符可用才喚醒
- 0,select不阻塞
- timeout_value,select在timeout_value這個時間段內阻塞
如果非得與“多路”這個詞關聯起來,那就是readfds+writefds+exceptfds的數量和就是路數。
另外,還有一組與fd_set 有關的操作
- FD_SET(fd, _fdset),把fd加入_fdset集合中
- FD_CLR(fd, _fdset),把fd從_fdset集合中清除
- FD_ISSET(fd, _fdset),判定fd是否在_fdset集合中
- FD_ZERO(_fdset),清除_fdset有描述符
3、select實現原理
select的實現依賴於設備的驅動函數poll,poll的功能是檢查設備的哪條條流可用(一個設備一般有三條流,讀流,寫流,設備發生異常的異常流),如果其中一條流可用,返回一個mask(表示可用的流),如果不可用,把當前進程加入設備的流等待隊列中,例如讀等待隊列、寫等待隊列,並返回資源不可用。
select正是利用了poll的這個功能,首先讓程序員告知自己關心哪些io流(用文件描述符表示,也就是上文的readfds、writefds和exceptfds),並讓程序員告知自己這些流的范圍(也就是上文的nfds參數)以及程序的容忍度(timeout參數),然后select會把她們拷貝到內核,在內核中逐個調用流所對應的設備的驅動poll函數,當范圍內的所有流也就是描述符都遍歷完之后,他會檢查是否至少有一個流發生了,如果有,就修改那三個流集合,把她們清空,然后把發生的流加入到相應的集合中,並且select返回。如果沒有,就睡眠,讓出cpu,直到某個設備的某條流可用,就去喚醒阻塞在流上的進程,這個時候,調用select的進程重新開始遍歷范圍內的所有描述符。
直接看這個步驟可能會好理解些
- 1、拷貝nfds、readfds、writefds和exceptfds到內核
- 2、遍歷[0,nfds)范圍內的每個流,調用流所對應的設備的驅動poll函數
- 3、檢查是否有流發生,如果有發生,把流設置對應的類別,並執行4,如果沒有流發生,執行5。或者timeout=0,執行4
- 4、select返回
- 5、select阻塞當前進程,等待被流對應的設備喚醒,當被喚醒時,執行2。或者timeout到期,執行4
然后補充一副select在內核中的流程圖

4、select實現
select的核心實現是do_select,所以下面看一下do_select的源碼,非完整源碼,只保留了關鍵部分
int do_select(int n, fd_set_bits *fds, s64 *timeout) { retval = 0; //retval用於保存已經准備好的描述符數,初始為0 for (;;) { unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp; long __timeout; set_current_state(TASK_INTERRUPTIBLE); //將當前進程狀態改為TASK_INTERRUPTIBLE,可中斷 inp = fds->in; outp = fds->out; exp = fds->ex; rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex; for (i = 0; i < n; ++rinp, ++routp, ++rexp) { //遍歷每個描述符 unsigned long in, out, ex, all_bits, bit = 1, mask, j; unsigned long res_in = 0, res_out = 0, res_ex = 0; const struct file_operations *f_op = NULL; struct file *file = NULL; in = *inp++; out = *outp++; ex = *exp++; all_bits = in | out | ex; if (all_bits == 0) { i += __NFDBITS; //all_bits的類型是unsigned long int ,大小為4個字節32位,all_bits=0,說明連續32個描述符(流)不在readdfs、writedfs、execptdfs集合中,所以i+=32,而__NFDBITS=32。 continue; } for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) { //遍歷每個長字里的每個位 int fput_needed; if (i >= n) break; if (!(bit & all_bits)) continue; file = fget_light(i, &fput_needed); if (file) { f_op = file->f_op; MARK(fs_select, "%d %lld", i, (long long)*timeout); mask = DEFAULT_POLLMASK; if (f_op && f_op->poll) mask = (*f_op->poll)(file, retval ? NULL : wait);//調用設備的驅動poll函數 fput_light(file, fput_needed); if ((mask & POLLIN_SET) && (in & bit)) { res_in |= bit; //如果是這個描述符可讀, 將這個位置位 retval++; //返回描述符個數加1 } if ((mask & POLLOUT_SET) && (out & bit)) { res_out |= bit; retval++; } if ((mask & POLLEX_SET) && (ex & bit)) { res_ex |= bit; retval++; } } } if (res_in) *rinp = res_in; if (res_out) *routp = res_out; if (res_ex) *rexp = res_ex; } wait = NULL; if (retval || !*timeout || signal_pending(current))//如果retval!=0,也就是有readdfs、writedfs、execptdfs至少有一個發生,跳出循環 break; /*以下處理timeout參數*/ __timeout = schedule_timeout(__timeout); if (*timeout >= 0) *timeout += __timeout; } __set_current_state(TASK_RUNNING); return retval; }
參考資料:
