事件驅動模型
上節的問題:
協程:遇到IO操作就切換。
但什么時候切回去呢?怎么確定IO操作完了?

很多程序員可能會考慮使用“線程池”或“連接池”。“線程池”旨在減少創建和銷毀線程的頻率,其維持一定合理數量的線程,並讓空閑的線程重新承擔新的執行任務。“連接池”維持連接的緩存池,盡量重用已有的連接、減少創建和關閉連接的頻率。
這兩種技術都可以很好的降低系統開銷,都被廣泛應用很多大型系統,如websphere、tomcat和各種數據庫等。但是,“線程池”和“連接池”技術也只是在一定程度上緩解了頻繁調用IO接口帶來的資源占用。而且,所謂“池”始終有其上限,當請求大大超過上限時,“池”構成的系統對外界的響應並不比沒有池的時候效果好多少。所以使用“池”必須考慮其面臨的響應規模,並根據響應規模調整“池”的大小。
對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,“線程池”或“連接池”或許可以緩解部分壓力,但是不能解決所有問題。總之,多線程模型可以方便高效的解決小規模的服務請求,但面對大規模的服務請求,多線程模型也會遇到瓶頸,可以用非阻塞接口來嘗試解決這個問題
傳統的編程是如下線性模式的:
開始--->代碼塊A--->代碼塊B--->代碼塊C--->代碼塊D--->......--->結束
每一個代碼塊里是完成各種各樣事情的代碼,但編程者知道代碼塊A,B,C,D...的執行順序,唯一能夠改變這個流程的是數據。輸入不同的數據,根據條件語句判斷,流程或許就改為A--->C--->E...--->結束。每一次程序運行順序或許都不同,但它的控制流程是由輸入數據和你編寫的程序決定的。如果你知道這個程序當前的運行狀態(包括輸入數據和程序本身),那你就知道接下來甚至一直到結束它的運行流程。
對於事件驅動型程序模型,它的流程大致如下:
開始--->初始化--->等待
與上面傳統編程模式不同,事件驅動程序在啟動之后,就在那等待,等待什么呢?等待被事件觸發。傳統編程下也有“等待”的時候,比如在代碼塊D中,你定義了一個input(),需要用戶輸入數據。但這與下面的等待不同,傳統編程的“等待”,比如input(),你作為程序編寫者是知道或者強制用戶輸入某個東西的,或許是數字,或許是文件名稱,如果用戶輸入錯誤,你還需要提醒他,並請他重新輸入。事件驅動程序的等待則是完全不知道,也不強制用戶輸入或者干什么。只要某一事件發生,那程序就會做出相應的“反應”。這些事件包括:輸入信息、鼠標、敲擊鍵盤上某個鍵還有系統內部定時器觸發。
一、事件驅動模型介紹
通常,我們寫服務器處理模型的程序時,有以下幾種模型:
(1)每收到一個請求,創建一個新的進程,來處理該請求; (2)每收到一個請求,創建一個新的線程,來處理該請求; (3)每收到一個請求,放入一個事件列表,讓主進程通過非阻塞I/O方式來處理請求
第三種就是協程、事件驅動的方式,一般普遍認為第(3)種方式是大多數網絡服務器采用的方式
論事件驅動模型

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <p onclick="fun()">點我呀</p> <script type="text/javascript"> function fun() { alert('約嗎?') } </script> </body> </html>
在UI編程中,常常要對鼠標點擊進行相應,首先如何獲得鼠標點擊呢? 兩種方式:
1創建一個線程循環檢測是否有鼠標點擊
那么這個方式有以下幾個缺點:
- CPU資源浪費,可能鼠標點擊的頻率非常小,但是掃描線程還是會一直循環檢測,這會造成很多的CPU資源浪費;如果掃描鼠標點擊的接口是阻塞的呢?
- 如果是堵塞的,又會出現下面這樣的問題,如果我們不但要掃描鼠標點擊,還要掃描鍵盤是否按下,由於掃描鼠標時被堵塞了,那么可能永遠不會去掃描鍵盤;
- 如果一個循環需要掃描的設備非常多,這又會引來響應時間的問題;
所以,該方式是非常不好的。
2 就是事件驅動模型
目前大部分的UI編程都是事件驅動模型,如很多UI平台都會提供onClick()事件,這個事件就代表鼠標按下事件。事件驅動模型大體思路如下:
- 有一個事件(消息)隊列;
- 鼠標按下時,往這個隊列中增加一個點擊事件(消息);
- 有個循環,不斷從隊列取出事件,根據不同的事件,調用不同的函數,如onClick()、onKeyDown()等;
- 事件(消息)一般都各自保存各自的處理函數指針,這樣,每個消息都有獨立的處理函數;
事件驅動編程是一種編程范式,這里程序的執行流由外部事件來決定。它的特點是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程范式是(單線程)同步以及多線程編程。
讓我們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展示了隨着時間的推移,這三種模式下程序所做的工作。這個程序有3個任務需要完成,每個任務都在等待I/O操作時阻塞自身。阻塞在I/O操作上所花費的時間已經用灰色框標示出來了。
最初的問題:怎么確定IO操作完了切回去呢?通過回調函數

1.要理解事件驅動和程序,就需要與非事件驅動的程序進行比較。實際上,現代的程序大多是事件驅動的,比如多線程的程序,肯定是事件驅動的。早期則存在許多非事件驅動的程序,這樣的程序,在需要等待某個條件觸發時,會不斷地檢查這個條件,直到條件滿足,這是很浪費cpu時間的。而事件驅動的程序,則有機會釋放cpu從而進入睡眠態(注意是有機會,當然程序也可自行決定不釋放cpu),當事件觸發時被操作系統喚醒,這樣就能更加有效地使用cpu. 2.再說什么是事件驅動的程序。一個典型的事件驅動的程序,就是一個死循環,並以一個線程的形式存在,這個死循環包括兩個部分,第一個部分是按照一定的條件接收並選擇一個要處理的事件,第二個部分就是事件的處理過程。程序的執行過程就是選擇事件和處理事件,而當沒有任何事件觸發時,程序會因查詢事件隊列失敗而進入睡眠狀態,從而釋放cpu。 3.事件驅動的程序,必定會直接或者間接擁有一個事件隊列,用於存儲未能及時處理的事件。 4.事件驅動的程序的行為,完全受外部輸入的事件控制,所以,事件驅動的系統中,存在大量這種程序,並以事件作為主要的通信方式。 5.事件驅動的程序,還有一個最大的好處,就是可以按照一定的順序處理隊列中的事件,而這個順序則是由事件的觸發順序決定的,這一特性往往被用於保證某些過程的原子化。 6.目前windows,linux,nucleus,vxworks都是事件驅動的,只有一些單片機可能是非事件驅動的。
注意,事件驅動的監聽事件是由操作系統調用的cpu來完成的
IO多路復用
前面是用協程實現的IO阻塞自動切換,那么協程又是怎么實現的,在原理是是怎么實現的。如何去實現事件驅動的情況下IO的自動阻塞的切換,這個學名叫什么呢? => IO多路復用
比如socketserver,多個客戶端連接,單線程下實現並發效果,就叫多路復用。
同步IO和異步IO,阻塞IO和非阻塞IO分別是什么,到底有什么區別?不同的人在不同的上下文下給出的答案是不同的。所以先限定一下本文的上下文。
本文討論的背景是Linux環境下的network IO。
1 IO模型前戲准備
在進行解釋之前,首先要說明幾個概念:
- 用戶空間和內核空間
- 進程切換
- 進程的阻塞
- 文件描述符
- 緩存 I/O
用戶空間與內核空間
現在操作系統都是采用虛擬存儲器,那么對32位操作系統而言,它的尋址空間(虛擬存儲空間)為4G(2的32次方)。
操作系統的核心是內核,獨立於普通的應用程序,可以訪問受保護的內存空間,也有訪問底層硬件設備的所有權限。
為了保證用戶進程不能直接操作內核(kernel),保證內核的安全,操心系統將虛擬空間划分為兩部分,一部分為內核空間,一部分為用戶空間。
針對linux操作系統而言,將最高的1G字節(從虛擬地址0xC0000000到0xFFFFFFFF),供內核使用,稱為內核空間,而將較低的3G字節(從虛擬地址0x00000000到0xBFFFFFFF),供各個進程使用,稱為用戶空間。
進程切換
為了控制進程的執行,內核必須有能力掛起正在CPU上運行的進程,並恢復以前掛起的某個進程的執行。這種行為被稱為進程切換,這種切換是由操作系統來完成的。因此可以說,任何進程都是在操作系統內核的支持下運行的,是與內核緊密相關的。
從一個進程的運行轉到另一個進程上運行,這個過程中經過下面這些變化:
保存處理機上下文,包括程序計數器和其他寄存器。
更新PCB信息。
把進程的PCB移入相應的隊列,如就緒、在某事件阻塞等隊列。
選擇另一個進程執行,並更新其PCB。
更新內存管理的數據結構。
恢復處理機上下文。
注:總而言之就是很耗資源的
進程的阻塞
正在執行的進程,由於期待的某些事件未發生,如請求系統資源失敗、等待某種操作的完成、新數據尚未到達或無新工作做等,則由系統自動執行阻塞原語(Block),使自己由運行狀態變為阻塞狀態。可見,進程的阻塞是進程自身的一種主動行為,也因此只有處於運行態的進程(獲得CPU),才可能將其轉為阻塞狀態。當進程進入阻塞狀態,是不占用CPU資源的。
文件描述符fd
文件描述符(File descriptor)是計算機科學中的一個術語,是一個用於表述指向文件的引用的抽象化概念。
文件描述符在形式上是一個非負整數。實際上,它是一個索引值,指向內核為每一個進程所維護的該進程打開文件的記錄表。當程序打開一個現有文件或者創建一個新文件時,內核向進程返回一個文件描述符。在程序設計中,一些涉及底層的程序編寫往往會圍繞着文件描述符展開。但是文件描述符這一概念往往只適用於UNIX、Linux這樣的操作系統。
緩存 I/O
緩存 I/O 又被稱作標准 I/O,大多數文件系統的默認 I/O 操作都是緩存 I/O。在 Linux 的緩存 I/O 機制中,操作系統會將 I/O 的數據緩存在文件系統的頁緩存( page cache )中,也就是說,數據會先被拷貝到操作系統內核的緩沖區中,然后才會從操作系統內核的緩沖區拷貝到應用程序的地址空間。用戶空間沒法直接訪問內核空間的,內核態到用戶態的數據拷貝
思考:為什么數據一定要先到內核區,直接到用戶內存不是更直接嗎?
緩存 I/O 的缺點:
數據在傳輸過程中需要在應用程序地址空間和內核進行多次數據拷貝操作,這些數據拷貝操作所帶來的 CPU 以及內存開銷是非常大的。
同步(synchronous) IO和異步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分別是什么,到底有什么區別?這個問題其實不同的人給出的答案都可能不同,比如wiki,就認為asynchronous IO和non-blocking IO是一個東西。這其實是因為不同的人的知識背景不同,並且在討論這個問題的時候上下文(context)也不相同。所以,為了更好的回答這個問題,我先限定一下本文的上下文。
本文討論的背景是Linux環境下的network IO。
Stevens在文章中一共比較了五種IO Model:
-
-
- blocking IO
- nonblocking IO
- IO multiplexing
- signal driven IO
- asynchronous IO
-
由於signal driven IO在實際中並不常用,所以我這只提及剩下的四種IO Model。
再說一下IO發生時涉及的對象和步驟。
對於一個network IO (這里我們以read舉例),它會涉及到兩個系統對象,一個是調用這個IO的process (or thread),另一個就是系統內核(kernel)。當一個read操作發生時,它會經歷兩個階段:
1 等待數據准備 (Waiting for the data to be ready)
2 將數據從內核拷貝到進程中 (Copying the data from the kernel to the process)
記住這兩點很重要,因為這些IO Model的區別就是在兩個階段上各有不同的情況。
2 blocking IO (阻塞IO)
在linux中,默認情況下所有的socket都是blocking,一個典型的讀操作流程大概是這樣:
當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:准備數據。對於network io來說,很多時候數據在一開始還沒有到達(比如,還沒有收到一個完整的UDP包),這個時候kernel就要等待足夠的數據到來。而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數據准備好了,它就會將數據從kernel中拷貝到用戶內存,然后kernel返回結果,用戶進程才解除block的狀態,重新運行起來。
所以,blocking IO的特點就是在IO執行的兩個階段都被block了。
3 non-blocking IO(非阻塞IO)
linux下,可以通過設置socket使其變為non-blocking。當對一個non-blocking socket執行讀操作時,流程是這個樣子:
從圖中可以看出,當用戶進程發出read操作時,如果kernel中的數據還沒有准備好,那么它並不會block用戶進程,而是立刻返回一個error。從用戶進程角度講 ,它發起一個read操作后,並不需要等待,而是馬上就得到了一個結果。用戶進程判斷結果是一個error時,它就知道數據還沒有准備好,於是它可以再次發送read操作。一旦kernel中的數據准備好了,並且又再次收到了用戶進程的system call,那么它馬上就將數據拷貝到了用戶內存,然后返回。
所以,用戶進程其實是需要不斷的主動詢問kernel數據好了沒有。
注意:
在網絡IO時候,非阻塞IO也會進行recvform系統調用,檢查數據是否准備好,與阻塞IO不一樣,”非阻塞將大的整片時間的阻塞分成N多的小的阻塞, 所以進程不斷地有機會 ‘被’ CPU光顧”。即每次recvform系統調用之間,cpu的權限還在進程手中,這段時間是可以做其他事情的,
也就是說非阻塞的recvform系統調用調用之后,進程並沒有被阻塞,內核馬上返回給進程,如果數據還沒准備好,此時會返回一個error。進程在返回之后,可以干點別的事情,然后再發起recvform系統調用。重復上面的過程,循環往復的進行recvform系統調用。這個過程通常被稱之為輪詢。輪詢檢查內核數據,直到數據准備好,再拷貝數據到進程,進行數據處理。需要注意,拷貝數據整個過程,進程仍然是屬於阻塞的狀態。
4 IO multiplexing(IO多路復用)
IO multiplexing這個詞可能有點陌生,但是如果我說select,epoll,大概就都能明白了。有些地方也稱這種IO方式為event driven IO。我們都知道,select/epoll的好處就在於單個process就可以同時處理多個網絡連接的IO。它的基本原理就是select/epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有數據到達了,就通知用戶進程。它的流程如圖:
當用戶進程調用了select,那么整個進程會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的數據准備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。
這個圖和blocking IO的圖其實並沒有太大的不同,事實上,還更差一些。因為這里需要使用兩個system call (select 和 recvfrom),而blocking IO只調用了一個system call (recvfrom)。但是,用select的優勢在於它可以同時處理多個connection。(多說一句。所以,如果處理的連接數不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優勢並不是對於單個連接能處理得更快,而是在於能處理更多的連接。)
在IO multiplexing Model中,實際中,對於每一個socket,一般都設置成為non-blocking,但是,如上圖所示,整個用戶的process其實是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。
注意1:select函數返回結果中如果有文件可讀了,那么進程就可以通過調用accept()或recv()來讓kernel將位於內核中准備到的數據copy到用戶區。
注意2: select的優勢在於可以處理多個連接,不適用於單個連接
5 Asynchronous I/O(異步IO)
linux下的asynchronous IO其實用得很少。先看一下它的流程:
用戶進程發起read操作之后,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當它受到一個asynchronous read之后,首先它會立刻返回,所以不會對用戶進程產生任何block。然后,kernel會等待數據准備完成,然后將數據拷貝到用戶內存,當這一切都完成之后,kernel會給用戶進程發送一個signal,告訴它read操作完成了。
到目前為止,已經將四個IO Model都介紹完了。現在回過頭來回答最初的那幾個問題:blocking和non-blocking的區別在哪,synchronous IO和asynchronous IO的區別在哪。
先回答最簡單的這個:blocking vs non-blocking。前面的介紹中其實已經很明確的說明了這兩者的區別。調用blocking IO會一直block住對應的進程直到操作完成,而non-blocking IO在kernel還准備數據的情況下會立刻返回。
在說明synchronous IO和asynchronous IO的區別之前,需要先給出兩者的定義。Stevens給出的定義(其實是POSIX的定義)是這樣子的:
A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
An asynchronous I/O operation does not cause the requesting process to be blocked;
兩者的區別就在於synchronous IO做”IO operation”的時候會將process阻塞。按照這個定義,之前所述的blocking IO,non-blocking IO,IO multiplexing都屬於synchronous IO。有人可能會說,non-blocking IO並沒有被block啊。這里有個非常“狡猾”的地方,定義中所指的”IO operation”是指真實的IO操作,就是例子中的recvfrom這個system call。non-blocking IO在執行recvfrom這個system call的時候,如果kernel的數據沒有准備好,這時候不會block進程。但是,當kernel中數據准備好的時候,recvfrom會將數據從kernel拷貝到用戶內存中,這個時候進程是被block了,在這段時間內,進程是被block的。而asynchronous IO則不一樣,當進程發起IO 操作之后,就直接返回再也不理睬了,直到kernel發送一個信號,告訴進程說IO完成。在這整個過程中,進程完全沒有被block。
注意:由於咱們接下來要講的select,poll,epoll都屬於IO多路復用,而IO多路復用又屬於同步的范疇,故,epoll只是一個偽異步而已。
各個IO Model的比較如圖所示:
經過上面的介紹,會發現non-blocking IO和asynchronous IO的區別還是很明顯的。在non-blocking IO中,雖然進程大部分時間都不會被block,但是它仍然要求進程去主動的check,並且當數據准備完成以后,也需要進程主動的再次調用recvfrom來將數據拷貝到用戶內存。而asynchronous IO則完全不同。它就像是用戶進程將整個IO操作交給了他人(kernel)完成,然后他人做完后發信號通知。在此期間,用戶進程不需要去檢查IO操作的狀態,也不需要主動的去拷貝數據。
五種IO模型比較:
6 select poll epoll IO多路復用介紹
首先列一下,sellect、poll、epoll三者的區別
- select
select最早於1983年出現在4.2BSD中,它通過一個select()系統調用來監視多個文件描述符的數組,當select()返回后,該數組中就緒的文件描述符便會被內核修改標志位,使得進程可以獲得這些文件描述符從而進行后續的讀寫操作。
select目前幾乎在所有的平台上支持
select的一個缺點在於單個進程能夠監視的文件描述符的數量存在最大限制,在Linux上一般為1024,不過可以通過修改宏定義甚至重新編譯內核的方式提升這一限制。
另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其復制的開銷也線性增長。同時,由於網絡響應時間的延遲使得大量TCP連接處於非活躍狀態,但調用select()會對所有socket進行一次線性掃描,所以這也浪費了一定的開銷。
-
poll
它和select在本質上沒有多大差別,但是poll沒有最大文件描述符數量的限制。
一般也不用它,相當於過渡階段 -
epoll
直到Linux2.6才出現了由內核直接支持的實現方法,那就是epoll。被公認為Linux2.6下性能最好的多路I/O就緒通知方法。windows不支持
沒有最大文件描述符數量的限制。
比如100個連接,有兩個活躍了,epoll會告訴用戶這兩個兩個活躍了,直接取就ok了,而select是循環一遍。
(了解)epoll可以同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那么它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些,但是代碼實現相當復雜。
另一個本質的改進在於epoll采用基於事件的就緒通知方式。在select/poll中,進程只有在調用一定的方法后,內核才對所有監視的文件描述符進行掃描,而epoll事先通過epoll_ctl()來注冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會采用類似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便得到通知。
所以市面上上見到的所謂的異步IO,比如nginx、Tornado、等,我們叫它異步IO,實際上是IO多路復用。
select與epoll

# 首先我們來定義流的概念,一個流可以是文件,socket,pipe等等可以進行I/O操作的內核對象。 # 不管是文件,還是套接字,還是管道,我們都可以把他們看作流。 # 之后我們來討論I/O的操作,通過read,我們可以從流中讀入數據;通過write,我們可以往流寫入數據。現在假 # 定一個情形,我們需要從流中讀數據,但是流中還沒有數據,(典型的例子為,客戶端要從socket讀如數據,但是 # 服務器還沒有把數據傳回來),這時候該怎么辦? # 阻塞。阻塞是個什么概念呢?比如某個時候你在等快遞,但是你不知道快遞什么時候過來,而且你沒有別的事可以干 # (或者說接下來的事要等快遞來了才能做);那么你可以去睡覺了,因為你知道快遞把貨送來時一定會給你打個電話 # (假定一定能叫醒你)。 # 非阻塞忙輪詢。接着上面等快遞的例子,如果用忙輪詢的方法,那么你需要知道快遞員的手機號,然后每分鍾給他掛 # 個電話:“你到了沒?” # 很明顯一般人不會用第二種做法,不僅顯很無腦,浪費話費不說,還占用了快遞員大量的時間。 # 大部分程序也不會用第二種做法,因為第一種方法經濟而簡單,經濟是指消耗很少的CPU時間,如果線程睡眠了, # 就掉出了系統的調度隊列,暫時不會去瓜分CPU寶貴的時間片了。 # # 為了了解阻塞是如何進行的,我們來討論緩沖區,以及內核緩沖區,最終把I/O事件解釋清楚。緩沖區的引入是為 # 了減少頻繁I/O操作而引起頻繁的系統調用(你知道它很慢的),當你操作一個流時,更多的是以緩沖區為單位進 # 行操作,這是相對於用戶空間而言。對於內核來說,也需要緩沖區。 # 假設有一個管道,進程A為管道的寫入方,B為管道的讀出方。 # 假設一開始內核緩沖區是空的,B作為讀出方,被阻塞着。然后首先A往管道寫入,這時候內核緩沖區由空的狀態變 # 到非空狀態,內核就會產生一個事件告訴B該醒來了,這個事件姑且稱之為“緩沖區非空”。 # 但是“緩沖區非空”事件通知B后,B卻還沒有讀出數據;且內核許諾了不能把寫入管道中的數據丟掉這個時候,A寫 # 入的數據會滯留在內核緩沖區中,如果內核也緩沖區滿了,B仍未開始讀數據,最終內核緩沖區會被填滿,這個時候 # 會產生一個I/O事件,告訴進程A,你該等等(阻塞)了,我們把這個事件定義為“緩沖區滿”。 # 假設后來B終於開始讀數據了,於是內核的緩沖區空了出來,這時候內核會告訴A,內核緩沖區有空位了,你可以從 # 長眠中醒來了,繼續寫數據了,我們把這個事件叫做“緩沖區非滿” # 也許事件Y1已經通知了A,但是A也沒有數據寫入了,而B繼續讀出數據,知道內核緩沖區空了。這個時候內核就告 # 訴B,你需要阻塞了!,我們把這個時間定為“緩沖區空”。 # 這四個情形涵蓋了四個I/O事件,緩沖區滿,緩沖區空,緩沖區非空,緩沖區非滿(注都是說的內核緩沖區,且這四 # 個術語都是我生造的,僅為解釋其原理而造)。這四個I/O事件是進行阻塞同步的根本。(如果不能理解“同步”是 # 什么概念,請學習操作系統的鎖,信號量,條件變量等任務同步方面的相關知識)。 # # 然后我們來說說阻塞I/O的缺點。但是阻塞I/O模式下,一個線程只能處理一個流的I/O事件。如果想要同時處理多 # 個流,要么多進程(fork),要么多線程(pthread_create),很不幸這兩種方法效率都不高。 # 於是再來考慮非阻塞忙輪詢的I/O方式,我們發現我們可以同時處理多個流了(把一個流從阻塞模式切換到非阻塞 # 模式再此不予討論): # while true { # for i in stream[]; { # if i has data # read until unavailable # } # } # 我們只要不停的把所有流從頭到尾問一遍,又從頭開始。這樣就可以處理多個流了,但這樣的做法顯然不好,因為 # 如果所有的流都沒有數據,那么只會白白浪費CPU。這里要補充一點,阻塞模式下,內核對於I/O事件的處理是阻 # 塞或者喚醒,而非阻塞模式下則把I/O事件交給其他對象(后文介紹的select以及epoll)處理甚至直接忽略。 # # 為了避免CPU空轉,可以引進了一個代理(一開始有一位叫做select的代理,后來又有一位叫做poll的代理,不 # 過兩者的本質是一樣的)。這個代理比較厲害,可以同時觀察許多流的I/O事件,在空閑的時候,會把當前線程阻 # 塞掉,當有一個或多個流有I/O事件時,就從阻塞態中醒來,於是我們的程序就會輪詢一遍所有的流(於是我們可 # 以把“忙”字去掉了)。代碼長這樣: # while true { # select(streams[]) # for i in streams[] { # if i has data # read until unavailable # } # } # 於是,如果沒有I/O事件產生,我們的程序就會阻塞在select處。但是依然有個問題,我們從select那里僅僅知 # 道了,有I/O事件發生了,但卻並不知道是那幾個流(可能有一個,多個,甚至全部),我們只能無差別輪詢所有流, # 找出能讀出數據,或者寫入數據的流,對他們進行操作。 # 但是使用select,我們有O(n)的無差別輪詢復雜度,同時處理的流越多,每一次無差別輪詢時間就越長。再次 # 說了這么多,終於能好好解釋epoll了 # epoll可以理解為event poll,不同於忙輪詢和無差別輪詢,epoll之會把哪個流發生了怎樣的I/O事件通知我 # 們。此時我們對這些流的操作都是有意義的。 # 在討論epoll的實現細節之前,先把epoll的相關操作列出: # epoll_create 創建一個epoll對象,一般epollfd = epoll_create() # epoll_ctl (epoll_add/epoll_del的合體),往epoll對象中增加/刪除某一個流的某一個事件 # 比如 # epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, EPOLLIN);//有緩沖區內有數據時epoll_wait返回 # epoll_ctl(epollfd, EPOLL_CTL_DEL, socket, EPOLLOUT);//緩沖區可寫入時epoll_wait返回 # epoll_wait(epollfd,...)等待直到注冊的事件發生 # (注:當對一個非阻塞流的讀寫發生緩沖區滿或緩沖區空,write/read會返回-1,並設置errno=EAGAIN。 # 而epoll只關心緩沖區非滿和緩沖區非空事件)。 # 一個epoll模式的代碼大概的樣子是: # while true { # active_stream[] = epoll_wait(epollfd) # for i in active_stream[] { # read or write till unavailable # } # } # 舉個例子: # select: # 班里三十個同學在考試,誰先做完想交卷都要通過按鈕來活動,他按按鈕作為老師的我桌子上的燈就會變紅. # 一旦燈變紅,我(select)我就可以知道有人交卷了,但是我並不知道誰交的,所以,我必須跟個傻子似的輪詢 # 地去問:嘿,是你要交卷嗎?然后我就可以以這種效率極低地方式找到要交卷的學生,然后把它的卷子收上來. # # # epoll: # 這次再有人按按鈕,我這不光燈會亮,上面還會顯示要交卷學生的名字.這樣我就可以直接去對應學生那收卷就 # 好了.當然,同時可以有多人交卷.
IO多路復用的觸發方式

# 在linux的IO多路復用中有水平觸發,邊緣觸發兩種模式,這兩種模式的區別如下: # # 水平觸發:如果文件描述符已經就緒可以非阻塞的執行IO操作了,此時會觸發通知.允許在任意時刻重復檢測IO的狀態, # 沒有必要每次描述符就緒后盡可能多的執行IO.select,poll就屬於水平觸發. # # 邊緣觸發:如果文件描述符自上次狀態改變后有新的IO活動到來,此時會觸發通知.在收到一個IO事件通知后要盡可能 # 多的執行IO操作,因為如果在一次通知中沒有執行完IO那么就需要等到下一次新的IO活動到來才能獲取到就緒的描述 # 符.信號驅動式IO就屬於邊緣觸發. # # epoll既可以采用水平觸發,也可以采用邊緣觸發. # # 大家可能還不能完全了解這兩種模式的區別,我們可以舉例說明:一個管道收到了1kb的數據,epoll會立即返回,此時 # 讀了512字節數據,然后再次調用epoll.這時如果是水平觸發的,epoll會立即返回,因為有數據准備好了.如果是邊 # 緣觸發的不會立即返回,因為此時雖然有數據可讀但是已經觸發了一次通知,在這次通知到現在還沒有新的數據到來, # 直到有新的數據到來epoll才會返回,此時老的數據和新的數據都可以讀取到(當然是需要這次你盡可能的多讀取). # 下面我們還從電子的角度來解釋一下: # # 水平觸發:也就是只有高電平(1)或低電平(0)時才觸發通知,只要在這兩種狀態就能得到通知.上面提到的只要 # 有數據可讀(描述符就緒)那么水平觸發的epoll就立即返回. # # 邊緣觸發:只有電平發生變化(高電平到低電平,或者低電平到高電平)的時候才觸發通知.上面提到即使有數據 # 可讀,但是沒有新的IO活動到來,epoll也不會立即返回.
簡單實例
實例1(non-blocking IO):

import time import socket sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM) sk.setsockopt sk.bind(('127.0.0.1',6667)) sk.listen(5) sk.setblocking(False) while True: try: print ('waiting client connection .......') connection,address = sk.accept() # 進程主動輪詢 print("+++",address) client_messge = connection.recv(1024) print(str(client_messge,'utf8')) connection.close() except Exception as e: print (e) time.sleep(4) #############################client import time import socket sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM) while True: sk.connect(('127.0.0.1',6667)) print("hello") sk.sendall(bytes("hello","utf8")) time.sleep(2) break
優點:能夠在等待任務完成的時間里干其他活了(包括提交其他任務,也就是 “后台” 可以有多個任務在同時執行)。
缺點:任務完成的響應延遲增大了,因為每過一段時間才去輪詢一次read操作,而任務可能在兩次輪詢之間的任意時間完成。這會導致整體數據吞吐量的降低。
實例2(IO multiplexing):
在非阻塞實例中,輪詢的主語是進程,而“后台” 可能有多個任務在同時進行,人們就想到了循環查詢多個任務的完成狀態,只要有任何一個任務完成,就去處理它。不過,這個監聽的重任通過調用select等函數交給了內核去做。IO多路復用有兩個特別的系統調用select、poll、epoll函數。select調用是內核級別的,select輪詢相對非阻塞的輪詢的區別在於—前者可以等待多個socket,能實現同時對多個IO端口進行監聽,當其中任何一個socket的數據准好了,就能返回進行可讀,然后進程再進行recvfrom系統調用,將數據由內核拷貝到用戶進程,當然這個過程是阻塞的。
實例2:

import socket import select sk=socket.socket() sk.bind(("127.0.0.1",9904)) sk.listen(5) while True: r,w,e=select.select([sk,],[],[],5) for i in r: # conn,add=i.accept() #print(conn) print("hello") print('>>>>>>') #*************************client.py import socket sk=socket.socket() sk.connect(("127.0.0.1",9904)) while 1: inp=input(">>").strip() sk.send(inp.encode("utf8")) data=sk.recv(1024) print(data.decode("utf8"))
請思考:為什么不調用accept,會反復print?

select屬於水平觸發
實例3(server端並發聊天):

#***********************server.py import socket import select sk=socket.socket() sk.bind(("127.0.0.1",8801)) sk.listen(5) inputs=[sk,] while True: r,w,e=select.select(inputs,[],[],5) print(len(r)) for obj in r: if obj==sk: conn,add=obj.accept() print(conn) inputs.append(conn) else: data_byte=obj.recv(1024) print(str(data_byte,'utf8')) inp=input('回答%s號客戶>>>'%inputs.index(obj)) obj.sendall(bytes(inp,'utf8')) print('>>',r) #***********************client.py import socket sk=socket.socket() sk.connect(('127.0.0.1',8801)) while True: inp=input(">>>>") sk.sendall(bytes(inp,"utf8")) data=sk.recv(1024) print(str(data,'utf8'))
文件描述符其實就是咱們平時說的句柄,只不過文件描述符是linux中的概念。注意,我們的accept或recv調用時即向系統發出recvfrom請求
(1) 如果內核緩沖區沒有數據--->等待--->數據到了內核緩沖區,轉到用戶進程緩沖區;
(2) 如果先用select監聽到某個文件描述符對應的內核緩沖區有了數據,當我們再調用accept或recv時,直接將數據轉到用戶緩沖區。
思考1:開啟5個client,分別按54321的順序發送消息,那么server端是按什么順序回消息的呢?
思考2: 如何在某一個client端退出后,不影響server端和其它客戶端正常交流
linux:
if not data_byte: inputs.remove(obj) continue
win
try: data_byte=obj.recv(1024) print(str(data_byte,'utf8')) inp=input('回答%s號客戶>>>'%inputs.index(obj)) obj.sendall(bytes(inp,'utf8')) except Exception: inputs.remove(obj)
延伸
實例4:

#_*_coding:utf-8_*_ __author__ = 'Alex Li' import select import socket import sys import queue # Create a TCP/IP socket server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False) # Bind the socket to the port server_address = ('localhost', 10000) print(sys.stderr, 'starting up on %s port %s' % server_address) server.bind(server_address) # Listen for incoming connections server.listen(5) # Sockets from which we expect to read inputs = [ server ] # Sockets to which we expect to write outputs = [ ] message_queues = {} while inputs: # Wait for at least one of the sockets to be ready for processing print( '\nwaiting for the next event') readable, writable, exceptional = select.select(inputs, outputs, inputs) # Handle inputs for s in readable: if s is server: # A "readable" server socket is ready to accept a connection connection, client_address = s.accept() print('new connection from', client_address) connection.setblocking(False) inputs.append(connection) # Give the connection a queue for data we want to send message_queues[connection] = queue.Queue() else: data = s.recv(1024) if data: # A readable client socket has data print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) ) message_queues[s].put(data) # Add output channel for response if s not in outputs: outputs.append(s) else: # Interpret empty result as closed connection print('closing', client_address, 'after reading no data') # Stop listening for input on the connection if s in outputs: outputs.remove(s) #既然客戶端都斷開了,我就不用再給它返回數據了,所以這時候如果這個客戶端的連接對象還在outputs列表中,就把它刪掉 inputs.remove(s) #inputs中也刪除掉 s.close() #把這個連接關閉掉 # Remove message queue del message_queues[s] # Handle outputs for s in writable: try: next_msg = message_queues[s].get_nowait() except queue.Empty: # No messages waiting so stop checking for writability. print('output queue for', s.getpeername(), 'is empty') outputs.remove(s) else: print( 'sending "%s" to %s' % (next_msg, s.getpeername())) s.send(next_msg) # Handle "exceptional conditions" for s in exceptional: print('handling exceptional condition for', s.getpeername() ) # Stop listening for input on the connection inputs.remove(s) if s in outputs: outputs.remove(s) s.close() # Remove message queue del message_queues[s]
實例5:

