process類介紹
Process 類用來描述一個進程對象。創建子進程的時候,只需要傳入一個執行函數和函數的參數即可完成 Process 示例的創建。
python中的多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。Python提供了multiprocessing。
multiprocessing模塊用來開啟子進程,並在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。
- star() 方法啟動進程。
join() 方法實現進程間的同步,
等待所有進程退出。- close() 用來阻止多余的進程涌入進程池 Pool 造成進程阻塞。
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
- target 是函數名字,需要調用的函數
- args 函數需要的參數,
以 tuple 的形式傳入
創建子進程方式一:
rom multiprocessing import Process import time def f(name): time.sleep(2) print('hello', name) if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() p.join()
創建子進程方式二:
from multiprocessing import Process import time class MyProcess(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print('task <%s> is runing' % self.name) time.sleep(2) print('task <%s> is done' % self.name) if __name__ == '__main__': p=MyProcess('egon') p.start() print('主')
注意:run方法是必須去重寫的。
查看進程父子進程的進程號,示例:
from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) print('process id:', os.getpid()) print("\n\n") def f(name): info('\033[31;1mfunction f\033[0m') print('hello', name) if __name__ == '__main__': info('\033[32;1mmain process line\033[0m') p = Process(target=f, args=('bob',)) p.start() p.join()
進程間通信
- 先要聲明一點,這里所說的進程間通信指的是具有父子關系的進程間通信機制,如果兩個進程間沒有任何關系,這里的機制是無法實現的。
Queues
使用方法跟threading里的queue差不多
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
Pipes
常用來在兩個進程間通信,兩個進程分別位於管道的兩端。
multiprocessing.Pipe([duplex])
示例一:
rom multiprocessing import Process, Pipe def send(pipe): pipe.send(['spam'] + [42, 'egg']) # send 傳輸一個列表 pipe.close() if __name__ == '__main__': (con1, con2) = Pipe() # 創建兩個 Pipe 實例 sender = Process(target=send, args=(con1, )) # 函數的參數,args 一定是實例化之后的 Pip 變量,不能直接寫 args=(Pip(),) sender.start() # Process 類啟動進程 print("con2 got: %s" % con2.recv()) # 管道的另一端 con2 從send收到消息 con2.close() # 關閉管道
結果:
con2 got: ['spam', 42, 'egg']
示例二:
from multiprocessing import Process, Pipe def talk(pipe): pipe.send(dict(name='Bob', spam=42)) # 傳輸一個字典 reply = pipe.recv() # 接收傳輸的數據 print('talker got:', reply) if __name__ == '__main__': (parentEnd, childEnd) = Pipe() # 創建兩個 Pipe() 實例,也可以改成 conf1, conf2 child = Process(target=talk, args=(childEnd,)) # 創建一個 Process 進程,名稱為 child child.start() # 啟動進程 print('parent got:', parentEnd.recv()) # parentEnd 是一個 Pip() 管道,可以接收 child Process 進程傳輸的數據 parentEnd.send({x * 2 for x in 'spam'}) # parentEnd 是一個 Pip() 管道,可以使用 send 方法來傳輸數據 child.join() # 傳輸的數據被 talk 函數內的 pip 管道接收,並賦值給 reply print('parent exit')
結果:
parent got: {'name': 'Bob', 'spam': 42} talker got: {'ss', 'aa', 'pp', 'mm'} parent exit
Managers
A manager object returned by Manager()
controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager()
will support types list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
and Array
. For example:
from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.append(1) print(l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d, l)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
進程池
在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多台主機,並行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,10幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,這時候進程池Pool發揮作用的時候就到了。
Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來它。這里有一個簡單的例子:
#!/usr/bin/env python # _*_ coding utf-8 _*_ #Author: aaron from multiprocessing import Process, Pool import time, os def Foo(i): time.sleep(5) print('in process[Foo]', os.getpid()) return i + 100 def Bar(arg): # 父進程去執行,而不是子進程調用 print('-->exec done:', arg) print('in process[Bar]', os.getpid()) if __name__ == '__main__': pool = Pool(5) # 允許進程池里同時放入5個進程 其他多余的進程處於掛起狀態 for i in range(10): pool.apply_async(func=Foo, args=(i,), callback=Bar) # pool.apply(func=Foo, args=(i,)) print('end:', os.getpid()) pool.close() # close() 必須在join()前被調用 pool.join() # 進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。
- pool.apply_async()用來向進程池提交目標請求。
- pool.join()是用來等待進程池中的worker進程執行完畢,防止主進程在worker進程結束前結束。但pool.join()必須使用在pool.close()或者pool.terminate()之后。
- close()跟terminate()的區別在於close()會等待池中的worker進程執行結束再關閉pool,而terminate()則是直接關閉。
- result.successful()表示整個調用執行的狀態,如果還有worker沒有執行完,則會拋出AssertionError異常。
- 利用multiprocessing下的Pool可以很方便的同時自動處理幾百或者上千個並行操作,腳本的復雜性也大大降低.