Python如何實現異步IO


 

前言

  用阻塞 API 寫同步代碼最簡單,但一個線程同一時間只能處理一個請求,有限的線程數導致無法實現萬級別的並發連接,過多的線程切換也搶走了 CPU 的時間,從而降低了每秒能夠處理的請求數量。為了達到高並發,你可能會選擇一個異步框架,用非阻塞 API 把業務邏輯打亂到多個回調函數,通過多路復用與事件循環的方式實現高並發。

磁盤 IO 為例,描述了多線程中使用阻塞方法讀磁盤,2 個線程間的切換方式。那么,怎么才能實現高並發呢?

把上圖中本來由內核實現的請求切換工作,交由用戶態的代碼來完成就可以了,異步化編程通過應用層代碼實現了請求切換,降低了切換成本和內存占用空間。異步化依賴於 IO 多路復用機制,比如 Linux 的 epoll 或者 Windows 上的 iocp,同時,必須把阻塞方法更改為非阻塞方法,才能避免內核切換帶來的巨大消耗。Nginx、Redis 等高性能服務都依賴異步化實現了百萬量級的並發。

下圖描述了異步 IO 的非阻塞讀和異步框架結合后,是如何切換請求的。

 

 

然而,寫異步化代碼很容易出錯。因為所有阻塞函數,都需要通過非阻塞的系統調用拆分成兩個函數。雖然這兩個函數共同完成一個功能,但調用方式卻不同。第一個函數由你顯式調用,第二個函數則由多路復用機制調用。

這種方式違反了軟件工程的內聚性原則,函數間同步數據也更復雜。特別是條件分支眾多、涉及大量系統調用時,異步化的改造工作會非常困難。

Python如何實現異步調用

from flask import Flask
import time
app = Flask(__name__)


@app.route('/bar')
def bar():
    time.sleep(1)
    return '<h1>bar!</h1>'

@app.route('/foo')
def foo():
    time.sleep(1)
    return '<h1>foo!</h1>'
if __name__ == '__main__':
    app.run(host='127.0.0.1',port=5555,debug=True)

采用同步的方式調用

import requests
import time

starttime = time.time()
print(requests.get('http://127.0.0.1:5555/bar').content)
print(requests.get('http://127.0.0.1:5555/foo').content)
print("消耗時間: ",time.time() -starttime)
b'<h1>bar!</h1>'
b'<h1>foo!</h1>'
消耗時間:  2.015509605407715

采樣異步的方式調用:

重點:

1.將阻塞io改為非阻塞io;

2.多路復用io監聽內核事件,事件觸發通過回調函數;

3.用戶態代碼采取事件循環的方式獲取事件,執行事件的回調函數;

import selectors
import socket
import time
# from asynrequest import ParserHttp
class asynhttp:
    def __init__(self):
        self.selecter = selectors.DefaultSelector()

    def get(self,url,optiondict = None):
        global reqcount
        reqcount += 1
        s = socket.socket()
        s.setblocking(False)
        try:
            s.connect(('127.0.0.1',5555))
        except BlockingIOError:
            pass
        requset = 'GET %s HTTP/1.0\r\n\r\n' % url
        callback = lambda : self.send(s,requset)
        self.selecter.register(s.fileno(),selectors.EVENT_WRITE,callback)

    def send(self,s,requset):
        self.selecter.unregister(s.fileno())
        s.send(requset.encode())
        chunks = []
        callback = lambda: self.recv(s,chunks)
        self.selecter.register(s.fileno(),selectors.EVENT_READ,callback)

    def recv(self,s,chunks):
        self.selecter.unregister(s.fileno())
        chunk = s.recv(1024)
        if chunk:
            chunks.append(chunk)
            callback = lambda: self.recv(s,chunks)
            self.selecter.register(s.fileno(), selectors.EVENT_READ, callback)
        else:
            global reqcount
            reqcount -= 1
            request_first,request_headers,request_content,_ = ParserHttp.parser(b''.join(chunks))
            print("解析數據:",request_first,request_headers,request_content)
            print((b''.join(chunks)).decode())
            return (b''.join(chunks)).decode()

starttime = time.time()
reqcount = 0
asynhttper = asynhttp()
asynhttper.get('/bar')
asynhttper.get('/foo')
while reqcount:
    events = asynhttper.selecter.select()
    for event,mask in events:
        func = event.data
        func()
print("消耗時間:" ,time.time() - starttime)
HTTP/1.0 200 OK
Content-Type: text/html; charset=utf-8
Content-Length: 13
Server: Werkzeug/1.0.1 Python/3.7.7
Date: Thu, 15 Oct 2020 03:28:16 GMT

<h1>bar!</h1>
HTTP/1.0 200 OK
Content-Type: text/html; charset=utf-8
Content-Length: 13
Server: Werkzeug/1.0.1 Python/3.7.7
Date: Thu, 15 Oct 2020 03:28:16 GMT

<h1>foo!</h1>
消耗時間: 1.0127637386322021

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM