一、守护进程:
借助 daemon=True,必须在被守护的进程开启之前写上 会让被守护的子进程随着主进程的结束而结束
start 开启进程
join 阻塞进
举例守护进程,异步阻塞
import time from multiprocessing import Process def func(): #设置要守护的函数 print('~' * 10) time.sleep(15) #让子进程执行时间是十五秒 ,对比主进程10秒之后,是否会打印 --》 '@'*20 print('@'*20) #打印标识; def compire(): ##设置对比的函数 while True: #守护之后,会在主进程10秒之后也跟着结束 time.sleep(1) print('过去1秒') if __name__=='__main__': p=Process(target=func) p.daemon=True #守护第一个子进程 p.start() #开启第一个子进程 c=Process(target=compire) c.daemon=True #守护第二个子进程 c.start() #开启第二个子进程 for i in range(100): time.sleep(0.1) print('*'*i) # 让主进程十秒后结束
总结:
""" 守护进程,就是能够在主进程结束之后,子进程无论是循环还是有其他没有执行的内容,都不会执行了 如果有两个子进程,只保护其中一个,则两另一个子进程不会守护影响,会继续执行 进程守护要写在,start 之前 被守护进程中不能再开启子进程 """
二、进程中的其他方法:
pid查看进程ip
name查看进程名字
terminate 终结一个进程
is_alive() 查看一个进程是否活着,返回True False

import os import time from multiprocessing import Process def func(p): #由于主进程已经关闭了子进程,所以子进程不会再执行了 print('%s子进程ip'%os.getpid()) #子进程ip if __name__ == '__main__': p = Process(target=func) p.start() print(p.name,p.pid) #打印进程名字和id Process-1 8220 p.name = '进程名改啦' print(p.name) #进程名会更改 p.terminate() #异步,主进程发出关闭子进程命令,交给操作系统执行,至于什么时候执行,主进程并不关心,如果操作系统不是立马执行了,下面打印子进程可能还活着。 print(p.is_alive()) #True time.sleep(2) #睡两秒,操作系统一定是已经执行完了终结命令,下面再判断进程是否活着,就是False print(p.is_alive()) #False
三、给进程加锁 lock
# 在异步中,子进程的执行不受主进程是控制,当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。
比如售票系统,在同一时间,多个人(子进程)买一张票,就会造成混乱,这时候就出现了锁的概念
#lock #由并发变成了串行,牺牲了运行效率,但避免了竞争,保证了数据的安全。 from multiprocessing import Lock #导入 l=Lock() #制作一把锁 ,只有一把钥匙 l.acquire() #需要一把钥匙 l.acquire() #又需要一把钥匙 但是已经被拿走了,所以会阻塞在这 l.release() #把钥匙还回去了,别人就可以来取钥匙了

from multiprocessing import Lock from multiprocessing import process import random import json import time #首先建立一个‘piao’ 的text,手动存入字符串形式的字典{"count":4} 代表有四张火车票 def check_ticket(i): with open ('piao','r',encoding='utf-8') as f : ret=json.load(f)['count'] print('%s进来了,还有%d张票'%(i,ret)) def get_ticket(i): with open ('piao','r',encoding='utf-8') as f : ret = json.load(f)['count'] #读文件夹里,还有几张票 time.sleep(random.random()) if ret>0: #如果还有票,就可以买票 with open ('piao','w',encoding='utf-8') as f: json.dump({'count':ret-1},f) #每买走一张票,就把文件夹里票的数量减去1 print('%s买走了一张票,还剩%s张票'%(i,ret-1)) else: print('没有票了') def task(i,l): #建立任务子进程 check_ticket(i) #检查是谁进来了,还有几张票的 l.acquire() #如果还有票,就让他那一把钥匙 get_ticket(i) #买走一张票,这时候其他人都在等候 l.release() #买完票了还钥匙 # # if __name__=='__main__': l=Lock() #在建立子进程之前就把锁做好 for i in range(10): #循环建立子进程,模拟有10个人一起买票 p= Process(target=task,args=(i,l)) p.start()
四、信号量 multiprocess.Semaphoren 是同时允许一定数量的线程更改数据 .也就是可以指定有几把钥匙
from multiprocessing import Semaphore #导入 ˈseməˌfôr s=Semaphore(2) #设置有2把钥匙 s.acquire() #需要一把钥匙 print(1) #打印1,说明成功拿走一把钥匙 s.acquire() #需要一把钥匙 print(2) #打印2,说明成功拿走一把钥匙 s.release() #只有还了一把钥匙,下面才能打印出3 s.acquire() #需要一把钥匙 print(3) #这里并没有打印出来3 因为只有两把钥匙,当都被拿走了 ,再来拿就会被阻塞 知直到有人还了钥匙
模拟迷你唱吧, 只能容纳四个人,出一个人,才能进一个人

from multiprocessing import Semaphore from multiprocessing import Process import random import time def sing_bar(i,s): s.acquire() print('%s进来了'%i) time.sleep(random.random()) #随机某个时间,让一个人走 print('%s走了'%i) s.release() if __name__=='__main__': s=Semaphore(4) #模拟唱吧只能容纳4个人 for i in range(10): #模拟十个人一起去唱吧 p=Process(target=sing_bar,args=(i,s)) p.start()
五、进程之间的通信:基于IPC(Inter-Process Communication)协议
(1)事件
#事件:是主进程控制 多个 子进程的执行 from multiprocessing import Event #导入事件 s=Event() #建立一个信号灯,默认是红灯 s.wait() #红灯会阻塞 s.set() #红灯变绿灯 s.clear() #清除绿灯 -->即绿灯变红灯 s.is_set() # 是否阻塞 True就是绿灯 False就是红灯 #机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时#就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。 #clear:将“Flag”设置为False #set:将“Flag”设置为True
模拟卡车过红绿灯,卡车是随机来,红绿灯每3秒替换一次

from multiprocessing import Event import random import time from multiprocessing import Process def light(e): while True: #让红 绿 灯循环替换 if e.is_set(): # 是否阻塞 True就是绿灯 False就是红灯 time.sleep(2) e.clear() # 阻塞 绿变红 print('红灯亮了') else: time.sleep(2) e.set() #阻塞变非阻塞 红变绿 print('绿灯亮了') def car(i,e): e.wait() #绿灯会通过,红灯就等待 print('%s车过了' % i) if __name__=='__main__': e=Event() p=Process(target=light,args=(e,)) #启动 新线程控制红绿灯 p.start() for i in range(20): #20辆卡车 if i%6==0: #如果整除6 那么就启动一个卡车子进程 time.sleep(random.randint(1,3)) #启动前 随机等1-3秒 car_p = Process(target=car, args=(i,e)) car_p.start()
(2)对列
创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递
from multiprocessing import Queue #导入对列,提供get 和 put 方法 put 几个,get几个,get完了就没有了,再get 就会阻塞,且按照put的顺序依次get q=Queue() q.put(1) #往对列里面放 q.put(2) print(q.get()) # 从对列里往外拿 print(q.get()) q.put(3) print(q.get()) q=Queue(4) #可以设置参数,意味着对列里只能放四个值 #q.qsize() #得出对列长度 #通过队列实现了 主进程与子进程的通信 子进程与子进程之间的通信

from multiprocessing import Queue from multiprocessing import Process import time def producer(q): for i in range(20): #准备生产20个包子 q.put('%s个包子'%i) print(q) def consumer(q): for i in range(10): #一个消费者一次get10个包子,吃的速度慢与生产 time.sleep(1) print(q.get()) if __name__=='__main__': q=Queue(10) #设置对托盘(对列)只允许放10个包子 p=Process(target=producer,args=(q,)) p.start() time.sleep(1) c1 = Process(target=consumer, args=(q,)) #一个消费者get十个包子 c1.start() # c2 = Process(target=consumer, args=(q,)) #增加一个消费者,就可以把20个包子吃完,生产和消费就平衡了 # c2.start()
生产者消费者模型进阶:由于正常情况下,消费者是不知道生产者生产多少,这时候 怎么实现双向通信呢?这里有几种方法:
(1)基于队列实现生产者消费者模型,传递一个成产结束的信号

from multiprocessing import Queue from multiprocessing import Process import time import random def producer(q,food): for i in range(20): #两个生产进程准备生产40个包子 q.put('%s个%s'%(i,food)) time.sleep(random.random()) q.put('生产完了') #生产结束的信号(由于有三个消费者,两个生产者,那么只会产生两个结束的信号,也就是还有一个消费者拿不到结束信号,程序还是没法结束,所以下面还要再设置一个结束信号) q.put('生产完了') # 生产结束的信号 def consumer(q,name): while True:#两个消费者 循环消费,直到拿到生产结束的信号 food=q.get() if food =='生产完了':break #判断拿出来的数据是不是’生产完了‘的信号 ,如果是则说明生产结束,直接退出 else: print(name, food) if __name__=='__main__': q=Queue() p1=Process(target=producer,args=(q,'包子')) #第一个生产者生产20个包子 p2 = Process(target=producer, args=(q,'馒头'))#第二个生产者生产20个馒头 p1.start() p2.start() c1 = Process(target=consumer, args=(q,'zxe')) #第一个消费者 c2= Process(target=consumer, args=(q,'zzxxcc')) # 第二个消费者 c3 = Process(target=consumer, args=(q, 'zzxxcc')) # 第三个消费者 c1.start() c2.start() c3.start()
""" 对列是安全的 生产者消费者 模型: #由于消费者不知道生产者生产多少数据,所以要用while循环 #循环无法结束,就要生产者给一个结束信号 #信号数量=消费者的数量-生产者的数量+1,这种方法就略微麻烦了 """
(2)JoinableQueue 创建可连接的共享进程队列,队列允许项目的使用者通知生产者项目已经被成功处理
# q=JoinableQueue() # q.get q.put方法 # q.join 生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。 # q.task_done 使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。

from multiprocessing import JoinableQueue from multiprocessing import Process import time import random def producer(q,): for i in range(5): q.put('生产的第%s个包子'%(i)) time.sleep(random.random()) q.join() #直到收到5个task_done信号,才会结束 def consumer(q,name): while True: #由于不知道生产者生产多少,所以用循环来处理不确定的数据 food=q.get() #处理数据(消费包子) print('%s吃了%s'%(name,food)) q.task_done() #每次消费一个包子都会向生产者发送一个task_done if __name__=='__main__': q=JoinableQueue() #创建可连接的共享进程队列对象 p=Process(target=producer,args=(q,)) p.start() c1=Process(target=consumer,args=(q,'zxc')) #要把q对列对象传进去 c1.daemon=True #守护进程,因为consumer子进程是一个无线循环的函数,为了让它在处理完数据之后正常结束 c2=Process(target=consumer,args=(q,'zzxc')) c2.daemon = True #守护进程要写在进程start前面 c1.start() c2.start() p.join() #等待p进程执行完,只要p结束了就代表consumer处理完数据了(如果不加阻塞,主进程会很快执行完了,那么守护进程也会随之结束,而p进程的join 会一直等待task_done 整个程序将无法结束)
总结:生产者生产的数据全部被消费 ——> 生产者进程结束 ——> 主进程代码执行结束 —— >消费者守护进程结束