一 多進程multiprocessing
multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.
import multiprocessing,time
def run(name):
print("hello",name)
time.sleep(2)
if __name__ == '__main__':
for i in range(10):
p = multiprocessing.Process(target=run,args=('Bob %s'%i,))
p.start()
import multiprocessing,time,threading def thread_run(): print(threading.get_ident()) #線程號 def run(name): print("hello",name) t = threading.Thread(target=thread_run,) t.start() time.sleep(2) if __name__ == '__main__': for i in range(10): p = multiprocessing.Process(target=run,args=('Bob %s'%i,)) p.start()
# 在主進程里調用了info,在子進程了又調用了info,我們看看效果? # 可以看到,每一個進程都是由父進程啟動的。主程序的父進程是pyCharm,子進程的父進程是主進程。 from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) #得到父進程ID print('process id:', os.getpid()) #得到進程ID print("\n\n") def f(name): info('\033[31;1mfunction f\033[0m') print('hello', name) if __name__ == '__main__': info('\033[32;1mmain process line\033[0m') p = Process(target=f, args=('bob',)) p.start() p.join() #####輸出: ####ain process line ####odule name: __main__ ####arent process: 8268 ####rocess id: 4448 ####unction f ####odule name: __mp_main__ ####arent process: 4448 ####rocess id: 9596 ####ello bob
二 進程間通信
不同進程間內存是不共享的,要想實現兩個進程間的數據交換,可以用以下方法:
1.Queues
首先我們知道,線程之間是數據共享的,子線程放進queue數據,父線程可以取出來。如下示例
import threading,queue def f(): q.put([42,None,'hello']) if __name__ == '__main__': q = queue.Queue() t = threading.Thread(target=f) t.start() print(q.get()) ####輸出:[42, None, 'hello']
把線程改為進程,會發現報錯。
import multiprocessing,queue def f(): q.put([42,None,'hello']) if __name__ == '__main__': q = queue.Queue() p = multiprocessing.Process(target=f) p.start() print(q.get()) ####輸出報錯:NameError: name 'q' is not defined
報錯的原因是進程之間數據不共享。子進程和父進程分別擁有獨立的內存空間,所以子進程是訪問不了父進程的queue的。那有什么辦法可以使子進程訪問到父進程的queue呢?我們可以嘗試將這個queue當做變量傳給子進程。發現還是報錯。
import multiprocessing,queue def f(q): q.put([42,None,'hello']) if __name__ == '__main__': q = queue.Queue() p = multiprocessing.Process(target=f,args=(q,)) p.start() print(q.get()) ####輸出報錯:TypeError: can't pickle _thread.lock objects
報錯的原因是我們錯將線程queue(通過import queue引入)傳遞給了子進程,實際上我們傳遞給子進程的應該是進程queue(通過from multiprocessing import Queue引入)。接下來才是正確的示例:
from multiprocessing import Process,Queue #引入進程queue
def f(q):
q.put([42,None,'hello']) #子進程放入數據
if __name__ == '__main__':
q = Queue()
p = Process(target=f,args=(q,)) #將q傳遞給子進程
p.start()
print(q.get()) #主進程取出數據
####輸出:[42, None, 'hello']
上面的例子,我們把進程queue傳遞給了子進程,表面上看,子進程和父進程共用一個queue,實際上並不是這樣,而是子進程克隆了一個父進程的queue,子進程將數據放入克隆queue中,克隆queue將其序列化保存,然后進行反序列化后放到父進程的原始queue中,所以嚴格意義上子進程和父進程的queue並不是一個共享queue。
2.Pipes
要想實現兩個進程間的數據傳遞,除了Queues,還可以使用Pipes。
Pipe()返回的兩個連接對象代表管道的兩端。 每個連接對象都有send()和recv()方法(以及其他方法)。
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
print('from parent:',conn.recv())
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print('from son:',parent_conn.recv())
parent_conn.send('hello')
p.join()
3.Managers
Queues和Pipes僅能實現兩個進程之間的數據傳遞,而Managers可以實現進程之間數據的共享。
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,
from multiprocessing import Process, Manager
import os
def f(d,l):
d[os.getpid()] = os.getpid()
l.append(os.getpid())
print(l)
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict() #生成一個字典,可在多個進程間共享和傳遞
l = manager.list(range(5)) #生成一個列表,可在多個進程間共享和傳遞
p_list = []
for i in range(10):
p = Process(target=f,args=(d,l))
p.start()
p_list.append(p)
for res in p_list: #等待結果
res.join()
print(d)
[0, 1, 2, 3, 4, 8512] [0, 1, 2, 3, 4, 8512, 11060] [0, 1, 2, 3, 4, 8512, 11060, 4820] [0, 1, 2, 3, 4, 8512, 11060, 4820, 9496] [0, 1, 2, 3, 4, 8512, 11060, 4820, 9496, 4264] [0, 1, 2, 3, 4, 8512, 11060, 4820, 9496, 4264, 8420] [0, 1, 2, 3, 4, 8512, 11060, 4820, 9496, 4264, 8420, 9184] [0, 1, 2, 3, 4, 8512, 11060, 4820, 9496, 4264, 8420, 9184, 6592] [0, 1, 2, 3, 4, 8512, 11060, 4820, 9496, 4264, 8420, 9184, 6592, 9808] [0, 1, 2, 3, 4, 8512, 11060, 4820, 9496, 4264, 8420, 9184, 6592, 9808, 5064] {8512: 8512, 11060: 11060, 4820: 4820, 9496: 9496, 4264: 4264, 8420: 8420, 9184: 9184, 6592: 6592, 9808: 9808, 5064: 5064}
進程鎖
雖然進程之間是獨立運行的,但是對於各進程來說,終端屏幕是共享的,為了防止輸出結果時,各個進程爭搶輸出,造成打印結果混亂,可以給進程加一把鎖。
from multiprocessing import Process,Lock
def f(l,i):
l.acquire() #得到鎖
print("hello world",i)
l.release() #釋放鎖
if __name__ == '__main__':
lock = Lock() #生成鎖的實例
for num in range(10):
Process(target=f,args=(lock,num)).start() #將lock傳遞給子進程
三 進程池
我們每起一個進程實際上就是克隆一份父進程數據給子進程使用,起多個進程時就會占用很多內存空間。為了節省開銷,我們使用進程池。進程池就是限制同一時間有多少個進程運行。
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進程,那么程序就會等待,直到進程池中有可用進程為止。
進程池中有兩個方法:
- apply #同步執行,即串行
- apply_async #異步執行,即並發
from multiprocessing import Pool
import time,os
def Foo(i):
time.sleep(2)
print('my processid is ',os.getpid())
return i+100
if __name__ == '__main__': #windows上運行進程池必須加這行代碼,否則報錯
pool = Pool(5) #運行進程池中同時放入5個進程
for i in range(10):
# pool.apply(func=Foo,args=(i,)) #同步執行,即串行
pool.apply_async(func=Foo, args=(i,)) # 異步執行,即並發,此時有10個進程,同時執行的有5個,其他的掛起
print('end')
pool.close() #注意:一定要先關閉進程池再join
pool.join() #表示等進程池中進程執行完畢后稱程序再關閉,如果注釋,則程序直接關閉。
下面的例子,實現了主進程起了10個子進程,分別執行Foo函數,每次子進程執行完畢后,父進程回調Bar函數(可觀察到執行Bar函數的進程ID與主進程ID相同)。
from multiprocessing import Pool import time,os def Foo(i): time.sleep(2) print('my processid is ',os.getpid()) return i+100 def Bar(arg): print('--exec done:',arg,'my processid is ',os.getpid()) if __name__ == '__main__': #windows上運行進程池必須加這行代碼,否則報錯 pool = Pool(5) #運行進程池中同時放入5個進程 for i in range(10): pool.apply_async(func=Foo, args=(i,),callback=Bar) # callback=回調 print('end',os.getpid()) pool.close() #注意:先close再join pool.join() #表示等進程池中進程執行完畢后稱程序再關閉,如果注釋,則程序直接關閉。
