協程 (Coroutine)
什么是協程
協程(微線程)是比線程更輕量化的存在,像一個進程可以擁有多個線程一樣,一個線程也可以擁有多個協程
最重要的是,協程不是被操作系統內核所管理,而完全是由程序所控制
如何判斷
- 必須在只有一個單線程里實現並發
- 修改共享數據不需加鎖
- 用戶程序里自己保存多個控制流的上下文棧
- 一個協程遇到 IO 操作自動切換到其它協程
協程的好處:
- 無需線程上下文切換的開銷
- 無需原子操作鎖定及同步的開銷
"原子操作(atomic operation)是不需要synchronized",所謂原子操作是指不會被線程調度機制打斷的操作;這種操作一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另一個線程)。原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序是不可以被打亂,或者切割掉只執行部分。視作整體是原子性的核心。 - 方便切換控制流,簡化編程模型
- 高並發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用於高並發處理。
缺點:
- 無法利用多核資源:協程的本質是個單線程,它不能同時將單個 CPU 的多個核用上,協程需要和進程配合才能運行在多 CPU 上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是 CPU 集型應用。
- 進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序
greenlet
greenlet 通過 greenlet(func)
啟動一個協程,通過 switch()
手動切換協程
示例:
from greenlet import greenlet
def func1():
print('from func1: 1')
greenlet.switch(gr2)
print('from func1: 2')
greenlet.switch(gr2)
def func2():
print('from func2: 1')
greenlet.switch(gr1)
print('from func2: 2')
gr1 = greenlet(func1)
gr2 = greenlet(func2)
greenlet.switch(gr1)
輸出結果:
from func1: 1
from func2: 1
from func1: 2
from func2: 2
gevent
gevent 封裝了 greenlet,並實現了遇到 IO 自動切換
通過 gevent.spawn(func)
創建一個要執行 func 的 gevent 類,用 gevent.joinall()
等待執行完成
注意: gevent.sleep()
是用於模仿 IO 操作的,實際使用中不需要 gevent.sleep()
示例:
import gevent
def func1():
print('from func1: 1')
gevent.sleep(0)
print('from func1: 2')
gevent.sleep(1)
def func2():
print('from func2: 1')
gevent.sleep(2)
print('from func2: 2')
def func3():
print('from func3: 1')
gevent.sleep(1)
print('from func3: 2')
gevent.joinall([
gevent.spawn(func1),
gevent.spawn(func2),
gevent.spawn(func3),
])
輸出結果:
from func1: 1
from func2: 1
from func3: 1
from func1: 2
from func3: 2
from func2: 2
通過運行結果可以看出:每次 sleep 都會自動切換
實際使用示例
注意: 如果不使用 monkey.patch_all()
就無法自動識別 IO 操作,無法自動切換,變成同步執行
import gevent
import time
from gevent import monkey
from urllib import request
monkey.patch_all() # 把當前程序的所有 IO 操作標記起來,否則模塊無法知道 IO 操作
def func(url):
print('GET:', url)
resp = request.urlopen(url)
data = resp.read()
print('%i bytes received from %s' % (len(data), url))
urls = [
'http://www.python.org/',
'http://github.com/',
'http://cnblogs.com/dbf-/',
]
time_start = time.time()
for item in urls:
func(item)
print('同步耗時:', time.time() - time_start)
async_time_start = time.time()
gevent.joinall([
gevent.spawn(func, 'http://www.python.org/'),
gevent.spawn(func, 'http://www.github.com/'),
gevent.spawn(func, 'http://cnblogs.com/dbf-/'),
])
print('異步耗時:', time.time() - async_time_start)
通過結果可以看出異步明顯更快
socket 並發連接
server:
import gevent
from gevent import socket, monkey
monkey.patch_all()
def server(port):
s = socket.socket()
s.bind(('0.0.0.0', port))
s.listen(500)
while True:
cli, addr = s.accept()
gevent.spawn(handle_request, cli)
def handle_request(conn):
try:
while True:
data = conn.recv(1024)
print("recv:", data)
conn.send(data)
if not data:
conn.shutdown(socket.SHUT_WR)
except Exception as ex:
print(ex)
finally:
conn.close()
if __name__ == '__main__':
server(8001)
client:
import socket
HOST = 'localhost'
PORT = 8001
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
msg = bytes(input(">>:"), encoding="utf8")
s.sendall(msg)
data = s.recv(1024)
print('Received', repr(data))
# s.close()