python 多進程+協程 實現並發


使用進程池創(pool)建進程,用隊列(queue)進行進程間通信。在子進程里邊用協程去處理。

直接上代碼:

from datetime import datetime
from multiprocessing import Pool, Manager
import asyncio
from random import randint
import math
import os

'''
  需求:
    有一個列表,列表中的元素求二次冪,並將值返回
  方案:
    用3個子進程,並在每個子進程中用協程去處理
'''
async def power(num):
  # 冪運算 用延時來模擬阻塞
  # print(f'pid: {os.getpid}')
  await asyncio.sleep(randint(1, 5))
  return num*num

def create_task(data:list, queue):
  # print(f'pid: {os.getpid}')
  # 創建event_loop
  loop = asyncio.new_event_loop()
  # 創建task 將每一個元素創建一個task去執行
  tasks = [loop.create_task(power(el)) for el in data]
  # 執行
  loop.run_until_complete(asyncio.wait(tasks))

  # 獲取task結果
  for task in tasks:
    # 將結果寫到隊列中
    # print(f'---: {task.result()}')
    queue.put(task.result())

  

def handle(data: list):
  process_num = 3
  # 初始化進程池
  pool = Pool(processes=process_num)
  # 創建隊列 使用進程池的時候,創建隊列用Manager才能使用
  queue = Manager().Queue()  

  start = 0
  num = math.ceil(len(data) / process_num)  # 數據長度 / 子進程數量 向上取整,將數據分成份,分給給每個進程

  for i in range(1, num + 1):
    end = i*num
    # 創建子進程,並傳入數據和隊列
    pool.apply_async(create_task, args=(data[start:end], queue))
    start = end
  
  # 等所有進程執行完之后關閉進程
  pool.close()
  pool.join()

  # 獲取執行結果
  res = list()
  while not queue.empty():
    # 從隊列中獲取數據
    res.append(queue.get())
  
  return res

if __name__ == '__main__':
  bg = datetime.now()
  data = list(range(10))
  res = handle(data)
  print('res', res)
  print('spend time: ', datetime.now()-bg)

django中使用多進程時,如果出現 

django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.

解決方案

  在使用到多進程的文件的最上邊添加以下代碼:

import django
django.setup()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM