Python協程之Gevent模塊


背景

進程是操作系統分配資源的最小單位,每個進程獨享4G的內存地址空間,因此進程內數據是安全的,檢查間的通信需要使用特定的方法。同理,正是因為進程是數據安全的,所以導致進程的切換是一個很麻煩效率不高的操作。為了解決進程切換帶來的問題,線程這個名詞出現了,一個進程可以包含多個線程,一個進程下的所有線程共享所有的數據,數據可以直接訪問,協程的切換比進程的切換更快。進程和線程的切換是有操作系統控制,不是應用程序自己控制,是被動的。為了開發出應用程序自己可控制,百萬級任務切換的方案,協程這個概念被提出了。

協程是一個“輕”量級的任務,協程的切換是單純的應用程序內的代碼變動,不涉及操作系統的動作。一個線程可以包含多個協程,協程的並發可以極大的提升代碼的運行效率,而且協程的並發不用考慮互斥鎖的問題,共享全局數據更簡單更安全。

線程適用的場景:

  • IO密集型操作的系統,協程遇見IO操作自動切換

  • 百萬級任務的切換

python和協程

python因為GIL鎖的原因,使得python環境下的多線程達不到理想的效果,因為GIL鎖保證了同一時間下只有一個線程運行。為了實現python環境下多任務的並發,一般是多進程+多協程。python從3.0版本開始開發自帶的協程庫asyncio,到3.7版本已經發展成熟到了一定程度,完全可以用於線上運行。在asyncio庫出來以前,第三方庫Gevent是一個非常優秀的協程庫,一直到現在還在被廣泛使用。

Gevent庫

例子:Gevent實現socket服務端和客戶端之間的通信,以及使用queue進行協程見的消息通信。

from gevent import monkey; monkey.patch_all()
import gevent
import queue
import time
from socket import *

q = queue.Queue()

def func1():
    while True:
        msg = q.get()
        print("func1 : %s" % (msg, ))

def func2():
    s = socket(AF_INET, SOCK_STREAM)
    s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    s.bind(("127.0.0.1", 5555))
    s.listen(5)
    while True:
        conn, addr = s.accept()
        while True:
            res = conn.recv(1024)
            q.put(res)
            conn.send("ok".encode('utf-8'))

gevent.joinall([
    gevent.spawn(func1),
    gevent.spawn(func2)
])
from socket import *

client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 5555))

while True:
    msg = input('>>: ').strip()
    client.send(msg.encode('utf-8'))
    msg = client.recv(1024)
    print(msg)

注意:

  • Gevent實現遇見IO自動切換的功能,需要調用如下代碼from gevent import monkey; monkey.patch_all()實現,且處於該代碼下面的代碼才能自動切換,處於代碼前面的代碼不能自動切換
  • 使用queue時,如果協程的等待IO全部是在從queue獲取數據,那么Gevent會報異常“gevent.exceptions.LoopExit: This operation would block forever”,如下例子。解決的辦法是不讓Gevent全部等待在queue的get操作即可,如上面的socket例子。
from gevent import monkey; monkey.patch_all()
import gevent
import queue
import time
from socket import *

q = queue.Queue()

def func1():
    while True:
        msg = q.get()
        print("func1 : %s" % (msg, ))

def func2():
    q.put("abc".encode('utf-8'))
    time.sleep(1)
    q.put("def".encode('utf-8'))

gevent.joinall([
    gevent.spawn(func1),
    gevent.spawn(func2)
])

asyncio庫(python3.7以上)

import asyncio

async def foo():
    print('----start foo')
    await asyncio.sleep(1)
    print('----end foo')

async def bar():
    print('****start bar')
    await asyncio.sleep(2)
    print('****end bar')

async def main():
    # res = await asyncio.gather(foo(), bar()) # 所有的協程並發執行
    # 等價
    task1 = asyncio.create_task(foo())
    task2 = asyncio.create_task(bar())
    await task1
    await task2

if __name__ == '__main__':
    asyncio.run(main())
  
# 結果  
----start foo
****start bar
----end foo
****end bar

將上述socker例子,使用asyncio開發,如下所示。

import asyncio

async def myprint(): while True: msg = await q.get() print("myprint: %s" % msg) q.task_done() async def handle(reader, writer): while True: data = await reader.read(100) message = data.decode() addr = writer.get_extra_info('peername') await q.put(f"Received {message!r} from {addr!r}") writer.write("ok".encode('UTF-8')) await writer.drain() async def myserver(): server = await asyncio.start_server(handle, '127.0.0.1', 8888) async with server: await server.serve_forever() async def main():
global q
q = asyncio.Queue()
await asyncio.gather(myserver(), myprint()) asyncio.run(main())
import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)

    while True:
        msg = input(">>")
        writer.write(msg.encode('UTF-8'))

        data = await reader.read(100)
        print(f'Received: {data.decode()!r}')


asyncio.run(tcp_echo_client('Hello World!'))

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM