process類介紹
Process 類用來描述一個進程對象。創建子進程的時候,只需要傳入一個執行函數和函數的參數即可完成 Process 示例的創建。
python中的多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。Python提供了multiprocessing。
multiprocessing模塊用來開啟子進程,並在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。
- star() 方法啟動進程。
join() 方法實現進程間的同步,等待所有進程退出。- close() 用來阻止多余的進程涌入進程池 Pool 造成進程阻塞。
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
- target 是函數名字,需要調用的函數
- args 函數需要的參數,
以 tuple 的形式傳入
創建子進程方式一:
rom multiprocessing import Process
import time
def f(name):
time.sleep(2)
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
創建子進程方式二:
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self,name):
super().__init__()
self.name=name
def run(self):
print('task <%s> is runing' % self.name)
time.sleep(2)
print('task <%s> is done' % self.name)
if __name__ == '__main__':
p=MyProcess('egon')
p.start()
print('主')
注意:run方法是必須去重寫的。
查看進程父子進程的進程號,示例:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
print("\n\n")
def f(name):
info('\033[31;1mfunction f\033[0m')
print('hello', name)
if __name__ == '__main__':
info('\033[32;1mmain process line\033[0m')
p = Process(target=f, args=('bob',))
p.start()
p.join()
進程間通信
- 先要聲明一點,這里所說的進程間通信指的是具有父子關系的進程間通信機制,如果兩個進程間沒有任何關系,這里的機制是無法實現的。
Queues
使用方法跟threading里的queue差不多
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
Pipes
常用來在兩個進程間通信,兩個進程分別位於管道的兩端。
multiprocessing.Pipe([duplex])
示例一:
rom multiprocessing import Process, Pipe
def send(pipe):
pipe.send(['spam'] + [42, 'egg']) # send 傳輸一個列表
pipe.close()
if __name__ == '__main__':
(con1, con2) = Pipe() # 創建兩個 Pipe 實例
sender = Process(target=send, args=(con1, )) # 函數的參數,args 一定是實例化之后的 Pip 變量,不能直接寫 args=(Pip(),)
sender.start() # Process 類啟動進程
print("con2 got: %s" % con2.recv()) # 管道的另一端 con2 從send收到消息
con2.close() # 關閉管道
結果:
con2 got: ['spam', 42, 'egg']
示例二:
from multiprocessing import Process, Pipe
def talk(pipe):
pipe.send(dict(name='Bob', spam=42)) # 傳輸一個字典
reply = pipe.recv() # 接收傳輸的數據
print('talker got:', reply)
if __name__ == '__main__':
(parentEnd, childEnd) = Pipe() # 創建兩個 Pipe() 實例,也可以改成 conf1, conf2
child = Process(target=talk, args=(childEnd,)) # 創建一個 Process 進程,名稱為 child
child.start() # 啟動進程
print('parent got:', parentEnd.recv()) # parentEnd 是一個 Pip() 管道,可以接收 child Process 進程傳輸的數據
parentEnd.send({x * 2 for x in 'spam'}) # parentEnd 是一個 Pip() 管道,可以使用 send 方法來傳輸數據
child.join() # 傳輸的數據被 talk 函數內的 pip 管道接收,並賦值給 reply
print('parent exit')
結果:
parent got: {'name': 'Bob', 'spam': 42}
talker got: {'ss', 'aa', 'pp', 'mm'}
parent exit
Managers
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example:
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.append(1)
print(l)
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(5))
p_list = []
for i in range(10):
p = Process(target=f, args=(d, l))
p.start()
p_list.append(p)
for res in p_list:
res.join()
print(d)
print(l)
進程池
在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多台主機,並行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,10幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,這時候進程池Pool發揮作用的時候就到了。
Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來它。這里有一個簡單的例子:
#!/usr/bin/env python
# _*_ coding utf-8 _*_
#Author: aaron
from multiprocessing import Process, Pool
import time, os
def Foo(i):
time.sleep(5)
print('in process[Foo]', os.getpid())
return i + 100
def Bar(arg): # 父進程去執行,而不是子進程調用
print('-->exec done:', arg)
print('in process[Bar]', os.getpid())
if __name__ == '__main__':
pool = Pool(5) # 允許進程池里同時放入5個進程 其他多余的進程處於掛起狀態
for i in range(10):
pool.apply_async(func=Foo, args=(i,), callback=Bar)
# pool.apply(func=Foo, args=(i,))
print('end:', os.getpid())
pool.close() # close() 必須在join()前被調用
pool.join() # 進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。
- pool.apply_async()用來向進程池提交目標請求。
- pool.join()是用來等待進程池中的worker進程執行完畢,防止主進程在worker進程結束前結束。但pool.join()必須使用在pool.close()或者pool.terminate()之后。
- close()跟terminate()的區別在於close()會等待池中的worker進程執行結束再關閉pool,而terminate()則是直接關閉。
- result.successful()表示整個調用執行的狀態,如果還有worker沒有執行完,則會拋出AssertionError異常。
- 利用multiprocessing下的Pool可以很方便的同時自動處理幾百或者上千個並行操作,腳本的復雜性也大大降低.
