一、poll介紹
poll本質上和select沒有區別,只是沒有了最大連接數(linux上默認1024個)的限制,原因是它基於鏈表存儲的。
本人的另一篇博客講了 python select : https://www.cnblogs.com/weihengblog/p/9830253.html
二、使用poll編寫SocketServer(本博客代碼需要在linux下運行)
首先我們建立一個服務器端的socket
import select import socket import sys import queue from queue import Queue # 創建一個socket連接 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False) # 綁定IP地址和端口號 server_address = ('localhost', 8800) server.bind(server_address) print("服務器已啟動http://localhost:8800/") # 監聽連接數 server.listen(5) # 消息隊列 用於記錄客戶端發來的消息 message_queues = {}
設置輪詢的超時時間
如果不設置timeout,方法將會阻塞直到對應的poll對象有一個事件發生。
TIMEOUT = 1000 # 設置為1秒
poll的事件類型
POLLIN Input ready 有數據讀取 POLLPRI Priority input ready 有緊急數據讀取 POLLOUT Able to receive output 准備輸出 POLLERR Error 某些錯誤 POLLHUP Channel closed 掛起 POLLNVAL Channel not open 無效請求,描述符無法打開
READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR READ_WRITE = READ_ONLY | select.POLLOUT
注冊要監聽文件描述符
首先需要實例化一個poll對象,對要監聽的句柄進行注冊
poller = select.poll() # 注冊server端socket 要監聽的事件類型為 讀 poller.register(server, READ_ONLY)
文件描述符映射具體的套接字對象
""" 由於poll()返回包含套接字文件描述符和事件標志的元組列表,因此需要從文件描述符號到對象的映射才能從套接字中讀取或寫入該套接字。 """ fd_to_socket = { server.fileno(): server,}
事件輪詢
while True: """ 去檢測已經注冊的文件描述符,會返回一個(fd,event)元祖列表 fd:文件描述符 event:描述符可能會發生的事件 如果返回為空的列表,則說明超時且沒有文件描述符有事件發生 """ events = poller.poll(TIMEOUT) # 如果timeout為None,將會阻塞,知道有事件發生 for fd, flag in events: # 從文件描述符中檢索實際的套接字 s = fd_to_socket[fd]
事件類型判斷
if flag & (select.POLLIN | select.POLLPRI): # 有數據可以讀取 if s is server: # 表示有新的連接 # 可以讀取數據 connection, client_address = s.accept() print(sys.stderr, '新的連接來自:', client_address) connection.setblocking(0) fd_to_socket[connection.fileno()] = connection # 往fd字典中添加一個新的 文件描述符 poller.register(connection, READ_ONLY) message_queues[connection] = Queue() # 為了防止等待客戶端發來數據期間發生阻塞,分配一個隊列用於保存數據 else: # 表示客戶端傳來了消息 data = s.recv(1024) if data: # 表明數據接受成功 print(sys.stderr, '接受數據 "%s" 來自 %s' % (data, s.getpeername())) message_queues[s].put(data) # 修改一個已經存在的fd,修改事件為寫。這里表示服務器向客戶端要發送數據 poller.modify(s, READ_WRITE) else: # 如果沒有接受到數據,表示要斷開連接 print(sys.stderr, '關閉', client_address, '並未讀取到數據') # 停止監聽連接上的輸入 poller.unregister(s) s.close() # 將此鏈接從隊列中刪除 del message_queues[s] elif flag & select.POLLHUP: print(sys.stderr, '關閉', client_address, '收到HUP后') poller.unregister(s) s.close() elif flag & select.POLLOUT: try: next_msg = message_queues[s].get_nowait() except queue.Empty: print(sys.stderr, '隊列', s.getpeername(), '為空') poller.modify(s, READ_ONLY) else: print(sys.stderr, '發送 "%s" 到 %s' % (next_msg, s.getpeername())) s.send(next_msg) elif flag & select.POLLERR: print(sys.stderr, '異常信息:', s.getpeername()) poller.unregister(s) s.close() del message_queues[s]
三、完整代碼示例
server端:
import select import socket import sys import queue from queue import Queue # 創建一個socket連接 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False) # 綁定IP地址和端口號 server_address = ('localhost', 8800) server.bind(server_address) print("服務器已啟動http://localhost:8800/") # 監聽連接數 server.listen(5) # 消息隊列 message_queues = {} """ POLLIN Input ready 有數據讀取 POLLPRI Priority input ready 有緊急數據讀取 POLLOUT Able to receive output 准備輸出 POLLERR Error 某些錯誤 POLLHUP Channel closed 掛起 POLLNVAL Channel not open 無效請求,描述符無法打開 """ # 常用的標識 代表你想檢查的事件類型 READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR READ_WRITE = READ_ONLY | select.POLLOUT TIMEOUT = 1000 poller = select.poll() # 創建一個poll對象,該對象可以注冊或注銷文件描述符 # 注冊一個文件描述符,可以通過poll()方法來檢查是否有對應的IO事件發生 # 接受兩個參數, fd 和 eventmask poller.register(server,READ_ONLY) fd_to_socket = { server.fileno(): server,} # 服務器的循環調用poll(),然后通過查找套接字並根據事件中的標志采取行動來處理返回的“事件”。 while True: """ 去檢測已經注冊的文件描述符,會返回一個(fd,event)元祖列表 fd:文件描述符 event:描述符可能會發生的事件 如果返回為空的列表,則說明超時且沒有文件描述符有事件發生 """ events = poller.poll(TIMEOUT) # 如果timeout為None,將會阻塞,知道有事件發生 for fd, flag in events: # 從文件描述符中檢索實際的套接字 s = fd_to_socket[fd] if flag & (select.POLLIN | select.POLLPRI): # 有數據可以讀取 if s is server: # 表示有新的連接 # 可以讀取數據 connection, client_address = s.accept() print(sys.stderr, '新的連接來自:', client_address) connection.setblocking(0) fd_to_socket[connection.fileno()] = connection # 往fd字典中添加一個新的 文件描述符 poller.register(connection, READ_ONLY) message_queues[connection] = Queue() # 為了防止等待客戶端發來數據期間發生阻塞,分配一個隊列用於保存數據 else: # 表示客戶端傳來了消息 data = s.recv(1024) if data: # 表明數據接受成功 print(sys.stderr, '接受數據 "%s" 來自 %s' % (data, s.getpeername())) message_queues[s].put(data) # 修改一個已經存在的fd,修改事件為寫。這里表示服務器向客戶端要發送數據 poller.modify(s, READ_WRITE) else: # 如果沒有接受到數據,表示要斷開連接 print(sys.stderr, '關閉', client_address, '並未讀取到數據') # 停止監聽連接上的輸入 poller.unregister(s) s.close() # 將此鏈接從隊列中刪除 del message_queues[s] elif flag & select.POLLHUP: print(sys.stderr, '關閉', client_address, '收到HUP后') poller.unregister(s) s.close() elif flag & select.POLLOUT: try: next_msg = message_queues[s].get_nowait() except queue.Empty: print(sys.stderr, '隊列', s.getpeername(), '為空') poller.modify(s, READ_ONLY) else: print(sys.stderr, '發送 "%s" 到 %s' % (next_msg, s.getpeername())) s.send(next_msg) elif flag & select.POLLERR: print(sys.stderr, '異常信息:', s.getpeername()) poller.unregister(s) s.close() del message_queues[s]
client端:
import socket client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(('localhost',8800)) while True: msg = bytes(input("<<<"),encoding='utf-8') client.sendall(msg) data = client.recv(1024) print("{}".format(data))
