使用進程池創(pool)建進程,用隊列(queue)進行進程間通信。在子進程里邊用協程去處理。
直接上代碼:
from datetime import datetime from multiprocessing import Pool, Manager import asyncio from random import randint import math import os ''' 需求: 有一個列表,列表中的元素求二次冪,並將值返回 方案: 用3個子進程,並在每個子進程中用協程去處理 ''' async def power(num): # 冪運算 用延時來模擬阻塞 # print(f'pid: {os.getpid}') await asyncio.sleep(randint(1, 5)) return num*num def create_task(data:list, queue): # print(f'pid: {os.getpid}') # 創建event_loop loop = asyncio.new_event_loop() # 創建task 將每一個元素創建一個task去執行 tasks = [loop.create_task(power(el)) for el in data] # 執行 loop.run_until_complete(asyncio.wait(tasks)) # 獲取task結果 for task in tasks: # 將結果寫到隊列中 # print(f'---: {task.result()}') queue.put(task.result()) def handle(data: list): process_num = 3 # 初始化進程池 pool = Pool(processes=process_num) # 創建隊列 使用進程池的時候,創建隊列用Manager才能使用 queue = Manager().Queue() start = 0 num = math.ceil(len(data) / process_num) # 數據長度 / 子進程數量 向上取整,將數據分成份,分給給每個進程 for i in range(1, num + 1): end = i*num # 創建子進程,並傳入數據和隊列 pool.apply_async(create_task, args=(data[start:end], queue)) start = end # 等所有進程執行完之后關閉進程 pool.close() pool.join() # 獲取執行結果 res = list() while not queue.empty(): # 從隊列中獲取數據 res.append(queue.get()) return res if __name__ == '__main__': bg = datetime.now() data = list(range(10)) res = handle(data) print('res', res) print('spend time: ', datetime.now()-bg)
在django中使用多進程時,如果出現
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
解決方案:
在使用到多進程的文件的最上邊添加以下代碼:
import django
django.setup()
