【經典】5種IO模型 | IO多路復用


上篇回顧:靜態服務器+壓測

3.2.概念篇

1.同步與異步

同步是指一個任務的完成需要依賴另外一個任務時,只有等待被依賴的任務完成后,依賴的任務才能算完成。

異步是指不需要等待被依賴的任務完成,只是通知被依賴的任務要完成什么工作。然后繼續執行下面代碼邏輯,只要自己完成了整個任務就算完成了(異步一般使用狀態、通知和回調)

PS:項目里面一般是這樣的:(個人經驗)

  1. 同步架構:一般都是和錢相關的需求,需要實時返回的業務
  2. 異步架構:更多是對寫要求比較高時的場景(同步變異步)
    • 讀一般都是實時返回,代碼一般都是await xxx()
  3. 想象個情景就清楚了:
    • 異步:現在用戶寫了篇文章,可以異步操作,就算沒真正寫到數據庫也可以返回:發表成功(大不了失敗提示一下)
    • 同步:用戶獲取訂單信息,你如果異步就會這樣了:提示下獲取成功,然后一片空白...用戶不卸載就怪了...

2.阻塞與非阻塞

阻塞是指調用結果返回之前,當前線程會被掛起,一直處於等待消息通知,不能夠執行其他業務(大部分代碼都是這樣的)

非阻塞是指在不能立刻得到結果之前,該函數不會阻塞當前線程,而會立刻返回(繼續執行下面代碼,或者重試機制走起)

PS:項目里面重試機制為啥一般都是3次?

  1. 第一次重試,兩台PC掛了也是有可能的
  2. 第二次重試,負載均衡分配的三台機器同時掛的可能性不是很大,這時候就有可能是網絡有點擁堵了
  3. 最后一次重試,再失敗就沒意義了,日記寫起來,再重試網絡負擔就加大了,得不償失了

3.五種IO模型

對於一次IO訪問,數據會先被拷貝到內核的緩沖區中,然后才會從內核的緩沖區拷貝到應用程序的地址空間。需要經歷兩個階段:

  1. 准備數據
  2. 將數據從內核緩沖區拷貝到進程地址空間

由於存在這兩個階段,Linux產生了下面五種IO模型(以socket為例

  1. 阻塞式IO:
    • 當用戶進程調用了recvfrom等阻塞方法時,內核進入IO的第1個階段:准備數據(內核需要等待足夠的數據再拷貝)這個過程需要等待,用戶進程會被阻塞,等內核將數據准備好,然后拷貝到用戶地址空間,內核返回結果,用戶進程才從阻塞態進入就緒態
    • Linux中默認情況下所有的socket都是阻塞的
  2. 非阻塞式IO:
    • 當用戶進程發出read操作時,如果kernel中的數據還沒有准備好,那么它並不會block用戶進程,而是立刻返回一個error
    • 用戶進程判斷結果是一個error時,它就知道數據還沒有准備好,於是它可以再次發送read操作
    • 一旦kernel中的數據准備好了,並且又再次收到了用戶進程的system call,那么它馬上就將數據拷貝到了用戶內存,然后返回
    • 非阻塞IO模式下用戶進程需要不斷地詢問內核的數據准備好了沒有
  3. IO多路復用
    • 通過一種機制,一個進程可以監視多個文件描述符(套接字描述符)一旦某個文件描述符就緒(一般是讀就緒或者寫就緒),能夠通知程序進行相應的讀寫操作(這樣就不需要每個用戶進程不斷的詢問內核數據准備好了沒)
    • 常用的IO多路復用方式有selectpollepoll
  4. 信號驅動IO:
    • 內核文件描述符就緒后,通過信號通知用戶進程,用戶進程再通過系統調用讀取數據。
    • 此方式屬於同步IO(實際讀取數據到用戶進程緩存的工作仍然是由用戶進程自己負責的)
  5. 異步IOPOSIXaio_系列函數)
    • 用戶進程發起read操作之后,立刻就可以開始去做其它的事。內核收到一個異步IO read之后,會立刻返回,不會阻塞用戶進程。
    • 內核會等待數據准備完成,然后將數據拷貝到用戶內存,當這一切都完成之后,內核會給用戶進程發送一個signal告訴它read操作完成了

4.Unix圖示

貼一下Unix編程里面的圖:

**非阻塞IO**
![2.非阻塞IO](https://img2018.cnblogs.com/blog/1127869/201812/1127869-20181210212858009-948984805.png)
**IO復用**
![3.IO復用](https://img2018.cnblogs.com/blog/1127869/201812/1127869-20181210212908314-1267377747.png)
**信號IO**
![4.信號IO](https://img2018.cnblogs.com/blog/1127869/201812/1127869-20181210212934040-13536334.png)
**異步AIO**
![5.異步AIO](https://img2018.cnblogs.com/blog/1127869/201812/1127869-20181210212944334-1184572641.png)

3.3.IO多路復用

開始之前咱們通過非阻塞IO引入一下:(來個簡單例子socket.setblocking(False))

import time
import socket

def select(socket_addr_list):
    for client_socket, client_addr in socket_addr_list:
        try:
            data = client_socket.recv(2048)
            if data:
                print(f"[來自{client_addr}的消息:]\n")
                print(data.decode("utf-8"))
                client_socket.send(
                    b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                )
            else:
                # 沒有消息是觸發異常,空消息是斷開連接
                client_socket.close()  # 關閉客戶端連接
                socket_addr_list.remove((client_socket, client_addr))
                print(f"[客戶端{client_addr}已斷開連接,當前連接數:{len(socket_addr_list)}]")
        except Exception:
            pass

def main():
    # 存放客戶端集合
    socket_addr_list = list()

    with socket.socket() as tcp_server:
        # 防止端口綁定的設置
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()
        tcp_server.setblocking(False)  # 服務端非阻塞
        while True:
            try:
                client_socket, client_addr = tcp_server.accept()
                client_socket.setblocking(False)  # 客戶端非阻塞
                socket_addr_list.append((client_socket, client_addr))
            except Exception:
                pass
            else:
                print(f"[來自{client_addr}的連接,當前連接數:{len(socket_addr_list)}]")
            # 防止客戶端斷開后出錯
            if socket_addr_list:
                # 輪詢查看客戶端有沒有消息
                select(socket_addr_list)  # 引用傳參
                time.sleep(0.01)

if __name__ == "__main__":
    main()

輸出:
3.nowait.gif

可以思考下:

  1. 為什么Server也要設置為非阻塞?
    • PS:一個線程里面只能有一個死循環,現在程序需要兩個死循環,so ==> 放一起咯
  2. 斷開連接怎么判斷?
    • PS:沒有消息是觸發異常,空消息是斷開連接
  3. client_socket為什么不用dict存放?
    • PS:dict在循環的過程中,del會引發異常

1.Select

select和上面的有點類似,就是輪詢的過程交給了操作系統:

kernel會“監視”所有select負責的socket,當任何一個socket中的數據准備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程

來個和上面等同的案例:

import select
import socket

def main():
    with socket.socket() as tcp_server:
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()
        socket_info_dict = dict()
        socket_list = [tcp_server]  # 監測列表
        while True:
            # 劣勢:select列表數量有限制
            read_list, write_list, error_list = select.select(
                socket_list, [], [])
            for item in read_list:
                # 服務端迎接新的連接
                if item == tcp_server:
                    client_socket, client_address = item.accept()
                    socket_list.append(client_socket)
                    socket_info_dict[client_socket] = client_address
                    print(f"[{client_address}已連接,當前連接數:{len(socket_list)-1}]")
                # 客戶端發來
                else:
                    data = item.recv(2048)
                    if data:
                        print(data.decode("utf-8"))
                        item.send(
                            b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                        )
                    else:
                        item.close()
                        socket_list.remove(item)
                        info = socket_info_dict[item]
                        print(f"[{info}已斷開,當前連接數:{len(socket_list)-1}]")

if __name__ == "__main__":
    main()

輸出和上面一樣

擴展說明:

select 函數監視的文件描述符分3類,分別是writefdsreadfds、和exceptfds。調用后select函數會阻塞,直到有描述符就緒函數返回(有數據可讀、可寫、或者有except)或者超時(timeout指定等待時間,如果立即返回設為null即可)

select的一個缺點在於單個進程能夠監視的文件描述符的數量存在最大限制,在Linux上一般為1024(64位=>2048)

然后Poll就出現了,就是把上限給去掉了,本質並沒變,還是使用的輪詢

2.EPoll

epoll在內核2.6中提出(Linux獨有),使用一個文件描述符管理多個描述符,將用戶關心的文件描述符的事件存放到內核的一個事件表中,采用監聽回調的機制,這樣在用戶空間和內核空間的copy只需一次,避免再次遍歷就緒的文件描述符列表

先來看個案例吧:(輸出和上面一樣)

import socket
import select

def main():
    with socket.socket() as tcp_server:
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()

        # epoll是linux獨有的
        epoll = select.epoll()
        # tcp_server注冊到epoll中
        epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)

        # key-value
        fd_socket_dict = dict()

        # 回調需要自己處理
        while True:
            # 返回可讀寫的socket fd 集合
            poll_list = epoll.poll()
            for fd, event in poll_list:
                # 服務器的socket
                if fd == tcp_server.fileno():
                    client_socket, client_addr = tcp_server.accept()
                    fd = client_socket.fileno()
                    fd_socket_dict[fd] = (client_socket, client_addr)
                    # 把客戶端注冊進epoll中
                    epoll.register(fd, select.EPOLLIN | select.EPOLLET)
                else:  # 客戶端
                    client_socket, client_addr = fd_socket_dict[fd]
                    data = client_socket.recv(2048)
                    print(
                        f"[來自{client_addr}的消息,當前連接數:{len(fd_socket_dict)}]\n")
                    if data:
                        print(data.decode("utf-8"))
                        client_socket.send(
                            b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                        )
                    else:
                        del fd_socket_dict[fd]
                        print(
                            f"[{client_addr}已離線,當前連接數:{len(fd_socket_dict)}]\n"
                        )
                        # 從epoll中注銷
                        epoll.unregister(fd)
                        client_socket.close()

if __name__ == "__main__":
    main()

擴展:epoll的兩種工作模式

LT(level trigger,水平觸發)模式:當epoll_wait檢測到描述符就緒,將此事件通知應用程序,應用程序可以不立即處理該事件。下次調用epoll_wait時,會再次響應應用程序並通知此事件。LT模式是默認的工作模式。
LT模式同時支持阻塞和非阻塞socket。

ET(edge trigger,邊緣觸發)模式:當epoll_wait檢測到描述符就緒,將此事件通知應用程序,應用程序必須立即處理該事件。如果不處理,下次調用epoll_wait時,不會再次響應應用程序並通知此事件。
ET是高速工作方式,只支持非阻塞socket(ET模式減少了epoll事件被重復觸發的次數,因此效率要比LT模式高)

Code提煉一下

  1. 實例化對象:epoll = select.epoll()
  2. 注冊對象:epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)
  3. 注銷對象:epoll.unregister(fd)

PS:epoll不一定比Select性能高,一般都是分場景的:

  1. 高並發下,連接活躍度不高時:epoll比Select性能高(eg:web請求,頁面隨時關閉)
  2. 並發不高,連接活躍度比較高:Select更合適(eg:小游戲)
  3. Select是win和linux通用的,而epoll只有linux有

其實IO多路復用還有一個kqueue,和epoll類似,下面的通用寫法中有包含


3.通用寫法(Selector

一般來說:Linux下使用epoll,Win下使用select(IO多路復用會這個通用的即可)

先看看Python源代碼:

# 選擇級別:epoll|kqueue|devpoll > poll > select
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector

實戰案例:(可讀和可寫可以不分開)

import socket
import selectors

# Linux下使用epoll,Win下使用select
Selector = selectors.DefaultSelector()

class Task(object):
    def __init__(self):
        # 存放客戶端fd和socket鍵值對
        self.fd_socket_dict = dict()

    def run(self):
        self.server = socket.socket()
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind(('', 8080))
        self.server.listen()
        # 把Server注冊到epoll
        Selector.register(self.server.fileno(), selectors.EVENT_READ,
                          self.connected)

    def connected(self, key):
        """客戶端連接時處理"""
        client_socket, client_address = self.server.accept()
        fd = client_socket.fileno()
        self.fd_socket_dict[fd] = (client_socket, client_address)
        # 注冊一個客戶端讀的事件(服務端去讀消息)
        Selector.register(fd, selectors.EVENT_READ, self.call_back_reads)
        print(f"{client_address}已連接,當前連接數:{len(self.fd_socket_dict)}")

    def call_back_reads(self, key):
        """客戶端可讀時處理"""
        # 一個fd只能注冊一次,監測可寫的時候需要把可讀給注銷
        Selector.unregister(key.fd)
        client_socket, client_address = self.fd_socket_dict[key.fd]
        print(f"[來自{client_address}的消息:]\n")
        data = client_socket.recv(2048)
        if data:
            print(data.decode("utf-8"))
            # 注冊一個客戶端寫的事件(服務端去發消息)
            Selector.register(key.fd, selectors.EVENT_WRITE,
                              self.call_back_writes)
        else:
            client_socket.close()
            del self.fd_socket_dict[key.fd]
            print(f"{client_address}已斷開,當前連接數:{len(self.fd_socket_dict)}")

    def call_back_writes(self, key):
        """客戶端可寫時處理"""
        Selector.unregister(key.fd)
        client_socket, client_address = self.fd_socket_dict[key.fd]
        client_socket.send(b"ok")
        Selector.register(key.fd, selectors.EVENT_READ, self.call_back_reads)

def main():
    t = Task()
    t.run()
    while True:
        ready = Selector.select()
        for key, obj in ready:
            # 需要自己回調
            call_back = key.data
            call_back(key)

if __name__ == "__main__":
    main()

Code提煉一下

  1. 實例化對象:Selector = selectors.DefaultSelector()
  2. 注冊對象:
    • Selector.register(server.fileno(), selectors.EVENT_READ, call_back)
    • Selector.register(server.fileno(), selectors.EVENT_WRITE, call_back)
  3. 注銷對象:Selector.unregister(key.fd)
  4. 注意一下:一個fd只能注冊一次,監測可寫的時候需要把可讀給注銷(反之一樣)

業余拓展:

select, iocp, epoll,kqueue及各種I/O復用機制
https://blog.csdn.net/shallwake/article/details/5265287

kqueue用法簡介
http://www.cnblogs.com/luminocean/p/5631336.html

下級預估:協程篇 or 網絡深入篇


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM