終結python協程----從yield到actor模型的實現


把應用程序的代碼分為多個代碼塊,正常情況代碼自上而下順序執行。如果代碼塊A運行過程中,能夠切換執行代碼塊B,又能夠從代碼塊B再切換回去繼續執行代碼塊A,這就實現了協程

我們知道線程的調度(線程上下文切換)是由操作系統決定的,當一個線程啟動后,什么時候占用CPU、什么時候讓出CPU,程序員都無法干涉。假設現在啟動4個線程,CPU線程時間片為 5 毫秒,也就是說,每個線程每隔5ms就讓出CPU,讓其他線程搶占CPU。可想而知,等4個線程運行結束,要進行多少次切換?

如果我們能夠自行調度自己寫的程序,讓一些代碼塊遇到IO操作時,切換去執行另外一些需要CPU操作的代碼塊,是不是節約了很多無畏的上下文切換呢?是的,協程就是針對這一情況而生的。我們把寫好的一個應用程序分為很多個代碼塊,如下圖所示:

把應用程序的代碼分為多個代碼塊,正常情況代碼自上而下順序執行。如果代碼塊A運行過程中,能夠切換執行代碼塊B,又能夠從代碼塊B再切換回去繼續執行代碼塊A,這就實現了協程(通常是遇到IO操作時切換才有意義)。示意圖如下:

所以,關於協程可以總結以下兩點:

(1)線程的調度是由操作系統負責,協程調度是程序自行負責。

(2)與線程相比,協程減少了無畏的操作系統切換。

實際上當遇到IO操作時做切換才更有意義,(因為IO操作不用占用CPU),如果沒遇到IO操作,按照時間片切換,無意義。

python中的yield 關鍵字用來實現生成器,但是生成器在一定的程度上與協程其實也是差不多。我們來看個例子:

def sayHello(n):
    while n > 0:
        print("hello~", n)
        yield n
        n -= 1
    print('say hello')

    
if __name__ == "__main__":
    sayHello(5)  # 測試1
    # next(sayHello(5))  # 測試2
    
    # 測試3
    # for i in sayHello(5):
    #     pass

挨個測試,你會發現第一個測試是不能通過的,什么都不會輸出,這就是我們的生成器特性了,一旦函數內部有yield關鍵字,此函數就是生成器,只有調用next 或是 for之類的能夠迭代的才能夠使得生成器執行。那么這與我們的協程有什么關系呢?請看代碼:

from collections import deque
 
def sayHello(n):
    while n > 0:
        print("hello~", n)
        yield n
        n -= 1
    print('say hello')
 
def sayHi(n):
    x = 0
    while x < n:
        print('hi~', x)
        yield
        x += 1
    print("say hi")
 
# 使用yield語句,實現簡單任務調度器
class TaskScheduler(object):
    def __init__(self):
        self._task_queue = deque()
 
    def new_task(self, task):
        '''
        向調度隊列添加新的任務
        '''
        self._task_queue.append(task)
 
    def run(self):
        '''
        不斷運行,直到隊列中沒有任務
        '''
        while self._task_queue:
            task = self._task_queue.popleft()
            try:
                next(task)
                self._task_queue.append(task)
            except StopIteration:
                # 生成器結束
                pass


if __name__ == "__main__":
    sched = TaskScheduler()
    sched.new_task(sayHello(10))
    sched.new_task(sayHi(15))
    sched.run()

代碼運行下,你就發現了,這就是我們對協程的定義了。接下來我們說下actor模型。actor模式是一種最古老的也是最簡單的並行和分布式計算解決方案。下面我們通過yield來實現:

from collections import deque
 
class ActorScheduler:
    def __init__(self):
        self._actors = {}
        self._msg_queue = deque()
 
    def new_actor(self, name, actor):
        self._msg_queue.append((actor, None))
        self._actors[name] = actor
 
    def send(self, name, msg):
        actor = self._actors.get(name)
        if actor:
            self._msg_queue.append((actor, msg))
 
    def run(self):
        while self._msg_queue:
            # print("隊列:", self._msg_queue)
            actor, msg = self._msg_queue.popleft()
            # print("actor", actor)
            # print("msg", msg)
            try:
                 actor.send(msg)
            except StopIteration:
                 pass
 
 
if __name__ == '__main__':
    def say_hello():
        while True:
            msg = yield
            print("say hello", msg)
 
    def say_hi():
        while True:
            msg = yield
            print("say hi", msg)
 
    def counter(sched):
        while True:
            n = yield
            print("counter:", n)
            if n == 0:
                break
            sched.send('say_hello', n)
            sched.send('say_hi', n)
            sched.send('counter', n-1)
 
    sched = ActorScheduler()
    # 創建初始化 actors
    sched.new_actor('say_hello', say_hello())
    sched.new_actor('say_hi', say_hi())
    sched.new_actor('counter', counter(sched))
 
    sched.send('counter', 10)
    sched.run()

(1) ActorScheduler 負責事件循環
(2) counter() 負責控制終止
(3) say_hello() / say_hi() 相當於切換的協程,當程序運行到這些函數內部的yield處,就開始切換。

所以,當執行時,我們能夠看到say_hello() / say_hi()不斷交替切換執行,直到counter滿足終止條件之后,協程終止。看懂上例可能需要花費一些時間。實際上我們已經實現了一個“操作系統”的最小核心部分。 生成器函數(含有yield的函數)就是認為,而yield語句是任務掛起的信號。 調度器循環檢查任務列表直到沒有任務要執行為止。

而這就是廖雪峰的python官網教程里面的協程代碼的最好解釋,這也是之前一直在思考的問題,請看代碼:

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
produce(c)

我之前一直納悶send()函數是如何激活生成器的,原來是實現了actor模型的協程!

相關鏈接:再議Python協程——從yield到asyncio


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM