Gevent官網文檔地址:http://www.gevent.org/contents.html
進程、線程、協程區分
我們通常所說的協程Coroutine其實是corporate routine的縮寫,直接翻譯為協同的例程,一般我們都簡稱為協程。
在linux系統中,線程就是輕量級的進程,而我們通常也把協程稱為輕量級的線程即微線程。
進程和協程
下面對比一下進程和協程的相同點和不同點:
相同點:
相同點存在於,當我們掛起一個執行流的時,我們要保存的東西:
- 棧, 其實在你切換前你的局部變量,以及要函數的調用都需要保存,否則都無法恢復
- 寄存器狀態,這個其實用於當你的執行流恢復后要做什么
而寄存器和棧的結合就可以理解為上下文,上下文切換的理解:
CPU看上去像是在並發的執行多個進程,這是通過處理器在進程之間切換來實現的,操作系統實現這種交錯執行的機制稱為上下文切換
操作系統保持跟蹤進程運行所需的所有狀態信息。這種狀態,就是上下文。
在任何一個時刻,操作系統都只能執行一個進程代碼,當操作系統決定把控制權從當前進程轉移到某個新進程時,就會進行上下文切換,即保存當前進程的上下文,恢復新進程的上下文,然后將控制權傳遞到新進程,新進程就會從它上次停止的地方開始。
不同點:
- 執行流的調度者不同,進程是內核調度,而協程是在用戶態調度,也就是說進程的上下文是在內核態保存恢復的,而協程是在用戶態保存恢復的,很顯然用戶態的代價更低
- 進程會被強占,而協程不會,也就是說協程如果不主動讓出CPU,那么其他的協程,就沒有執行的機會。
- 對內存的占用不同,實際上協程可以只需要4K的棧就足夠了,而進程占用的內存要大的多
- 從操作系統的角度講,多協程的程序是單進程,單協程
線程和協程
既然我們上面也說了,協程也被稱為微線程,下面對比一下協程和線程:
- 線程之間需要上下文切換成本相對協程來說是比較高的,尤其在開啟線程較多時,但協程的切換成本非常低。
- 同樣的線程的切換更多的是靠操作系統來控制,而協程的執行由我們自己控制。
協程只是在單一的線程里不同的協程之間切換,其實和線程很像,線程是在一個進程下,不同的線程之間做切換,這也可能是協程稱為微線程的原因吧。
Gevent模塊
Gevent是一種基於協程的Python網絡庫,它用到Greenlet提供的,封裝了libevent事件循環的高層同步API。它讓開發者在不改變編程習慣的同時,用同步的方式寫異步I/O的代碼。
簡單示例:
import gevent def test1(): print 12 gevent.sleep(0) print 34 def test2(): print 56 gevent.sleep(0) print 78 gevent.joinall([ gevent.spawn(test1), gevent.spawn(test2), ])
結果:
12 56 34 78
猴子補丁 Monkey patching
這個補丁是Gevent模塊最需要注意的問題,有了它,才會讓Gevent模塊發揮它的作用。我們往往使用Gevent是為了實現網絡通信的高並發,但是,Gevent直接修改標准庫里面大部分的阻塞式系統調用,包括socket、ssl、threading和 select等模塊,而變為協作式運行。但是我們無法保證你在復雜的生產環境中有哪些地方使用這些標准庫會由於打了補丁而出現奇怪的問題。
一種方法是使用gevent下的socket模塊,我們可以通過”from gevent import socket”來導入。不過更常用的方法是使用猴子布丁(Monkey patching)。使用猴子補丁褒貶不一,但是官網上還是建議使用”patch_all()”,而且在程序的第一行就執行。
from gevent import monkey; monkey.patch_socket() import gevent import socket urls = ['www.baidu.com', 'www.gevent.org', 'www.python.org'] jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls] gevent.joinall(jobs, timeout=5) print [job.value for job in jobs]
上述代碼的第一行就是對socket標准庫打上猴子補丁,此后socket標准庫中的類和方法都會被替換成非阻塞式的,所有其他的代碼都不用修改,這樣協程的效率就真正體現出來了。Python中其它標准庫也存在阻塞的情況,gevent提供了”monkey.patch_all()”方法將所有標准庫都替換。
獲取協程狀態
- started屬性/ready()方法:判斷協程是否已啟動。
- successful()方法:判斷協程是否成功運行且沒有拋出異常。
- value屬性:獲取協程執行完之后的返回值。
另外,greenlet協程運行過程中發生的異常是不會被拋出到協程外的,因此需要用協程對象的”exception”屬性來獲取協程中的異常。
下面的例子很好的演示了各種方法和屬性的使用。
#!/usr/bin/env python # _*_ coding utf-8 _*_ #Author: aaron import gevent def win(): return 'You win!' def fail(): raise Exception('You failed!') winner = gevent.spawn(win) loser = gevent.spawn(fail) print(winner.started) # True print(loser.started) # True # 在Greenlet中發生的異常,不會被拋到Greenlet外面。 # 控制台會打出Stacktrace,但程序不會停止 try: gevent.joinall([winner, loser]) except Exception as e: # 這段永遠不會被執行 print('This will never be reached') print(winner.ready()) # True print(loser.started) # True print(winner.value) # 'You win!' print(loser.value) # None print('successful ',winner.successful()) # True print('successful ',loser.successful()) # False # 這里可以通過raise loser.exception 或 loser.get() # 來將協程中的異常拋出 print(loser.exception)
協程運行超時控制
之前我們講過在”gevent.joinall()”方法中可以傳入timeout參數來設置超時,我們也可以在全局范圍內設置超時時間:
import gevent from gevent import Timeout timeout = Timeout(2) # 2 seconds timeout.start() def wait(): gevent.sleep(10) try: gevent.spawn(wait).join() except Timeout: print('Could not complete')
上例中,我們將超時設為2秒,此后所有協程的運行,如果超過兩秒就會拋出”Timeout”異常。我們也可以將超時設置在with語句內,這樣該設置只在with語句塊中有效:
with Timeout(1): gevent.sleep(10)
此外,我們可以指定超時所拋出的異常,來替換默認的”Timeout”異常。比如下例中超時就會拋出我們自定義的”TooLong”異常。
class TooLong(Exception): pass with Timeout(1, TooLong): gevent.sleep(10)
協程間通信
事件(Event)對象
greenlet協程間的異步通訊可以使用事件(Event)對象。該對象的”wait()”方法可以阻塞當前協程,而”set()”方法可以喚醒之前阻塞的協程。在下面的例子中,5個waiter協程都會等待事件evt,當setter協程在3秒后設置evt事件,所有的waiter協程即被喚醒。
#!/usr/bin/env python # _*_ coding utf-8 _*_ #Author: aaron import gevent from gevent.event import Event evt = Event() def setter(): print 'Wait for me' gevent.sleep(3) # 3秒后喚醒所有在evt上等待的協程 print "Ok, I'm done" evt.set() # 喚醒 def waiter(): print "I'll wait for you" evt.wait() # 等待 print 'Finish waiting' gevent.joinall([ gevent.spawn(setter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter) ])
AsyncResult事件
除了Event事件外,gevent還提供了AsyncResult事件,它可以在喚醒時傳遞消息。讓我們將上例中的setter和waiter作如下改動:
#!/usr/bin/env python # _*_ coding utf-8 _*_ #Author: aaron from gevent.event import AsyncResult aevt = AsyncResult() def setter(): print 'Wait for me' gevent.sleep(3) # 3秒后喚醒所有在evt上等待的協程 print "Ok, I'm done" aevt.set('Hello!') # 喚醒,並傳遞消息 def waiter(): print("I'll wait for you") message = aevt.get() # 等待,並在喚醒時獲取消息 print 'Got wake up message: %s' % message
隊列 Queue
隊列Queue的概念相信大家都知道,我們可以用它的put和get方法來存取隊列中的元素。gevent的隊列對象可以讓greenlet協程之間安全的訪問。運行下面的程序,你會看到3個消費者會分別消費隊列中的產品,且消費過的產品不會被另一個消費者再取到:
#!/usr/bin/env python # _*_ coding utf-8 _*_ #Author: aaron
import gevent from gevent.queue import Queue products = Queue() def consumer(name): #while not products.empty(): while True: try: print('%s got product %s' % (name, products.get_nowait())) gevent.sleep(0) except gevent.queue.Empty: break print('Quit') def producer(): for i in range(1, 10): products.put(i) gevent.joinall([ gevent.spawn(producer), gevent.spawn(consumer, 'steve'), gevent.spawn(consumer, 'john'), gevent.spawn(consumer, 'nancy'), ])
注意:協程隊列跟線程隊列是一樣的,put和get方法都是阻塞式的,它們都有非阻塞的版本:put_nowait和get_nowait。如果調用get方法時隊列為空,則是不會拋出”gevent.queue.Empty”異常。我們只能使用get_nowait()的方式讓氣拋出異常。
信號量
信號量可以用來限制協程並發的個數。它有兩個方法,acquire和release。顧名思義,acquire就是獲取信號量,而release就是釋放。當所有信號量都已被獲取,那剩余的協程就只能等待任一協程釋放信號量后才能得以運行:
#!/usr/bin/env python # _*_ coding utf-8 _*_ #Author: aaron import gevent from gevent.coros import BoundedSemaphore sem = BoundedSemaphore(2) def worker(n): sem.acquire() print('Worker %i acquired semaphore' % n) gevent.sleep(0) sem.release() print('Worker %i released semaphore' % n) gevent.joinall([gevent.spawn(worker, i) for i in xrange(0, 6)])
上面的例子中,我們初始化了”BoundedSemaphore”信號量,並將其個數定為2。所以同一個時間,只能有兩個worker協程被調度。程序運行后的結果如下:
Worker 0 acquired semaphore Worker 1 acquired semaphore Worker 0 released semaphore Worker 1 released semaphore Worker 2 acquired semaphore Worker 3 acquired semaphore Worker 2 released semaphore Worker 3 released semaphore Worker 4 acquired semaphore Worker 4 released semaphore Worker 5 acquired semaphore Worker 5 released semaphore
如果信號量個數為1,那就等同於同步鎖。
協程本地變量
同線程類似,協程也有本地變量,也就是只在當前協程內可被訪問的變量:
#!/usr/bin/env python # _*_ coding utf-8 _*_ #Author: aaron import gevent from gevent.local import local data = local() def f1(): data.x = 1 print data.x def f2(): try: print data.x except AttributeError: print 'x is not visible' gevent.joinall([ gevent.spawn(f1), gevent.spawn(f2) ])
通過將變量存放在local對象中,即可將其的作用域限制在當前協程內,當其他協程要訪問該變量時,就會拋出異常。不同協程間可以有重名的本地變量,而且互相不影響。因為協程本地變量的實現,就是將其存放在以的”greenlet.getcurrent()”的返回為鍵值的私有的命名空間內。
多並發socket模型
服務器端:
#!/usr/bin/env python # _*_ coding utf-8 _*_ #Author: aaron import socket import gevent from gevent import socket, monkey monkey.patch_all() def server(port): s = socket.socket() s.bind(('0.0.0.0', port)) s.listen(500) while True: cli, addr = s.accept() gevent.spawn(handle_request, cli) def handle_request(conn): try: while True: data = conn.recv(1024) print("recv:", data) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) except Exception as ex: print(ex) finally: conn.close() if __name__ == '__main__': server(8001)
當客戶端連接上服務器端時,服務器端通過開辟一個協程與該客戶端完成交互任務,同時由於使用了Gevent協程的方式,在每個客戶端與服務器交互時,並不會影響到服務器端的工作。
客戶端:
#!/usr/bin/env python # _*_ coding utf-8 _*_ #Author: aaron import socket HOST = 'localhost' # The remote host PORT = 8001 # The same port as used by the server s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) while True: msg = bytes(input(">>:"), encoding="utf8") s.sendall(msg) data = s.recv(1024) # print(data) print('Received', repr(data)) # repr 格式化輸出 s.close()