Python並發之多進程


一,進程的理論基礎

  一個應用程序,歸根結底是一堆代碼,是靜態的,而進程才是執行中的程序,在一個程序運行的時候會有多個進程並發執行。

  進程和線程的區別:

  •     進程是系統資源分配的基本單位。
  •     一個進程內可以包含多個線程,屬於一對多的關系,進程內的資源,被其內的線程共享
  •     線程是進程運行的最小單位,如果說進程是完成一個功能,那么其線程就是完成這個功能的基本單位
  •     進程間資源不共享,多進程切換資源開銷,難度大,同一進程內的線程資源共享,多線程切換資源開銷,難度小

  進程與線程的共同點:

    都是為了提高程序運行效率,都有執行的優先權

 

二,Python的多進程( multiprocessing模塊)

創建一個進程(和創建線程類似)

方法一:創建Process對象,通過對象調用start()方法啟動進程

from multiprocessing import Process

def foo(name):
    print('hello,%s'%name)

if __name__ == '__main__':
    p1=Process(target=foo,args=('world',))
    p2 = Process(target=foo, args=('China',))
    p1.start()
    p2.start()
    print('=====主進程=====')
    
    # == == =主進程 == == =
    # hello, world
    # hello, China
    #主進程和子進程並發執行  

 注意:Process對象只能在在 if __name__ == '__main__':下創建,不然會報錯。

方法二:自定義一個類繼承Process類,並重寫run()方法,將執行代碼放在其內

from multiprocessing import Process

class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name
    def run(self):
        print('hello,%s'%self.name)

if __name__ == '__main__':
    myprocess1 = MyProcess('world')
    myprocess2 = MyProcess('world')
    myprocess1.start()
    myprocess2.start()

 

 

 Process內置方法

實例方法:
p.start():啟動進程,並調用該子進程中的p.run() 

p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法  

p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖

p.is_alive():如果p仍然運行,返回True

p.join([timeout]):主線程等待p終止。timeout是可選的超時時間

 

Process屬性

p.daemon:默認值為False,如果設為True,代表p為后台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置

p.name:進程的名稱

p.pid:進程的pid

p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可) 

守護進程

類似於守護線程,只不過守護線程是對象的一個方法,而守護進程封裝成對象的屬性。

from multiprocessing import Process
import time
class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name
    def run(self):
        time.sleep(3)
        print('hello,%s'%self.name)

if __name__ == '__main__':
    myprocess1=MyProcess('world')
    myprocess1.daemon = True
    myprocess1.start()
    print('結束')

#不會輸出‘hello world’,因為設置為守護進程,主進程不會等待

 

也可以使用join方法,使主進程等待

from multiprocessing import Process
import time
class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name
    def run(self):
        time.sleep(3)
        print('hello,%s'%self.name)

if __name__ == '__main__':
    myprocess1=MyProcess('world')
    myprocess1.daemon = True
    myprocess1.start()
    myprocess1.join()  #程序阻塞
    print('結束')
join()

 

進程同步和鎖

  進程雖然不像線程共享資源,但是這並不意味着進程間不需要加鎖,比如不同進程會共享同一個終端屏幕),或者操作同一個文件,數據庫,那么數據安全還是很有必要的,因此我們可以加鎖,

from multiprocessing import Process,Lock
import time
def a_print(l): #需要傳入對象,因為信息不共享
    l.acquire()
    print('我要打印信息')
    time.sleep(1)
    print('我打印完了')
    l.release()

if __name__ == '__main__':
    l = Lock()
    for i in range(20):
        p = Process(target=a_print,args=(l,))
        p.start()

 

信號量(Semaphore)

能夠並發執行的進程數,超出的進程阻塞,直到有進程運行完成。

  Semaphore管理一個內置的計數器,
  每當調用acquire()時內置計數器-1;
  調用release() 時內置計數器+1;
  計數器不能小於0;當計數器為0時,acquire()將阻塞進程直到其他進程調用release()。

from multiprocessing import Process,Queue,Semaphore
import time,random

def seat(s,n):
    s.acquire()
    print('學生%d坐下了'%n)
    time.sleep(random.randint(1,2))
    s.release()

if __name__ == '__main__':
    s = Semaphore(5)
    for i in range(20):
        p = Process(target=seat,args=(s,i))
        p.start()

    print('-----主進程-------')

 

注意:其實信號量和鎖類似,只是限制進程運行某個代碼塊的數量(鎖為1個),並不是能限制並發的進程,如上述代碼,一次性還是創建了20個進程

 

事件(Event)

from multiprocessing import Process,Event
import time, random
def eating(event):
    event.wait()
    print('去吃飯的路上...')

def makeing(event):
    print('做飯中')
    time.sleep(random.randint(1,2))
    print('做好了,快來...')
    event.set()

if __name__ == '__main__':
    event=Event()
    t1 = Process(target=eating,args=(event,))
    t2 = Process(target=makeing,args=(event,))
    t1.start()
    t2.start()
    # 做飯中
    # 做好了,快來...
    # 去吃飯的路上...
和線程事件幾乎一致

 

 

進程隊列(Queue)

進程隊列是進程通訊的方式之一。使用multiprocessing 下的Queue

from multiprocessing import Process,Queue
import time
def func1(queue):
    while True:
        info=queue.get()
        if info == None:
            return 
        print(info)

def func2(queue):
    for i in range(10):
        time.sleep(1)
        queue.put('is %d'%i)
    queue.put(None) #結束的標志
if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=func1,args=(q,))
    p2 = Process(target=func2, args=(q,))

    p1.start()
    p2.start()

 

Queue類的方法,源碼如下:

class Queue(object):
    def __init__(self, maxsize=-1):  #可以傳參設置隊列最大容量
        self._maxsize = maxsize

    def qsize(self): #返回當前時刻隊列中的個數
        return 0

    def empty(self): #是否為空
        return False

    def full(self):    是否滿了
        return False

    def put(self, obj, block=True, timeout=None): #放值,blocked和timeout。如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常
        pass

    def put_nowait(self, obj):  #=put(False)
        pass

    def get(self, block=True, timeout=None): 獲取值,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.
        pass

    def get_nowait(self): # = get(False)
        pass

    def close(self): #將隊列關閉
        pass

    def join_thread(self): #略,幾乎不用
        pass

    def cancel_join_thread(self):
        pass
進程隊列源碼注釋

 

進程池

  進程的消耗是很大的,因此我們不能無節制的開啟新進程,因此我們可以通過維護一個進程池來控制進程的數量。這就不同於信號量,進程池可以從源頭控制進程數量。在Python中可以通過如下方法使用

同步調用

from multiprocessing import Pool
import time, random, os
def func(n):
    pid = os.getpid()
    print('進程%s正在處理第%d個任務'%(pid,n),'時間%s'%time.strftime('%H-%M-%S'))
    time.sleep(2)
    res = '處理%s'%random.choice(['成功','失敗'])
    return res

if __name__ == '__main__':
    p = Pool(4) #創建4個進程,
    li = []
    for i in range(10):
        res = p.apply(func,args=(i,)) 交給進程池處理,處理完成才返回值,會阻塞,即使池內還有空余進程,相當於順序執行
        li.append(res)

   
    for i in li:
        print(i)

#進程1916正在處理第0個任務 時間21-02-53
#進程1240正在處理第1個任務 時間21-02-55
#進程3484正在處理第2個任務 時間21-02-57
#進程7512正在處理第3個任務 時間21-02-59
#進程1916正在處理第4個任務 時間21-03-01
#進程1240正在處理第5個任務 時間21-03-03
#進程3484正在處理第6個任務 時間21-03-05
#進程7512正在處理第7個任務 時間21-03-07
#進程1916正在處理第8個任務 時間21-03-09
#進程1240正在處理第9個任務 時間21-03-11

 

從結果可以發現兩點:

  1. 不是並發處理
  2. 一直都只有四個進程,串行執行

 

因此進程池提供了異步處理的方式

from multiprocessing import Pool
import time, random, os
def func(n):
    pid = os.getpid()
    print('進程%s正在處理第%d個任務'%(pid,n),'時間%s'%time.strftime('%H-%M-%S'))
    time.sleep(2)
    res = '處理%s'%random.choice(['成功','失敗'])
    return res

if __name__ == '__main__':
    p = Pool(4)
    li = []
    for i in range(10):
        res = p.apply_async(func,args=(i,)) 結果不會立刻返回,遇到阻塞,開啟下一個進程,在這,相當於幾乎同時出現四個打印結果(一個線程處理一個任務,處理完下個任務才能進來)
        li.append(res)

    p.close() #join之前需要關閉進程池
    p.join()  #因為異步,所以需要等待池內進程工作結束再繼續
    for i in li:
        print(i.get()) #i是一個對象,通過get方法獲取返回值,而同步則沒有該方法

 關於回調函數

from multiprocessing import Pool
import time, random, os
def func(n):
    pid = os.getpid()
    print('進程%s正在處理第%d個任務'%(pid,n),'時間%s'%time.strftime('%H-%M-%S'))
    time.sleep(2)
    res = '處理%s'%random.choice(['成功','失敗'])
    return res

def foo(info):
    print(info) #傳入值為進程執行結果

if __name__ == '__main__':
    p = Pool(4)
    li = []
    for i in range(10):
        res = p.apply_async(func,args=(i,),callback = foo) callback()回調函數會在進程執行完之后調用(主進程調用) 
        li.append(res)

    p.close() 
    p.join()  
    for i in li:
        print(i.get()) 
有回調函數

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM