python網絡編程——IO多路復用之epoll


1、內核EPOLL模型講解

    此部分參考http://blog.csdn.net/mango_song/article/details/42643971博文並整理

    首先我們來定義流的概念,一個流可以是文件,socket,pipe等可以進行I/O操作的內核對象。不管是文件,還是套接字(socket),還是管道(pipe),我們都可以把他們看作流。

    之后我們來討論I/O操作,通過read,我們可以從流中讀入數據;通過write,我們可以往流中寫入數據。現在假定1種情形,我們需要從流中讀數據,但是流中還沒有數據,(典型的例子為,客戶端要從socket讀數據,但是服務器端還沒有把數據傳回來),這時候該怎么辦?

    阻塞:阻塞是個什么概念呢?比如某個時候你在等快遞,但是你還不知道快遞什么時候過來,而且你也沒有別的事可以干(或者說接下來的事要等快遞來了才能做);那么你可以去睡覺了,因為你知道快遞把貨送來時一定會給你打電話(假定一定能叫醒你)。

    非阻塞忙輪詢:接着上面等快遞的例子,如果用忙輪詢的方法,那么你需要知道快遞員的手機號,然后每分鍾給他打個電話:“你到了沒?”

    很明顯一般人不會用第二種做法,不僅顯得無腦,浪費話費不說,還占用了快遞員大量的時間。

    大部分程序也不會用第二種做法,因為第一種方法經濟而簡單,經濟是指消耗很少的CPU時間,如果線程睡眠了,就掉出了系統的調度隊列,暫時不會去瓜分CPU寶貴的時間片

    為了了解阻塞是如何進行的,我們來討論緩沖區,以及內核緩沖區,最終把I/O事件解釋清楚。緩沖區的引入是為了減少頻繁I/O操作而引起頻繁的系統調用(你知道它很慢的)當你操作一個流時,更多的是以緩沖區為單位進行操作,這是相對於用戶空間而言。對於內核來說,也需要緩沖區。

    假設有一個管道,進程A為管道的寫入方,B為管道的讀出方。假設一開始內核緩沖區是空的,B作為讀出方,被阻塞着。然后首先A往管道寫入,這時候內核緩沖區由空的狀態變到非空狀態,內核就會產生一個事件告訴B該醒來了,這個事件姑且稱之為“緩沖區非空”。但是“緩沖區非空”事件通知B后,B卻還沒有讀出數據;且內核許諾了不能把寫入管道中的數據丟掉這個時候,A寫入的數據會滯留在內核緩沖區中,如果內核也緩沖區滿了,B仍未開始讀數據,最終內核緩沖區會被填滿,這個時候會產生一個I/O事件,告訴進程A,你該等等(阻塞)了,我們把這個事件定義為“緩沖區滿”。假設后來B終於開始讀數據了,於是內核的緩沖區空了出來,這時候內核會告訴A,內核緩沖區有空位了,你可以從長眠中醒來了,繼續寫數據了,我們把這個事件叫做“緩沖區非滿”。也許事件Y1已經通知了A,但是A也沒有數據寫入了,而B繼續讀出數據,知道內核緩沖區空了。這個時候內核就告訴B,你需要阻塞了!,我們把這個時間定為“緩沖區空”。

    這四種情形涵蓋了四個I/O事件,內核緩沖區滿,內核緩沖區空,內核緩沖區非空,內核緩沖區非滿。這四個I/O事件是進行阻塞同步的根本。(如果不能理解“同步”是什么概念,請學習操作系統的鎖,信號量,條件變量等任務同步方面的相關知識)。

    然后我們來說說阻塞I/O的缺點。但是阻塞I/O模式下,一個線程只能處理一個流的I/O事件。如果想要同時處理多個流,要么多進程(fork),要么多線程(pthread_create),很不幸這兩種方法效率都不高。於是再來考慮非阻塞忙輪詢的I/O方式,我們發現可以同時處理多個流(把一個流從阻塞模式切換到非阻塞模式再此不予討論):

1 while true {  
2      for i in stream[]; {  
3            if i has data  
4            read until unavailable  
5         }  
6 }  
View Code

    我們只要不停的把所有流從頭到尾問一遍,又從頭開始。這樣就可以處理多個流了,但這樣的做法顯然不好,因為如果所有的流都沒有數據,那么只會白白浪費CPU。這里要補充一點,阻塞模式下,內核對於I/O事件的處理是阻塞或者喚醒,而非阻塞模式下則把I/O事件交給其他對象(后文介紹的select以及epoll)處理甚至直接忽略。

    為了避免CPU空轉,可以引進一個代理(一開始有一位叫做select的代理,后來又有一位叫做poll的代理,不過兩者的本質是一樣的)。這個代理比較厲害,可以同時觀察許多流的I/O事件,在空閑的時候,會把當前線程阻塞掉,當有一個或多個流有I/O事件時,就從阻塞態中醒來,於是我們的程序就會輪詢一遍所有的流(於是我們可以把“忙”字去掉了)。代碼長這樣:

1 while true {  
2       select(streams[])  
3       for i in streams[] {  
4             if i has data  
5             read until unavailable  
6        }  
7 }  
View Code

    於是,如果沒有I/O事件產生,我們的程序就會阻塞在select處。但是依然有個問題,我們從select那里僅僅知道了,有I/O事件發生了,但卻並不知道是那幾個流(可能有一個,多個,甚至全部),我們只能無差別輪詢所有流,找出能讀出數據,或者寫入數據的流,對他們進行操作。

    但是使用select,我們有O(n)的無差別輪詢復雜度,同時處理的流越多,每一次無差別輪詢時間就越長。再次說了這么多,終於能好好解釋epoll了。
    epoll可以理解為event poll,不同於忙輪詢和無差別輪詢,epoll只會把哪個流發生了怎樣的I/O事件通知我們。此時我們對這些流的操作都是有意義的(復雜度降低到了O(1))。
    在討論epoll的實現細節之前,先把epoll的相關操作列出:
1 epoll_create創建一個epoll對象,一般epollfd = epoll_create()  
2 epoll_ctl (epoll_add/epoll_del的合體),往epoll對象中增加/刪除某一個流的某一個事件  
3  比如  
4 epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, EPOLLIN);//注冊緩沖區非空事件,即有數據流入  
5 epoll_ctl(epollfd, EPOLL_CTL_DEL, socket, EPOLLOUT);//注冊緩沖區非滿事件,即流可以被寫入  
6 epoll_wait(epollfd,...)等待直到注冊的事件發生  
7 (注:當對一個非阻塞流的讀寫發生緩沖區滿或緩沖區空,write/read會返回-1,並設置errno=EAGAIN。而epoll只關心緩沖區非滿和緩沖區非空事件)。  
View Code

 一個epoll模式的代碼大概的樣子是:

View Code

2 python中的epoll

   從以上可知,epoll是對select、poll模型的改進,提高了網絡編程的性能,廣泛應用於大規模並發請求的C/S架構中。

  1、觸發方式:

     邊緣觸發/水平觸發,只適用於Unix/Linux操作系統

   2、原理圖

  3、一般步驟

  1. Create an epoll object——創建1個epoll對象
  2. Tell the epoll object to monitor specific events on specific sockets——告訴epoll對象,在指定的socket上監聽指定的事件
  3. Ask the epoll object which sockets may have had the specified event since the last query——詢問epoll對象,從上次查詢以來,哪些socket發生了哪些指定的事件
  4. Perform some action on those sockets——在這些socket上執行一些操作
  5. Tell the epoll object to modify the list of sockets and/or events to monitor——告訴epoll對象,修改socket列表和(或)事件,並監控
  6. Repeat steps 3 through 5 until finished——重復步驟3-5,直到完成
  7. Destroy the epoll object——銷毀epoll對象

  4、相關用法

import select 導入select模塊

epoll = select.epoll() 創建一個epoll對象

epoll.register(文件句柄,事件類型) 注冊要監控的文件句柄和事件

事件類型:

  select.EPOLLIN    可讀事件

  select.EPOLLOUT   可寫事件

  select.EPOLLERR   錯誤事件

  select.EPOLLHUP   客戶端斷開事件

epoll.unregister(文件句柄)   銷毀文件句柄

epoll.poll(timeout)  當文件句柄發生變化,則會以列表的形式主動報告給用戶進程,timeout

                     為超時時間,默認為-1,即一直等待直到文件句柄發生變化,如果指定為1

                     那么epoll每1秒匯報一次當前文件句柄的變化情況,如果無變化則返回空

epoll.fileno() 返回epoll的控制文件描述符(Return the epoll control file descriptor)

epoll.modfiy(fineno,event) fineno為文件描述符 event為事件類型  作用是修改文件描述符所對應的事件

epoll.fromfd(fileno) 從1個指定的文件描述符創建1個epoll對象

epoll.close()   關閉epoll對象的控制文件描述符

   5 實例:客戶端發送數據 服務端將接收的數據返回給客戶端

 1 #!/usr/bin/env python
 2 #-*- coding:utf-8 -*-
 3 
 4 import socket
 5 import select
 6 import Queue
 7 
 8 #創建socket對象
 9 serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
10 #設置IP地址復用
11 serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
12 #ip地址和端口號
13 server_address = ("127.0.0.1", 8888)
14 #綁定IP地址
15 serversocket.bind(server_address)
16 #監聽,並設置最大連接數
17 serversocket.listen(10)
18 print  "服務器啟動成功,監聽IP:" , server_address
19 #服務端設置非阻塞
20 serversocket.setblocking(False)  
21 #超時時間
22 timeout = 10
23 #創建epoll事件對象,后續要監控的事件添加到其中
24 epoll = select.epoll()
25 #注冊服務器監聽fd到等待讀事件集合
26 epoll.register(serversocket.fileno(), select.EPOLLIN)
27 #保存連接客戶端消息的字典,格式為{}
28 message_queues = {}
29 #文件句柄到所對應對象的字典,格式為{句柄:對象}
30 fd_to_socket = {serversocket.fileno():serversocket,}
31 
32 while True:
33   print "等待活動連接......"
34   #輪詢注冊的事件集合,返回值為[(文件句柄,對應的事件),(...),....]
35   events = epoll.poll(timeout)
36   if not events:
37      print "epoll超時無活動連接,重新輪詢......"
38      continue
39   print "" , len(events), "個新事件,開始處理......"
40   
41   for fd, event in events:
42      socket = fd_to_socket[fd]
43      #如果活動socket為當前服務器socket,表示有新連接
44      if socket == serversocket:
45             connection, address = serversocket.accept()
46             print "新連接:" , address
47             #新連接socket設置為非阻塞
48             connection.setblocking(False)
49             #注冊新連接fd到待讀事件集合
50             epoll.register(connection.fileno(), select.EPOLLIN)
51             #把新連接的文件句柄以及對象保存到字典
52             fd_to_socket[connection.fileno()] = connection
53             #以新連接的對象為鍵值,值存儲在隊列中,保存每個連接的信息
54             message_queues[connection]  = Queue.Queue()
55      #關閉事件
56      elif event & select.EPOLLHUP:
57         print 'client close'
58         #在epoll中注銷客戶端的文件句柄
59         epoll.unregister(fd)
60         #關閉客戶端的文件句柄
61         fd_to_socket[fd].close()
62         #在字典中刪除與已關閉客戶端相關的信息
63         del fd_to_socket[fd]
64      #可讀事件
65      elif event & select.EPOLLIN:
66         #接收數據
67         data = socket.recv(1024)
68         if data:
69            print "收到數據:" , data , "客戶端:" , socket.getpeername()
70            #將數據放入對應客戶端的字典
71            message_queues[socket].put(data)
72            #修改讀取到消息的連接到等待寫事件集合(即對應客戶端收到消息后,再將其fd修改並加入寫事件集合)
73            epoll.modify(fd, select.EPOLLOUT)
74      #可寫事件
75      elif event & select.EPOLLOUT:
76         try:
77            #從字典中獲取對應客戶端的信息
78            msg = message_queues[socket].get_nowait()
79         except Queue.Empty:
80            print socket.getpeername() , " queue empty"
81            #修改文件句柄為讀事件
82            epoll.modify(fd, select.EPOLLIN)
83         else :
84            print "發送數據:" , data , "客戶端:" , socket.getpeername()
85            #發送數據
86            socket.send(msg)
87 
88 #在epoll中注銷服務端文件句柄
89 epoll.unregister(serversocket.fileno())
90 #關閉epoll
91 epoll.close()
92 #關閉服務器socket
93 serversocket.close()
服務端代碼
 1 #!/usr/bin/env python
 2 #-*- coding:utf-8 -*-
 3 
 4 import socket
 5 
 6 #創建客戶端socket對象
 7 clientsocket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 8 #服務端IP地址和端口號元組
 9 server_address = ('127.0.0.1',8888)
10 #客戶端連接指定的IP地址和端口號
11 clientsocket.connect(server_address)
12 
13 while True:
14     #輸入數據
15     data = raw_input('please input:')
16     #客戶端發送數據
17     clientsocket.sendall(data)
18     #客戶端接收數據
19     server_data = clientsocket.recv(1024)
20     print '客戶端收到的數據:'server_data
21     #關閉客戶端socket
22     clientsocket.close() 
客戶端代碼

 

參考資料:

      http://blog.csdn.net/mango_song/article/details/42643971

      http://www.cnblogs.com/Alanpy/articles/5125986.html

      http://scotdoyle.com/python-epoll-howto.html

      http://www.haiyun.me/archives/1056.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM