本文參考:http://www.dabeaz.com/coroutines/ 作者:David Beazley
緣起:
本人最近在學習python的協程。偶然發現了David Beazley的coroutine課程,花了幾天時間讀完后,為了加深理解就把其中個人認為最為精華的部分摘下來並加上個人理解寫了本篇博客。
扯一些淡:
既然要搞一個操作系統,那我們就先來設一個目標吧!就像找女朋友,我們不可能隨隨便便的是個女的就上,肯定要對女方有一定的要求,比如膚白貌美氣質佳……
所以,我們對這個' 姑娘 '的要求是這樣的:
1 使用純Python代碼開發 (必須是人形,非狗非貓非其他)
2 真正的操作系統,不僅僅能調度任務,還提供了許許多多的系統調用,比如說新建一個進程,kill一個進程,這些我們也要實現!!(所以說就算是找個充氣的也必須是有多功能的)
3 可以處理多任務 (洗衣做飯啥都能干)
雖然這要求有點低,但也湊活着用了…… 好,不扯淡了。
因為我們的主要目的是為了了解協程如何應用,所以我們不使用子進程或者線程模塊,我們要使用協程(coroutine)!!!
ps:這里的協程指的是由Python中原生的generator衍生出來的coroutine,不是greenlet等庫!
大體分析:
1 為了讓coroutine/generator更像是task,我們定義一個Task類,用來封裝coroutine。我們把一個task模仿為一個進程或者線程。
2 既然要處理多任務,我們需要搞一個調度器(scheduler)用來調度任務的執行。
3 需要定義系統調用類。
好,接下來我們擼起袖子就是干
實現多任務處理
1 先來實現一個Task類
每一個task都擁有一個唯一的tid
為Task定義一個run()方法,用來向generator中傳遞值

class Task(object): ''' generator的wrapper ''' tid = 0 def __init__(self,target): ''' :param target: 一個generator對象 ''' Task.tid += 1 self.tid = Task.tid # 用來唯一地標識一個task self.target = target self.sendval = None def run(self): return self.target.send(self.sendval)
2 定義一個scheduler
1 首先我們需要開辟一段數據結構來保存所有的task,這里使用dict來保存,使用tid作為key,task對象作為value
2 因為我們要實現多任務處理,所以我們需要有一個有序的序列來存儲每一個處於ready狀態(也就是可運行)的task,在這里我們選擇使用隊列queue
3 我們需要一個方法來創建task
4 我們需要一個方法來處理運行結束的task
5 我們需要一個方法來將ready狀態的task放入queue中
6 然后就是搞一個loop,不斷的從queue中獲取task並運行。
class Scheduler(object):
def __init__(self): self.task_map = {} self.ready = queue.Queue()
def new(self,target): ''' 新建一個task :param target: generator對象 :return: 新建task的tid ''' task = Task(target) self.task_map[task.tid] = task self.schedule(task) return task.tid
def schedule(self,task): ''' 將task放入ready隊列中 :param task: Task對象 :return: ''' self.ready.put(task) def exit(self,tid): ''' 處理運行結束的task :param task: Task對象 :return: ''' del self.task_map[tid] def main_loop(self): ''' 啟動loop :return: ''' while True: task = self.ready.get() try: result = task.run() except StopIteration: self.exit(task.tid) continue self.schedule(task)
試着運行一下:

def Laundry(): for i in range(5): yield print('I am doing the laundry') def Cook(): for i in range(10): yield print('I am cooking') s = Scheduler() s.new(Cook()) s.new(Laundry()) s.main_loop()
在上面的代碼中:
每當進行task.run()就會執行某個邏輯,然后yield。 (在yield之前的這段時間task擁有cpu控制權)
當task進行yield時,scheduler取代task獲得cpu執行權,然后將運行了一次yield的task重新加入到queue中
然后scheduler獲取queue中的下一個task,執行task.run(),這就表示新的task獲得了cpu控制權。
是不是很像上下文切換?
到現在為止,我們已經實現了多任務處理,接下來搞定系統調用。
我們知道當某個進程進行系統調用時,表示內核獲得cpu控制權,內核代替該進程執行相關操作。 這里和上面的yield是不是很類似?
實現系統調用:
1 首先我們定義一個所有系統調用的基類:SystemCall
我們得知道是哪個task調用了該系統調用吧,所以增加一個屬性self.task
我們得知道當前調度器是誰,因為系統調用不僅要進行相關操作,還需要在相關操作完成后決定如何調度該task(比如說結束該task,繼續執行該task……),
所以增加一個屬性self.schedule
我們還要知道該如何運行,所以我們統一定義一個handle方法來進行相關操作
根據以上分析,我們這么定義:

class SystemCall(object): def __init__(self): self.task = None self.scheduler = None def handler(self): pass
2 我們可以使用yield 來傳遞或者說調用 系統調用
那么回想一下:系統調用(准確的說是yield返回的值)是在哪里被捕獲的呢?
class Scheduler(object): ...... def main_loop(self): while True: task = self.ready.get() try: result = task.run() # 就是這!!!! except StopIteration: self.exit(task) continue self.schedule(task)
所以我們需要對 result 進行一些處理。修改main_loop()方法:
def main_loop(self): ''' 啟動loop :return: ''' while True: task = self.ready.get() try: result = task.run() if isinstance(result, SystemCall): # 判斷result是否是系統調用 result.task = task # 保存當前環境: 當前task result.scheduler = self # 保存當前環境: 當前調度器 result.handler() # 運行該系統調用的handler方法 continue except StopIteration: self.exit(task.tid) continue self.schedule(task)
3 真正的操作系統有什么系統調用?
1 GetTid:獲取當前tid

class GetTid(SystemCall): def __init__(self): super().__init__() def handler(self): self.task.sendval = self.task.tid self.scheduler.ready.put(self.task)
實例:

def Laundry(): while True: tid = yield GetTid() print('I am doing the laundry,my tid is %s'% tid) def Cook(): for i in range(10): tid = yield GetTid() print('I am cooking,my tid is %s'% tid) s = Scheduler() s.new(Laundry()) s.new(Cook()) s.main_loop()
2 New: 新建一個task
class New(SystemCall): def __init__(self,target): super().__init__() self.target = target def handler(self): tid = self.scheduler.new(self.target) self.task.sendval = tid self.scheduler.schedule(self.task)
實例:
def foo(): for i in range(5): tid = yield GetTid() print('I am foo,my tid is %s'% tid) def bar(): for i in range(10): tid = yield GetTid() print('I am bar,my tid is %s'% tid) r = yield New(foo()) if r: print('create a new task,the tid of the new task is %s'% r) s = Scheduler() s.new(bar()) s.main_loop()
3 kill一個task
class Kill(SystemCall): def __init__(self,tid): super().__init__() self.kill_tid = tid def handler(self): target_task = self.scheduler.task_map.get(self.kill_tid, None) if target_task: target_task.target.close() self.task.sendval = True self.scheduler.schedule(self.task)
實例:
def foo(): for i in range(5): tid = yield GetTid() print('I am foo,my tid is %s'% tid) def bar(): for i in range(10): tid = yield GetTid() print('I am bar,my tid is %s'% tid) r = yield New(foo()) if r: print('create a new task,the tid of the new task is %s'% r) yield r = yield Kill(r) if r: print('killed success') s = Scheduler() s.new(bar()) s.main_loop()
4 Task waiting
就是說,a task需要等待b task執行完成后才能繼續運行
先來講一下大體思路:
1 我們為調度程序設置一個字典,用來存儲waiting task(相當於a task) 與 wait for task(相當於b task)的關系
2 字典的鍵值對該如何設計呢?
a方案: 假如是 a:[b1,b2...] 這樣設計,那么a可以有很多的wait for task,但缺點就是當某個wait for task運行完后,
我們需要遍歷該字典,並進行相關檢測,才能知道可以運行哪些task
b方案: 假如是 b:[a1,a2...] 這樣設計,那么當b完成后我們可以很快的知道哪些task可以運行了,缺點就是無法為a設置
多個wait for task。(當然也可以設置多個,但很麻煩)
3 我們偷個懶:每個task僅允許一個wait for task 。所以我們選擇b方案。為調度器設立一個新的屬性 self.exit_wating = {}
class Scheduler(object): def __init__(self): self.task_map = {} self.ready = queue.Queue() self.exit_waiting = {}
4 當一個task運行完成后(也就是調度器執行了exit(task)),我們需要檢查該task是否有關聯的waiting task
如果有的話: 將waiting task放入ready隊列
沒有: do noting
def exit(self,tid): ''' 處理運行結束的task :param task: Task對象 :return: ''' del self.task_map[tid] waiting_tasks = self.exit_waiting.pop(tid, None) if waiting_tasks: for task in waiting_tasks: self.schedule(task)
5 為調度器添加一個方法wait_for_exit:該方法接收兩個參數:waiting_task wait_for_task_tid
該方法會在調度器的exit_waiting中生成/修改對應的鍵值對
返回是否成功
def wait_for_exit(self,wait_tid,waiting_task): ''' 為task設置wait for task :param wait_tid: wait for task的tid :param waiting_task: waiting task :return: ''' if self.task_map.get(wait_tid,None): self.exit_waiting.setdefault(wait_tid,[]).append(waiting_task) return True else: return False
6 創建一個系統調用 TaskWait
該系統調用接受一個參數,接收wait for task的tid
運行調度器的wait_for_exit方法
若失敗則調度該task
class TaskWait(SystemCall): def __init__(self,wait_tid): super().__init__() self.wait_tid = wait_tid def handler(self): r = self.scheduler.wait_for_exit(self.wait_tid,self.task) if not r: self.scheduler.schedule(self.task)
實例:

def foo(): for i in range(5): tid = yield GetTid() print('I am foo,my tid is %s'% tid) def bar(): for i in range(10): tid = yield GetTid() print('I am bar,my tid is %s'% tid) r = yield New(foo()) if r: print('create a new task,the tid of the new task is %s'% r) yield r = yield TaskWait(r) yield if r: print('stop bar') s = Scheduler() s.new(bar()) s.main_loop()
好了到現在為止:
我們已經實現了多任務處理和系統調用。
大家可能對generator/coroutine的用途有了一點點的認知----> 我們可以自己來定義task的切換(也就是在用戶空間級別進行task切換!), 但是!!! 這有什么用呢?
繼續深入
所謂交淺言深:不要怪別人太深,只能怪自己太短!! 所以,繼續成長並深入吧,騷年!!
下一步,根據我們構建的系統搞一個web服務器
def handle_client(client, addr): print("Connection from", addr) while True: data = client.recv(65536) if not data: break client.send(data) client.close() print("Client closed") yield # Make the function a generator/coroutine def server(port): print("Server starting") sock = socket.socket() sock.bind(("", port)) sock.listen(5) while True: # 阻塞 client, addr = sock.accept() yield New(handle_client(client, addr)) def alive(): while True: print("I'm alive!") yield sched = Scheduler() sched.new(alive()) sched.new(server(45000)) sched.main_loop() # 結果: I'm alive! Server starting # server阻塞了整個scheduler,無法運行其他task(也就是alive)
當某個task需要進行I/O時(也就是阻塞時),會阻塞整個scheduler,其他task無法運行
這很不科學啊,真正的操作系統可不會這樣,所以我們要改進。
改進:
1 我們增加一個屬性(dict格式),用來存放文件描述符與task的對應關系
class Scheduler(object): def __init__(self): self.task_map = {} self.ready = queue.Queue() self.exit_waiting = {} self.read_waiting = {} # fd可讀 self.write_waiting = {} # fd可寫
2 給調度器增加兩個方法,用來將對應的fd與task增加到read_waiting或者write_waiting中
def wait_for_read(self, fd, task): self.read_waiting[fd] = task def wait_for_write(self, fd, task): self.write_waiting[fd] = task
3 再加一些系統調用,當task調用該系統調用時,代表該task需要等待某個文件描述符就緒。在文件描述符沒有准備就緒時,將task放入dict中,不加入ready隊列。 這樣一個task阻塞就不會導致整體阻塞了。
class ReadWait(SystemCall): def __init__(self,fd): super().__init__() self.fd = fd.fileno() or fd def handler(self): self.scheduler.wait_for_read(self.fd, self.task) class WriteWait(SystemCall): def __init__(self,fd): super().__init__() self.fd = fd.fileno() or fd def handler(self): self.scheduler.wait_for_read(self.fd, self.task)
4 然后我們要解決文件描述符就緒后喚醒task的問題。我們引入select模塊用來監控文件描述符,當某個文件描述符就緒時,將其所對應的task放入ready隊列中。
def ioloop(self,timeout): ''' 檢測當前是否有文件描述符就緒,若有則將對應的task放入調度隊列中 :param timeout: 超時時間 :return: ''' if self.write_waiting or self.read_waiting: r, w, e = select.select(self.read_waiting, self.write_waiting, [], timeout) for i in r: task = self.read_waiting.pop(i,None) if task: self.schedule(task) for i in w: task = self.write_waiting.pop(i, None) if task: self.schedule(task)
5 然后我們要考慮什么時候監控?
方式一: 我們可以在每次從ready隊列中取出task之前進行運行監控方法。 就像這樣:
class Scheduler(object): ... def mainloop(self): while self.taskmap: self.iopoll(0) # 放在這里的話,表示: 在每一次運行task之前都運行一次 task = self.ready.get() try: result = task.run() ...
方式二: 我們可以將監控方法作為一個task,放入ready隊列中。
比較優劣: 方式一執行監控方法過於頻繁,如果ready隊列中task過多,則很浪費cpu資源。,而且每一次運行select()就會導致一次真正意義上的內核上下文切換!
所以方式二是較為可行的:
def io_task(self): while True: if self.ready.empty(): self.ioloop(None) else: self.ioloop(0) yield def main_loop(self): ''' 啟動loop :return: ''' self.new(self.io_task()) # 在這里添加 while True: task = self.ready.get() try: ...
6 修改web服務器
def handle_client(client, addr): print("Connection from", addr) try: while True: yield ReadWait(client) data = client.recv(65536) client.sendall(data) except ConnectionResetError: client.close() print("Client closed") def server(port): print("Server starting") sock = socket.socket() sock.bind(("127.0.0.1", port)) sock.listen(5) while True: yield ReadWait(sock) client, addr = sock.accept() yield New(handle_client(client, addr)) def alive(): while True: print("I'm alive!") yield sched = Scheduler() # sched.new(alive()) sched.new(server(8888)) sched.main_loop()
來一個客戶端:
import socket def client(): s = socket.socket() s.connect(('127.0.0.1',8888)) try: while True: input_data = input('please input message').encode() s.sendall(input_data) print(s.recv(200).decode()) except: s.close() client()
總結:
我們好像是搞了一個純Python開發的' 操作系統 ' 這個操作系統:
1 采用協程與I/O多路復用相結合
2 加上一個修改過的簡單 web 服務器
就可以處理多個連接!!,而且最重要的是: 我們僅僅使用了一個進程/線程!! 好了,是時候 自舔一波了!!--(tm我太帥了……)
1 多進程/多線程網絡編程都是一個進程或者線程處理一個task,當task過多時,就會導致巨量的進程/線程。
2 巨量的進程/線程會導致 上下文切換極其頻繁! 大家知道:上下文切換是要消耗cpu資源的 所以當進程/線程數量過多時,cpu資源就得不到有效利用
3 而協程實際上就是:在用戶空間實現task的上下文切換! 這種上下文切換消耗的代價相較而言微乎其微。這就是協程的優勢!
4 當然協程也有劣勢:就是無法利用多核cpu,但是我們有解決辦法:多進程 + 協程
The end
示例代碼:地址