python並發編程之gevent協程(四)


協程的含義就不再提,在py2和py3的早期版本中,python協程的主流實現方法是使用gevent模塊。由於協程對於操作系統是無感知的,所以其切換需要程序員自己去完成。

系列文章

基於生成器的簡單協程

import time
def A():
    for i in range(100):
        print("----A---")
        yield i
        time.sleep(0.5)


def B(c):
    while True:
        print("----B---")
        try:
            next(c)
        except StopIteration:
            break
        else:
            time.sleep(0.5)

if __name__ == '__main__':
    a = A()
    B(a)

上面的例子並沒有帶來代碼效率的提高,因為time.sleep()是同步阻塞操作;上例主要是為了說明協程的上下文切換原理。

greenlet的協程

greenlet模塊也是程序顯性主動控制協程切換,但是對原生做了一定的封裝使得切換變得簡單一些。

from greenlet import greenlet
import time

def test1(gr,g):
    for i in range(100):
        print("---A--")
        gr.switch(g, gr) # 切換到另一個協程執行
        time.sleep(0.5)

def test2(gr, g):
    for i in range(100):
        print("---B--")
        gr.switch(g, gr)
        # gr.throw(AttributeError)
        time.sleep(0.5)

if __name__ == '__main__':
    # 創建一個協程1
    gr1 = greenlet(test1)
    # 創建一個協程2
    gr2 = greenlet(test2)
    # 啟動協程
    gr1.switch(gr2, gr1)

greenlet類主要有兩個方法:

  • switch:用來切換協程;

  • throw():用來拋出異常同時終止程序;

gevent模塊協程

  • gevent是在greenlet的基礎上進行封裝使得gevent變得更加的易用。

  • gevent采用了隱式啟動事件循環,即在需要阻塞的時候開啟一個專門的協程來啟動事件循環;

  • 如果一個任務沒有io操作,那么他會一直執行直到完成;其他協程沒有執行的機會;

  • 自動識別io事件,放棄CPU控制時間;

# 一個補丁patch_all,注意要放在所有的import前面,其會將線程、進程替換成gevent框架,使得我們可以用同步編程的方式編寫異步代碼
from gevent import monkey;monkey.patch_all()
import gevent
import requests

def target0(n):
    print('--start---{}'.format(n))
    res = requests.get('http://www.baidu.com')
    print(res)
    return n

if __name__ == '__main__':
    jobs = [gevent.spawn(target0, 1),gevent.spawn(target0, 2),gevent.spawn(target0, 3)]
    gevent.joinall(jobs)
    print([job.value for job in jobs])

gevent模塊分析

  • gevent頂層方法
gevent.spawn():創建一個普通的Greenlet對象並切換;
gevent.spawn_later(seconds=3) # 延時創建一個普通的Greenlet對象並切換
gevent.spawn_raw() # 創建的協程對象屬於一個組
gevent.getcurrent() # 返回當前正在執行的greenlet
gevent.joinall(jobs):將協程任務添加到事件循環,接收一個任務列表
gevent.wait() # 可以替代join函數等待循環結束,也可以傳入協程對象列表
gevent.kill() # 殺死一個協程
gevent.killall() # 殺死一個協程列表里的所有協程
monkey.patch_all():非常重要,會自動將python的一些標准模塊替換成gevent框架
  • 設置強制切換的時間
# 手動設置CPU密集型最大執行時間,如果是單線程的協程不需要關注這個
sys.setcheckinterval(n):每n條執行嘗試進行線程切換,n必須是int
sys.getswitchinterval() # 默認5ms切換
  • Greenlet對象
# Greenlet對象
from gevent import Greenlet

# Greenlet對象創建
job = Greenlet(target0, 3)
Greenlet.spawn() # 創建一個協程並啟動
Greenlet.spawn_later(seconds=3) # 延時啟動

# 協程啟動
job.start() # 將協程加入循環並啟動協程
job.start_later(3) # 延時啟動

# 等待任務完成
job.join() # 等待任務完成
job.get() # 獲取協程返回的值

# 任務中斷和判斷任務狀態
job.dead() # 判斷協程是否死亡
job.kill() # 殺死正在運行的協程並喚醒其他的協程,這個協程將不會再執行,可以
job.ready() # 任務完成返回一個真值
job.successful() # 任務成功完成返回真值,否則拋出錯誤

# 獲取屬性
job.loop # 時間循環對象
job.value # 獲取返回的值

# 捕捉異常
job.exception # 如果運行有錯誤,獲取它
job.exc_info # 錯誤的詳細信息

# 設置回調函數
job.rawlink(back) # 普通回調,將job對象作為回調函數的參數
job.unlink() # 刪除回調函數
# 執行成功的回調函數
job.link_value(back)
# 執行失敗的回調函數
job.link_exception(back)

限制並發

  • 通過設置協程池Pool來限制運行的協程的最大數目。該Pool和multiprocessing進程模塊的Pool的API十分相似。

  • Group類管理一組不限制數目的協程對象,但是Pool是它的子類,使用一般用Pool替代。

def tag():
    print('--start---')
    x = requests.get('http://www.baidu.com')
    print(x)
    print('------end--------')
    return 0

if __name__ == '__main__':

    from gevent.pool import Pool
    p = Pool(5)
    for i in range(10):
        p.apply_async(tag)
    p.join()
  • gevent.Pool的特殊方法
pool.wait_available():等待直到有一個協程有結果
pool.dd(greenlet):向進程池添加一個方法並跟蹤,非阻塞
pool.discard(greenlet):停止跟蹤某個協程
pool.start(greenlet):加入並啟動協程
pool.join():阻塞等待結束
pool.kill():殺死所有跟蹤的協程
pool.killone(greenlet):殺死一個協程


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM