進程池線程池的使用*****
無論是開線程還是開進程都會消耗資源,即使開線程消耗的資遠比開進程的少
而物理設備的性能是有限的,雖然可以加設備來提升上限,但如果像淘寶雙十一那樣,只有很少的時刻需要大量的資源,為了滿足這個去買一大堆服務器顯然是不划算的
(計算機中)池的目的:在保證計算機硬件安全的情況下最大限度的利用計算機硬件,池其實是降低了程序的運行效率,但是保證了計算機硬件的安全(硬件的發展跟不上軟件的速度)
進程池線程池的目的:為了限制開設的進程數和線程數,從而保證計算機硬件的安全
進程池/線程池的創建和提交回調
import random
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def task(i):
time.sleep(random.random())
print(f"{i} is over...")
return f"{i}² = {i * i}"
if __name__ == '__main__': # 進程池的時候一定要放在這里面
'''不放報錯 concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.'''
# -------------------------------------------------
# 1.實例化進程池/線程池對象,並限制進程池/線程池中進程/線程數量
# -------------------------------------------------
# pool = ThreadPoolExecutor(3, 'MyThread-') # 不指定參數的情況下,默認是當前 CPU個數*5 , 也可以指定線程個數
pool = ProcessPoolExecutor(3) # 不指定參數的情況下,默認是當前 CPU個數 , 也可以指定進程個數(創進程不能傳第二個參數)
# for i in range(5):
# # -------------------------------------------------
# # 2.線程池對象.submit() 異步提交任務
# # 提交任務的兩種方式
# # 同步:提交完任務之后,在原地等待任務的返回結果,再繼續執行下一步代碼
# # 異步:提交任務之后,不等待任務的返回結果(這個結果怎么拿?),直接進行下一步操作
# # -------------------------------------------------
# pool.submit(task, i)
# print("主")
#
# # 0 is running...
# # 1 is running...
# # 2 is running...
# # 主
# # 1 is over...
# # 3 is running...
# # 0 is over...
# # 4 is running...
# # 4 is over...
# # 3 is over...
# # 2 is over...
# for i in range(5):
# future = pool.submit(task, i)
# # print(future) # <Future at 0x21a130dbb00 state=running> <Future at 0x21a1321ec50 state=pending>
# # -------------------------------------------------
# # future = pool.submit(task, i)
# # future.result() 接收返回值並獲取回調值
# # -------------------------------------------------
# print(future.result())
# print("主")
# # 0 is running...
# # 0 is over...
# # 0² = 0
# # 1 is running...
# # 1 is over...
# # 1² = 1
# # 2 is running...
# # 2 is over...
# # 2² = 4
# # 3 is running...
# # 3 is over...
# # 3² = 9
# # 4 is running...
# # 4 is over...
# # 4² = 16
# # 主
# future_list = []
# for i in range(5):
# future = pool.submit(task, i)
# future_list.append(future)
#
# for future in future_list:
# print(f">>:{future.result()}") # 依次等每個 future的結果,所以是絕對有序的
# print("主")
# # 0 is running...
# # 1 is running...
# # 2 is running...
# # 0 is over...
# # 3 is running...
# # >>:0² = 0
# # 1 is over...
# # 4 is running...
# # >>:1² = 1
# # 4 is over...
# # 2 is over...
# # >>:2² = 4
# # 3 is over...
# # >>:3² = 9
# # >>:4² = 16
# # 主
future_list = []
for i in range(5):
future = pool.submit(task, i)
future_list.append(future)
pool.shutdown() # 關閉池子且等待池子中所有的任務運行完畢
for future in future_list:
print(f">>:{future.result()}") # 依次等每個 future的結果,所以是絕對有序的
print("主")
# 0 is running...
# 1 is running...
# 2 is running...
# 2 is over...
# 3 is running...
# 0 is over...
# 4 is running...
# 4 is over...
# 1 is over...
# 3 is over...
# >>:0² = 0
# >>:1² = 1
# >>:2² = 4
# >>:3² = 9
# >>:4² = 16
# 主
驗證復用池子里的線程或進程
池子中創建的進程或線程創建一次就不會再創建了,至始至終用的都是最初的那幾個,這樣的話就可以節省反復開辟進程或線程的資源了
不是動態創建動態銷毀的(如果是好幾百個,可想而知)
import random
import time
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import current_thread
def task(i):
time.sleep(random.random())
# print(f"{os.getpid()} {i} is over...")
print(f"{os.getpid()} {current_thread().name} {i} is over...")
return f"{i}² = {i * i}"
if __name__ == '__main__': # 進程池的時候一定要放在這里面
# pool = ProcessPoolExecutor(3)
pool = ThreadPoolExecutor(3, 'MyThreading')
future_list = []
for i in range(5):
future = pool.submit(task, i)
future_list.append(future)
pool.shutdown() # 關閉池子且等待池子中所有的任務運行完畢
for future in future_list:
print(f">>:{future.result()}") # 依次等每個 future的結果,所以是絕對有序的
print("主")
# 11000 0 is over... # 復用了進程號(即沒有去開辟新的內存空間)
# 8024 2 is over...
# 10100 1 is over...
# 11000 3 is over...
# 8024 4 is over...
# >>:0² = 0
# >>:1² = 1
# >>:2² = 4
# >>:3² = 9
# >>:4² = 16
# 主
# 使用線程池的打印結果
# 13024 MyThreading_1 1 is over... # 1.復用了線程
# 13024 MyThreading_1 3 is over... # 2.復用了線程
# 13024 MyThreading_2 2 is over...
# 13024 MyThreading_0 0 is over...
# 13024 MyThreading_1 4 is over...
# >>:0² = 0
# >>:1² = 1
# >>:2² = 4
# >>:3² = 9
# >>:4² = 16
# 主
異步回調機制
這(
.add_done_callback()
)其實是.submit() 返回結果對象的方法
異步回調機制:當異步提交的任務有返回結果之后,會自動觸發回調函數的執行
import random
import time
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import current_thread
def callback(future):
print(f"我拿到了回調結果:{future.result()}")
def task(i):
time.sleep(random.random())
# print(f"{os.getpid()} {i} is over...")
print(f"{os.getpid()} {current_thread().name} {i} is over...")
return f"{i}² = {i * i}"
if __name__ == '__main__': # 進程池的時候一定要放在這里面
# pool = ProcessPoolExecutor(3)
pool = ThreadPoolExecutor(3, 'MyThreading')
future_list = []
for i in range(5):
# -----------------------------------------------------
# .submit().add_done_callback() 自動調用回調函數
# 會自動將 .submit()的返回結果作為參數傳給.add_done_callback() 中傳入的函數去調用執行
# .add_done_callback() 其實是 .submit()返回對象自身的方法
# -----------------------------------------------------
future = pool.submit(task, i).add_done_callback(callback)
future_list.append(future)
pool.shutdown() # 關閉池子且等待池子中所有的任務運行完畢
print("主")
# 11348 MyThreading_0 0 is over...
# 我拿到了回調結果:0² = 0
# 11348 MyThreading_2 2 is over...
# 我拿到了回調結果:2² = 4
# 11348 MyThreading_0 3 is over...
# 我拿到了回調結果:3² = 9
# 11348 MyThreading_1 1 is over...
# 我拿到了回調結果:1² = 1
# 11348 MyThreading_2 4 is over...
# 我拿到了回調結果:4² = 16
# 主
通過閉包給回調函數添加額外參數(擴展)
# 省略導模塊等
# 線程池/進程池對象.submit() 會返回一個 future對象,該對象有.add_done_callback()方法(是一個對象綁定函數),參數是一個函數名(除了對象自身默認傳入,無法為該函數傳參)
# 這里利用閉包函數返回內部函數名的特點 直接調用這個閉包函數,達到傳參的效果,可為回調函數添加更多的擴展性
def outter(*args, **kwargs):
def callback(res):
# 可以拿到 *args, **kwargs 參數做一些事情
print(res.result())
return callback
pool_list = []
for i in range(15):
pool_list.append(pool.submit(task, i).add_done_callback(outter(1, 2, 3, a=1, c=3))) # 朝線程池中提交任務(異步)
協程***
后期項目支持高並發可能才會用到
概念回顧(協程這里再理一下)
進程:資源單位(車間)
線程:操作系統的最小執行單位(流水線)
協程:單線程下實現並發的效果(完全是技術人員編造出來的名詞)
並發:看起來像同時執行(多道技術核心:切換+保存狀態)
協程:通過代碼層面自己監測程序中的I/O行為,自己實現切換,讓操作系統誤認為這個線程沒有I/O,從而保證程序在運行態和就緒態來回切換(不進入阻塞態),更大限度地利用CPU,最大程度上提高線程的執行效率
切換+保存狀態就一定能夠提升效率嗎?
切換+保存狀態 不一定能提升程序的效率
- 當任務是計算密集型,反而會降低效率
- 如果是IO密集型,會提升效率
如何實現協程
生成器的yield 可以實現保存狀態(行不通)
但,效率更低了
# # 串行執行
# import time
#
#
# def func1():
# for i in range(10000000):
# i + 1
#
#
# def func2():
# for i in range(10000000):
# i + 1
#
#
# start = time.time()
# func1()
# func2()
# stop = time.time()
# print(stop - start)
# # 1.2481744289398193
# 基於yield並發執行
import time
def func1():
while True:
10000000 + 1
yield
def func2():
g = func1()
for i in range(10000000):
i + 1
next(g)
start = time.time()
func2()
stop = time.time()
print(stop - start)
# 1.9084477424621582
gevent模塊實現
模塊安裝下載
搜索並下載(這里是因為我配了兩個鏡像源,所以出來了兩個選項,隨便選一個)
gevent基本介紹
from gevent import spawn, monkey
monkey.patch_all() # 一般這個要寫在很前面(例如導socket模塊之前)
# 兩行亦可寫成一行 from gevent import monkey;monkey.patch_all()
g1 = spawn(eat, 1, 2, 3, x=4, y=5)
# 創建一個協程對象g1,spawn括號內第一個參數是函數名,如eat,后面是該函數(eat)所需要的參數
g2 = spawn(func2)
g1.join() # 等待協程g1結束
g2.join() # 等待協程g2結束
# 上述兩步亦可合作一步:joinall([g1,g2])
g1.value # 拿到eat函數執行的返回值
通過gevent實現遇到 IO自動切換狀態(單線程下並發)
import time
from gevent import spawn
# gevent 本身識別不了time.sleep() 等不屬於該模塊內的I/O操作
# 使用下面的操作來支持
from gevent import monkey
monkey.patch_all() # 監測代碼中所有 I/O 行為
def heng(name):
print(f"{name} 哼")
time.sleep(2)
print(f"{name} 哼 ...")
def ha(name):
print(f"{name} 哈")
time.sleep(3)
print(f"{name} 哈 ...")
# start_time = time.time()
# heng('egon')
# ha('jason')
# print(f"主 {time.time() - start_time}")
# # 主 5.005069732666016
start_time = time.time()
s1 = spawn(heng, 'egon')
s2 = spawn(ha, 'jason')
s1.join()
s2.join()
print(f"主 {time.time() - start_time}")
# 主 3.0046989917755127
在計算密集型任務中使用
from gevent import spawn, monkey
monkey.patch_all()
import time
def func1():
for i in range(10000000):
i + 1
def func2():
for i in range(10000000):
i + 1
start = time.time()
g = spawn(func1)
g2 = spawn(func2)
g.join()
g2.join()
stop = time.time()
print(stop - start)
# 1.1324069499969482
# 與前面普通的串行執行時間 1.2481744289398193 相近
利用gevent在單線程下實現並發(協程)
服務端
import socket
from gevent import spawn
from gevent import monkey # 讓 gevent 能夠識別python的 IO
monkey.patch_all()
server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)
def talk(conn):
while True:
try:
data = conn.recv(1024)
if len(data) == 0: break
print(data.decode('utf-8'))
conn.send(data.upper())
except ConnectionResetError as e:
print(e)
break
conn.close()
def wait_client_connect():
while True:
conn, addr = server.accept()
spawn(talk, conn)
if __name__ == '__main__':
g1 = spawn(wait_client_connect)
g1.join() # 別忘了加上
客戶端
import socket
from threading import Thread, current_thread
def create_client():
client = socket.socket()
client.connect(('127.0.0.1', 8080))
n = 0
while True:
data = '%s %s' % (current_thread().name, n)
client.send(data.encode('utf-8'))
res = client.recv(1024)
print(res.decode('utf-8'))
n += 1
for i in range(400): # 手動開400個線程連接客戶端(測試的是服務端單線程實現並發)
t = Thread(target=create_client)
t.start()
最大程度下提高代碼的執行效率(實現高並發)
- 多進程下使用多線程
- 多線程下使用多協程
大前提
IO密集型任務
I/O 模型(只放了幾張圖)
此部分內容摘抄自博客: Python從入門到精通之IO模型
程序間數據交互,本質上數據都是從內存中取的(包括socket的recv等)
阻塞I/O模型
當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:准備數據。對於network io來說,很多時候數據在一開始還沒有到達(比如,還沒有收到一個完整的UDP包),這個時候kernel就要等待足夠的數據到來。
而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數據准備好了,它就會將數據從kernel中拷貝到用戶內存,然后kernel返回結果,用戶進程才解除block的狀態,重新運行起來。
非阻塞I/O模型
從圖中可以看出,當用戶進程發出read操作時,如果kernel中的數據還沒有准備好,那么它並不會block用戶進程,而是立刻返回一個error。從用戶進程角度講 ,它發起一個read操作后,並不需要等待,而是馬上就得到了一個結果。用戶進程判斷結果是一個error時,它就知道數據還沒有准備好,於是用戶就可以在本次到下次再發起read詢問的時間間隔內做其他事情,或者直接再次發送read操作。一旦kernel中的數據准備好了,並且又再次收到了用戶進程的system call,那么它馬上就將數據拷貝到了用戶內存(這一階段仍然是阻塞的),然后返回。
也就是說非阻塞的recvform系統調用調用之后,進程並沒有被阻塞,內核馬上返回給進程,如果數據還沒准備好,此時會返回一個error。進程在返回之后,可以干點別的事情,然后再發起recvform系統調用。重復上面的過程,循環往復的進行recvform系統調用。這個過程通常被稱之為輪詢。輪詢檢查內核數據,直到數據准備好,再拷貝數據到進程,進行數據處理。需要注意,拷貝數據整個過程,進程仍然是屬於阻塞的狀態。
多路復用I/O模型
當用戶進程調用了select,那么整個進程會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的數據准備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。
這個圖和blocking IO的圖其實並沒有太大的不同,事實上還更差一些。因為這里需要使用兩個系統調用(select和recvfrom),而blocking IO只調用了一個系統調用(recvfrom)。但是,用select的優勢在於它可以同時處理多個connection。
信號驅動I/O模型
涉及太少,暫不做了解
異步I/O模型
用戶進程發起read操作之后,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當它受到一個asynchronous read之后,首先它會立刻返回,所以不會對用戶進程產生任何block。然后,kernel會等待數據准備完成,然后將數據拷貝到用戶內存,當這一切都完成之后,kernel會給用戶進程發送一個signal,告訴它read操作完成了。