select原理
網絡通信被Unix系統抽象為文件的讀寫,通常是一個設備,由設備驅動程序提供,驅動可以知道自身的數據是否可用。支持阻塞操作的設備驅動通常會實現一組自身的等待隊列,如讀/寫等待隊列用於支持上層(用戶層)所需的block或non-block操作。設備的文件的資源如果可用(可讀或者可寫)則會通知進程,反之則會讓進程睡眠,等到數據到來可用的時候,再喚醒進程。
這些設備的文件描述符被放在一個數組中,然后select調用的時候遍歷這個數組,如果對於的文件描述符可讀則會返回改文件描述符。當遍歷結束之后,如果仍然沒有一個可用設備文件描述符,select讓用戶進程則會睡眠,直到等待資源可用的時候在喚醒,遍歷之前那個監視的數組。每次遍歷都是線性的。
select方法
Python的select()方法直接調用操作系統的IO接口,它監控sockets,open files, and pipes(所有帶fileno()方法的文件句柄)何時變成readable 和writeable, 或者通信錯誤,select()使得同時監控多個連接變的簡單,並且這比寫一個長循環來等待和監控多客戶端連接要高效,因為select直接通過操作系統提供的C的網絡接口進行操作,而不是通過Python的解釋器。
只支持Unix,不支持Windows
示例
import select
import socket
import sys
import Queue
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)
# Bind the socket to the port
server_address = ('localhost', 10000)
print >>sys.stderr, 'starting up on %s port %s' % server_address
server.bind(server_address)
# Listen for incoming connections
server.listen(5)
select()方法接收並監控3個通信列表, 第一個是所有的輸入的data,就是指外部發過來的數據,第2個是監控和接收所有要發出去的data(outgoing data),第3個監控錯誤信息,接下來我們需要創建2個列表來包含輸入和輸出信息來傳給select().
readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)
所有客戶端的進來的連接和數據將會被server的主循環程序放在上面的list中處理,我們現在的server端需要等待連接可寫(writable)之后才能過來,然后接收數據並返回(因此不是在接收到數據之后就立刻返回),因為每個連接要把輸入或輸出的數據先緩存到queue里,然后再由select取出來再發出去
當你把inputs,outputs,exceptional(這里跟inputs共用)傳給select()后,它返回3個新的list,我們上面將他們分別賦值為readable,writable,exceptional, 所有在readable list中的socket連接代表有數據可接收(recv),所有在writable list中的存放着你可以對其進行發送(send)操作的socket連接,當連接通信出現error時會把error寫到exceptional列表中。
Readable list 中的socket 可以有3種可能狀態,第一種是如果這個socket是main "server" socket,它負責監聽客戶端的連接,如果這個main server socket出現在readable里,那代表這是server端已經ready來接收一個新的連接進來了,為了讓這個main server能同時處理多個連接,在下面的代碼里,我們把這個main server的socket設置為非阻塞模式。
# Handle inputs
for s in readable:
if s is server:
# A "readable" server socket is ready to accept a connection
connection, client_address = s.accept()
print >>sys.stderr, 'new connection from', client_address
connection.setblocking(0)
inputs.append(connection)
# Give the connection a queue for data we want to send
message_queues[connection] = Queue.Queue()
第二種情況是這個socket是已經建立了的連接,它把數據發了過來,這個時候你就可以通過recv()來接收它發過來的數據,然后把接收到的數據放到queue里,這樣你就可以把接收到的數據再傳回給客戶端了。
else:
data = s.recv(1024)
if data:
# A readable client socket has data
print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername())
message_queues[s].put(data)
# Add output channel for response
if s not in outputs:
outputs.append(s)
第三種情況就是這個客戶端已經斷開了,所以你再通過recv()接收到的數據就為空了,所以這個時候你就可以把這個跟客戶端的連接關閉了。
else:
# Interpret empty result as closed connection
print >>sys.stderr, 'closing', client_address, 'after reading no data'
# Stop listening for input on the connection
if s in outputs:
outputs.remove(s) #既然客戶端都斷開了,我就不用再給它返回數據了,所以這時候如果這個客戶端的連接對象還在outputs列表中,就把它刪掉
inputs.remove(s) #inputs中也刪除掉
s.close() #把這個連接關閉掉
# Remove message queue
del message_queues[s]
else:
# Interpret empty result as closed connection
print >>sys.stderr, 'closing', client_address, 'after reading no data'
# Stop listening for input on the connection
if s in outputs:
outputs.remove(s) #既然客戶端都斷開了,我就不用再給它返回數據了,所以這時候如果這個客戶端的連接對象還在outputs列表中,就把它刪掉
inputs.remove(s) #inputs中也刪除掉
s.close() #把這個連接關閉掉
# Remove message queue
del message_queues[s]
對於writable list中的socket,也有幾種狀態,如果這個客戶端連接在跟它對應的queue里有數據,就把這個數據取出來再發回給這個客戶端,否則就把這個連接從output list中移除,這樣下一次循環select()調用時檢測到outputs list中沒有這個連接,那就會認為這個連接還處於非活動狀態
# Handle outputs
for s in writable:
try:
next_msg = message_queues[s].get_nowait()
except Queue.Empty:
# No messages waiting so stop checking for writability.
print >>sys.stderr, 'output queue for', s.getpeername(), 'is empty'
outputs.remove(s)
else:
print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())
s.send(next_msg)
最后,如果在跟某個socket連接通信過程中出了錯誤,就把這個連接對象在inputs\outputs\message_queue中都刪除,再把連接關閉掉
# Handle "exceptional conditions"
for s in exceptional:
print >>sys.stderr, 'handling exceptional condition for', s.getpeername()
# Stop listening for input on the connection
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
# Remove message queue
del message_queues[s]
客戶端
下面的這個是客戶端程序展示了如何通過select()對socket進行管理並與多個連接同時進行交互
import socket
import sys
messages = [ 'This is the message. ',
'It will be sent ',
'in parts.',
]
server_address = ('localhost', 10000)
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
]
# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
s.connect(server_address)
接下來通過循環通過每個socket連接給server發送和接收數據
for message in messages:
# Send messages on both sockets
for s in socks:
print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
s.send(message)
# Read responses on both sockets
for s in socks:
data = s.recv(1024)
print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
if not data:
print >>sys.stderr, 'closing socket', s.getsockname()
服務端完整代碼
#_*_coding:utf-8_*_
import select
import socket
import sys
import queue
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
# Bind the socket to the port
server_address = ('localhost', 10000)
print(sys.stderr, 'starting up on %s port %s' % server_address)
server.bind(server_address)
# Listen for incoming connections
server.listen(5)
# Sockets from which we expect to read
inputs = [ server ]
# Sockets to which we expect to write
outputs = [ ]
message_queues = {}
while inputs:
# Wait for at least one of the sockets to be ready for processing
print( '\nwaiting for the next event')
readable, writable, exceptional = select.select(inputs, outputs, inputs)
# Handle inputs
for s in readable:
if s is server:
# A "readable" server socket is ready to accept a connection
connection, client_address = s.accept()
print('new connection from', client_address)
connection.setblocking(False)
inputs.append(connection)
# Give the connection a queue for data we want to send
message_queues[connection] = queue.Queue()
else:
data = s.recv(1024)
if data:
# A readable client socket has data
print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) )
message_queues[s].put(data)
# Add output channel for response
if s not in outputs:
outputs.append(s)
else:
# Interpret empty result as closed connection
print('closing', client_address, 'after reading no data')
# Stop listening for input on the connection
if s in outputs:
outputs.remove(s) #既然客戶端都斷開了,我就不用再給它返回數據了,所以這時候如果這個客戶端的連接對象還在outputs列表中,就把它刪掉
inputs.remove(s) #inputs中也刪除掉
s.close() #把這個連接關閉掉
# Remove message queue
del message_queues[s]
# Handle outputs
for s in writable:
try:
next_msg = message_queues[s].get_nowait()
except queue.Empty:
# No messages waiting so stop checking for writability.
print('output queue for', s.getpeername(), 'is empty')
outputs.remove(s)
else:
print( 'sending "%s" to %s' % (next_msg, s.getpeername()))
s.send(next_msg)
# Handle "exceptional conditions"
for s in exceptional:
print('handling exceptional condition for', s.getpeername() )
# Stop listening for input on the connection
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
# Remove message queue
del message_queues[s]
客戶端代碼
import socket
import sys
messages = [ 'This is the message. ',
'It will be sent ',
'in parts.',
]
server_address = ('localhost', 10000)
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
]
# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
s.connect(server_address)
for message in messages:
# Send messages on both sockets
for s in socks:
print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
s.send(message)
# Read responses on both sockets
for s in socks:
data = s.recv(1024)
print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
if not data:
print >>sys.stderr, 'closing socket', s.getsockname()
s.close()