https://www.cnblogs.com/Eva-J/articles/8253549.html 参考链接
multiprocess模块
进程的生命周期:
1.主进程
2.子进程
开启子进程的主进程:
主进程自己的代码如果长,等待自己的代码执行结束。
子进程的执行时间长,主进程会在主进程代码执行完毕后等待子进程执行完毕后 主进程才结束。
开启一个进程
from multiprocessing import Process import time def func(): print('我是一个子进程') if __name__ == '__main__': # p是一个进程对象 还没有启动进程 p = Process(target=func) # 主进程 # 启动一个子进程. 操作系统创建新进程执行新进程中的代码 p.start() # 主进程 # 一般都是异步开启子进程,主进程先执行 print('riven') print('mark') print('mimi')
传参和查看进程号
Os.getpid :查看当前进程的进程号。
Os.getppid :查看当前进程的父进程号。
from multiprocessing import Process import time import os # 传值给子进程 def func(args, kwargs): print(args) print(kwargs) # 查看当前进程进程号. print(os.getpid()) # 查看当前进程父进程号. print(os.getppid()) if __name__ == '__main__': # args = 传入的参数 p = Process(target=func, args=("我的乖乖", "我太难了")) # 主进程 # 启动一个子进程. 操作系统创建新进程执行新进程中的代码 p.start() # 主进程 # 一般都是异步开启子进程,主进程先执行 print('riven') print('mark') print('mimi') # 查看当前进程父进程号. print(os.getppid()) # 查看当前进程进程号. print(os.getpid())
Join
加了join 将先执行子进程 再执行主进程。
from multiprocessing import Process import time import os # 传值给子进程 def func(args, kwargs): print(args) print(kwargs) if __name__ == '__main__': # args = 传入的参数 p = Process(target=func, args=("我的乖乖", "我太难了")) # 主进程 # 启动一个子进程. 操作系统创建新进程执行新进程中的代码. # 感知一个子程序的结束,将异步程序改为同步. p.start() # 子进程 p.join() print('先执行子进程 再执行主进程')
执行多个子进程(两种方法)
1.基于函数
from multiprocessing import Process import time import os # 传值给子进程 def func(args): print('#' * args) if __name__ == '__main__': # 启动多个子进程 re = [] for i in range(20): p = Process(target=func, args=(i,)) # 主进程 tt = re.append(p) # 1.启动一个子进程. 操作系统创建新进程执行新进程中的代码. # 2.感知一个子程序的结束,将异步程序改为同步. p.start() # 主进程 # join = 先执行子进程 再执行主进程 p.join() print('先执行子进程 再执行主进程')
2.基于类
from multiprocessing import Process import time import os # 子进程 class Myprocess(Process): # 添加init属性 def __init__(self, arg1, arg2): super().__init__() self.arg1 = arg1 self.arg2 = arg2 # 必须执行一个run方法 def run(self): # 查看当前进程的进程号 print(self.pid) # 查看当前进程的名称 print(self.name) print(self.arg1) print(self.arg2) if __name__ == '__main__': # 启动多个子进程 for i in range(20): p = Myprocess('这是一个好的开始', '代码改变世界') # 1.启动一个子进程. 操作系统创建新进程执行新进程中的代码. # 2.感知一个子程序的结束,将异步程序改为同步. p.start() p = Myprocess('good idea', '我想你了') p.start() # 主进程 # join = 先执行子进程 再执行主进程 p.join() print('先执行子进程 再执行主进程')
进程与进程之间的变量问题
from multiprocessing import Process import time import os # 子进程 class Myprocess(Process): # 添加init属性 def __init__(self, arg1, arg2): super().__init__() self.arg1 = arg1 self.arg2 = arg2 # 必须执行一个run方法 def run(self): global n n = 0 if __name__ == '__main__': # 启动多个子进程 for i in range(20): p = Myprocess('这是一个好的开始', '代码改变世界') # 1.启动一个子进程. 操作系统创建新进程执行新进程中的代码. # 2.感知一个子程序的结束,将异步程序改为同步. p.start() # 主进程 # join = 先执行子进程 再执行主进程 p.join() print(n) # PS:在每个进程中定义的变量,只能在本进程中使用
进程之间实现聊天
服务端
# 进程之间实现聊天 import socket from multiprocessing import Process # 子进程 def server(conn): # 接受数据 ret = conn.recv(1024).decode('utf-8') print(ret) conn.send(b'Hello') if __name__ == '__main__': # 创建一个socket sk = socket.socket() # 创建一个域名和端口 sk.bind(('127.0.0.1', 8070)) # 监听客户端的连接 sk.listen() # 接受客户端的数据 conn, addr = sk.accept() # while 1: # 这个循环没有一直在启动进程,因为socket会亢住等待客户端连接 p = Process(target=server, args=(conn,)) p.start()
客户端
import socket from multiprocessing import Process def client(sk, msg): sk.send(bytes(msg, encoding='utf-8')) ret = sk.recv(1024) print(ret) if __name__ == '__main__': sk = socket.socket() sk.connect(('127.0.0.1', 8070)) # while 1: msg = input() p = Process(target=client, args=(sk, msg)) p.start()
守护进程
p.terminate() :在主程序内结束一个子进程。 p.is_alive() :检验一个进程是否还活着的状态。 p.name)() :这个进程的名字。 p.pid() :这个进程的进程号。
from multiprocessing import Process import time def fun1(): while 1: # 给主进程 反馈信息,证明自己在运行。 time.sleep(0.5) print('我还活着呢') if __name__ == '__main__': p = Process(target=fun1) # 设置子进程为守护进程 p.daemon = True p.start() # 结束一个子进程 p.terminate() i = 0 while i < 10 : print('我是主进程') time.sleep(1) # 检验一个主进程 是否还活着 i = i + 1 # 守护进程会随着主进程的代码执行完毕而结束
LOCK锁
Lock:一次只能执行一个子程序,而且只能等执行完之后才能执行下一个。
不加锁会造成数据不安全的操作。
import json from multiprocessing import Process from multiprocessing import Lock import time # 修改数据库必须加锁 def show(i): with open('ticket') as f: str = f.read() obj = json.loads(str) print('%s号查看了 余票: %s' % (i, obj['ticket'])) def buy_ticket(i, lock): # 拿钥匙进门 lock.acquire() with open('ticket') as f: str = f.read() obj = json.loads(str) time.sleep(0.1) if obj['ticket'] > 0: obj['ticket'] -= 1 print('\033[34m%s 买到票了 \033[0m ' % i) else: print('\033[34m%s 没买到票 \033[0m' % i) with open('ticket', 'w') as f1: f1.write(json.dumps(obj)) # 还钥匙 lock.release() if __name__ == '__main__': for i in range(10): p = Process(target=show, args=(i,)) p.start() lock = Lock() for i in range(10): p = Process(target=buy_ticket, args=(i, lock)) p.start()
信号量(Semaphore)
Semaphore: 用锁的原理实现的,内置了一个记数器。在同一时间只能有指定数量的进程执行某一段被控制住的代码。
# 限定进程访问次数,指定一次进2个人 等他们出来后其他人才能进去
Sem.acquire(): 获取钥匙。
Sem.release(): 还钥匙。
import time from multiprocessing import Semaphore, Process import random def ktv(i, sem): # 获取钥匙 sem.acquire() print('%s 走进ktv' % i) time.sleep(random.randint(1, 5)) print('%s走出ktv' % i) # 归还钥匙 sem.release() if __name__ == '__main__': # 限定进程访问次数,指定一次进2个人 等他们出来后其他人才能进去 sem = Semaphore(2) for i in range(20): p = Process(target=ktv, args=(i, sem)) p.start()
事件(Event)
Set 和 cleat
分别用来修改一个事件的状态 Ture或者 False
Is__set
用来查看一个事件的状态
Wait
是依据事件的状态来决定自己是否阻塞 False阻塞 True 不阻塞
# 事件(Event) from multiprocessing import Event # 一个信号可以使所有的进程都进入阻塞状态 # 也可以控制所有的进程解除阻塞 # 一个世界被创建之后,默认是阻塞状态 # 创建一个事件 e = Event() # 查看一个事件的状态,默认被设置成阻塞false print(e.is_set()) # 将这个事件的状态改为Ture e.set() # 是依据 e.is_set() 的值 来决定是否阻塞 e.wait() # 查看一个事件的状态,默认被设置成阻塞false print(e.is_set()) # 将这个事件的状态改为False print(e.clear()) # 是依据 e.is_set() 的值 来决定是否阻塞 e.wait()
红绿灯效应
from multiprocessing import Process, Event import time import random def car(i, e): if e.is_set(): print('车%s在等待' % i) e.wait() else: print('\033[33m车%s通过了\033[0m'%i) def lint(e): while 1: # 判断这个事件是否为Ture if e.is_set(): print('\033[31m 红灯亮了 \033[0m') time.sleep(2) # 将这个事件的状态改为False e.clear() else: print('\033[32m 绿灯亮了 \033[0m') time.sleep(2) # 将这个事件的状态改为Ture e.set() if __name__ == '__main__': e = Event() p = Process(target=lint, args=(e,)) p.start() for i in range(20): p1 = Process(target=car, args=(i, e)) p1.start() time.sleep(random.randint(1, 2))
进程间的通信 ----队列和管道
队列
q = Queue(5) #创建共享的进程队列,如果省略此参数,则无大小限制。
q.put(1) #将1放入队列。如果队列已满,此方法将阻塞至有空间可用为止.
q.full() #用于判断队列是否已经满了。
q.get() #返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。
q.get_nowait() #和get方法一样 但是q如果为空的话会报错。
q.empty() #用于判断队列是否已经为空。
Full empty #不完全准确。
from multiprocessing import Queue # 创建共享的进程队列,如果省略此参数,则无大小限制. q = Queue(5) # 将1 放入队列。 如果队列已满,此方法将阻塞至有空间可用为止 q.put(1) q.put(2) q.put(3) q.put(4) q.put(5) # 判断队列是否已经满了。 print(q.full()) # 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。 print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) # get_nowait 和get方法一样不过 get_nowait 如果发现没有取到值就会报错 try: print(q.get_nowait()) except Exception: print('队列已经空了') # 用于判断队列是否为空 print(q.empty())
子进程是与主进程之间通信的
from multiprocessing import Queue from multiprocessing import Process def func(e): # 放入队列。如果队列已经满,此方法将阻塞至有空间可用为止 e.put('我是Mark I am Strong man') if __name__ == '__main__': e = Queue() q = Process(target=func,args=(e,)) q.start() # 在队列中 获取子进程放进来的值 print(e.get())
子进程是可以与子进程之间通信的
例子: 生产者消费者模型
# 生产者 def producer(name, food, q): for i in range(20): time.sleep(random.random()) f = "%s 制作了的第%s个%s" % (name, i, food) print(f) # 将数据放入队列中 q.put(f) # 消费者 def chibaozi(name, q): while 1: # 在队列中取值 food = q.get() # 不能用字符形式格式,需要用is关键字才能和none配合 if food is None: break print("%s 消费了 %s" % (name, food)) if __name__ == '__main__': # 创建一个队列 q = Queue() # 生产者 qq = Process(target=producer, args=('Mark', '包子', q)) qq1 = Process(target=producer, args=('Riven', '馒头', q)) # 消费之 qq2 = Process(target=chibaozi, args=('黄埔', q)) qq3 = Process(target=chibaozi, args=('佘义', q)) # 统一启动子进程 qq.start() qq1.start() qq2.start() qq3.start() # 先执行子程序,后执行主程序代码 qq.join() qq1.join() # 放入None 让消费者跳出循环 q.put(None) q.put(None)
JoinableQueue
例 : 进阶版(生产消费者模型)
q.join()
# 阻塞 直到一个队列中的所有数据 全部被执行完毕。接受消费端发送过来的标记。
生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。
阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。
q.task_done()
内部执行了一个 count - 1的操作,发送信号给q.join使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。
from multiprocessing import JoinableQueue from multiprocessing import Process import random import time # 生产者 def producer(name, food, q): for i in range(20): time.sleep(random.random()) f = "%s 制作了的第%s个%s" % (name, i, food) print(f) # 将数据放入队列中 q.put(f) # 阻塞 直到一个队列中的所有数据 全部被执行完毕。接受消费端发送过来的标记. q.join() # 消费者 def chibaozi(name, q): while 1: # 在队列中取值 food = q.get() print("%s 消费了 %s" % (name, food)) time.sleep(random.randint(1,3)) # 在消费者这一端:每次获取一个数据 处理一个数据. 发送一个记号:标志一个数据被处理成功 # 内部执行了一个 count - 1 的操作 q.task_done() if __name__ == '__main__': # 创建一个队列 q = JoinableQueue() # 生产者 qq = Process(target=producer, args=('Mark', '包子', q)) qq1 = Process(target=producer, args=('Riven', '馒头', q)) # 消费者 qq2 = Process(target=chibaozi, args=('黄埔', q)) qq3 = Process(target=chibaozi, args=('佘义', q)) # 统一启动子进程 qq.start() qq1.start() # 设置 消费者 为守护进程 主进程中的代码执行完毕之后,子进程自动结束 qq2.daemon =True qq3.daemon =True qq2.start() qq3.start() # 感知一个子程序的结束 qq.join() qq1.join()
文字 总结:
#在生产者这一端:
#每一次生产一个数据
#且每一次生产的数据都放这队列中
#在队列中刻上一个记号
#当生产者全部生产完毕之后
#join 信号:已经停止生产数据了
#且要等待之前被刻上的记号都被消费完
#当数据都被处理完时,join阻塞结束
#消费端 中把所有的任务消耗完
#生产端 中的join感知到,停止阻塞
#所有 生产端 进程结束
#主进程中的p.join结束
#主进程中代码结束
#守护进程(消费者进程)结束.
在消费着者这一端:
#每次获取一个数据
#处理一个数据
#发送一个记号:标志一个数据被处理成功
管道
数据不安全性
#IPC。
#加锁来控制操作管道的行为 来避免进程之间争抢数据造成的数据不安全现象。
#队列 进程之间数据安全。
#管道 + 锁。
例1、
from multiprocessing import Pipe # 接受2个地址 conn1, conn2 = Pipe() # conn1 是发送端, conn1.send('123456') # conn2 是接受端 print(conn2.recv())
例2、
from multiprocessing import Pipe from multiprocessing import Process def func(conn1): # 发送消息 conn1.send('大傻我爱你') if __name__ == "__main__": # 接受2个端口 conn1, conn2 = Pipe() p = Process(target=func,args=(conn1,)) p.start() # 接受消息 print(conn2.recv())
例3、通过条件判断关闭管道进程
from multiprocessing import Pipe from multiprocessing import Process def func(conn2): while 1: # 接收端 ret = conn2.recv() print(ret) # 通过判断条件来关闭进程 if ret is None: break if __name__ == "__main__": # 接受2个端口 conn1, conn2 = Pipe() p = Process(target=func, args=(conn2,)) p.start() for i in range(20): # 发送端 conn1.send('吃了吗?') conn1.send(None)
例4、使用close关闭进程的方法
当最后一个端口没关的时候就会报错。我们捕获错误信息进行操作就可以了。
from multiprocessing import Pipe from multiprocessing import Process def func(conn2, conn1): # 发送端关闭 conn1.close() while 1: # 管道只有一端没有关闭就会报错异常 我们又不能让它每一次循环都关闭 只能try一下。 try: # 接受端 接受消息 ret = conn2.recv() print(ret) except EOFError: # 接受端关闭 conn2.close() break if __name__ == "__main__": # 接受2个端口 conn1, conn2 = Pipe() p = Process(target=func, args=(conn2, conn1)) p.start() # 主进程接受端关闭 conn2.close() for i in range(20): # 发送端 conn1.send('吃了吗?') # 主进程 发送端关闭 conn1.close()
基于 管道的生产者消费者模型
from multiprocessing import Pipe from multiprocessing import Process from multiprocessing import Lock import time import random def func(name, food, conn1, conn2): # 关闭接受端 conn2.close() for i in range(20): ret = "\033[31m %s制作了%s个%s\033[0" % (name, i, food) print(ret) # 发送数据到管道 conn1.send(ret) time.sleep(random.randint(1, 2)) # 关闭发送端 conn1.close() def chi(name, conn1, conn2, lock): # 把发送端关闭 conn1.close() while 1: try: #加锁 lock.acquire() ret1 = conn2.recv() print("\033[32m %s吃了%s \033[0m" % (name, ret1)) #加锁 lock.release() except EOFError: conn2.close() break if __name__ == "__main__": # 接受2个端口 conn1, conn2 = Pipe() # 创建锁 lock = Lock() p = Process(target=func, args=('Mark', '包子', conn1, conn2,)) p.start() p2 = Process(target=chi, args=('黄埔', conn1, conn2, lock,)) p2.start() # 记住一定要 关闭主进程 conn1.close() conn2.close()
进程之间的数据共享(Manager)
例1、
from multiprocessing import Manager, Process def main(dic): dic['count'] -=1 print(dic) if __name__ == "__main__": # 虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此 m = Manager() # 放入一个字典 dic = m.dict({"count": 100}) p_lst = [] p = Process(target=main,args=(dic,)) p.start() # 感知一个子程序的结束 p.join() print('主进程',dic)
例2、会出现数据不安全
from multiprocessing import Manager, Process def main(dic): dic['count'] -= 1 print('子进程', dic) if __name__ == "__main__": # 虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此 m = Manager() # 放入一个字典 dic = m.dict({"count": 100}) for i in range(20): p = Process(target=main, args=(dic,)) p.start() p.join() print('测试', dic)
数据不安全性可能会出现一个进程同时用一个数据
解决数据不安全问题(加锁)
from multiprocessing import Manager from multiprocessing import Process from multiprocessing import Lock def main(dic,lock): # 加锁 lock.acquire() dic['count'] -= 1 print('子进程', dic) lock.release() if __name__ == "__main__": # 虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此 m = Manager() lock = Lock() # 放入一个字典 dic = m.dict({"count": 100}) for i in range(20): p = Process(target=main, args=(dic,lock)) p.start() p.join() print('测试', dic)
进程池(Pool)
为什么要有进程池?进程池的概念。
在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。
那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。
第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。
因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?
在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,
等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,
拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。
# 效率。
# 每开启进程,开启属于这个进程的内存空间。
# 寄存器 堆栈 文件。
进程池的数量是 cpu个数+1
# 进程过多 操作系统的调度。
# 更高级的进程: 在忙的时候可以 20+ ,在不忙的时候自动降为3个左右。
例1、起一个进程池
from multiprocessing import Pool from multiprocessing import Process def func(n): for i in range(10): print(n + 1) if __name__ == "__main__": # 开启了5个进程 pool = Pool(5)
进程池的同步调用(apply):(一般不用)
对比:
1. 正常情况下先执行5个start 后执行5个end
from multiprocessing import Pool from multiprocessing import Process import time import os def func(n): print("子进程开始: %s"%n, os.getpid()) time.sleep(1) print("子进程结束: %s" % n, os.getpid()) if __name__ == "__main__": # 开启了5个进程 pool = Pool(5) for i in range(10): # 正常情况下先执行5个start 后执行5个end p = Process(target=func,args=(i,)) p.start()
子进程开始: 1 13304 子进程开始: 3 13276 子进程开始: 2 12304 子进程开始: 4 12384 子进程开始: 5 12380 子进程开始: 9 6200 子进程开始: 0 13288 子进程开始: 7 4288 子进程开始: 8 13244 子进程开始: 6 7688 子进程结束: 1 13304 子进程结束: 3 13276 子进程结束: 2 12304 子进程结束: 4 12384 子进程结束: 5 12380 子进程结束: 9 6200 子进程结束: 0 13288 子进程结束: 7 4288 子进程结束: 8 13244 子进程结束: 6 7688
2.使用apply(# 在使用了 apply后start和end变成了同步)
from multiprocessing import Pool from multiprocessing import Process import time import os def func(n): print("子进程开始: %s"%n, os.getpid()) time.sleep(1) print("子进程结束: %s" % n, os.getpid()) return n if __name__ == "__main__": # 开启了5个进程(进程池中的进程 永远都是活着的) pool = Pool(5) for i in range(10): # 正常情况下先执行5个start 后执行5个end # p = Process(target=func,args=(i,)) # 在使用了 apply后start和end变成了同步 p = pool.apply(func,args=(i,)) # 获取返回值 print(p)
进程池的异步调用(用的比较多)
# 异步的apply_async用法:
主进程需要使用 jion,
等待进程池内任务都处理完,然后可以用get收集结果
否则, 主进程结束,进程池可能还没来得及执行,也就跟着结束了.
返回值:为了能使用返回值需要使用 obj.get()方法
# 使用get来获取apply_aync 的结果,如果是apply,则没有get方法
# 因为apply是同步执行,立刻获取结果,也根本无需get
from multiprocessing import Pool from multiprocessing import Process import time import os def func(n): print("子进程开始: %s"%n, os.getpid()) time.sleep(10) print("子进程结束: %s" % n, os.getpid()) return n*10 if __name__ == "__main__": # 开启了5个进程(进程池中的进程 永远都是活着的) pool = Pool(5) lis = [] for i in range(10): """ # 异步的apply_async用法: 如果使用异步提交的任务 主进程需要使用 jion,等待进程池内任务都处理完,然后可以用get收集结果 否则, 主进程结束,进程池可能还没来得及执行,也就跟着结束了 """ ret = pool.apply_async(func,args=(i,)) lis.append(ret) # 使用get来获取apply_aync 的结果,如果是apply,则没有get方法 # 因为apply是同步执行,立刻获取结果,也根本无需get for li in lis: print(li.get()) # 结束进程池接受任务 pool.close() # 感知进程池中的任务执行结束 pool.join()
进程池的socket
进程池中的回调函数
回调函数与爬虫的应用