python之多進程multiprocessing模塊


process類介紹

multiprocessing 模塊官方說明文檔

Process 類用來描述一個進程對象。創建子進程的時候,只需要傳入一個執行函數和函數的參數即可完成 Process 示例的創建。

python中的多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。Python提供了multiprocessing。

multiprocessing模塊用來開啟子進程,並在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。

  • star() 方法啟動進程。
  • join() 方法實現進程間的同步,等待所有進程退出。
  • close() 用來阻止多余的進程涌入進程池 Pool 造成進程阻塞。
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
  • target 是函數名字,需要調用的函數
  • args 函數需要的參數,以 tuple 的形式傳入

創建子進程方式一:

rom multiprocessing import Process
import time
def f(name):
    time.sleep(2)
    print('hello', name)
 
if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

創建子進程方式二: 

from multiprocessing import Process
import time
class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name

    def run(self):
        print('task <%s> is runing' % self.name)
        time.sleep(2)
        print('task <%s> is done' % self.name)

if __name__ == '__main__':
    p=MyProcess('egon')
    p.start()

    print('主')

   注意:run方法是必須去重寫的。

 

查看進程父子進程的進程號,示例:

from multiprocessing import Process
import os
 
def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("\n\n")
 
def f(name):
    info('\033[31;1mfunction f\033[0m')
    print('hello', name)
 
if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

 進程間通信

  • 先要聲明一點,這里所說的進程間通信指的是具有父子關系的進程間通信機制,如果兩個進程間沒有任何關系,這里的機制是無法實現的。

Queues

使用方法跟threading里的queue差不多

from multiprocessing import Process, Queue
 
def f(q):
    q.put([42, None, 'hello'])
 
if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

 Pipes

常用來在兩個進程間通信,兩個進程分別位於管道的兩端。

multiprocessing.Pipe([duplex])

 示例一:

rom multiprocessing import Process, Pipe

def send(pipe):
    pipe.send(['spam'] + [42, 'egg'])   # send 傳輸一個列表
    pipe.close()

if __name__ == '__main__':
    (con1, con2) = Pipe()                            # 創建兩個 Pipe 實例
    sender = Process(target=send, args=(con1, ))     # 函數的參數,args 一定是實例化之后的 Pip 變量,不能直接寫 args=(Pip(),)
    sender.start()                                   # Process 類啟動進程
    print("con2 got: %s" % con2.recv())              # 管道的另一端 con2 從send收到消息
    con2.close()                                     # 關閉管道

 結果:

con2 got: ['spam', 42, 'egg']

 示例二:

from multiprocessing import Process, Pipe

def talk(pipe):
    pipe.send(dict(name='Bob', spam=42))            # 傳輸一個字典
    reply = pipe.recv()                             # 接收傳輸的數據
    print('talker got:', reply)

if __name__ == '__main__':
    (parentEnd, childEnd) = Pipe()                  # 創建兩個 Pipe() 實例,也可以改成 conf1, conf2
    child = Process(target=talk, args=(childEnd,))  # 創建一個 Process 進程,名稱為 child
    child.start()                                   # 啟動進程
    print('parent got:', parentEnd.recv())          # parentEnd 是一個 Pip() 管道,可以接收 child Process 進程傳輸的數據
    parentEnd.send({x * 2 for x in 'spam'})         # parentEnd 是一個 Pip() 管道,可以使用 send 方法來傳輸數據
    child.join()                                    # 傳輸的數據被 talk 函數內的 pip 管道接收,並賦值給 reply
    print('parent exit')

   結果:

parent got: {'name': 'Bob', 'spam': 42}
talker got: {'ss', 'aa', 'pp', 'mm'}
parent exit

 Managers

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array. For example:

from multiprocessing import Process, Manager
 
def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.append(1)
    print(l)
 
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
 
        l = manager.list(range(5))
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
 
        print(d)
        print(l)

進程池

在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多台主機,並行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,10幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,這時候進程池Pool發揮作用的時候就到了。

Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來它。這里有一個簡單的例子:

#!/usr/bin/env python
# _*_ coding utf-8 _*_
#Author: aaron

from multiprocessing import Process, Pool
import time, os


def Foo(i):
    time.sleep(5)
    print('in process[Foo]', os.getpid())
    return i + 100


def Bar(arg):  # 父進程去執行,而不是子進程調用
    print('-->exec done:', arg)
    print('in process[Bar]', os.getpid())


if __name__ == '__main__':
    pool = Pool(5)  # 允許進程池里同時放入5個進程 其他多余的進程處於掛起狀態

    for i in range(10):
        pool.apply_async(func=Foo, args=(i,), callback=Bar)  
        # pool.apply(func=Foo, args=(i,))  

    print('end:', os.getpid())
    pool.close()  # close() 必須在join()前被調用
    pool.join()   # 進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。
  • pool.apply_async()用來向進程池提交目標請求。
  • pool.join()是用來等待進程池中的worker進程執行完畢,防止主進程在worker進程結束前結束。但pool.join()必須使用在pool.close()或者pool.terminate()之后。
  • close()terminate()的區別在於close()會等待池中的worker進程執行結束再關閉pool,而terminate()則是直接關閉。
  • result.successful()表示整個調用執行的狀態,如果還有worker沒有執行完,則會拋出AssertionError異常。    
  • 利用multiprocessing下的Pool可以很方便的同時自動處理幾百或者上千個並行操作,腳本的復雜性也大大降低.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM