背景
進程是操作系統分配資源的最小單位,每個進程獨享4G的內存地址空間,因此進程內數據是安全的,檢查間的通信需要使用特定的方法。同理,正是因為進程是數據安全的,所以導致進程的切換是一個很麻煩效率不高的操作。為了解決進程切換帶來的問題,線程這個名詞出現了,一個進程可以包含多個線程,一個進程下的所有線程共享所有的數據,數據可以直接訪問,協程的切換比進程的切換更快。進程和線程的切換是有操作系統控制,不是應用程序自己控制,是被動的。為了開發出應用程序自己可控制,百萬級任務切換的方案,協程這個概念被提出了。
協程是一個“輕”量級的任務,協程的切換是單純的應用程序內的代碼變動,不涉及操作系統的動作。一個線程可以包含多個協程,協程的並發可以極大的提升代碼的運行效率,而且協程的並發不用考慮互斥鎖的問題,共享全局數據更簡單更安全。
線程適用的場景:
-
IO密集型操作的系統,協程遇見IO操作自動切換
-
百萬級任務的切換
python和協程
python因為GIL鎖的原因,使得python環境下的多線程達不到理想的效果,因為GIL鎖保證了同一時間下只有一個線程運行。為了實現python環境下多任務的並發,一般是多進程+多協程。python從3.0版本開始開發自帶的協程庫asyncio,到3.7版本已經發展成熟到了一定程度,完全可以用於線上運行。在asyncio庫出來以前,第三方庫Gevent是一個非常優秀的協程庫,一直到現在還在被廣泛使用。
Gevent庫
例子:Gevent實現socket服務端和客戶端之間的通信,以及使用queue進行協程見的消息通信。
from gevent import monkey; monkey.patch_all() import gevent import queue import time from socket import * q = queue.Queue() def func1(): while True: msg = q.get() print("func1 : %s" % (msg, )) def func2(): s = socket(AF_INET, SOCK_STREAM) s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) s.bind(("127.0.0.1", 5555)) s.listen(5) while True: conn, addr = s.accept() while True: res = conn.recv(1024) q.put(res) conn.send("ok".encode('utf-8')) gevent.joinall([ gevent.spawn(func1), gevent.spawn(func2) ])
from socket import * client = socket(AF_INET, SOCK_STREAM) client.connect(('127.0.0.1', 5555)) while True: msg = input('>>: ').strip() client.send(msg.encode('utf-8')) msg = client.recv(1024) print(msg)
注意:
- Gevent實現遇見IO自動切換的功能,需要調用如下代碼from gevent import monkey; monkey.patch_all()實現,且處於該代碼下面的代碼才能自動切換,處於代碼前面的代碼不能自動切換
- 使用queue時,如果協程的等待IO全部是在從queue獲取數據,那么Gevent會報異常“gevent.exceptions.LoopExit: This operation would block forever”,如下例子。解決的辦法是不讓Gevent全部等待在queue的get操作即可,如上面的socket例子。
from gevent import monkey; monkey.patch_all() import gevent import queue import time from socket import * q = queue.Queue() def func1(): while True: msg = q.get() print("func1 : %s" % (msg, )) def func2(): q.put("abc".encode('utf-8')) time.sleep(1) q.put("def".encode('utf-8')) gevent.joinall([ gevent.spawn(func1), gevent.spawn(func2) ])
asyncio庫(python3.7以上)
import asyncio async def foo(): print('----start foo') await asyncio.sleep(1) print('----end foo') async def bar(): print('****start bar') await asyncio.sleep(2) print('****end bar') async def main(): # res = await asyncio.gather(foo(), bar()) # 所有的協程並發執行 # 等價 task1 = asyncio.create_task(foo()) task2 = asyncio.create_task(bar()) await task1 await task2 if __name__ == '__main__': asyncio.run(main()) # 結果 ----start foo ****start bar ----end foo ****end bar
將上述socker例子,使用asyncio開發,如下所示。
import asyncio
async def myprint(): while True: msg = await q.get() print("myprint: %s" % msg) q.task_done() async def handle(reader, writer): while True: data = await reader.read(100) message = data.decode() addr = writer.get_extra_info('peername') await q.put(f"Received {message!r} from {addr!r}") writer.write("ok".encode('UTF-8')) await writer.drain() async def myserver(): server = await asyncio.start_server(handle, '127.0.0.1', 8888) async with server: await server.serve_forever() async def main():
global q
q = asyncio.Queue() await asyncio.gather(myserver(), myprint()) asyncio.run(main())
import asyncio async def tcp_echo_client(message): reader, writer = await asyncio.open_connection('127.0.0.1', 8888) while True: msg = input(">>") writer.write(msg.encode('UTF-8')) data = await reader.read(100) print(f'Received: {data.decode()!r}') asyncio.run(tcp_echo_client('Hello World!'))