Python3的multiprocessing多進程-示例


Python3的multiprocessing多進程-示例

一、概述

由於GIL的存在,python中的多線程其實並不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing,只需要定義一個函數,Python會完成其他所有事情。借助這個包,可以輕松完成從單進程到並發執行的轉換。multiprocessing支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

multiprocessing包是Python中的多進程管理包。與threading.Thread類似,它可以利用multiprocessing.Process對象來創建一個進程。該進程可以運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象可以像多線程那樣,通過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。所以,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。

但在使用這些共享API的時候,我們要注意以下幾點:

multiprocessing提供了threading包中沒有的IPC(比如Pipe和Queue),效率上更高。應優先考慮Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因為它們占據的不是用戶進程的資源)。
多進程應該避免共享資源。在多線程中,我們可以比較容易地共享資源,比如使用全局變量或者傳遞參數。在多進程情況下,由於每個進程有自己獨立的內存空間,以上方法並不合適。此時我們可以通過共享內存和Manager的方法來共享資源。但這樣做提高了程序的復雜度,並因為同步的需要而降低了程序的效率。

Process.PID中保存有PID,如果進程還沒有start(),則PID為None。

window系統下,需要注意的是要想啟動一個子進程,必須加上那句if name == “main”,進程相關的要寫在這句下面。

二、簡單創建多進程:有兩種使用方法

1、直接傳入要運行的方法:

from multiprocessing import Process

def foo(i):
    print ('say hi', i)

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=foo, args=(i,))
        p.start()
------------------------------------------
say hi 0
say hi 1
say hi 2
say hi 3
say hi 4
say hi 5
say hi 6
say hi 7
say hi 8
say hi 9

2、Process繼承並覆蓋run()

from multiprocessing import Process
import time


class MyProcess(Process):
def __init__(self, arg):
super(MyProcess, self).__init__()
self.arg = arg

def run(self):
print('say hi', self.arg)
time.sleep(1)


if __name__ == '__main__':

for i in range(10):
p = MyProcess(i)
p.start()

三、Process類

1、構造方法:

Process([group [, target [, name [, args [, kwargs]]]]])
  • group: 線程組,目前還沒有實現,庫引用中提示必須是None;
  • target: 要執行的方法;
  • name: 進程名;
  • args/kwargs: 要傳入方法的參數。

2、實例方法:

  • is_alive():返回進程是否在運行。
  • join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。
  • start():進程准備就緒,等待CPU調度
  • run():strat()調用run方法,如果實例進程時未制定傳入target,這star執行t默認run()方法。
  • terminate():不管任務是否完成,立即停止工作進程

3、屬性:

  • authkey
  • daemon:和線程的setDeamon功能一樣
  • exitcode(進程在運行時為None、如果為–N,表示被信號N結束)
  • name:進程名字。
  • pid:進程號。

例子一:

from multiprocessing import Process

def foo(i):
    print('say hi', i)

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=foo, args=(i,))
        p.start()
----------------------------------------------
say hi 0
say hi 1
say hi 2
say hi 3
say hi 4
say hi 5
say hi 6
say hi 7
say hi 8
say hi 9

例子二:

from multiprocessing import Process
import time
def foo(i):
    time.sleep(1)
    print('say hi', i)
    time.sleep(1)



if __name__ == '__main__':
    p_list=[]
    for i in range(10):
        p = Process(target=foo, args=(i,))
        p.daemon=True
        p_list.append(p)

    for p in p_list:
        p.start()
    for p in p_list:
        p.join()

    print('main process end')
------------------------------------------------
say hi 0
say hi 1
say hi 2
say hi 3
say hi 4
say hi 5
say hi 6
say hi 7
say hi 8
say hi 9
main process end

可以看出join方法和deamon屬性的用法和多線程的基本一致。

四、Pool類

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。進程池設置最好等於CPU核心數量

1、構造方法:

Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
  • processes :使用的工作進程的數量,如果processes是None那么使用 os.cpu_count()返回的數量。
  • initializer:如果initializer是None,那么每一個工作進程在開始的時候會調用initializer(*initargs)。
  • maxtasksperchild:工作進程退出之前可以完成的任務數,完成后用一個新的工作進程來替代原進程,來讓閑置的資源被釋放。maxtasksperchild默認是None,意味着只要Pool存在工作進程就會一直存活。
  • context: 用在制定工作進程啟動時的上下文,一般使用 multiprocessing.Pool()或者一個context對象的Pool()方法來創建一個池,兩種方法都適當的設置了context

2、實例方法:

  • apply(func[, args[, kwds]]):同步進程池
  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]):異步進程池
  • close() : 關閉進程池,阻止更多的任務提交到pool,待任務完成后,工作進程會退出。
  • terminate() : 結束工作進程,不在處理未完成的任務
  • join() : wait工作線程的退出,在調用join()前,必須調用close() or terminate()。這樣是因為被終止的進程需要被父進程調用wait(join等價與wait),否則進程會成為僵屍進程。

例子一(異步進程池):
pool.join()必須使用在pool.close()或者pool.terminate()之后。其中close()跟terminate()的區別在於close()會等待池中的worker進程執行結束再關閉pool,而terminate()則是直接關閉。

from  multiprocessing import Pool
import time


def Foo(i):
    time.sleep(2)
    return i + 100

def Bar(arg):
    print(arg)

if __name__ == '__main__':
    t_start=time.time()
    pool = Pool(5)

    for i in range(10):
        pool.apply_async(func=Foo, args=(i,), callback=Bar)#維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去

    pool.close()
    pool.join()  # 進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。
    pool.terminate()
    t_end=time.time()
    t=t_end-t_start
    print('the program time is :%s' %t)

例子二(同步進程池):

from  multiprocessing import Process, Pool
import time


def Foo(i):
    time.sleep(1)
    print(i + 100)


if __name__ == '__main__':
    t_start=time.time()
    pool = Pool(5)

    for i in range(10):
        pool.apply(Foo, (i,))

    pool.close()
    pool.join()  # 進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。
    t_end=time.time()
    t=t_end-t_start
    print('the program time is :%s' %t)

可以看出進程同步順序執行了,效率降低

例子三:異步進程池使用get()方法獲得進程執行結果值(錯誤使用get()方法獲取結果)

from  multiprocessing import Process, Pool
import time

def Foo(i):
    time.sleep(1)
    return i+100

def Bar(arg):
    return arg

if __name__ == '__main__':
    t_start=time.time()
    pool = Pool(5)

    for i in range(10):
        res = pool.apply_async(func=Foo, args=(i,), callback=Bar)#維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去
        print(res.get())

    pool.close()
    pool.join()  # 進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。
    pool.terminate()
    t_end=time.time()
    t=t_end-t_start
    print('the program time is :%s' %t)
----------------------------------------------------------
100
101
102
103
104
105
106
107
108
109
the program time is :10.273606538772583

可以看出由於每個進程的get()方法,程序變成同步執行了

例子四(正確使用get()方法獲取結果)

from  multiprocessing import Pool
import time


def Foo(i):
    time.sleep(2)
    return i + 100

def Bar(arg):
    return arg

if __name__ == '__main__':
    res_list=[]
    t_start=time.time()
    pool = Pool(5)

    for i in range(10):
        res = pool.apply_async(func=Foo, args=(i,), callback=Bar)
        res_list.append(res)

    pool.close()
    pool.join()
    for res in res_list:
        print(res.get())
    t_end=time.time()
    t=t_end-t_start
    print('the program time is :%s' %t)
---------------------------------------------------------------
100
101
102
103
104
105
106
107
108
109
the program time is :4.311059236526489

不能在每個進程中執行獲取結果值得方式,首先將結果值存在列表里面,對列表循環再取里面的值實現異步獲取。

五、進程數據共享

進程各自持有一份數據,默認無法共享數據;

from multiprocessing import Process
li = []

def foo(i):
    li.append(i)
    print('say hi', li)
if __name__ == '__main__':

    for i in range(10):
        p = Process(target=foo, args=(i,))
        p.start()

    print('ending', li)

方法一(使用Array):

Array(‘i’, range(10))中的‘i’參數C語言中的類型:

‘c’: ctypes.c_char     ‘u’: ctypes.c_wchar    ‘b’: ctypes.c_byte     ‘B’: ctypes.c_ubyte
‘h’: ctypes.c_short     ‘H’: ctypes.c_ushort    ‘i’: ctypes.c_int      ‘I’: ctypes.c_uint
‘l’: ctypes.c_long,    ‘L’: ctypes.c_ulong    ‘f’: ctypes.c_float    ‘d’: ctypes.c_double
from multiprocessing import Process, Array

def f(a):
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    arr = Array('i', range(10))
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    print(arr[:])
--------------------------------------------------
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

方法二(使用Manager):
Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array類型的支持。

from multiprocessing import Process, Manager


def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()


if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)
-------------------------------------------------------------
{1: '1', '2': 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM