select函數操作集合的時候有個要求,要么集合本身是描述符,要么他提供一個fileno()接口,返回一個描述符。
I/O多路復用是在單線程模式下實現多線程的效果,實現一個多I/O並發的效果。看一個簡單socket例子:
服務端:
import socket SOCKET_FAMILY = socket.AF_INET SOCKET_TYPE = socket.SOCK_STREAM sockServer = socket.socket(SOCKET_FAMILY, SOCKET_TYPE) sockServer.bind(('0.0.0.0', 8888)) sockServer.listen(5) while True: cliobj, addr = sockServer.accept() while True: recvdata = cliobj.recv(1024) if recvdata: print(recvdata.decode()) else: cliobj.close() break
客戶端:
import socket socCli = socket.socket() socCli.connect(('127.0.0.1', 8888)) while True: data = input("input str:") socCli.send(data.encode())
以上為一個簡單的客戶端發送一個輸入信息給服務端的socket通信的實例,在以上的例子中,服務端是一個單線程、阻塞模式的。如何實現多客戶端連接呢,我們可以使用多線程模式,這個當然沒有問題。 使用多線程、阻塞socket來處理的話,代碼會很直觀,但是也會有不少缺陷。它很難確保線程共享資源沒有問題。而且這種編程風格的程序在只有一個CPU的電腦上面效率更低。但如果一個用戶開啟的線程有限的情況下,比如1024個。當第1025個客戶端連接是仍然會阻塞。
有沒有一種比較好的方式呢,當然有,其一是使用異步socket。 這種socket只有在一些event觸發時才會阻塞。相反,程序在異步socket上面執行一個動作,會立即被告知這個動作是否成功。程序會根據這個信 息決定怎么繼續下面的操作由於異步socket是非阻塞的,就沒有必要再來使用多線程。所有的工作都可以在一個線程中完成。這種單線程模式有它自己的挑 戰,但可以成為很多方案不錯的選擇。它也可以結合多線程一起使用:單線程使用異步socket用於處理服務器的網絡部分,多線程可以用來訪問其他阻塞資 源,比如數據庫。Linux的2.6內核有一系列機制來管理異 步socket,其中3個有對應的Python的API:select、poll和epoll。epoll和pool比select更好,因為 Python程序不需要檢查每一個socket感興趣的event。相反,它可以依賴操作系統來告訴它哪些socket可能有這些event。epoll 比pool更好,因為它不要求操作系統每次都去檢查python程序需要的所有socket感興趣的event。而是Linux在event發生的時候會 跟蹤到,並在Python需要的時候返回一個列表。因此epoll對於大量(成千上萬)並發socket連接,是更有效率和可擴展的機制
異步I/O處理模型
select最早於1983年出現在4.2BSD中,它通過一個select()系統調用來監視多個文件描述符的數組,當select()返回后,該數組中就緒的文件描述符便會被內核修改標志位,使得進程可以獲得這些文件描述符從而進行后續的讀寫操作。
select目前幾乎在所有的平台上支持,其良好跨平台支持也是它的一個優點,事實上從現在看來,這也是它所剩不多的優點之一。
select的一個缺點在於單個進程能夠監視的文件描述符的數量存在最大限制,在Linux上一般為1024,不過可以通過修改宏定義甚至重新編譯內核的方式提升這一限制。
另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其復制的開銷也線性增長。同時,由於網絡響應時間的延遲使得大量TCP連接處於非活躍狀態,但調用select()會對所有socket進行一次線性掃描,所以這也浪費了一定的開銷
select poll epoll比較
1 特點 | |
select | select本質上是通過設置或者檢查存放fd標志位的數據結構來進行下一步處理。這樣所帶來的缺點是: 1 單個進程可監視的fd數量被限制 2 需要維護一個用來存放大量fd的數據結構,這樣會使得用戶空間和內核空間在傳遞該結構時復制開銷大 3 對socket進行掃描時是線性掃描 |
poll | poll本質上和select沒有區別,它將用戶傳入的數組拷貝到內核空間,然后查詢每個fd對應的設備狀態,如果設備就緒則在設備等待隊列中加入一項並繼續遍歷,如果遍歷完所有fd后沒有發現就緒設備,則掛起當前進程,直到設備就緒或者主動超時,被喚醒后它又要再次遍歷fd。這個過程經歷了多次無謂的遍歷。 |
epoll | epoll支持水平觸發和邊緣觸發,最大的特點在於邊緣觸發,它只告訴進程哪些fd剛剛變為就需態,並且只會通知一次。 |
2 支持一個進程所能打開的最大連接數 | |
select | 單個進程所能打開的最大連接數有FD_SETSIZE宏定義,其大小是32個整數的大小(在32位的機器上,大小就是32*32,同理64位機器上FD_SETSIZE為32*64),當然我們可以對進行修改,然后重新編譯內核,但是性能可能會受到影響,這需要進一步的測試。 |
poll | poll本質上和select沒有區別,但是它沒有最大連接數的限制,原因是它是基於鏈表來存儲的 |
epoll | 雖然連接數有上限,但是很大,1G內存的機器上可以打開10萬左右的連接,2G內存的機器可以打開20萬左右的連接 |
3 FD劇增后帶來的IO效率問題 | |
select | 因為每次調用時都會對連接進行線性遍歷,所以隨着FD的增加會造成遍歷速度慢的“線性下降性能問題”。 |
poll | 同上 |
epoll | 因為epoll內核中實現是根據每個fd上的callback函數來實現的,只有活躍的socket才會主動調用callback,所以在活躍socket較少的情況下,使用epoll沒有前面兩者的線性下降的性能問題,但是所有socket都很活躍的情況下,可能會有性能問題。 |
4 消息傳遞方式 | |
select | 內核需要將消息傳遞到用戶空間,都需要內核拷貝動作。 |
poll | 同上 |
epoll | epoll通過內核和用戶空間共享一塊內存來實現的。 |
下面我們對上面的socket例子進行改造,看一下select的例子:
import socket import queue from select import select SERVER_IP = ('127.0.0.1', 9999) # 保存客戶端發送過來的消息,將消息放入隊列中 message_queue = {} input_list = [] output_list = [] if __name__ == "__main__": server = socket.socket() server.bind(SERVER_IP) server.listen(10) # 設置為非阻塞 server.setblocking(False) # 初始化將服務端加入監聽列表 input_list.append(server) while True: # 開始 select 監聽,對input_list中的服務端server進行監聽 stdinput, stdoutput, stderr = select(input_list, output_list, input_list) # 循環判斷是否有客戶端連接進來,當有客戶端連接進來時select將觸發 for obj in stdinput: # 判斷當前觸發的是不是服務端對象, 當觸發的對象是服務端對象時,說明有新客戶端連接進來了 if obj == server: # 接收客戶端的連接, 獲取客戶端對象和客戶端地址信息 conn, addr = server.accept() print("Client {0} connected! ".format(addr)) # 將客戶端對象也加入到監聽的列表中, 當客戶端發送消息時 select 將觸發 input_list.append(conn) # 為連接的客戶端單獨創建一個消息隊列,用來保存客戶端發送的消息 message_queue[conn] = queue.Queue() else: # 由於客戶端連接進來時服務端接收客戶端連接請求,將客戶端加入到了監聽列表中(input_list),客戶端發送消息將觸發 # 所以判斷是否是客戶端對象觸發 try: recv_data = obj.recv(1024) # 客戶端未斷開 if recv_data: print("received {0} from client {1}".format(recv_data.decode(), addr)) # 將收到的消息放入到各客戶端的消息隊列中 message_queue[obj].put(recv_data) # 將回復操作放到output列表中,讓select監聽 if obj not in output_list: output_list.append(obj) except ConnectionResetError: # 客戶端斷開連接了,將客戶端的監聽從input列表中移除 input_list.remove(obj) # 移除客戶端對象的消息隊列 del message_queue[obj] print("\n[input] Client {0} disconnected".format(addr)) # 如果現在沒有客戶端請求,也沒有客戶端發送消息時,開始對發送消息列表進行處理,是否需要發送消息 for sendobj in output_list: try: # 如果消息隊列中有消息,從消息隊列中獲取要發送的消息 if not message_queue[sendobj].empty(): # 從該客戶端對象的消息隊列中獲取要發送的消息 send_data = message_queue[sendobj].get() sendobj.sendall(send_data) else: # 將監聽移除等待下一次客戶端發送消息 output_list.remove(sendobj) except ConnectionResetError: # 客戶端連接斷開了 del message_queue[sendobj] output_list.remove(sendobj) print("\n[output] Client {0} disconnected".format(addr))
epoll實現實例:
#!/usr/bin/env python import select import socket response = b'' serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serversocket.bind(('0.0.0.0', 8080)) serversocket.listen(1) # 因為socket默認是阻塞的,所以需要使用非阻塞(異步)模式。 serversocket.setblocking(0) # 創建一個epoll對象 epoll = select.epoll() # 在服務端socket上面注冊對讀event的關注。一個讀event隨時會觸發服務端socket去接收一個socket連接 epoll.register(serversocket.fileno(), select.EPOLLIN) try: # 字典connections映射文件描述符(整數)到其相應的網絡連接對象 connections = {} requests = {} responses = {} while True: # 查詢epoll對象,看是否有任何關注的event被觸發。參數“1”表示,我們會等待1秒來看是否有event發生。 # 如果有任何我們感興趣的event發生在這次查詢之前,這個查詢就會帶着這些event的列表立即返回 events = epoll.poll(1) # event作為一個序列(fileno,event code)的元組返回。fileno是文件描述符的代名詞,始終是一個整數。 for fileno, event in events: # 如果是服務端產生event,表示有一個新的連接進來 if fileno == serversocket.fileno(): connection, address = serversocket.accept() print('client connected:', address) # 設置新的socket為非阻塞模式 connection.setblocking(0) # 為新的socket注冊對讀(EPOLLIN)event的關注 epoll.register(connection.fileno(), select.EPOLLIN) connections[connection.fileno()] = connection # 初始化接收的數據 requests[connection.fileno()] = b'' # 如果發生一個讀event,就讀取從客戶端發送過來的新數據 elif event & select.EPOLLIN: print("------recvdata---------") # 接收客戶端發送過來的數據 requests[fileno] += connections[fileno].recv(1024) # 如果客戶端退出,關閉客戶端連接,取消所有的讀和寫監聽 if not requests[fileno]: connections[fileno].close() # 刪除connections字典中的監聽對象 del connections[fileno] # 刪除接收數據字典對應的句柄對象 del requests[connections[fileno]] print(connections, requests) epoll.modify(fileno, 0) else: # 一旦完成請求已收到,就注銷對讀event的關注,注冊對寫(EPOLLOUT)event的關注。寫event發生的時候,會回復數據給客戶端 epoll.modify(fileno, select.EPOLLOUT) # 打印完整的請求,證明雖然與客戶端的通信是交錯進行的,但數據可以作為一個整體來組裝和處理 print('-' * 40 + '\n' + requests[fileno].decode()) # 如果一個寫event在一個客戶端socket上面發生,它會接受新的數據以便發送到客戶端 elif event & select.EPOLLOUT: print("-------send data---------") # 每次發送一部分響應數據,直到完整的響應數據都已經發送給操作系統等待傳輸給客戶端 byteswritten = connections[fileno].send(requests[fileno]) requests[fileno] = requests[fileno][byteswritten:] if len(requests[fileno]) == 0: # 一旦完整的響應數據發送完成,就不再關注寫event epoll.modify(fileno, select.EPOLLIN) # HUP(掛起)event表明客戶端socket已經斷開(即關閉),所以服務端也需要關閉。 # 沒有必要注冊對HUP event的關注。在socket上面,它們總是會被epoll對象注冊 elif event & select.EPOLLHUP: print("end hup------") # 注銷對此socket連接的關注 epoll.unregister(fileno) # 關閉socket連接 connections[fileno].close() del connections[fileno] finally: # 打開的socket連接不需要關閉,因為Python會在程序結束的時候關閉。這里顯式關閉是一個好的代碼習慣 epoll.unregister(serversocket.fileno()) epoll.close() serversocket.close()