I/O 多路復用
I/O多路復用指:通過一種機制,可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程序進行相應的讀寫操作。
IO多路復用是指內核一旦發現進程指定的一個或者多個IO條件准備讀取,它就通知該進程。IO多路復用適用如下場合:
- 當客戶處理多個描述字時(一般是交互式輸入和網絡套接口),必須使用I/O復用。
- 當一個客戶同時處理多個套接口時,而這種情況是可能的,但很少出現。
- 如果一個TCP服務器既要處理監聽套接口,又要處理已連接套接口,一般也要用到I/O復用。
- 如果一個服務器即要處理TCP,又要處理UDP,一般要使用I/O復用。
- 如果一個服務器要處理多個服務或多個協議,一般要使用I/O復用。
與多進程和多線程技術相比,I/O多路復用技術的最大優勢是系統開銷小,系統不必創建進程/線程,也不必維護這些進程/線程,從而大大減小了系統的開銷。
Linux
Linux中的 select,poll,epoll 都是IO多路復用的機制。

1 select 2 3 select最早於1983年出現在4.2BSD中,它通過一個select()系統調用來監視多個文件描述符的數組,當select()返回后,該數組中就緒的文件描述符便會被內核修改標志位,使得進程可以獲得這些文件描述符從而進行后續的讀寫操作。 4 select目前幾乎在所有的平台上支持,其良好跨平台支持也是它的一個優點,事實上從現在看來,這也是它所剩不多的優點之一。 5 select的一個缺點在於單個進程能夠監視的文件描述符的數量存在最大限制,在Linux上一般為1024,不過可以通過修改宏定義甚至重新編譯內核的方式提升這一限制。 6 另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其復制的開銷也線性增長。同時,由於網絡響應時間的延遲使得大量TCP連接處於非活躍狀態,但調用select()會對所有socket進行一次線性掃描,所以這也浪費了一定的開銷。 7 8 poll 9 10 poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差別,但是poll沒有最大文件描述符數量的限制。 11 poll和select同樣存在一個缺點就是,包含大量文件描述符的數組被整體復制於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增加而線性增大。 12 另外,select()和poll()將就緒的文件描述符告訴進程后,如果進程沒有對其進行IO操作,那么下次調用select()和poll()的時候將再次報告這些文件描述符,所以它們一般不會丟失就緒的消息,這種方式稱為水平觸發(Level Triggered)。 13 14 epoll 15 16 直到Linux2.6才出現了由內核直接支持的實現方法,那就是epoll,它幾乎具備了之前所說的一切優點,被公認為Linux2.6下性能最好的多路I/O就緒通知方法。 17 epoll可以同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那么它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些,但是代碼實現相當復雜。 18 epoll同樣只告知那些就緒的文件描述符,而且當我們調用epoll_wait()獲得就緒文件描述符時,返回的不是實際的描述符,而是一個代表就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的文件描述符即可,這里也使用了內存映射(mmap)技術,這樣便徹底省掉了這些文件描述符在系統調用時復制的開銷。 19 另一個本質的改進在於epoll采用基於事件的就緒通知方式。在select/poll中,進程只有在調用一定的方法后,內核才對所有監視的文件描述符進行掃描,而epoll事先通過epoll_ctl()來注冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會采用類似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便得到通知。
Python
Python中有一個select模塊,其中提供了:select、poll、epoll三個方法,分別調用系統的 select,poll,epoll 從而實現IO多路復用。
1
2
3
4
5
6
|
Windows Python:
提供: select
Mac Python:
提供: select
Linux Python:
提供: select、poll、epoll
|
注意:網絡操作、文件操作、終端操作等均屬於IO操作,對於windows只支持Socket操作,其他系統支持其他IO操作,但是無法檢測 普通文件操作 自動上次讀取是否已經變化。
對於select方法:
1
2
3
4
5
6
7
8
9
10
11
|
句柄列表
11
, 句柄列表
22
, 句柄列表
33
=
select.select(句柄序列
1
, 句柄序列
2
, 句柄序列
3
, 超時時間)
參數: 可接受四個參數(前三個必須)
返回值:三個列表
select方法用來監視文件句柄,如果句柄發生變化,則獲取該句柄。
1
、當 參數
1
序列中的句柄發生可讀時(accetp和read),則獲取發生變化的句柄並添加到 返回值
1
序列中
2
、當 參數
2
序列中含有句柄時,則將該序列中所有的句柄添加到 返回值
2
序列中
3
、當 參數
3
序列中的句柄發生錯誤時,則將該發生錯誤的句柄添加到 返回值
3
序列中
4
、當 超時時間 未設置,則select會一直阻塞,直到監聽的句柄發生變化
當 超時時間 =
1
時,那么如果監聽的句柄均無任何變化,則select會阻塞
1
秒,之后返回三個空列表,如果監聽的句柄有變化,則直接執行。
|

#!/usr/bin/env python # -*- coding:utf-8 -*- import select import threading import sys while True: readable, writeable, error = select.select([sys.stdin,],[],[],1) if sys.stdin in readable: print 'select get stdin',sys.stdin.readline() 利用select監聽終端操作實例

1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import socket 5 import select 6 7 sk1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 8 sk1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 9 sk1.bind(('127.0.0.1',8002)) 10 sk1.listen(5) 11 sk1.setblocking(0) 12 13 inputs = [sk1,] 14 15 while True: 16 readable_list, writeable_list, error_list = select.select(inputs, [], inputs, 1) 17 for r in readable_list: 18 # 當客戶端第一次連接服務端時 19 if sk1 == r: 20 print 'accept' 21 request, address = r.accept() 22 request.setblocking(0) 23 inputs.append(request) 24 # 當客戶端連接上服務端之后,再次發送數據時 25 else: 26 received = r.recv(1024) 27 # 當正常接收客戶端發送的數據時 28 if received: 29 print 'received data:', received 30 # 當客戶端關閉程序時 31 else: 32 inputs.remove(r) 33 34 sk1.close() 35 36 利用select實現偽同時處理多個Socket客戶端請求:服務端

1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import socket 4 5 ip_port = ('127.0.0.1',8002) 6 sk = socket.socket() 7 sk.connect(ip_port) 8 9 while True: 10 inp = raw_input('please input:') 11 sk.sendall(inp) 12 sk.close() 13 14 利用select實現偽同時處理多個Socket客戶端請求:客戶端
此處的Socket服務端相比與原生的Socket,他支持當某一個請求不再發送數據時,服務器端不會等待而是可以去處理其他請求的數據。但是,如果每個請求的耗時比較長時,select版本的服務器端也無法完成同時操作。

1 #!/usr/bin/env python 2 #coding:utf8 3 4 ''' 5 服務器的實現 采用select的方式 6 ''' 7 8 import select 9 import socket 10 import sys 11 import Queue 12 13 #創建套接字並設置該套接字為非阻塞模式 14 15 server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 16 server.setblocking(0) 17 18 #綁定套接字 19 server_address = ('localhost',10000) 20 print >>sys.stderr,'starting up on %s port %s'% server_address 21 server.bind(server_address) 22 23 #將該socket變成服務模式 24 #backlog等於5,表示內核已經接到了連接請求,但服務器還沒有調用accept進行處理的連接個數最大為5 25 #這個值不能無限大,因為要在內核中維護連接隊列 26 27 server.listen(5) 28 29 #初始化讀取數據的監聽列表,最開始時希望從server這個套接字上讀取數據 30 inputs = [server] 31 32 #初始化寫入數據的監聽列表,最開始並沒有客戶端連接進來,所以列表為空 33 34 outputs = [] 35 36 #要發往客戶端的數據 37 message_queues = {} 38 while inputs: 39 print >>sys.stderr,'waiting for the next event' 40 #調用select監聽所有監聽列表中的套接字,並將准備好的套接字加入到對應的列表中 41 readable,writable,exceptional = select.select(inputs,outputs,inputs)#列表中的socket 套接字 如果是文件呢? 42 #監控文件句柄有某一處發生了變化 可寫 可讀 異常屬於Linux中的網絡編程 43 #屬於同步I/O操作,屬於I/O復用模型的一種 44 #rlist--等待到准備好讀 45 #wlist--等待到准備好寫 46 #xlist--等待到一種異常 47 #處理可讀取的套接字 48 49 ''' 50 如果server這個套接字可讀,則說明有新鏈接到來 51 此時在server套接字上調用accept,生成一個與客戶端通訊的套接字 52 並將與客戶端通訊的套接字加入inputs列表,下一次可以通過select檢查連接是否可讀 53 然后在發往客戶端的緩沖中加入一項,鍵名為:與客戶端通訊的套接字,鍵值為空隊列 54 select系統調用是用來讓我們的程序監視多個文件句柄(file descrīptor)的狀態變化的。程序會停在select這里等待, 55 直到被監視的文件句柄有某一個或多個發生了狀態改變 56 ''' 57 58 ''' 59 若可讀的套接字不是server套接字,有兩種情況:一種是有數據到來,另一種是鏈接斷開 60 如果有數據到來,先接收數據,然后將收到的數據填入往客戶端的緩存區中的對應位置,最后 61 將於客戶端通訊的套接字加入到寫數據的監聽列表: 62 如果套接字可讀.但沒有接收到數據,則說明客戶端已經斷開。這時需要關閉與客戶端連接的套接字 63 進行資源清理 64 ''' 65 66 for s in readable: 67 if s is server: 68 connection,client_address = s.accept() 69 print >>sys.stderr,'connection from',client_address 70 connection.setblocking(0)#設置非阻塞 71 inputs.append(connection) 72 message_queues[connection] = Queue.Queue() 73 else: 74 data = s.recv(1024) 75 if data: 76 print >>sys.stderr,'received "%s" from %s'% \ 77 (data,s.getpeername()) 78 message_queues[s].put(data) 79 if s not in outputs: 80 outputs.append(s) 81 else: 82 print >>sys.stderr,'closing',client_address 83 if s in outputs: 84 outputs.remove(s) 85 inputs.remove(s) 86 s.close() 87 del message_queues[s] 88 89 #處理可寫的套接字 90 ''' 91 在發送緩沖區中取出響應的數據,發往客戶端。 92 如果沒有數據需要寫,則將套接字從發送隊列中移除,select中不再監視 93 ''' 94 95 for s in writable: 96 try: 97 next_msg = message_queues[s].get_nowait() 98 99 except Queue.Empty: 100 print >>sys.stderr,' ',s,getpeername(),'queue empty' 101 outputs.remove(s) 102 else: 103 print >>sys.stderr,'sending "%s" to %s'% \ 104 (next_msg,s.getpeername()) 105 s.send(next_msg) 106 107 108 109 #處理異常情況 110 111 for s in exceptional: 112 for s in exceptional: 113 print >>sys.stderr,'exception condition on',s.getpeername() 114 inputs.remove(s) 115 if s in outputs: 116 outputs.remove(s) 117 s.close() 118 del message_queues[s] 119 120 基於select實現socket服務端
select模塊(實現偽並發)
Python中有一個select模塊,其中提供了:select、poll、epoll三個方法,分別調用系統的 select,poll,epoll 從而實現IO多路復用
1
2
3
4
5
6
7
8
|
select 模塊
Windows Python:
提供: select
Mac Python:
提供: select
Linux Python:
提供: select、poll、epoll
|
- select
select最早於1983年出現在4.2BSD中,它通過一個select()系統調用來監視多個文件描述符的數組,當select()返回后,該數組中就緒的文件描述符便會被內核修改標志位,使得進程可以獲得這些文件描述符從而進行后續的讀寫操作。select目前幾乎在所有的平台上支持,其良好跨平台支持也是它的一個優點,事實上從現在看來,這也是它所剩不多的優點之一。select的一個缺點在於單個進程能夠監視的文件描述符的數量存在最大限制,在Linux上一般為1024,不過可以通過修改宏定義甚至重新編譯內核的方式提升這一限制。另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其復制的開銷也線性增長。同時,由於網絡響應時間的延遲使得大量TCP連接處於非活躍狀態,但調用select()會對所有socket進行一次線性掃描,所以這也浪費了一定的開銷。
- poll
poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差別,但是poll沒有最大文件描述符數量的限制。poll和select同樣存在一個缺點就是,包含大量文件描述符的數組被整體復制於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增加而線性增大。另外,select()和poll()將就緒的文件描述符告訴進程后,如果進程沒有對其進行IO操作,那么下次調用select()和poll()的時候將再次報告這些文件描述符,所以它們一般不會丟失就緒的消息,這種方式稱為水平觸發(Level Triggered)。
- epoll
直到Linux2.6才出現了由內核直接支持的實現方法,那就是epoll,它幾乎具備了之前所說的一切優點,被公認為Linux2.6下性能最好的多路I/O就緒通知方法。epoll可以同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那么它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些,但是代碼實現相當復雜。epoll同樣只告知那些就緒的文件描述符,而且當我們調用epoll_wait()獲得就緒文件描述符時,返回的不是實際的描述符,而是一個代表就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的文件描述符即可,這里也使用了內存映射(mmap)技術,這樣便徹底省掉了這些文件描述符在系統調用時復制的開銷。另一個本質的改進在於epoll采用基於事件的就緒通知方式。在select/poll中,進程只有在調用一定的方法后,內核才對所有監視的文件描述符進行掃描,而epoll事先通過epoll_ctl()來注冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會采用類似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便得到通知。
select.select方法:
select函數需要3個序列作為它的必選參數,此外還有一個可選的以秒單位的超時時間作為第4個參數。3個序列用於輸入、輸出以及異常情況(錯誤);如果沒有給定超時時間,select會阻塞(也就是處於等待狀態),知道其中的一個文件描述符以及為行動做好了准備,如果給定了超時時間,select最多阻塞給定的超時時間,如果超時時間為0,那么就給出一個連續的poll(即不阻塞);select的返回值是3個序列,每個代表相應參數的一個活動子集。第一個序列用於監聽socket對象內部是否發生變化,如果有變化表示有新的連接,下面直接看程序代碼
select.select偽並發程序服務端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
import
socket
import
select
sk
=
socket.socket()
sk.bind((
'127.0.0.1'
,
8002
))
sk.listen(
5
)
sk.setblocking(
0
)
#不阻塞
inputs
=
[sk,]
messages
=
{}
outputs
=
[]
while
True
:
readable_list, writeable_list, error_list
=
select.select(inputs, outputs, [],
1
)
# readable_list 監聽服務端對象,當inputs列表有變化時,變化的值會賦值給readable_list中
# 如果有新的連接進來,sk會發生變化,此時readable_list—的值為sk
# 如果conn對象發生變化,表示客戶端發送了新的消息過來,此時readable_list的值為客戶端連接
# writeable_lists實現讀寫分離,需要回復信息的conn對象添加到里面
print
(
len
(inputs),
len
(readable_list),
len
(writeable_list),
len
(outputs))
for
r
in
readable_list:
# 當客戶端第一次連接服務端時,未在inputs里
if
r
=
=
sk:
print
(
'accept'
)
conn, address
=
r.accept()
conn.sendall(
"hello"
.encode())
inputs.append(conn)
#添加到inputs
messages[conn]
=
[]
#設置messages key值r為列表
# 當客戶端連接上服務端之后,再次發送數據時,已經存在inputs
else
:
try
:
received
=
r.recv(
1024
)
# 當正常接收客戶端發送的數據時
if
not
received:
raise
Exception(
"斷開連接"
)
else
:
messages[r].append(received)
outputs.append(r)
# 當客戶端關閉程序時
except
Exception as e:
inputs.remove(r)
del
messages[r]
for
w
in
writeable_list:
msg
=
messages[w].pop()
rest
=
msg
+
"response"
.encode()
w.sendall(rest)
outputs.remove(w)
sk.close()
|
select.select偽並發程序客戶端
1
2
3
4
5
6
7
8
9
10
11
12
|
import
socket
sk
=
socket.socket()
sk.connect((
"127.0.0.1"
,
8002
))
print
(sk.recv(
1024
).decode())
while
True
:
command
=
input
(
"--->>>"
)
sk.sendall(command.encode())
res
=
sk.recv(
1024
)
print
(res.decode())
sk.close()
|
select.poll方法:
poll方法使用起來比select簡單。在調用poll時,會得到一個poll對象。然后就可以使用poll的對象的register方法注冊一個文件描述符(或者是帶有fileno方法的對象)。注冊后可以使用unregister方法移出注冊的對象。注冊了一些對象(比如套接字)以后,就可以調用poll方法(帶有一個可選的超時時間參數)並得到一個(fd,event)格式列表(可能為空),其中fd是文件描述符,event則告訴你發生了什么。這是一個位掩碼(bitmask),意思是它是一個整數,這個整數的每個位對應不同的事件。那些不同的事件是select模塊的常量,為了驗證是否設置了一個定位(也就是說,一個給定的事件是否發生了),可以使用按位與操作符(&):if event & select.POLLIN
select模塊中的polling事件常量:
1
2
3
4
5
6
7
8
|
事件名 描述
POLLIN 讀取來自文件描述符的數據
POLLPRT 讀取來自文件描述符的緊急數據
POLLOUT 文件描述符已經准備好數據,寫入時不會發生阻塞
POLLERR 與文件描述符有關的錯誤情況
POLLHUP 掛起,連接丟失
POLLNVAL 無效請求,連接沒有打開
|
poll的簡單程序服務端(linux)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
#poll 異步I/O
import
socket,select
s
=
socket.socket()
host
=
"127.0.0.1"
port
=
8002
s.bind((host,port))
fdmap
=
{s.fileno():s}
#文件描述符到套接字對象的映射
s.listen(
5
)
p
=
select.poll()
#poll對象
p.register(s)
#注冊一個文件描述符(帶有fileno方法的對象)
while
True
:
events
=
p.poll()
for
fd,event
in
events:
if
fd
=
=
s.fileno():
#新的連接進來
c,addr
=
s.accept()
print
(
"Got connectins from"
,addr)
p.register(c)
#注冊一個文件描述符(帶有fileno方法的對象)
fdmap[c.fileno()]
=
c
#添加到fdmap
elif
event & select.POLLIN:
#讀取來自文件描述符的數據
data
=
fdmap[fd].recv(
1024
)
if
not
data:
#表示客戶端斷開
print
(fdmap[fd].getpeername(),
"disconnected"
)
p.unregister(fd)
#清除文件描述符
del
fdmap[fd]
#刪除fdmap對應的key值
else
:
print
(data.decode())
|
poll程序客戶端
1
2
3
4
5
6
7
8
9
10
11
|
#poll 異步I/O
import
socket
sk
=
socket.socket()
sk.connect((
"127.0.0.1"
,
8002
))
while
True
:
command
=
input
(
"--->>>"
)
sk.sendall(command.encode())
sk.close()
|
epoll方法:
epoll是在2.6內核中提出的,是之前的select和poll的增強版本。相對於select和poll來說,epoll更加靈活,沒有描述符限制。epoll使用一個文件描述符管理多個描述符,將用戶關系的文件描述符的事件存放到內核的一個事件表中,這樣在用戶空間和內核空間的copy只需一次。
一 epoll操作過程
epoll操作過程需要三個接口,分別如下:
1
2
3
|
int
epoll_create(
int
size);
//創建一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大
int
epoll_ctl(
int
epfd,
int
op,
int
fd,
struct
epoll_event *event);
int
epoll_wait(
int
epfd,
struct
epoll_event * events,
int
maxevents,
int
timeout);
|
1. int epoll_create(int size);
創建一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大,這個參數不同於select()中的第一個參數,給出最大監聽的fd+1的值,參數size並不是限制了epoll所能監聽的描述符最大個數,只是對內核初始分配內部數據結構的一個建議
。
當創建好epoll句柄后,它就會占用一個fd值,在linux下如果查看/proc/進程id/fd/,是能夠看到這個fd的,所以在使用完epoll后,必須調用close()關閉,否則可能導致fd被耗盡。
2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函數是對指定描述符fd執行op操作。
- epfd:是epoll_create()的返回值。
- op:表示op操作,用三個宏來表示:添加EPOLL_CTL_ADD,刪除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分別添加、刪除和修改對fd的監聽事件。
- fd:是需要監聽的fd(文件描述符)
- epoll_event:是告訴內核需要監聽什么事
3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待epfd上的io事件,最多返回maxevents個事件。
參數events用來從內核得到事件的集合,maxevents告之內核這個events有多大,這個maxevents的值不能大於創建epoll_create()時的size,參數timeout是超時時間(毫秒,0會立即返回,-1將不確定,也有說法說是永久阻塞)。該函數返回需要處理的事件數目,如返回0表示已超時。

1 #_*_coding:utf-8_*_ 2 __author__ = 'Alex Li' 3 4 import socket, logging 5 import select, errno 6 7 logger = logging.getLogger("network-server") 8 9 def InitLog(): 10 logger.setLevel(logging.DEBUG) 11 12 fh = logging.FileHandler("network-server.log") 13 fh.setLevel(logging.DEBUG) 14 ch = logging.StreamHandler() 15 ch.setLevel(logging.ERROR) 16 17 formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") 18 ch.setFormatter(formatter) 19 fh.setFormatter(formatter) 20 21 logger.addHandler(fh) 22 logger.addHandler(ch) 23 24 25 if __name__ == "__main__": 26 InitLog() 27 28 try: 29 # 創建 TCP socket 作為監聽 socket 30 listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) 31 except socket.error as msg: 32 logger.error("create socket failed") 33 34 try: 35 # 設置 SO_REUSEADDR 選項 36 listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 37 except socket.error as msg: 38 logger.error("setsocketopt SO_REUSEADDR failed") 39 40 try: 41 # 進行 bind -- 此處未指定 ip 地址,即 bind 了全部網卡 ip 上 42 listen_fd.bind(('', 2003)) 43 except socket.error as msg: 44 logger.error("bind failed") 45 46 try: 47 # 設置 listen 的 backlog 數 48 listen_fd.listen(10) 49 except socket.error as msg: 50 logger.error(msg) 51 52 try: 53 # 創建 epoll 句柄 54 epoll_fd = select.epoll() 55 # 向 epoll 句柄中注冊 監聽 socket 的 可讀 事件 56 epoll_fd.register(listen_fd.fileno(), select.EPOLLIN) 57 except select.error as msg: 58 logger.error(msg) 59 60 connections = {} 61 addresses = {} 62 datalist = {} 63 while True: 64 # epoll 進行 fd 掃描的地方 -- 未指定超時時間則為阻塞等待 65 epoll_list = epoll_fd.poll() 66 67 for fd, events in epoll_list: 68 # 若為監聽 fd 被激活 69 if fd == listen_fd.fileno(): 70 # 進行 accept -- 獲得連接上來 client 的 ip 和 port,以及 socket 句柄 71 conn, addr = listen_fd.accept() 72 logger.debug("accept connection from %s, %d, fd = %d" % (addr[0], addr[1], conn.fileno())) 73 # 將連接 socket 設置為 非阻塞 74 conn.setblocking(0) 75 # 向 epoll 句柄中注冊 連接 socket 的 可讀 事件 76 epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET) 77 # 將 conn 和 addr 信息分別保存起來 78 connections[conn.fileno()] = conn 79 addresses[conn.fileno()] = addr 80 elif select.EPOLLIN & events: 81 # 有 可讀 事件激活 82 datas = '' 83 while True: 84 try: 85 # 從激活 fd 上 recv 10 字節數據 86 data = connections[fd].recv(10) 87 # 若當前沒有接收到數據,並且之前的累計數據也沒有 88 if not data and not datas: 89 # 從 epoll 句柄中移除該 連接 fd 90 epoll_fd.unregister(fd) 91 # server 側主動關閉該 連接 fd 92 connections[fd].close() 93 logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1])) 94 break 95 else: 96 # 將接收到的數據拼接保存在 datas 中 97 datas += data 98 except socket.error as msg: 99 # 在 非阻塞 socket 上進行 recv 需要處理 讀穿 的情況 100 # 這里實際上是利用 讀穿 出 異常 的方式跳到這里進行后續處理 101 if msg.errno == errno.EAGAIN: 102 logger.debug("%s receive %s" % (fd, datas)) 103 # 將已接收數據保存起來 104 datalist[fd] = datas 105 # 更新 epoll 句柄中連接d 注冊事件為 可寫 106 epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT) 107 break 108 else: 109 # 出錯處理 110 epoll_fd.unregister(fd) 111 connections[fd].close() 112 logger.error(msg) 113 break 114 elif select.EPOLLHUP & events: 115 # 有 HUP 事件激活 116 epoll_fd.unregister(fd) 117 connections[fd].close() 118 logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1])) 119 elif select.EPOLLOUT & events: 120 # 有 可寫 事件激活 121 sendLen = 0 122 # 通過 while 循環確保將 buf 中的數據全部發送出去 123 while True: 124 # 將之前收到的數據發回 client -- 通過 sendLen 來控制發送位置 125 sendLen += connections[fd].send(datalist[fd][sendLen:]) 126 # 在全部發送完畢后退出 while 循環 127 if sendLen == len(datalist[fd]): 128 break 129 # 更新 epoll 句柄中連接 fd 注冊事件為 可讀 130 epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET) 131 else: 132 # 其他 epoll 事件不進行處理 133 continue 134 135 epoll socket echo server 136 137 epoll socket echo server
selectors模塊
selectors模塊已經封裝了epoll,select方法;epoll優先級大於select
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
import
selectors
import
socket
sel
=
selectors.DefaultSelector()
def
accept(sock, mask):
conn, addr
=
sock.accept()
# Should be ready
print
(
'accepted'
, conn,
'from'
, addr)
conn.setblocking(
False
)
sel.register(conn, selectors.EVENT_READ, read)
def
read(conn, mask):
data
=
conn.recv(
1000
)
# Should be ready
if
data:
print
(
'echoing'
,
repr
(data),
'to'
, conn)
conn.send(data)
# Hope it won't block
else
:
print
(
'closing'
, conn)
sel.unregister(conn)
conn.close()
sock
=
socket.socket()
sock.bind((
'localhost'
,
10000
))
sock.listen(
100
)
sock.setblocking(
False
)
sel.register(sock, selectors.EVENT_READ, accept)
while
True
:
events
=
sel.select()
for
key, mask
in
events:
callback
=
key.data
callback(key.fileobj, mask)
|