IO多路復用(select、poll、epoll)介紹及select、epoll的實現
IO多路復用中包括 select、pool、epoll,這些都屬於同步,還不屬於異步
一、IO多路復用介紹
1、select
select最早於1983年出現在4.2BSD中,它通過一個select()系統調用來監視多個文件描述符的數組,當select()返回后,該數組中就緒的文件描述符便會被內核修改標志位,使得進程可以獲得這些文件描述符從而進行后續的讀寫操作。
select目前幾乎在所有的平台上支持,其良好跨平台支持也是它的一個優點,事實上從現在看來,這也是它所剩不多的優點之一。
select的一個缺點在於單個進程能夠監視的文件描述符的數量存在最大限制,在Linux上一般為1024,不過可以通過修改宏定義甚至重新編譯內核的方式提升這一限制。
另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其復制的開銷也線性增長。同時,由於網絡響應時間的延遲使得大量TCP連接處於非活躍狀態,但調用select()會對所有socket進行一次線性掃描,所以這也浪費了一定的開銷。
2、poll
poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差別,但是poll沒有最大文件描述符數量的限制。
poll和select同樣存在一個缺點就是,包含大量文件描述符的數組被整體復制於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增加而線性增大。
另外,select()和poll()將就緒的文件描述符告訴進程后,如果進程沒有對其進行IO操作,那么下次調用select()和poll()的時候將再次報告這些文件描述符,所以它們一般不會丟失就緒的消息,這種方式稱為水平觸發(Level Triggered)。
3、epoll
直到Linux2.6才出現了由內核直接支持的實現方法,那就是epoll,它幾乎具備了之前所說的一切優點,被公認為Linux2.6下性能最好的多路I/O就緒通知方法。
epoll可以同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那么它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些,但是代碼實現相當復雜。
epoll同樣只告知那些就緒的文件描述符,而且當我們調用epoll_wait()獲得就緒文件描述符時,返回的不是實際的描述符,而是一個代表就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的文件描述符即可,這里也使用了內存映射(mmap)技術,這樣便徹底省掉了這些文件描述符在系統調用時復制的開銷。
另一個本質的改進在於epoll采用基於事件的就緒通知方式。在select/poll中,進程只有在調用一定的方法后,內核才對所有監視的文件描述符進行掃描,而epoll事先通過epoll_ctl()來注冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會采用類似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便得到通知。
4、sellect、poll、epoll三者的區別
二、select IO多路復用
Python的select()方法直接調用操作系統的IO接口,它監控sockets,open files, and pipes(所有帶fileno()方法的文件句柄)何時變成readable 和writeable, 或者通信錯誤,select()使得同時監控多個連接變的簡單,並且這比寫一個長循環來等待和監控多客戶端連接要高效,因為select直接通過操作系統提供的C的網絡接口進行操作,而不是通過Python的解釋器。
select目前幾乎在所有的平台上支持,其良好跨平台支持也是它的一個優點。select的一 個缺點在於單個進程能夠監視的文件描述符的數量存在最大限制,在Linux上一般為1024,可以通過修改宏定義甚至重新編譯內核的方式提升這一限制,但是這樣會造成效率的降低
1、select語法:
select(rlist, wlist, xlist, timeout=None)
select()方法接收並監控3個通信列表, 第一個rlist監控所有要進來的輸入數據,第二個wlist是監控所有要發出去的輸出數據,第三個監控異常錯誤數據,第四個設置指定等待時間,如果想立即返回,設為null即可,最后需要創建2個列表來包含輸入和輸出信息來傳給select(),讓select方法通過內核去監控,然后生成三個實例。
#建立兩個列表,比如想讓內核去檢測50個連接,需要傳給它一個列表,就是這個inputs(列表中里面存放的是需要被內核監控的鏈接),然后交給select,就相當於交給內核了 inputs = [server,] #輸入列表,監控所有輸入數據 outputs = [] #輸入列表,監控所有輸出數據 #把兩個列表傳給select方法通過內核去監控,生成三個實例 readable,writeable,exceptional = select.select(inputs,outputs,inputs) # 這里select方法的第三個參數同樣傳入input列表是因為,input列表中存放着所有的鏈接,比如之前放入的50被監控鏈接中有5個斷了,出現了異常,就會輸入到exceptional里面,但這5鏈接本身是放在inputs列表中
2、select服務端代碼實例:
import select,socket,queue server = socket.socket() server.bind(("localhost",9000)) server.listen(1000) server.setblocking(False) #設置為非阻塞 msg_dic = dict() #定義一個隊列字典 inputs = [server,] #由於設置成非阻塞模式,accept和recive都不阻塞了,沒有值就會報錯,因此最開始需要最開始需要監控服務端本身,等待客戶端連接 outputs = [] while True: #exceptional表示如果inputs列表中出現異常,會輸出到這個exceptional中 readable,writeable,exceptional = select.select(inputs,outputs,inputs)#如果沒有任何客戶端連接,就會阻塞在這里 for r in readable:# 沒有個r代表一個socket鏈接 if r is server: #如果這個socket是server的話,就說明是是新客戶端連接了 conn,addr = r.accept() #新連接進來了,接受這個連接,生成這個客戶端實例 print("來了一個新連接",addr) inputs.append(conn)#為了不阻塞整個程序,我們不會立刻在這里開始接收客戶端發來的數據, 把它放到inputs里, 下一次loop時,這個新連接 #就會被交給select去監聽 msg_dic[conn] = queue.Queue() #初始化一個隊列,后面存要返回給這個客戶端的數據 else: #如果不是server,就說明是之前建立的客戶端來數據了 data = r.recv(1024) print("收到數據:",data) msg_dic[r].put(data)#收到的數據先放到queue里,一會返回給客戶端 outputs.append(r)#為了不影響處理與其它客戶端的連接 , 這里不立刻返回數據給客戶端 # r.send(data) # print("send done....") for w in writeable: #要返回給客戶端的鏈接列表 data_to_client = msg_dic[w].get() w.send(data_to_client) #返回給客戶端的源數據 outputs.remove(w) #確保下次循環的時候writeable,不返回這個已經處理完的這個連接了 for e in exceptional: #處理異常的連接 if e in outputs: #因為e不一定在outputs,所以先要判斷 outputs.remove(e) inputs.remove(e) #刪除inputs中異常連接 del msg_dic[e] #刪除此連接對應的隊列
三、epoll IO多路復用
epoll的方式,這種效率更高,但是這種方式在Windows下不支持,在Linux是支持的,selectors模塊就是默認使用就是epoll,但是如果在windows系統上使用selectors模塊,就會找不到epoll,從而使用select。
1、selectors語法:
#定義一個對象 sel = selectors.DefaultSelector() #注冊一個事件 sel.register(server,selectors.EVENT_READ,accept) #注冊事件,只要來一個連接就調accept這個函數,就相當於之前select的用法,sel.register(server,selectors.EVENT_READ,accept) == inputs=[server,],readable,writeable,exceptional = select.select(inputs,outputs,inputs)意思是一樣的。
2、selectors代碼實例:
import selectors,socket sel = selectors.DefaultSelector() def accept(sock,mask): "接收客戶端信息實例" conn,addr = sock.accept() print("accepted",conn,'from',addr) conn.setblocking(False) sel.register(conn,selectors.EVENT_READ,read) #新連接注冊read回調函數 def read(conn,mask): "接收客戶端的數據" data = conn.recv(1024) if data: print("echoing",repr(data),'to',conn) conn.send(data) else: print("closing",conn) sel.unregister(conn) conn.close() server = socket.socket() server.bind(('localhost',9999)) server.listen(500) server.setblocking(False) sel.register(server,selectors.EVENT_READ,accept) #注冊事件,只要來一個連接就調accept這個函數, #sel.register(server,selectors.EVENT_READ,accept) == inputs=[server,] while True: events = sel.select() #這個select,看起來是select,有可能調用的是epoll,看你操作系統是Windows的還是Linux的 #默認阻塞,有活動連接就返回活動連接列表 print("事件:",events) for key,mask in events: callback = key.data #相當於調accept了 callback(key.fileobj,mask) #key.fileobj=文件句柄
打印服務端:
[(SelectorKey(fileobj=<socket.socket fd=436, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222)>, fd=436, events=1, data=<function accept at 0x0000022296063E18>), 1)] accepted <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)> from ('127.0.0.1', 50281) 事件: [(SelectorKey(fileobj=<socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>, fd=508, events=1, data=<function read at 0x00000222980501E0>), 1)] echoing b'adas' to <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)> 事件: [(SelectorKey(fileobj=<socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>, fd=508, events=1, data=<function read at 0x00000222980501E0>), 1)] echoing b'HA' to <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)> 事件: [(SelectorKey(fileobj=<socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>, fd=508, events=1, data=<function read at 0x00000222980501E0>), 1)] echoing b'asdHA' to <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>
這樣就容易明白:callback
=
key.data
#第一次調用的是accept,第二次調用的是read
callback(key.fileobj,mask)
#key.fileobj=文件句柄
客戶端代碼:
在Linux端,selectors模塊才能是epoll
import socket,sys messages = [ b'This is the message. ', b'It will be sent ', b'in parts.', ] server_address = ('localhost', 9999) # 創建100個 TCP/IP socket實例 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(100)] # 連接服務端 print('connecting to %s port %s' % server_address) for s in socks: s.connect(server_address) for message in messages: # 發送消息至服務端 for s in socks: print('%s: sending "%s"' % (s.getsockname(), message) ) s.send(message) # 從服務端接收消息 for s in socks: data = s.recv(1024) print( '%s: received "%s"' % (s.getsockname(), data) ) if not data: print(sys.stderr, 'closing socket', s.getsockname() )