異步回調 協程


day36

異步回調與協程

一、異步回調

1、什么是回調:

異步回調指的是:在發起一個異步任務的同時指定一個函數,在異步任務完成時會自動的調用這個函數。

2、為什么需要回調函數

需要獲取異步任務的執行結果,但是又不應該讓其阻塞(降低效率),即想要高效的獲取任務的執行結果。

之前在使用線程池或進程池提交任務時,如果想要處理任務的執行結果則必須調用result函數或是shutdown函數,而它們都是是阻塞的,會等到任務執行完畢后才能繼續執行,這樣一來在這個等待過程中就無法執行其他任務,降低了效率,所以需要一種方案,即保證解析結果的線程不用等待,又能保證數據能夠及時被解析,該方案就是異步回調。

3、如何使用異步回調

通常情況下,異步都會和回調函數一起使用,使用方法即是add_done_callback(),給Future對象綁定一個回調函數。

注意:在多進程中回調函數 是交給主進程來執行 而在多線程中 回調函數是誰有空誰執行(不是主線程)

import requests,re,os,random,time
from concurrent.futures import ProcessPoolExecutor
​
def get_data(url):
    print("%s 正在請求%s" % (os.getpid(),url))
    time.sleep(random.randint(1,2))
    response = requests.get(url)
    print(os.getpid(),"請求成功 數據長度",len(response.content))
    #parser(response) # 3.直接調用解析方法  哪個進程請求完成就那個進程解析數據  強行使兩個操作耦合到一起了
    return response
​
def parser(obj):
    data = obj.result()
    htm = data.content.decode("utf-8")
    ls = re.findall("href=.*?com",htm)
    print(os.getpid(),"解析成功",len(ls),"個鏈接")
​
if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)
    urls = ["https://www.baidu.com",
            "https://www.sina.com",
            "https://www.python.org",
            "https://www.tmall.com",
            "https://www.mysql.com",
            "https://www.apple.com.cn"]
    # objs = []
    for url in urls:
        # res = pool.submit(get_data,url).result() # 1.同步的方式獲取結果 將導致所有請求任務不能並發
        # parser(res)
​
        obj = pool.submit(get_data,url) # 
        obj.add_done_callback(parser) # 4.使用異步回調,保證了數據可以被及時處理,並且請求和解析解開了耦合
        # objs.append(obj)
        
    # pool.shutdown() # 2.等待所有任務執行結束在統一的解析
    # for obj in objs:
    #     res = obj.result()
    #     parser(res)
    # 1.請求任務可以並發 但是結果不能被及時解析 必須等所有請求完成才能解析
    # 2.解析任務變成了串行,
View Code

 

總結:異步回調使用方法就是在提交任務后得到一個Futures對象,調用對象的add_done_callback來指定一個回調函數。

如果把任務比喻為燒水,沒有回調時就只能守着水壺等待水開,有了回調相當於換了一個會響的水壺,燒水期間可用作其他的事情,等待水開了水壺會自動發出聲音,這時候再回來處理。水壺自動發出聲音就是回調。

注意:

  1. 使用進程池時,回調函數都是主進程中執行執行;

  2. 使用線程池時,回調函數的執行線程是不確定的,哪個線程空閑就交給哪個線程;

  3. 回調函數默認接收一個參數就是這個任務對象自己,再通過對象的result函數來獲取任務的處理結果。

二、線程中的隊列

引入線程隊列 : import queue  

  線程隊列方法 :

    q = queue.Queue()  #實例化對列,先進先出

    q = queue.LifoQueue()  #實例化隊列,后進先出  ( Last in, first out )

    q = queue.PriorityQueue()  #實例化隊列,優先級隊列

      優先級隊列,put() 方法接收的是一個元組,第一個元素是優先級,第二個元素是數據;

      優先級可以是數字或字符,只要能夠進行大小比較即可(即優先級必須要是能夠比較大小的);

      如果優先級是字符串或特殊字符,按照字符串或特殊字符的ASCII碼比較,如果ASCII碼相同,按照先進先出原則取出。

from queue import Queue,LifoQueue,PriorityQueue
​
# 1. 先進先出隊列
# q = Queue(1)
# q.put("a")
# q.put("b",timeout=1)
#
# print(q.get())
# print(q.get(timeout=2))
# 2.last in first out 后進先出隊列(堆棧)
# lq = LifoQueue()
# lq.put("a")
# lq.put("b")
# lq.put("c")
#
# print(lq.get())
# print(lq.get())
# print(lq.get())
# 3.優先級隊列  (取出順序是 由小到大  優先級可以使數字或字符 只要能夠比較大小即可)
pq = PriorityQueue()
# pq.put((2,"b"))
# pq.put((3,"c"))
# pq.put((1,"a"))
#
# print(pq.get())
# print(pq.get())
# print(pq.get())
​
pq.put((["a"],"bdslkfjdsfjd"))
pq.put((["b"],"csdlkjfksdjkfds"))
pq.put((["c"],"asd;kjfksdjfkdsf"))
​
print(pq.get())
print(pq.get())
print(pq.get())
View Code

 

三、事件

1、什么是事件

線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。

2、Event簡述

Event對象包含一個可由線程設置的信號標志,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標志被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標志為假,那么這個線程將會被一直阻塞直至該標志為真。一個線程如果將一個Event對象的信號標志設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那么它將忽略這個事件, 繼續執行。

## event的常用方法
event.isSet():返回event的狀態值;
event.wait():如果 event.isSet()==False將阻塞線程;
event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;
event.clear():恢復event的狀態值為False。

 

event代碼示例:

使用變量類完成多線程協作

 

import time
from threading import Thread
from threading import Event
​
# 創建一個事件(使用異步修改后)
e = Event() #默認False
def start():
​
    print("正在啟動服務器......")
    time.sleep(5)
    print("服務器啟動成功!")
    e.set() # 就是把事件的值設置為True
def connect():
    for i in range(3):
        print("等待服務器啟動....")
        e.wait(1) # 會阻塞 直到對方把事件設置為True
        if e.isSet():
            print("連接成功!")
            break
        else:
            print("連接失敗")
    else: #如果3次都沒成功 就打印這個消息
        print("服務器沒有啟動")
​
Thread(target=start).start()
Thread(target=connect).start()
使用Event

 四、協程

1、引言

上一節中我們知道GIL鎖將導致CPython無法利用多核CPU的優勢,只能使用單核並發的執行。很明顯效率不高,那有什么辦法能夠提高效率呢?

效率要高只有一個方法就是讓這個當前線程盡可能多的占用CPU時間,如何做到?

任務類型可以分為兩種 IO密集型 和 計算密集型

對於計算密集型任務而言 ,無需任何操作就能一直占用CPU直到超時為止,沒有任何辦法能夠提高計算密集任務的效率,除非把GIL鎖拿掉,讓多核CPU並行執行。

對於IO密集型任務任務,一旦線程遇到了IO操作CPU就會立馬切換到其他線程,而至於切換到哪個線程,應用程序是無法控制的,這樣就導致了效率降低。

如何能提升效率呢?想一想如果可以監測到線程的IO操作時,應用程序自發的切換到其他的計算任務,是不是就可以留住CPU?的確如此

2、單線程實現並發

單線程實現並發這句話乍一聽好像在瞎說

首先需要明確並發的定義

並發:指的是多個任務同時發生,看起來好像是同時都在進行

並行:指的是多個任務真正的同時進行

早期的計算機只有一個CPU,既然CPU可以切換線程來實現並發,那么為何不能再線程中切換任務來並發呢?

上面的引子中提到,如果一個線程能夠檢測IO操作並且將其設置為非阻塞,並自動切換到其他任務就可以提高CPU的利用率,指的就是在單線程下實現並發。

3、如何能夠實現並發呢

並發 = 切換任務+保存狀態,只要找到一種方案,能夠在兩個任務之間切換執行並且保存狀態,那就可以實現單線程並發

python中的生成器就具備這樣一個特點,每次調用next都會回到生成器函數中執行代碼,這意味着任務之間可以切換,並且是基於上一次運行的結果,這意味着生成器會自動保存執行狀態!

於是乎我們可以利用生成器來實現並發執行:

def task1():
    while True:
        yield
        print("task1 run")
​
def task2():
    g = task1()
    while True:
        next(g)
        print("task2 run")
task2()
yield實現並發

 

並發雖然實現了,單這對效率的影響是好是壞呢?來測試一下

yield實現並發的代碼性能測試

 

可以看到對於純計算任務而言,單線程並發反而使執行效率下降了一半左右,所以這樣的方案對於純計算任務而言是沒有必要的

我們暫且不考慮這樣的並發對程序的好處是什么,在上述代碼中,使用yield來切換是的代碼結構非常混亂,如果十個任務需要切換呢,不敢想象!因此就有人專門對yield進行了封裝,這便有了greenlet模塊

4、greenlet模塊實現並發

def task1(name):
    print("%s task1 run1" % name)
    g2.switch(name) # 切換至任務2
    print("task1 run2") 
    g2.switch() # 切換至任務2
def task2(name):
    print("%s task2 run1" % name)
    g1.switch() # 切換至任務1
    print("task2 run2")
​
g1 = greenlet.greenlet(task1)
g2 = greenlet.greenlet(task2)
g1.switch("jerry") # 為任務傳參數
View Code

 

該模塊簡化了yield復雜的代碼結構,實現了單線程下多任務並發,但是無論直接使用yield還是greenlet都不能檢測IO操作,遇到IO時同樣進入阻塞狀態,所以此時的並發是沒有任何意義的。

現在我們需要一種方案 即可檢測IO 又能夠實現單線程並發,於是gevent閃亮登場

5、協程概述

協程:是單線程下的並發,又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的。

需要強調的是:

#1. python的線程屬於內核級別的,即由操作系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運行)
#2. 單線程內開啟協程,一旦遇到io,就會從應用程序級別(而非操作系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)

 

對比操作系統控制線程的切換,用戶在單線程內控制協程的切換

優點如下:

#1. 協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級
#2. 單線程內就可以實現並發的效果,最大限度地利用cpu

 

缺點如下:

#1. 協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程來盡可能提高效率
#2. 協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程

 

6、gevent協程的使用

import gevent,sys
from gevent import monkey # 導入monkey補丁
monkey.patch_all() # 打補丁 
import time
​
print(sys.path)
​
def task1():
    print("task1 run")
    # gevent.sleep(3)
    time.sleep(3)
    print("task1 over")
​
def task2():
    print("task2 run")
    # gevent.sleep(1)
    time.sleep(1)
    print("task2 over")
​
g1 = gevent.spawn(task1)
g2 = gevent.spawn(task2)
gevent.joinall([g1,g2])
View Code

 

需要注意:

1.協程執行時要想使任務執行則必須對協程對象調用join函數

2.有多個任務時,隨便調用哪一個的join都會並發的執行所有任務,但是需要注意如果一個存在io的任務沒有被join該任務將無法正常執行完畢

3.monkey補丁的原理是把原始的阻塞模塊替換為修改后的非阻塞模塊,即偷梁換柱,來實現IO自定切換,所以打補丁的位置一定要放到導入阻塞模塊之前


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM