python並發-多進程
多進程能實現真正意義上的並發(並行),能利用多核優勢,適用計算密集型的程序
1 Process類
- 開啟子進程—函數
import time
from multiprocessing import Process
def p_func(name):
time.sleep(3)
print(name, ": ok")
if __name__ == '__main__':
p = Process(target=p_func, args=("lynn",))
p.start()
print("主進程")
注意:
Process中的關鍵字參數,target的值是方法名字,args是元組
start()方法,開啟子進程
子進程是異步的
- 開啟子進程—類
import time
from multiprocessing import Process
class PClass(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
time.sleep(2)
print("name: ", self.name)
if __name__ == '__main__':
p = PClass("lynn")
p.start()
print("主")
注意:
繼承Process,重寫__init__
方法,必須有super().__init__()
必須有run
方法,start()方法會自動調用run
方法
子進程是異步的
進程之間的內存是完全隔離的
- Process對象的join方法
import time
from multiprocessing import Process
class PClass(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
time.sleep(1)
print("name:", self.name)
if __name__ == '__main__':
p = PClass("lynn")
p.start()
print("主開")
p.join()
print("主")
注意:
join()方法會阻塞,上邊開啟的所有的子進程全部運行完成,才繼續往下運行,並不是所有的進程是串行,只是在這會阻塞
-
Process對象的其他方法或屬性
is_alive
p.is_alive() # 查看進程是否存活,True/False
p.pid # 進程的pid
2 特殊的進程
-
守護進程
守護進程會在主進程運行結束時,立馬停止運行
守護進程內無法開啟子進程,會報錯
from multiprocessing import Process
import time
class PClass(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
time.sleep(1)
print("name:", self.name)
if __name__ == '__main__':
p = PClass("lynn")
p1 = PClass("fancy")
p.daemon = True # 設置p為守護進程
p.start()
p1.start()
print("主") # 打印時,p進程停止運行
注意:
p.daemon = True
必須在p.start()
之前
- 其他
僵屍進程:
子進程在運行完后,不會把所有的資源回收,會告訴父進程自己運行完畢,等待父進程回收,那么這個過程中沒有完全死的進程,就是僵屍進程
孤兒進程:
父進程死了,但是還有子進程活着,這些子進程稱為孤兒進程
3 進程的高級應用
- 進程同步—鎖
進程之間數據雖然隔離,但是可能會操作同一個數據庫,同一個文件等外部資源,可能會造成數據的錯亂,所以多進程在處理外部資源時要加鎖處理
數據錯亂
from multiprocessing import Process
import time
class PClass(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
with open("name.txt", "wt", encoding="utf-8")as f:
f.write(self.name)
time.sleep(0.1)
with open("name.txt", "rt", encoding="utf-8")as f:
print("name:", self.name ,f.read())
if __name__ == '__main__':
lock = Lock()
lynn_p = PClass("lynn")
fancy_p = PClass("fancy")
lynn_p.start()
fancy_p.start()
print("完")
加鎖
from multiprocessing import Process
from multiprocessing import Lock
import time
class PClass(Process):
def __init__(self, name, lock):
super().__init__()
self.name = name
self.lock = lock
def run(self):
self.lock.acquire() # 加鎖 相當於with self.lock:
# with self.lock:
with open("name.txt", "wt", encoding="utf-8")as f:
f.write(self.name)
time.sleep(0.1)
with open("name.txt", "rt", encoding="utf-8")as f:
print("name:", self.name, f.read())
self.lock.release() # 開鎖
if __name__ == '__main__':
lock = Lock()
lynn_p = PClass("lynn", lock)
fancy_p = PClass("fancy", lock)
lynn_p.start()
fancy_p.start()
print("完")
注意:
把數據處理部分加鎖,把並行變成串行
降低了效率,但是保證了數據安全
加鎖一定記得開鎖,或者直接用with lock:
- 隊列—Queue
進程之間數據是隔離的,想實現進程間的數據通信,multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
生產者—消費者模型
生產者:只負責生產,生產者把生產的東西放到隊列中
消費者:只負責消費,隊列中有一直消費,沒有就等着
import time
from multiprocessing import Process
from multiprocessing import Queue
from multiprocessing import Lock
class PClass(Process):
def __init__(self, name, lock, q):
super().__init__()
self.name = name
self.lock = lock
self.q = q
def run(self):
while True:
res = self.q.get()
print(res)
print("name:", self.name)
try:
if res == "ok": # 接到完成信號,停止接受
break
except Exception:
print(1)
class ProClass(Process):
def __init__(self, name, q):
super().__init__()
self.name = name
self.q = q
def run(self):
for i in range(10):
time.sleep(1)
self.q.put(i)
self.q.put("ok") # 生產者完成后,會發送完成信號
if __name__ == '__main__':
lock = Lock()
q = Queue(10)
pc = PClass("lynn", lock, q)
pro = ProClass("fancy", q)
pc.start()
pro.start()
# 生產者把生產的數字加到隊列中,消費者不停的從隊列中去數字
注意:
Queue(max)
max數字,隊列限制的最大數據量
q.put(data)
把數據data加到隊列中
q.get()
取出隊列中的數據
q.full()
隊列中數據個數等於max時為True
q.empty()
隊列中為空時,等於True
-
管道
-
進程池
進程利用的是計算機的多核優勢,但是當進程過多時,效率反而受到影響,所以要對進程數量做限制。
進程池:進程池可以指定數量的進程供用戶調用,當有新的請求(進程)發起時,如果進程池滿了,就等進程池中有進程結束,進入進程池。
同步:
import time
from multiprocessing import Pool
def p_func(name):
time.sleep(1)
print(name)
if __name__ == '__main__':
p = Pool(3)
for i in range(5):
p.apply(p_func, args=("lynnadsadasda",))
p.close()
print('w')
注意:
apply()
同步開啟子進程,只有一個進程完成后,才會起下一個進程,返回值會立馬返回。
異步
import time
from multiprocessing import Pool
def p_func(name):
time.sleep(1)
print(name)
return 100
if __name__ == '__main__':
p = Pool(3)
res_list = []
for i in range(5):
res = p.apply_async(p_func, args=("lynnadsadasda",))
res_list.append(res)
p.close()
p.join()
for i in res_list:
print(i.get())
print('w')
注意:
apply_async
異步開啟子進程
注意join()
異步調用時必須加該方法,主進程運行完畢后,進程池中開啟的子進程全部結束,所以要加該方法
close()
方法必須在join()
方法之前,意思是先關閉進程池,在等待
get()
獲取返回值,只有異步時才有該方法,同步沒有