Python 協程 (Coroutine)


協程 (Coroutine)

什么是協程

協程(微線程)是比線程更輕量化的存在,像一個進程可以擁有多個線程一樣,一個線程也可以擁有多個協程
最重要的是,協程不是被操作系統內核所管理,而完全是由程序所控制

如何判斷

  • 必須在只有一個單線程里實現並發
  • 修改共享數據不需加鎖
  • 用戶程序里自己保存多個控制流的上下文棧
  • 一個協程遇到 IO 操作自動切換到其它協程

協程的好處:

  1. 無需線程上下文切換的開銷
  2. 無需原子操作鎖定及同步的開銷
    "原子操作(atomic operation)是不需要synchronized",所謂原子操作是指不會被線程調度機制打斷的操作;這種操作一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另一個線程)。原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序是不可以被打亂,或者切割掉只執行部分。視作整體是原子性的核心。
  3. 方便切換控制流,簡化編程模型
  4. 高並發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用於高並發處理。

缺點:

  1. 無法利用多核資源:協程的本質是個單線程,它不能同時將單個 CPU 的多個核用上,協程需要和進程配合才能運行在多 CPU 上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是 CPU 集型應用。
  2. 進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序

greenlet

greenlet 通過 greenlet(func) 啟動一個協程,通過 switch() 手動切換協程
示例:

from greenlet import greenlet


def func1():
    print('from func1: 1')
    greenlet.switch(gr2)
    print('from func1: 2')
    greenlet.switch(gr2)


def func2():
    print('from func2: 1')
    greenlet.switch(gr1)
    print('from func2: 2')


gr1 = greenlet(func1)
gr2 = greenlet(func2)
greenlet.switch(gr1)

輸出結果:
from func1: 1
from func2: 1
from func1: 2
from func2: 2

gevent

gevent 封裝了 greenlet,並實現了遇到 IO 自動切換
通過 gevent.spawn(func) 創建一個要執行 func 的 gevent 類,用 gevent.joinall() 等待執行完成
注意: gevent.sleep() 是用於模仿 IO 操作的,實際使用中不需要 gevent.sleep()

示例:

import gevent


def func1():
    print('from func1: 1')
    gevent.sleep(0)
    print('from func1: 2')
    gevent.sleep(1)


def func2():
    print('from func2: 1')
    gevent.sleep(2)
    print('from func2: 2')


def func3():
    print('from func3: 1')
    gevent.sleep(1)
    print('from func3: 2')


gevent.joinall([
    gevent.spawn(func1),
    gevent.spawn(func2),
    gevent.spawn(func3),
])

輸出結果:
from func1: 1
from func2: 1
from func3: 1
from func1: 2
from func3: 2
from func2: 2

通過運行結果可以看出:每次 sleep 都會自動切換

實際使用示例

注意: 如果不使用 monkey.patch_all() 就無法自動識別 IO 操作,無法自動切換,變成同步執行

import gevent
import time
from gevent import monkey
from urllib import request

monkey.patch_all()  # 把當前程序的所有 IO 操作標記起來,否則模塊無法知道 IO 操作


def func(url):
    print('GET:', url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%i bytes received from %s' % (len(data), url))


urls = [
    'http://www.python.org/',
    'http://github.com/',
    'http://cnblogs.com/dbf-/',
]
time_start = time.time()
for item in urls:
    func(item)
print('同步耗時:', time.time() - time_start)
async_time_start = time.time()
gevent.joinall([
    gevent.spawn(func, 'http://www.python.org/'),
    gevent.spawn(func, 'http://www.github.com/'),
    gevent.spawn(func, 'http://cnblogs.com/dbf-/'),
])
print('異步耗時:', time.time() - async_time_start)

通過結果可以看出異步明顯更快

socket 並發連接

server:

import gevent
from gevent import socket, monkey

monkey.patch_all()


def server(port):
    s = socket.socket()
    s.bind(('0.0.0.0', port))
    s.listen(500)
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)


def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:", data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)
    except Exception as ex:
        print(ex)
    finally:
        conn.close()


if __name__ == '__main__':
    server(8001)

client:

import socket

HOST = 'localhost'
PORT = 8001
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"), encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    print('Received', repr(data))
# s.close()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM