前言
如何在使用1個線程的前提下,提網站的並發性,使用協程?
如果要使用協程首先要解決2個問題:
1.如何檢測到代碼中遇到了IO操作?(XX)
2.如何在線程代碼里上下切換?(Greelet模塊)
而Gvent模塊封裝好了以上2種功能,可以讓我們在python中優雅的使用協程;
一、Gvent是什么?
:\版本1\cmdb_rbac_arya>pip show gevent Name: gevent Version: 1.4.0 Summary: Coroutine-based network library #基於協程的網站庫 Home-page: http://www.gevent.org/ Author: Denis Bilenko Author-email: denis.bilenko@gmail.com #以后使用過程出現任何問題 給 Denis Bilenko發郵箱 License: MIT Location: d:\python3.6.1\lib\site-packages Requires: greenlet, cffi #需要依賴的庫
二、Linux網絡IO模型
網絡IO模型是指:Linux服務端如何接收、處理客戶端請求的形式;(智慧、創新的程序們設計那么多的IO模型,為1個目的 讓我們的網頁訪問更快。。。)
由於同步(synchronous) IO和異步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分別是什么,到底有什么區別?
不同的上下文環境里,所表達的意義有所不同,開篇前先框定今天的topic,我們今天主要講得是Linux網絡IO模型;
Linux系統服務器1次網絡IO發生時涉及的對象和階段?
對於一個Linux系統服務器的network IO (這里我們以read舉例,也就是python的socket模塊執行socket.receive()),它會涉及到2個對象,
對象1.調用這個IO的process (or thread)【我們在服務器端寫得web程序】
對象2.系統內核(kernel)
當我們在Linux服務端執行socket.receive()方法接收客戶端發送過來的消息時,該操作其實會經歷2個過程:
階段1.等待客戶端的數據傳輸到服務端的內核空間 (Waiting for the data to be ready):等待客戶端執行sen(b'hello')把數據傳輸到服務端的操作系統(內核空間);
階段2.將數據從內核空間拷貝到用戶空間(Copying the data from the kernel to the process): 我們socket程序工作在用戶空間,So階段2是等待數據從內核空間復制到用戶空間;
經歷了以上2個過程之后才 服務端的socket.receive()才得到了用戶send(b'hello')消息;
ps:
嚴格應該是4個步驟
1.socket.reveive 等待用戶分組發完數據
2.數據由用戶空間到內核空間
3.等待內核准備數據
4.內核准備玩據,數據由內核空間回到用戶空間
因為步驟2和步驟4的耗時很快,是可以忽略不計的。
記住這兩點真的真的很重要,IO模型的演變,都是圍繞着 盡可能規避這2個過程,為中心來展開的!
o
阻塞/非阻塞 和 同步/異步 的區別?
阻塞、非阻塞是描述得是 程序在發起系統調用之后等待IO模型返回消息時的狀態 ;
比如socket在執行recive()方法的時候,當前線程由活躍---->阻塞狀態,直到消息返回(執行以上2個過程之后), 程序由阻塞--->就緒---->活躍狀態,如果socket.setblocking(False),socket在執行recive()方法向操作系統發起系統調用之后, 將不會進入阻塞狀態,所以非阻塞IO可以大大提升進程對CPU的利用效率 ;
同步、異步描述得是程序從IO模型 獲得消息的機制;
程序發起系統調用之后,最終消息是通過1種什么樣的機制 回調、通知到程序的;
阻塞、非阻塞 和 同步、異步和 的組合?
下面說說的:阻塞(blocking) IO模型,非阻塞(non-blocking )IO模型,IO 多路復用(multiplexing)都屬於同步(synchronous) IO這一類;
而 異步(asynchronous) I/O屬於異步IO模型。
二、各種IO模型介紹
1.阻塞IO(blocking IO)模型
我們寫平時寫的socket默認都是阻塞IO;
A.客戶端連接服務端
B.服務端發起系統調用
C.等待客戶端send()消息到操作系統(內核空間)(此時socket進程進入阻塞狀態,不能接收新的連接請求,你什么也不可以做!)
D.等待內核把數據從內核空間copy到用戶空間 (此時socket進程進入阻塞狀態,還是不能接受新的連接請求,你什么也不可以做!)
E.直到數據由內核空間---->socket進程,socket開始響應客戶端
F.開始接收新的客戶端連接請求(循環A-F 的步驟)
解決之道:
每當1個客戶端連接到服務端,服務端開啟1個線程,來處理客戶端請求,制造並發;
利:
大道至簡,這樣程序處理邏輯最簡單;
弊:
即便1個線程維護1個客戶端連接,但是每1個線程依然規避不了C、D這2個過程中的程序阻塞;
2.非阻塞IO(non-blocking IO)模型
我們寫平時寫的socket中調用socket.setblocking(False)之后,此時socket就是非阻塞式的了;
A.客戶端連接服務端
B.服務端socket發起系統調用
C.內核說:滾回去,不阻塞你了,你的數據還沒來呢!(此時 您的進程可以 print(‘fuck’)、可以接收其它用戶的連接.....You can do everything);
D.服務端socket再次發起系統調用
E.內核說:還給我滾回去,不阻塞你了,你的數據還沒來呢!(此時 您的進程可以 print(‘fuck’)、可以接收其它用戶的連接.....You can do everything);
F.服務端socket不斷發起系統調用進行輪詢內核,一直到數據從內核空間准備到了用戶空間,進程才拿到了數據;
解決之道:
socket.setblocking(False)
利:
規避了進程等待消息從client端------>服務端socket的過程;
進程根本不會再進入阻塞狀態,所以擁有了可以處理多個客戶端請求的能力;
弊:
不斷得發起系統調用輪詢內核
雖然進程不需要進入阻塞狀態,也規避了等待client端發送消息過來了的過程;
但copy data from kenel to user的過程,依然強硬的存在着;
代碼實現

import socket sk=socket.socket() sk.bind(('127.0.0.1',8080)) sk.setblocking(False)#把socket中所有需要阻塞的方法都改變成非阻塞:recv() recvfrom() accept() sk.listen() conn_list=[]#用來存儲所有來請求server端的conn連接 del_conn_list=[]#用來存儲所有已經和server端斷開連接的conn連接 while True: # 在非阻塞式IO中,用戶進程其實是需要不斷的主動詢問kernel數據准備好了沒有。 try: conn,ipaddr=sk.accept()#去問問內核有人連接我不? 設置socket不阻塞之后,這里會報錯,因為這不會阻塞住等待數據 由內核-->用戶空間 print('建立連接了') conn_list.append(conn) except BlockingIOError: for con in conn_list: try: msg=con.recv(1024)#去問問內核連接過我的人中,有人給我發消息不? if msg == b'':#如果client端和server端斷開了連接,server端就會收到b''空消息 del_conn_list.append(con) continue print(msg) con.send(b'Byebye') except BlockingIOError: pass else: for con in del_conn_list:#從正在連接的socket列表刪除已經斷開連接的conn conn_list.remove(con) con.close() del_conn_list.clear()#把刪除列表的scnn清空 #

import socket import time import threading def func(): sk=socket.socket() sk.connect(('127.0.0.1',8080)) sk.send(b'hello') time.sleep(1) print(sk.recv(1024)) sk.close() for i in range(20): threading.Thread(target=func).start()
3.IO多路復用/事件驅動IO(event driven IO)模型
雖然上面的異步IO模型在使用了單線程並切在沒有進行協程切換的前提下實現了並發,但是也並不完美,因為while ture會非常耗費內存,不斷得向Linux內核
.accept() .recv()輪詢,也會造成CPU負載過大;
IO多路復用出現了!
IO多路復用模型工作機制
IO多路復用中的IO依然是阻塞的,但是操作系統給我們提供了1個代理程序(Linux系統是epllo、Windows是select),這個代理程序去循環監聽1個socket列表;
該列表中監控2類socket:
conn,ipaddr=sk.accept()是否來了客戶端連接?
監聽msg=conn.recv(1024)客戶端連接是否發來了新數據?
這就意味着 此時我們Python程序的1只手被解放了!!為什么是1只手被解放了?而不是雙手?
即便IO多路復用網絡模型規避了等待client端發送消息過來的過程,但依舊還是沒有規避等待消息從內核空間copy到用戶空間的過程;
1.等待client端發送消息過來
2.等待消息從內核空間copy到用戶空間
Python程序不用去等待client端發送消息過來,而是由這個操作系統提供代理程序(select/epoll)去等待;
一旦有新的conn 或者conn有新的消息發送過來,代理程序立即回調、通知Python程序;
此時Python程序執行 socket.accept() conn.receive() ,恰到好處得接收到了新的client請求、conn連接發送過來的數據;
IO多路復用模型流程圖
解決之道:
找個代理監控socket、conn連接,如果有新的客戶端連接、或者新消息,回調Python;
規避了等待client端發送消息過來的過程
利:
規避了等待client端發送消息過來的過程消息回調通知機制,不用向非阻塞IO一樣,不斷得輪詢了;
弊:
雖然IO多路復用模型,有效的規避了等待client客戶端發送消息到server端的過程,並解決了非阻塞IO模型不斷輪詢操作系統的問題;
但copy data from kenel to user的過程,依然強硬的存在着;
建議:
因為在操作系統和程序之間加了中間代理回調,所以網站處理的連接數不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。
應用場景:
select的優勢在於可以處理多個連接,不適用於單個連接;
代碼實現

import select import socket sk=socket.socket() sk.bind(('127.0.0.1',8080)) sk.setblocking(False) sk.listen() read_list=[sk] while True:#循環監聽 sk 和連接sk的client r_list, w_list, x_list = select.select(read_list, [], []) for i in r_list: if i is sk: conn,addr=i.accept() read_list.append(conn)#select不僅可以支持對sk對象的監控,還支持對conn對象的監聽 else: msg=i.recv(1024) if msg == b'':#client端和server端斷開連接了 print('斷開') read_list.remove(i) i.close() continue print(msg) i.send(b'byebye!!')

import socket import time import threading import time def func(): sk=socket.socket() sk.connect(('127.0.0.1',8080)) sk.send(b'hello') time.sleep(1) print(sk.recv(1024)) sk.close() for i in range(10): threading.Thread(target=func).start()
不同操作系統中IO多路復用模型介紹
select機制 :IO多路復用模型得以實現得核心:就是操作系統 監控1個[sk......conn,]列表,不斷輪詢每1個sk/conn/是否可以accpet/revive,隨着監控列表的增加,效率會遞減;
支持操作系統:linux/windows
poll機制:和select機制一樣,但是支持監控的socket會比select多
支持操作系統:linux
epoll機制:和select機制完全不一樣
1.epoll很高級,epoll不會去再通過操作循環檢查監控的socket列表中,那些socket出現了讀操作,而是給需要監聽的socket 1--1綁定1個回調函數;
2.檢測的socket中 有1個soket出現了讀操作,直接執行調用那個和該sk/con綁定的回調函數執行sk.accpet() 和conn.receve()
epoll的優勢:
回調函數取代了循環輪詢的反饋方式,所以性能大大提升;
比如epoll監控了1個長度為3000的socket列表,1993_socket和2019_socket同時出現了讀操作,那么這2個soket的回調函數同時執行,去響應客戶端;
省去了0---1993---->2019循環的過程;
支持操作系統:linux
如果在Linux操作系統我肯定會選擇epoll而不是select啊;所以如何讓自己的代碼兼容不同平台,自動選擇最佳的IO多路復用代理?
Python中的selectors模塊就是幫我們自動選擇最佳IO多路復用代理的;

#服務端 from socket import * import selectors sel=selectors.DefaultSelector() def accept(server_fileobj,mask): conn,addr=server_fileobj.accept() sel.register(conn,selectors.EVENT_READ,read) def read(conn,mask): try: data=conn.recv(1024) if not data: print('closing',conn) sel.unregister(conn) conn.close() return conn.send(data.upper()+b'_SB') except Exception: print('closing', conn) sel.unregister(conn) conn.close() server_fileobj=socket(AF_INET,SOCK_STREAM) server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server_fileobj.bind(('127.0.0.1',8088)) server_fileobj.listen(5) server_fileobj.setblocking(False) #設置socket的接口為非阻塞 sel.register(server_fileobj,selectors.EVENT_READ,accept) #相當於網select的讀列表里append了一個文件句柄server_fileobj,並且綁定了一個回調函數accept while True: events=sel.select() #檢測所有的fileobj,是否有完成wait data的 for sel_obj,mask in events: callback=sel_obj.data #callback=accpet callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1) #客戶端 from socket import * c=socket(AF_INET,SOCK_STREAM) c.connect(('127.0.0.1',8088)) while True: msg=input('>>: ') if not msg:continue c.send(msg.encode('utf-8')) data=c.recv(1024) print(data.decode('utf-8')) 基於selectors模塊實現聊天
4.異步IO(Asynchronous I/O)模型
異步框架最完美,程序發起系統調用之后,無需阻塞等待 client端發送數據過來,也不需要等待數據從內核到用戶空間;
而是我發起系統調用之后,沒有任何的等待,有消息過來,我馬上得知;
可以從上面看出來:
異步IO框架完全沒有了wait_data和copy_data的過程,所以它可以響應更多的請求;
雖然異步框架代碼實現起來比較困難,但是我們可以直接使用別人寫好的框架例如:Tornado/Twisted....需要說明的是Django是阻塞型IO Web框架;
IO模型總結:
只有對各種IO模型有1定了解之后,你才能知道,在什么樣的場景中需要使用什么樣的框架,使用什么手段提升你網站的並發;
Asyncio模塊
gevent模塊雖然可以實現協程但是需要在使用前給所有io操作相關的模塊加上 monkey path會影響模塊那些模塊原本的功能,在一次面試中有人建議我使用 asyncio模塊,於是淺嘗一下。

import asyncio #執行1個協程 async def func(name):#定義1個協程 print(name,'start') #await 關鍵字出現在可能發生IO阻塞的前面 #await 必須卸載asyncio的函數里 await asyncio.sleep(1) print('end') loop=asyncio.get_event_loop()#創建1個事件循環 loop.run_until_complete(func('張根')) #執行多個協程 async def func1(name):#async語法用於定義1個協程函數 print(name,'start') #await 關鍵字出現在可能發生IO阻塞的前面 #await 必須卸載asyncio的函數里 await asyncio.sleep(1) print('end') loop=asyncio.get_event_loop()#創建1個事件循環 loop.run_until_complete(asyncio.wait([func('張根'),func1('小米')]))
Asyncio模塊實現的原理
如果想要實現協程就必須首先做到 可以在1個線程里 切換執行代碼,Python里面什么能做到 代碼切換執行呢?
沒錯 yield, yield的切換 就是實現 async模塊最最基本的前提。
yield切換功能
如下面的代碼,func函數里面有yield關鍵字就成了生成器函數,調用生成器函數得了1個生成器對象。既然是生成器對象那我們就可以1點1點得執行生成器里的代碼了。
在函數外面next(g)開始調用這個生成器
第1次next(g)
print('2.我切到函數外面了')
print('1.我在函數里面') 代碼執行
第二次next(g)
print('3.我切回函數里面來了') 代碼執行
print('4.我又切到函數外面了')
這就是代碼切換。
def func(): print('1.我在函數里面') yield print('3.我切回函數里面來了') g=func() next(g) print('2.我切到函數外面了') try: next(g) except StopIteration:pass #捕捉生成器 最后的StopIteration異常 print('4.我又切到函數外面了') #代碼執行的流程 # 1.我在函數里面 # 2.我切到函數外面了 # 3.我切回函數里面來了 # 4.我又切到函數外面了
yield from是什么?
顧英文名思義就是可以在另1個函數把其他生成器 yiled 出來,把當前函數做成了1個匯聚型的生成器;
def coroutine1(n): print('我這coroutine1里面') yield 'coroutine1' def coroutine2(n): print('我這coroutine2里面') yield 'coroutine2' def func(n): print('我在func里面') yield from coroutine1(n) #yield from 相當於1個中間件,可以在1個函數里面 直接 把其他生成器 yield出來 yield from coroutine2(n) g=func(2) ret1=next(g) ret2=next(g) print(ret1,ret2)
使用 yield from 實現1個簡單的async模塊
import time def coroutine(n): print('start sleep') yield time.time()+n #返回需要切換回此處的時間 print('end sleep') def func(n): print('1.我在函數里面') yield from coroutine(n) g1=func(8) g2=func(8) ret1=next(g1) ret2=next(g2) # time_dict={ret1:g1,ret2:g2} while time_dict: min_time=min(time_dict) #獲取最近的時間 time.sleep(min_time-time.time()) #執行IO耗時操作 try: next(time_dict[min_time])#切回到 coroutine 繼續執行 except StopIteration:pass del time_dict[min_time] #刪除