1、內核EPOLL模型講解
此部分參考http://blog.csdn.net/mango_song/article/details/42643971博文並整理
首先我們來定義流的概念,一個流可以是文件,socket,pipe等可以進行I/O操作的內核對象。不管是文件,還是套接字(socket),還是管道(pipe),我們都可以把他們看作流。
之后我們來討論I/O操作,通過read,我們可以從流中讀入數據;通過write,我們可以往流中寫入數據。現在假定1種情形,我們需要從流中讀數據,但是流中還沒有數據,(典型的例子為,客戶端要從socket讀數據,但是服務器端還沒有把數據傳回來),這時候該怎么辦?
阻塞:阻塞是個什么概念呢?比如某個時候你在等快遞,但是你還不知道快遞什么時候過來,而且你也沒有別的事可以干(或者說接下來的事要等快遞來了才能做);那么你可以去睡覺了,因為你知道快遞把貨送來時一定會給你打電話(假定一定能叫醒你)。
非阻塞忙輪詢:接着上面等快遞的例子,如果用忙輪詢的方法,那么你需要知道快遞員的手機號,然后每分鍾給他打個電話:“你到了沒?”
很明顯一般人不會用第二種做法,不僅顯得無腦,浪費話費不說,還占用了快遞員大量的時間。
大部分程序也不會用第二種做法,因為第一種方法經濟而簡單,經濟是指消耗很少的CPU時間,如果線程睡眠了,就掉出了系統的調度隊列,暫時不會去瓜分CPU寶貴的時間片。
為了了解阻塞是如何進行的,我們來討論緩沖區,以及內核緩沖區,最終把I/O事件解釋清楚。緩沖區的引入是為了減少頻繁I/O操作而引起頻繁的系統調用(你知道它很慢的),當你操作一個流時,更多的是以緩沖區為單位進行操作,這是相對於用戶空間而言。對於內核來說,也需要緩沖區。
假設有一個管道,進程A為管道的寫入方,B為管道的讀出方。假設一開始內核緩沖區是空的,B作為讀出方,被阻塞着。然后首先A往管道寫入,這時候內核緩沖區由空的狀態變到非空狀態,內核就會產生一個事件告訴B該醒來了,這個事件姑且稱之為“緩沖區非空”。但是“緩沖區非空”事件通知B后,B卻還沒有讀出數據;且內核許諾了不能把寫入管道中的數據丟掉這個時候,A寫入的數據會滯留在內核緩沖區中,如果內核也緩沖區滿了,B仍未開始讀數據,最終內核緩沖區會被填滿,這個時候會產生一個I/O事件,告訴進程A,你該等等(阻塞)了,我們把這個事件定義為“緩沖區滿”。假設后來B終於開始讀數據了,於是內核的緩沖區空了出來,這時候內核會告訴A,內核緩沖區有空位了,你可以從長眠中醒來了,繼續寫數據了,我們把這個事件叫做“緩沖區非滿”。也許事件Y1已經通知了A,但是A也沒有數據寫入了,而B繼續讀出數據,知道內核緩沖區空了。這個時候內核就告訴B,你需要阻塞了!,我們把這個時間定為“緩沖區空”。
這四種情形涵蓋了四個I/O事件,內核緩沖區滿,內核緩沖區空,內核緩沖區非空,內核緩沖區非滿。這四個I/O事件是進行阻塞同步的根本。(如果不能理解“同步”是什么概念,請學習操作系統的鎖,信號量,條件變量等任務同步方面的相關知識)。
然后我們來說說阻塞I/O的缺點。但是阻塞I/O模式下,一個線程只能處理一個流的I/O事件。如果想要同時處理多個流,要么多進程(fork),要么多線程(pthread_create),很不幸這兩種方法效率都不高。於是再來考慮非阻塞忙輪詢的I/O方式,我們發現可以同時處理多個流(把一個流從阻塞模式切換到非阻塞模式再此不予討論):

1 while true { 2 for i in stream[]; { 3 if i has data 4 read until unavailable 5 } 6 }
我們只要不停的把所有流從頭到尾問一遍,又從頭開始。這樣就可以處理多個流了,但這樣的做法顯然不好,因為如果所有的流都沒有數據,那么只會白白浪費CPU。這里要補充一點,阻塞模式下,內核對於I/O事件的處理是阻塞或者喚醒,而非阻塞模式下則把I/O事件交給其他對象(后文介紹的select以及epoll)處理甚至直接忽略。
為了避免CPU空轉,可以引進一個代理(一開始有一位叫做select的代理,后來又有一位叫做poll的代理,不過兩者的本質是一樣的)。這個代理比較厲害,可以同時觀察許多流的I/O事件,在空閑的時候,會把當前線程阻塞掉,當有一個或多個流有I/O事件時,就從阻塞態中醒來,於是我們的程序就會輪詢一遍所有的流(於是我們可以把“忙”字去掉了)。代碼長這樣:

1 while true { 2 select(streams[]) 3 for i in streams[] { 4 if i has data 5 read until unavailable 6 } 7 }
於是,如果沒有I/O事件產生,我們的程序就會阻塞在select處。但是依然有個問題,我們從select那里僅僅知道了,有I/O事件發生了,但卻並不知道是那幾個流(可能有一個,多個,甚至全部),我們只能無差別輪詢所有流,找出能讀出數據,或者寫入數據的流,對他們進行操作。

1 epoll_create創建一個epoll對象,一般epollfd = epoll_create() 2 epoll_ctl (epoll_add/epoll_del的合體),往epoll對象中增加/刪除某一個流的某一個事件 3 比如 4 epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, EPOLLIN);//注冊緩沖區非空事件,即有數據流入 5 epoll_ctl(epollfd, EPOLL_CTL_DEL, socket, EPOLLOUT);//注冊緩沖區非滿事件,即流可以被寫入 6 epoll_wait(epollfd,...)等待直到注冊的事件發生 7 (注:當對一個非阻塞流的讀寫發生緩沖區滿或緩沖區空,write/read會返回-1,並設置errno=EAGAIN。而epoll只關心緩沖區非滿和緩沖區非空事件)。
一個epoll模式的代碼大概的樣子是:

2 python中的epoll
從以上可知,epoll是對select、poll模型的改進,提高了網絡編程的性能,廣泛應用於大規模並發請求的C/S架構中。
1、觸發方式:
邊緣觸發/水平觸發,只適用於Unix/Linux操作系統
2、原理圖
3、一般步驟
- Create an epoll object——創建1個epoll對象
- Tell the epoll object to monitor specific events on specific sockets——告訴epoll對象,在指定的socket上監聽指定的事件
- Ask the epoll object which sockets may have had the specified event since the last query——詢問epoll對象,從上次查詢以來,哪些socket發生了哪些指定的事件
- Perform some action on those sockets——在這些socket上執行一些操作
- Tell the epoll object to modify the list of sockets and/or events to monitor——告訴epoll對象,修改socket列表和(或)事件,並監控
- Repeat steps 3 through 5 until finished——重復步驟3-5,直到完成
- Destroy the epoll object——銷毀epoll對象
4、相關用法
import select 導入select模塊 epoll = select.epoll() 創建一個epoll對象 epoll.register(文件句柄,事件類型) 注冊要監控的文件句柄和事件 事件類型: select.EPOLLIN 可讀事件 select.EPOLLOUT 可寫事件 select.EPOLLERR 錯誤事件 select.EPOLLHUP 客戶端斷開事件 epoll.unregister(文件句柄) 銷毀文件句柄 epoll.poll(timeout) 當文件句柄發生變化,則會以列表的形式主動報告給用戶進程,timeout 為超時時間,默認為-1,即一直等待直到文件句柄發生變化,如果指定為1 那么epoll每1秒匯報一次當前文件句柄的變化情況,如果無變化則返回空 epoll.fileno() 返回epoll的控制文件描述符(Return the epoll control file descriptor) epoll.modfiy(fineno,event) fineno為文件描述符 event為事件類型 作用是修改文件描述符所對應的事件 epoll.fromfd(fileno) 從1個指定的文件描述符創建1個epoll對象 epoll.close() 關閉epoll對象的控制文件描述符 |
5 實例:客戶端發送數據 服務端將接收的數據返回給客戶端

1 #!/usr/bin/env python 2 #-*- coding:utf-8 -*- 3 4 import socket 5 import select 6 import Queue 7 8 #創建socket對象 9 serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 10 #設置IP地址復用 11 serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 12 #ip地址和端口號 13 server_address = ("127.0.0.1", 8888) 14 #綁定IP地址 15 serversocket.bind(server_address) 16 #監聽,並設置最大連接數 17 serversocket.listen(10) 18 print "服務器啟動成功,監聽IP:" , server_address 19 #服務端設置非阻塞 20 serversocket.setblocking(False) 21 #超時時間 22 timeout = 10 23 #創建epoll事件對象,后續要監控的事件添加到其中 24 epoll = select.epoll() 25 #注冊服務器監聽fd到等待讀事件集合 26 epoll.register(serversocket.fileno(), select.EPOLLIN) 27 #保存連接客戶端消息的字典,格式為{} 28 message_queues = {} 29 #文件句柄到所對應對象的字典,格式為{句柄:對象} 30 fd_to_socket = {serversocket.fileno():serversocket,} 31 32 while True: 33 print "等待活動連接......" 34 #輪詢注冊的事件集合,返回值為[(文件句柄,對應的事件),(...),....] 35 events = epoll.poll(timeout) 36 if not events: 37 print "epoll超時無活動連接,重新輪詢......" 38 continue 39 print "有" , len(events), "個新事件,開始處理......" 40 41 for fd, event in events: 42 socket = fd_to_socket[fd] 43 #如果活動socket為當前服務器socket,表示有新連接 44 if socket == serversocket: 45 connection, address = serversocket.accept() 46 print "新連接:" , address 47 #新連接socket設置為非阻塞 48 connection.setblocking(False) 49 #注冊新連接fd到待讀事件集合 50 epoll.register(connection.fileno(), select.EPOLLIN) 51 #把新連接的文件句柄以及對象保存到字典 52 fd_to_socket[connection.fileno()] = connection 53 #以新連接的對象為鍵值,值存儲在隊列中,保存每個連接的信息 54 message_queues[connection] = Queue.Queue() 55 #關閉事件 56 elif event & select.EPOLLHUP: 57 print 'client close' 58 #在epoll中注銷客戶端的文件句柄 59 epoll.unregister(fd) 60 #關閉客戶端的文件句柄 61 fd_to_socket[fd].close() 62 #在字典中刪除與已關閉客戶端相關的信息 63 del fd_to_socket[fd] 64 #可讀事件 65 elif event & select.EPOLLIN: 66 #接收數據 67 data = socket.recv(1024) 68 if data: 69 print "收到數據:" , data , "客戶端:" , socket.getpeername() 70 #將數據放入對應客戶端的字典 71 message_queues[socket].put(data) 72 #修改讀取到消息的連接到等待寫事件集合(即對應客戶端收到消息后,再將其fd修改並加入寫事件集合) 73 epoll.modify(fd, select.EPOLLOUT) 74 #可寫事件 75 elif event & select.EPOLLOUT: 76 try: 77 #從字典中獲取對應客戶端的信息 78 msg = message_queues[socket].get_nowait() 79 except Queue.Empty: 80 print socket.getpeername() , " queue empty" 81 #修改文件句柄為讀事件 82 epoll.modify(fd, select.EPOLLIN) 83 else : 84 print "發送數據:" , data , "客戶端:" , socket.getpeername() 85 #發送數據 86 socket.send(msg) 87 88 #在epoll中注銷服務端文件句柄 89 epoll.unregister(serversocket.fileno()) 90 #關閉epoll 91 epoll.close() 92 #關閉服務器socket 93 serversocket.close()

1 #!/usr/bin/env python 2 #-*- coding:utf-8 -*- 3 4 import socket 5 6 #創建客戶端socket對象 7 clientsocket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 8 #服務端IP地址和端口號元組 9 server_address = ('127.0.0.1',8888) 10 #客戶端連接指定的IP地址和端口號 11 clientsocket.connect(server_address) 12 13 while True: 14 #輸入數據 15 data = raw_input('please input:') 16 #客戶端發送數據 17 clientsocket.sendall(data) 18 #客戶端接收數據 19 server_data = clientsocket.recv(1024) 20 print '客戶端收到的數據:'server_data 21 #關閉客戶端socket 22 clientsocket.close()
參考資料:
http://blog.csdn.net/mango_song/article/details/42643971
http://www.cnblogs.com/Alanpy/articles/5125986.html
http://scotdoyle.com/python-epoll-howto.html
http://www.haiyun.me/archives/1056.html