# select 模擬一個socket server,注意socket必須在非阻塞情況下才能實現IO多路復用。 # 接下來通過例子了解select 是如何通過單進程實現同時處理多個非阻塞的socket連接的。 #server端 import select import socket import queue server = socket.socket() server.bind(('localhost',9000)) server.listen(1000) server.setblocking(False) # 設置成非阻塞模式,accept和recv都非阻塞 # 這里如果直接 server.accept() ,如果沒有連接會報錯,所以有數據才調他們 # BlockIOError:[WinError 10035] 無法立即完成一個非阻塞性套接字操作。 msg_dic = {} inputs = [server,] # 交給內核、select檢測的列表。 # 必須有一個值,讓select檢測,否則報錯提供無效參數。 # 沒有其他連接之前,自己就是個socket,自己就是個連接,檢測自己。活動了說明有鏈接 outputs = [] # 你往里面放什么,下一次就出來了 while True: readable, writeable, exceptional = select.select(inputs, outputs, inputs) # 定義檢測 #新來連接 檢測列表 異常(斷開) # 異常的也是inputs是: 檢測那些連接的存在異常 print(readable,writeable,exceptional) for r in readable: if r is server: # 有數據,代表來了一個新連接 conn, addr = server.accept() print("來了個新連接",addr) inputs.append(conn) # 把連接加到檢測列表里,如果這個連接活動了,就說明數據來了 # inputs = [server.conn] # 【conn】只返回活動的連接,但怎么確定是誰活動了 # 如果server活動,則來了新連接,conn活動則來數據 msg_dic[conn] = queue.Queue() # 初始化一個隊列,后面存要返回給這個客戶端的數據 else: try : data = r.recv(1024) # 注意這里是r,而不是conn,多個連接的情況 print("收到數據",data) # r.send(data) # 不能直接發,如果客戶端不收,數據就沒了 msg_dic[r].put(data) # 往里面放數據 outputs.append(r) # 放入返回的連接隊列里 except ConnectionResetError as e: print("客戶端斷開了",r) if r in outputs: outputs.remove(r) #清理已斷開的連接 inputs.remove(r) #清理已斷開的連接 del msg_dic[r] ##清理已斷開的連接 for w in writeable: # 要返回給客戶端的連接列表 data_to_client = msg_dic[w].get() # 在字典里取數據 w.send(data_to_client) # 返回給客戶端 outputs.remove(w) # 刪除這個數據,確保下次循環的時候不返回這個已經處理完的連接了。 for e in exceptional: # 如果連接斷開,刪除連接相關數據 if e in outputs: outputs.remove(e) inputs.remove(e) del msg_dic[e] #*************************client import socket client = socket.socket() client.connect(('localhost', 9000)) while True: cmd = input('>>> ').strip() if len(cmd) == 0 : continue client.send(cmd.encode('utf-8')) data = client.recv(1024) print(data.decode()) client.close()
實例6:

import selectors import socket sel = selectors.DefaultSelector() def accept(sock, mask): conn, addr = sock.accept() # Should be ready print('accepted', conn, 'from', addr) conn.setblocking(False) sel.register(conn, selectors.EVENT_READ, read) def read(conn, mask): data = conn.recv(1000) # Should be ready if data: print('echoing', repr(data), 'to', conn) conn.send(data) # Hope it won't block else: print('closing', conn) sel.unregister(conn) conn.close() sock = socket.socket() sock.bind(('localhost', 1234)) sock.listen(100) sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept) while True: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask)
注:本文最重要的參考文獻是Richard Stevens的“UNIX® Network Programming Volume 1, Third Edition: The Sockets Networking ” http://mp.weixin.qq.com/s__biz=MzA4MjEyNTA5Mw==&mid=2652563599&idx=1&sn=9781747e54d906c0c140228376e671ed&scene=21#wecha t_redirect
https://pymotw.com/2/select/#module-select
http://blog.csdn.net/lingfengtengfei/article/details/12392449
http://www.jb51.net/article/37416.htm
https://pymotw.com/2/select/#module-select