python之select與selector


select/poll/epoll的區別

I/O多路復用的本質就是用select/poll/epoll,去監聽多個socket對象。

參考:Linux IO模式及 select、poll、epoll詳解

  1. select是不斷輪詢去監聽的socket,socket個數有限制,一般為1024個(文件描述符為1024,該值可以修改);隨着文件描述符數量增加,輪詢一回成本增加。
  2. poll采用輪詢方式監聽,只不過沒有個數限制;
  3. epoll不采用輪詢方式去監聽,而是當socket有變化時通過回調的方式主動告知用戶進程;無最大鏈接數的限制。

水平觸發(Level Triggered),select()和poll()將就緒的文件描述符告訴進程后,如果進程沒有對其進行IO操作,那么下次調用select()和poll()的時候將再次報告這些文件描述符,所以它們一般不會丟失就緒的消息,這種方式稱為。

邊緣觸發(Edge Triggered),只告訴進程哪些文件描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那么它將不會再次告知,這種方式稱為邊緣觸發。

select

參考:https://docs.python.org/3/library/select.html#module-select

參考:https://pymotw.com/2/select/#module-select

Python的select()方法直接調用操作系統的IO接口,它監控sockets,open files, and pipes(所有帶fileno()方法的文件句柄)何時變成readable 和writeable, 或者通信錯誤,select()使得同時監控多個連接變的簡單,並且這比寫一個長循環來等待和監控多客戶端連接要高效,因為select直接通過操作系統提供的C的網絡接口進行操作,而不是通過Python的解釋器。

select使用

  1. 創建兩個列表來表示輸入輸出信息給select: select()方法接收並監控3個通信列表, 第一個是所有的輸入的data,就是指外部發過來的數據,第2個是監控和接收所有要發出去的data,第3個監控錯誤信息;
  2. select()返回3個新的list,分別賦值為readable,writable,exceptional。所有在readable list中的socket連接代表有數據可接收(recv);所有在writable list中的存放着你可以對其進行發送(send)操作的socket連接;當連接通信出現error時會把error寫到exceptional列表中。

Readable list 中的socket 3種可能狀態

  • 第一種是如果這個socket是main "server" socket,它負責監聽客戶端的連接,如果這個main server socket出現在readable里,那代表這是server端已經ready來接收一個新的連接進來了,為了讓這個main server能同時處理多個連接,在下面的代碼里,我們把這個main server的socket設置為非阻塞模式。
  • socket是已經建立了的連接,它把數據發了過來,這個時候你就可以通過recv()來接收它發過來的數據,然后把接收到的數據放到queue里,這樣你就可以把接收到的數據再傳回給客戶端了。
  • 這個客戶端已經斷開了,所以你再通過recv()接收到的數據就為空了,所以這個時候你就可以把這個跟客戶端的連接關閉了。

writable list中的socket

  • 如果這個客戶端連接在跟它對應的queue里有數據,就把這個數據取出來再發回給這個客戶端;
  • 否則就把這個連接從output list中移除,這樣下一次循環select()調用時檢測到outputs list中沒有這個連接,那就會認為這個連接還處於非活動狀態。

server

import socket
import select
import queue


HOST = ('localhost',9999)
server = socket.socket()
server.setblocking(False)
server.bind(HOST)
server.listen(3)

inputs = [server,]
outputs = []
message_queues = {}
print('inputs:',inputs)

while True:
    print('waiting for next event')
    print('***************inputs:', inputs)
    readable, writeable, exeptional = select.select(inputs,outputs,inputs)
    # 如果沒有任何fd就緒,程序一直會阻塞在這
    print('-------------readable:',readable )
    for s in readable:
        if s is server:
            conn,addr = s.accept()
            print('server is %',s)
            print('new connection from %s' % conn)
            conn.setblocking(0)
            inputs.append(conn)
            message_queues[conn] = queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                print('收到來自[%s]的數據:%s' %(s.getpeername()[0],data))
                message_queues[s].put(data)
                if s not in outputs:
                    outputs.append(s)
            else:
                print('客戶端斷開')

                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                del message_queues[s]
    print('++++++++++++=writeabld:',writeable)
    for s in writeable:
        try:
            next_msg = message_queues[s].get_nowait()

        except queue.Empty:
            print("client [%s]" % s.getpeername()[0], "queue is empty..")
            outputs.remove(s)

        else:
            print("sending msg to [%s]" % s.getpeername()[0], next_msg)
            s.send(next_msg.upper())

    for s in exeptional:
        print("handling exception for ", s.getpeername())
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()

        del message_queues[s]

client

import socket
import sys

messages = [ b'This is the message. ',
             b'It will be sent ',
             b'in parts.',
             ]
server_address = ('localhost', 9999)

# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(3)]

# Connect the socket to the port where the server is listening
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

print('***',socks)
for message in messages:

    # Send messages on both sockets
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message) )
        s.send(message)

    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print( '%s: received "%s"' % (s.getsockname(), data) )
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname() )
 

selector

參考:https://docs.python.org/3/library/selectors.html

  這個模塊允許高級別和高效的I/O多路復用,建立在select模塊之上,相當於epoll。如果不是對精確OS級別的原語控制,推薦使用這個模塊。

  selector定義了一個BaseSelector抽象基類,以及一些具體的實現(KqueueSelector, EpollSelector…),可以用於在多個文件對象上等待I/O准備通知。

  DefaultSelector是當前平台上可用的最高效實現的別名:這應該是大多數用戶的默認選擇。

  支持的文件對象類型取決於平台:在Windows上,支持套接字,但不支持管道,而在Unix上,兩者都得到支持(一些其他類型也可以支持,比如fifos或特殊文件設備)。

  事件:一個位掩碼,指示在給定的文件對象上應該等待哪些I/O事件。它可以是以下模塊常用的組合

EVENT_READ:  Available for read
EVENT_WRITE:Available for write

  class selectors.DefaultSelector                  默認選擇器類,使用當前平台上可用的最有效的實現。

實例

import selectors
import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):              # sock相當於key.fileobj
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    events = sel.select()     # 根據系統是否支持,使用epoll還是select,優先epoll。默認阻塞,有活動連接就返回活動的連接列表
    for key, mask in events:
        callback = key.data    # 相當於回調accept
        callback(key.fileobj, mask)  # key.fileobj相當於建立連接后的conn

 


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM