python 多進程


python3.6

多進程

多進程

1 多進程

 

1.1 linux/unix/win 啟動方式對比

在不同平台上系統使用的多進程機制是不一樣的,所以在python實現中有三種不同的開啟多進程的方式

1.1.1 spawn win的默認方法,支持unix/win

父進程開啟一個新的python解釋程序 子進程只獲得足夠運行run()方法的資源 父進程的文件描述符和句柄不被繼承 此方式的速度在三種方式中最慢

  1. run() 該方法是target的參數,參數是一個可調用對象即可

    經測試,當run調用的是function時,資源仍無法保留,只有調用一個可調用對象且初始化方法(init)除self外有其他參數才行

    def hello():
        pass
    class Hello():
        def __init__(self,name):
    	print(name)
    hello.__init__()  # 不需要傳參,所以資源無法保留
    hello_cla = Hello('name')
    hello_cla.__init__('name')  # 有傳參,資源才能保留
    

    實例:

    from multiprocessing import Process, Queue
    import multiprocessing as mp
    import os
    
    def hello_procs(name):
        print("這是{}{}進程".format(name,os.getpid()))
        print("父進程ID%s"%os.getppid())
        try:
    	print(sourceA)
    	print("調用成功,因為未使用spawn方式")
        except Exception as e:
    	print("調用失敗,因為該資源不是必須資源")
    	print(e)
    
    
    class Sou():
        def __init__(self,soua):
    	self.soua = soua
    
        def show_soua(self):
    	print(self.soua)
        def __call__(self):
    	self.show_soua()
    
    if __name__ == '__main__':
        sourceA = "AA"
        sourceB = "BB"
        # 使用forkserver在資源繼承方面與spawn表現一致
        mp.set_start_method('spawn')
        print("當前進程的進程id是%s"%os.getpid())
        #p = Process(target=hello_procs, args=('第一個進程',))
        a = Sou(sourceA)
        p = Process(target=a)
        """
        上面的是不帶參數的傳遞方式,如需要給可調用函數傳遞參數,那么需要修改為__call__(self,參數)
        再接下來就是修改Process參數為(target=a,args=('參數',))
        """
        print('開始進入子進程')
        p.start()
        p.join()
        print("進程結束")
    

1.1.2 fork unix的默認方法,僅支持unix

使用unix的fork()[os.fork()]來創建一個當前解釋器的子進程 子進程獲得父進程全部的資源 此方式的安全問題不好控制

1.1.3 forkserver 當平台支持unix的管道文件時該方法可用

在使用這種方式時,開啟多進程會開啟一個額外的服務進程 當需要一個子進程時父進程去請求服務進程並得到一個子進程 由於服務進程是單線程的,所以該方式是線程安全的

1.1.4 啟動子進程

 
  1. 創建進程對象 表示在單獨進程中運行的活動

    Process(group=None,target=None,name=None,args=(),kwargs={},*,daemon=None) group一直為None即可,只是為了和threading.Thread兼容 target 是run()要調用的對象,需要是可執行的 name 無實意,名字 args 可調用對象的位置參數 kwargs 可調用對象的關鍵字參數 daemon True/False/None 與過程繼承有關

  2. run() 表示進程活動的方法
  3. start() 啟動進程的活動
  4. isalive() 檢測子進程是否存活,只能檢測子進程
  5. join([timeout]) 阻塞調用該方法的進程

    None 阻塞,直到進程完畢 正數 阻塞timeout秒

  6. daemon 標識該進程是否為守護進程,True是,False不是,None從上層繼承

    主進程不是守護進程,所以只要不明確指定為True,那么創建的所有進程都不是守護進程

    from multiprocessing import Process as ps
    import sys
    
    def hello(*,a=None):
        if a == None:
    	pass
        else:
    	print(a)
    
    
    def not_sh():
        global pish,pfsh,ex
        print("非守護進程%s"%ex)
        pish = ps(target=is_sh, args=(1,), daemon=True)
        pfsh = ps(target=is_sh, args=('非守護進程',)) #daemon為None從上層繼承屬性
        pish.start()
        pfsh.start()
        for i in range(1): #當該值為1時可以看到守護進程在打印出消息前就退出了
    	print("第二個主進程")
    
        ex="有變化"
    
    def is_sh(jc_type):
        if jc_type == 1:
    	print("守護進程")
    	for i in range(1000):
    	    print("守護進程")
        else:
           print()
           print("不是守護進程")
           for i in range(100):
    	   print("非守護進程")
    
    
    
    if __name__ == '__main__':
        p = ps(target=hello, name='hello', args=(), kwargs={'a':'A'})
        print("進程是否存活%s"%p.is_alive())
        p.start()
        print("守護進程daemon? %s"%p.daemon)
        print("進程是否存活%s"%p.is_alive())
    
        # 創建一個非守護進程
        pish, pfsh =None,None
        ex = "原始"
    
        pnsh = ps(target=not_sh)
    
        pnsh.start()
        pnsh.join()
        print(ex)
    

1.1.5 選擇啟動的方法

import multiprocessing
multiprocessing.set_start_method('spawn') #傳入方式的名字
# 該方法在程序中至多使用一次

1.1.6 其他內容

 
  1. 使用global可以將數據發送到子進程,但是子進程對數據的修改不會反饋到父進程

1.2 進程通信

 

1.2.1 隊列 Queue

 
  1. Queue([maxsize]) 創建並可以設置最大值
  2. qsize() 返回隊列大致大小(不准確是因為並發) 在MAC上回引發異常NotImplementedError
  3. empty() 是否為空,不准確
  4. full() 隊列是否已滿,不准確
  5. put(obj[,block[,timeout]])
  6. putnowait(obj) 相當於put(obj,False)
  7. get()
  8. getnowite()
  9. close() 沒有返回值,關閉隊列,后台線程將數據一次性刷新到管道當關閉后仍去操作會拋出異常OSError: handle is closed
  10. jointhread() 只能在close()后調用,他阻塞直到后台線程退出,確保數據刷新到管道
  11. canceljointhread() 立即關閉隊列,不等待后台線程將數據刷新到管道

1.2.2 TODO 管道 Pipes

1.2.3 實例

"""進程間通信"""
from multiprocessing import Process, Queue, Pipe
import os, random

def write(m):
    print('進程:%s'%os.getpid())
    m.put('數據A')
    """
    put(obj[,block[,timeout]])
    將值放入隊列
    當block為True(默認值),且timeout為None(默認值)時,不會拋出異常,會一直等到可以入隊時將值入隊
    當timeout為正值時,等待timeout秒,超時則拋出queue.Full異常
    當block為False時,一旦無法入隊立即拋出異常
    """

def read(m):    
    print('進程:%s'%os.getpid())
    try:
	print(m.qsize())
    except Exception as e:
	print("在MAC上會引發異常")
    finally:
	value = m.get()

    """
    get([block[,timeout]])
    獲取一個值后刪除
    當block為True(默認值)且timeout為None(默認值),那么只有當隊列中有內容時獲取值
    timeout為正數時,當隊列中無值時阻塞timeout秒,而后仍無值則拋出queue.Empty異常
    block為False時一旦無值立即拋出異常
    """
    print(value)

if __name__ == '__main__':
    q = Queue()

    pw = Process(target=write, name='寫進程', args=(q,))
    print('開始寫入數據 %s'%pw.name, end=' :  ')
    pw.start()

    pr = Process(target=read, name='讀進程', args=(q,))
    print('開始讀取數據 %s'%pr.name, end=' :  ')
    pr.start()

# TODO Pipe 通過管道傳遞消息

1.3 進程同步

 

1.3.1 鎖Lock

一旦進程,線程獲得了鎖,那么隨后的任何進程,線程在獲取鎖時將阻塞

  1. acquire(block=True, timeout=None)

    當block為True,方法調用將阻塞,直到解鎖 當block為True時,timeout為正數,那么最多只能被阻塞timeout秒,當timeout為負數,阻塞時長為0,當為None一直阻塞

  2. release() 釋放鎖

    一把鎖可以被任意對象釋放,未必是上鎖的對象來解鎖

1.3.2 實例

from multiprocessing import Process, Lock

def show_lock(l):
    #l.release() 在try_get_lock中上的鎖可以在這里解開
    l.acquire(True,-1)
    # 超時時長為負數,即使被鎖定也會執行
    print("函數正常執行")

def try_get_lock(l):
    l.acquire()
    print("獲得了鎖")
#    l.release()

if __name__ == '__main__':
    l = Lock()
    #l.acquire(True)
    pg = Process(target=try_get_lock,args=(l,))
    pg.start()
    ps = Process(target=show_lock,args=(l,))
    ps.start()

1.4 進程池 Pool

 

1.4.1 創建Pool([processes[,initalizer[,initargs[,maxtasksperchild[,context]]]])

processes 進程的數量 initializer 如果不為None,則在每個工作進程啟動時調用initializer(*initargs) maxtasksperchild context 工作進程的上下文

該類實現了上下文管理

  1. apply(func[,args[,kwds]])

    使用args,kwds調用func,直到結果完成

  2. applyasync(func[,args[,kwds[,callback[,errorcallback]]]])

    返回一個結果對象 返回的對象是AsyncResult 當指定callback(一個接受單參數的可調用對象)時,完成時會調用callback,調用失敗則調用errorcallback 回調應該立即完成,否則線程將會阻塞

  3. map(func,iterable[,chunksize])

    與內置函數map()相同,它阻塞直到map完成

  4. mapasync(func,iterable[,chunksize[,callback[,errorcallback]]])

    返回結果的map()

  5. imap(func,iterable[,chunkszie])

    惰性map() chunkszie參數與map()方法的參數相同

  6. starmap(func,iterable[,chunksize]) iterable必須為可迭代對象

    需要注意'abc'也是可迭代對象,一旦加上(),('abc')更不行 func,('abc',) 會給func傳入三個參數,而不是一個整體 正確做法 傳入(('abc',),),同理,傳入其他可迭代內容也可以這樣做

  7. starmapasync(func,iterable[,chuunksize[,callback[,errorback]]])

    將iterable拆分后調用func並返回一個結果對象

  8. close()

    一旦任務完成,退出進程

  9. terminate()

    立即停止進程並退出

  10. join()

    等待進程結束,在此之前必須調用close或terminate

1.4.2 AsyncResult applyasync,mapasync與starmapasync的返回對象

 
  1. get([timeout]) 返回結果,並要求在timeout秒內到達,當timeout不為None時,N秒內未到達拋出異常TimeoutError
  2. wait([timeout]) 等待結果或直到N秒超時
  3. ready() 判斷返回是否就緒
  4. successful() 返回調用是否完成且無異常

1.4.3 實例

from multiprocessing import Pool, TimeoutError, Process
import time
import os

def proc_pool(name):
    print("asd")
    for i in range(5):
	print(str(i)+' : %s')
    #return "返回的結果值","有兩個會怎樣?"  不要返回一個以上的值,會導致map調用產生歧義(使用map(func,[1,2])時會返回[返回值1,返回值2]而不是[(返回值1,返回值2),(返回值1,返回值2)]) 當需要返回兩個值要顯式的返回一個元組
    #return ("返回的結果值","第二個值")
    return "返回值"

def proc_err(name):
    raise Exception

def proc_mm(name):
    print('該函數被調用了%s%s'%(name,type(name)))
    return name

if __name__ == '__main__':
    print("開始啟動線程池")
    p = Pool(4)

    for i in range(5):
	p.apply_async(proc_pool, args=('cc',))

    #p.map(proc_pool,['cc','dd'])
    def callback(name):
	print("回調函數%s")

    def err_callback(err):
	try:
	    print("yc")
	except Exception as e:
	    print('發生異常')
	finally:
	    print("ww")

    mapr = p.map_async(proc_err, 'ee', 3, callback, err_callback)
    #mapr.get() 獲得可調用對象的返回值
    #print("返回的結果值%s"%mapr.get())

    #mmap = p.starmap(proc_mm,[('abcd'),('a')])
    mmap = p.starmap(proc_mm,((('abc',),),))
    mmaps = p.starmap_async(proc_mm,((('abc',),),))
    list(mmap)
    print(mmaps.get())
    p.close()
    p.join()

1.5 資源共享

 

1.5.1 使用Array,Value作為存儲空間來保存需要共享的資源

Value(typecodeortype, *args, lock=True) Array(typecodeortype, sizeorinitializer, *, lock=True)

  1. Array,Value的共通之處

    在創建存儲空間時,在lock的參數選取上,默認情況是自己創建一個資源鎖 但是也可以選擇使用一個已經存在的鎖,當lock被傳入一個已存在的鎖時受該鎖影響 當設置為False時,資源不被鎖保護,導致線程不安全

    tyoecodeortype 都是array模塊使用的類型代碼 array 表示基本類型的數組,有:字符,整數,浮點數

  2. 區別

    Array存儲一個隊列,Value存儲一個值 Array的sizeofinitializer就是保存的數組,同時該數組的長度也是Array的長度

1.5.2 實例

"""
進程共享內容
使用Value,Array使內容共享
"""

from multiprocessing import Process, Value, Array, Lock

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
	a[i] = -a[i]

def fun(l, strr):
    # 獲得鎖,當被鎖住,等待最多3秒,繼續執行
    l.acquire(True,3)
    try:
	print(num.value)
	print(strr.value)
	print(chr(strr.value))
    except Exception as ex:
	print(ex)
    finally:
	print('完成')

if __name__ == '__main__':
    l = Lock()
    num = Value('d', 0.0)
    arr = Array('i', range(10))
    lisi = [1,2,3]
    arrs = Array('i',lisi)
    # 因為python中沒有char類型,所以在這里只能轉換為數字,最后在轉回來
    strr = Value('b',ord('c'))
    p = Process(target=f, args=(num, arrs))
    pl = Process(target=fun, args=(l,strr))
    # 上鎖
    l.acquire()
    p.start()
    p.join()
    pl.start()
    pl.join()
    print(num.value)
    print(arrs[:])

1.5.3 使用服務進程server process

 
  1. 使用Manager()會返回一個管理對象

    該管理對象支持的類型更廣泛,有: list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array

    該類實現了上下文管理

  2. Manager的兩個子類,Manager()返回的就是SyncManager
    1. BaseManager([adress[,authkey]])

      adress是管理器進程偵聽新鏈接的地址,None為隨機選一個 authkey是認證密匙,None為使用currentprocess().authkey,否則使用authkey,必須為字符串 currentprocess() 返回當前Process對象 authkey 進程的認證密鑰(字節字符串) 當初始化multiprocessing時,使用os.urandom()為主進程分配一個隨機字符串 當創建Process對象時,它將繼承其父進程的認證密鑰 但可以通過將authkey設置為另一個字節字符串來更改。

      1. start([initializer[,initargs]])

        啟動子過程以啟動管理器

      2. getserver()

        返回Server對象,他表示在manger控制下的實際服務器

      3. connect()

        本地管理器對象鏈接到遠程管理器進程

      4. shutdown()

        停止manager進程,僅當啟動使用start()時可用

      5. register(typeid[, callable[, proxytype[, exposed[, methodtotypeid[, createmethod]]]]])

        向Manager注冊類型或可調用的類方法

        typeid 用於標識特定類型的共享對象的類型標識符,必須是字符串 callable 用於typeid類型的可調用選項, proxytype 是BaseProxy的子類,用於創建typeid的共享對象代理,None,自動創建 exposed 用於指定代理類型所使用的方法 methodtotypeid 返回代理類型的公開方法 createmethod 確定是否使用typeid創建方法,默認為True

    2. SyncManager

      BaseManager 主要用來創建自定義的Manager

      1. Queue([maxsize]) 創建queue.Queue對象返回其代理

        在進程通信中展示了部分Queue隊列的使用方法

      2. Array(typecode,sequence) 創建一個數組並返回其代理

        在進程共享中展示了部分Array的用法

      3. Value(typecode,value) 創建一個值並返回其代理

        在進程共享中展示了部分Value的用法

      4. dict([dict]) 創建一個dict,並返回其代理
      5. list([list]) 創建一個list,並返回其代理
      6. Lock() 創建一個threading.Lock對象並返回其代理
    3. 實例
      from multiprocessing import Process, Manager
      def f(d, l, q, a, v, lo):
          d[1] = '1'
          d['2'] = 2
          d[0.25] = None
          q.put(100)
          lo.acquire(True,3)
          for i in range(len(a)):
      	a[i]=1
          v,value = 100
          l.reverse()
      
      
      if __name__ == '__main__':
          with Manager() as manager:
      	d = manager.dict()
      	l = manager.list(range(10))
      	q = manager.Queue(10)
      	a = manager.Array('i',[1,2,3])
      	v = manager.Value('i',3)
      	lo = manager.Lock()
      	lo.acquire(True)
      	p = Process(target=f, args=(d, l, q, a, v, lo))
      
      	p.start()
      	p.join()
      	print(d)
      	print(l)
      	print("********")
      	print(q.get())
      	print(a[:])
      	print(v.value)
      

Author: vz liū

Created: 2017-04-08 Sat 22:27

Emacs 25.1.1 (Org mode 8.2.10)

Validate


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM