python3.6
多進程
多進程
Table of Contents
1 多進程
1.1 linux/unix/win 啟動方式對比
在不同平台上系統使用的多進程機制是不一樣的,所以在python實現中有三種不同的開啟多進程的方式
1.1.1 spawn win的默認方法,支持unix/win
父進程開啟一個新的python解釋程序 子進程只獲得足夠運行run()方法的資源 父進程的文件描述符和句柄不被繼承 此方式的速度在三種方式中最慢
- run() 該方法是target的參數,參數是一個可調用對象即可
經測試,當run調用的是function時,資源仍無法保留,只有調用一個可調用對象且初始化方法(init)除self外有其他參數才行
def hello(): pass class Hello(): def __init__(self,name): print(name) hello.__init__() # 不需要傳參,所以資源無法保留 hello_cla = Hello('name') hello_cla.__init__('name') # 有傳參,資源才能保留
實例:
from multiprocessing import Process, Queue import multiprocessing as mp import os def hello_procs(name): print("這是{}{}進程".format(name,os.getpid())) print("父進程ID%s"%os.getppid()) try: print(sourceA) print("調用成功,因為未使用spawn方式") except Exception as e: print("調用失敗,因為該資源不是必須資源") print(e) class Sou(): def __init__(self,soua): self.soua = soua def show_soua(self): print(self.soua) def __call__(self): self.show_soua() if __name__ == '__main__': sourceA = "AA" sourceB = "BB" # 使用forkserver在資源繼承方面與spawn表現一致 mp.set_start_method('spawn') print("當前進程的進程id是%s"%os.getpid()) #p = Process(target=hello_procs, args=('第一個進程',)) a = Sou(sourceA) p = Process(target=a) """ 上面的是不帶參數的傳遞方式,如需要給可調用函數傳遞參數,那么需要修改為__call__(self,參數) 再接下來就是修改Process參數為(target=a,args=('參數',)) """ print('開始進入子進程') p.start() p.join() print("進程結束")
1.1.2 fork unix的默認方法,僅支持unix
使用unix的fork()[os.fork()]來創建一個當前解釋器的子進程 子進程獲得父進程全部的資源 此方式的安全問題不好控制
1.1.3 forkserver 當平台支持unix的管道文件時該方法可用
在使用這種方式時,開啟多進程會開啟一個額外的服務進程 當需要一個子進程時父進程去請求服務進程並得到一個子進程 由於服務進程是單線程的,所以該方式是線程安全的
1.1.4 啟動子進程
- 創建進程對象 表示在單獨進程中運行的活動
Process(group=None,target=None,name=None,args=(),kwargs={},*,daemon=None) group一直為None即可,只是為了和threading.Thread兼容 target 是run()要調用的對象,需要是可執行的 name 無實意,名字 args 可調用對象的位置參數 kwargs 可調用對象的關鍵字參數 daemon True/False/None 與過程繼承有關
- run() 表示進程活動的方法
- start() 啟動進程的活動
- isalive() 檢測子進程是否存活,只能檢測子進程
- join([timeout]) 阻塞調用該方法的進程
None 阻塞,直到進程完畢 正數 阻塞timeout秒
- daemon 標識該進程是否為守護進程,True是,False不是,None從上層繼承
主進程不是守護進程,所以只要不明確指定為True,那么創建的所有進程都不是守護進程
from multiprocessing import Process as ps import sys def hello(*,a=None): if a == None: pass else: print(a) def not_sh(): global pish,pfsh,ex print("非守護進程%s"%ex) pish = ps(target=is_sh, args=(1,), daemon=True) pfsh = ps(target=is_sh, args=('非守護進程',)) #daemon為None從上層繼承屬性 pish.start() pfsh.start() for i in range(1): #當該值為1時可以看到守護進程在打印出消息前就退出了 print("第二個主進程") ex="有變化" def is_sh(jc_type): if jc_type == 1: print("守護進程") for i in range(1000): print("守護進程") else: print() print("不是守護進程") for i in range(100): print("非守護進程") if __name__ == '__main__': p = ps(target=hello, name='hello', args=(), kwargs={'a':'A'}) print("進程是否存活%s"%p.is_alive()) p.start() print("守護進程daemon? %s"%p.daemon) print("進程是否存活%s"%p.is_alive()) # 創建一個非守護進程 pish, pfsh =None,None ex = "原始" pnsh = ps(target=not_sh) pnsh.start() pnsh.join() print(ex)
1.1.5 選擇啟動的方法
import multiprocessing multiprocessing.set_start_method('spawn') #傳入方式的名字 # 該方法在程序中至多使用一次
1.2 進程通信
1.2.1 隊列 Queue
- Queue([maxsize]) 創建並可以設置最大值
- qsize() 返回隊列大致大小(不准確是因為並發) 在MAC上回引發異常NotImplementedError
- empty() 是否為空,不准確
- full() 隊列是否已滿,不准確
- put(obj[,block[,timeout]])
- putnowait(obj) 相當於put(obj,False)
- get()
- getnowite()
- close() 沒有返回值,關閉隊列,后台線程將數據一次性刷新到管道當關閉后仍去操作會拋出異常OSError: handle is closed
- jointhread() 只能在close()后調用,他阻塞直到后台線程退出,確保數據刷新到管道
- canceljointhread() 立即關閉隊列,不等待后台線程將數據刷新到管道
1.2.2 TODO 管道 Pipes
1.2.3 實例
"""進程間通信""" from multiprocessing import Process, Queue, Pipe import os, random def write(m): print('進程:%s'%os.getpid()) m.put('數據A') """ put(obj[,block[,timeout]]) 將值放入隊列 當block為True(默認值),且timeout為None(默認值)時,不會拋出異常,會一直等到可以入隊時將值入隊 當timeout為正值時,等待timeout秒,超時則拋出queue.Full異常 當block為False時,一旦無法入隊立即拋出異常 """ def read(m): print('進程:%s'%os.getpid()) try: print(m.qsize()) except Exception as e: print("在MAC上會引發異常") finally: value = m.get() """ get([block[,timeout]]) 獲取一個值后刪除 當block為True(默認值)且timeout為None(默認值),那么只有當隊列中有內容時獲取值 timeout為正數時,當隊列中無值時阻塞timeout秒,而后仍無值則拋出queue.Empty異常 block為False時一旦無值立即拋出異常 """ print(value) if __name__ == '__main__': q = Queue() pw = Process(target=write, name='寫進程', args=(q,)) print('開始寫入數據 %s'%pw.name, end=' : ') pw.start() pr = Process(target=read, name='讀進程', args=(q,)) print('開始讀取數據 %s'%pr.name, end=' : ') pr.start() # TODO Pipe 通過管道傳遞消息
1.3 進程同步
1.3.1 鎖Lock
一旦進程,線程獲得了鎖,那么隨后的任何進程,線程在獲取鎖時將阻塞
1.3.2 實例
from multiprocessing import Process, Lock def show_lock(l): #l.release() 在try_get_lock中上的鎖可以在這里解開 l.acquire(True,-1) # 超時時長為負數,即使被鎖定也會執行 print("函數正常執行") def try_get_lock(l): l.acquire() print("獲得了鎖") # l.release() if __name__ == '__main__': l = Lock() #l.acquire(True) pg = Process(target=try_get_lock,args=(l,)) pg.start() ps = Process(target=show_lock,args=(l,)) ps.start()
1.4 進程池 Pool
1.4.1 創建Pool([processes[,initalizer[,initargs[,maxtasksperchild[,context]]]])
processes 進程的數量 initializer 如果不為None,則在每個工作進程啟動時調用initializer(*initargs) maxtasksperchild context 工作進程的上下文
該類實現了上下文管理
- apply(func[,args[,kwds]])
使用args,kwds調用func,直到結果完成
- applyasync(func[,args[,kwds[,callback[,errorcallback]]]])
返回一個結果對象 返回的對象是AsyncResult 當指定callback(一個接受單參數的可調用對象)時,完成時會調用callback,調用失敗則調用errorcallback 回調應該立即完成,否則線程將會阻塞
- map(func,iterable[,chunksize])
與內置函數map()相同,它阻塞直到map完成
- mapasync(func,iterable[,chunksize[,callback[,errorcallback]]])
返回結果的map()
- imap(func,iterable[,chunkszie])
惰性map() chunkszie參數與map()方法的參數相同
- starmap(func,iterable[,chunksize]) iterable必須為可迭代對象
需要注意'abc'也是可迭代對象,一旦加上(),('abc')更不行 func,('abc',) 會給func傳入三個參數,而不是一個整體 正確做法 傳入(('abc',),),同理,傳入其他可迭代內容也可以這樣做
- starmapasync(func,iterable[,chuunksize[,callback[,errorback]]])
將iterable拆分后調用func並返回一個結果對象
- close()
一旦任務完成,退出進程
- terminate()
立即停止進程並退出
- join()
等待進程結束,在此之前必須調用close或terminate
1.4.2 AsyncResult applyasync,mapasync與starmapasync的返回對象
1.4.3 實例
from multiprocessing import Pool, TimeoutError, Process import time import os def proc_pool(name): print("asd") for i in range(5): print(str(i)+' : %s') #return "返回的結果值","有兩個會怎樣?" 不要返回一個以上的值,會導致map調用產生歧義(使用map(func,[1,2])時會返回[返回值1,返回值2]而不是[(返回值1,返回值2),(返回值1,返回值2)]) 當需要返回兩個值要顯式的返回一個元組 #return ("返回的結果值","第二個值") return "返回值" def proc_err(name): raise Exception def proc_mm(name): print('該函數被調用了%s%s'%(name,type(name))) return name if __name__ == '__main__': print("開始啟動線程池") p = Pool(4) for i in range(5): p.apply_async(proc_pool, args=('cc',)) #p.map(proc_pool,['cc','dd']) def callback(name): print("回調函數%s") def err_callback(err): try: print("yc") except Exception as e: print('發生異常') finally: print("ww") mapr = p.map_async(proc_err, 'ee', 3, callback, err_callback) #mapr.get() 獲得可調用對象的返回值 #print("返回的結果值%s"%mapr.get()) #mmap = p.starmap(proc_mm,[('abcd'),('a')]) mmap = p.starmap(proc_mm,((('abc',),),)) mmaps = p.starmap_async(proc_mm,((('abc',),),)) list(mmap) print(mmaps.get()) p.close() p.join()
1.5 資源共享
1.5.1 使用Array,Value作為存儲空間來保存需要共享的資源
Value(typecodeortype, *args, lock=True) Array(typecodeortype, sizeorinitializer, *, lock=True)
1.5.2 實例
""" 進程共享內容 使用Value,Array使內容共享 """ from multiprocessing import Process, Value, Array, Lock def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] def fun(l, strr): # 獲得鎖,當被鎖住,等待最多3秒,繼續執行 l.acquire(True,3) try: print(num.value) print(strr.value) print(chr(strr.value)) except Exception as ex: print(ex) finally: print('完成') if __name__ == '__main__': l = Lock() num = Value('d', 0.0) arr = Array('i', range(10)) lisi = [1,2,3] arrs = Array('i',lisi) # 因為python中沒有char類型,所以在這里只能轉換為數字,最后在轉回來 strr = Value('b',ord('c')) p = Process(target=f, args=(num, arrs)) pl = Process(target=fun, args=(l,strr)) # 上鎖 l.acquire() p.start() p.join() pl.start() pl.join() print(num.value) print(arrs[:])
1.5.3 使用服務進程server process
- 使用Manager()會返回一個管理對象
該管理對象支持的類型更廣泛,有: list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array
該類實現了上下文管理
- Manager的兩個子類,Manager()返回的就是SyncManager
- BaseManager([adress[,authkey]])
adress是管理器進程偵聽新鏈接的地址,None為隨機選一個 authkey是認證密匙,None為使用currentprocess().authkey,否則使用authkey,必須為字符串 currentprocess() 返回當前Process對象 authkey 進程的認證密鑰(字節字符串) 當初始化multiprocessing時,使用os.urandom()為主進程分配一個隨機字符串 當創建Process對象時,它將繼承其父進程的認證密鑰 但可以通過將authkey設置為另一個字節字符串來更改。
- start([initializer[,initargs]])
啟動子過程以啟動管理器
- getserver()
返回Server對象,他表示在manger控制下的實際服務器
- connect()
本地管理器對象鏈接到遠程管理器進程
- shutdown()
停止manager進程,僅當啟動使用start()時可用
- register(typeid[, callable[, proxytype[, exposed[, methodtotypeid[, createmethod]]]]])
向Manager注冊類型或可調用的類方法
typeid 用於標識特定類型的共享對象的類型標識符,必須是字符串 callable 用於typeid類型的可調用選項, proxytype 是BaseProxy的子類,用於創建typeid的共享對象代理,None,自動創建 exposed 用於指定代理類型所使用的方法 methodtotypeid 返回代理類型的公開方法 createmethod 確定是否使用typeid創建方法,默認為True
- start([initializer[,initargs]])
- SyncManager
BaseManager 主要用來創建自定義的Manager
- 實例
from multiprocessing import Process, Manager def f(d, l, q, a, v, lo): d[1] = '1' d['2'] = 2 d[0.25] = None q.put(100) lo.acquire(True,3) for i in range(len(a)): a[i]=1 v,value = 100 l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) q = manager.Queue(10) a = manager.Array('i',[1,2,3]) v = manager.Value('i',3) lo = manager.Lock() lo.acquire(True) p = Process(target=f, args=(d, l, q, a, v, lo)) p.start() p.join() print(d) print(l) print("********") print(q.get()) print(a[:]) print(v.value)
- BaseManager([adress[,authkey]])