使用进程池创(pool)建进程,用队列(queue)进行进程间通信。在子进程里边用协程去处理。
直接上代码:
from datetime import datetime from multiprocessing import Pool, Manager import asyncio from random import randint import math import os ''' 需求: 有一个列表,列表中的元素求二次幂,并将值返回 方案: 用3个子进程,并在每个子进程中用协程去处理 ''' async def power(num): # 幂运算 用延时来模拟阻塞 # print(f'pid: {os.getpid}') await asyncio.sleep(randint(1, 5)) return num*num def create_task(data:list, queue): # print(f'pid: {os.getpid}') # 创建event_loop loop = asyncio.new_event_loop() # 创建task 将每一个元素创建一个task去执行 tasks = [loop.create_task(power(el)) for el in data] # 执行 loop.run_until_complete(asyncio.wait(tasks)) # 获取task结果 for task in tasks: # 将结果写到队列中 # print(f'---: {task.result()}') queue.put(task.result()) def handle(data: list): process_num = 3 # 初始化进程池 pool = Pool(processes=process_num) # 创建队列 使用进程池的时候,创建队列用Manager才能使用 queue = Manager().Queue() start = 0 num = math.ceil(len(data) / process_num) # 数据长度 / 子进程数量 向上取整,将数据分成份,分给给每个进程 for i in range(1, num + 1): end = i*num # 创建子进程,并传入数据和队列 pool.apply_async(create_task, args=(data[start:end], queue)) start = end # 等所有进程执行完之后关闭进程 pool.close() pool.join() # 获取执行结果 res = list() while not queue.empty(): # 从队列中获取数据 res.append(queue.get()) return res if __name__ == '__main__': bg = datetime.now() data = list(range(10)) res = handle(data) print('res', res) print('spend time: ', datetime.now()-bg)
在django中使用多进程时,如果出现
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
解决方案:
在使用到多进程的文件的最上边添加以下代码:
import django
django.setup()