進擊のpython
並發編程——協程理論
本節的主題的重點是基於單線程來實現並發,即只用一個主線程的我情況下實現並發
所以在說這節知識點之前,我們需要先回顧一下並發的本質:切換+保存狀態
那可以肯定一點的就是CPU正在運行一個任務的時候,會在兩種情況切走去執行其他的任務
但是這種切換機制,不是我們控制的,而是操作系統強制控制的
這兩種情況是:1.發生了阻塞 2.該任務計算時間過長,或者來了優先級更高的程序
而很明顯第二種情況並不能提升效率!只是為了讓CPU能夠雨露均沾,看起來是同時執行的假象
所以才會說並發是假的並行,因為他是看起來是同時,但是實際情況下並不是
如果在計算的時候,這種切換其實是降低效率的,我們可以驗證一下,模擬切換機制yield
在串行的前提:
from time import time
def a():
n = 0
for i in range(10000):
n += 1
return n
def b(res):
pass
start_time = time()
res = a()
b(res)
stop_time = time()
print(stop_time - start_time) # 0.0025005340576171875
在yeild切換的狀態下:
from time import time
def a():
g = b()
next(g)
n = 0
for i in range(10000):
n += 1
g.send(n)
return n
def b(res=None):
while True:
res = yield
pass
start_time = time()
a()
stop_time = time()
print(stop_time - start_time) # 0.006000518798828125
可以看到,來回切換的這種,速度確實慢!即使差得很不多,但是也三倍之多!
第一種情況的切換:在任務一遇到I/O的情況下,切換到任務二去執行
這樣就可以利用任務一阻塞的時間完成任務二
第二種情況就不會執行I/O切換
而在單線程的情況下我們是不可避免的會遇到I/O阻塞的
但是如果我們能在自己的程序中控制單線程下的多個任務能夠在一個任務遇到I/O阻塞時
就切換到另一個任務去計算,這樣就能夠保證這個線程最大程度的處於就緒態
這樣就能“迷惑”操作系統,以為沒有遇到I/O,讓其感覺好像線程一直在工作,這樣就可以一直“霸占”CPU
協程
協程記住一句話:他是可控的線程!
python的線程屬於內核級別的,即由操作系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運行)
單線程內開啟協程,一旦遇到io,就會從應用程序級別(而非操作系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)
對比線程,協程開銷更小,更加的輕量級,而且可以在單線程里就達到並發的效果
但是缺點也就很明顯了:無法利用多核,而且攜程阻塞,也就阻塞了整個線程
那我們就簡單的介紹一下可以實現協程的模塊:
greenlet
其實上面的yeild就實現了協程,只是這么寫,太麻煩了
於是就提供了一個模塊來處理這種情況
from greenlet import greenlet
def eat(name):
print('%s eat 1' % name)
g2.switch('jevious')
print('%s eat 2' % name)
g2.switch()
def play(name):
print('%s play 1' % name)
g1.switch()
print('%s play 2' % name)
g1 = greenlet(eat)
g2 = greenlet(play)
g1.switch('ponny')
只需要第一次調用的時候傳參
但是這只是優化了yield寫法,本質上還是沒有解決遇到阻塞就切換的問題
所以接下來這個方法就出現了
gevent
這也是一個第三方庫,他是C擴展模塊形式接入Python的輕量級協程
import gevent
def eat(name):
print('%s eat 1' % name)
gevent.sleep(1)
print('%s eat 2' % name)
def play(name):
print('%s play 1' % name)
gevent.sleep(1)
print('%s play 2' % name)
g1 = gevent.spawn(eat, 'ponny')
g2 = gevent.spawn(play, 'ponny')
g1.join()
g2.join()
這種程度的模塊不會很細的講解
這種就很好的解決了遇到I/O主動切換的問題(否則上面的程序執行時間就應該是2s+)
但是有個弊端,就是他只認識自己造出來的阻塞gevent.sleep(1)
但是這樣不行,系統產生的I/O可不是模塊造出來的阻塞,也就意味着這樣的阻塞不會被處理!
所以,gevent又內嵌一個方法:猴子(monkey)
from gevent import monkey;monkey.patch_all()
簡稱:打補丁,他就讓所有的阻塞都可以被識別
那既然想讓所有的阻塞都被識別,很明顯這個語句就應該放在最前面才對!
代碼優化如下:
from gevent import monkey;monkey.patch_all()
import gevent
import time
def eat(name):
print('%s eat 1' % name)
time.sleep(3)
print('%s eat 2' % name)
def play(name):
print('%s play 1' % name)
time.sleep(4)
print('%s play 2' % name)
start_time = time.time()
g1 = gevent.spawn(eat, 'ponny')
g2 = gevent.spawn(play, 'ponny')
g1.join()
g2.join()
print(f'執行時間為:{time.time()-start_time}') # 執行時間為:4.0118021965026855
這是在單線程實現了並發的效果!
ps:如果兩個join寫着麻煩,也可以gevent.joinall([g1, g2])
socket通信
我確定,這是最后一個版本了~哈哈
不斷的優化,不斷地修改,也終於該有個結束了
最后就用協程的方式來寫socket通信:
# 服務端
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent
def server(server_ip, port):
s = socket()
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) # 前面提過,用於解決端口復用
s.bind((server_ip, port))
s.listen()
while True:
conn, addr = s.accept()
gevent.spawn(talk, conn, addr)
def talk(conn, addr):
try:
while True:
res = conn.recv(1024)
print('client %s:%s msg: %s' % (addr[0], addr[1], res))
conn.send(res.upper())
except Exception as e:
print(e)
finally:
conn.close()
if __name__ == '__main__':
server('127.0.0.1', 8080)
# 服務端
from threading import Thread
from socket import *
import threading
def client(server_ip, port):
c = socket()
c.connect((server_ip, port))
count = 0
while True:
c.send(('%s say hello %s' % (threading.current_thread().getName(), count)).encode('utf-8'))
msg = c.recv(1024)
print(msg.decode('utf-8'))
count += 1
if __name__ == '__main__':
for i in range(500):
t = Thread(target=client, args=('127.0.0.1', 8080))
t.start()
協程的也寫完了,至此,socket通信就結束了