python之multiprocessing多進程


multiprocessing 充分利用cpu多核
一般情況下cpu密集使用進程池,IO密集使用線程池。python下想要充分利用多核CPU,就用多進程。

Process 類
Process 類用來描述一個進程對象。創建子進程的時候,只需要傳入一個執行函數和函數的參數即可完成 Process 示例的創建。
star() 方法啟動進程,
join() 方法實現進程間的同步,等待所有進程退出。
close() 用來阻止多余的進程涌入進程池 Pool 造成進程阻塞。


multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
group: 線程組,目前還沒有實現,庫引用中提示必須是None;
target 是函數名字,需要調用的函數
args 函數需要的參數,以 tuple 的形式傳入


實例方法:
  is_alive():返回進程是否在運行。
  join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。
  start():進程准備就緒,等待CPU調度
  run():strat()調用run方法,如果實例進程時未制定傳入target,這star執行t默認run()方法。
  terminate():不管任務是否完成,立即停止工作進程
屬性:
  authkey
  daemon:和線程的setDeamon功能一樣
  exitcode(進程在運行時為None、如果為–N,表示被信號N結束)
  name:進程名字。
  pid:進程號。

列子一:

import multiprocessing
import os

def run_proc(name):
    print('Child process {0} {1} Running '.format(name, os.getpid()))

if __name__ == '__main__':
    print('Parent process {0} is Running'.format(os.getpid()))
    for i in range(5):
        p = multiprocessing.Process(target=run_proc, args=(str(i),))
        print('process start')
        p.start()
    p.join()
    print('Process close')
[python@master test]$ python3 a.py 
Parent process 12665 is Running
process start
process start
process start
Child process 0 12666 Running 
process start
process start
Child process 2 12668 Running 
Child process 1 12667 Running 
Child process 3 12669 Running 
Child process 4 12670 Running 
Process close

列子二:

#coding=utf-8

import multiprocessing

def do(n) :
 name = multiprocessing.current_process().name
 print(name,'starting')
 print ("worker ", n)

if __name__ == '__main__' :
 for i in range(5) :
    p = multiprocessing.Process(target=do, args=(i,))
    p.start()
    p.join()
    print ("Process end.")
[python@master test]$ python3 b.py 
Process-1 starting
worker  0
Process end.
Process-2 starting
worker  1
Process end.
Process-3 starting
worker  2
Process end.
Process-4 starting
worker  3
Process end.
Process-5 starting
worker  4
Process end.

Pool類

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。進程池設置最好等於CPU核心數量
構造方法:

Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes :使用的工作進程的數量,如果processes是None那么使用 os.cpu_count()返回的數量。
initializer: 如果initializer是None,那么每一個工作進程在開始的時候會調用initializer(*initargs)。
maxtasksperchild:工作進程退出之前可以完成的任務數,完成后用一個新的工作進程來替代原進程,來讓閑置的資源被釋放。maxtasksperchild默認是None,意味着只要Pool存在工作進程就會一直存活。
context: 用在制定工作進程啟動時的上下文,一般使用 multiprocessing.Pool() 或者一個context對象的Pool()方法來創建一個池,兩種方法都適當的設置了context

實例方法:
  apply(func[, args[, kwds]]):同步進程池
  apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :異步進程池
  close() : 關閉進程池,阻止更多的任務提交到pool,待任務完成后,工作進程會退出。
  terminate() : 結束工作進程,不在處理未完成的任務
  join() : wait工作線程的退出,在調用join()前,必須調用close() or terminate()。這樣是因為被終止的進程需要被父進程調用wait(join等價與wait),否則進程會成為僵屍進程。

pool.join()必須使用在pool.close()或者pool.terminate()之后。其中close()跟terminate()的區別在於:
close()會等待池中的worker進程執行結束再關閉pool,而terminate()則是直接關閉。
異步進程池

#每次循環將會用空閑出來的子進程去調用目錄--異步
#不等待只要進程池的位置空出來就立刻補上

# coding:utf-8
from  multiprocessing import Pool
import time

def Foo(i):
    time.sleep(2)
    return i + 100
def Bar(arg):
    print("callback"+str(arg))
if __name__ == '__main__':
    t_start=time.time()
    pool = Pool(5)
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,), callback=Bar)#維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去
    pool.close()
    pool.join()  # 進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。
    pool.terminate()
    t_end=time.time()
    t=t_end-t_start
    print ('the program time is :%s' %t)
[python@master test]$ python3 c.py 
callback100
callback103
callback101
callback102
callback104
callback107
callback106
callback109
callback105
callback108
the program time is :4.0822553634643555

同步進程池

#阻塞式的請求 自加阻塞  順序結構
#必須要在進程池中沒有進程的的時候 才會有新進程進入進程池

# -*- coding:utf-8 -*-
from  multiprocessing import Process, Pool
import time

def Foo(i):
    time.sleep(1)
    print (i + 100)
if __name__ == '__main__':
    t_start=time.time()
    pool = Pool(5)   #定義一個進程池,最大的進程數量
    for i in range(10):
        pool.apply(Foo, (i,))
    pool.close()
    pool.join()  # 進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。
    t_end=time.time()
    t=t_end-t_start
    print('the program time is :%s' %t)
[python@master test]$ python3 d.py 
100
101
102
103
104
105
106
107
108
109
the program time is :10.224181175231934

正確使用get()方法獲取結果

# coding:utf-8
from  multiprocessing import Pool
import time

def Foo(i):
    time.sleep(2)
    return i + 100

def Bar(arg):
    print('callback'+str(arg))

if __name__ == '__main__':
    res_list=[]
    t_start=time.time()
    pool = Pool(5)

    for i in range(10):
       res = pool.apply_async(func=Foo, args=(i,), callback=Bar)#維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去
       res_list.append(res)
    pool.close()
    pool.join()  # 進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。
    for res in res_list:
        print(res.get())
    pool.terminate()
    t_end=time.time()
    t=t_end-t_start
    print ('the program time is :%s' %t)
[python@master test]$ python3 e.py 
callback101
callback100
callback102
callback104
callback103
callback105
callback109
callback106
callback107
callback108
100
101
102
103
104
105
106
107
108
109
the program time is :4.145965099334717

進程數據共享

進程各自持有一份數據,默認無法共享數據

# coding:utf-8
from multiprocessing import Process
li = []
def foo(i):
    li.append(i)
    print ('say hi', li)
if __name__ == '__main__':

    for i in range(10):
        p = Process(target=foo, args=(i,))
        p.start()

    print ('ending', li)
[python@master test]$ python3 a.py 
say hi [0]
say hi [2]
say hi [3]
say hi [5]
say hi [1]
say hi [6]
ending []
say hi [7]
say hi [4]
say hi [9]
say hi [8]

方法一(使用Array):

from multiprocessing import Process, Array

def f(a):
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    arr = Array('i', range(10))
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    print(arr[:])
[python@master test]$ python3 b.py 
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

方法二(使用Manager):

Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array類型的支持。

from multiprocessing import Process, Manager


def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()


if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)
[python@master test]$ python3 c.py 
{1: '1', '2': 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

 使用多個進程池:

#coding: utf-8
import multiprocessing
import os, time, random

def Lee():
    print ("\nRun task Lee-%s" %(os.getpid())) #os.getpid()獲取當前的進程的ID
    start = time.time()
    time.sleep(random.random() * 10) #random.random()隨機生成0-1之間的小數
    end = time.time()
    print ('Task Lee, runs %0.2f seconds.' %(end - start))

def Marlon():
    print ("\nRun task Marlon-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 40)
    end=time.time()
    print ('Task Marlon runs %0.2f seconds.' %(end - start))

def Allen():
    print ("\nRun task Allen-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print ('Task Allen runs %0.2f seconds.' %(end - start))

def Frank():
    print ("\nRun task Frank-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print ('Task Frank runs %0.2f seconds.' %(end - start))
        
if __name__=='__main__':
    function_list=  [Lee, Marlon, Allen, Frank] 
    print ("parent process %s" %(os.getpid()))

    pool=multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)     #Pool執行函數,apply執行函數,當有一個進程執行完畢后,會添加一個新的進程到pool中

    print ('Waiting for all subprocesses done...')
    pool.close()
    pool.join()    #調用join之前,一定要先調用close() 函數,否則會出錯, close()執行后不會有新的進程加入到pool,join函數等待素有子進程結束
    print ('All subprocesses done.')
    
[python@master test]$ python3 e.py 
parent process 20714
Waiting for all subprocesses done...

Run task Lee-20715

Run task Marlon-20716

Run task Allen-20718

Run task Frank-20717
Task Lee, runs 3.18 seconds.
Task Frank runs 11.47 seconds.
Task Allen runs 23.24 seconds.
Task Marlon runs 38.09 seconds.
All subprocesses done.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM