再議Python協程——從yield到asyncio


協程,英文名Coroutine。
前面介紹Python的多線程,以及用多線程實現並發(參見這篇文章【淺析Python多線程】),今天介紹的協程也是常用的並發手段。本篇主要內容包含:協程的基本概念、協程庫的實現原理以及Python中常見的協程庫。

1 協程的基本概念

我們知道線程的調度(線程上下文切換)是由操作系統決定的,當一個線程啟動后,什么時候占用CPU、什么時候讓出CPU,程序員都無法干涉。假設現在啟動4個線程,CPU線程時間片為 5 毫秒,也就是說,每個線程每隔5ms就讓出CPU,讓其他線程搶占CPU。可想而知,等4個線程運行結束,要進行多少次切換?

如果我們能夠自行調度自己寫的程序,讓一些代碼塊遇到IO操作時,切換去執行另外一些需要CPU操作的代碼塊,是不是節約了很多無畏的上下文切換呢?是的,協程就是針對這一情況而生的。我們把寫好的一個應用程序分為很多個代碼塊,如下圖所示:

把應用程序的代碼分為多個代碼塊,正常情況代碼自上而下順序執行。如果代碼塊A運行過程中,能夠切換執行代碼塊B,又能夠從代碼塊B再切換回去繼續執行代碼塊A,這就實現了協程(通常是遇到IO操作時切換才有意義)。示意圖如下:

 

所以,關於協程可以總結以下兩點:

(1)線程的調度是由操作系統負責協程調度是程序自行負責

(2)與線程相比,協程減少了無畏的操作系統切換

實際上當遇到IO操作時做切換才更有意義,(因為IO操作不用占用CPU),如果沒遇到IO操作,按照時間片切換,無意義。

舉個例子,你在做一頓飯你要蒸飯和炒菜:最笨的方法是先蒸飯,飯蒸好了再去炒菜。這樣一頓飯得花不少時間,就跟我們沒采用並發編程一樣。

多線程相當於,你5分鍾在做蒸飯的工作,到了5分鍾開始炒菜,又過了5分鍾,你又去忙蒸飯。

協程相當於,你淘完米,放在電飯鍋,按下煮飯鍵之后,你開始去炒菜。炒菜的時候油沒熱,你可以調佐料。這樣,你炒兩個菜出來,飯蒸好了。整個過程你沒閑着,但是節約了不少時間。

2 基於yield實現協程

如1中所述,代碼塊A能夠中斷去執行代碼塊B,代碼塊B能夠中斷,執行代碼塊A。這不是和yield功能如出一轍嗎?我們先回憶一下yield的功能:

(1) 在函數中,語句執行到yield,會返回yield 后面的內容;當再回來執行時,從yield的下一句開始執行;
(2) 使用yield語法的函數是一個生成器;
(3) python3中,通過 .__next__() 或者 next() 方法獲取生成器的下一個值。

來看一個yield實現協程的例子:

from collections import deque

def sayHello(n):
    while n > 0:
        print("hello~", n)
        yield n
        n -= 1
    print('say hello')

def sayHi(n):
    x = 0
    while x < n:
        print('hi~', x)
        yield
        x += 1
    print("say hi")

# 使用yield語句,實現簡單任務調度器
class TaskScheduler(object):
    def __init__(self):
        self._task_queue = deque()

    def new_task(self, task):
        '''
        向調度隊列添加新的任務
        '''
        self._task_queue.append(task)

    def run(self):
        '''
        不斷運行,直到隊列中沒有任務
        '''
        while self._task_queue:
            task = self._task_queue.popleft()
            try:
                next(task)
                self._task_queue.append(task)
            except StopIteration:
                # 生成器結束
                pass

sched = TaskScheduler()
sched.new_task(sayHello(10))
sched.new_task(sayHi(15))
sched.run() 

上例執行時,你會看到sayHello()和sayHi() 不斷交替執行,當執行sayHello()時,在yield處中斷,當執行sayHi()時從yield處中斷,切換回sayHello()從yield之后的一句開始執行。。。如此來回交替無縫連接。

3 基於yield實現actor模型

actor模式是一種最古老的也是最簡單的並行和分布式計算解決方案。下面我們通過yield來實現:

from collections import deque

class ActorScheduler:
    def __init__(self):
        self._actors = {}
        self._msg_queue = deque()

    def new_actor(self, name, actor):
        self._msg_queue.append((actor, None))
        self._actors[name] = actor

    def send(self, name, msg):
        actor = self._actors.get(name)
        if actor:
            self._msg_queue.append((actor, msg))

    def run(self):
        while self._msg_queue:
            # print("隊列:", self._msg_queue)
            actor, msg = self._msg_queue.popleft()
            # print("actor", actor)
            # print("msg", msg)
            try:
                 actor.send(msg)
            except StopIteration:
                 pass


if __name__ == '__main__':
    def say_hello():
        while True:
            msg = yield
            print("say hello", msg)

    def say_hi():
        while True:
            msg = yield
            print("say hi", msg)

    def counter(sched):
        while True:
            n = yield
            print("counter:", n)
            if n == 0:
                break
            sched.send('say_hello', n)
            sched.send('say_hi', n)
            sched.send('counter', n-1)

    sched = ActorScheduler()
    # 創建初始化 actors
    sched.new_actor('say_hello', say_hello())
    sched.new_actor('say_hi', say_hi())
    sched.new_actor('counter', counter(sched))

    sched.send('counter', 10)
    sched.run()

上例中:

(1) ActorScheduler 負責事件循環
(2) counter() 負責控制終止
(3) say_hello() / say_hi() 相當於切換的協程,當程序運行到這些函數內部的yield處,就開始切換。

所以,當執行時,我們能夠看到say_hello() / say_hi()不斷交替切換執行,直到counter滿足終止條件之后,協程終止。看懂上例可能需要花費一些時間。實際上我們已經實現了一個“操作系統”的最小核心部分。 生成器函數(含有yield的函數)就是認為,而yield語句是任務掛起的信號。 調度器循環檢查任務列表直到沒有任務要執行為止。

4 協程庫的實現及asyncio

有了前面對協程的了解,我們可以思考怎樣去實現一個協程庫?我覺得可以從以下兩個個方面去思考:

(1)事件循環 (event loop)。事件循環需要實現兩個功能,一是順序執行協程代碼;二是完成協程的調度,即一個協程“暫停”時,決定接下來執行哪個協程。

(2)協程上下文的切換。基本上Python 生成器的 yeild 已經能完成切換,Python3中還有特定語法支持協程切換。

我們看一個比較復雜的例子:

from collections import deque
from select import select

class YieldEvent:
    def handle_yield(self, sched, task):
        pass

    def handle_resume(self, sched, task):
        pass

# 任務調度(相當於EventLoop)
class Scheduler:
    def __init__(self):
        self._numtasks = 0         # 任務總數量
        self._ready = deque()      # 等待執行的任務隊列
        self._read_waiting = {}    # 正等待讀的任務
        self._write_waiting = {}   # 正等待寫的任務

    # 利用I/O多路復用 監聽讀寫I/0
    def _iopoll(self):
        rset, wset, eset = select(self._read_waiting,
                                  self._write_waiting, [])
        for r in rset:
            evt, task = self._read_waiting.pop(r)
            evt.handle_resume(self, task)
        for w in wset:
            evt, task = self._write_waiting.pop(w)
            evt.handle_resume(self, task)

    def new(self, task):
        """添加一個新的任務"""
        self._ready.append((task, None))
        self._numtasks += 1

    def add_ready(self, task, msg=None):
        """添加到任務對列等待執行"""
        self._ready.append((task, msg))

    def _read_wait(self, fileno, evt, task):
        self._read_waiting[fileno] = (evt, task)

    def _write_wait(self, fileno, evt, task):
        self._write_waiting[fileno] = (evt, task)

    def run(self):
        while self._numtasks:
            # 如果任務數量為空,阻塞在select處,保持監聽
            if not self._ready:
                self._iopoll()
            task, msg = self._ready.popleft()
            try:
                r = task.send(msg)
                if isinstance(r, YieldEvent):
                    r.handle_yield(self, task)
                else:
                    raise RuntimeError('unrecognized yield event')
            except StopIteration:
                self._numtasks -= 1

# 示例: 將協程抽象成YieldEvent的子類,並重寫handle_yield和handle_resume方法
class ReadSocket(YieldEvent):
    def __init__(self, sock, nbytes):
        self.sock = sock
        self.nbytes = nbytes

    def handle_yield(self, sched, task):
        sched._read_wait(self.sock.fileno(), self, task)

    def handle_resume(self, sched, task):
        data = self.sock.recv(self.nbytes)
        sched.add_ready(task, data)

class WriteSocket(YieldEvent):
    def __init__(self, sock, data):
        self.sock = sock
        self.data = data

    def handle_yield(self, sched, task):
        sched._write_wait(self.sock.fileno(), self, task)

    def handle_resume(self, sched, task):
        nsent = self.sock.send(self.data)
        sched.add_ready(task, nsent)

class AcceptSocket(YieldEvent):
    def __init__(self, sock):
        self.sock = sock

    def handle_yield(self, sched, task):
        sched._read_wait(self.sock.fileno(), self, task)

    def handle_resume(self, sched, task):
        r = self.sock.accept()
        sched.add_ready(task, r)


class Socket(object):
    def __init__(self, sock):
        self._sock = sock

    def recv(self, maxbytes):
        return ReadSocket(self._sock, maxbytes)

    def send(self, data):
        return WriteSocket(self._sock, data)

    def accept(self):
        return AcceptSocket(self._sock)

    def __getattr__(self, name):
        return getattr(self._sock, name)

if __name__ == '__main__':
    from socket import socket, AF_INET, SOCK_STREAM

    def readline(sock):
        chars = []
        while True:
            c = yield sock.recv(1)
            print(c)
            if not c:
                break
            chars.append(c)
            if c == b'\n':
                break
        return b''.join(chars)

    # socket server 使用生成器
    class EchoServer:
        def __init__(self, addr, sched):
            self.sched = sched
            sched.new(self.server_loop(addr))

        def server_loop(self, addr):
            s = Socket(socket(AF_INET, SOCK_STREAM))
            s.bind(addr)
            s.listen(5)
            while True:
                c, a = yield s.accept()
                print('Got connection from ', a)
                print("got", c)
                self.sched.new(self.client_handler(Socket(c)))

        def client_handler(self, client):
            while True:
                try:
                    line = yield from readline(client)
                    if not line:
                        break

                    print("from Client::", str(line))
                except Exception:
                    break

                while line:
                    try:
                        nsent = yield client.sendall(line)
                        print("nsent", nsent)
                        line = line[nsent:]
                    except Exception:
                        break
            client.close()
            print('Client closed')

    sched = Scheduler()
    EchoServer(('localhost', 9999), sched)
    sched.run()  

Scheduler相當於實現事件循環並調度協程, 添加到事件循環中的事件必須繼承YieldEvent, 並重寫它定義的兩個方法。此例比較難,看不懂可以忽略。

我們看一下Python3中的協程庫asyncio是怎么實現的:

import asyncio

@asyncio.coroutine
def say_hi(n):
    print("start:", n)
    r = yield from asyncio.sleep(2)
    print("end:", n)

loop = asyncio.get_event_loop()
tasks = [say_hi(0), say_hi(1)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

# start: 1
# start: 0
# 停頓兩秒
# end: 1
# end: 0

(1)@asyncio.coroutine把一個generator標記為coroutine類型,然后,我們就把這個coroutine扔到EventLoop中執行。
(2)yield from語法可以讓我們方便地調用另一個generator。由於asyncio.sleep()也是一個coroutine,所以線程不會等待asyncio.sleep(),而是直接中斷並執行下一個消息循環。當asyncio.sleep()返回時,線程就可以從yield from拿到返回值(此處是None),然后接着執行下一行語句。
(3)asyncio.sleep(1)相當於一個耗時1秒的IO操作,在此期間,主線程並未等待,而是去執行EventLoop中其他可以執行的coroutine了,因此可以實現並發執行。

asyncio中get_event_loop()就是事件循環,而裝飾器@asyncio.coroutine標記了一個協程,並yield from 語法實現協程切換。在Python3.5中,新增了asyncawait的新語法,代替裝飾器和yield from。上例可以用新增語法完全代替。

async def say_hi(n):
    print("start:", n)
    r = await asyncio.sleep(2)
    print("end:", n)

loop = asyncio.get_event_loop()
tasks = [say_hi(0), say_hi(1)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

# start: 1
# start: 0
# 停頓兩秒
# end: 1
# end: 0

@asyncio.coroutine換成async, 將yield from 換成await  即可。

5 協程的缺點

(1)使用協程,只能使用單線程,多線程的便利就一點都用不到。例如,I/O阻塞程序,CPU仍然會將整個任務掛起直到操作完成。
(2) 一旦使用協程,大部分ython庫並不能很好的兼容,這就會導致要改寫大量的標准庫函數。
所以,最好別用協程,一旦用不好,協程給程序性能帶來的提升,遠遠彌補不了其帶來的災難。

  

 

 

  

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM