In an earlier blog post I covered the use of Python multithreading in detail:
Processes, threads, the GIL, Python multithreading, the producer-consumer model: what on earth are they?
But because of the GIL, Python multithreading cannot take full advantage of multiple CPU cores. To use multiple cores, we can turn to multiprocessing.
1. Parent and child processes
Wikipedia's definitions of parent process and child process:
a) Parent process
In Unix-like operating systems, every process except process 0 (the swapper) is created when another process executes the fork() system call. The process that invoked fork is the parent process and the newly created process is the child process. Every process (except process 0) has one parent process, but can have many child processes.
In the Linux kernel, in which there is a very slim difference between processes and POSIX threads, there are two kinds of parent processes, namely real parent and parent. Parent is the process that receives the SIGCHLD signal on child's termination, whereas real parent is the thread that actually created this child process in a multithreaded environment. For a normal process, both these two values are same, but for a POSIX thread which acts as a process, these two values may be different.[1]
b) Child process
A child process in computing is a process created by another process (the parent process). This technique pertains to multitasking operating systems, and is sometimes called a subprocess or traditionally a subtask.
There are two major procedures for creating a child process: the fork system call (preferred in Unix-like systems and the POSIX standard) and the spawn (preferred in the modern (NT) kernel of Microsoft Windows, as well as in some historical operating systems).
In other words, Unix/Linux provides a fork() system call for creating child processes. fork() is special: an ordinary function is called once and returns once, but fork() is called once and returns twice, because the operating system automatically copies the current process (the parent) into a new process (the child) and then returns in both the parent and the child. As for the return value, the child always receives 0, while the parent receives the child's PID. The rationale is that a parent process can fork many children, so the parent needs to record each child's ID.
Python's os module includes the fork function:
#!/bin/env python
# coding: utf-8
import os

print('Process %s start...' % os.getpid())
pid = os.fork()
if pid == 0:
    print('i am child process %s and my parent is %s' % (os.getpid(), os.getppid()))
else:
    print('i %s just created a child process %s' % (os.getpid(), pid))
Output:
Process 3522 start...
i 3522 just created a child process 3523
i am child process 3523 and my parent is 3522
Because fork() is called once and returns twice, we get the output above. Note that Windows has no fork call, so the code above will not run on Windows. With fork, a process that receives a new task can clone a child process to handle it; the Apache server is a familiar example, with the parent process listening on a port and forking a child to handle each new HTTP request.
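To make that pattern concrete, here is a minimal sketch of a fork-per-request TCP server (Unix only; the address 127.0.0.1:8000 and the reply message are made up for illustration):

import os
import signal
import socket

# let the kernel reap finished children, so no zombie processes pile up
signal.signal(signal.SIGCHLD, signal.SIG_IGN)

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8000))
server.listen(5)

while True:
    conn, addr = server.accept()
    if os.fork() == 0:
        # child: handle this one connection, then exit
        server.close()
        conn.sendall(b'handled by child %d\n' % os.getpid())
        conn.close()
        os._exit(0)
    # parent: drop its copy of the connection and keep accepting
    conn.close()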
2. multiprocessing
It was said above that Windows has no fork call, so how do we run multiple processes on Windows?
Through the multiprocessing module. Since Python is cross-platform, it naturally should provide cross-platform multiprocess support, and multiprocessing is exactly that: the cross-platform multiprocess module.
Python has two modules commonly used for working with processes: subprocess and multiprocessing. subprocess is usually used to execute external programs, such as third-party applications, rather than Python programs. If you need to call external programs, the third-party psutil module is an even better choice: it not only supports what subprocess provides, but can also monitor the current host and the external programs it launches, reporting network, CPU, and memory usage, which makes it more complete for automation and ops work. multiprocessing is Python's multiprocess module; it works mainly by starting Python processes and invoking a target callback to handle each task.
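As a quick, minimal sketch of that division of labour (psutil is a third-party package, pip install psutil; the uname command is just an example of an external program):

import subprocess
import psutil  # third-party: pip install psutil

# subprocess: run an external program and capture its output
result = subprocess.run(['uname', '-a'], capture_output=True, text=True)
print(result.stdout)

# psutil: monitor the current host
print('cpu: %s%%' % psutil.cpu_percent(interval=1))
print('memory: %s%%' % psutil.virtual_memory().percent)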
Note: multiprocessing's API mirrors threading's, so we only give example code here instead of a detailed walkthrough.
1) Basic usage of multiprocessing
As with threading, there are two approaches:
a) Direct invocation
from multiprocessing import Process, freeze_support
import os

processes = []

def run(item):
    print('-' * 50)
    print('child process %s id: %s' % (item, os.getpid()))
    print('child process %s parent id: %s' % (item, os.getppid()))

def main():
    # print the main process id
    print('main process id: ', os.getpid())
    # create several child processes
    for item in range(2):
        p = Process(target=run, args=(item, ))
        processes.append(p)
        print('child process %s name: %s' % (item, p.name))
        print('child process %s id: %s' % (item, p.pid))

    for item in processes:
        item.start()

    for item in processes:
        item.join()

if __name__ == '__main__':
    freeze_support()
    main()
b) Object-oriented invocation
from multiprocessing import Process, freeze_support
import os

processes = []

class MyProcess(Process):
    def __init__(self, func, item):
        super(MyProcess, self).__init__()
        self.__func = func
        self.__item = item

    def run(self):
        self.__func(self.__item)

def proc(item):
    print('-' * 50)
    print('child process %s id: %s' % (item, os.getpid()))
    print('child process %s parent id: %s' % (item, os.getppid()))

def main():
    # print the main process id
    print('main process id: ', os.getpid())
    # create several child processes
    for item in range(2):
        p = MyProcess(proc, item)
        processes.append(p)
        print('child process %s name: %s' % (item, p.name))
        print('child process %s id: %s' % (item, p.pid))

    for item in processes:
        item.start()

    for item in processes:
        item.join()

if __name__ == '__main__':
    freeze_support()
    main()
Note: freeze_support() only matters on Windows, and only when the script is frozen into an executable (it is a no-op when the program is run normally, on Python 2.7 or 3); the docs say to call it straight after the if __name__ == '__main__' line, which is why it is the first call above.
Output:
main process id:  10972
child process 0 name: MyProcess-1
child process 0 id: None
child process 1 name: MyProcess-2
child process 1 id: None
--------------------------------------------------
child process 0 id: 10636
child process 0 parent id: 10972
--------------------------------------------------
child process 1 id: 8076
child process 1 parent id: 10972
2) Setting the daemon attribute
from multiprocessing import Process
import time

processes = []

def run(item):
    time.sleep(1)
    print('item: ', item)

def main():
    # create several child processes
    for item in range(2):
        p = Process(target=run, args=(item, ))
        processes.append(p)
        p.daemon = True

    for item in processes:
        item.start()

    print('all done')

if __name__ == '__main__':
    main()
Output:
all done
Note that daemon is handled differently from threading: here you assign an attribute directly rather than calling a method, and daemon must be set before start(). Because the main process exits without join()ing, the daemonic children are terminated before they can print, so only "all done" appears.
3) Process synchronization
Since processes do not share data, why is there still a process synchronization problem? What if multiple processes open the same file, or print to the same screen? Those situations still need synchronization, which is done through Lock.
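For example, a minimal sketch of protecting screen output with a Lock; note the lock must be passed into each child as an argument:

from multiprocessing import Process, Lock

def run(lock, item):
    # without the lock, lines from different processes may interleave
    with lock:
        print('process %s line 1' % item)
        print('process %s line 2' % item)

if __name__ == '__main__':
    lock = Lock()
    processes = [Process(target=run, args=(lock, item)) for item in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()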
4) Semaphore
Usage is the same as threading.Semaphore(), except that the Semaphore you create must be passed into the child processes as an argument, because processes do not share resources.
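A minimal sketch, assuming we want at most 2 of 5 children inside the guarded section at once:

from multiprocessing import Process, Semaphore
import os
import time

def run(sema):
    # at most 2 processes are inside this block at any moment
    with sema:
        print('working: %s' % os.getpid())
        time.sleep(1)

if __name__ == '__main__':
    sema = Semaphore(2)  # passed into every child as an argument
    processes = [Process(target=run, args=(sema,)) for _ in range(5)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()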
5) Event
Usage is the same as threading.Event(), except that the Event you create must be passed into the child processes as an argument.
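A minimal sketch in which the main process releases all waiting children at once:

from multiprocessing import Process, Event
import time

def run(event, item):
    event.wait()  # block until the main process sets the event
    print('process %s released' % item)

if __name__ == '__main__':
    event = Event()  # passed into every child as an argument
    processes = [Process(target=run, args=(event, item)) for item in range(3)]
    for p in processes:
        p.start()
    time.sleep(1)
    event.set()  # wake every waiting child
    for p in processes:
        p.join()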
6) Inter-process communication
Processes do not share resources. Let's first prove that with an example:
from multiprocessing import Process

processes = []
data_list = []

def run(lst, item):
    lst.append(item)
    print('%s : %s' % (item, lst))

def main():
    for item in range(4):
        p = Process(target=run, args=(data_list, item))
        processes.append(p)

    for item in processes:
        item.start()

    for item in processes:
        item.join()

    print('final lst: ', data_list)

if __name__ == '__main__':
    main()
Output:
1 : [1]
2 : [2]
0 : [0]
3 : [3]
final lst:  []
Each child modified only its own copy of the list; the parent's list stayed empty. So inter-process communication must go through a third party; three approaches are introduced below.
a) Queue
Usage is the same as queue.Queue in multithreaded code, except that the queue you create must be passed into the child processes as an argument.
from multiprocessing import Process, Queue
import time

q = Queue(10)

def put(q):
    for i in range(3):
        q.put(i)
    print('queue size after put: %s' % q.qsize())

def get(q):
    print('queue size before get: %s' % q.qsize())
    while not q.empty():
        print('queue get: ', q.get())

def main():
    p_put = Process(target=put, args=(q,))
    p_get = Process(target=get, args=(q,))
    p_put.start()
    time.sleep(1)
    p_get.start()
    p_get.join()
    print('all done')

if __name__ == '__main__':
    main()
Output:
queue size after put: 3
queue size before get: 3
queue get:  0
queue get:  1
queue get:  2
all done
b) Pipe
import multiprocessing
import time

pipe = multiprocessing.Pipe()

def send(pipe):
    for i in range(5):
        print("send: %s" % (i,))
        pipe.send(i)
        time.sleep(0.2)

def recv_1(pipe):
    while True:
        print("rev_1:", pipe.recv())
        time.sleep(1)

def recv_2(pipe):
    while True:
        print("rev_2:", pipe.recv())
        time.sleep(1)

def main():
    # Pipe() returns a pair of connections; one end sends, the other receives.
    # Both receivers share the same end, so each message goes to exactly one of them.
    p_send = multiprocessing.Process(target=send, args=(pipe[0],))
    p_recv_1 = multiprocessing.Process(target=recv_1, args=(pipe[1],))
    p_recv_2 = multiprocessing.Process(target=recv_2, args=(pipe[1],))

    p_send.start()
    p_recv_1.start()
    p_recv_2.start()

    p_send.join()
    # the receivers loop forever, so these joins never return;
    # stop the program with Ctrl-C
    p_recv_1.join()
    p_recv_2.join()

if __name__ == "__main__":
    main()
Output:
send: 0
rev_1: 0
send: 1
rev_2: 1
send: 2
send: 3
send: 4
rev_1: 2
rev_2: 3
rev_1: 4
c) Manager
This one is extremely powerful. Queue and Pipe above can only pass data around; they cannot achieve data sharing (different processes modifying one and the same piece of data). Manager can.
From the official documentation:
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array.
from multiprocessing import Process, Manager

def run(d, l):
    d['name'] = 'winter'
    l.reverse()

def main():
    p = Process(target=run, args=(d, l, ))
    p.start()
    p.join()
    print('final dict: ', d)
    print('final list: ', l)

if __name__ == "__main__":
    mgmt = Manager()
    d = mgmt.dict()
    l = mgmt.list(range(10))
    main()
Note: mgmt = Manager() must be placed inside the if __name__ == "__main__" block, otherwise it raises a freeze_support() error.
Also note this part:
Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types. Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.
That is, a single manager can even share data between processes on different hosts over a network.
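The usual way to do that is multiprocessing.managers.BaseManager. A minimal sketch of two separate scripts (the port 50000, the authkey b'secret' and the server IP 192.168.1.10 are made-up examples):

# --- server side ---
from multiprocessing.managers import BaseManager
import queue

shared_q = queue.Queue()

class QueueManager(BaseManager):
    pass

QueueManager.register('get_queue', callable=lambda: shared_q)
mgr = QueueManager(address=('', 50000), authkey=b'secret')
mgr.get_server().serve_forever()

# --- client side, possibly on another host ---
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

QueueManager.register('get_queue')
mgr = QueueManager(address=('192.168.1.10', 50000), authkey=b'secret')
mgr.connect()
mgr.get_queue().put('hello from another host')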
7) Process pool: Pool
If you need to start a large number of child processes, you can batch-create them with a process pool. A Pool holds a fixed number of worker processes for the caller to use. When a new request is submitted to the pool and the pool is not yet full, a new process is created to execute it; if the pool has already reached its configured maximum, the request waits until a process in the pool finishes, and only then is it executed.
There are two methods: the blocking Pool.apply() and the non-blocking Pool.apply_async().
a) The blocking method Pool.apply()
import multiprocessing
import time

def func(name):
    print("start: %s" % name)
    time.sleep(2)
    return 'end: %s' % name

if __name__ == "__main__":
    name_list = ['winter', 'elly', 'james', 'yule']
    # create a pool with 3 worker processes
    pool = multiprocessing.Pool(3)
    for member in name_list:
        # submit the child task; no start() needed
        res = pool.apply(func, (member,))
        print(res)
    pool.close()
    # close() must be called before join(), otherwise join() raises an error;
    # after close() no new tasks can be submitted to the pool
    pool.join()
    print("all done...")
Output:
start: winter
end: winter
start: elly
end: elly
start: james
end: james
start: yule
end: yule
all done...
Notice that in blocking mode the tasks run one after another, still serially, which is why apply() is rarely used.
Two things to note:
1. Tasks run by a pool do not need start();
2. close() must be called before join(); once close() has been called, no new task can be submitted to the pool.
b) The non-blocking method Pool.apply_async()
import multiprocessing
import time

def func(name):
    print("start: %s" % name)
    time.sleep(2)
    return 'end: %s' % name

def func_exp(msg):
    print('callback: %s' % msg)

if __name__ == "__main__":
    name_list = ['winter', 'elly', 'james', 'yule']
    res_list = []
    # create a pool; with no argument its size defaults to the CPU count
    pool = multiprocessing.Pool()
    for member in name_list:
        # submit the child task; no start() needed
        res = pool.apply_async(func, (member,), callback=func_exp)
        # note: we append res itself, not res.get(), which would block again
        res_list.append(res)
    for res_mem in res_list:
        print(res_mem.get())
    pool.close()
    # close() must be called before join(); after close() no new tasks
    # can be submitted to the pool
    pool.join()
    print("all done...")
Output:
start: winter
start: elly
start: james
start: yule
callback: end: winter
end: winter
callback: end: elly
end: elly
callback: end: james
end: james
callback: end: yule
end: yule
all done...
Analysis of the results:
1. In non-blocking mode, the pool makes full use of multiple cores and achieves real parallelism;
2. apply_async() has a callback parameter, which can be used for result callbacks;
3. Why is apply() blocking, then? Where exactly does it block? And what does apply_async() do differently?
Look at the source of apply():
def apply(self, func, args=(), kwds={}):
    '''
    Equivalent of `func(*args, **kwds)`.
    '''
    assert self._state == RUN
    return self.apply_async(func, args, kwds).get()
So apply() ultimately executes self.apply_async(func, args, kwds).get(): it calls the very same apply_async(), but then calls get() on the result right away, and that get() is exactly where it blocks.
Then if I rearrange the code that calls apply_async(), can I make apply_async() blocking as well? Let's try:
import multiprocessing
import time

def func(name):
    print("start: %s" % name)
    time.sleep(2)
    return 'end: %s' % name

def func_exp(msg):
    print('callback: %s' % msg)

if __name__ == "__main__":
    name_list = ['winter', 'elly', 'james', 'yule']
    # create a pool; with no argument its size defaults to the CPU count
    pool = multiprocessing.Pool()
    for member in name_list:
        res = pool.apply_async(func, (member,), callback=func_exp)
        # the modified line: calling get() right inside the loop blocks
        # until this task finishes, just like apply()
        print(res.get())
    pool.close()
    # close() must be called before join()
    pool.join()
    print("all done...")
The modification is calling print(res.get()) directly inside the loop instead of collecting the results first; sure enough, execution becomes blocking:
start: winter
callback: end: winter
end: winter
start: elly
callback: end: elly
end: elly
start: james
callback: end: james
end: james
start: yule
callback: end: yule
end: yule
all done...
c) How many processes should the pool have?
Since multiple processes can exploit multiple cores, is it better to create as many processes as possible? No: switching between processes is expensive, so too many processes switching back and forth actually lowers efficiency!
The process count is an empirical value that depends heavily on the system's hardware resources; the optimal number has to be found by repeated tuning.
When a Pool is created without an argument, the default pool size is the machine's logical CPU count (what os.cpu_count() returns).
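A quick way to check that number and size the pool explicitly (a minimal sketch):

import multiprocessing
import os

print('logical CPUs: %s' % os.cpu_count())

# Pool() with no argument defaults to os.cpu_count() workers;
# passing the count explicitly gives the same size
pool = multiprocessing.Pool(processes=os.cpu_count())
pool.close()
pool.join()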
As a rule of thumb: for CPU-bound tasks, keep the pool size close to the number of cores; for I/O-bound tasks, a somewhat larger pool can pay off, since the workers spend much of their time waiting rather than computing.