python使用多進程


python多線程適合IO密集型場景,而在CPU密集型場景,並不能充分利用多核CPU,而協程本質基於線程,同樣不能充分發揮多核的優勢。

針對計算密集型場景需要使用多進程,python的multiprocessing與threading模塊非常相似,支持用進程池的方式批量創建子進程。

  • 創建單個Process進程(使用func)

只需要實例化Process類,傳遞函數給target參數,這點和threading模塊非常的類似,args為函數的參數

import os
from multiprocessing import Process


# 子進程要執行的代碼
def task(name):
    print('run child process %s (%s)...' % (name, os.getpid()))


if __name__ == '__main__':
    print('parent process %s.' % os.getpid())
    p = Process(target=task, args=('test',))
    p.start()
    p.join()
    print('process end.')
  • 創建單個Process進程(使用class)

繼承Process類,重寫run方法創建進程,這點和threading模塊基本一樣

import multiprocessing
import os
from multiprocessing import current_process


class Worker(multiprocessing.Process):
    def run(self):
        name = current_process().name  # 獲取當前進程的名稱
        print('run child process <%s>  (%s)' % (name, os.getpid()))

        print('In %s' % self.name)
        return

if __name__ == '__main__':
    print('parent process %s.' % os.getpid())
    p = Worker()
    p.start()
    p.join()
    print('process end.')

  * 停止進程

terminate()結束子進程,但是會導致子進程的資源無法釋放掉,是不推薦的做法,因為結束的時候不清楚子線程的運行狀況,有很大可能性導致子線程在不恰當的時刻被結束。

import multiprocessing
import time

def worker():
    print('starting worker')
    time.sleep(0.1)
    print('finished worker')

if __name__ == '__main__':
    p = multiprocessing.Process(target=worker)
    print('執行前:', p.is_alive())
    p.start()
    print('執行中:', p.is_alive())
    p.terminate()  # 發送停止號
    print('停止:', p.is_alive())
    p.join()
    print('等待完成:', p.is_alive()) 
  • 直接創建多個Process進程

import multiprocessing

def worker(num):
    print(f'Worker:%s %s', num)
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()
  • 使用進程池創建多個進程

在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多台主機,並行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。
Pool可以提供指定數量的進程供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來它。

import os
import random
import time
from multiprocessing import Pool
from time import ctime


def task(name):
    print('start task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)

    print('end task %s runs %0.2f seconds.' % (name, (time.time() - start)))


if __name__ == '__main__':
    print('parent process %s.' % os.getpid())

    p = Pool()  # 初始化進程池
    for i in range(5):
        p.apply_async(task, args=(i,))  # 追加任務 apply_async 是異步非阻塞的,就是不用等待當前進程執行完畢,隨時根據系統調度來進行進程切換。

    p.close()

    p.join()  # 等待所有結果執行完畢,會等待所有子進程執行完畢,調用join()之前必須先調用close()
    print(f'all done at: {ctime()}')

如果關心每個進程的執行結果,可以使用返回結果的get方法獲取,代碼如下

import os
import random
import time
from multiprocessing import Pool, current_process
from time import ctime


def task(name):
    print('start task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    print('end task %s runs %0.2f seconds.' % (name, (time.time() - start)))

    return current_process().name + 'done'


if __name__ == '__main__':
    print('parent process %s.' % os.getpid())

    result = []

    p = Pool()  # 初始化進程池
    for i in range(5):
        result.append(p.apply_async(task, args=(i,)))  # 追加任務 apply_async 是異步非阻塞的,就是不用等待當前進程執行完畢,隨時根據系統調度來進行進程切換。

    p.close()

    p.join()  # 等待所有結果執行完畢

    for res in result:
        print(res.get())  # get()函數得出每個返回結果的值

    print(f'all done at: {ctime()}')

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM