python網絡-多任務實現之協程(27)


一、協程

協程,又稱微線程,纖程。英文名Coroutine。

協程不是進程,也不是線程,它就是一個函數,一個特殊的函數——可以在某個地方掛起,並且可以重新在掛起處繼續運行。所以說,協程與進程、線程相比,不是一個維度的概念。

一個進程可以包含多個線程,一個線程也可以包含多個協程,也就是說,一個線程內可以有多個那樣的特殊函數在運行。但是有一點,必須明確,一個線程內的多個協程的運行是串行的。如果有多核CPU的話,多個進程或一個進程內的多個線程是可以並行運行的,但是一個線程內的多個協程卻絕對串行的,無論有多少個CPU(核)。這個比較好理解,畢竟協程雖然是一個特殊的函數,但仍然是一個函數。一個線程內可以運行多個函數,但是這些函數都是串行運行的。當一個協程運行時,其他協程必須掛起。

通俗的理解:在一個線程中的某個函數,可以在任何地方保存當前函數的一些臨時變量等信息,然后切換到另外一個函數中執行,注意不是通過調用函數的方式做到的,並且切換的次數以及什么時候再切換到原來的函數都由開發者自己確定

二、yield實現協程

 1 import time
 2 
 3 def A():
 4     while True:
 5         print("----A---")
 6         yield
 7         time.sleep(0.3)
 8 
 9 def B(c):
10     while True:
11         print("----B---")
12         next(c)
13         time.sleep(0.3)
14 
15 if __name__=='__main__':
16     a = A()
17     B(a)

執行結果

----B---
----A---
----B---
----A---
----B---
----A---
----B---
----A---
----B---
----A---
省略。。。

代碼說明:

第17行:調用函數B,並把a傳遞進去。執行打印B的代碼,代碼執行到next(c)時,會調用函數A,執行打印A的代碼,當代碼實行帶第6行遇到yield的實行,該協程進入等待狀態,回到原來next(c)處繼續執行,從而實現多協程的切換,通過yield關鍵字。

 

三、greenlet

1、greenlet實現多任務協程

為了更好使用協程來完成多任務,python中的greenlet模塊對其封裝,從而使得切換任務變的更加簡單,在使用前先要確保greenlet模塊安裝

使用如下命令安裝greenlet模塊:

sudo pip install greenlet
#coding = utf-8
from greenlet import greenlet
def test1():
    print("1")
    gr2.switch()
    print("2")

def test2():
    print("3")
    gr1.switch()
    print("4")

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

運行結果:

1
3
2

當創建一個greenlet時,首先初始化一個空的棧, switch到這個棧的時候,會運行在greenlet構造時傳入的函數(首先在test1中打印 1), 如果在這個函數(test1)中switch到其他協程(到了test2 打印3),那么該協程會被掛起,等到切換回來(在test1切換回來 打印2)。當這個協程對應函數執行完畢,那么這個協程就變成dead狀態。
  

注意 上面沒有打印test2的最后一行輸出 4,因為在test2中切換到gr1之后掛起,但是沒有地方再切換回來。

2、greenlet的模塊與類

我們首先看一下greenlet這個module里面的屬性

>>> import greenlet
>>> dir(greenlet)
['GREENLET_USE_GC', 'GREENLET_USE_TRACING', 'GreenletExit', '_C_API', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', '__version__', 'error', 'getcurrent', 'gettrace', 'greenlet', 'settrace']

其中,比較重要的是getcurrent(), 類greenlet、異常類GreenletExit。

getcurrent()返回當前的greenlet實例;

GreenletExit:是一個特殊的異常,當觸發了這個異常的時候,即使不處理,也不會拋到其parent(后面會提到協程中對返回值或者異常的處理)

然后我們再來看看greenlet.greenlet這個類:

>>>dir(greenlet.greenlet)
['GreenletExit', '__bool__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '_stack_saved', 'dead', 'error', 'getcurrent', 'gettrace', 'gr_frame', 'parent', 'run', 'settrace', 'switch', 'throw']

比較重要的幾個屬性:

  run:當greenlet啟動的時候會調用到這個callable,如果我們需要繼承greenlet.greenlet時,需要重寫該方法

  switch:前面已經介紹過了,在greenlet之間切換

  parent:可讀寫屬性,后面介紹

  dead:如果greenlet執行結束,那么該屬性為true

  throw:切換到指定greenlet后立即跑出異常

文章后面提到的greenlet大多都是指greenlet.greenlet這個class,請注意區別 

對於greenlet,最常用的寫法是 x = gr.switch(y)。 這句話的意思是切換到gr,傳入參數y。當從其他協程(不一定是這個gr)切換回來的時候,將值付給x。

import greenlet


def test1(x, y):
    z = gr2.switch(x + y)
    print("test1:%s" % z)


def test2(a):
    print('test2:%s' % a)
    gr1.switch(10)


gr1 = greenlet.greenlet(test1)
gr2 = greenlet.greenlet(test2)
print(gr1.switch("Hello", "World"))

運行結果為:

test2:HelloWorld
test1:10
None

上面的例子,第10行從main greenlet切換到了gr1,test1第3行切換到了gs2,然后gr1掛起,第7行從gr2切回gr1時,將值(10)返回值給了 z。 

3、greenlet生命周期

 文章開始的地方提到第一個例子中的gr2其實並沒有正常結束,我們可以借用greenlet.dead這個屬性來查看

運行結果為:

 1 import greenlet
 2 
 3 
 4 def test1():
 5     gr2.switch(1)
 6     print("test1: finished")
 7 
 8 
 9 def test2(x):
10     print("test2:first %s" % x)
11     gr1.switch()
12     print("test2:back")
13 
14 gr1 = greenlet.greenlet(test1)
15 gr2 = greenlet.greenlet(test2)
16 gr1.switch()
17 print("gr1 is dead? : %s, gr2 is dead? :%s" % (gr1.dead, gr2.dead))
18 gr2.switch()
19 print("gr1 is dead? : %s, gr2 is dead? :%s" % (gr1.dead, gr2.dead))

運行結果為:

test2:first 1
test1: finished
gr1 is dead? : True, gr2 is dead? :False
test2:back
gr1 is dead? : True, gr2 is dead? :True

只有當協程對應的函數執行完畢,協程才會die,所以第一次Check的時候gr2並沒有die,因為第12行切換出去了就沒切回來。在main中再switch到gr2的時候, 執行后面的邏輯,gr2 die

4、greenlet注意事項

使用greenlet需要注意一下三點:

  第一:greenlet創生之后,一定要結束,不能switch出去就不回來了,否則容易造成內存泄露

  第二:python中每個線程都有自己的main greenlet及其對應的sub-greenlet ,不能線程之間的greenlet是不能相互切換的

  第三:不能存在循環引用,這個是官方文檔明確說明

 1 from greenlet import greenlet, GreenletExit
 2 huge = []
 3 def show_leak():
 4     def test1():
 5         gr2.switch()
 6 
 7     def test2():
 8         huge.extend([x* x for x in range(100)])
 9         gr1.switch()
10         print 'finish switch del huge'
11         del huge[:]
12     
13     gr1 = greenlet(test1)
14     gr2 = greenlet(test2)
15     gr1.switch()
16     gr1 = gr2 = None
17     print 'length of huge is zero ? %s' % len(huge)
18 
19 if __name__ == '__main__':
20     show_leak() 

在test2函數中 第11行,我們將huge清空,然后再第16行將gr1、gr2的引用計數降到了0。但運行結果告訴我們,第11行並沒有執行,所以如果一個協程沒有正常結束是很危險的,往往不符合程序員的預期。greenlet提供了解決這個問題的辦法,官網文檔提到:如果一個greenlet實例的引用計數變成0,那么會在上次掛起的地方拋出GreenletExit異常,這就使得我們可以通過try ... finally 處理資源泄露的情況。如下面的代碼: 

1 from greenlet import greenlet, GreenletExit
 2 huge = []
 3 def show_leak():
 4     def test1():
 5         gr2.switch()
 6 
 7     def test2():
 8         huge.extend([x* x for x in range(100)])
 9         try:
10             gr1.switch()
11         finally:
12             print 'finish switch del huge'
13             del huge[:]
14     
15     gr1 = greenlet(test1)
16     gr2 = greenlet(test2)
17     gr1.switch()
18     gr1 = gr2 = None
19     print 'length of huge is zero ? %s' % len(huge)
20 
21 if __name__ == '__main__':
22     show_leak()

上述代碼的switch流程:main greenlet --> gr1 --> gr2 --> gr1 --> main greenlet, 很明顯gr2沒有正常結束(在第10行刮起了)。第18行之后gr1,gr2的引用計數都變成0,那么會在第10行拋出GreenletExit異常,因此finally語句有機會執行。同時,在文章開始介紹Greenlet module的時候也提到了,GreenletExit這個異常並不會拋出到parent,所以main greenlet也不會出異常。

四、gevent

greenlet已經實現了協程,但是這個還的人工切換,是不是覺得太麻煩了,不要捉急,python還有一個比greenlet更強大的並且能夠自動切換任務的模塊gevent

其原理是當一個greenlet遇到IO(指的是input output 輸入輸出,比如網絡、文件操作等)操作時,比如訪問網絡,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。

由於IO操作非常耗時,經常使程序處於等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在運行,而不是等待IO

import gevent


def f():
    for i in range(5):
        print("%s:%d"%(gevent.getcurrent(),i))


g1 = gevent.spawn(f)
g2 = gevent.spawn(f)
g3 = gevent.spawn(f)
g1.join()
g2.join()
g3.join()

運行結果為:

<Greenlet at 0x1ba533f9598: f(5)>:0
<Greenlet at 0x1ba533f9598: f(5)>:1
<Greenlet at 0x1ba533f9598: f(5)>:2
<Greenlet at 0x1ba533f9598: f(5)>:3
<Greenlet at 0x1ba533f9598: f(5)>:4
<Greenlet at 0x1ba533f97b8: f(5)>:0
<Greenlet at 0x1ba533f97b8: f(5)>:1
<Greenlet at 0x1ba533f97b8: f(5)>:2
<Greenlet at 0x1ba533f97b8: f(5)>:3
<Greenlet at 0x1ba533f97b8: f(5)>:4
<Greenlet at 0x1ba533f99d8: f(5)>:0
<Greenlet at 0x1ba533f99d8: f(5)>:1
<Greenlet at 0x1ba533f99d8: f(5)>:2
<Greenlet at 0x1ba533f99d8: f(5)>:3
<Greenlet at 0x1ba533f99d8: f(5)>:4

可以看到,3個greenlet是依次運行而不是交替運行

gevent的切換執行

import gevent


def f():
    for i in range(5):
        print("%s:%d"%(gevent.getcurrent(),i))
        gevent.sleep(0)


g1=gevent.spawn(f)
g2=gevent.spawn(f)
g3=gevent.spawn(f)
g1.join()
g2.join()
g3.join()

執行結果為:

<Greenlet at 0x20a5e719598: f>:0
<Greenlet at 0x20a5e7197b8: f>:0
<Greenlet at 0x20a5e7199d8: f>:0
<Greenlet at 0x20a5e719598: f>:1
<Greenlet at 0x20a5e7197b8: f>:1
<Greenlet at 0x20a5e7199d8: f>:1
<Greenlet at 0x20a5e719598: f>:2
<Greenlet at 0x20a5e7197b8: f>:2
<Greenlet at 0x20a5e7199d8: f>:2
<Greenlet at 0x20a5e719598: f>:3
<Greenlet at 0x20a5e7197b8: f>:3
<Greenlet at 0x20a5e7199d8: f>:3
<Greenlet at 0x20a5e719598: f>:4
<Greenlet at 0x20a5e7197b8: f>:4
<Greenlet at 0x20a5e7199d8: f>:4

3個greenlet交替運行

gevent.spawn 啟動協程,參數為函數名稱,參數名稱

3、gevent並發下載器

monkey可以使一些阻塞的模塊變得不阻塞,機制:遇到IO操作則自動切換,手動切換可以用gevent.sleep(0)

from gevent import monkey
import gevent
import urllib.request


#有I/O時需要這一句,如果沒有這句話就會有阻塞狀態,加上就沒有阻塞
monkey.patch_all()


def myDownLoad(url):
    print("GET:%s"%url)
    resp = urllib.request.urlopen(url)
    data = resp.read()
    print("%d bytes received from %s"%(len(data),url))


gevent.joinall((
    gevent.spawn(myDownLoad,"http://www.baidu.com/"),
    gevent.spawn(myDownLoad,"https://apple.com"),
    gevent.spawn(myDownLoad,"https://www.cnblogs.com/Se7eN-HOU/")
))

運行結果為:

GET:http://www.baidu.com/
GET:https://apple.com
GET:https://www.cnblogs.com/Se7eN-HOU/
153390 bytes received from http://www.baidu.com/
18880 bytes received from https://www.cnblogs.com/Se7eN-HOU/
58865 bytes received from https://apple.com

從上能夠看到是先發送的獲取baidu的相關信息,然后依次是apple,cnblogs但是收到數據的先后順序不一定與發送順序相同,這也就體現出了異步,即不確定什么時候會收到數據,順序不一定.

上面如果沒有下面這句代碼,

#有I/O時需要這一句,如果沒有這句話就會有阻塞狀態,加上就沒有阻塞
monkey.patch_all()

執行結果如下

GET:http://www.baidu.com/
153378 bytes received from http://www.baidu.com/
GET:https://apple.com
58865 bytes received from https://apple.com
GET:https://www.cnblogs.com/Se7eN-HOU/
18880 bytes received from https://www.cnblogs.com/Se7eN-HOU/

每請求一個網站就會等着請求完畢了在執行第二個,在請求的過程中,網速慢等待的狀態就是在阻塞。

五、asyncio

我們都知道,現在的服務器開發對於IO調度的優先級控制權已經不再依靠系統,都希望采用協程的方式實現高效的並發任務,如js、lua等在異步協程方面都做的很強大。

Python在3.4版本也加入了協程的概念,並在3.5確定了基本完善的語法和實現方式。同時3.6也對其進行了如解除了await和yield在同一個函數體限制等相關的優化。

event_loop 事件循環:程序開啟一個無限的循環,程序員會把一些函數注冊到事件循環上。當滿足事件發生的時候,調用相應的協程函數。
coroutine 協程:協程對象,指一個使用async關鍵字定義的函數,它的調用不會立即執行函數,而是會返回一個協程對象。協程對象需要注冊到事件循環,由事件循環調用。
task 任務:一個協程對象就是一個原生可以掛起的函數,任務則是對協程進一步封裝,其中包含任務的各種狀態。
future: 代表將來執行或沒有執行的任務的結果。它和task上沒有本質的區別
async/await 關鍵字:python3.5 用於定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的異步調用接口。

1、創建協程

首先定義一個協程,在def前加入async聲明,就可以定義一個協程函數。

一個協程函數不能直接調用運行,只能把協程加入到事件循環loop中。asyncio.get_event_loop方法可以創建一個事件循環,然后使用run_until_complete將協程注冊到事件循環,並啟動事件循環。

例如:

import asyncio


async def fun():
    print("---協程中---")

def main():
    print("---主線程中---")

    loop = asyncio.get_event_loop()
    loop.run_until_complete(fun())

if __name__ == "__main__":
    main()

運行結果:

---主線程中---
---協程中---

二、任務對象task

協程對象不能直接運行,在注冊事件循環的時候,其實是run_until_complete方法將協程包裝成為了一個任務(task)對象。所謂task對象是Future類的子類。保存了協程運行后的狀態,用於未來獲取協程的結果。

例如:

import asyncio


async def fun():
    print("---協程中---")
    return "Se7eN_HOU"

def main():
    print("---主線程中---")

    loop = asyncio.get_event_loop()
    #創建task
    task = loop.create_task(fun())
    print(task)
    loop.run_until_complete(task)
    print(task)

if __name__ == "__main__":
    main()

運行結果為:

---主線程中---
<Task pending coro=<fun() running at C:/Users/Se7eN_HOU/PycharmProjects/PythonLesson/test.py:4>>
---協程中---
<Task finished coro=<fun() done, defined at C:/Users/Se7eN_HOU/PycharmProjects/PythonLesson/test.py:4> result='Se7eN_HOU'>

創建task后,task在加入事件循環之前是pending狀態,因為fun()中沒有耗時的阻塞操作,task很快就執行完畢了。后面打印的finished狀態。
asyncio.ensure_future 和 loop.create_task都可以創建一個task,run_until_complete的參數是一個futrue對象。

 三、綁定回調

import asyncio

#協程
async def fun():
    print("---協程中---")
    return "Se7eN_HOU"

#協程的回調函數
def callback(future):
    #future.result是協程的返回值
    print("callBack:%s"%future.result())


def main():
    print("---主線程中---")
    #創建loop回路
    loop = asyncio.get_event_loop()
    #創建task
    task = loop.create_task(fun())
    #調用回調函數
    task.add_done_callback(callback)
    print(task)
    loop.run_until_complete(task)
    print(task)

if __name__ == "__main__":
    main()

運行結果為:

---主線程中---
<Task pending coro=<fun() running at C:/Users/Se7eN_HOU/PycharmProjects/PythonLesson/test.py:4> cb=[callback() at C:/Users/Se7eN_HOU/PycharmProjects/PythonLesson/test.py:9]>
---協程中---
callBack:Se7eN_HOU
<Task finished coro=<fun() done, defined at C:/Users/Se7eN_HOU/PycharmProjects/PythonLesson/test.py:4> result='Se7eN_HOU'>

也可以使用ensure_future獲取返回值

例如:

import asyncio

#協程
async def fun():
    print("---協程中---")
    return "Se7eN_HOU"

#協程的回調函數
#def callback(future):
    #future.result是協程的返回值
    #print("callBack:%s"%future.result())


def main():
    #創建loop回路
    loop = asyncio.get_event_loop()
    #創建task
    #task = loop.create_task(fun())
    #調用回調函數
    #task.add_done_callback(callback)
    task = asyncio.ensure_future(fun())
    loop.run_until_complete(task)
    print("fun函數的返回值是:%s"%format(task.result()))

if __name__ == "__main__":
    main()

運行結果為:

---協程中---
fun函數的返回值是:Se7eN_HOU

四、await阻塞,執行並發

使用async可以定義協程對象,使用await可以針對耗時的操作進行掛起,就像生成器里的yield一樣,函數讓出控制權。協程遇到await,事件循環將會掛起該協程,執行別的協程,直到其他的協程也掛起或者執行完畢,再進行下一個協程的執行。
耗時的操作一般是一些IO操作,例如網絡請求,文件讀取等。我們使用asyncio.sleep函數來模擬IO操作。協程的目的也是讓這些IO操作異步化。

例如:

import asyncio


async def test1():
    print("---1---")
    await asyncio.sleep(5)    
    print("---2---")


async def test2():
    print("---3---")
    await asyncio.sleep(1)
    print("---4---")


async def test3():
    print("---5---")
    await asyncio.sleep(3)
    print("---6---")

def main():
    loop = asyncio.get_event_loop()
    print("begin")

    t1 = test1()
    t2 = test2()
    t3 = test3()
    tasks1 = [t1,t2,t3]


    loop.run_until_complete(asyncio.wait(tasks1))
    print("end")
    loop.close()

if __name__=="__main__":
    main()

運行結果為:

begin
---3---
---1---
---5---
---4---
---6---
---2---
end

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM