Python協程(gevent+asyncio)模塊


前言

如何在使用1個線程的前提下,提網站的並發性,使用協程?

如果要使用協程首先要解決2個問題:

1.如何檢測到代碼中遇到了IO操作?(XX)

2.如何在線程代碼里上下切換?(Greelet模塊)

而Gvent模塊封裝好了以上2種功能,可以讓我們在python中優雅的使用協程;

 

一、Gvent是什么?

:\版本1\cmdb_rbac_arya>pip show gevent
Name: gevent
Version: 1.4.0
Summary: Coroutine-based network library #基於協程的網站庫
Home-page: http://www.gevent.org/
Author: Denis Bilenko
Author-email: denis.bilenko@gmail.com #以后使用過程出現任何問題 給 Denis Bilenko發郵箱
License: MIT
Location: d:\python3.6.1\lib\site-packages
Requires: greenlet, cffi                       #需要依賴的庫

二、Linux網絡IO模型

網絡IO模型是指:Linux服務端如何接收、處理客戶端請求的形式;(智慧、創新的程序們設計那么多的IO模型,為1個目的 讓我們的網頁訪問更快。。。)

由於同步(synchronous) IO和異步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分別是什么,到底有什么區別?

不同的上下文環境里,所表達的意義有所不同,開篇前先框定今天的topic,我們今天主要講得是Linux網絡IO模型;

 

Linux系統服務器1次網絡IO發生時涉及的對象和階段?

對於一個Linux系統服務器的network IO (這里我們以read舉例,也就是python的socket模塊執行socket.receive()),它會涉及到2個對象,

對象1.調用這個IO的process (or thread)【我們在服務器端寫得web程序】

對象2.系統內核(kernel)

當我們在Linux服務端執行socket.receive()方法接收客戶端發送過來的消息時,該操作其實會經歷2個過程:

階段1.等待客戶端的數據傳輸到服務端的內核空間 (Waiting for the data to be ready):等待客戶端執行sen(b'hello')把數據傳輸到服務端的操作系統(內核空間);

階段2.將數據從內核空間拷貝到用戶空間(Copying the data from the kernel to the process): 我們socket程序工作在用戶空間,So階段2是等待數據從內核空間復制到用戶空間;

經歷了以上2個過程之后才 服務端的socket.receive()才得到了用戶send(b'hello')消息;

ps:

嚴格應該是4個步驟

1.socket.reveive 等待用戶分組發完數據

2.數據由用戶空間到內核空間

3.等待內核准備數據

4.內核准備玩據,數據由內核空間回到用戶空間

因為步驟2和步驟4的耗時很快,是可以忽略不計的。

 

記住這兩點真的真的很重要,IO模型的演變,都是圍繞着 盡可能規避這2個過程,為中心來展開的!

 o

 

阻塞/非阻塞 和 同步/異步 的區別?

阻塞、非阻塞是描述得是 程序在發起系統調用之后等待IO模型返回消息時的狀態

比如socket在執行recive()方法的時候,當前線程由活躍---->阻塞狀態,直到消息返回(執行以上2個過程之后), 程序由阻塞--->就緒---->活躍狀態,如果socket.setblocking(False),socket在執行recive()方法向操作系統發起系統調用之后, 將不會進入阻塞狀態,所以非阻塞IO可以大大提升進程對CPU的利用效率 ;

同步、異步描述得是程序從IO模型 獲得消息的機制

程序發起系統調用之后,最終消息是通過1種什么樣的機制 回調、通知到程序的;

 

阻塞、非阻塞 和 同步、異步和 的組合?

下面說說的:阻塞(blocking) IO模型,非阻塞(non-blocking )IO模型,IO 多路復用(multiplexing)都屬於同步(synchronous) IO這一類;

而 異步(asynchronous) I/O屬於異步IO模型。

 

二、各種IO模型介紹

 1.阻塞IO(blocking IO)模型

 我們寫平時寫的socket默認都是阻塞IO;

A.客戶端連接服務端

B.服務端發起系統調用

C.等待客戶端send()消息到操作系統(內核空間)(此時socket進程進入阻塞狀態,不能接收新的連接請求,你什么也不可以做!)

D.等待內核把數據從內核空間copy到用戶空間 (此時socket進程進入阻塞狀態,還是不能接受新的連接請求,你什么也不可以做!)

E.直到數據由內核空間---->socket進程,socket開始響應客戶端

F.開始接收新的客戶端連接請求(循環A-F 的步驟)

 

解決之道:

每當1個客戶端連接到服務端,服務端開啟1個線程,來處理客戶端請求,制造並發;

利:

大道至簡,這樣程序處理邏輯最簡單;

弊:

即便1個線程維護1個客戶端連接,但是每1個線程依然規避不了C、D這2個過程中的程序阻塞;

 

2.非阻塞IO(non-blocking IO)模型

我們寫平時寫的socket中調用socket.setblocking(False)之后,此時socket就是非阻塞式的了;

 

A.客戶端連接服務端

B.服務端socket發起系統調用

C.內核說:滾回去,不阻塞你了,你的數據還沒來呢!(此時 您的進程可以 print(‘fuck’)、可以接收其它用戶的連接.....You can do everything);

D.服務端socket再次發起系統調用

E.內核說:還給我滾回去,不阻塞你了,你的數據還沒來呢!(此時 您的進程可以 print(‘fuck’)、可以接收其它用戶的連接.....You can do everything);

F.服務端socket不斷發起系統調用進行輪詢內核,一直到數據從內核空間准備到了用戶空間,進程才拿到了數據;

 

解決之道:

socket.setblocking(False)

利:

規避了進程等待消息從client端------>服務端socket的過程;

進程根本不會再進入阻塞狀態,所以擁有了可以處理多個客戶端請求的能力;

弊:

不斷得發起系統調用輪詢內核

雖然進程不需要進入阻塞狀態,也規避了等待client端發送消息過來了的過程;

copy data from kenel to user的過程,依然強硬的存在着

代碼實現

import socket
sk=socket.socket()
sk.bind(('127.0.0.1',8080))
sk.setblocking(False)#把socket中所有需要阻塞的方法都改變成非阻塞:recv() recvfrom() accept()
sk.listen()
conn_list=[]#用來存儲所有來請求server端的conn連接
del_conn_list=[]#用來存儲所有已經和server端斷開連接的conn連接

while True:  # 在非阻塞式IO中,用戶進程其實是需要不斷的主動詢問kernel數據准備好了沒有。
    try:
        conn,ipaddr=sk.accept()#去問問內核有人連接我不? 設置socket不阻塞之后,這里會報錯,因為這不會阻塞住等待數據 由內核-->用戶空間
        print('建立連接了')
        conn_list.append(conn)
    except BlockingIOError:
        for con in conn_list:
            try:
                msg=con.recv(1024)#去問問內核連接過我的人中,有人給我發消息不?
                if msg == b'':#如果client端和server端斷開了連接,server端就會收到b''空消息
                    del_conn_list.append(con)
                    continue
                print(msg)
                con.send(b'Byebye')
            except BlockingIOError: pass
        else:
            for con in del_conn_list:#從正在連接的socket列表刪除已經斷開連接的conn
                conn_list.remove(con)
                con.close()
            del_conn_list.clear()#把刪除列表的scnn清空





#
server.py

 

import socket
import time
import threading
def func():
    sk=socket.socket()
    sk.connect(('127.0.0.1',8080))
    sk.send(b'hello')
    time.sleep(1)
    print(sk.recv(1024))
    sk.close()

for i in range(20):
    threading.Thread(target=func).start()
client.py

 

3.IO多路復用/事件驅動IO(event driven IO)模型

 雖然上面的異步IO模型在使用了單線程並切在沒有進行協程切換的前提下實現了並發,但是也並不完美,因為while ture會非常耗費內存,不斷得向Linux內核

.accept() .recv()輪詢,也會造成CPU負載過大;

IO多路復用出現了!

 

IO多路復用模型工作機制

 

IO多路復用中的IO依然是阻塞的,但是操作系統給我們提供了1個代理程序(Linux系統是epllo、Windows是select),這個代理程序去循環監聽1個socket列表;

該列表中監控2類socket:

conn,ipaddr=sk.accept()是否來了客戶端連接?

監聽msg=conn.recv(1024)客戶端連接是否發來了新數據?

這就意味着 此時我們Python程序的1只手被解放了!!為什么是1只手被解放了?而不是雙手?

即便IO多路復用網絡模型規避了等待client端發送消息過來的過程,但依舊還是沒有規避等待消息從內核空間copy到用戶空間的過程;

1.等待client端發送消息過來

2.等待消息從內核空間copy到用戶空間

Python程序不用去等待client端發送消息過來,而是由這個操作系統提供代理程序(select/epoll)去等待;

一旦有新的conn 或者conn有新的消息發送過來,代理程序立即回調、通知Python程序;

此時Python程序執行 socket.accept() conn.receive() ,恰到好處得接收到了新的client請求、conn連接發送過來的數據;

IO多路復用模型流程圖

 

 

解決之道:

找個代理監控socket、conn連接,如果有新的客戶端連接、或者新消息,回調Python;

規避了等待client端發送消息過來的過程

利:

規避了等待client端發送消息過來的過程消息回調通知機制,不用向非阻塞IO一樣,不斷得輪詢了;

弊:

雖然IO多路復用模型,有效的規避了等待client客戶端發送消息到server端的過程,並解決了非阻塞IO模型不斷輪詢操作系統的問題;

但copy data from kenel to user的過程,依然強硬的存在着;

建議:

因為在操作系統和程序之間加了中間代理回調,所以網站處理的連接數不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。

應用場景:

select的優勢在於可以處理多個連接,不適用於單個連接;

 代碼實現

import select
import socket

sk=socket.socket()
sk.bind(('127.0.0.1',8080))
sk.setblocking(False)
sk.listen()

read_list=[sk]

while True:#循環監聽 sk 和連接sk的client
    r_list, w_list, x_list = select.select(read_list, [], [])
    for i in r_list:
        if i is sk:
            conn,addr=i.accept()
            read_list.append(conn)#select不僅可以支持對sk對象的監控,還支持對conn對象的監聽
        else:
            msg=i.recv(1024)
            if msg == b'':#client端和server端斷開連接了
                print('斷開')
                read_list.remove(i)
                i.close()
                continue
            print(msg)
            i.send(b'byebye!!')
server.py

 

import socket
import time
import threading
import time
def func():
    sk=socket.socket()
    sk.connect(('127.0.0.1',8080))
    sk.send(b'hello')
    time.sleep(1)
    print(sk.recv(1024))
    sk.close()

for i in range(10):
    threading.Thread(target=func).start()
client.py

 

不同操作系統中IO多路復用模型介紹

 

select機制 :IO多路復用模型得以實現得核心:就是操作系統 監控1個[sk......conn,]列表,不斷輪詢每1個sk/conn/是否可以accpet/revive,隨着監控列表的增加,效率會遞減;

支持操作系統:linux/windows

 

poll機制:和select機制一樣,但是支持監控的socket會比select多

支持操作系統:linux

 

epoll機制:和select機制完全不一樣

1.epoll很高級,epoll不會去再通過操作循環檢查監控的socket列表中,那些socket出現了讀操作,而是給需要監聽的socket 1--1綁定1個回調函數;

2.檢測的socket中 有1個soket出現了讀操作,直接執行調用那個和該sk/con綁定的回調函數執行sk.accpet() 和conn.receve()

epoll的優勢:

回調函數取代了循環輪詢的反饋方式,所以性能大大提升;

比如epoll監控了1個長度為3000的socket列表,1993_socket和2019_socket同時出現了讀操作,那么這2個soket的回調函數同時執行,去響應客戶端;

省去了0---1993---->2019循環的過程;

支持操作系統:linux

如果在Linux操作系統我肯定會選擇epoll而不是select啊;所以如何讓自己的代碼兼容不同平台,自動選擇最佳的IO多路復用代理?

Python中的selectors模塊就是幫我們自動選擇最佳IO多路復用代理的;

#服務端
from socket import *
import selectors

sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
    conn,addr=server_fileobj.accept()
    sel.register(conn,selectors.EVENT_READ,read)

def read(conn,mask):
    try:
        data=conn.recv(1024)
        if not data:
            print('closing',conn)
            sel.unregister(conn)
            conn.close()
            return
        conn.send(data.upper()+b'_SB')
    except Exception:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()



server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #設置socket的接口為非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #相當於網select的讀列表里append了一個文件句柄server_fileobj,並且綁定了一個回調函數accept

while True:
    events=sel.select() #檢測所有的fileobj,是否有完成wait data的
    for sel_obj,mask in events:
        callback=sel_obj.data #callback=accpet
        callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)

#客戶端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))

基於selectors模塊實現聊天
selectors模塊

 

 

4.異步IO(Asynchronous I/O)模型

異步框架最完美,程序發起系統調用之后,無需阻塞等待 client端發送數據過來,也不需要等待數據從內核到用戶空間;

而是我發起系統調用之后,沒有任何的等待,有消息過來,我馬上得知;

可以從上面看出來:

異步IO框架完全沒有了wait_data和copy_data的過程,所以它可以響應更多的請求;

雖然異步框架代碼實現起來比較困難,但是我們可以直接使用別人寫好的框架例如:Tornado/Twisted....需要說明的是Django是阻塞型IO Web框架


 

IO模型總結:

只有對各種IO模型有1定了解之后,你才能知道,在什么樣的場景中需要使用什么樣的框架,使用什么手段提升你網站的並發; 

 

Asyncio模塊

gevent模塊雖然可以實現協程但是需要在使用前給所有io操作相關的模塊加上 monkey path會影響模塊那些模塊原本的功能,在一次面試中有人建議我使用 asyncio模塊,於是淺嘗一下。

 

import asyncio

#執行1個協程
async def func(name):#定義1個協程
    print(name,'start')
    #await 關鍵字出現在可能發生IO阻塞的前面
    #await 必須卸載asyncio的函數里
    await asyncio.sleep(1)
    print('end')

loop=asyncio.get_event_loop()#創建1個事件循環
loop.run_until_complete(func('張根'))


#執行多個協程
async def func1(name):#async語法用於定義1個協程函數
    print(name,'start')
    #await 關鍵字出現在可能發生IO阻塞的前面
    #await 必須卸載asyncio的函數里
    await asyncio.sleep(1)
    print('end')

loop=asyncio.get_event_loop()#創建1個事件循環
loop.run_until_complete(asyncio.wait([func('張根'),func1('小米')]))
asyncio模塊簡單使用

 

 Asyncio模塊實現的原理

如果想要實現協程就必須首先做到 可以在1個線程里 切換執行代碼,Python里面什么能做到 代碼切換執行呢?

沒錯   yield, yield的切換 就是實現 async模塊最最基本的前提。

 

yield切換功能

如下面的代碼,func函數里面有yield關鍵字就成了生成器函數,調用生成器函數得了1個生成器對象。既然是生成器對象那我們就可以1點1點得執行生成器里的代碼了。

 

在函數外面next(g)開始調用這個生成器

第1次next(g)

print('2.我切到函數外面了')

print('1.我在函數里面') 代碼執行

第二次next(g)

print('3.我切回函數里面來了') 代碼執行

print('4.我又切到函數外面了')

這就是代碼切換。

def func():
    print('1.我在函數里面')
    yield
    print('3.我切回函數里面來了')

g=func()
next(g)
print('2.我切到函數外面了')
try:
    next(g)
except StopIteration:pass  #捕捉生成器 最后的StopIteration異常
print('4.我又切到函數外面了')

#代碼執行的流程
# 1.我在函數里面
# 2.我切到函數外面了
# 3.我切回函數里面來了
# 4.我又切到函數外面了

 

yield from是什么?

 顧英文名思義就是可以在另1個函數把其他生成器 yiled 出來,把當前函數做成了1個匯聚型的生成器;

def coroutine1(n):
    print('我這coroutine1里面')
    yield 'coroutine1' 


def coroutine2(n):
    print('我這coroutine2里面')
    yield 'coroutine2'


def func(n):
    print('我在func里面')
    yield from coroutine1(n) #yield from 相當於1個中間件,可以在1個函數里面 直接 把其他生成器 yield出來
    yield from coroutine2(n)

g=func(2)
ret1=next(g)
ret2=next(g)

print(ret1,ret2)

 

 使用 yield from 實現1個簡單的async模塊

import time
def coroutine(n):
    print('start sleep')
    yield time.time()+n #返回需要切換回此處的時間
    print('end sleep')



def func(n):
    print('1.我在函數里面')
    yield from coroutine(n)

g1=func(8)
g2=func(8)

ret1=next(g1)
ret2=next(g2)

#
time_dict={ret1:g1,ret2:g2}
while time_dict:
    min_time=min(time_dict)         #獲取最近的時間
    time.sleep(min_time-time.time()) #執行IO耗時操作
    try:
        next(time_dict[min_time])#切回到 coroutine 繼續執行
    except StopIteration:pass
    del time_dict[min_time]     #刪除

 

 

 

 

 

asnycio模塊

阻塞/非阻塞和同步/異步的區別

IO模型

協程yeild原理

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM