1、CPU運行原理
我們都知道CPU的根本任務就是執行指令,對計算機來說最終都是一串由“0”和“1”組成的序列。CPU從邏輯上可以划分成3個模塊,分別是控制單元、運算單元和存儲單元,這三部分由CPU內部總線連接起來:
控制單元:控制單元是整個CPU的指揮控制中心,由指令寄存器IR(Instruction Register)、指令譯碼器ID(Instruction Decoder)和操作控制器OC(Operation Controller)等,對協調整個電腦有序工作極為重要。它根據用戶預先編好的程序,依次從存儲器中取出各條指令,放在指令寄存器IR中,通過指令譯碼(分析)確定應該進行什么操作,然后通過操作控制器OC,按確定的時序,向相應的部件發出微操作控制信號。操作控制器OC中主要包括節拍脈沖發生器、控制矩陣、時鍾脈沖發生器、復位電路和啟停電路等控制邏輯。
運算單元:是運算器的核心。可以執行算術運算(包括加減乘數等基本運算及其附加運算)和邏輯運算(包括移位、邏輯測試或兩個值比較)。相對控制單元而言,運算器接受控制單元的命令而進行動作,即運算單元所進行的全部操作都是由控制單元發出的控制信號來指揮的,所以它是執行部件。
存儲單元:包括CPU片內緩存和寄存器組,是CPU中暫時存放數據的地方,里面保存着那些等待處理的數據,或已經處理過的數據,CPU訪問寄存器所用的時間要比訪問內存的時間短。采用寄存器,可以減少CPU訪問內存的次數,從而提高了CPU的工作速度。但因為受到芯片面積和集成度所限,寄存器組的容量不可能很大。寄存器組可分為專用寄存器和通用寄存器。專用寄存器的作用是固定的,分別寄存相應的數據。而通用寄存器用途廣泛並可由程序員規定其用途,通用寄存器的數目因微處理器而異。
cpu的工作原理:
總的來說,CPU從內存中一條一條地取出指令和相應的數據,按指令操作碼的規定,對數據進行運算處理,直到程序執行完畢為止。
控制單元在時序脈沖的作用下,將指令計數器里所指向的指令地址(這個地址是在內存里的)送到地址總線上去,然后CPU將這個地址里的指令讀到指令寄存器進行譯碼。對於執行指令過程中所需要用到的數據,會將數據地址也送到地址總線,然后CPU把數據讀到CPU的內部存儲單元(就是內部寄存器)暫存起來,最后命令運算單元對數據進行處理加工。
cpu的工作效率:
基本上,CPU就是這樣去執行讀出數據、處理數據和往內存寫數據3項基本工作。但在通常情況下,一條指令可以包含按明確順序執行的許多操作,CPU的工作就是執行這些指令,完成一條指令后,CPU的控制單元又將告訴指令讀取器從內存中讀取下一條指令來執行。這個過程不斷快速地重復,快速地執行一條又一條指令,產生你在顯示器上所看到的結果。我們很容易想到,在處理這么多指令和數據的同時,由於數據轉移時差和CPU處理時差,肯定會出現混亂處理的情況。為了保證每個操作准時發生,CPU需要一個時鍾,時鍾控制着CPU所執行的每一個動作。時鍾就像一個節拍器,它不停地發出脈沖,決定CPU的步調和處理時間,這就是我們所熟悉的CPU的標稱速度,也稱為主頻。主頻數值越高,表明CPU的工作速度越快。
而在執行效率方面,一些廠商通過流水線方式或以幾乎並行工作的方式執行指令的方法來提高指令的執行速度。剛才我們提到,指令的執行需要許多獨立的操作,諸如取指令和譯碼等。最初CPU在執行下一條指令之前必須全部執行完上一條指令,而現在則由分布式的電路各自執行操作。也就是說,當這部分的電路完成了一件工作后,第二件工作立即占據了該電路,這樣就大大增加了執行方面的效率。
另外,為了讓指令與指令之間的連接更加准確,現在的CPU通常會采用多種預測方式來控制指令更高效率地執行。
2、線程與進程的區別
3、python3調用線程
Python3 通過兩個標准庫 _thread 和 threading 提供對線程的支持。
_thread 提供了低級別的、原始的線程以及一個簡單的鎖,它相比於 threading 模塊的功能還是比較有限的。
threading 模塊除了包含 _thread 模塊中的所有方法外,還提供的其他方法:
- threading.currentThread(): 返回當前的線程變量。
- threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
- threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
除了使用方法外,線程模塊同樣提供了Thread類來處理線程,Thread類提供了以下方法:
- run(): 用以表示線程活動的方法。
- start():啟動線程活動。
- join([time]): 等待至線程中止。這阻塞調用線程直至線程的join() 方法被調用中止-正常退出或者拋出未處理的異常-或者是可選的超時發生。
- isAlive(): 返回線程是否活動的。
- getName(): 返回線程名。
- setName(): 設置線程名。
- setDaemon():設置為后台線程或前台線程(默認)如果是后台線程,主線程執行過程中,后台線程也在執行,主線程執行完畢后,后台線程不論成功與否,均停止;如果是前台線程,主線程執行過程中,前台線程也在執行,主線程執行完畢后,等待前台線程也執行完成后,程序停止。
直接調用啟動線程:
#!/usr/bin/env python #coding:utf8 import threading #線程模塊 import time def sayhi(num): #定義每個線程要運行的函數 print('running on number',num) time.sleep(3) if __name__ == "__main__": t1 = threading.Thread(target=sayhi,args=(33,)) #生成一個線程實例 t2 = threading.Thread(target=sayhi,args=(22,)) #生成另一個線程實例 t1.start() #啟動線程 t2.start() print(t1.getName()) #獲取線程名 print(t2.getName()) t1.join() #阻塞主線程,等待t1子線程執行完后再執行后面的代碼 t2.join() #阻塞主線程,等待t2子線程執行完后再執行后面的代碼 print('-----end')
繼承式調用啟動線程:
#!/usr/bin/env python #coding:utf8 import threading,time class mythreading(threading.Thread): #寫一個類方法繼承threading模塊 def __init__(self): #threading.Thread.__init__(self) #金典類重寫父類方法 super(mythreading,self).__init__() #重寫父類屬性 self.name = n+self.name.split('d')[1] def run(self): #運行線程的函數,函數名必須是run名稱 super(mythreading,self).run() print('starting on threading',self.name) time.sleep(5) if __name__ == '__main__': #t1 = mythreading(1) #通過類創建線程 #t2 = mythreading(2) #t1.start() #啟動進程 #t2.start() ttr = [] for i in range(10): #啟動十個線程 t = mythreading() ttr.append(t) t.start() t.setName('hehe-{}'.format(i)) #修改線程名 print(t.getName()) #獲取線程名 for item in ttr: item.join() #阻斷線程等待執行完后再執行后續代碼 print('-----end')
守護線程:
#!/usr/bin/env python #coding:utf8 import time import threading def run(num): #子線程運行函數 print('---starting',num) time.sleep(2) print('---done') def main(): #主線程運行函數 print('開啟主線程') for i in range(4): #在主線程中運行4個子線程 t1 = threading.Thread(target=run,args=(i,)) t1.start() print('啟動線程',t1.getName()) t1.join() print('結束主線程') m = threading.Thread(target=main,args=()) m.setDaemon(True) #設置主線程為守護線程 m.start() m.join(timeout=3) #等待3秒后主線程退出,不管子線程是否運行完 print('------end') #output: 開啟主線程 ---starting 0 啟動線程 Thread-2 ---starting 1 啟動線程 Thread-3 ---starting 2 啟動線程 Thread-4 ---starting 3 啟動線程 Thread-5 ---done ---done ---done ---done 結束主線程 ------end 進程已結束,退出代碼0
(1)線程同步
如果多個線程共同對某個數據修改,則可能出現不可預料的結果,為了保證數據的正確性,需要對多個線程進行同步。
使用 Thread 對象的 Lock 和 Rlock 可以實現簡單的線程同步,這兩個對象都有 acquire 方法和 release 方法,對於那些需要每次只允許一個線程操作的數據,可以將其操作放到 acquire 和 release 方法之間。如下:
多線程的優勢在於可以同時運行多個任務(至少感覺起來是這樣)。但是當線程需要共享數據時,可能存在數據不同步的問題。
考慮這樣一種情況:一個列表里所有元素都是0,線程”set”從后向前把所有元素改成1,而線程”print”負責從前往后讀取列表並打印。
那么,可能線程”set”開始改的時候,線程”print”便來打印列表了,輸出就成了一半0一半1,這就是數據的不同步。為了避免這種情況,引入了鎖的概念。
鎖有兩種狀態——鎖定和未鎖定。每當一個線程比如”set”要訪問共享數據時,必須先獲得鎖定;如果已經有別的線程比如”print”獲得鎖定了,那么就讓線程”set”暫停,也就是同步阻塞;等到線程”print”訪問完畢,釋放鎖以后,再讓線程”set”繼續。
經過這樣的處理,打印列表時要么全部輸出0,要么全部輸出1,不會再出現一半0一半1的尷尬場面。
鎖提供如下方法:
1.Lock.acquire([blocking])
2.Lock.release()
3.threading.Lock() 加載線程的鎖對象,是一個基本的鎖對象,一次只能一個鎖定,其余鎖請求,需等待鎖釋放后才能獲取
4.threading.RLock() 多重鎖,在同一線程中可用被多次acquire。如果使用RLock,那么acquire和release必須成對出現, 調用了n次acquire鎖請求,則必須調用n次的release才能在線程中釋放鎖對象。
import threading,time class mythread(threading.Thread): def __init__(self,threadID,threadName): super(mythread,self).__init__() self.threadID = threadID self.threadName = threadName def run(self): print('開啟線程:',self.threadName) threadLock.acquire() #獲取線程鎖 print_time(time.time(),self.threadName) threadLock.release() #釋放線程鎖 def print_time(suntime,threadName): for i in range(3): time.sleep(2) print('%s,,,%s'%(suntime,threadName)) threadLock = threading.Lock() #創建線程鎖 t1 = mythread(1,'thread1') #創建線程1 t2 = mythread(2,'thread2') t1.start() #啟動線程 t2.start() t1.join() #阻塞主線程,等待線程1完成 t2.join() print('退出程序') #鎖住運行線程函數后,會等待線程1執行完成后在執行線程2 #output 開啟線程: thread1 開啟線程: thread2 1516081515.3328698,,,thread1 1516081515.3328698,,,thread1 1516081515.3328698,,,thread1 1516081521.3341322,,,thread2 1516081521.3341322,,,thread2 1516081521.3341322,,,thread2 退出程序
from threading import Thread,Lock,RLock import time class mythread(Thread): def __init__(self,number1,number2): super(mythread,self).__init__() self.number1 = number1 self.number2 = number2 def run(self): print('開啟線程',self.name) lock.acquire() print('run is:',time.time(),self.number1+self.number2) arithmetic(self.number1,self.number2) time.sleep(2) lock.release() def arithmetic(avg1,avg2): lock.acquire() print('arithmetic:',time.time(),avg1+avg2) time.sleep(2) lock.release() #創建鎖對象時如果使用Lock則會在運行線程2時一直處於等待狀態 #如果使用RLock則可正常運行,RLock支持多重鎖 lock = RLock() #創建多重鎖 if __name__ == "__main__": t1 = mythread(3,4) t2 = mythread(5,6) t1.start() t2.start() t1.join() t2.join() print('程序結束!') #OUTPUT 開啟線程 Thread-1 run is: 1516088282.0478237 7 arithmetic: 1516088282.0478237 7 開啟線程 Thread-2 run is: 1516088286.0496652 11 arithmetic: 1516088286.0496652 11 程序結束!
(2)queue同步隊列
該queue
模塊實現多生產者,多用戶隊列。當在多線程之間必須安全地交換信息時,它在線程編程中特別有用。該Queue
模塊中的類實現了所有必需的鎖定語義
該模塊實現三種類型的隊列,它們的區別僅在於檢索條目的順序:
在FIFO隊列中,第一個添加的任務是第一個被檢索的。
在LIFO隊列中,最后添加的條目是第一個被檢索(像堆棧一樣操作)。
使用優先級隊列,條目保持排序(使用heapq
模塊),並且首先檢索最低值的條目。
在內部,模塊使用鎖來暫時阻止競爭的線程; 但是,它不是為了處理線程內的重入而設計的;該queue模塊定義了以下類:
-
class
queue.
Queue
(maxsize = 0 ) -
FIFO隊列的構造器。 maxsize是一個整數,用於設置可放入隊列的項目數的上限。一旦達到此大小,插入將會阻塞,直到隊列項被消耗。如果 maxsize小於或等於零,隊列大小是無限的。
-
class
queue.
LifoQueue
(maxsize = 0 ) -
LIFO隊列的構造器。 maxsize是一個整數,用於設置可放入隊列的項目數的上限。一旦達到此大小,插入將會阻塞,直到隊列項被消耗。如果 maxsize小於或等於零,隊列大小是無限的。
-
class
queue.
PriorityQueue
(maxsize = 0 ) -
優先隊列的構造函數。 maxsize是一個整數,用於設置可放入隊列的項目數的上限。一旦達到此大小,插入將會阻塞,直到隊列項被消耗。如果 maxsize小於或等於零,隊列大小是無限的。
首先檢索最低值的條目(最低值條目是返回的條目
sorted(list(entries))[0]
)。對於條目的典型圖案的形式是一個元組:。(priority_number, data)
-
exception queue.
Empty
-
在空對象上調用非阻塞
get()
(或get_nowait()
)時引發異常Queue
。
-
exception queue.
Full
-
如果在已滿的對象上調用非阻塞
put()
(或put_nowait()
),則會引發異常Queue
。
隊列對象(Queue,LifoQueue或PriorityQueue)提供以下的公共方法:
-
Queue.
qsize
() -
返回隊列的大小。請注意,qsize()> 0並不保證后續的get()不會被阻塞,qsize()也不會保證put()不會被阻塞。
-
Queue.
empty
() -
如果隊列為空則返回True,否則返回False。如果empty()返回
True
,則不保證對put()的后續調用不會被阻塞。同樣,如果empty()返回False
,則不保證后續調用get()不會被阻塞。
-
Queue.
full
() -
如果隊列已滿
則返回True,否則返回Fales。如果full()返回
True
,則不保證后續的get()調用不會被阻塞。同樣,如果full()返回False
它並不能保證后續調用put()不會被阻塞。
-
Queue.
put
(item,block = True,timeout = None ) -
將項目放入隊列中。如果可選參數block為True,並且timeout為
None
(默認),則根據需要進行阻止,直到空閑插槽可用。如果 timeout是一個正數,則最多會阻塞timeout秒數,Full
如果在此時間內沒有空閑插槽,則會引發異常。否則(block為false),如果一個空閑插槽立即可用,則在隊列中放置一個項目,否則引發Full
異常(在這種情況下timeout被忽略)。
-
Queue.
put_nowait
(item ) -
相當於。
put(item, False)
-
Queue.
get
(block = True,timeout = None ) -
從隊列中移除並返回一個項目。如果可選參數塊為真,並且 timeout為
None
(默認),則在必要時阻塞,直到項目可用。如果timeout是一個正數,則最多會阻塞timeout秒數,Empty
如果在該時間內沒有可用項目,則會引發異常。否則(block為false),返回一個項目,如果一個是立即可用的,否則引發Empty
異常(在這種情況下timeout被忽略)。
-
Queue.
get_nowait
() -
相當於
get(False)
。
提供了兩種方法來支持跟蹤入隊任務是否已完全由守護進程消費者線程處理。
-
Queue.
task_done
() -
表明以前排隊的任務已經完成。由隊列消費者線程使用。對於每個
get()
用於獲取任務的對象,隨后的調用都會task_done()
告訴隊列,任務的處理已完成。如果a
join()
當前被阻塞,則在處理所有項目時(意味着task_done()
已經接收到已經put()
進入隊列的每個項目的呼叫),則將恢復。提出了一個
ValueError
好象叫更多的時間比中放入隊列中的項目。
-
Queue.
join
() -
阻塞,直到隊列中的所有項目都被獲取並處理。
每當將項目添加到隊列中時,未完成任務的數量就會增加。只要消費者線程調用
task_done()
來指示該項目已被檢索,並且所有工作都已完成,計數就會減少。當未完成任務的數量下降到零時,join()
取消阻止。
#Queue先進先出隊列 import queue def show(q,i): if q.empty() or q.qsize() >= 1: q.put(i) #存隊列 elif q.full(): print('queue not size') que = queue.Queue(5) #允許5個隊列的隊列對象 for i in range(5): show(que,i) print('queue is number:',que.qsize()) #隊列元素個數 for j in range(5): print(que.get()) #取隊列 print('......end') #output: queue is number: 5 0 1 2 3 4 ......end
#LifoQueue先進后出隊列 import queue lifoque = queue.LifoQueue() lifoque.put('hello1') lifoque.put('hello2') lifoque.put('hello3') print(lifoque.qsize()) print(lifoque.get()) print(lifoque.get()) print(lifoque.get()) #output: 3 hello3 hello2 hello1
#PriorityQueue按數據大小取最小值優先 import queue pque = queue.PriorityQueue() #優先級的隊列 pque.put(7) #先存入隊列 pque.put(5) pque.put(3) print(pque.qsize()) print(pque.get()) #取出最小值的數據 print(pque.get()) print(pque.get()) #output: 3 22 52 71
(3)信號量(Semaphore)
Lock鎖是同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據,如一個場所內只有3把椅子給人坐,那么只允許3個人,其它人則排隊,只有等里面的人出來后才能進去。
import threading,time class mythreading(threading.Thread): #寫一個類方法繼承hreading模塊 def run(self): #運行線程的函數,函數名必須是run名稱 semaphore.acquire() #獲取信號量鎖 print('running the thread:',self.getName()) time.sleep(2) semaphore.release() #釋放信號量鎖 if __name__ == '__main__': semaphore = threading.BoundedSemaphore(3) #創建信號量對象,只運行3個進程同時運行 for i in range(20): t1 = mythreading() t1.start() t1.join() print('---end')
threading模塊的Timer方法,在經過一定時間后才能運行的操作,一般只針對函數運行。
import threading def run(): print('is running...') t=threading.Timer(5.0,run) #等待5秒后執行函數 t.start()
(4)事件(event)
python線程的事件用於主線程控制其它線程的執行,事件主要提供了三個方法:
event.wait 線程阻塞
event.set 將全局對象“Flag”設置為False
event.clear 將全局對象"Flag"設置為True
事件處理的機制:全局定義了一個“Flag”,默認為“False”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。
#!/usr/bin/env python #coding:utf8 import threading def show(event): print('start') event.wait() #阻塞線程執行程序 print('done') event_obj = threading.Event() #創建event事件對象 for i in range(10): t1 = threading.Thread(target=show,args=(event_obj,)) t1.start() inside = input('>>>:') if inside == '1': event_obj.set() #當用戶輸入1時,set全局Flag為True,線程不再阻塞,打印"done" event_obj.clear() #將Flag設置為False
(5)條件(Condition)
使得線程等待,只有滿足某條件時,才釋放N個線程
import threading def condition_func(): ret = False inp = input('>>>:') if inp == '1': ret = True return ret def run(n): con.acquire() #條件鎖 con.wait_for(condition_func) #判斷條件 print('running...',n) con.release() #釋放鎖 print('------------') if __name__ == '__main__': con = threading.Condition() #建立線程條件對象 for i in range(10): t = threading.Thread(target=run,args=(i,)) t.start() t.join() print('退出程序')
4、python3進程調用
multiprocessing是一個使用類似於threading模塊的API來支持多進程。該multiprocessing軟件包提供本地和遠程並發,通過使用子進程來充分利用服務器上的多個處理器,它可以在Unix和Windows上運行。
class multiprocessing.
Process
(group = None,target = None,name = None,args =(),kwargs = {},*,daemon = None )
過程對象表示在單獨的過程中運行的活動。這個 Process
類的所有方法的等價與 threading.Thread
。
應始終使用關鍵字參數調用構造函數。群體 應該永遠是None
; 它僅僅是為了兼容而存在 threading.Thread
。 target是run()
方法調用的可調用對象。它默認為None
,意味着什么都不叫。name是進程名稱(name
更多細節見)。 args是目標調用的參數元組。 kwargs是目標調用的關鍵字參數字典。如果提供,則僅關鍵字守護進程參數將進程daemon
標志設置為True
或False
。如果None
(默認),這個標志將從創建過程繼承。
默認情況下,沒有參數傳遞給目標。
如果一個子類重寫了構造函數,那么必須確保它Process.__init__()
在執行任何其他操作之前調用基類的構造函數()。
在版本3.3中進行了更改:添加了守護進程參數。
-
run
() -
表示進程活動的方法。
您可以在子類中重寫此方法。標准
run()
方法調用傳遞給對象構造函數的可調用對象作為目標參數,如果有的話,分別從args和kwargs參數中獲取順序和關鍵字參數。
-
start
() -
開始進程的活動。
每個進程對象最多只能調用一次。它安排對象的
run()
方法在一個單獨的進程中被調用。
-
join
([ timeout ] ) -
如果可選參數timeout是
None
(缺省值),則該方法將阻塞,直到其join()
方法被調用的進程終止。如果超時是一個正數,它最多會阻塞超時秒數。請注意,None
如果方法終止或方法超時,則方法返回。檢查流程exitcode
,確定是否終止。一個過程可以連接多次。
進程無法自行加入,因為這會導致死鎖。嘗試在啟動之前加入進程是錯誤的。
-
name
-
該進程的名稱。名稱是一個字符串,僅用於識別目的。它沒有語義。多個進程可以被賦予相同的名稱。
初始名稱由構造函數設置。如果沒有明確的名字被提供給構造函數,就會構造一個名為“Process-N 1:N 2:...:N k ” 的表單,其中每個N k是其父代的第N個孩子。
-
is_alive
() -
返回進程是否存在;從
start()
方法返回的那一刻起,一個進程對象是活着的,直到子進程終止。
-
daemon
-
進程的守護進程標志,一個布爾值。這必須在
start()
調用之前設置 。初始值是從創建過程繼承的。當進程退出時,它將嘗試終止其所有守護進程的子進程。
請注意,守護進程不允許創建子進程。否則,如果父進程退出時被終止,則守護進程將使其子進程成為孤兒。此外,這些不是 Unix守護進程或服務,它們是正常的進程,如果非守護進程已經退出,它將被終止(而不是加入)。
除了 threading.Thread
API之外,Process
對象還支持以下屬性和方法:
-
pid
-
返回進程ID。在這個過程產生之前,這將是
None
。
-
exitcode
-
孩子的退出代碼。這將是
None
如果過程還沒有結束。負值-N表示孩子被信號N終止。
-
authkey
-
進程的身份驗證密鑰(一個字節字符串)。
當
multiprocessing
初始化主進程正在使用分配一個隨機串os.urandom()
。當一個
Process
對象被創建時,它會繼承父進程的認證密鑰,盡管這可以通過設置authkey
成另一個字節字符串來改變。請參閱驗證密鑰。
-
sentinel
-
系統對象的數字句柄,當進程結束時它將變為“就緒”。
如果您想一次使用多個事件,則可以使用此值
multiprocessing.connection.wait()
。否則,調用join()
更簡單。在Windows中,這是與使用的OS手柄
WaitForSingleObject
和WaitForMultipleObjects
家庭的API調用。在Unix上,這是一個可用於select
模塊原語的文件描述符。3.3版本中的新功能
-
terminate
() -
終止這個進程。在Unix上,這是使用
SIGTERM
信號完成的。在WindowsTerminateProcess()
上使用。請注意,退出處理程序和最后的子句等將不會被執行。
需要注意的是start()
,join()
,is_alive()
, terminate()
和exitcode
方法只能由創建進程對象的過程調用。
異常:
-
exception multiprocessing.
ProcessError
-
所有
multiprocessing
異常的基類。
-
exception
multiprocessing.
BufferTooShort
-
Connection.recv_bytes_into()
當提供的緩沖區對象太小而不能讀取消息時引發異常。如果
e
是的一個實例BufferTooShort
,然后e.args[0]
會給出消息作為字節串。
-
exception multiprocessing.
AuthenticationError
-
發生認證錯誤時引發。
-
exception multiprocessing.
TimeoutError
-
當超時到期時由超時方法提出。
創建10個並發進程:
from multiprocessing import Process def proce(pn): print('hello',pn) if __name__ == '__main__': for i in range(10): #啟動10個進程 p1 = Process(target=proce,args=(i,)) #創建進程 p1.start() #啟動進程 #print('nn:',p1.name) p1.join() #等待進程結束 print('exit project')
打印進程ID:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/17 14:15 # @Author : Py.qi # @File : 進程ID.py # @Software: PyCharm from multiprocessing import Process import os def info(title): print(title) print('module name',__name__) #是否主程序 print('parent process',os.getppid()) #上層進程號 print('process id:',os.getpid()) #自身進程號 def f(name): info('function f') print('hello',name) if __name__ == '__main__': info('main line') #通過pycharm進程運行函數 p = Process(target=f,args=('bob',)) #調用子進程運行函數 p.start() p.join() #output: main line module name __main__ parent process 5756 #pycharm進程號 process id: 9864 #自身進程 function f module name __mp_main__ parent process 9864 process id: 9024 #文件創建的進程號 hello bob
(1)上下文和啟動方法
multiprocessing支持三種啟動流程的方式:
spawn:
父進程啟動一個新鮮的Python解釋器進程。子進程只會繼承運行進程對象run()
方法所必需的資源。特別是父進程中不必要的文件描述符和句柄將不會被繼承。與使用fork或forkserver相比,使用此方法啟動進程相當慢。
在Unix和Windows上可用。Windows上是默認的。
fork:
父進程使用os.fork()
fork來解釋Python。子進程在開始時與父進程有效地相同。父進程的所有資源都由子進程繼承。請注意,安全分叉多線程的過程是有問題的。
僅在Unix上提供。Unix上是默認的。
forkserver:
當程序啟動並選擇forkserver啟動方法時,啟動一個服務器進程。從那時起,無論何時需要一個新的進程,父進程都連接到服務器並請求它分叉一個新的進程。fork服務器進程是單線程的,所以它是安全的使用os.fork()
。沒有不必要的資源被繼承。
在支持通過Unix管道傳遞文件描述符的Unix平台上可用。
在3.4版中進行了更改:在所有unix平台上添加了spawn,並為某些unix平台添加了forkserver。子進程不再繼承Windows上的所有父代繼承句柄。
在Unix上,使用spawn或forkserver啟動方法也將啟動一個信號量跟蹤器進程,該進程跟蹤程序進程創建的未鏈接的已命名信號量。當所有進程退出信號量跟蹤器時,將取消所有剩余信號量的鏈接。通常應該沒有,但如果一個進程被一個信號殺死,那么可能會有一些“泄漏”的信號量。(取消鏈接命名的信號量是一個嚴重的問題,因為系統只允許有限的數量,並且在下一次重新啟動之前它們不會自動斷開連接。)
在主模塊中使用set_start_method()的啟動方法:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/17 15:44 # @Author : Py.qi # @File : set_start1.py # @Software: PyCharm import multiprocessing as mp def foo(q): #傳遞隊列對象給函數 q.put('hello') q.put('123') q.put('python') if __name__ == '__main__': mp.set_start_method('spawn') #設置啟動進程方式 q = mp.Queue() #創建隊列對象 p = mp.Process(target=foo, args=(q,)) #啟動一個進程,傳遞運行函數和參數 p.start() #啟動進程 print(q.get()) #獲取隊列數據 print(q.qsize()) print(q.get()) p.join()
set_start_method()不能在程序中使用多次,可以使用get_context()獲取上下文對象,上下文對象與多處理模塊具有相同的API,並允許在同一個程序中使用多個啟動方法。
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/18 13:14 # @Author : Py.qi # @File : get_context_1.py # @Software: PyCharm import multiprocessing as mp import time def foo(q): for i in range(10): q.put('python%s'%i) time.sleep(1) #q.put('context') if __name__ == '__main__': ext = mp.get_context('spawn') q = ext.Queue() p = ext.Process(target=foo,args=(q,)) p2 = ext.Process(target=foo,args=(q,)) p.start() p2.start() for i in range(10): print(q.get()) #print(q.get()) #print(q.get()) #print(q.get()) #print(q.get()) p.join() p2.join() #output: python0 python0 python1 python1 python2 python2 python3 python3 python4 python4
請注意,與一個上下文相關的對象可能與另一個上下文的進程不兼容。特別是,使用fork上下文創建的鎖不能被傳遞給使用spawn或forkserver啟動方法啟動的進程 。
想要使用特定啟動方法的庫應該可以get_context()
用來避免干擾庫用戶的選擇。
(1)進程之間交換對象
multiprocessing
支持兩種進程之間的通訊通道:
class multiprocessing.
Queue
([ maxsize ] )
返回使用管道和一些鎖/信號量實現的進程共享隊列。當一個進程首先把一個項目放到隊列中時,一個進給線程被啟動,將一個緩沖區中的對象傳送到管道中。
Queue
實現queue.Queue
除了task_done()
和之外的 所有方法join()
。
-
qsize
() -
返回隊列的近似大小。由於多線程/多處理語義,這個數字是不可靠的。
請注意,
NotImplementedError
在Mac OS X等Unix平台上可能會出現這種sem_getvalue()
情況。
-
empty
() -
True
如果隊列為空False
則返回,否則返回。由於多線程/多處理語義,這是不可靠的。
-
full
() -
True
如果隊列已滿False
則返回,否則返回。由於多線程/多處理語義,這是不可靠的。
-
put
(obj [,block [,timeout ] ] ) -
將obj放入隊列中。如果可選參數塊是
True
(缺省值)並且超時是None
(缺省值),則在需要時禁止,直到空閑插槽可用。如果超時是一個正數,則最多會阻塞超時秒數,queue.Full
如果在此時間內沒有空閑插槽,則會引發異常。否則(塊是False
),如果一個空閑插槽立即可用,則在隊列中放置一個項目,否則引發queue.Full
異常(在這種情況下超時被忽略)。
-
put_nowait
(obj ) -
相當於。
put(obj, False)
-
get
([ block [,timeout ] ] ) -
從隊列中移除並返回一個項目。如果可選ARGS 塊是
True
(默認值),超時是None
(默認),塊如有必要,直到一個項目是可用的。如果超時是一個正數,則最多會阻塞超時秒數,queue.Empty
如果在該時間內沒有可用項目,則會引發異常。否則(塊是False
),返回一個項目,如果一個是立即可用的,否則引發queue.Empty
異常(在這種情況下超時被忽略)。
-
get_nowait
() -
相當於
get(False)
。
multiprocessing.Queue
有一些其他的方法沒有找到 queue.Queue
。大多數代碼通常不需要這些方法:
-
close
() -
表明當前進程不會有更多的數據放在這個隊列中。一旦將所有緩沖的數據刷新到管道,后台線程將退出。當隊列被垃圾收集時,這被自動調用。
-
join_thread
() -
加入后台線程。這只能在
close()
被調用后才能使用。它阻塞,直到后台線程退出,確保緩沖區中的所有數據已經刷新到管道。默認情況下,如果一個進程不是隊列的創建者,那么在退出時它將嘗試加入隊列的后台線程。該過程可以調用
cancel_join_thread()
做出join_thread()
什么也不做。
-
cancel_join_thread
() -
防止
join_thread()
阻塞。尤其是,這可以防止后台線程在進程退出時自動加入
class multiprocessing.
SimpleQueue
-
empty
() -
True
如果隊列為空False
則返回,否則返回。
-
get
() -
從隊列中移除並返回一個項目。
-
put
(item ) -
將元素放入隊列中。
class multiprocessing.
JoinableQueue
([ maxsize ] )
JoinableQueue
,一個Queue
子類,是另外有一個隊列task_done()
和join()
方法。
-
task_done
() -
表明以前排隊的任務已經完成。由隊列使用者使用。對於每個
get()
用於獲取任務的對象,隨后的調用都會task_done()
告訴隊列,任務的處理已完成。如果a
join()
當前被阻塞,則在處理所有項目時(意味着task_done()
已經接收到已經put()
進入隊列的每個項目的呼叫),則將恢復。提出了一個
ValueError
好象叫更多的時間比中放入隊列中的項目。
-
join
() -
阻塞,直到隊列中的所有項目都被獲取並處理。
每當將項目添加到隊列中時,未完成任務的數量就會增加。每當消費者打電話
task_done()
表明該物品已被檢索並且所有工作都已完成時,計數就會下降 。當未完成任務的數量下降到零時,join()
取消阻止。
-
multiprocessing.
active_children
() -
返回當前進程的所有活動的列表。調用這個函數有“加入”已經完成的任何進程的副作用。
-
multiprocessing.
cpu_count
() -
返回系統中的CPU數量。可能會提高
NotImplementedError
。
-
multiprocessing.
current_process
() -
返回
Process
當前進程對應的對象。一個類似的threading.current_thread()
。
-
multiprocessing.
freeze_support
() -
添加對何時使用的程序
multiprocessing
進行凍結以產生Windows可執行文件的支持。(已經用py2exe, PyInstaller和cx_Freeze進行了測試。)需要在主模塊后面直接調用這個函數:if __name__ == '__main__'freeze_support()
在Windows以外的操作系統上調用時,調用不起作用。另外,如果模塊正在通過Windows上的Python解釋器正常運行(程序還沒有被凍結),那么這個模塊freeze_support()
沒有任何作用。
-
multiprocessing.
get_all_start_methods
() -
返回支持的啟動方法的列表,其中第一個是默認的。可能的啟動方法是
'fork'
,'spawn'
和'forkserver'
。在Windows上只'spawn'
可用。在Unix上'fork'
並且'spawn'
始終受支持,並且'fork'
是默認的。3.4版本中的新功能
-
multiprocessing.
get_context
(method = None ) -
返回與
multiprocessing
模塊具有相同屬性的上下文對象 。如果方法是
None
那么返回默認的上下文。否則,方法應該是'fork'
,'spawn'
,'forkserver'
。ValueError
如果指定的啟動方法不可用,則引發。3.4版本中的新功能
-
multiprocessing.
get_start_method
(allow_none = False ) -
返回用於啟動進程的啟動方法的名稱。
如果start方法沒有被修復,並且allow_none為false,那么start方法被固定為默認值,並返回名稱。如果start方法沒有被修復,並且allow_none 為true,則
None
返回。返回值可以是
'fork'
,'spawn'
,'forkserver'
或None
。'fork'
在Unix上是默認的,而'spawn'
在Windows上是默認的。3.4版本中的新功能
-
multiprocessing.
set_executable
() -
設置啟動子進程時使用的Python解釋器的路徑。(默認
sys.executable
使用)。嵌入可能需要做一些事情set_executable (OS ,路徑,加入(SYS 。exec_prefix , 'pythonw.exe' ))
才可以創建子進程。在版本3.4中更改:現在在Unix上支持
'spawn'
啟動方法時使用。
-
multiprocessing.
set_start_method
(method) -
設置應該用來啟動子進程的方法。 方法可以
'fork'
,'spawn'
或者'forkserver'
。請注意,這應該至多被調用一次,並且應該在主模塊的子句內受到保護。
if __name__ == '__main__'
3.4版本中的新功能
Queue(隊列):這個Queue類為queue.Queue的克隆
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/18 13:40 # @Author : Py.qi # @File : queue_pbet.py # @Software: PyCharm from multiprocessing import Process,Queue def f(q): q.put([42, None, 'hello']) q.put('python') q.put('python3') if __name__ == '__main__': q = Queue() #創建隊列對象 p = Process(target=f, args=(q,)) #新進程 p.start() print(q.get()) # prints "[42, None, 'hello']" print(q.get()) print(q.get()) p.join()
Pipes(管道):該Pipe()函數返回一對由默認為雙工(雙向)的管道連接的連接對象。
連接對象允許發送和接收可選對象或字符串。他們可以被認為是面向消息的連接套接字。
連接對象通常使用創建Pipe()
class multiprocessing.
Connection
-
send
(obj ) -
將對象發送到應該讀取的連接的另一端
recv()
。該對象必須是可挑選的。非常大的包(大約32 MB +,雖然取決於操作系統)可能會引發
ValueError
異常。
-
fileno
() -
返回連接使用的文件描述符或句柄。
-
close
() -
關閉連接。
當連接被垃圾收集時,這被自動調用。
-
poll
([ timeout ] )
返回是否有可讀的數據。
如果沒有指定超時,則會立即返回。如果超時是一個數字,那么這指定了以秒為單位的最大時間來阻止。如果timeout,None
則使用無限超時。
請注意,可以使用一次輪詢多個連接對象multiprocessing.connection.wait()
。
send_bytes
(buffer [,offset [,size ] ] )
-
從類似字節的對象發送字節數據作為完整的消息。
如果給出偏移量,則從緩沖區中的該位置讀取數據。如果 給定大小,那么將從緩沖區中讀取多個字節。非常大的緩沖區(大約32 MB +,但取決於操作系統)可能會引發
ValueError
異常
-
recv_bytes
([ maxlength ] ) -
將連接另一端發送的字節數據的完整消息作為字符串返回。阻止,直到有東西要接收。
EOFError
如果沒有什么可以接收,而另一端已經關閉,就會引發。如果最大長度被指定並且所述消息是長於最大長度 然后
OSError
升至並連接將不再是可讀的。
-
recv_bytes_into
(buffer [,offset ] ) -
從連接的另一端讀取緩沖區中的字節數據的完整消息,並返回消息中的字節數。阻止,直到有東西要接收。提出
EOFError
,如果沒有什么留下來接收,而另一端被關閉。緩沖區必須是一個可寫的字節類對象。如果 給出了偏移量,則消息將從該位置寫入緩沖區。偏移量必須是小於緩沖區長度的非負整數(以字節為單位)。
如果緩沖區太短,則
BufferTooShort
引發異常,並且完整的消息可用, 異常實例e.args[0]
在哪里e
。
在版本3.3中更改:現在可以使用Connection.send()
和在進程之間傳輸連接對象本身Connection.recv()
。
3.3版本中的新功能:連接對象現在支持上下文管理協議 - 請參閱 上下文管理器類型。 __enter__()
返回連接對象,並__exit__()
調用close()
。
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/18 13:50 # @Author : Py.qi # @File : queue_pipes.py # @Software: PyCharm from multiprocessing import Process,Pipe def f(parent,child): parent.send('parent1') #父鏈接寫入數據 parent.send('parent2') child.send('python1') #子鏈接寫入數據 child.send('python2') parent.close() #關閉管道 child.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() #創建父和子管道連接 p = Process(target=f, args=(parent_conn,child_conn)) p.start() print(parent_conn.recv()) #從父鏈接收的數據是從子鏈接寫入的數據 print(parent_conn.recv()) print(child_conn.recv()) #從子鏈接收的數據是從父鏈接寫入的數據 print(child_conn.recv()) p.join()
Pipe()返回兩個連接對象,表示管道的兩端(parent_conn)為頭,(child_conn)為尾。這兩個連接對象都有send()和recv()方法等。請注意:如果兩個或多個進程(線程)試圖同時讀取或寫入管道的同一端,則管道中的數據可能會損壞,可以同時使用管道的頭和尾流程式的讀取和寫入數據就沒有問題了。
(2)進程之間的同步
通常,同步原語在多進程程序中並不像在多線程程序中那樣必要;也可以使用管理器對象創建同步基元。
-
class
multiprocessing.
Barrier
(parties [,action [,timeout ] ] ) -
屏蔽對象:一個克隆的
threading.Barrier
。3.3版本中的新功能
-
class
multiprocessing.
BoundedSemaphore
([ value ] ) -
一個有界的信號量對象
threading.BoundedSemaphore
。與其相近的模擬存在單一差異:其acquire
方法的第一個參數是命名塊,與之一致Lock.acquire()
。注意:在Mac OS X上,這是無法區分的,
Semaphore
因為sem_getvalue()
沒有在該平台上實現。
-
class
multiprocessing.
Condition
([ lock ] ) -
一個條件變量:一個別名
threading.Condition
。如果指定了鎖,那么它應該是一個Lock
或一個RLock
對象multiprocessing
。在版本3.3中更改:該wait_for()
方法已添加。
-
class multiprocessing.
Event
-
克隆的
threading.Event
。
-
class multiprocessing.
Lock
-
一個非遞歸鎖對象:一個非常類似的
threading.Lock
。一旦一個進程或線程獲得一個鎖,隨后的嘗試從任何進程或線程獲取它將被阻塞,直到它被釋放; 任何進程或線程可能會釋放它。threading.Lock
應用於線程的概念和行為在 這里被復制,multiprocessing.Lock
因為它適用於進程或線程,除非指出。請注意,這
Lock
實際上是一個工廠函數,它返回一個multiprocessing.synchronize.Lock
使用默認上下文初始化的實例。-
acquire
(block = True,timeout = None ) -
獲取一個鎖,阻塞或不阻塞。
在block參數設置為
True
(默認)的情況下,方法調用將被阻塞直到鎖處於解鎖狀態,然后將其設置為鎖定並返回True
。請注意,這第一個參數的名稱不同於threading.Lock.acquire()
。將塊參數設置為
False
,方法調用不會阻止。如果鎖目前處於鎖定狀態,則返回False
; 否則將鎖設置為鎖定狀態並返回True
。當用正值,浮點值超時調用時,只要無法獲取鎖定,最多只能阻塞超時指定的秒數。具有負值的超時調用 相當於超時零。超時值
None
(默認值)的調用 將超時期限設置為無限。請注意,處理負值或超時None
值 與實施的行為不同 。該超時參數有,如果沒有實際意義塊參數被設置為,並因此忽略。返回threading.Lock.acquire()
False
True
如果鎖已被獲取或False
超時時間已過。
-
release
() -
釋放一個鎖。這可以從任何進程或線程調用,不僅是最初獲取鎖的進程或線程。
threading.Lock.release()
除了在解鎖的鎖上調用a時,行為與其相同ValueError
。
-
-
class multiprocessing.
RLock
-
遞歸鎖對象:一個緊密的類比
threading.RLock
。遞歸鎖必須由獲取它的進程或線程釋放。一旦一個進程或線程獲得遞歸鎖定,相同的進程或線程可以再次獲得它,而不會阻塞; 該進程或線程必須每次釋放一次它被獲取。請注意,這
RLock
實際上是一個工廠函數,它返回一個multiprocessing.synchronize.RLock
使用默認上下文初始化的實例。RLock
支持上下文管理器協議,因此可以用在with
語句中。-
acquire
(block = True,timeout = None ) -
獲取一個鎖,阻塞或不阻塞。
當調用塊參數設置為
True
,阻塞直到鎖處於解鎖狀態(不屬於任何進程或線程),除非該鎖已被當前進程或線程所擁有。當前進程或線程接着獲取鎖的所有權(如果它還沒有所有權),並且鎖內的遞歸級別增加1,導致返回值為True
。請注意,第一個參數的行為與實現相比有幾個不同之處threading.RLock.acquire()
,從參數本身的名稱開始。當調用塊參數設置為
False
,不要阻塞。如果鎖已經被另一個進程或線程獲取(並因此被擁有),則當前進程或線程不獲取所有權,並且鎖內的遞歸級別不改變,導致返回值為False
。如果鎖處於未鎖定狀態,則當前進程或線程獲取所有權,遞歸級別遞增,結果返回值為True
。timeout參數的使用和行為與in中的相同
Lock.acquire()
。請注意,這些超時行為中的一些 與實施的行為有所不同threading.RLock.acquire()
。
-
release
() -
釋放一個鎖,遞減遞歸級別。如果在遞減之后遞歸級別為零,則將鎖重置為解鎖(不由任何進程或線程擁有),並且如果有其他進程或線程被阻塞,等待鎖解鎖,則准許其中的一個繼續進行。如果在遞減之后遞歸級別仍然不為零,那么鎖保持鎖定並由調用進程或線程擁有。
只有在調用進程或線程擁有鎖定時才調用此方法。一個
AssertionError
如果該方法是通過一個過程調用或線程以外的雇主或升高如果鎖處於解鎖(無主)的狀態。請注意,在這種情況下引發的異常的類型不同於執行的行為threading.RLock.release()
。
-
-
class
multiprocessing.
Semaphore
([ value ] ) -
一個信號量對象:一個接近的類比
threading.Semaphore
。與其相近的模擬存在單一差異:其
acquire
方法的第一個參數是命名塊,與之一致Lock.acquire()
。
multiprocessing
包含來自所有同步方法等價於threading
。例如,可以使用鎖來確保一次只有一個進程打印到標准輸出:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/18 14:16 # @Author : Py.qi # @File : mult_sync.py # @Software: PyCharm from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start() #創建進程並啟動
(3)進程之間共享狀態
在進行並發編程時,通常最好盡可能避免使用共享狀態。使用多個進程時尤其如此。
但是,如果你確實需要使用一些共享數據,那么 multiprocessing
提供了一些方法。
可以使用可以由子進程繼承的共享內存來創建共享對象。
multiprocessing.
Value
(typecode_or_type,* args,lock = True )
返回ctypes
從共享內存分配的對象。默認情況下,返回值實際上是對象的同步包裝器。對象本身可以通過a的value屬性來訪問Value
。
typecode_or_type決定了返回對象的類型:它是一個ctypes類型或array
模塊使用的一種字符類型。 *參數傳遞給類型的構造函數。
如果鎖是True
(默認),則會創建一個新的遞歸鎖對象來同步對該值的訪問。如果鎖是一個Lock
或一個RLock
對象,那么將用於同步訪問值。如果是鎖,False
那么訪問返回的對象將不會被鎖自動保護,因此它不一定是“過程安全的”。
Shared memory(共享內存):可以使用value或將數據存儲在共享內存映射中Array
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/18 14:24 # @Author : Py.qi # @File : mult_sharememory.py # @Software: PyCharm from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = i*i if __name__ == '__main__': num = Value('d', 0.0) #d表示一個雙精度浮點數 arr = Array('i', range(10)) #i表示符號整數 p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:]) #output: 3.1415927 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
在'd'
與'i'
創建時使用的參數num
和arr
被使用的那種的TypeCodes array
模塊:'d'
表示一個雙精度浮點數和'i'
指示符號整數。這些共享對象將是進程和線程安全的。
為了更靈活地使用共享內存,可以使用multiprocessing.sharedctypes
支持創建從共享內存分配的任意ctypes對象的 模塊。
Server process(服務器進程):通過Manager()
控制一個服務器進程來返回管理器對象,該進程持有Python對象,並允許其他進程使用代理來操作它們。
管理者提供了一種方法來創建可以在不同進程之間共享的數據,包括在不同機器上運行的進程之間通過網絡進行共享。管理員對象控制管理共享對象的服務器進程 。其他進程可以通過使用代理來訪問共享對象。
-
返回
SyncManager
可用於在進程之間共享對象的已啟動對象。返回的管理對象對應於一個衍生的子進程,並具有創建共享對象並返回相應代理的方法。
Manager進程在垃圾收集或其父進程退出時將立即關閉。管理員類在multiprocessing.managers
模塊中定義 :
-
class
multiprocessing.managers.
BaseManager
([ address [,authkey ] ] ) -
創建一個BaseManager對象。
一旦創建,應該調用
start()
或get_server().serve_forever()
確保管理器對象引用已啟動的管理器進程。地址是管理進程偵聽新連接的地址。如果地址是
None
一個任意的選擇。authkey是將用於檢查到服務器進程的傳入連接的有效性的身份驗證密鑰。如果使用 authkey的
None
話current_process().authkey
。否則使用authkey,它必須是一個字節字符串。-
start
([ initializer [,initargs ] ] ) -
啟動一個子流程來啟動管理器。如果初始化程序不是,
None
那么子進程將initializer(*initargs)
在啟動時調用。
-
get_server
() -
Server
在Manager的控制下返回一個代表實際服務器的對象。該Server
對象支持該serve_forever()
方法:
-
connect
()
將本地管理器對象連接到遠程管理器進程
-
shutdown
() -
停止經理使用的過程。這僅
start()
在用於啟動服務器進程時才可用 。這可以被多次調用。
-
register
(typeid[,callable[,proxyType [,exposed[,method_to_typeid [,create_method ] ] ] ] ] ) -
可用於注冊類型或可與經理類一起調用的類方法。
typeid是一個“類型標識符”,用於標識特定類型的共享對象。這必須是一個字符串。
callable是可調用的,用於為這個類型標識符創建對象。如果一個管理器實例將被連接到使用該
connect()
方法的服務器,或者如果 create_method參數是,False
那么這可以保留為None
。proxytype是其中的一個子類,
BaseProxy
用於使用此typeid為共享對象創建代理。如果None
那么一個代理類是自動創建的。exposed用於指定其中用於此typeid的代理應該被允許使用訪問方法的名稱序列
BaseProxy._callmethod()
。(如果exposed在None
隨后proxytype._exposed_
被用於代替如果它存在)。在其中沒有指定暴露列表中的情況下,共享對象的所有“公共方法”將是可訪問的。(這里的“公共方法”是指任何具有__call__()
方法且名稱不以其開頭的屬性'_'
)。method_to_typeid是一個映射,用於指定那些應該返回代理的公開方法的返回類型。它將方法名稱映射到typeid字符串。(如果method_to_typeid存在,
None
則proxytype._method_to_typeid_
使用它。)如果一個方法的名稱不是這個映射的關鍵字,或者如果這個映射是None
這個方法返回的對象將被值復制。create_method確定是否應該使用名稱typeid創建一個方法, 該方法可用於通知服務器進程創建一個新的共享對象並為其返回一個代理。默認情況下是
True
。BaseManager
實例也有一個只讀屬性:address manager
使用的地址。
版本3.3中更改:管理器對象支持上下文管理協議 - 請參閱 上下文管理器類型。 __enter__()
啟動服務器進程(如果尚未啟動),然后返回管理器對象。 __exit__()
來電shutdown()
。
在以前的版本__enter__()
中,如果尚未啟動管理器的服務器進程,則不啟動。
通過返回的管理manager()將支持的類型: list
,dict
,Namespace
,Lock
, RLock
,Semaphore
,BoundedSemaphore
, Condition
,Event
,Barrier
, Queue
,Value
和Array
。
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/18 14:40 # @Author : Py.qi # @File : mult_servprocess.py # @Software: PyCharm from multiprocessing import Process, Manager def f(d,l,i): d[i] = i+1 #每個進程添加字典元素 l.append(i) #每個進程啟動后將列表添加數據i if __name__ == '__main__': manager = Manager() d = manager.dict() l = manager.list() pro_list=[] for i in range(5): #啟動5個進程 p = Process(target=f, args=(d,l,i)) p.start() pro_list.append(p) #將進程都添加到列表 for j in pro_list: j.join() #等待每個進程執行完成 print(d) #打印字典數據為5個進程操作的后的數據,實現進程間的數據共享 print(l) #output: {0: 1, 1: 2, 2: 3, 3: 4, 4: 5} [0, 1, 2, 3, 4]
(4)進程池(Pool)
class multiprocessing.pool.
Pool
([ processes [,initializer [,initargs [,maxtasksperchild [,context ] ] ] ] ] )
一個進程池對象,用於控制可以提交作業的工作進程池。它支持超時和回調的異步結果,並具有並行映射實現。
進程是要使用的工作進程的數量。如果過程是 None
然后通過返回的數字os.cpu_count()
被使用。
如果初始化程序不是,None
那么每個工作進程將initializer(*initargs)
在啟動時調用 。
maxtasksperchild是一個工作進程在退出之前可以完成的任務數量,並被一個新的工作進程取代,以便釋放未使用的資源。默認的maxtasksperchild是None
,這意味着工作進程的生活時間與池一樣長。
上下文可用於指定用於啟動工作進程的上下文。通常使用函數multiprocessing.Pool()
或Pool()
上下文對象的方法來創建池。在這兩種情況下都適當設置。
請注意,池對象的方法只應由創建池的進程調用。
-
apply
(func [,args [,kwds ] ] ) -
呼叫FUNC帶參數ARGS和關鍵字參數kwds。它阻塞,直到結果准備就緒。鑒於這些塊,
apply_async()
更適合於並行執行工作。此外,func 只能在進程池的一名工作人員中執行。
-
apply_async
(func [,args [,kwds [,callback [,error_callback ] ] ] ] ) -
apply()
返回結果對象的方法的變體。如果指定了callback,那么它應該是一個可接受的參數。當結果變為就緒callback被應用到它,這是除非調用失敗,在這種情況下,error_callback 被應用。
如果指定了error_callback,那么它應該是一個可接受的參數。如果目標函數失敗,則使用異常實例調用error_callback。
回調應該立即完成,否則處理結果的線程將被阻塞。
-
map
(func,iterable [,chunksize ] ) -
map()
內置函數的並行等價物(盡管它只支持一個可迭代的參數)。它阻塞,直到結果准備就緒。這種方法把迭代器切成許多塊,它們作為單獨的任務提交給進程池。可以通過將chunksize設置為正整數來指定這些塊的(近似)大小。
-
map_async
(func,iterable [,chunksize [,callback [,error_callback ] ] ] ) -
map()
返回結果對象的方法的變體。如果指定了callback,那么它應該是一個可接受的參數。當結果變為就緒callback被應用到它,這是除非調用失敗,在這種情況下,error_callback 被應用。
如果指定了error_callback,那么它應該是一個可接受的參數。如果目標函數失敗,則使用異常實例調用error_callback。
回調應該立即完成,否則處理結果的線程將被阻塞。
-
imap
(func,iterable [,chunksize ] ) -
一個懶惰的版本
map()
。所述CHUNKSIZE參數是與由所使用的一個
map()
方法。對於使用了一個較大的值很長iterables CHUNKSIZE可以使作業完成的太多比使用的默認值加快1
。此外,如果CHUNKSIZE是
1
則next()
通過返回的迭代器的方法imap()
方法有一個可選的timeout參數:next(timeout)
將提高multiprocessing.TimeoutError
如果結果不能內退回timeout秒。
-
imap_unordered
(func,iterable [,chunksize ] ) -
同樣
imap()
,除了從返回的迭代結果的排序應該考慮隨心所欲。(只有當只有一個工作進程時,才是保證“正確”的順序。)
-
starmap
(func,iterable [,chunksize ] ) -
就像
map()
除了可迭代的元素被期望是被解包為參數的迭代。因此,一個可迭代的結果。
[(1,2), (3, 4)]
[func(1,2), func(3,4)]
3.3版本中的新功能
-
starmap_async
(func,iterable [,chunksize [,callback [,error_back ] ] ] ) -
的組合
starmap()
和map_async()
,超過迭代 迭代 iterables,並呼吁FUNC與解壓縮iterables。返回一個結果對象。3.3版本中的新功能
-
close
() -
防止將更多任務提交到池中。一旦所有任務完成,工作進程將退出。
-
terminate
() -
立即停止工作進程而不完成傑出的工作。當池對象被垃圾收集時
terminate()
會立即調用。
-
join
() -
等待工作進程退出。必須打電話
close()
或terminate()
在使用之前join()
。
3.3版新增功能:池對象現在支持上下文管理協議 - 請參閱 上下文管理器類型。 __enter__()
返回池對象,並__exit__()
調用terminate()
。
class multiprocessing.pool.
AsyncResult
Pool.apply_async()
和 返回的結果的類Pool.map_async()
。
-
get
([ timeout ] ) -
到達時返回結果。如果timeout不是
None
,並且結果沒有在超時秒內到達,那么multiprocessing.TimeoutError
會引發。如果遠程調用引發異常,那么該異常將被重新調整get()
。
-
wait
([ timeout ] ) -
等到結果可用或timeout秒數通過。
-
ready
() -
返回通話是否完成。
-
successful
() -
返回調用是否完成而不引發異常。
AssertionError
如果結果沒有准備好,會提高。
Pool
類表示一個工作進程池。它具有允許以幾種不同方式將任務卸載到工作進程的方法。
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/19 14:19 # @Author : Py.qi # @File : mult_Pool_1.py # @Software: PyCharm from multiprocessing import Pool import time def f(x): return x*x if __name__ == '__main__': with Pool(processes=4) as pool: #4個開始工作進程 result = pool.apply_async(f, (10,)) #異步進程處理 print(result) print(result.get(timeout=1)) #1秒超時,返回結果 print(pool.map(f, range(10))) #並行阻塞等待結果 it = pool.imap(f, range(10)) print(next(it)) # prints "0" print(next(it)) # prints "1" print(it.next(2)) # prints "4" unless your computer is *very* slow result = pool.apply_async(time.sleep, (10,)) print(result.get(timeout=1)) #引發異常multiprocessing.TimeoutError #output: <multiprocessing.pool.ApplyResult object at 0x0000008BC4E76278> 100 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] 0 1 4 Traceback (most recent call last): File "Z:/python_project/day20/mult_Pool_1.py", line 28, in <module> print(result.get(timeout=1)) #引發異常multiprocessing.TimeoutError File "Z:\Program Files\Python35\lib\multiprocessing\pool.py", line 640, in get raise TimeoutError multiprocessing.context.TimeoutError
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/18 15:28 # @Author : Py.qi # @File : mult_Pool.py # @Software: PyCharm from multiprocessing import Pool, TimeoutError import time import os def f(x): return x*x if __name__ == '__main__': #啟動4個工作進程作為進程池 with Pool(processes=4) as pool: #返回函數參數運行結果列表 print(pool.map(f, range(10))) #在進程池中以任意順序打印相同的數字 for i in pool.imap_unordered(f, range(10)): print(i,end=' ') #異步評估 res = pool.apply_async(f,(20,)) #在進程池中只有一個進程運行 print('\n',res.get(timeout=1)) #打印結果,超時為1秒 #打印該進程的PID res = pool.apply_async(os.getpid,()) #在進程池中只有一個進程運行 print(res.get(timeout=1)) #打印進程PID #打印4個進程的PID multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)] print([res.get(timeout=1) for res in multiple_results]) #進程等待10秒,獲取數據超時為1秒,將輸出異常 res = pool.apply_async(time.sleep, (10,)) try: print(res.get(timeout=1)) except TimeoutError: print("We lacked patience and got a multiprocessing.TimeoutError") #output: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] 0 1 4 9 16 25 36 49 64 81 400 2164 [8152, 2032, 2164, 8152] We lacked patience and got a multiprocessing.TimeoutError
請注意:池的方法只能右創建它的進程使用,包中的功能要求__main__,在交互解釋器中將不起作用。
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/18 16:14 # @Author : Py.qi # @File : sync_apply.py # @Software: PyCharm from multiprocessing import Pool,freeze_support import time def foo(n): time.sleep(1) print(n*n) def back(arg): print('exec done:',arg) if __name__ == '__main__': freeze_support() #windows下新啟動進程需要先運行此函數 pool = Pool(2) #創建進程池,同時2個進程運行 for i in range(10): #pool.apply(func=foo, args=(i,)) #創建同步進程 #創建異步進程,傳遞函數和參數,在函數執行完后執行callback,並將函數foo的結構返回給callback pool.apply_async(func=foo,args=(i,),callback=back) pool.close() #此處必須是先關閉進程再join pool.join() print('end')