流暢python學習筆記第十八章:使用asyncio包處理並發(二)


前面介紹了asyncio的用法。下面我們來看下如何用協程的方式來實現之前的旋轉指針的方法

@asyncio.coroutine

def spin(msg):

    write,flush=sys.stdout.write,sys.stdout.flush

    for char in itertools.cycle('|/-\\'):

        status=char+''+msg

        write(status)

        flush()

        write('\x08'*len(status))

        try:

            yield from asyncio.sleep(0.1)

        except asyncio.CancelledError:

            break

    write(''*len(status)+'\0x8'*len(status))

 

@asyncio.coroutine

def slow_function():

    yield from asyncio.sleep(3)

    return 42

 

@asyncio.coroutine

def supervisor():

    spinner=asyncio.ensure_future(spin('thinking'))

#    spinner=asyncio.async(spin('thinking'))

    print('spinner object:',spinner)

    result=yield from slow_function()

    spinner.cancel()

return result

 

if __name__=="__main__":

    start=time.time()

    loop=asyncio.get_event_loop()

    result=loop.run_until_complete(supervisor())

    loop.close()

    print('Answer:',result)

    end=time.time()

print("Total time:{}".format(end-start))

運行的結果和之前用多線程是一樣的。我們來看下運行的具體原理

1 asyncio.get_event_loop和 loop.run_until_complete(supervisor())

創建主循環,並傳入任務supervisor

supervisor中首先通過asyncio.async(spin('thinking'))spin函數添加如Task。方法也可以是通過spinner=asyncio.ensure_future(spin('thinking'))。在asyncioTask對象和threading.Thread的用途是一樣的。Task對象用於驅動協程,Thread對象用於調用可調用的對象,Task對象不用自己實例化,而是通過把協程傳給asyncio.async函數或者是asyncio.ensure_future,或者是loop.create_task

spin函數中,執行在終端輸出旋轉指針。並通過asyncio.sleep(0.1)的方式讓出控制權,會到主循環

此時來到supervisor函數。此時進入slow_function,在slow_functionasyncio.sleep(3)進行休眠,並在休眠期把控制權交給主循環。此時主循環會將控制權又交給spin函數。

5 3秒休眠結束后,返回42,並通過spinner.cancel函數取消spintask,取消后會在協程當前暫停的yield出拋出asyncio.CancelledError異常。至此整個程序運行完畢。

我們繼續來看下用asyncio來實現圖片下載的程序

DEST_URL='downloads/'

BASE_URL1='http://seopic.699pic.com/photo/40011'

down_list=('8840.jpg_wh1200','7347.jpg_wh1200','6876.jpg_wh1200','6876.jpg_wh1200')

def save_flag(img,filename):

    path=os.path.join(DEST_URL,filename)

    with open(path,'wb') as f:

        f.write(img)

 

@asyncio.coroutine

def get_flag(cc):

    url='{}/{cc}.jpg'.format(BASE_URL1,cc=cc)

    print(url)

    resp = yield from aiohttp.ClientSession().get(url)

    print (resp.status)

    image = yield from resp.read()

    return image

 

def show(text):

    print(text)

    sys.stdout.flush()

 

@asyncio.coroutine

def download_one(cc):

    image = yield from get_flag(cc)

    show(cc)

    save_flag(image, cc + '.jpg')

    return cc

 

def download_many(cc_list):

    loop=asyncio.get_event_loop()

    to_do=[download_one(cc) for cc in sorted(cc_list)]

    wait_coro=asyncio.wait(to_do)

    res,_=loop.run_until_complete(wait_coro)

    loop.close()

    return len(res)

def main(download_many):

    t1=time.time()

    count=download_many(down_list)

    elapsed=time.time()-t1

    msg='\n{} flags downloaded in {:.2f}s'

    print(msg.format(count,elapsed))

if __name__=="__main__":

main(download_many)

在這里我們用yield from aiohttp.ClientSession().get(url)

代替了request函數。因為request函數是個IO阻塞型的函數。注意aiohttp必須要安裝才能使用。書上寫的是用aiohttp.request(‘GET’,url)方法,但是我在實際使用的時候發現無法下載,提示如下錯誤:

client_session: <aiohttp.client.ClientSession object at 0x7fac75231f28>

Unclosed client session

client_session: <aiohttp.client.ClientSession object at 0x7fac75231dd8>

Unclosed client session

client_session: <aiohttp.client.ClientSession object at 0x7fac75231eb8>

Unclosed client session

client_session: <aiohttp.client.ClientSession object at 0x7fac75231f28>

Task exception was never retrieved

future: <Task finished coro=<download_one() done, defined at /home/zhf/py_prj/function_test/asy_try.py:51> exception=TypeError("'_SessionRequestContextManager' object is not iterable",)>

Traceback (most recent call last):

  File "/home/zhf/py_prj/function_test/asy_try.py", line 53, in download_one

    image = yield from get_flag(cc)

  File "/home/zhf/py_prj/function_test/asy_try.py", line 39, in get_flag

    resp=yield from aiohttp.request('GET',url)

TypeError: '_SessionRequestContextManager' object is not iterable

Task exception was never retrieved

future: <Task finished coro=<download_one() done, defined at /home/zhf/py_prj/function_test/asy_try.py:51> exception=TypeError("'_SessionRequestContextManager' object is not iterable",)>

在網上搜了下,推薦使用aiohttp.ClientSession().get進行下載。這個函數能保證相關的TCP資源能夠得到釋放,比如TCP鏈接

在這里download_oneget_flag都用到了協程意味着必須像協程那樣驅動,這樣才能把控制權交還給時間循環

asyncio.wait分別把各個協程包裝進一個task對象,最后的結果是,wait處理的所有對象都通過某種方式變成Future類的實例,wait是協程函數,因此返回的是一個協程或生成器對象。為了驅動協程,我們把協程傳給run_until_completed方法。

運行時間:

5 flags downloaded in 0.34s

 

下面繼續來優化下這個下載程序。

首先將代碼改動下,在保存圖片的時候將大小放大1000

def save_flag(img,filename):

    path=os.path.join(DEST_URL,filename)

    with open(path,'wb') as f:

        f.write(img*1000)

5 flags downloaded in 13.46s

下載的圖片大小如下所示:一張圖片493M

那么耗時的時間呢.總共耗費了13.46秒的時間。速率降低了40多倍

5 flags downloaded in 13.46s

原因是什么呢。原因就在與save_flag是一個阻塞性的函數(f.write)save_flag函數阻塞了客戶代碼與asyncio事件循環共用的唯一線程。因此保存文件的時候,整個程序都會凍結。那么解決辦法就是使用run_in_executor方法。download_one代碼修改如下:

@asyncio.coroutine

def download_one(cc):

    image = yield from get_flag(cc)

    show(cc)

    loop=asyncio.get_event_loop()

loop.run_in_executor(None,save_flag,image,cc+'.jpg')

return cc

修改之后總共耗時1.5

5 flags downloaded in 1.50s

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM