【深入淺出Linux網絡編程】 “基礎 -- 事件觸發機制”


回顧一下“"開篇 -- 知其然,知其所以然"”中的兩段代碼,第一段雖然只使用1個線程但卻也只能處理一個socket,第二段雖然能處理成百上千個socket但卻需要創建同等數量的線程,分開來看都不完美,如果1個線程能夠處理成百上千個socket就太好了!

 

問題在於,當前的實現中1個線程只能阻塞的recv等待網絡數據的到來,recv在數據到來之前會掛起並讓出cpu直到數據到來后才能繼續執行,在此之前cpu是空閑的,並且你也無法獲得cpu使用權。

 

如果可以趁着這個socket數據沒到來之前先處理其他socket而不是苦苦等待一個socket,那一個線程是不是就可以處理多個socket的呢?答案是肯定的。

 

通過設置socket為非阻塞模式(O_NONBLOCK),我們在調用recv的時候就不會因為沒有數據而掛起了,recv會立即返回並在沒有數據的情況下設置errno=EWOULDBLOCK,通過檢查返回值和errno,我們便可以獲知recv發生了什么。

 

【初窺門徑】在這個前提下,我們如何在1個線程中同時管理多個socket呢?沒那么復雜,我們只需要寫一個while(1)死循環,不停的遍歷所有的socket,對每個socket調用非阻塞的recv嘗試讀取一段數據進行處理,並通過send返回應答即可,大致代碼如下:

 

int main()
{
    ...
    fcntl(listen_fd, O_NONBLOCK...); /* 設置非阻塞 */
    listen(listen_fd); /* 監聽套接字*/

    int fd_array[10000] = {0}; /*fd下標的數組*/
    while (1) {
sleep(1); // 睡眠1秒, 避免cpu負載過高 new_fd = accept(listen_fd); /* 嘗試accept一個新socket*/ if (new_fd >= 0) { ... ... fcntl(new_fd, O_NONBLOCK...); fd_array[new_fd]= new_fd; ... } foreach(fd in fd_array) { if (fd == 0) continue; int n = recv(fd, request); /* 嘗試從socket recv一段數據*/ if (n > 0) { ... /* 處理request */ send(fd, response); } if (fd has error) { close(fd); fd_array[fd] = 0; } } } }

 

首先啟動了監聽套接字,並設置了非阻塞,然后進入while(1)死循環。在每次循環頭部首先調用accept嘗試獲取一個socket,由於非阻塞的原因,如果沒有新連接會立即返回-1,否則設置新socket為非阻塞並放入fd_array數組中記錄。接着,由於你不知道哪些socket有數據,於是只能遍歷所有曾經accept獲得到socket,調用非阻塞recv嘗試讀取數據,如果的確讀到了數據則處理並send返回應答,如果socket發生了錯誤則關閉socket。

 

這段代碼成功的實現了1個線程處理多個socket的目標,是完全可行的,但並不完美。你可以回顧一下代碼,其中的while(1)死循環將導致這個線程毫不停歇的對socket一遍又一遍的輪詢,無論socket是否真的有數據到來,這樣簡單粗暴的實現會讓程序總是100%cpu滿負載運轉,造成不必要的資源浪費(假設機器只有1顆cpu,還有一堆進程等待cpu調度,勢必會對其他進程造成極大的影響)。

 

我們還是思考怎么解決這個現狀,切忌天馬行空。既然while(1)忙輪詢造成cpu負載高,那是否可以在while(1)頭部sleep一會呢,當然可以通過sleep讓出cpu給其他進程使用,但如果sleep太久導致socket數據不被及時處理也會是一個大問題,所以還必須保證sleep掛起的時間足夠短,索性就sleep 1毫秒,問題差不多就解決了。

 

講到這里,總算拋足了磚頭該看看玉了。回顧一下我們一步一步改進的過程,總算到了這個節骨眼上,貌似基於手頭上的工具很難再有所改進了。其實,linux內核開發者也注意到了這一點,為了解決這個切實的問題在內核中實現了一系列的api,目的就是避免我們忙輪詢所有socket,轉而由內核主動通知哪些socket有數據可讀,我們在編碼時就不必為遍歷socket和sleep多少秒糾結了,新的api會sleep直到某些socket有數據可讀才返回,並且直接告訴我們具體是哪些socket可讀從而避免了遍歷所有socket。

 

為了避免誤導,這里提示一下:上述只提到了非阻塞模式下的recv操作,沒有提到send。實際上,阻塞模式下的socket調用send同樣會阻塞,這是由於TCP協議棧滑動窗口已滿造成的,可以簡單理解為數據擁塞的情況下導致send同樣阻塞。在非阻塞模式下,調用socket的send會因為數據擁塞而返回失敗,errno同樣為EWOULDBLOCK,數據沒發送出去只能不停的重試去send,和輪詢recv的道理是類似的。為了避免引入太多閱讀理解負擔,所以在這里理解到這個程度已經完全足夠了。

 


 

了解了背景,接下來直奔主題,看看新的api怎么用,怎么結合到之前的代碼中。這里有個背景需要介紹一下,linux內核在實現這個功能的時候也並不是一步就做到了今天的樣子,它至少經歷了select,poll 兩個版本的API實現后,才有了今天廣泛使用的API:kqueue(freebsd), epoll(linux)。由於我們主要接觸的都是linux系統,並且兩者從原理上大同小異,所以對freebsd上的kqueue不做介紹,而對於select和poll兩個版本的實現由於已經基本沒有實用價值,所以暫時不做介紹,有興趣可以在看完epoll之后搜索引擎簡單了解一下。

 

【春暖花開】我們馬上看一段epoll的使用片段(通過man epoll你可以在manpage里看到epoll這段代碼),並與我們上面的非阻塞版本代碼進行比較,看清兩份代碼實現之間的差異,然后逐個介紹其中涉及的API:

       struct epoll_event ev, *events;

       for(;;) { // 相當於我們的while(1)
           nfds = epoll_wait(kdpfd, events, maxevents, -1); // 相當於我們的sleep(1)

           for(n = 0; n < nfds; ++n) { // 相當於我們的for遍歷所有socket
               if(events[n].data.fd == listener) {   // 相當於我們嘗試accept新連接
                   client = accept(listener, (struct sockaddr *) &local,
                                   &addrlen);
                   if(client < 0){
                       perror("accept");
                       continue;
                   }
                   setnonblocking(client);
                   ev.events = EPOLLIN | EPOLLET;
                   ev.data.fd = client;
                   if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) {
                       fprintf(stderr, "epoll set insertion error: fd=%d0,
                               client);
                       return -1;
                   }
               }
               else
                   do_use_fd(events[n].data.fd);  // 相當於我們recv,處理,send一個socket
           }
       }

 

可以看一下代碼中的注釋, 比對我們實現的非阻塞忙輪詢版本代碼, 會發現代碼邏輯基本能夠一一對應,一方面要accept新的socket,一方面要處理已有socket的讀與寫。為了學習epoll,我們需要關注差異在哪里,以及差異帶來了什么好處,解決了什么問題。

 

首先笨拙的sleep被換成了epoll_wait,它的第1參數kdpfd是epoll的句柄(epoll_create創建),這個句柄中此前被注冊了希望被epoll管理的socket(epoll_ctl注冊)。當epoll_wait被調用后,會檢查注冊其上的socket是否有數據到來或者是否有剩余空間發送數據,如果都沒有則會掛起,就像sleep一樣睡眠,但與sleep的最大區別在於sleep多久是我們拍腦袋指定一個很小的數值,而epoll_wait會在任意socket可讀或者可寫的時候返回,這是由內核檢測注冊其上的socket並在滿足條件時喚醒epoll_wait返回的,這就解決了sleep少則cpu繁忙sleep多則增加socket處理延遲的麻煩問題。

 

epoll_wait的第2,3個參數分別指定了一個struct epoll_event數組events和數組的大小maxevents,這是一個in/out參數,也就是epoll_wait在返回前會對數組內容進行賦值,其中記錄的是發生了可讀或者可寫或者錯誤事件的socket以及具體發生的事件類型。這里的新名詞”可讀事件"表示有數據到來,"可寫事件"表示內核緩沖區有剩余發送空間,“錯誤事件“表示socket發生了一些網絡錯誤。既然epoll_wait在返回時把發生讀寫事件的socket寫到了數組里,我們還需要遍歷所有socket嗎?當然不必了!借助epoll_wait,我們不必在那些沒有任何動靜的socket上做無用的recv和send嘗試,只要是epoll_wait記錄在數組里的socket一定是發生了特定的事件,這又幫我們解決了一個大麻煩。

 

for(n = 0; n < nfds; ++n) 遍歷struct epoll_event數組,對於listener這個監聽socket,調用accept得到新連接,並通過調用epoll_ctl注冊到epoll句柄上以便之后的epoll_wait可以檢測該socket的讀寫事件,對於非監聽socket則調用do_use_fd函數去讀寫與請求處理,這里manpage並沒有給出什么實際的代碼,因為那些與epoll已經沒有必然聯系了。

 

現在你對epoll應該有了一個差不多的認識,但涉及到的結構體和API還沒有詳細的去看參數與返回值,使用上要注意什么也沒有涉及。 在詳細學習API之前,首先記住一點概念,epoll監聽的是fd(文件描述符)的讀,寫,錯誤事件,與socket或者說tcp socket還是udp socket沒有必然聯系,epoll負責的僅僅是”事件觸發“,正合本篇博客標題。

 

1,創建一個epoll句柄:

int epoll_create(int size)

這里的size參數意義為epoll管理的fd個數的一個建議值,簡單說就是預分配多少個fd的管理空間,如果不足會擴容,所以稱為建議值,一般填個1000,10000的都無所謂。

 

2,向epoll句柄注冊,刪除,修改socket:

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
            
typedef union epoll_data {
                 void *ptr;
                 int fd;
                 __uint32_t u32;
                 __uint64_t u64;
} epoll_data_t;

struct epoll_event {
                 __uint32_t events;  /* Epoll events */
                 epoll_data_t data;  /* User data variable */
};

EPOLL_CTL_ADD // 注冊fd
EPOLL_CTL_MOD // 修改fd
EPOLL_CTL_DEL // 刪除fd

這個函數有3種功能,一個是注冊(EPOLL_CTL_ADD)fd到epoll,一個是從epoll刪除fd(EPOLL_CTL_DEL),一個是向epoll修改一個已注冊的fd(EPOLL_CTL_MOD)。

第1個參數epfd是epoll句柄,第二個參數op是指上述3個操作類型之一,第三個參數是一個結構體,epoll_event的第一個成員events表示希望epoll監測fd的什么事件,常用包含:EPOLLIN(可讀),EPOLLOUT(可寫),EPOLLERR(錯誤),EPOLLHUP(也是錯誤),你可以通過位或的方式同時包含多個事件。data是一個union,你可以使用其中的一個字段記錄一些信息,也就是一個用戶參數,在epoll_wait返回的epoll_event數組中會返回給調用者使用。

 

3,檢測fd事件並返回相關信息:

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)

 epfd是epoll句柄(epoll_create),events是用戶分配的數組,maxevents是數組的大小,timeout表示多少毫秒沒有任何socket發生事件則超時返回的時間,-1表示不超時,函數返回在events數組里填充了多少個fd,於是我們就可以訪問events數組里特定數量的fd進行處理了。

 

4,關閉epoll句柄:

int close(int fd);

也就是關閉epoll_create的返回值,注冊其上的fd不會被關閉,僅僅是從這個句柄上取消了注冊。

 

我們使用epoll完成事件觸發所需要做的所有操作都是依靠上述4個接口而已,在上面的代碼示例里也對其使用方法和時機有大概的了解了。參考manpage,你應該有能力實現一個用epoll監聽fd=0(終端標准輸入)的程序,並將讀到的文本回顯到終端上的小demo,如果你感興趣可以在這里停下自己去探索一下再回來。

 

再次回顧一下上述manpage里的示例代碼,在for循環遍歷epoll_wait返回的fd數組的時候有一處if else的判定,對於fd=listener則調用了accept相關的邏輯,對於其他的則調用了do_use_fd的邏輯,也就是用戶使用epoll的時候必須對epoll_wait返回的fd屬於什么應用邏輯進行區分對待,從代碼來看會令代碼比較冗長,缺乏共性的提取和問題的抽象,用起來並不方便。代碼里為了像epoll注冊一個fd,需要對struct epoll_event結構各字段賦值,然后調用epoll_ctl,顯得過於繁瑣。

 

為了方便自己使用,我們考慮對epoll進行一定程度的封裝和抽象,對接口進行簡化,對過程進行抽象,對細節進行隱藏,讓epoll用起來輕松愉快~ 

 

在開始前,首先使用git把代碼拉取下來,我的代碼上傳在code.csdn.net,你可以通過如下命令獲取代碼:

git clone git://code.csdn.net/qq120848369/simple_io.git

這是我在這系列博客前不久開發的一個小項目,名字叫做simple_io,顧名思義就是代碼實現簡單,使用簡單,並且閱讀簡單。

在“基礎 -- 事件觸發機制”章節里,我們只研究epoll自身,學習如何使用它的各個API,以及真實項目實踐中是如何抽象與封裝epoll的,即只需要關注sio.h和sio.c兩個文件即可,它們對epoll進行了封裝與抽象,通過閱讀sio.c你可以完全掌握epoll用法與機制,我只會講解sio實現了什么(接口),為什么實現,但至於如何實現則需要讀者自己對照閱讀代碼,代碼並不長,只有200行。

 

1,創建與釋放epoll句柄:

struct sio *sio_new();
void sio_free(struct sio *sio);

2,向epoll注冊一個fd,並提供一個事件回調函數以及用戶參數,返回一個fd句柄:

struct sio_fd *sio_add(struct sio *sio, int fd, sio_callback_t callback, void *arg);

3,修改一個fd句柄的事件回調函數與用戶參數:

void sio_set(struct sio *sio, struct sio_fd *sfd, sio_callback_t callback, void *arg);

4,向epoll取消一個fd的注冊:

void sio_del(struct sio *sio, struct sio_fd *sfd);

5,向epoll注冊fd的EPOLLOUT事件:

void sio_watch_write(struct sio *sio, struct sio_fd *sfd);

6,向epoll取消注冊fd的EPOLLOUT事件:

void sio_unwatch_write(struct sio *sio, struct sio_fd *sfd);

7,向epoll注冊fd的EPOLLIN事件:

void sio_watch_read(struct sio *sio, struct sio_fd *sfd);

8,向epoll取消注冊fd的EPOLLIN事件:

void sio_unwatch_read(struct sio *sio, struct sio_fd *sfd);

9,調用epoll_wait並處理fd事件,通過回調函數通知用戶:

void sio_run(struct sio *sio, int timeout_ms);

10,立即喚醒epoll_wait,令其返回(暫時不需要理解這個接口):

void sio_wakeup(struct sio *sio);

11,啟動定時器,提供超時回調和用戶參數:

void sio_start_timer(struct sio *sio, struct sio_timer *timer, uint64_t timeout_ms, sio_timer_callback_t callback, void *arg);

12,停止定時器:

void sio_start_timer(struct sio *sio, struct sio_timer *timer, uint64_t timeout_ms, sio_timer_callback_t callback, void *arg);

 

為了學習epoll自身,閱讀時暫時忽略定時器的2個接口,忽略喚醒wakeup接口,剩余接口均是對epoll的4個API的抽象與封裝,逐個函數閱讀理解,有任何疑惑可以留言(我會受到郵件通知),我會第一時間回復。

 

為了便於理解,這里需要為sio.c設計上的東西做一些基本的解釋,先去讀代碼,如果你閱讀的過程中遇到了障礙,再回來查看,切忌不要直接讀下面的內容。

 

1,struct sio里的is_in_loop和deferred_to_close是什么用途?

答:這是為了sio_del接口設計的,當你在sio_run函數之外調用sio_del時,epoll_ctl可以通過EPOLL_CTL_DEL立即從epoll句柄上移除fd,下一次sio_run(epoll_wait調用)就不會檢測到這個fd的任何事件了。 首先注意我是將struct sio_fd注冊到了struct epoll_event的data字段,考慮在epoll_wait返回之后,sio_run結束之前對各個fd處理的過程中調用sio_del是否會有特殊問題?

 

這里的問題就是,假如你在fd=1的回調函數中sio_del了fd=2,並且本次epoll_wait也檢測到了fd=2的事件並已經填充到了struct epoll_event  poll_events[64]中,那么接下來處理fd=2的時候就會非法內存操作,因為在fd=1中已經sio_del釋放了fd=2的struct sio_fd內存。

 

這是一個非常常見的網絡庫bug,有的網絡庫為了避免這種問題選擇將fd注冊到struct epoll_event的data字段,並創建一個fd索引數組fd_array,通過設置fd_array[fd]=NULL標示已關閉從而避免非法操作,但仍然是有bug的,因為極有可能在sio_del后又創建了新fd並且fd=2然后sio_add注冊到epoll並令fd_array[fd]!=NULL,這將導致接下來處理fd=2的事件時發生”串門“,也就是說現在的fd=2早已不是epoll_wait時的fd=2,又一個悲劇的bug。

 

2,sio_new里忽略SIGPIPE信號是為什么?

答:摘自man 3p write:EPIPE A write was attempted on a socket that is shut down for writing, or is no longer connected. In the latter case, if the socket is of type,SOCK_STREAM, the SIGPIPE signal is generated to the calling process.

也就是說,sio.c極有可能被用於開發socket程序,那么就有可能會觸發SIGPIPE信號,而該信號默認通常是結束程序,所以需要為用戶干掉這個信號。 那么這個信號到底發生於什么情況下呢?首先是TCP連接,其次是當本端收到RST信號的時候,當對端向本端發送了FIN握手之后,如果本端繼續向對端發送數據,對端就可能返回RST包,造成SIGPIPE信號的觸發。

 

3,為什么不檢測malloc和realloc的返回值,而要檢測epoll_ctl(EPOLL_CTL_ADD)的返回值,返回值檢查的原則是什么?

答:純屬個人習慣,如果內存都不足了,程序崩潰也沒什么不可接受的,況且有swap分區的情況下,malloc失敗的可能性幾乎為0,所以我總是假設內存分配不會失敗,這樣就不必寫大量的返回值檢測了,而且真的沒必要檢測。  而對於向epoll ADD一個fd的調用,是真真切切會失敗的,因為一個epoll能夠容納的fd個數是可以通過系統參數配置的,所以我不會假設它成功。

我在返回值檢測方面的原則:來自不可信用戶的外部輸入需要嚴格檢查,比如網絡請求。其他輸入不做檢查,比如sio.c里的接口絕對不會校驗你傳入的參數是不是NULL,因為使用者應該有能力明確的保證使用是正確的,這個職責不在於sio自身。 而對於系統API來說,我只檢查的確會失敗的,不會檢查永遠不會失敗的,這一點還是需要靠manpage和經驗。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM