Python並發(二)


並發是指一次處理多件事,而並行是指一次做多件事。二者不同,但互相有聯系。打個比方:像Python的多線程,就是並發,因為Python的解釋器GIL是線程不安全的,一次只允許執行一個線程的Python字節碼,我們在使用多線程時,看上去像很多個任務同時進行,但實際上但一個線程在執行的時候,其他線程是處於休眠狀態的。而在多CPU的服務器上,Java或Go的多線程,則是並行,因為他們的多線程會利用到服務器上的每個CPU,如果一個服務器上只有一個CPU,那么Java或者Go的多線程依舊是並發,而不是並行。

在上個章節,我們討論了Python的多線程,在這個章節,我們將通過asyncio包來實現並發,這個包使用事件循環驅動的協程來實現並發

下面,我們看一下asyncio包的簡單使用

import asyncio
from time import strftime


@asyncio.coroutine
def hello():
    print(strftime('[%H:%M:%S]'), "Hello world!")
    r = yield from asyncio.sleep(1)
    print(strftime('[%H:%M:%S]'), "Hello again!")


loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
loop.close()

    

運行結果:

[17:01:59] Hello world!
[17:02:00] Hello again!

  

@asyncio.coroutine把一個生成器標記為協程類型,然后,我們就把這個協程扔到EventLoop中執行

現在,我們封裝兩個協程扔進EventLoop中執行

import threading
import asyncio
from time import strftime


@asyncio.coroutine
def hello(id):
    print(strftime('[%H:%M:%S]'), 'coroutine_id:%s thread_id:%s' % (id, threading.currentThread()))
    yield from asyncio.sleep(1)
    print(strftime('[%H:%M:%S]'), 'coroutine_id:%s thread_id:%s' % (id, threading.currentThread()))


loop = asyncio.get_event_loop()
tasks = [hello(1), hello(2)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

  

運行結果:

[17:10:51] coroutine_id:1 thread_id:<_MainThread(MainThread, started 5100)>
[17:10:51] coroutine_id:2 thread_id:<_MainThread(MainThread, started 5100)>
[17:10:52] coroutine_id:1 thread_id:<_MainThread(MainThread, started 5100)>
[17:10:52] coroutine_id:2 thread_id:<_MainThread(MainThread, started 5100)>

  

由打印的當前線程名稱可以看出,兩個協程是由同一個線程並發執行的。
如果把asyncio.sleep()換成真正的IO操作,則多個協程就可以由一個線程並發執行。

async/await

我們可以用asyncio提供的@asyncio.coroutine可以把一個生成器標記為協程類型,然后在協程內部用yield from調用另一個協程實現異步操作。為了簡化並更好地標識異步IO,從Python3.5開始引入了新的語法async和await,可以讓協程的代碼更簡潔易讀。async和await是針對協程的新語法,要使用新的語法,只需要做兩步簡單的替換:

import asyncio
from time import strftime


async def hello():
    print(strftime('[%H:%M:%S]'), "Hello world!")
    r = await asyncio.sleep(1)
    print(strftime('[%H:%M:%S]'), "Hello again!")


loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
loop.close()

  

運行結果:

[17:19:55] Hello world!
[17:19:56] Hello again!

  

下面,讓我們用協程並發下載多張圖片,這里需要用到aiohttp包,asyncio包只支持TCP和UDP,如果想要使用HTTP協議,需要使用第三方的包,而aiohttp包,則是支持HTTP協議的

import asyncio
import time
import aiohttp
import sys
import os
from time import strftime, sleep

POP20_CC = ["pms_1508850965.67096774", "pms_1509723338.05097112", "pms_1508125822.19716710",
            "pms_1512614327.2483640", "pms_1525853341.8312102", "pms_1511228654.33099308"]

BASE_URL = 'https://i1.mifile.cn/a1'

DEST_DIR = 'downloads/'


async def get_flag(cc):  # <1>
    url = '{}/{cc}.jpg'.format(BASE_URL, cc=cc.lower())
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            image = await resp.read()
    return image


def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


async def download_one(cc):  # <2>
    image = await get_flag(cc)
    sys.stdout.flush()
    save_flag(image, cc.lower() + '.jpg')
    return cc


def download_many(cc_list):  # <3>
    loop = asyncio.get_event_loop()
    to_do = [download_one(cc) for cc in sorted(cc_list)]
    wait_coro = asyncio.wait(to_do)
    res, _ = loop.run_until_complete(wait_coro)
    loop.close()
    return len(res)


def main(download_many):
    path = os.path.join(DEST_DIR)
    if not os.path.exists(path):
        os.mkdir(path)
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))


if __name__ == '__main__':
    main(download_many)

  

運行結果:

6 flags downloaded in 0.25s

  

<1>處,我們通過async/await將這個生成器聲明為協程類型,我們用aiohttp獲取遠程的圖片資源,當發生網絡請求的時候,主線程會切換到其他的協程執行

<2>處,當<1>處的網絡請求發回響應時,將返回的圖片存入本地

<3>處,我們在這個方法里生成多個協程,並提交到EventLoop中運行

 

上面的程序,還有幾處值的修改的地方:

第一處是IO問題,程序員往往忽略一個事實,就是訪問本地文件系統會阻塞,想當然的認為這種操作不會受網絡訪問高延遲的影響,而在上述示例中,save_flag()函數會阻塞客戶端代碼和asyncio事件循環共用的唯一線程,因此保存圖片時,整個應用程序都會被凍結,而一旦受到I/O阻塞,則會浪費掉幾百萬個CPU周期,所以,就算是本地文件系統的訪問,我們也應該把他提到另一個線程去執行,避免造成CPU周期的浪費。

第二處是管理協程的並發數,假設我們這里抓取的不再是僅僅幾張圖片,而是成千上百,可能我們的鏈接會斷掉,甚至對方的網絡因為我們的頻繁訪問禁止了我們的IP。

所以,我們還要對我們的圖片下載代碼進行修改

import asyncio
import collections
import contextlib
import time
import aiohttp
from aiohttp import web
import os
from collections import namedtuple
from enum import Enum

POP20_CC = ["pms_1508850965.67096774", "pms_1509723338.05097112", "pms_1508125822.19716710",
            "pms_1512614327.2483640", "pms_1525853341.8312102", "pms_1511228654.33099308", "error"]

BASE_URL = 'https://i1.mifile.cn/a1'

DEST_DIR = 'downloads/'

DEFAULT_CONCUR_REQ = 3
VERBOSE = True
Result = namedtuple('Result', 'status data')
HTTPStatus = Enum('Status', 'ok not_found error')


class FetchError(Exception):
    def __init__(self, country_code):
        self.country_code = country_code


def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


async def get_flag(base_url, cc):
    url = '{}/{cc}.jpg'.format(base_url, cc=cc.lower())
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            with contextlib.closing(resp):  # <1>
                if resp.status == 200:
                    image = await resp.read()
                    return image
                elif resp.status == 404:
                    raise web.HTTPNotFound()
                else:
                    raise aiohttp.HttpProcessingError(
                        code=resp.status, message=resp.reason,
                        headers=resp.headers)


async def download_one(cc, base_url, semaphore, verbose):
    try:
        with (await semaphore):  # <2>
            image = await get_flag(base_url, cc)
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = 'is not found'
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        loop = asyncio.get_event_loop()
        loop.run_in_executor(None, save_flag, image, cc.lower() + '.jpg')  # <3>
        status = HTTPStatus.ok
        msg = 'is OK'

    if verbose and msg:
        print(cc, msg)

    return Result(status, cc)


async def downloader_coro(cc_list, base_url, verbose, concur_req):
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(concur_req)
    to_do = [download_one(cc, base_url, semaphore, verbose)
             for cc in sorted(cc_list)]
    to_do_iter = asyncio.as_completed(to_do)
    for future in to_do_iter:
        try:
            res = await future
        except FetchError as exc:
            country_code = exc.country_code
            try:
                error_msg = exc.__cause__.args[0]
            except IndexError:
                error_msg = exc.__cause__.__class__.__name__
            if verbose and error_msg:
                msg = '*** Error for {}: {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status

        counter[status] += 1

    return counter


def download_many(cc_list, base_url, verbose, concur_req):
    loop = asyncio.get_event_loop()
    coro = downloader_coro(cc_list, base_url, verbose, concur_req)
    counts = loop.run_until_complete(coro)
    return counts


def main(download_many):
    path = os.path.join(DEST_DIR)
    if not os.path.exists(path):
        os.mkdir(path)
    t0 = time.time()
    counter = download_many(POP20_CC, BASE_URL, VERBOSE, DEFAULT_CONCUR_REQ)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(counter, elapsed))


if __name__ == '__main__':
    main(download_many)

    

運行結果:

error is not found
pms_1511228654.33099308 is OK
pms_1512614327.2483640 is OK
pms_1509723338.05097112 is OK
pms_1525853341.8312102 is OK
pms_1508125822.19716710 is OK
pms_1508850965.67096774 is OK

Counter({<Status.ok: 1>: 6, <Status.not_found: 2>: 1}) flags downloaded in 0.41s

  

<1>處,在網絡請求完畢,我們要關閉網絡,避免因為網絡請求過多最后造成鏈接中斷

<2>處,我們用asyncio.Semaphore(concur_req)設置協程最大並發數,這里我們設置是3,然后再用with (await semaphore)執行協程

<3>處,loop.run_in_executor()方法是用來傳入需要執行的對象,以及執行參數,這個方法會維護一個ThreadPoolExecutor()線程池,如果我們第一個參數是None,run_in_executor()就會把我們的執行對象和參數提交給背后維護的ThreadPoolExecutor()執行,如果我們傳入自己定義的一個線程池,則把執行對象和參數傳給我們定義的線程池執行

使用aiohttp編寫web服務器

asyncio可以實現單線程並發IO操作,但asyncio只實現了TCP、UDP、SSL等協議,而aiohttp則是基於asyncio上實現了HTTP協議,所以,我們可以基於這asyncio和aiohttp兩個框架實現自己的一個web服務器,代碼如下:

import asyncio

from aiohttp import web, web_runner

CONTENT_TYPE = "text/html;"


async def index(request):
    await asyncio.sleep(0.5)
    return web.Response(body=b"<h1>Index</h1>", content_type=CONTENT_TYPE)


async def hello(request):
    await asyncio.sleep(0.5)
    text = "<h1>hello, %s!</h1>" % request.match_info["name"]
    return web.Response(body=text, content_type=CONTENT_TYPE)


async def init(loop):
    app = web.Application(loop=loop)
    app = web_runner.AppRunner(app=app).app()
    app.router.add_route("GET", "/", index)
    app.router.add_route("GET", "/hello/{name}", hello)
    srv = await loop.create_server(app.make_handler(), "127.0.0.1", 8000)
    print("Server started at http://127.0.0.1:8000...")
    return srv


loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()

  

運行腳本后,在瀏覽器輸入:

http://127.0.0.1:8000/

 

如果輸入:http://127.0.0.1:8000/hello/Lily,就可以看見如下頁面,/hello/后面的name可以替換

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM