一、理解操作系統
操作系統(OS
)統管了計算機的所有硬件,並負責為應用程序分配和回收硬件資源。
硬件資源總是有限的,而應用程序對資源的欲望都是貪婪的。
當多個應用程序發生硬件資源爭奪時,OS
負責出面調度,保證多任務的資源分配以保證系統穩定執行。
只有CPU
可以執行代碼,所以應用程序(任務)執行前,必須申請到CPU
資源,同一時刻,一個CPU
只能執行一個任務代碼。
計算機的CPU
數量(資源方)遠遠小於需要執行的任務數(需求方),操作系統將CPU
的資源按照時間片划分,並根據任務類型分配,各任務輪流使用CPU
。
CPU
的執行/切換速度非常快,對於用戶而言,多任務看上去就像同時執行一樣,此稱為並發。
如下是串行和並發的對比:
計算機的內存、硬盤、網卡、屏幕、鍵盤等硬件提供了數據交換的場所。
OS
提供了IO
接口以實現數據交換,數據交換的過程一般不需要CPU
的參與。
IO
接口有兩種類型:
1、阻塞型IO
發生IO
(數據交換)的時候,調用線程無法向下執行剩余代碼,意圖占用CPU
但不執行任何代碼,單線程阻塞型IO自身無法支持並發
2、非阻塞型IO
發生IO
(數據交換)的時候,調用線程可以向下執行剩余代碼,單線程非阻塞型IO自身可以支持並發
如下是阻塞型IO和非阻塞型IO的對比:
二、任務類型
根據一個任務執行期間占用CPU
的比例來划分,有兩種類型:
1、CPU密集型
絕大部分時間都是占用CPU
並執行代碼,比如科學計算任務
2、IO密集型
絕大部分時間都未占用CPU
,而是在發生IO
操作,比如網絡服務
三、Socket模塊
OS
提供了阻塞IO和非阻塞IO兩種類型的接口,應用程序可以自行選擇。
Socket
模塊封裝了兩種接口,Socket
模塊提供的函數默認是阻塞IO類型。
用戶可以選擇手工切換至非阻塞IO類型,使用socketobj.setblocking(False)
切換至非阻塞IO模式。
下面將通過一個簡單的例子程序來記錄對並發的學習思考及總結。
四、一個簡單的C/S程序
客戶端:循環接收用戶的輸入,並發送給服務器。從服務器接收反饋並打印至屏幕。
服務器:將接收到的用戶輸入,變成大寫並返回給客戶端。
客戶端代碼固定,主要思考服務器端的代碼。
一般我們會這樣寫服務端代碼:
# 服務器端
import socket
addr = ('127.0.0.1', 8080)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(addr)
server.listen(5)
print('監聽中...')
while True: # 鏈接循環
conn, client = server.accept()
print(f'一個客戶端上線 -> {client}')
while True: # 消息循環
try:
request = conn.recv(1024)
if not request:
break
print(f"request: {request.decode('utf-8')}")
conn.send(request.upper())
except ConnectionResetError as why:
print(f'客戶端丟失,原因是: {why}')
break
conn.close()
客戶端代碼保持不變:
# 客戶端
import socket
addr = ('127.0.0.1', 8080)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(addr)
print(f'服務器{addr}連接成功')
while True: # 消息循環
inp = input('>>>').strip()
if not inp: continue
try:
client.send(inp.encode('utf-8'))
response = client.recv(1024)
print(response.decode('utf-8'))
except ConnectionResetError as why:
print(f'服務端丟失,原因是: {why}')
break
client.close()
這種形式的編碼我稱為:單線程+阻塞IO+循環串行,有如下幾個特點:
1、編碼簡單,模型簡潔,可讀性強
2、串行提供服務,用戶使用服務器必須一個一個排隊
單一線程的阻塞IO模型是無法支持並發的,如果要支持並發,有如下兩類解決方案。
五、使用阻塞IO實現並發
單線程阻塞IO,本質上是無法實現並發的。因為一旦發生IO阻塞,線程就會阻塞,下方代碼不會繼續執行。如果要使用單線程阻塞IO來實現並發,需要增加線程數目或者進程數目,當某一個線程/進程發生阻塞的時候,由OS
調度至另一個線程/進程執行。
方案一:阻塞IO+多進程
服務器端代碼
import socket
from multiprocessing import Process
def task(conn):
"""通信循環處理函數"""
while True:
try:
request = conn.recv(1024)
if not request:
break
print(f"request: {request.decode('utf-8')}")
conn.send(request.upper())
except ConnectionResetError as why:
print(f'客戶端丟失,原因是: {why}')
break
if __name__ == '__main__': # windows下需要把新建進程寫到main中,不然會報錯
addr = ('127.0.0.1', 8080)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(addr)
server.listen(5)
print('監聽中...')
while True:
conn, client = server.accept()
print(f'一個客戶端上線 -> {client}')
p = Process(target=task, args=(conn,)) # 開啟子進程處理與用戶的消息循環
p.start()
將服務器對用戶的消息循環操作封裝到進程中,單進程依然會發生阻塞。
進程之間的調度交由OS
負責(重要)。
進程太重,創建和銷毀進程都需要比較大的開銷,此外,一台設備所能涵蓋的進程數量非常有限(一般就幾百左右)。
進程之間的切換開銷也不小。
當進程數小於等於CPU
核心數的時候,可以實現真正的並行,當進程數大於CPU
核心的時候,依然以並發執行。
方案二:阻塞IO+多線程
服務器端代碼
import socket
from threading import Thread
def task(conn):
"""通信循環處理函數"""
while True:
try:
request = conn.recv(1024)
if not request:
break
print(f"request: {request.decode('utf-8')}")
conn.send(request.upper())
except ConnectionResetError as why:
print(f'客戶端丟失,原因是: {why}')
break
if __name__ == '__main__':
addr = ('127.0.0.1', 8080)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(addr)
server.listen(5)
print('監聽中...')
while True:
conn, client = server.accept()
print(f'一個客戶端上線 -> {client}')
t = Thread(target=task, args=(conn,)) # 啟動多線程處理與用戶的消息循環
t.start()
將服務器對用戶的操作封裝到線程中,單線程中依然會發生IO阻塞。
線程之間的調度交由OS負責(重要)。
線程較輕,創建和銷毀的開銷都比較小,但是線程數量也不會太大,一台設備一般能容納幾百至上千的線程。
注意:因為CPython的GIL的存在,使用CPython編寫的多線程代碼,只能使用一個CPU核心,換句話說,使用官方的解釋器執行Python多線程代碼,無法並行(單進程中)。
線程之間的切換開銷比較小。
實際上,多線程的最大問題並不是並發數太少,而是數據安全問題。
線程之間共享同一進程的數據,在頻繁發生IO操作的過程中,難免需要修改共享數據,這就需要增加額外的處理,當線程數量大量增加時,如何妥善處理數據安全的問題就會變成主要困難。
阻塞IO模型的思考和總結
1、多線程和多進程都是基於阻塞IO模式提供的並發,兩者編程模型比較簡單,可讀性也很高。
2、如果使用多線程/進程的方案來提供並發,當線程/進程數量不斷增大時,系統穩定性將會下降。雖然可以使用線程/進程池來提供一定的優化,但超過一定數量之后,池子發揮的效果也會越來越小。所以,兩者都無法支持超大規模的並發(如C10M及以上)。
3、線程/進程切換都交由OS
調度,調度策略依據OS
的算法,應用程序無法主動控制,無法針對任務的特性做一些必要的調度算法調整。
4、編碼思維直接、易理解,學習曲線平緩。
5、多線程/進程的方案可以理解為單純的增加資源,如果要想支持超大規模的並發,單純的增加資源的行為並不合理(資源不可能無限或者總得考慮成本以及效率,而且數量越大,原有的缺點就會越凸顯)。
6、另一種解決方案的核心思路是:改變IO模型。
六、使用非阻塞IO實現並發
單線程非阻塞IO模型,本身就直接支持並發,為啥?請回頭看看阻塞IO和非阻塞IO的流程圖片。
非阻塞IO接口的核心是:調用線程一旦向OS
發起IO調用,OS
就直接返回結果,因此,調用線程不會被阻塞而可以執行下方代碼。不過也正因為不會阻塞,調用線程無法判斷立即返回的結果是不是期望結果,所以調用線程需要增加額外的操作對返回結果進行判斷,正因為這一點,就增加了編程難度(增加的難度可不是一點啊)。
對立即返回的結果進行判斷的方案有兩種:
- 輪詢
線程定期/不定期主動發起查詢和判斷 - 回調函數+事件循環
線程在發起IO時注冊回調函數,然后統一處理事件循環
注意:非阻塞IO實現並發有多種解決方案,編程模型的可讀性都不高,有些方案的編程思維甚至晦澀、難以理解、且編碼困難。
方案一:非阻塞IO+Try+輪詢
服務器端代碼
import socket
addr = ('127.0.0.1', 8080)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(addr)
server.setblocking(False)
server.listen(5)
print('監聽中...')
# 需要執行接收的conn對象放入此列表
recv_list = []
# 需要發送數據的conn對象和數據放入此列表
send_list = []
# 執行鏈接循環
while True:
try:
conn, client = server.accept()
# 執行成功,說明返回值是conn,client
print(f'一個客戶端上線 -> {client}')
# 將成功鏈接的conn放入列表,當accept發生錯誤的時候執行conn的消息接收操作
recv_list.append(conn)
except BlockingIOError:
# 執行accept不成功,意味着當前未有任何連接
# 在下一次執行accept之前,可以執行其他的任務(消息接收操作)
# 無法對處於遍歷期間的接收列表執行remove操作,使用臨時列表存儲需要刪除的conn對象
del_recv_list = []
# 對已經成功鏈接的conn列表執行接收操作
for conn in recv_list:
# 對每一個conn對象,執行recv獲取request
try:
# recv也是非阻塞
request = conn.recv(1024)
# 執行成功,就要處理request
if not request:
# 當前conn鏈接已經失效
conn.close()
# 不再接收此conn鏈接的消息,將失效conn加入刪除列表
del_recv_list.append(conn)
# 當前conn處理完畢,切換下一個
continue
# request有消息,處理,然后需要加入發送列表中
response = request.upper()
# 發送列表需要存放元組,發送conn和發送的數據
send_list.append((conn, response))
except BlockingIOError:
# 當前conn的數據還沒有准備好,處理下一個conn
continue
except ConnectionResetError:
# 當前conn失效,不再接收此conn消息
conn.close()
del_recv_list.append(conn)
# 無法處理發送列表遍歷期間的remove,使用臨時列表
del_send_list = []
# 接收列表全部處理完畢,准備處理發送列表
for item in send_list:
conn = item[0]
response = item[1]
# 執行發送
try:
conn.send(response)
# 發送成功,就應該從發送列表中移除此項目
del_send_list.append(item)
except BlockingIOError:
# 發送緩沖區有可能已經滿了,留待下次發送處理
continue
except ConnectionResetError:
# 鏈接失效
conn.close()
del_recv_list.append(conn)
del_send_list.append(item)
# 刪除接收列表中已經失效的conn對象
for conn in del_recv_list:
recv_list.remove(conn)
# 刪除發送列表中已經發送或者不需要發送的對象
for item in del_send_list:
send_list.remove(item)
服務器使用單線程實現了並發。
對於accept
接收到的多個conn
對象,加入列表,並通過遍歷讀取列表、發送列表來提供多用戶訪問。
單線程中的Socket
模塊提供的IO
函數都被設置成:非阻塞IO類型。
增加了額外操作:對非阻塞調用立即返回的結果,使用了Try
來判斷是否為期望值。
因為不知道何時返回的結果是期望值,所以需要不停的發起調用,並通過Try
來判斷,即,輪詢。
兩次輪詢期間,線程可以執行其他任務。但是模型中也只是不停的發起輪詢,並沒有利用好這些時間。
編碼模型復雜,難理解。
優化:此模型中的主動輪詢的工作由程序負責,其實可以交由OS
代為操作。這樣的話,應用程序就不需要編寫輪詢的部分,可以更聚焦於業務邏輯(upper()
的部分),Python
提供了Select
模塊以處理應用程序的輪詢工作。
方案二:非阻塞IO+Select代理輪詢
服務器端代碼
import socket
import select
addr = ('127.0.0.1', 8080)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(addr)
server.setblocking(False)
server.listen(5)
print('監聽中...')
# 最開始的server對象需要被監聽,一旦可讀,說明可以執行accept
read_list = [server,]
# 需要監聽的寫列表,一旦wl中可寫對象處理完send,應該將它也從此列表中刪除
write_list = []
# 用於臨時存放某一個sock對象需要發送的數據
data_dic = {}
# 不停的發起select查詢
while True:
# 發起select查詢,嘗試得到可以操作的socket對象
rl, wl, xl = select.select(read_list, write_list, [], 1)
# 操作可讀列表
for sock in rl:
# 如果可讀列表中的對象是server,意味着有鏈接,則server可執行accept
if sock is server:
# 執行accept一定不會報錯,所以不需要try
conn, client = sock.accept()
# 一旦獲得conn,就需要將此conn加入可讀列表
read_list.append(conn)
else:
# 說明可讀的對象是普通的conn對象,執行recv時要處理鏈接失效問題
try:
request = sock.recv(1024)
except (ConnectionResetError, ConnectionAbortedError):
# 此鏈接失效
sock.close()
read_list.remove(sock)
else:
# 還需要繼續判斷request的內容
if not request:
# 說明此conn鏈接失效
sock.close()
# 不再監控此conn
read_list.remove(sock)
continue
# 處理請求
response = request.upper()
# 加入發送列表
write_list.append(sock)
# 保存發送的數據
data_dic[sock] = response
# 操作可寫列表
for sock in wl:
# 執行發送操作,send也會出錯
try:
sock.send(data_dic[sock])
# 發送完畢后,需要移除發送列表
write_list.remove(sock)
# 需要移除發送數據
data_dic.pop(sock)
except (ConnectionResetError, ConnectionAbortedError):
# 此鏈接失效
sock.close()
read_list.remove(sock)
write_list.remove(sock)
服務器使用單線程實現了並發。
使用了Select
模塊之后,應用程序不再需要編寫主動輪詢的代碼,而是將此部分工作交由Select
模塊的select
函數代為處理。
應用程序只需要遍歷select
函數返回的可操作socket
列表,並處理相關業務邏輯即可。
雖然應用程序將輪詢工作甩給了select
,自己不用編寫代碼。不過select
函數的底層接口效率不高,使用epoll
接口可以提升效率,此接口被封裝在Selectors
模塊中。
此外,select
函數是一個阻塞IO,在並發數很少的時候,線程大部分時間會阻塞在select
函數上。所以select
函數應該適用於隨時隨刻都有socket
准備好、大規模並發的場景。
編碼困難,模型難理解。
select函數接口說明
def select(rlist, wlist, xlist, timeout=None): # real signature unknown; restored from __doc__
"""
select(rlist, wlist, xlist[, timeout]) -> (rlist, wlist, xlist)
Wait until one or more file descriptors are ready for some kind of I/O.
The first three arguments are sequences of file descriptors to be waited for:
rlist -- wait until ready for reading
wlist -- wait until ready for writing
xlist -- wait for an ``exceptional condition''
If only one kind of condition is required, pass [] for the other lists.
A file descriptor is either a socket or file object, or a small integer
gotten from a fileno() method call on one of those.
The optional 4th argument specifies a timeout in seconds; it may be
a floating point number to specify fractions of seconds. If it is absent
or None, the call will never time out.
The return value is a tuple of three lists corresponding to the first three
arguments; each contains the subset of the corresponding file descriptors
that are ready.
*** IMPORTANT NOTICE ***
On Windows, only sockets are supported; on Unix, all file
descriptors can be used.
"""
pass
- 輸入4個參數(3位置,1默認),返回3個值
- select函數是阻塞IO,函數的返回必須等到至少1個文件描述符准備就緒
- 位置參數
rlist/wlist/xlist
分為是:需要監控的讀列表/寫列表/例外列表(第3參數暫不理解) - 在
windows
下,列表中只能放socket對
象,unix
下,可以放任何文件描述符 - 第4參數如果是
None
(默認),則會永久阻塞,否則按照給定的值(單位是秒)發生超時,可以使用小數如0.5秒 - 返回值是3個列表,里面涵蓋的是可以操作的文件描述符對象
關於輪詢效率的思考
輪詢操作,效率不高。
輪詢的工作視角是:發起者定期/不定期主動發起詢問,如果數據沒有准備好,就繼續發起詢問。如果數據准備好了,發起者就處理這些數據。
假設,調用者在第35次主動輪詢的時候發現數據准備好了,那么意味着前34次主動輪詢的操作是沒有任何收益的。
調用者要想知道數據是否就緒,就要主動詢問,而主動詢問的效率又比較低。
這個矛盾的核心關鍵在於:如何得知數據准備就緒這件事呢?
使用回調函數+事件循環。
此種方案中,調用者不會主動發起輪詢,而是被動的等待IO操作完成,並由OS
向調用者發起准備就緒的事件通知。
方案三:非阻塞IO+Selectors+回調函數+事件循環
# 服務器端代碼
import socket
from selectors import DefaultSelector, EVENT_READ
def recv_read(conn, mask):
# recv回調函數
try:
request = conn.recv(1024)
if not request:
# 意味着鏈接失效,不再監控此socket
conn.close()
selector.unregister(conn)
# 結束此回調的執行
return None
# 鏈接正常,處理數據
conn.send(request.upper())
except (ConnectionResetError, ConnectionAbortedError):
# 鏈接失效
conn.close()
selector.unregister(conn)
def accept_read(server, mask):
# accept回調函數
conn, client = server.accept()
print(f'一個客戶端上線{client}')
# 監聽conn對象的可讀事件的發生,並注冊回調函數
selector.register(conn, EVENT_READ, recv_read)
if __name__ == '__main__':
addr = ('127.0.0.1', 8080)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(addr)
server.setblocking(False)
server.listen(5)
print('監聽中...')
# 獲取對象
selector = DefaultSelector()
# 第一個注冊,監聽server對象的可讀事件的發生,並注冊回調函數
selector.register(server, EVENT_READ, accept_read)
# 執行事件循環
while True:
# 循環調用select,select是阻塞調用,返回就緒事件
events = selector.select()
for key, mask in events:
# 獲取此事件預先注冊的回調函數
callback = key.data
# 對此事件中准備就緒的socket對象執行回調
callback(key.fileobj, mask)
服務器使用單線程實現了並發。
OS
使用了Selectors
自行選擇最優的底層接口監聽socket
對象。
程序不再需要主動發起查詢,而是注冊回調函數。
增加事件循環,用於處理准備就緒的socket
對象,調用預先注冊的回調函數。
應用程序不用再關注如何判斷非阻塞IO的返回值,而將精力聚焦於回調函數的編寫。
方案四:非阻塞IO+協程+回調函數+事件循環(待后續補充)
pass
非阻塞IO的思考和總結(待后續補充)
- 如果將一個IO密集型任務的IO模型設置為非阻塞,則此任務類型將會從IO密集型逐漸轉變為CPU密集型。
- 非阻塞IO的編程模型比較困難,可讀性較差,模型理解困難
- 我認為,含有非阻塞IO+回調+事件循環的編程模型,就是異步編程。
pass
七、關於同步/異步,阻塞IO/非阻塞IO的區別和思考
- 阻塞IO和非阻塞IO指的是
OS
提供的兩種IO接口,區別在於調用時是否立即返回。 - 同步和異步指的是兩個任務之間的執行模型
同步:兩個任務關聯性大,任務相互依賴,對任務執行的前后順序有一定要求
異步:兩個任務關聯性小,任務可以相互獨立,任務執行順序沒有要求 - 網上有很多關於同步阻塞、同步非阻塞、異步阻塞、異步非阻塞的各種理解,站在不同的角度,理解都不一樣。我覺得應該把同步/異步划為一類,用於描述任務執行模型,而把阻塞/非阻塞IO划為一類,用於描述IO調用模型。
如下是我根據網上的各種解釋,結合自己的思考給出的一個關於同步/異步簡單的例子:
-
同步
第一天,晚飯時間到了,你餓了,你走到你老婆面前說:老婆,我餓了,快點做飯!你老婆回答:好的,我去做飯。
你跟着老婆走到廚房,你老婆花了30分鍾的時間給你做飯。這期間,你就站在身邊,啥也不干,就這樣注視着她,你老婆問你:你站這干嘛?你說:我要等你做完飯再走。30分鍾后,你吃到了晚飯。 -
異步+輪詢
第二天,晚飯時間到了,你餓了,你大喊:老婆,我餓了,快點做飯!你老婆回答:好的,我去做飯。
你老婆花了30分鍾的時間給你做飯,但是你不再跟着你老婆走到廚房。這期間,你在客廳看電視,不過你實在餓得不行了,於是你每過5分鍾,就跑到廚房詢問:老婆,飯做好了沒?你老婆回答:還要一會。30分鍾后,你吃到了晚飯。 -
異步+事件通知
第三天,晚飯時間到了,你餓了,你大喊:老婆,我餓了,快點做飯!你老婆回答:好的,我去做飯。
你老婆花了30分鍾的時間給你做飯,你也不再跟着你老婆走到廚房。這期間,你在客廳看電視,你知道你老婆在做飯,你也不會去催她,專心看電視。30分鍾后,你老婆喊你:飯做好了。最后你吃到了晚飯。