I/O多路復用之select


1、什么是I/O多路復用

       關於什么是I/O多路復用,在知乎上有個很好的回答,可以參考羅志宇前輩的回答

  這里記錄一下自己的理解。我認為要理解這個術語得從兩方面去出發,一是:多路是個什么概念?二是:復用的什么東西?先說第一個問題。多路指的是多條獨立的i/o流,i/o流可以這么理解:讀是一條流(稱之為讀流,比如輸入流),寫是一條流(稱之為寫流,比如輸出流),異常也是一條流(稱之為異常流),每條流用一個文件描述符來表示,同一個文件描述符可以同時表示讀流和寫流。再來看第二個方面,復用的是什么東西?復用的是線程,復用線程來跟蹤每路io的狀態,然后用一個線程就可以處理所有的io。

       當然,不提什么I/O多路復用也能在一個線程就處理完所有的io流,用個while循環挨個處理一次不就解決了嘛?那為什么還要提出這個技術呢?原因就是剛才我們想的方法(輪詢)效率太低了,資源利用率也不高。試想一下,如果某個io被設置成了阻塞io,那么其他的io將被卡死,也就浪費掉了其他的io資源。另一方面,假設所有io被設置成非阻塞,那cpu一天到晚也不用干別的事了,就在這不停的問,現在可以進行io操作了嗎,直到有一個設備准備好環境才能進行io,也就是在設備准備io環境的這一段時間,cpu是沒必要瞎問的,問了也沒結果。

       隨后硬件發展起來了,有了多核的概念,也就有了多線程。這個時候可以這樣做,來一條io我開一個線程,這樣的話再也不用輪詢了。然而,管理線程是要耗費系統資源的,程序員也開始頭疼了,線程之間的交互是十分麻煩的。這樣一來程序的復雜性蹭蹭蹭地往上漲,io效率是可能提高了,但是軟件的開發效率卻可能減低了。

       所以也就有了I/O多路復用這一技術。簡單來說,就是一個線程追蹤多條io流(讀,寫,異常),但不使用輪詢,而是由設備本身告知程序哪條流可用了,這樣一來就解放了cpu,也充分利用io資源,下文主要講解如何實現這一技術,linux下這一技術有三個實現,select,poll,epoll。今天主要記錄自己對select的理解,從接口到原理再到實現。

2、select接口

#include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

  • readfds,讀流集合,也就是程序員希望從這些描述符中讀內容
  • writefds,寫流集合,也就是程序員希望向這些描述符中寫內容
  • exceptfds,異常流集合,也就是中間過程發送了異常
  • nfds,上面三種事件中,最大的文件描述符+1
  • timeout,程序員的容忍度,可等待的時間

struct timeval{

  long tv_sec;//second

  long tv_usec;//minisecond

}

 timeout有三種取值:

  • NULL,select一直阻塞,知道readfds、writefds、exceptfds集合中至少一個文件描述符可用才喚醒
  • 0,select不阻塞
  • timeout_value,select在timeout_value這個時間段內阻塞

如果非得與“多路”這個詞關聯起來,那就是readfds+writefds+exceptfds的數量和就是路數。

另外,還有一組與fd_set 有關的操作

  • FD_SET(fd, _fdset),把fd加入_fdset集合中
  • FD_CLR(fd, _fdset),把fd從_fdset集合中清除
  • FD_ISSET(fd, _fdset),判定fd是否在_fdset集合中
  • FD_ZERO(_fdset),清除_fdset有描述符

3、select實現原理

  select的實現依賴於設備的驅動函數poll,poll的功能是檢查設備的哪條條流可用(一個設備一般有三條流,讀流,寫流,設備發生異常的異常流),如果其中一條流可用,返回一個mask(表示可用的流),如果不可用,把當前進程加入設備的流等待隊列中,例如讀等待隊列、寫等待隊列,並返回資源不可用。

  select正是利用了poll的這個功能,首先讓程序員告知自己關心哪些io流(用文件描述符表示,也就是上文的readfds、writefds和exceptfds),並讓程序員告知自己這些流的范圍(也就是上文的nfds參數)以及程序的容忍度(timeout參數),然后select會把她們拷貝到內核,在內核中逐個調用流所對應的設備的驅動poll函數,當范圍內的所有流也就是描述符都遍歷完之后,他會檢查是否至少有一個流發生了,如果有,就修改那三個流集合,把她們清空,然后把發生的流加入到相應的集合中,並且select返回。如果沒有,就睡眠,讓出cpu,直到某個設備的某條流可用,就去喚醒阻塞在流上的進程,這個時候,調用select的進程重新開始遍歷范圍內的所有描述符。

  直接看這個步驟可能會好理解些

  • 1、拷貝nfds、readfds、writefds和exceptfds到內核
  • 2、遍歷[0,nfds)范圍內的每個流,調用流所對應的設備的驅動poll函數
  • 3、檢查是否有流發生,如果有發生,把流設置對應的類別,並執行4,如果沒有流發生,執行5。或者timeout=0,執行4
  • 4、select返回
  • 5、select阻塞當前進程,等待被流對應的設備喚醒,當被喚醒時,執行2。或者timeout到期,執行4

  然后補充一副select在內核中的流程圖

      

4、select實現

select的核心實現是do_select,所以下面看一下do_select的源碼,非完整源碼,只保留了關鍵部分

int do_select(int n, fd_set_bits *fds, s64 *timeout)
{
         retval = 0;        //retval用於保存已經准備好的描述符數,初始為0
         for (;;) {
                   unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
                   long __timeout;
                   set_current_state(TASK_INTERRUPTIBLE);    //將當前進程狀態改為TASK_INTERRUPTIBLE,可中斷
                   inp = fds->in; outp = fds->out; exp = fds->ex;
                   rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
 
                   for (i = 0; i < n; ++rinp, ++routp, ++rexp) { //遍歷每個描述符
                            unsigned long in, out, ex, all_bits, bit = 1, mask, j;
                            unsigned long res_in = 0, res_out = 0, res_ex = 0;
                            const struct file_operations *f_op = NULL;
                            struct file *file = NULL;
 
                            in = *inp++; out = *outp++; ex = *exp++;
                            all_bits = in | out | ex;
                            if (all_bits == 0) {
                                     i += __NFDBITS;       //all_bits的類型是unsigned long int ,大小為4個字節32位,all_bits=0,說明連續32個描述符(流)不在readdfs、writedfs、execptdfs集合中,所以i+=32,而__NFDBITS=32。
                                     continue;
                            }
 
                            for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {     //遍歷每個長字里的每個位
                                     int fput_needed;
                                     if (i >= n)
                                               break;
                                     if (!(bit & all_bits))
                                               continue;
                                     file = fget_light(i, &fput_needed);
                                     if (file) {
                                               f_op = file->f_op;
                                               MARK(fs_select, "%d %lld", i, (long long)*timeout);
                                               mask = DEFAULT_POLLMASK;
                                               if (f_op && f_op->poll)
                                                        mask = (*f_op->poll)(file, retval ? NULL : wait);//調用設備的驅動poll函數
                                               fput_light(file, fput_needed);
                                               if ((mask & POLLIN_SET) && (in & bit)) {
                                                        res_in |= bit; //如果是這個描述符可讀, 將這個位置位
                                                        retval++;  //返回描述符個數加1
                                               }
                                               if ((mask & POLLOUT_SET) && (out & bit)) {
                                                        res_out |= bit;
                                                        retval++;
                                               }
                                               if ((mask & POLLEX_SET) && (ex & bit)) {
                                                        res_ex |= bit;
                                                        retval++;
                                               }
                                     }
                            }
                            if (res_in)
                                     *rinp = res_in;
                            if (res_out)
                                     *routp = res_out;
                            if (res_ex)
                                     *rexp = res_ex;
                   }
                   wait = NULL;
                   if (retval || !*timeout || signal_pending(current))//如果retval!=0,也就是有readdfs、writedfs、execptdfs至少有一個發生,跳出循環
                            break;
                   /*以下處理timeout參數*/
            __timeout = schedule_timeout(__timeout);
                   if (*timeout >= 0)
                            *timeout += __timeout;
         }
         __set_current_state(TASK_RUNNING);
     return retval;
}

 

參考資料:

Select函數實現原理分析 
等待隊列(二)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM