把應用程序的代碼分為多個代碼塊,正常情況代碼自上而下順序執行。如果代碼塊A運行過程中,能夠切換執行代碼塊B,又能夠從代碼塊B再切換回去繼續執行代碼塊A,這就實現了協程
我們知道線程的調度(線程上下文切換)是由操作系統決定的,當一個線程啟動后,什么時候占用CPU、什么時候讓出CPU,程序員都無法干涉。假設現在啟動4個線程,CPU線程時間片為 5 毫秒,也就是說,每個線程每隔5ms就讓出CPU,讓其他線程搶占CPU。可想而知,等4個線程運行結束,要進行多少次切換?
如果我們能夠自行調度自己寫的程序,讓一些代碼塊遇到IO操作時,切換去執行另外一些需要CPU操作的代碼塊,是不是節約了很多無畏的上下文切換呢?是的,協程就是針對這一情況而生的。我們把寫好的一個應用程序分為很多個代碼塊,如下圖所示:
把應用程序的代碼分為多個代碼塊,正常情況代碼自上而下順序執行。如果代碼塊A運行過程中,能夠切換執行代碼塊B,又能夠從代碼塊B再切換回去繼續執行代碼塊A,這就實現了協程(通常是遇到IO操作時切換才有意義)。示意圖如下:
所以,關於協程可以總結以下兩點:
(1)線程的調度是由操作系統負責,協程調度是程序自行負責。
(2)與線程相比,協程減少了無畏的操作系統切換。
實際上當遇到IO操作時做切換才更有意義,(因為IO操作不用占用CPU),如果沒遇到IO操作,按照時間片切換,無意義。
python中的yield 關鍵字用來實現生成器,但是生成器在一定的程度上與協程其實也是差不多。我們來看個例子:
def sayHello(n): while n > 0: print("hello~", n) yield n n -= 1 print('say hello') if __name__ == "__main__": sayHello(5) # 測試1 # next(sayHello(5)) # 測試2 # 測試3 # for i in sayHello(5): # pass
挨個測試,你會發現第一個測試是不能通過的,什么都不會輸出,這就是我們的生成器特性了,一旦函數內部有yield關鍵字,此函數就是生成器,只有調用next 或是 for之類的能夠迭代的才能夠使得生成器執行。那么這與我們的協程有什么關系呢?請看代碼:
from collections import deque def sayHello(n): while n > 0: print("hello~", n) yield n n -= 1 print('say hello') def sayHi(n): x = 0 while x < n: print('hi~', x) yield x += 1 print("say hi") # 使用yield語句,實現簡單任務調度器 class TaskScheduler(object): def __init__(self): self._task_queue = deque() def new_task(self, task): ''' 向調度隊列添加新的任務 ''' self._task_queue.append(task) def run(self): ''' 不斷運行,直到隊列中沒有任務 ''' while self._task_queue: task = self._task_queue.popleft() try: next(task) self._task_queue.append(task) except StopIteration: # 生成器結束 pass if __name__ == "__main__": sched = TaskScheduler() sched.new_task(sayHello(10)) sched.new_task(sayHi(15)) sched.run()
代碼運行下,你就發現了,這就是我們對協程的定義了。接下來我們說下actor模型。actor模式是一種最古老的也是最簡單的並行和分布式計算解決方案。下面我們通過yield來實現:
from collections import deque class ActorScheduler: def __init__(self): self._actors = {} self._msg_queue = deque() def new_actor(self, name, actor): self._msg_queue.append((actor, None)) self._actors[name] = actor def send(self, name, msg): actor = self._actors.get(name) if actor: self._msg_queue.append((actor, msg)) def run(self): while self._msg_queue: # print("隊列:", self._msg_queue) actor, msg = self._msg_queue.popleft() # print("actor", actor) # print("msg", msg) try: actor.send(msg) except StopIteration: pass if __name__ == '__main__': def say_hello(): while True: msg = yield print("say hello", msg) def say_hi(): while True: msg = yield print("say hi", msg) def counter(sched): while True: n = yield print("counter:", n) if n == 0: break sched.send('say_hello', n) sched.send('say_hi', n) sched.send('counter', n-1) sched = ActorScheduler() # 創建初始化 actors sched.new_actor('say_hello', say_hello()) sched.new_actor('say_hi', say_hi()) sched.new_actor('counter', counter(sched)) sched.send('counter', 10) sched.run()
(1) ActorScheduler 負責事件循環
(2) counter() 負責控制終止
(3) say_hello() / say_hi() 相當於切換的協程,當程序運行到這些函數內部的yield處,就開始切換。
所以,當執行時,我們能夠看到say_hello() / say_hi()不斷交替切換執行,直到counter滿足終止條件之后,協程終止。看懂上例可能需要花費一些時間。實際上我們已經實現了一個“操作系統”的最小核心部分。 生成器函數(含有yield的函數)就是認為,而yield語句是任務掛起的信號。 調度器循環檢查任務列表直到沒有任務要執行為止。
而這就是廖雪峰的python官網教程里面的協程代碼的最好解釋,這也是之前一直在思考的問題,請看代碼:
def consumer(): r = '' while True: n = yield r if not n: return print('[CONSUMER] Consuming %s...' % n) r = '200 OK' def produce(c): c.send(None) n = 0 while n < 5: n = n + 1 print('[PRODUCER] Producing %s...' % n) r = c.send(n) print('[PRODUCER] Consumer return: %s' % r) c.close() c = consumer() produce(c)
我之前一直納悶send()函數是如何激活生成器的,原來是實現了actor模型的協程!