進程 線程 協程 異步
並發編程(不是並行)目前有四種方式:多進程、多線程、協程和異步。
- 多進程編程在python中有類似C的os.fork,更高層封裝的有multiprocessing標准庫
- 多線程編程python中有Thread和threading
- 異步編程在linux下主+要有三種實現select,poll,epoll
- 協程在python中通常會說到yield,關於協程的庫主要有greenlet,stackless,gevent,eventlet等實現。
進程
- 不共享任何狀態
- 調度由操作系統完成
- 有獨立的內存空間(上下文切換的時候需要保存棧、cpu寄存器、虛擬內存、以及打開的相關句柄等信息,開銷大)
- 通訊主要通過信號傳遞的方式來實現(實現方式有多種,信號量、管道、事件等,通訊都需要過內核,效率低)
線程
- 共享變量(解決了通訊麻煩的問題,但是對於變量的訪問需要加鎖)
- 調度由操作系統完成(由於共享內存,上下文切換變得高效)
- 一個進程可以有多個線程,每個線程會共享父進程的資源(創建線程開銷占用比進程小很多,可創建的數量也會很多)
- 通訊除了可使用進程間通訊的方式,還可以通過共享內存的方式進行通信(通過共享內存通信比通過內核要快很多)
協程
- 調度完全由用戶控制
- 一個線程(進程)可以有多個協程
- 每個線程(進程)循環按照指定的任務清單順序完成不同的任務(當任務被堵塞時,執行下一個任務;當恢復時,再回來執行這個任務;任務間切換只需要保存任務的上下文,沒有內核的開銷,可以不加鎖的訪問全局變量)
- 協程需要保證是非堵塞的且沒有相互依賴
- 協程基本上不能同步通訊,多采用異步的消息通訊,效率比較高
總結
- 進程擁有自己獨立的堆和棧,既不共享堆,亦不共享棧,進程由操作系統調度
- 線程擁有自己獨立的棧和共享的堆,共享堆,不共享棧,線程亦由操作系統調度(標准線程是的)
- 協程和線程一樣共享堆,不共享棧,協程由程序員在協程的代碼里顯示調度
聊聊協程
協程,又稱微線程,纖程。
Python的線程並不是標准線程,是系統級進程,線程間上下文切換有開銷,而且Python在執行多線程時默認加了一個全局解釋器鎖(GIL),因此Python的多線程其實是串行的,所以並不能利用多核的優勢,也就是說一個進程內的多個線程只能使用一個CPU。
def coroutine(func): def ret(): f = func() f.next() return f return ret @coroutine def consumer(): print "Wait to getting a task" while True: n = (yield) print "Got %s",n import time def producer(): c = consumer() task_id = 0 while True: time.sleep(1) print "Send a task to consumer" % task_id c.send("task %s" % task_id) if __name__ == "__main__": producer()
結果:
Wait to getting a task
Send a task 0 to consumer
Got task 0
Send a task 1 to consumer
Got task 1
Send a task 2 to consumer
Got task 2
...
傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但容易死鎖。
如果改用協程,生產者生產消息后,直接通過yield跳轉到消費者開始執行,待消費者執行完畢后,切換回生產者繼續生產,效率極高。
Gevent
介紹
gevent是基於協程的Python網絡庫。特點:
- 基於libev的快速事件循環(Linux上epoll,FreeBSD上kqueue)。
- 基於greenlet的輕量級執行單元。
- API的概念和Python標准庫一致(如事件,隊列)。
- 可以配合socket,ssl模塊使用。
- 能夠使用標准庫和第三方模塊創建標准的阻塞套接字(gevent.monkey)。
- 默認通過線程池進行DNS查詢,也可通過c-are(通過GEVENT_RESOLVER=ares環境變量開啟)。
- TCP/UDP/HTTP服務器
- 子進程支持(通過gevent.subprocess)
- 線程池
安裝和依賴
依賴於greenlet library
支持python 2.6+ 、3.3+
核心部分
- Greenlets
- 同步和異步執行
- 確定性
- 創建Greenlets
- Greenlet狀態
- 程序停止
- 超時
- 猴子補丁
Greenlets
gevent中的主要模式, 它是以C擴展模塊形式接入Python的輕量級協程。 全部運行在主程序操作系統進程的內部,但它們被程序員協作式地調度。
在任何時刻,只有一個協程在運行。
區別於multiprocessing、threading等提供真正並行構造的庫, 這些庫輪轉使用操作系統調度的進程和線程,是真正的並行。
同步和異步執行
並發的核心思想在於,大的任務可以分解成一系列的子任務,后者可以被調度成 同時執行或異步執行,而不是一次一個地或者同步地執行。兩個子任務之間的 切換也就是上下文切換。
在gevent里面,上下文切換是通過yielding來完成的.
import gevent def foo(): print('Running in foo') gevent.sleep(0) print('Explicit context switch to foo again') def bar(): print('Explicit context to bar') gevent.sleep(0) print('Implicit context switch back to bar') gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ])
執行結果:
Running in foo Explicit context to bar Explicit context switch to foo again Implicit context switch back to bar
代碼執行過程:
[圖片上傳失敗...(image-fd7606-1522313744163)]
網絡延遲或IO阻塞隱式交出greenlet上下文的執行權。
import time import gevent from gevent import select start = time.time() tic = lambda: 'at %1.1f seconds' % (time.time() - start) def gr1(): print('Started Polling: %s' % tic()) select.select([], [], [], 1) print('Ended Polling: %s' % tic()) def gr2(): print('Started Polling: %s' % tic()) select.select([], [], [], 2) print('Ended Polling: %s' % tic()) def gr3(): print("Hey lets do some stuff while the greenlets poll, %s" % tic()) gevent.sleep(1) gevent.joinall([ gevent.spawn(gr1), gevent.spawn(gr2), gevent.spawn(gr3), ])
執行結果:
Started Polling: at 0.0 seconds Started Polling: at 0.0 seconds Hey lets do some stuff while the greenlets poll, at 0.0 seconds Ended Polling: at 1.0 seconds Ended Polling: at 2.0 seconds
同步vs異步
import gevent import random def task(pid): gevent.sleep(random.randint(0,2)*0.001) print('Task %s done' % pid) def synchronous(): for i in xrange(5): task(i) def asynchronous(): threads = [gevent.spawn(task, i) for i in xrange(5)] gevent.joinall(threads) print('Synchronous:') synchronous() print('Asynchronous:') asynchronous()
執行結果:
Synchronous:
Task 0 done
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Asynchronous:
Task 2 done
Task 0 done
Task 1 done
Task 3 done
Task 4 done
確定性
greenlet具有確定性。在相同配置相同輸入的情況下,它們總是會產生相同的輸出。
import time
def echo(i):
time.sleep(0.001)
return i
# Non Deterministic Process Pool
from multiprocessing.pool import Pool
p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]
print(run1 == run2 == run3 == run4)
# Deterministic Gevent Pool
from gevent.pool import Pool
p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]
print(run1 == run2 == run3 == run4)
執行結果:
False True
即使gevent通常帶有確定性,當開始與如socket或文件等外部服務交互時, 不確定性也可能溜進你的程序中。因此盡管gevent線程是一種“確定的並發”形式, 使用它仍然可能會遇到像使用POSIX線程或進程時遇到的那些問題。
涉及並發長期存在的問題就是競爭條件(race condition)(當兩個並發線程/進程都依賴於某個共享資源同時都嘗試去修改它的時候, 就會出現競爭條件),這會導致資源修改的結果狀態依賴於時間和執行順序。 這個問題,會導致整個程序行為變得不確定。
解決辦法: 始終避免所有全局的狀態.
創建Greenlets
gevent對Greenlet初始化提供了一些封裝.
import gevent from gevent import Greenlet def foo(message, n): gevent.sleep(n) print(message) thread1 = Greenlet.spawn(foo, "Hello", 1) thread2 = gevent.spawn(foo, "I live!", 2) thread3 = gevent.spawn(lambda x: (x+1), 2) threads = [thread1, thread2, thread3] gevent.joinall(threads)
執行結果:
Hello
I live!
除使用基本的Greenlet類之外,你也可以子類化Greenlet類,重載它的_run方法。
import gevent
from gevent import Greenlet class MyGreenlet(Greenlet): def __init__(self, message, n): Greenlet.__init__(self) self.message = message self.n = n def _run(self): print(self.message) gevent.sleep(self.n) g = MyGreenlet("Hi there!", 3) g.start() g.join()
執行結果:
Hi there!
Greenlet狀態
greenlet的狀態通常是一個依賴於時間的參數:
- started -- Boolean, 指示此Greenlet是否已經啟動
- ready() -- Boolean, 指示此Greenlet是否已經停止
- successful() -- Boolean, 指示此Greenlet是否已經停止而且沒拋異常
- value -- 任意值, 此Greenlet代碼返回的值
- exception -- 異常, 此Greenlet內拋出的未捕獲異常
程序停止
程序
當主程序(main program)收到一個SIGQUIT信號時,不能成功做yield操作的 Greenlet可能會令意外地掛起程序的執行。這導致了所謂的僵屍進程, 它需要在Python解釋器之外被kill掉。
通用的處理模式就是在主程序中監聽SIGQUIT信號,調用gevent.shutdown退出程序。
import gevent import signal def run_forever(): gevent.sleep(1000) if __name__ == '__main__': gevent.signal(signal.SIGQUIT, gevent.shutdown) thread = gevent.spawn(run_forever) thread.join()
超時
通過超時可以對代碼塊兒或一個Greenlet的運行時間進行約束。
import gevent from gevent import Timeout seconds = 10 timeout = Timeout(seconds) timeout.start() def wait(): gevent.sleep(10) try: gevent.spawn(wait).join() except Timeout: print('Could not complete')
超時類
import gevent from gevent import Timeout time_to_wait = 5 # seconds class TooLong(Exception): pass with Timeout(time_to_wait, TooLong): gevent.sleep(10)
另外,對各種Greenlet和數據結構相關的調用,gevent也提供了超時參數。
import gevent from gevent import Timeout def wait(): gevent.sleep(2) timer = Timeout(1).start() thread1 = gevent.spawn(wait) try: thread1.join(timeout=timer) except Timeout: print('Thread 1 timed out') # -- timer = Timeout.start_new(1) thread2 = gevent.spawn(wait) try: thread2.get(timeout=timer) except Timeout: print('Thread 2 timed out') # -- try: gevent.with_timeout(1, wait) except Timeout: print('Thread 3 timed out')
執行結果:
Thread 1 timed out Thread 2 timed out Thread 3 timed out
猴子補丁(Monkey patching)
gevent的死角.
import socket print(socket.socket) print("After monkey patch") from gevent import monkey monkey.patch_socket() print(socket.socket) import select print(select.select) monkey.patch_select() print("After monkey patch") print(select.select)
執行結果:
class 'socket.socket' After monkey patch class 'gevent.socket.socket' built-in function select After monkey patch function select at 0x1924de8
Python的運行環境允許我們在運行時修改大部分的對象,包括模塊,類甚至函數。 這是個一般說來令人驚奇的壞主意,因為它創造了“隱式的副作用”,如果出現問題 它很多時候是極難調試的。雖然如此,在極端情況下當一個庫需要修改Python本身 的基礎行為的時候,猴子補丁就派上用場了。在這種情況下,gevent能夠修改標准庫里面大部分的阻塞式系統調用,包括socket、ssl、threading和 select等模塊,而變為協作式運行。
例如,Redis的python綁定一般使用常規的tcp socket來與redis-server實例通信。 通過簡單地調用gevent.monkey.patch_all(),可以使得redis的綁定協作式的調度 請求,與gevent棧的其它部分一起工作。
這讓我們可以將一般不能與gevent共同工作的庫結合起來,而不用寫哪怕一行代碼。 雖然猴子補丁仍然是邪惡的(evil),但在這種情況下它是“有用的邪惡(useful evil)”。
數據結構
- 事件
- 隊列
- 組和池
- 鎖和信號量
- 線程局部變量
- 子進程
- Actors
事件
事件(event)是一個在Greenlet之間異步通信的形式。
import gevent from gevent.event import Event evt = Event() def setter(): print('A: Hey wait for me, I have to do something') gevent.sleep(3) print("Ok, I'm done") evt.set() def waiter(): print("I'll wait for you") evt.wait() # blocking print("It's about time") def main(): gevent.joinall([ gevent.spawn(setter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter) ]) if __name__ == '__main__': main()
執行結果:
A: Hey wait for me, I have to do something
I'll wait for you
I'll wait for you
I'll wait for you
Ok, I'm done
It's about time
It's about time
It's about time
事件對象的一個擴展是AsyncResult,它允許你在喚醒調用上附加一個值。 它有時也被稱作是future或defered,因為它持有一個指向將來任意時間可設置為任何值的引用。
import gevent from gevent.event import AsyncResult a = AsyncResult() def setter(): gevent.sleep(3) a.set('Hello!') def waiter(): print(a.get()) gevent.joinall([ gevent.spawn(setter), gevent.spawn(waiter), ])
隊列
隊列是一個排序的數據集合,它有常見的put / get操作, 但是它是以在Greenlet之間可以安全操作的方式來實現的。
import gevent from gevent.queue import Queue tasks = Queue() def worker(n): while not tasks.empty(): task = tasks.get() print('Worker %s got task %s' % (n, task)) gevent.sleep(0) print('Quitting time!') def boss(): for i in xrange(1,10): tasks.put_nowait(i) gevent.spawn(boss).join() gevent.joinall([ gevent.spawn(worker, 'steve'), gevent.spawn(worker, 'john'), gevent.spawn(worker, 'nancy'), ])
執行結果:
Worker steve got task 1
Worker john got task 2
Worker nancy got task 3
Worker steve got task 4
Worker john got task 5
Worker nancy got task 6
Worker steve got task 7
Worker john got task 8
Worker nancy got task 9
Quitting time!
Quitting time!
Quitting time!
put和get操作都是阻塞的,put_nowait和get_nowait不會阻塞, 然而在操作不能完成時拋出gevent.queue.Empty或gevent.queue.Full異常。
組和池
組(group)是一個運行中greenlet集合,集合中的greenlet像一個組一樣會被共同管理和調度。 它也兼飾了像Python的multiprocessing庫那樣的平行調度器的角色,主要用在在管理異步任務的時候進行分組。
import gevent
from gevent.pool import Group def talk(msg): for i in xrange(2): print(msg) g1 = gevent.spawn(talk, 'bar') g2 = gevent.spawn(talk, 'foo') g3 = gevent.spawn(talk, 'fizz') group = Group() group.add(g1) group.add(g2) group.join() group.add(g3) group.join()
執行結果:
bar
bar
foo
foo
fizz
fizz
池(pool)是一個為處理數量變化並且需要限制並發的greenlet而設計的結構。
import gevent from gevent.pool import Pool pool = Pool(2) def hello_from(n): print('Size of pool %s' % len(pool)) pool.map(hello_from, xrange(3))
執行結果:
Size of pool 2 Size of pool 2 Size of pool 1
構造一個socket池的類,在各個socket上輪詢。
from gevent.pool import Pool class SocketPool(object): def __init__(self): self.pool = Pool(10) self.pool.start() def listen(self, socket): while True: socket.recv() def add_handler(self, socket): if self.pool.full(): raise Exception("At maximum pool size") else: self.pool.spawn(self.listen, socket) def shutdown(self): self.pool.kill()
鎖和信號量
信號量是一個允許greenlet相互合作,限制並發訪問或運行的低層次的同步原語。 信號量有兩個方法,acquire和release。在信號量是否已經被 acquire或release,和擁有資源的數量之間不同,被稱為此信號量的范圍 (the bound of the semaphore)。如果一個信號量的范圍已經降低到0,它會 阻塞acquire操作直到另一個已經獲得信號量的greenlet作出釋放。
from gevent import sleep from gevent.pool import Pool from gevent.coros import BoundedSemaphore sem = BoundedSemaphore(2) def worker1(n): sem.acquire() print('Worker %i acquired semaphore' % n) sleep(0) sem.release() print('Worker %i released semaphore' % n) def worker2(n): with sem: print('Worker %i acquired semaphore' % n) sleep(0) print('Worker %i released semaphore' % n) pool = Pool() pool.map(worker1, xrange(0,2))
執行結果:
Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore
鎖(lock)是范圍為1的信號量。它向單個greenlet提供了互斥訪問。 信號量和鎖常被用來保證資源只在程序上下文被單次使用。
線程局部變量
Gevent允許程序員指定局部於greenlet上下文的數據。 在內部,它被實現為以greenlet的getcurrent()為鍵, 在一個私有命名空間尋址的全局查找。
import gevent from gevent.local import local stash = local() def f1(): stash.x = 1 print(stash.x) def f2(): stash.y = 2 print(stash.y) try: stash.x except AttributeError: print("x is not local to f2") g1 = gevent.spawn(f1) g2 = gevent.spawn(f2) gevent.joinall([g1, g2])
執行結果:
1 2 x is not local to f2
很多集成了gevent的web框架將HTTP會話對象以線程局部變量的方式存儲在gevent內。 例如使用Werkzeug實用庫和它的proxy對象,我們可以創建Flask風格的請求對象。
from gevent.local import local from werkzeug.local import LocalProxy from werkzeug.wrappers import Request from contextlib import contextmanager from gevent.wsgi import WSGIServer _requests = local() request = LocalProxy(lambda: _requests.request) @contextmanager def sessionmanager(environ): _requests.request = Request(environ) yield _requests.request = None def logic(): return "Hello " + request.remote_addr def application(environ, start_response): status = '200 OK' with sessionmanager(environ): body = logic() headers = [ ('Content-Type', 'text/html') ] start_response(status, headers) return [body] WSGIServer(('', 8000), application).serve_forever()
子進程
從gevent 1.0起,支持gevent.subprocess,支持協作式的等待子進程。
import gevent from gevent.subprocess import Popen, PIPE def cron(): while True: print("cron") gevent.sleep(0.2) g = gevent.spawn(cron) sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True) out, err = sub.communicate() g.kill() print(out.rstrip())
執行結果:
cron
cron
cron
cron
cron
Linux
很多人也想將gevent和multiprocessing一起使用。最明顯的挑戰之一 就是multiprocessing提供的進程間通信默認不是協作式的。由於基於 multiprocessing.Connection的對象(例如Pipe)暴露了它們下面的 文件描述符(file descriptor),gevent.socket.wait_read和wait_write 可以用來在直接讀寫之前協作式的等待ready-to-read/ready-to-write事件。
import gevent from multiprocessing import Process, Pipe from gevent.socket import wait_read, wait_write # To Process a, b = Pipe() # From Process c, d = Pipe() def relay(): for i in xrange(5): msg = b.recv() c.send(msg + " in " + str(i)) def put_msg(): for i in xrange(5): wait_write(a.fileno()) a.send('hi') def get_msg(): for i in xrange(5): wait_read(d.fileno()) print(d.recv()) if __name__ == '__main__': proc = Process(target=relay) proc.start() g1 = gevent.spawn(get_msg) g2 = gevent.spawn(put_msg) gevent.joinall([g1, g2], timeout=1)
執行結果:
hi in 0
hi in 1
hi in 2
hi in 3
hi in 4
然而要注意,組合multiprocessing和gevent必定帶來 依賴於操作系統(os-dependent)的缺陷,其中有:
在兼容POSIX的系統創建子進程(forking)之后, 在子進程的gevent的狀態是不適定的(ill-posed)。一個副作用就是, multiprocessing.Process創建之前的greenlet創建動作,會在父進程和子進程兩方都運行。
上例的put_msg()中的a.send()可能依然非協作式地阻塞調用的線程:一個 ready-to-write事件只保證寫了一個byte。在嘗試寫完成之前底下的buffer可能是滿的。
上面表示的基於wait_write()/wait_read()的方法在Windows上不工作 (IOError: 3 is not a socket (files are not supported)),因為Windows不能監視 pipe事件。
Python包gipc以大體上透明的方式在 兼容POSIX系統和Windows上克服了這些挑戰。它提供了gevent感知的基於 multiprocessing.Process的子進程和gevent基於pipe的協作式進程間通信。
Actors
actor模型是一個由於Erlang變得普及的更高層的並發模型。 簡單的說它的主要思想就是許多個獨立的Actor,每個Actor有一個可以從 其它Actor接收消息的收件箱。Actor內部的主循環遍歷它收到的消息,並根據它期望的行為來采取行動。
Gevent沒有原生的Actor類型,但在一個子類化的Greenlet內使用隊列, 我們可以定義一個非常簡單的。
import gevent from gevent.queue import Queue class Actor(gevent.Greenlet): def __init__(self): self.inbox = Queue() Greenlet.__init__(self) def receive(self, message): """ Define in your subclass. """ raise NotImplemented() def _run(self): self.running = True while self.running: message = self.inbox.get() self.receive(message)
下面是一個使用的例子:
import gevent from gevent.queue import Queue from gevent import Greenlet class Pinger(Actor): def receive(self, message): print(message) pong.inbox.put('ping') gevent.sleep(0) class Ponger(Actor): def receive(self, message): print(message) ping.inbox.put('pong') gevent.sleep(0) ping = Pinger() pong = Ponger() ping.start() pong.start() ping.inbox.put('start') gevent.joinall([ping, pong])
實際應用
- Gevent ZeroMQ
- 簡單server
- WSGI Servers
- 流式server
- Long Polling
- Websockets
簡單server
# On Unix: Access with ``$ nc 127.0.0.1 5000`` # On Window: Access with ``$ telnet 127.0.0.1 5000`` from gevent.server import StreamServer def handle(socket, address): socket.send("Hello from a telnet!\n") for i in range(5): socket.send(str(i) + '\n') socket.close() server = StreamServer(('127.0.0.1', 5000), handle) server.serve_forever()
WSGI Servers And Websockets
Gevent為HTTP內容服務提供了兩種WSGI server。從今以后就稱為 wsgi和pywsgi:
- gevent.wsgi.WSGIServer
- gevent.pywsgi.WSGIServer
glb中使用
import click from flask import Flask from gevent.pywsgi import WSGIServer from geventwebsocket.handler import WebSocketHandler import v1 from .settings import Config from .sockethandler import handle_websocket def create_app(config=None): app = Flask(__name__, static_folder='static') if config: app.config.update(config) else: app.config.from_object(Config) app.register_blueprint( v1.bp, url_prefix='/v1') return app def wsgi_app(environ, start_response): path = environ['PATH_INFO'] if path == '/websocket': handle_websocket(environ['wsgi.websocket']) else: return create_app()(environ, start_response) @click.command() @click.option('-h', '--host_port', type=(unicode, int), default=('0.0.0.0', 5000), help='Host and port of server.') @click.option('-r', '--redis', type=(unicode, int, int), default=('127.0.0.1', 6379, 0), help='Redis url of server.') @click.option('-p', '--port_range', type=(int, int), default=(50000, 61000), help='Port range to be assigned.') def manage(host_port, redis=None, port_range=None): Config.REDIS_URL = 'redis://%s:%s/%s' % redis Config.PORT_RANGE = port_range http_server = WSGIServer(host_port, wsgi_app, handler_class=WebSocketHandler) print '----GLB Server run at %s:%s-----' % host_port print '----Redis Server run at %s:%s:%s-----' % redis http_server.serve_forever()
缺陷
和其他異步I/O框架一樣,gevent也有一些缺陷:
- 阻塞(真正的阻塞,在內核級別)在程序中的某個地方停止了所有的東西.這很像C代碼中monkey patch沒有生效
- 保持CPU處於繁忙狀態.greenlet不是搶占式的,這可能導致其他greenlet不會被調度.
- 在greenlet之間存在死鎖的可能.
一個gevent回避的缺陷是,你幾乎不會碰到一個和異步無關的Python庫--它將阻塞你的應用程序,因為純Python庫使用的是monkey patch的stdlib.