python並發編程之IO阻塞


IO模型

  解決IO問題的方式方法
  問題是:IO操作阻塞程序執行
  解決的也僅僅是網絡IO操作
 
  一般數據傳輸經歷的兩個階段,如圖:

 

  IO阻塞模型分類:

  1. 阻塞IO
  2. 非阻塞IO
  3. 多路復用IO
  4. 異步IO(爬蟲階段)
  5. 信號驅動IO(了解)

1、阻塞IO模型

  socket模塊默認是阻塞的,一個讀操作流程如下:

  問題:

    同一時間只能服務一個客戶端

  解決辦法:

    1. 多線程

      優點:如果並發量不高,效率是較高的,因為每個客戶端都有單獨線程來處理

      缺點:不可能無限的開啟線程,線程也需要占用資源

    2. 多進程

      優點:可以多個CPU並行處理

      弊端:占用資源非常大,一旦客戶端稍微多一點,立馬就慢了

    3.線程池

      優點:保證了服務器正常運行,還幫你負責創建和銷毀線程,以及任務分配

      缺點:一旦並發量超出最大線程量,就只能等簽名的運行完畢。

    4. 協程

      優點:不需要創建一段線程,也不需要在線程間做切換,沒有數量限制

      缺點:不能利用多核優勢

    結果:真正倒是效率低的是阻塞問題,但上述辦法並沒有真正的解決阻塞問題。

2、非阻塞IO模型

  遇到IO操作也不阻塞,會繼續執行。意味着即使遇到IO操作CPU執行權也不會被剝奪

  方法:設置socket使其變為non-blocking,即server.setblocking(False),具體流程如下:

  從圖中看出,非阻塞的recv系統調用之后,進程沒有被阻塞,操作系統立馬把結果返回給進程,如果數據還沒准備好,則拋出異常,進程可以去做其他的事,然后在發起recv系統調用,重復上述過程(這個過程通常被稱為輪詢),一直到數據准備好,再拷貝數據到進程進行數據處理。需要注意,拷貝數據的整個過程,進程仍然是屬於阻塞狀態。

  缺點: 占用CPU太多,原因是需要無限的循環去向操作系統拿數據。

import socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 8080))
server.listen(5)
server.setblocking(False)
# 所有客戶端的socket
conns = []
# 所有需要返回數據的客戶端
send_cs = []

while True:
    try:
        conn, client_addr = server.accept()
        print(client_addr,'已連接')
        conns.append(conn)
    except BlockingIOError:
        # 接收數據
        for conn in conns[:]:# 和conns.copy()一樣,原因:迭代過程不能刪除元素
            try:
                data = conn.recv(1024)
                print(data)
                send_cs.append((data, conn))
                # send也是IO操作,在一些極端情況下,如系統緩存滿了,肯定也會拋出異常
                # 所以,send要單拿出來處理
            except BlockingIOError:
                continue
            except ConnectionResetError:
                conn.close()
                conns.remove(conn)
        # 發送數據
        for item in send_cs[:]:
            data, conn = item
            try:
                conn.send(data.upper())
                # 如果發送成功就把數據從列表中刪除
                send_cs.remove(item)
            except BlockingIOError: # 如果緩沖區滿了 就下次再發
                continue
            except ConnectionResetError:
                conn.close()
                send_cs.remove(item)
                conns.remove(conn)
服務端代碼

3、多路復用IO

  用一個線程來處理並發所有的客戶端。

  需要使用select模塊,select原理:把所有的socket交給select,select會不斷輪詢所負責的所有socket,當某個socket有數據到達,就通知進程繼續執行后面代碼。

  流程:程序發起一個select調用,select使整個進程阻塞,直到有socket准備就緒,select就返回,這個時候進程在調用read操作,直接從緩沖中把數據拷貝到進程。流程圖如下:

import socket
import select

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 8888))
server.listen()

r_list = [server] # 監測是否收到數據的客戶端
w_list = []       # 監測是否需要發送數據的客戶端
x_list = []

# 用來發送數據
data_dic = {}

while True:
    readables, writeables, _ = select.select(r_list, w_list, x_list)
    # 接收數據以及建立連接
    for conn in readables:
        if conn == server:
            new_conn, _ = conn.accept()
            r_list.append(new_conn)
        else:
            try:
                data = conn.recv(1024)
                if not data:
                    conn.close()
                    r_list.remove(conn)
                    continue
                print(data)
                # 發送數據
                w_list.append(conn)
                data_dic[conn] = data
            except ConnectionResetError:
                conn.close()
                r_list.remove(conn)
    # 發送數據
    for conn in writeables:
        try:
            conn.send(data_dic[conn].upper())
        except ConnectionResetError:
            conn.close()
        finally:
            data_dic.pop(conn)
            w_list.remove(conn)
服務端代碼
import socket
import threading
from  threading import Thread

def communication():
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(('127.0.0.1', 8888))
    while True:
        msg = "%s say hello for you"%threading.current_thread()
        if not msg:
            continue
        client.send(msg.encode("utf-8"))
        data = client.recv(1024)
        print(data.decode("utf-8"))


for i in range(100):
    Thread(target=communication).start()
客戶端代碼

  強調:select的優勢在於可以處理多個連接,並不適用於單個連接

  優點:占用資源少,不消耗太多CPU,同時能夠為多個客戶端提供服務。(適用於簡單的事件驅動服務器)

  缺點:需要消耗大量時間區輪詢各個socket,更好的選擇時epoll,其次把事件探測和響應夾雜在一起,耦合性增加

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM