之前異步IO一直沒搞明白,大致的理解就是在一個大的循環中,有兩部分:第一部分是監聽事件;第二部分是處理事件(通過添加回調函數的方式)。就拿網絡通信來說,可以先通過調用 select 模塊中的 select 監聽各個 socket 。當 socket 有事件到來時,針對相應的事件做出處理,就這么一直循環下去。所以異步IO也被稱為事件驅動IO。原理其實我說得太簡單了,所以我會以一個例子來說明一切。不過在這之前我還是要說一下 select 和 epoll 的區別。
一、IO多路服用的select
IO多路復用相對於阻塞式和非阻塞式的好處就是它可以監聽多個 socket ,並且不會消耗過多資源。當用戶進程調用 select 時,它會監聽其中所有 socket 直到有一個或多個 socket 數據已經准備好,否則就一直處於阻塞狀態。select的缺點在於單個進程能夠監視的文件描述符的數量存在最大限制,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其復制的的開銷也線性增長。同時,由於網絡響應時間的延遲使得大量的tcp鏈接處於非常活躍狀態,但調用select()會對所有的socket進行一次線性掃描,所以這也浪費了一定的開銷。不過它的好處還有就是它的跨平台特性。
二、 異步IO的epoll
epoll的優點就是完全的異步,你只需要對其中 poll 函數注冊相應的 socket 和事件,就可以完全不管。當有時間發生時,數據已經從內核態拷貝到用戶態,也就是完全沒有阻塞。
三、基於epoll的聊天室程序
說了這么多,我決定還是用epoll寫一個多人聊天程序。epoll可以支持大量連接,select卻有限制,所以這就是我決定用epoll的原因。首先看服務器程序:
1 import socket, select 2 # 服務端 3 4 serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 5 serverSock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) 6 serverSock.bind(('127.0.0.1', 8001)) 7 serverSock.listen(5) 8 serverSock.setblocking(False) 9 10 EOL = bytes('\n\n', 'utf-8') 11 QUIT = bytes('\r\n', 'utf-8') 12 epoll = select.epoll() 13 epoll.register(serverSock.fileno(), select.EPOLLIN) 14 print('注冊事件:%s'%serverSock.fileno()) 15 16 try: 17 connections = {}; requests = {}; responses = {} 18 while True: 19 events = epoll.poll(1) 20 for fileno, event in events: 21 # print('event:%s fileno:%s'%(event, fileno)) 22 if fileno == serverSock.fileno(): 23 clientSock, address = serverSock.accept() 24 print('連接到客戶端: %s:%s'%(address[0], address[1])) 25 clientSock.setblocking(False) 26 connections[clientSock.fileno()] = (clientSock, address) 27 epoll.register(clientSock.fileno(), select.EPOLLOUT) # socket只能注冊輸入或輸出一個,不能同時注冊 28 requests[clientSock.fileno()] = bytes('', 'utf-8') 29 responses[clientSock.fileno()] = bytes('你已連接到服務器,IP為{}:{}\n\n'.format(*serverSock.getsockname()), 30 'utf-8') 31 elif event & select.EPOLLIN: 32 requests[fileno] += connections[fileno][0].recv(1024) 33 if requests[fileno].endswith(EOL): 34 msg = str(requests[fileno], 'utf-8') 35 msg = '來自{}的消息:{}'.format(connections[fileno][1], msg[:-2]) 36 requests[fileno] = b'' 37 #print(msg) 38 for i in responses: 39 if i == fileno: 40 continue 41 responses[i] += bytes(msg, 'utf-8') 42 epoll.modify(i, select.EPOLLOUT) 43 if QUIT in requests[fileno]: 44 epoll.modify(fileno, select.EPOLLOUT) 45 46 elif event & select.EPOLLOUT: 47 #print('開始發送消息:%s'%str(responses[fileno], 'utf-8')) 48 bytesSend = connections[fileno][0].send(responses[fileno]) 49 responses[fileno] = responses[fileno][bytesSend:] 50 #print('發送完成') 51 if responses[fileno] == b'': 52 epoll.modify(fileno, select.EPOLLIN) 53 if QUIT in requests[fileno]: 54 epoll.modify(fileno, 0) 55 connections[fileno][0].shutdown(socket.SHUT_RDWR) 56 57 elif event & select.EPOLLHUP: 58 epoll.unregister(fileno) 59 connections[fileno][0].close() 60 del connections[fileno] 61 finally: 62 epoll.unregister(serverSock.fileno()) 63 epoll.close() 64 serverSock.close() 65 print('已退出服務端程序')
注意,我首先定義了兩個終止符:EOL表示這段話已經發完了;QUIT表示客戶端想要退出。客戶端的程序有點讓我為難,既要在命令行輸入又要同時保證能輸出別人發過來的消息,所有我只好用了prompt_toolkit再加上一個線程。如下:
1 import socket, prompt_toolkit, select 2 import threading, queue 3 4 5 class Client: 6 def __init__(self, sock): 7 self.sock = sock 8 self.want_to_send = False 9 self.want_to_recv = True 10 self._msg = queue.Queue() 11 12 def fileno(self): 13 return self.sock.fileno() 14 15 def handle_recv(self): 16 print('接受消息..') 17 msg = self.sock.recv(1024) 18 print(str(msg, 'utf-8')) 19 20 def handle_send(self): 21 msg = self._msg.get() 22 if msg == '\r\n': 23 self.want_to_send = False 24 self.want_to_recv = False 25 self.sock.sendall(bytes(msg, 'utf-8')) 26 self.want_to_send = False 27 28 def handle_sock(want_to_send, want_to_recv, sock): 29 print('開始處理消息...') 30 want_to_recv.append(sock.fileno()) 31 while True: 32 if sock.want_to_send: 33 if not want_to_send: 34 want_to_send.append(myclient.fileno()) 35 else: 36 want_to_send.clear() 37 can_recv, can_send, _ = select.select(want_to_recv, want_to_send, [], 1) 38 if can_recv: 39 sock.handle_recv() 40 if can_send: 41 sock.handle_send() 42 if not (sock.want_to_send or sock.want_to_recv): 43 print('正停止客戶端連接...') 44 break 45 if sock._msg.qsize(): 46 sock.want_to_send = True 47 48 49 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 50 s.connect(('127.0.0.1',8001)) 51 52 myclient = Client(s) 53 want_to_send = [] 54 want_to_recv = [] 55 56 57 58 t = threading.Thread(target=handle_sock, 59 args=(want_to_send, want_to_recv, myclient), 60 daemon=True) 61 t.start() 62 63 try: 64 while True: 65 messages = prompt_toolkit.shortcuts.prompt('\n\n>>> ',patch_stdout=True) 66 myclient._msg.put(messages+'\n\n') 67 except KeyboardInterrupt: 68 myclient._msg.put('\r\n') 69 finally: 70 t.join() 71 myclient.sock.close() 72 print('網絡已斷開')
我的服務器跑在 jupyter 上,客戶端跑在命令行上,效果如下:
客戶端接受和發送消息都是互不影響的,這樣就實現了一個多人聊天的功能。而且服務器使用的是epoll,所以哪怕是成千上萬的人同時在線也沒有任何壓力。至於怎么測試暫時還沒想到辦法。