前言:gevent是Python的一個並發框架,以微線程greenlet為核心,使用了epoll事件監聽機制以及諸多其他優化而變得高效.而且其中有個monkey模塊,
可以將現有基於Python線程的代碼直接轉化為使用greenlet(類似於打patch).它的性能比線程框架大約高4倍(看下圖,是gevent和paste的對比):
工作中暫時沒有用到gevent的地方,這裡就簡單地翻譯一下 http://sdiehl.github.io/gevent-tutorial 中的一些例子和內容:
1 同步和異步
# Synchronous vs. asynchronous: two greenlets hand control back and forth
# explicitly with gevent.sleep(0).
import gevent


def foo():
    print('Running in foo')
    # gevent.sleep(0) does not actually sleep: it yields control so that
    # other ready greenlets can run before this one continues.
    gevent.sleep(0)
    print('Explicit context switch to foo again')


def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')


# gevent.spawn() creates a Greenlet, schedules it and starts it (roughly
# gevent.Greenlet(foo).start()); joinall() blocks until every greenlet in
# the list has finished.
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
執行結果:
dongwm@localhost ~ $ python test.py
Explicit context to bar
Running in foo
Explicit context switch to foo again
Implicit context switch back to bar
import time

import gevent
# Cooperative counterpart of the built-in select.select(): it blocks only the
# calling greenlet, not the whole thread
# (see http://www.dongwm.com/archives/guanyuselectyanjiu/).
from gevent import select

start = time.time()


def tic():
    """Return the time elapsed since ``start``, formatted as 'at N.N seconds'."""
    return 'at %1.1f seconds' % (time.time() - start)


def _poll():
    """Print a start message, wait 2 s in select() with nothing to watch, print an end message."""
    print('Started Polling: ', tic())
    # Arguments: fds to wait on for reading, for writing, for exceptional
    # conditions, and the timeout in seconds.
    select.select([], [], [], 2)
    print('Ended Polling: ', tic())


def gr1():
    _poll()


def gr2():
    _poll()


def gr3():
    print("Hey lets do some stuff while the greenlets poll, at", tic())
    gevent.sleep(1)


gevent.joinall([
    gevent.spawn(gr1),
    gevent.spawn(gr2),
    gevent.spawn(gr3),
])
執行結果:
dongwm@localhost ~ $ python test.py
('Hey lets do some stuff while the greenlets poll, at', 'at 0.0 seconds') #因為gr1和gr2開始是阻塞的,gr3直接打印
('Started Polling: ', 'at 0.0 seconds')
('Started Polling: ', 'at 0.0 seconds')
('Ended Polling: ', 'at 2.0 seconds')
('Ended Polling: ', 'at 2.0 seconds')
import random

import gevent


def task(pid):
    """Sleep a random 0, 1 or 2 ms (yielding to other greenlets), then report pid."""
    gevent.sleep(random.randint(0, 2) * 0.001)
    print('Task', pid, 'done')


def synchronous():
    """Run the tasks one after another in the current greenlet."""
    # NOTE: this covers tasks 1-9 while asynchronous() covers 0-9,
    # as in the upstream gevent tutorial.
    for i in range(1, 10):
        task(i)


def asynchronous():
    """Run every task in its own greenlet and wait for all of them."""
    # range() rather than xrange(): xrange does not exist on Python 3.
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)


print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()
執行結果:
dongwm@localhost ~ $ python test.py
Synchronous: #同步調用時task在當前greenlet中依次執行,沒有其他greenlet可以切換,所以按順序完成
('Task', 1, 'done')
('Task', 2, 'done')
('Task', 3, 'done')
('Task', 4, 'done')
('Task', 5, 'done')
('Task', 6, 'done')
('Task', 7, 'done')
('Task', 8, 'done')
('Task', 9, 'done')
Asynchronous: #每個task放在各自的greenlet裡面,sleep的時間是隨機的,完成順序也就不同了
('Task', 2, 'done')
('Task', 3, 'done')
('Task', 5, 'done')
('Task', 7, 'done')
('Task', 9, 'done')
('Task', 6, 'done')
('Task', 1, 'done')
('Task', 0, 'done')
('Task', 8, 'done')
('Task', 4, 'done')
import gevent
from gevent import Greenlet


def foo(message, n):
    """Sleep n seconds, then print message."""
    gevent.sleep(n)
    print(message)


# Greenlet.spawn() creates and starts a Greenlet instance.
thread1 = Greenlet.spawn(foo, "Hello", 1)
# gevent.spawn() is a thin wrapper that also creates and starts a Greenlet.
thread2 = gevent.spawn(foo, "I live!", 2)
# Any callable can be spawned; this lambda's return value is never printed.
thread3 = gevent.spawn(lambda x: (x + 1), 2)

threads = [thread1, thread2, thread3]

# Block until every greenlet has finished.
gevent.joinall(threads)
執行結果:
dongwm@localhost ~ $ python test.py
Hello
I live! #打印出來效果不明顯,事實上等待一秒打印第一行,再等待一秒打印第二行,然後馬上完成(lambda的返回值沒有打印)
import gevent
from gevent import Greenlet


class MyGreenlet(Greenlet):
    """Greenlet subclass that prints ``message`` and then sleeps ``n`` seconds."""

    def __init__(self, message, n):
        Greenlet.__init__(self)
        self.message = message
        self.n = n

    def _run(self):
        # Overriding _run() supplies the code this greenlet executes once started.
        print(self.message)
        gevent.sleep(self.n)


g = MyGreenlet("Hi there!", 3)
g.start()
g.join()
import gevent


def win():
    return 'You win!'


def fail():
    raise Exception('You fail at failing.')


winner = gevent.spawn(win)
loser = gevent.spawn(fail)

print(winner.started)  # whether the greenlet has been started: True
print(loser.started)   # True

# The exception raised inside fail() stays inside its greenlet: gevent
# prints its traceback and joinall() does not re-raise it here.
try:
    gevent.joinall([winner, loser])
except Exception:
    print('This will never be reached')

print(winner.value)         # the greenlet's return value: 'You win!'
print(loser.value)          # None
print(winner.ready())       # whether the greenlet has finished: True
print(loser.ready())        # True
print(winner.successful())  # finished without raising: True
print(loser.successful())   # False
print(loser.exception)      # the exception raised inside fail()
執行結果:
dongwm@localhost ~ $ python test.py
True
True
Traceback (most recent call last):
File "/usr/lib/python2.7/site-packages/gevent-1.0dev-py2.7-linux-i686.egg/gevent/greenlet.py", line 328, in run
result = self._run(*self.args, **self.kwargs)
File "test.py", line 7, in fail
raise Exception('You fail at failing.')
Exception: You fail at failing.
<Greenlet at 0xb73cd39cL: fail> failed with Exception
You win!
None
True
True
True
False
You fail at failing.
import gevent
from gevent import Timeout

seconds = 10


def wait():
    gevent.sleep(10)


# Timeout.start() arms the timer for the current (main) greenlet, which is
# the one blocked in join() below.
# NOTE(review): the timeout equals the sleep time, so it is not guaranteed
# which fires first; use seconds < 10 to reliably see the timeout.
timeout = Timeout(seconds)
timeout.start()
try:
    gevent.spawn(wait).join()
except Timeout:
    print('Could not complete')
finally:
    # Disarm the timer so it cannot fire later if wait() finished first.
    timeout.cancel()
上面的例子中超時時間和sleep時間都是10秒,不一定會觸發超時;但是假如修改seconds = 5,讓超時時間小於sleep的時間,那麼就一定會有超時被捕捉到
還可以使用with關鍵字處理上下文:
import gevent
from gevent import Timeout

time_to_wait = 5  # seconds


class TooLong(Exception):
    """Raised by the Timeout below in place of gevent.Timeout itself."""


# As a context manager, Timeout is started on entry and cancelled on exit.
# The body sleeps longer than time_to_wait, so TooLong is raised; catch it
# rather than letting the example end in a traceback.
try:
    with Timeout(time_to_wait, TooLong):
        gevent.sleep(10)
except TooLong:
    print('Timed out after %s seconds' % time_to_wait)
以及其他的方式:
import gevent
from gevent import Timeout


def wait():
    gevent.sleep(2)


# 1) A Timeout armed for the main greenlet, the same style as the earlier
#    example. Timeout.start() returns None, so the instance must be kept in
#    its own variable (``Timeout(1).start()`` would bind None). It is not
#    passed to join(): join() returns None whether or not its own timeout
#    expired, so it would swallow the Timeout and nothing would be printed.
timer = Timeout(1)
timer.start()
thread1 = gevent.spawn(wait)

try:
    thread1.join()
except Timeout:
    print('Thread 1 timed out')
finally:
    timer.cancel()

# 2) Timeout.start_new() creates and starts a Timeout in a single call.
timer = Timeout.start_new(1)
thread2 = gevent.spawn(wait)

try:
    # get() returns the greenlet's result (re-raising its exception, if any)
    # and raises the Timeout if it expires first.
    thread2.get(timeout=timer)
except Timeout:
    print('Thread 2 timed out')
finally:
    timer.cancel()

# 3) with_timeout() runs wait() and, if it has not returned within 1 second,
#    cancels it and raises Timeout (no timeout_value is given).
try:
    gevent.with_timeout(1, wait)
except Timeout:
    print('Thread 3 timed out')
2 數據結構
import gevent
from gevent.event import AsyncResult

# Holds a single value or exception; get() blocks until one is stored.
a = AsyncResult()


def setter():
    # After 3 seconds, store a value (None here) and wake every waiting greenlet.
    gevent.sleep(3)
    a.set()


def waiter():
    # Blocks until setter() calls a.set(); get() then returns the stored
    # value (or raises the stored exception).
    a.get()
    print('I live!')


gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])
更清晰的例子:
# gevent itself was never imported, so gevent.sleep / gevent.joinall below
# would raise NameError.
import gevent
from gevent.event import AsyncResult

a = AsyncResult()


def setter():
    # After 3 seconds, store 'Hello!' and wake the waiter.
    gevent.sleep(3)
    a.set('Hello!')


def waiter():
    # Blocks until a value is set, then prints it.
    print(a.get())


gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])
import gevent
from gevent.queue import Empty, Queue  # similar to the standard-library Queue

tasks = Queue()  # no maxsize: unbounded


def worker(n):
    """Take tasks from the queue until it is empty, yielding after each one."""
    while True:
        # EAFP instead of checking empty() and then calling get(): a blocking
        # get() would hang if another greenlet drained the queue in between.
        try:
            task = tasks.get_nowait()
        except Empty:
            break
        print('Worker %s got task %s' % (n, task))
        gevent.sleep(0)  # let the other workers take a turn
    print('Quitting time!')


def boss():
    # put_nowait() adds an item without blocking.
    for i in range(1, 25):
        tasks.put_nowait(i)


# Fill the queue completely before any worker starts.
gevent.spawn(boss).join()

gevent.joinall([
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'nancy'),
])
執行結果:
[root@248_STAT ~]# python !$
python test.py
Worker steve got task 1 #3個worker輪流取出數據
Worker john got task 2
Worker nancy got task 3
Worker steve got task 4
Worker nancy got task 5
Worker john got task 6
Worker steve got task 7
Worker john got task 8
Worker nancy got task 9
Worker steve got task 10
Worker nancy got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker nancy got task 15
Worker steve got task 16
Worker nancy got task 17
Worker john got task 18
Worker steve got task 19
Worker john got task 20
Worker nancy got task 21
Worker steve got task 22
Worker nancy got task 23
Worker john got task 24
Quitting time!
Quitting time!
Quitting time!
一個更複雜的例子:
import gevent
from gevent.queue import Empty, Queue

tasks = Queue(maxsize=3)  # bounded: put() blocks while 3 items are waiting


def worker(n):
    """Process tasks until none arrives within 1 second."""
    try:
        while True:
            task = tasks.get(timeout=1)  # raises Empty after 1 s with no task
            print('Worker %s got task %s' % (n, task))
            gevent.sleep(0)
    except Empty:
        print('Quitting time!')


def boss():
    """
    Boss will wait to hand out work until an individual worker is
    free since the maxsize of the task queue is 3.
    """
    # put() blocks while the queue is full, so the boss only hands out more
    # work as the workers drain it, instead of filling the queue blindly.
    for i in range(1, 10):
        tasks.put(i)
    print('Assigned all work in iteration 1')

    for i in range(10, 20):
        tasks.put(i)
    print('Assigned all work in iteration 2')


gevent.joinall([
    gevent.spawn(boss),
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'bob'),
])
import gevent
from gevent.pool import Group


def talk(msg):
    for _ in range(3):
        print(msg)


g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')

# A Group keeps track of running greenlets, links to each one and removes it
# when it finishes; join() waits for every greenlet currently in the group.
group = Group()
group.add(g1)
group.add(g2)
group.join()

group.add(g3)
group.join()
看更加明確的例子:
import gevent
from gevent import getcurrent
from gevent.pool import Group

group = Group()


def hello_from(n):
    print('Size of group', len(group))
    # id() of the current greenlet tells the workers apart.
    print('Hello from Greenlet %s' % id(getcurrent()))


# map() calls hello_from once per item, each call in its own greenlet of the group.
group.map(hello_from, range(3))


def intensive(n):
    # Larger n sleeps less, so later inputs finish first.
    gevent.sleep(3 - n)
    return 'task', n


print('Ordered')

ogroup = Group()
# imap() yields results in input order, regardless of completion order.
for i in ogroup.imap(intensive, range(3)):
    print(i)

print('Unordered')

igroup = Group()
# imap_unordered() yields each result as soon as its greenlet finishes.
for i in igroup.imap_unordered(intensive, range(3)):
    print(i)
執行結果:
[root@248_STAT ~]# python test.py
('Size of group', 3)
Hello from Greenlet 314818960
('Size of group', 3)
Hello from Greenlet 314819280
('Size of group', 3)
Hello from Greenlet 314819440
Ordered
('task', 0)
('task', 1)
('task', 2)
Unordered
('task', 2)
('task', 1)
('task', 0)
還能使用Pool限制池的大小(即同時運行的greenlet數量):
import gevent
from gevent.pool import Pool

pool = Pool(2)  # at most 2 greenlets in the pool at once


def hello_from(n):
    print('Size of pool', len(pool))


# With the pool full, map() must wait for a free slot before adding the
# third greenlet.
pool.map(hello_from, range(3))
返回結果:
[root@248_STAT ~]# python test.py
('Size of pool', 2)
('Size of pool', 2)
('Size of pool', 1) #池的大小是2,第三個greenlet要等池中空出位置後才被加入,此時池中只有它一個
以下是作者寫的一個pool操作類:
from gevent.pool import Pool


class SocketPool(object):
    """Run one listener greenlet per socket, with at most 1000 running at once."""

    def __init__(self):
        # Pool(1000) caps the number of concurrently running greenlets.
        # No self.pool.start() here: Pool.start() requires a greenlet argument
        # (the bare call raised TypeError), and spawn() below already starts
        # each greenlet as it is added.
        self.pool = Pool(1000)

    def listen(self, socket):
        """Call socket.recv() forever, discarding the data."""
        while True:
            socket.recv()

    def add_handler(self, socket):
        """Run listen(socket) in a new greenlet; raise if the pool is full."""
        if self.pool.full():
            raise Exception("At maximum pool size")
        self.pool.spawn(self.listen, socket)

    def shutdown(self):
        """Kill every greenlet still running in the pool."""
        self.pool.kill()
# sleep was used below but never imported (NameError).
from gevent import sleep
from gevent.pool import Pool

try:
    from gevent.lock import BoundedSemaphore  # gevent >= 1.0
except ImportError:
    from gevent.coros import BoundedSemaphore  # older gevent (module since removed)

# At most two greenlets may hold the semaphore at the same time.
sem = BoundedSemaphore(2)


def worker1(n):
    sem.acquire()
    try:
        print('Worker %i acquired semaphore' % n)
        sleep(0)
    finally:
        # Release even if the body raises, so other workers are not blocked forever.
        sem.release()
    print('Worker %i released semaphore' % n)


def worker2(n):
    # The with-statement acquires on entry and releases on exit.
    with sem:
        print('Worker %i acquired semaphore' % n)
        sleep(0)
    print('Worker %i released semaphore' % n)


pool = Pool()
pool.map(worker1, range(0, 2))
pool.map(worker2, range(3, 6))
執行結果:
[root@248_STAT ~]# python test.py
Worker 0 acquired semaphore
Worker 1 acquired semaphore #因為信號量允許2個greenlet同時持有,所以這2個同時獲取,再釋放
Worker 0 released semaphore
Worker 1 released semaphore
Worker 3 acquired semaphore #因為信號量只允許2個同時持有,所以5要等到3和4釋放後才能獲取
Worker 4 acquired semaphore
Worker 3 released semaphore
Worker 4 released semaphore
Worker 5 acquired semaphore
Worker 5 released semaphore
gevent教程中提到的ping pong協程例子的另一種實現(基於Actor模型):
import gevent
from gevent import Greenlet
from gevent.queue import Queue


class Actor(gevent.Greenlet):
    """Minimal actor: a greenlet that handles messages from its inbox queue."""

    def __init__(self):
        self.inbox = Queue()  # mailbox; other actors put() messages here
        Greenlet.__init__(self)

    def receive(self, message):
        """Handle one message; subclasses must override this."""
        # NotImplementedError is the exception to raise; NotImplemented is a
        # constant, and calling it raises TypeError.
        raise NotImplementedError()

    def _run(self):
        self.running = True
        while self.running:
            message = self.inbox.get()  # blocks until a message arrives
            self.receive(message)


class Pinger(Actor):
    def receive(self, message):
        print(message)
        # pong is the module-level Ponger instance created below.
        pong.inbox.put('ping')
        gevent.sleep(0)


class Ponger(Actor):
    def receive(self, message):
        print(message)
        # ping is the module-level Pinger instance created below.
        ping.inbox.put('pong')
        gevent.sleep(0)


ping = Pinger()
pong = Ponger()

ping.start()
pong.start()

# Both actors start out blocked on an empty inbox; this message starts the exchange.
ping.inbox.put('start')
# NOTE: nothing ever sets running to False, so this join never returns.
gevent.joinall([ping, pong])
from:http://www.dongwm.com/archives/guanyugeventdeyixielijieyi-2/