一 多進程編程
Python實現多進程的方式有兩種:一種方法是os模塊中的fork方法,另一種是使用multiprocessing模塊。
前者僅適用於LINUX/UNIX操作系統,對Windows不支持,后者則是跨平台的實現方式。
第一種方式:使用os模塊中的fork方式實現多進程
import os if __name__ == '__main__': print 'current Process (%s) start ...'%(os.getpid()) pid = os.fork() if pid < 0: print 'error in fork' elif pid == 0: print 'I am child process(%s) and my parent process is (%s)',(os.getpid(),os.getppid()) else: print 'I(%s) created a chlid process (%s).',(os.getpid(),pid)
第二種方式:multiprocessing
由於GIL的存在,python中的多線程其實並不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing,只需要定義一個函數,Python會完成其他所有事情。借助這個包,可以輕松完成從單進程到並發執行的轉換。multiprocessing支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
- 在UNIX平台上,當某個進程終結之后,該進程需要被其父進程調用wait,否則進程成為僵屍進程(Zombie)。所以,有必要對每個Process對象調用join()方法 (實際上等同於wait)。對於多線程來說,由於只有一個進程,所以不存在此必要性。
- multiprocessing提供了threading包中沒有的IPC(比如Pipe和Queue),效率上更高。應優先考慮Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因為它們占據的不是用戶進程的資源)。
- 多進程應該避免共享資源。在多線程中,我們可以比較容易地共享資源,比如使用全局變量或者傳遞參數。在多進程情況下,由於每個進程有自己獨立的內存空間,以上方法並不合適。此時我們可以通過共享內存和Manager的方法來共享資源。但這樣做提高了程序的復雜度,並因為同步的需要而降低了程序的效率。
Process.PID中保存有PID,如果進程還沒有start(),則PID為None。
window系統下,需要注意的是要想啟動一個子進程,必須加上那句if __name__ == "main",進程相關的要寫在這句下面。
創建進程的類:Process([group [, target [, name [, args [, kwargs]]]]]),target表示調用對象,args表示調用對象的位置參數元組。kwargs表示調用對象的字典。name為別名。group實質上不使用。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()啟動某個進程。join()方法實現進程間的同步。
#__author: greg #date: 2017/9/19 23:52 from multiprocessing import Process import time def f(name): time.sleep(1) print('hello', name,time.ctime()) if __name__ == '__main__': p_list=[] for i in range(3): p = Process(target=f, args=('alvin',)) p_list.append(p) p.start() for i in p_list: i.join() print('end')#一個主進程,三個子進程 # output: # hello alvin Fri Nov 24 19:10:08 2017 # hello alvin Fri Nov 24 19:10:08 2017 # hello alvin Fri Nov 24 19:10:08 2017 # end
類式調用:
#__author: greg #date: 2017/9/21 20:02 from multiprocessing import Process import time class MyProcess(Process): def __init__(self): super(MyProcess, self).__init__() #self.name = name def run(self): time.sleep(1) print ('hello', self.name,time.ctime())
if __name__ == '__main__': p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print('end') #output: # hello MyProcess-1 Fri Nov 24 19:12:17 2017 # hello MyProcess-2 Fri Nov 24 19:12:17 2017 # hello MyProcess-3 Fri Nov 24 19:12:17 2017 # end
顯示進程ID號:
#__author: greg #date: 2017/9/21 20:16 from multiprocessing import Process import os import time def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid())#父進程號 print('process id:', os.getpid())#進程號 def f(name): info('\033[31;1mfunction f\033[0m') print('hello', name) if __name__ == '__main__': info('\033[32;1mmain process line\033[0m') time.sleep(10) p = Process(target=info, args=('bob',)) p.start() p.join() #output: # main process line # module name: __main__ # parent process: 1548 pycharm的進程號 # process id: 8416 Python進程號 # bob # module name: __mp_main__ # parent process: 8416 Python進程號 # process id: 5556 info進程號
二 Process類
構造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 線程組,目前還沒有實現,庫引用中提示必須是None;
target: 要執行的方法;
name: 進程名;
args/kwargs: 要傳入方法的參數。
實例方法:
is_alive():返回進程是否在運行。
join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。
start():進程准備就緒,等待CPU調度
run():strat()調用run方法,如果實例進程時未制定傳入target,這star執行t默認run()方法。
terminate():不管任務是否完成,立即停止工作進程
屬性:
authkey
daemon:和線程的setDeamon功能一樣
exitcode(進程在運行時為None、如果為–N,表示被信號N結束)
name:進程名字。
pid:進程號。
三 進程間通訊
不同進程間內存是不共享的,要想實現兩個進程間的數據交換,可以用以下方法:
Queues 用來在多個進程間通信:
1. 阻塞模式
import queue import time q = queue.Queue(10) #創建一個隊列 start=time.time() for i in range(10): q.put('A') time.sleep(0.5) end=time.time() print(end-start)
這是一段極其簡單的代碼(另有兩個線程也在操作隊列q),我期望每隔0.5秒寫一個'A'到隊列中,但總是不能如願:
間隔時間有時會遠遠超過0.5秒。
原來,Queue.put()默認有 block = True 和 timeout兩個參數。
源碼:def put(self, item, block=True, timeout=None):
當 block = True 時,寫入是阻塞式的,阻塞時間由 timeout確定。
當隊列q被(其他線程)寫滿后,這段代碼就會阻塞,直至其他線程取走數據。
Queue.put()方法加上 block=False 的參數,即可解決這個隱蔽的問題。
但要注意,非阻塞方式寫隊列,當隊列滿時會拋出 exception Queue.Full 的異常。
#__author: greg #date: 2017/9/21 22:27 from multiprocessing import Process, Queue def f(q,n): q.put([42, n, 'hello']) print('subprocess id',id(q)) if __name__ == '__main__': q = Queue() p_list=[] print('process id',id(q)) for i in range(3): p = Process(target=f, args=(q,i)) p_list.append(p) p.start() print(q.get()) print(q.get()) print(q.get()) for i in p_list: i.join() # output # process id 2284856854176 # subprocess id 2607348001872 # [42, 0, 'hello'] # subprocess id 1712786975824 # [42, 2, 'hello'] # subprocess id 2254764977120 # [42, 1, 'hello']
Pipe常用來兩個進程間進行通信,兩個進程分別位於管道的兩端
def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()
Pipe方法返回(conn1, conn2)代表一個管道的兩個端。Pipe方法有duplex參數,如果duplex參數為True(默認值),那么這個管道是全雙工模式,也就是說conn1和conn2均可收發。duplex為False,conn1只負責接受消息,conn2只負責發送消息。
#__author: greg #date: 2017/9/21 22:57 import multiprocessing import random import time,os def proc_send(pipe,urls): for url in urls: print("Process(%s) send: %s" %(os.getpid(),url)) pipe.send(url) time.sleep(random.random()) def proc_recv(pipe): while True: print("Process(%s) rev:%s" %(os.getpid(),pipe.recv())) time.sleep(random.random()) if __name__=="__main__": pipe=multiprocessing.Pipe() p1=multiprocessing.Process(target=proc_send,args=(pipe[0],['url_'+str(i) for i in range(10)])) p2=multiprocessing.Process(target=proc_recv,args=(pipe[1],)) p1.start() p2.start() p1.join() p2.terminate()
Manager()返回的管理器對象控制一個服務器進程,該進程持有Python對象,並允許其他進程使用代理來操縱它們。
#__author: greg #date: 2017/9/21 23:10 from multiprocessing import Process, Manager def f(d, l,n): d[n] = '1' d['2'] = 2 d[0.25] = None l.append(n) # print(l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d, l,i)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
四 進程同步
當多個進程需要訪問共享資源的時候,Lock可以用來避免訪問的沖突。
#__author: greg #date: 2017/9/21 23:25 from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
五 進程池 Pool類
Pool可以提供指定數量的進程供用戶使用,默認大小是CPU的核數。當有新的請求提交到Pool中時,如果池還沒有滿,那么就會創建一個新的進程來執行該請求
但如果池中的進程數已經達到規定的最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來處理它。
# -*- coding: utf-8 -*- # 2017/11/24 20:15 from multiprocessing import Pool import os, time, random def run_task(name): print('Task %s (pid = %s) is running...' % (name, os.getpid())) time.sleep(random.random() * 3) print('Task %s end.' % name) if __name__=='__main__': print('Current process %s.' % os.getpid()) p = Pool(processes=3) for i in range(5): p.apply_async(run_task, args=(i,)) print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.') """ Current process 9788. Waiting for all subprocesses done... Task 0 (pid = 5916) is running... Task 1 (pid = 3740) is running... Task 2 (pid = 6964) is running... Task 2 end. Task 3 (pid = 6964) is running... Task 1 end. Task 4 (pid = 3740) is running... Task 0 end. Task 3 end. Task 4 end. All subprocesses done. """
- apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的
- close() 關閉pool,使其不在接受新的任務。
- terminate() 結束工作進程,不在處理未完成的任務。
- join() 主進程阻塞,等待子進程的退出, join方法要在close或terminate之后使用。
每次最多運行3個進程,當一個任務結束了,新的任務依次添加進來,任務執行使用的進程依然是原來的進程,這一點通過進程的pid可以看出來。