day36
異步回調與協程
一、異步回調
1、什么是回調:
異步回調指的是:在發起一個異步任務的同時指定一個函數,在異步任務完成時會自動的調用這個函數。
2、為什么需要回調函數
需要獲取異步任務的執行結果,但是又不應該讓其阻塞(降低效率),即想要高效的獲取任務的執行結果。
之前在使用線程池或進程池提交任務時,如果想要處理任務的執行結果則必須調用result函數或是shutdown函數,而它們都是是阻塞的,會等到任務執行完畢后才能繼續執行,這樣一來在這個等待過程中就無法執行其他任務,降低了效率,所以需要一種方案,即保證解析結果的線程不用等待,又能保證數據能夠及時被解析,該方案就是異步回調。
3、如何使用異步回調
通常情況下,異步都會和回調函數一起使用,使用方法即是add_done_callback(),給Future對象綁定一個回調函數。
注意:在多進程中回調函數 是交給主進程來執行 而在多線程中 回調函數是誰有空誰執行(不是主線程)
import requests,re,os,random,time from concurrent.futures import ProcessPoolExecutor def get_data(url): print("%s 正在請求%s" % (os.getpid(),url)) time.sleep(random.randint(1,2)) response = requests.get(url) print(os.getpid(),"請求成功 數據長度",len(response.content)) #parser(response) # 3.直接調用解析方法 哪個進程請求完成就那個進程解析數據 強行使兩個操作耦合到一起了 return response def parser(obj): data = obj.result() htm = data.content.decode("utf-8") ls = re.findall("href=.*?com",htm) print(os.getpid(),"解析成功",len(ls),"個鏈接") if __name__ == '__main__': pool = ProcessPoolExecutor(3) urls = ["https://www.baidu.com", "https://www.sina.com", "https://www.python.org", "https://www.tmall.com", "https://www.mysql.com", "https://www.apple.com.cn"] # objs = [] for url in urls: # res = pool.submit(get_data,url).result() # 1.同步的方式獲取結果 將導致所有請求任務不能並發 # parser(res) obj = pool.submit(get_data,url) # obj.add_done_callback(parser) # 4.使用異步回調,保證了數據可以被及時處理,並且請求和解析解開了耦合 # objs.append(obj) # pool.shutdown() # 2.等待所有任務執行結束在統一的解析 # for obj in objs: # res = obj.result() # parser(res) # 1.請求任務可以並發 但是結果不能被及時解析 必須等所有請求完成才能解析 # 2.解析任務變成了串行,
總結:異步回調使用方法就是在提交任務后得到一個Futures對象,調用對象的add_done_callback來指定一個回調函數。
如果把任務比喻為燒水,沒有回調時就只能守着水壺等待水開,有了回調相當於換了一個會響的水壺,燒水期間可用作其他的事情,等待水開了水壺會自動發出聲音,這時候再回來處理。水壺自動發出聲音就是回調。
注意:
-
使用進程池時,回調函數都是主進程中執行執行;
-
使用線程池時,回調函數的執行線程是不確定的,哪個線程空閑就交給哪個線程;
-
回調函數默認接收一個參數就是這個任務對象自己,再通過對象的result函數來獲取任務的處理結果。
二、線程中的隊列
引入線程隊列 : import queue
線程隊列方法 :
q = queue.Queue() #實例化對列,先進先出
q = queue.LifoQueue() #實例化隊列,后進先出 ( Last in, first out )
q = queue.PriorityQueue() #實例化隊列,優先級隊列
優先級隊列,put() 方法接收的是一個元組,第一個元素是優先級,第二個元素是數據;
優先級可以是數字或字符,只要能夠進行大小比較即可(即優先級必須要是能夠比較大小的);
如果優先級是字符串或特殊字符,按照字符串或特殊字符的ASCII碼比較,如果ASCII碼相同,按照先進先出原則取出。
from queue import Queue,LifoQueue,PriorityQueue # 1. 先進先出隊列 # q = Queue(1) # q.put("a") # q.put("b",timeout=1) # # print(q.get()) # print(q.get(timeout=2)) # 2.last in first out 后進先出隊列(堆棧) # lq = LifoQueue() # lq.put("a") # lq.put("b") # lq.put("c") # # print(lq.get()) # print(lq.get()) # print(lq.get()) # 3.優先級隊列 (取出順序是 由小到大 優先級可以使數字或字符 只要能夠比較大小即可) pq = PriorityQueue() # pq.put((2,"b")) # pq.put((3,"c")) # pq.put((1,"a")) # # print(pq.get()) # print(pq.get()) # print(pq.get()) pq.put((["a"],"bdslkfjdsfjd")) pq.put((["b"],"csdlkjfksdjkfds")) pq.put((["c"],"asd;kjfksdjfkdsf")) print(pq.get()) print(pq.get()) print(pq.get())
三、事件
1、什么是事件
線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。
2、Event簡述
Event對象包含一個可由線程設置的信號標志,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標志被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標志為假,那么這個線程將會被一直阻塞直至該標志為真。一個線程如果將一個Event對象的信號標志設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那么它將忽略這個事件, 繼續執行。
## event的常用方法 event.isSet():返回event的狀態值; event.wait():如果 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度; event.clear():恢復event的狀態值為False。
event代碼示例:
使用變量類完成多線程協作
import time from threading import Thread from threading import Event # 創建一個事件(使用異步修改后) e = Event() #默認False def start(): print("正在啟動服務器......") time.sleep(5) print("服務器啟動成功!") e.set() # 就是把事件的值設置為True def connect(): for i in range(3): print("等待服務器啟動....") e.wait(1) # 會阻塞 直到對方把事件設置為True if e.isSet(): print("連接成功!") break else: print("連接失敗") else: #如果3次都沒成功 就打印這個消息 print("服務器沒有啟動") Thread(target=start).start() Thread(target=connect).start()
四、協程
1、引言
上一節中我們知道GIL鎖將導致CPython無法利用多核CPU的優勢,只能使用單核並發的執行。很明顯效率不高,那有什么辦法能夠提高效率呢?
效率要高只有一個方法就是讓這個當前線程盡可能多的占用CPU時間,如何做到?
任務類型可以分為兩種 IO密集型 和 計算密集型
對於計算密集型任務而言 ,無需任何操作就能一直占用CPU直到超時為止,沒有任何辦法能夠提高計算密集任務的效率,除非把GIL鎖拿掉,讓多核CPU並行執行。
對於IO密集型任務任務,一旦線程遇到了IO操作CPU就會立馬切換到其他線程,而至於切換到哪個線程,應用程序是無法控制的,這樣就導致了效率降低。
如何能提升效率呢?想一想如果可以監測到線程的IO操作時,應用程序自發的切換到其他的計算任務,是不是就可以留住CPU?的確如此
2、單線程實現並發
單線程實現並發這句話乍一聽好像在瞎說
首先需要明確並發的定義
並發:指的是多個任務同時發生,看起來好像是同時都在進行
並行:指的是多個任務真正的同時進行
早期的計算機只有一個CPU,既然CPU可以切換線程來實現並發,那么為何不能再線程中切換任務來並發呢?
上面的引子中提到,如果一個線程能夠檢測IO操作並且將其設置為非阻塞,並自動切換到其他任務就可以提高CPU的利用率,指的就是在單線程下實現並發。
3、如何能夠實現並發呢
並發 = 切換任務+保存狀態,只要找到一種方案,能夠在兩個任務之間切換執行並且保存狀態,那就可以實現單線程並發
python中的生成器就具備這樣一個特點,每次調用next都會回到生成器函數中執行代碼,這意味着任務之間可以切換,並且是基於上一次運行的結果,這意味着生成器會自動保存執行狀態!
於是乎我們可以利用生成器來實現並發執行:
def task1(): while True: yield print("task1 run") def task2(): g = task1() while True: next(g) print("task2 run") task2()
並發雖然實現了,單這對效率的影響是好是壞呢?來測試一下
yield實現並發的代碼性能測試
可以看到對於純計算任務而言,單線程並發反而使執行效率下降了一半左右,所以這樣的方案對於純計算任務而言是沒有必要的
我們暫且不考慮這樣的並發對程序的好處是什么,在上述代碼中,使用yield來切換是的代碼結構非常混亂,如果十個任務需要切換呢,不敢想象!因此就有人專門對yield進行了封裝,這便有了greenlet模塊
4、greenlet模塊實現並發
def task1(name): print("%s task1 run1" % name) g2.switch(name) # 切換至任務2 print("task1 run2") g2.switch() # 切換至任務2 def task2(name): print("%s task2 run1" % name) g1.switch() # 切換至任務1 print("task2 run2") g1 = greenlet.greenlet(task1) g2 = greenlet.greenlet(task2) g1.switch("jerry") # 為任務傳參數
該模塊簡化了yield復雜的代碼結構,實現了單線程下多任務並發,但是無論直接使用yield還是greenlet都不能檢測IO操作,遇到IO時同樣進入阻塞狀態,所以此時的並發是沒有任何意義的。
現在我們需要一種方案 即可檢測IO 又能夠實現單線程並發,於是gevent閃亮登場
協程:是單線程下的並發,又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的。
需要強調的是:
#1. python的線程屬於內核級別的,即由操作系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運行) #2. 單線程內開啟協程,一旦遇到io,就會從應用程序級別(而非操作系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)
對比操作系統控制線程的切換,用戶在單線程內控制協程的切換
優點如下:
#1. 協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級 #2. 單線程內就可以實現並發的效果,最大限度地利用cpu
缺點如下:
#1. 協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程來盡可能提高效率 #2. 協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程
6、gevent協程的使用
import gevent,sys from gevent import monkey # 導入monkey補丁 monkey.patch_all() # 打補丁 import time print(sys.path) def task1(): print("task1 run") # gevent.sleep(3) time.sleep(3) print("task1 over") def task2(): print("task2 run") # gevent.sleep(1) time.sleep(1) print("task2 over") g1 = gevent.spawn(task1) g2 = gevent.spawn(task2) gevent.joinall([g1,g2])
需要注意:
1.協程執行時要想使任務執行則必須對協程對象調用join函數
2.有多個任務時,隨便調用哪一個的join都會並發的執行所有任務,但是需要注意如果一個存在io的任務沒有被join該任務將無法正常執行完畢
3.monkey補丁的原理是把原始的阻塞模塊替換為修改后的非阻塞模塊,即偷梁換柱,來實現IO自定切換,所以打補丁的位置一定要放到導入阻塞模塊之前
