IO多路復用
I/O多路復用 : 通過一種機制,可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程序進行相應的讀寫操作.
Python
Python中有一個select模塊,其中提供了 : select , poll , epoll ,三個方法,分別調用系統的select , poll , epoll ,從而實現IO多路復用.


注意:網絡操作、文件操作、終端操作等均屬於IO操作,對於windows只支持Socket操作,其他系統支持其他IO操作,但是無法檢測 普通文件操作 自動上次讀取是否已經變化。
單線程實現並發
對於select方法:
句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超時時間) 參數: 可接受四個參數(前三個必須) 返回值:三個列表 select方法用來監視文件句柄,如果句柄發生變化,則獲取該句柄。 1、當 參數1 序列中的句柄發生可讀時(accetp和read),則獲取發生變化的句柄並添加到 返回值1 序列中 2、當 參數2 序列中含有句柄時,則將該序列中所有的句柄添加到 返回值2 序列中 3、當 參數3 序列中的句柄發生錯誤時,則將該發生錯誤的句柄添加到 返回值3 序列中 4、當 超時時間 未設置,則select會一直阻塞,直到監聽的句柄發生變化 當 超時時間 = 1時,那么如果監聽的句柄均無任何變化,則select會阻塞 1 秒,之后返回三個空列表,如果監聽的句柄有變化,則直接執行。
import socket import select client1 = socket.socket() client1.setblocking(False) # 百度創建連接: 非阻塞(原本要等待回復或者連接,現在是發送之后再阻塞的時間就可以去執行別的任務 try: client1.connect(('www.baidu.com',80)) #執行了,但是報錯了 except BlockingIOError as e: pass client2 = socket.socket() client2.setblocking(False) # 搜狗創建連接: 非阻塞 try: client2.connect(('www.sogou.com',80)) except BlockingIOError as e: pass client3 = socket.socket() client3.setblocking(False) # 搜狐創建連接: 非阻塞 try: client3.connect(('www.souhu.com',80)) except BlockingIOError as e: pass socket_list = [client1,client2,client3] conn_list = [client1,client2,client3] while True: """ socket_list:檢測是否服務端給我返回數據,可讀 conn_list:檢測其中的所有socket是否已經和服務端連接成功,可寫 rlist :就是有 [client2 , client3 不一定有1,2,3哪一個] wlist:[client1,client2] [] : 是連接不成功的放這里 0.005 : 最多0.005秒檢測一次,檢測是否連接成功 / 返回數據 """ rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005) # wlist中表示已經連接成功的socket對象 for sk in wlist: if sk == client1: sk.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n') elif sk==client2: sk.sendall(b'GET /web?query=fdf HTTP/1.0\r\nhost:www.sogou.com\r\n\r\n') else: sk.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.souhu.com\r\n\r\n') conn_list.remove(sk) """ 連接成功之后就不用繼續監聽是否連接成功了,所有剔除 """ for sk in rlist: chunk_list = [] while True: """ 不阻塞,但是如果沒有了會報錯,都接受之后 """ try: chunk = sk.recv(8096) if not chunk: break chunk_list.append(chunk) except BlockingIOError as e: break body = b''.join(chunk_list) print('------------>',body) sk.close() socket_list.remove(sk) if not socket_list: break
import socket import select class Req(object): def __init__(self,sk,func): self.sock = sk self.func = func def fileno(self): return self.sock.fileno() class Nb(object): def __init__(self): self.conn_list = [] self.socket_list = [] def add(self,url,func): client = socket.socket() client.setblocking(False) # 非阻塞 try: client.connect((url, 80)) except BlockingIOError as e: pass obj = Req(client,func) self.conn_list.append(obj) self.socket_list.append(obj) def run(self): while True: rlist,wlist,elist = select.select(self.socket_list,self.conn_list,[],0.005) # wlist中表示已經連接成功的req對象 for sk in wlist: # 發生變換的req對象 sk.sock.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n') self.conn_list.remove(sk) for sk in rlist: chunk_list = [] while True: try: chunk = sk.sock.recv(8096) if not chunk: break chunk_list.append(chunk) except BlockingIOError as e: break body = b''.join(chunk_list) # print(body.decode('utf-8')) sk.func(body) sk.sock.close() self.socket_list.remove(sk) if not self.socket_list: break def baidu_repsonse(body): print('百度下載結果:',body) def sogou_repsonse(body): print('搜狗下載結果:', body) def souhu_repsonse(body): print('老男孩下載結果:', body) t1 = Nb() t1.add('www.baidu.com',baidu_repsonse) t1.add('www.sogou.com',sogou_repsonse) t1.add('www.souhu.com',oldboyedu_repsonse) t1.run()
基於事件循環實現的異步非阻塞框架 Twisted
異步 : 執行完某個任務后自動調用分配的函數
非阻塞 : 不等待 (給個任務之后不用等待回復)
from lzl import Nb def baidu_repsonse(body): print('百度下載結果:',body) def sogou_repsonse(body): print('搜狗下載結果:', body) def souhu_repsonse(body): print('搜狐下載結果:', body) t1 = Nb() t1.add('www.baidu.com',baidu_repsonse) t1.add('www.sogou.com',sogou_repsonse) t1.add('www.souhu.com',oldboyedu_repsonse) t1.run() #連接網絡請求之后,不用去等着回復直接去執行后面相對應的函數
總結 :
1. IO多路復用的作用 : 檢測多個socket是否發送變化 (三種模式 select , poll, epoll .Windows系統只支持 select).
2.異步非阻塞:
異步:通知,執行之后自動執行回調函數或自動執行某些操作(通知).
非阻塞 : 不等待 :
比如:創建socket對某個地址進行 connect,獲取接收數據recv時默認都會等待(連接成功或接收數據),才執行后續操作.-----如果設置了 setblocking(False),以上兩個過程就不在等待,但是會報錯-BiockingIOError的錯誤,只要捕獲即可.
3.同步阻塞 :
阻塞: 等待
同步 : 按照順序逐步執行.
key_list = ['ab','db','sb'] for item in key_list: ret = requests.get('https://www.baidu.com/s?wd=%s' %item) print(ret.text)
協程
概念
協程 : 是單線程下的並發,又稱"微線程",是由程序員創造出來的一個不是真實存在的東西.
作用是 : 對一個線程進行切片,使得線程在代碼塊之間進行來回切換執行任務,而不是原來逐行執行.
注意 :
#1. python的線程屬於內核級別的,即由操作系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運行) #2. 單線程內開啟協程,一旦遇到io,就會從應用程序級別(而非操作系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)
對比操作系統的線程的切換,用戶在單線程內控制協程的切換 :
#優點: #1. 協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級 #2. 單線程內就可以實現並發的效果,最大限度地利用cpu #缺點: #1. 協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程 #2. 協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程
協程的特點 :
- 必須在只有一個單線程里實現並發
- 修改共享數據不需加鎖
- 用戶程序里自己保存多個控制流的上下文棧
- 附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模塊(select機制))
greenlet 模塊
先安裝greenlet模塊 ; pip3 install greenlet
import greenlet def f1(): print(11) gr2.switch() print(22) gr2.switch() def f2(): print(33) gr1.switch() print(44) # 協程 gr1 gr1 = greenlet.greenlet(f1) # 協程 gr2 gr2 = greenlet.greenlet(f2) gr1.switch() 結果: 11 33 22 44
注意 :單純的切換有時候還會降低程序的執行速度,greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時如果遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題。
單線程里的這20個任務的代碼通常會既有計算操作又有阻塞操作,我們完全可以在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。。。。如此,才能提高效率,這就用到了Gevent模塊。
所以說 當--- 協程 + 遇到IO就切換 ===厲害了!!!
gevent 模塊
安裝 : pip3 install gevent
from gevent import monkey monkey.patch_all() # 以后代碼中遇到IO都會自動執行greenlet的switch進行切換 import requests import gevent def get_page1(url): ret = requests.get(url) print(url,ret.content) def get_page2(url): ret = requests.get(url) print(url,ret.content) def get_page3(url): ret = requests.get(url) print(url,ret.content) gevent.joinall([ gevent.spawn(get_page1, 'https://www.python.org/'), # 協程1 gevent.spawn(get_page2, 'https://www.yahoo.com/'), # 協程2 gevent.spawn(get_page3, 'https://github.com/'), # 協程3 ]) #結果的順序是不確定的,A,B,C三個請求,當執行A的時候遇到阻塞,那么久在阻塞的這個時間段去執行不阻塞的B或者C,當執行B的時候遇到阻塞就去執行A/C,在網絡編程中會有請求時候的阻塞和接收時候的阻塞.
總結 :
1.協程能提高並發嗎 ?
答:協程本身是無法提高並發的,但是協程+IO切換可以.
2.單線程提高並發的方法:
--- 協程+IO切換 gevent
--- 基於事件循環的異步非阻塞框架 Twisted
3.進程,線程,協程的區別 : ★★★★★★★★★★★★
--- 進程是資源分配的最小單位,線程是CPU調度的最小單位.
--- 在一個程序中可以有多個進行,一個進程最少有一個線程.
--- 和其他語言相比較,其他語言幾乎不用進程的,但是在Python中,它的進程和線程是有差異的,Python有個GIL鎖,GIL鎖保證一個進程在同一時刻只有一個線程被CPU調到.
--- 對於協程來說,它是有程序員創造出來的不是一個真實存在的東西,它本身是沒有意義的,但是當 協程+IO切換放到一起的時候就可以提高單線程並發的性能.
