Python poll IO多路復用


一、poll介紹

poll本質上和select沒有區別,只是沒有了最大連接數(linux上默認1024個)的限制,原因是它基於鏈表存儲的。

本人的另一篇博客講了 python  select : https://www.cnblogs.com/weihengblog/p/9830253.html

二、使用poll編寫SocketServer(本博客代碼需要在linux下運行)

首先我們建立一個服務器端的socket

import select
import socket
import sys
import queue
from queue import Queue


# 創建一個socket連接
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)

# 綁定IP地址和端口號
server_address = ('localhost', 8800)
server.bind(server_address)
print("服務器已啟動http://localhost:8800/")

# 監聽連接數
server.listen(5)

# 消息隊列 用於記錄客戶端發來的消息
message_queues = {}

設置輪詢的超時時間

如果不設置timeout,方法將會阻塞直到對應的poll對象有一個事件發生。

TIMEOUT = 1000 # 設置為1秒

poll的事件類型

POLLIN    Input ready                  有數據讀取
POLLPRI    Priority input ready        有緊急數據讀取
POLLOUT    Able to receive output      准備輸出
POLLERR    Error                       某些錯誤
POLLHUP    Channel closed              掛起
POLLNVAL    Channel not open           無效請求,描述符無法打開

READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR READ_WRITE = READ_ONLY | select.POLLOUT

注冊要監聽文件描述符

首先需要實例化一個poll對象,對要監聽的句柄進行注冊

poller = select.poll()
# 注冊server端socket  要監聽的事件類型為 讀
poller.register(server, READ_ONLY)

文件描述符映射具體的套接字對象

"""
由於poll()返回包含套接字文件描述符和事件標志的元組列表,因此需要從文件描述符號到對象的映射才能從套接字中讀取或寫入該套接字。
"""
fd_to_socket = { server.fileno(): server,}

 

事件輪詢

while True:

    """
    去檢測已經注冊的文件描述符,會返回一個(fd,event)元祖列表
    fd:文件描述符
    event:描述符可能會發生的事件
    如果返回為空的列表,則說明超時且沒有文件描述符有事件發生
    """
    events = poller.poll(TIMEOUT) # 如果timeout為None,將會阻塞,知道有事件發生
    for fd, flag in events:
        # 從文件描述符中檢索實際的套接字
        s = fd_to_socket[fd]

事件類型判斷

        if flag & (select.POLLIN | select.POLLPRI): # 有數據可以讀取

            if s is server: # 表示有新的連接
                # 可以讀取數據
                connection, client_address = s.accept()
                print(sys.stderr, '新的連接來自:', client_address)
                connection.setblocking(0)
                fd_to_socket[connection.fileno()] = connection # 往fd字典中添加一個新的 文件描述符
                poller.register(connection, READ_ONLY)

                message_queues[connection] = Queue() # 為了防止等待客戶端發來數據期間發生阻塞,分配一個隊列用於保存數據
            else: # 表示客戶端傳來了消息

                data = s.recv(1024)
                if data: # 表明數據接受成功

                    print(sys.stderr, '接受數據 "%s" 來自 %s' % (data, s.getpeername()))
                    message_queues[s].put(data)
                    # 修改一個已經存在的fd,修改事件為寫。這里表示服務器向客戶端要發送數據
                    poller.modify(s, READ_WRITE)
                else:
                    # 如果沒有接受到數據,表示要斷開連接
                    print(sys.stderr, '關閉', client_address, '並未讀取到數據')
                    # 停止監聽連接上的輸入
                    poller.unregister(s)
                    s.close()

                    # 將此鏈接從隊列中刪除
                    del message_queues[s]

        elif flag & select.POLLHUP:
            print(sys.stderr, '關閉', client_address, '收到HUP后')
            poller.unregister(s)
            s.close()

        elif flag & select.POLLOUT:
            try:
                next_msg = message_queues[s].get_nowait()
            except queue.Empty:
                print(sys.stderr, '隊列', s.getpeername(), '為空')
                poller.modify(s, READ_ONLY)
            else:
                print(sys.stderr, '發送 "%s" 到 %s' % (next_msg, s.getpeername()))
                s.send(next_msg)

        elif flag & select.POLLERR:
            print(sys.stderr, '異常信息:', s.getpeername())
            poller.unregister(s)
            s.close()
            del message_queues[s]

三、完整代碼示例

server端:

import select
import socket
import sys
import queue
from queue import Queue


# 創建一個socket連接
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)

# 綁定IP地址和端口號
server_address = ('localhost', 8800)
server.bind(server_address)
print("服務器已啟動http://localhost:8800/")

# 監聽連接數
server.listen(5)

# 消息隊列
message_queues = {}

"""
POLLIN    Input ready      有數據讀取
POLLPRI    Priority input ready   有緊急數據讀取
POLLOUT    Able to receive output  准備輸出
POLLERR    Error   某些錯誤
POLLHUP    Channel closed   掛起
POLLNVAL    Channel not open  無效請求,描述符無法打開
"""
# 常用的標識  代表你想檢查的事件類型
READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR
READ_WRITE = READ_ONLY | select.POLLOUT

TIMEOUT = 1000
poller = select.poll() # 創建一個poll對象,該對象可以注冊或注銷文件描述符

# 注冊一個文件描述符,可以通過poll()方法來檢查是否有對應的IO事件發生
# 接受兩個參數, fd  和  eventmask
poller.register(server,READ_ONLY)
fd_to_socket = { server.fileno(): server,}
# 服務器的循環調用poll(),然后通過查找套接字並根據事件中的標志采取行動來處理返回的“事件”。
while True:

    """
    去檢測已經注冊的文件描述符,會返回一個(fd,event)元祖列表
    fd:文件描述符
    event:描述符可能會發生的事件
    如果返回為空的列表,則說明超時且沒有文件描述符有事件發生
    """
    events = poller.poll(TIMEOUT) # 如果timeout為None,將會阻塞,知道有事件發生
    for fd, flag in events:
        # 從文件描述符中檢索實際的套接字
        s = fd_to_socket[fd]

        if flag & (select.POLLIN | select.POLLPRI): # 有數據可以讀取

            if s is server: # 表示有新的連接
                # 可以讀取數據
                connection, client_address = s.accept()
                print(sys.stderr, '新的連接來自:', client_address)
                connection.setblocking(0)
                fd_to_socket[connection.fileno()] = connection # 往fd字典中添加一個新的 文件描述符
                poller.register(connection, READ_ONLY)

                message_queues[connection] = Queue() # 為了防止等待客戶端發來數據期間發生阻塞,分配一個隊列用於保存數據
            else: # 表示客戶端傳來了消息

                data = s.recv(1024)
                if data: # 表明數據接受成功

                    print(sys.stderr, '接受數據 "%s" 來自 %s' % (data, s.getpeername()))
                    message_queues[s].put(data)
                    # 修改一個已經存在的fd,修改事件為寫。這里表示服務器向客戶端要發送數據
                    poller.modify(s, READ_WRITE)
                else:
                    # 如果沒有接受到數據,表示要斷開連接
                    print(sys.stderr, '關閉', client_address, '並未讀取到數據')
                    # 停止監聽連接上的輸入
                    poller.unregister(s)
                    s.close()

                    # 將此鏈接從隊列中刪除
                    del message_queues[s]

        elif flag & select.POLLHUP:
            print(sys.stderr, '關閉', client_address, '收到HUP后')
            poller.unregister(s)
            s.close()

        elif flag & select.POLLOUT:
            try:
                next_msg = message_queues[s].get_nowait()
            except queue.Empty:
                print(sys.stderr, '隊列', s.getpeername(), '為空')
                poller.modify(s, READ_ONLY)
            else:
                print(sys.stderr, '發送 "%s" 到 %s' % (next_msg, s.getpeername()))
                s.send(next_msg)

        elif flag & select.POLLERR:
            print(sys.stderr, '異常信息:', s.getpeername())
            poller.unregister(s)
            s.close()
            del message_queues[s]

client端:

import socket


client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('localhost',8800))

while True:
    msg = bytes(input("<<<"),encoding='utf-8')
    client.sendall(msg)

    data = client.recv(1024)

    print("{}".format(data))

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM