python并发-多进程
多进程能实现真正意义上的并发(并行),能利用多核优势,适用计算密集型的程序
1 Process类
- 开启子进程—函数
import time
from multiprocessing import Process
def p_func(name):
time.sleep(3)
print(name, ": ok")
if __name__ == '__main__':
p = Process(target=p_func, args=("lynn",))
p.start()
print("主进程")
注意:
Process中的关键字参数,target的值是方法名字,args是元组
start()方法,开启子进程
子进程是异步的
- 开启子进程—类
import time
from multiprocessing import Process
class PClass(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
time.sleep(2)
print("name: ", self.name)
if __name__ == '__main__':
p = PClass("lynn")
p.start()
print("主")
注意:
继承Process,重写__init__
方法,必须有super().__init__()
必须有run
方法,start()方法会自动调用run
方法
子进程是异步的
进程之间的内存是完全隔离的
- Process对象的join方法
import time
from multiprocessing import Process
class PClass(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
time.sleep(1)
print("name:", self.name)
if __name__ == '__main__':
p = PClass("lynn")
p.start()
print("主开")
p.join()
print("主")
注意:
join()方法会阻塞,上边开启的所有的子进程全部运行完成,才继续往下运行,并不是所有的进程是串行,只是在这会阻塞
-
Process对象的其他方法或属性
is_alive
p.is_alive() # 查看进程是否存活,True/False
p.pid # 进程的pid
2 特殊的进程
-
守护进程
守护进程会在主进程运行结束时,立马停止运行
守护进程内无法开启子进程,会报错
from multiprocessing import Process
import time
class PClass(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
time.sleep(1)
print("name:", self.name)
if __name__ == '__main__':
p = PClass("lynn")
p1 = PClass("fancy")
p.daemon = True # 设置p为守护进程
p.start()
p1.start()
print("主") # 打印时,p进程停止运行
注意:
p.daemon = True
必须在p.start()
之前
- 其他
僵尸进程:
子进程在运行完后,不会把所有的资源回收,会告诉父进程自己运行完毕,等待父进程回收,那么这个过程中没有完全死的进程,就是僵尸进程
孤儿进程:
父进程死了,但是还有子进程活着,这些子进程称为孤儿进程
3 进程的高级应用
- 进程同步—锁
进程之间数据虽然隔离,但是可能会操作同一个数据库,同一个文件等外部资源,可能会造成数据的错乱,所以多进程在处理外部资源时要加锁处理
数据错乱
from multiprocessing import Process
import time
class PClass(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
with open("name.txt", "wt", encoding="utf-8")as f:
f.write(self.name)
time.sleep(0.1)
with open("name.txt", "rt", encoding="utf-8")as f:
print("name:", self.name ,f.read())
if __name__ == '__main__':
lock = Lock()
lynn_p = PClass("lynn")
fancy_p = PClass("fancy")
lynn_p.start()
fancy_p.start()
print("完")
加锁
from multiprocessing import Process
from multiprocessing import Lock
import time
class PClass(Process):
def __init__(self, name, lock):
super().__init__()
self.name = name
self.lock = lock
def run(self):
self.lock.acquire() # 加锁 相当于with self.lock:
# with self.lock:
with open("name.txt", "wt", encoding="utf-8")as f:
f.write(self.name)
time.sleep(0.1)
with open("name.txt", "rt", encoding="utf-8")as f:
print("name:", self.name, f.read())
self.lock.release() # 开锁
if __name__ == '__main__':
lock = Lock()
lynn_p = PClass("lynn", lock)
fancy_p = PClass("fancy", lock)
lynn_p.start()
fancy_p.start()
print("完")
注意:
把数据处理部分加锁,把并行变成串行
降低了效率,但是保证了数据安全
加锁一定记得开锁,或者直接用with lock:
- 队列—Queue
进程之间数据是隔离的,想实现进程间的数据通信,multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
生产者—消费者模型
生产者:只负责生产,生产者把生产的东西放到队列中
消费者:只负责消费,队列中有一直消费,没有就等着
import time
from multiprocessing import Process
from multiprocessing import Queue
from multiprocessing import Lock
class PClass(Process):
def __init__(self, name, lock, q):
super().__init__()
self.name = name
self.lock = lock
self.q = q
def run(self):
while True:
res = self.q.get()
print(res)
print("name:", self.name)
try:
if res == "ok": # 接到完成信号,停止接受
break
except Exception:
print(1)
class ProClass(Process):
def __init__(self, name, q):
super().__init__()
self.name = name
self.q = q
def run(self):
for i in range(10):
time.sleep(1)
self.q.put(i)
self.q.put("ok") # 生产者完成后,会发送完成信号
if __name__ == '__main__':
lock = Lock()
q = Queue(10)
pc = PClass("lynn", lock, q)
pro = ProClass("fancy", q)
pc.start()
pro.start()
# 生产者把生产的数字加到队列中,消费者不停的从队列中去数字
注意:
Queue(max)
max数字,队列限制的最大数据量
q.put(data)
把数据data加到队列中
q.get()
取出队列中的数据
q.full()
队列中数据个数等于max时为True
q.empty()
队列中为空时,等于True
-
管道
-
进程池
进程利用的是计算机的多核优势,但是当进程过多时,效率反而受到影响,所以要对进程数量做限制。
进程池:进程池可以指定数量的进程供用户调用,当有新的请求(进程)发起时,如果进程池满了,就等进程池中有进程结束,进入进程池。
同步:
import time
from multiprocessing import Pool
def p_func(name):
time.sleep(1)
print(name)
if __name__ == '__main__':
p = Pool(3)
for i in range(5):
p.apply(p_func, args=("lynnadsadasda",))
p.close()
print('w')
注意:
apply()
同步开启子进程,只有一个进程完成后,才会起下一个进程,返回值会立马返回。
异步
import time
from multiprocessing import Pool
def p_func(name):
time.sleep(1)
print(name)
return 100
if __name__ == '__main__':
p = Pool(3)
res_list = []
for i in range(5):
res = p.apply_async(p_func, args=("lynnadsadasda",))
res_list.append(res)
p.close()
p.join()
for i in res_list:
print(i.get())
print('w')
注意:
apply_async
异步开启子进程
注意join()
异步调用时必须加该方法,主进程运行完毕后,进程池中开启的子进程全部结束,所以要加该方法
close()
方法必须在join()
方法之前,意思是先关闭进程池,在等待
get()
获取返回值,只有异步时才有该方法,同步没有