python多進程總結


 

概述

由於python中全局解釋器鎖(GIL)的存在,所以python多線程並不能有效利用CPU多核的性能(相當於單核並發)實現多線程多核並行,所以在對CPU密集型的程序時處理效率較低,反而對IO密集型的才有效率的大幅度提高。

如果想要充分地使用多核CPU的資源,需要使用多進程,python中提供multiprocessing實現。

CPU密集型:主要特點是需要進行大量的計算,消耗CPU資源,比如計算圓周率、對視頻進行高清解碼等等,全靠CPU的運算能力。這種計算密集型任務雖然也可以用多任務完成,但是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低,所以,要最高效地利用CPU,計算密集型任務同時進行的數量應當等於CPU的核心數。

IO密集型:主要涉及到網絡、磁盤IO的任務都是IO密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成(因為IO的速度遠遠低於CPU和內存的速度)。對於IO密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是IO密集型任務,比如Web應用。

所以python在多線程處理CPU密集型程序時可以選擇多進程實現,有效的利用多核提升效率;而IO密集型的由於99%的時間都花在IO上,花在CPU上的時間很少,所以多線程也能提高很大效率

 

Process對象

multiprocessing.Process類類似於threading.Thread,涉及參數以及屬性方法如下

multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

參數

  • group 應該始終是 None,它僅用於兼容 threading.Thread
  • target 是由 run() 方法調用的可調用對象,它默認為 None ,意味着什么都沒有被調用
  • name 是進程名稱
  • args 是目標調用的參數元組
  • kwargs 是目標調用的關鍵字參數字典
  • daemon 表是否為守護進程,為 True 或 False

方法

  • run() 表示進程活動的方法。
  • start() 啟動進程活動。每個進程對象最多只能調用一次
  • join([timeout])  如果可選參數 timeout 是 None (默認值),則該方法將阻塞,直到調用 join() 方法的進程終止。簡單說哪個子進程調用了join方法,主進程就要等該子進程執行完后才能繼續向下執行
  • is_alive() 返回進程是否還活着

屬性

  • pid 返回進程ID
  • name 進程的名稱
  • daemon 進程的守護標志,一個布爾值
  • exitcode 子進程退出代碼。如果進程尚未終止,這將是 None 。

 

創建多進程

類似與多線程,創建方式都是差不多的

1.通過函數方式創建

import multiprocessing
import time


def run(sec):
    print('這是進程名字', multiprocessing.current_process().name)
    print('這是進程PID', multiprocessing.current_process().pid)
    time.sleep(sec)


if __name__ == '__main__':

    print('這是主進程名字:', multiprocessing.current_process().name)
    print('這是主進程PID:', multiprocessing.current_process().pid)
    s_time = time.time()
    p1 = multiprocessing.Process(target=run, args=(1,))
    p2 = multiprocessing.Process(target=run, args=(2,))
    p3 = multiprocessing.Process(target=run, args=(3,))
    p1.start()
    p2.start()
    p3.start()
    p1.join()
    p2.join()
    p3.join()
    print('主進程結束', multiprocessing.current_process().name)
    print('一共用時', time.time()-s_time)

 

2.通過類來創建

import multiprocessing
import time


class MyProcess(multiprocessing.Process):

    def __init__(self, sec):
        super(MyProcess, self).__init__()
        self.sec = sec

    def run(self):
        print('這是進程名字', multiprocessing.current_process().name)
        print('這是進程PID', multiprocessing.current_process().pid)
        time.sleep(self.sec)


if __name__ == '__main__':

    print('這是主進程名字:', multiprocessing.current_process().name)
    print('這是主進程PID:', multiprocessing.current_process().pid)
    s_time = time.time()
    p1 = MyProcess(1)
    p2 = MyProcess(2)
    p3 = MyProcess(3)
    p1.start()
    p2.start()
    p3.start()
    p1.join()
    p2.join()
    p3.join()
    print('主進程結束', multiprocessing.current_process().name)
    print('一共用時', time.time()-s_time)

 

進程間通信

進程是資源(CPU、內存等)分配的最小單位,每個進程有獨立的地址空間與系統資源,每啟動一個新的進程相當創建全局變量的一份副本,子進程里的數據修改無法影響到主進程以及其他子進程中的數據,不同子進程之間的數據也不能共享,這是多進程與多線程最明顯的區別

示例如下

import multiprocessing
import time

tmp = 0

class MyProcess(multiprocessing.Process):

    def __init__(self, q):
        super(MyProcess, self).__init__()
        self.q = q

    def run(self):
        global tmp
        tmp = tmp+self.q
        print('進程%s,tmp值為%d'%(multiprocessing.current_process().name,tmp))


if __name__ == '__main__':

    p1 = MyProcess(1)
    p2 = MyProcess(2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('主進程%s,tmp值為%d'%(multiprocessing.current_process().name,tmp))

輸出為:

進程MyProcess-1,tmp值為1
進程MyProcess-2,tmp值為2
主進程MainProcess,tmp值為0

所以多進程之間數據獨立,通過id(tmp)也可以看出。但是對文件file這種存儲在硬盤中的資源讀寫操作,或者一些通過multiprocessing 下 Value, Array創建的共享變量,對於這些資源進程之間會存在競爭,如果要避免多進程訪問這種共享資源出現沖突,會使用進程鎖的方式

共享內存的創建如下,具體方法這里省略

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

 

進程鎖

進程鎖可以避免因為多個進程訪問共享資源而發生沖突,這里的共享資源不是像多線程中那樣的全局變量,上面已經說了普通的全局變量不會在進程間共享,而是系統中的文件或者console輸出這類系統的資源,還有特別的能在進程間通信的共享內存資源,這些能被進程競爭。

這里以文件為例,因為同一時間,只能有一個進程,對文件進行寫入操作,這是操作系統的設定。同時由操作系統隨機決定哪個進程來寫入操作

from multiprocessing import Process, Lock   # 導入進程鎖

def f1(l,num):
    l.acquire()    # 加鎖
    f = open("file.txt", "a+")
    i = 10000
    while i > 0:
        f.write("hello word %s\n" % i)
        i -= 1
    print("process", num)
    f.close()
    l.release()   # 釋放鎖


def f2(l,num):
    l.acquire()    # 加鎖
    f = open("file.txt", "a+")
    i = 10000
    while i > 0:
        f.write("hello best word %s\n" % i)
        i -= 1
    print("process", num)
    f.close()
    l.release()   # 釋放鎖


if __name__ == "__main__":
    lock = Lock()     # 定義鎖
    p1 = Process(target=f1, args=(lock, 1,))
    p2 = Process(target=f2, args=(lock, 2,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

沒有加鎖的時候,會發現文檔中一個進程寫到一半,另一個進程的數據也寫入,是亂序的(可以將鎖去掉試試,這里不給出代碼了)

加了鎖后就會是正常一個進程寫完,再另一個進程繼續寫入

 

進程隊列Queue

Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。

與線程中使用的隊列Queue區別:

1.from queue import Queue:是一種隊列模型數據結構,類似於普通列表,有先進先出模式,堆棧模式,優先級模式等

2.from multiprocessing import Queue:是多進程並發的Queue隊列,用於解決多進程間的通信問題。可以將對象序列化傳遞再進程間,普通Queue實現不了。

主要方法還是get()與put()

put(obj[, block[, timeout]]):將對象放入隊列。如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。

get([block[, timeout]]):從隊列取出一個對象,如果blocked為True(默認值),並且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常。

close(): 指示當前進程將不會再往隊列中放入對象。一旦所有緩沖區中的數據被寫入管道之后,后台的線程會退出。這個方法在隊列被gc回收時會自動調用。

from multiprocessing import Process,Queue

class book():
    Storage = False

    def __init__(self,name):
        self.name = name
    
    def storagebook(self):
        Storage = True
        print("%s has storaged"%self.name)

def worker1(q):
    n = 1
    while n < 10:
        q.put(book("book-%d"%n))
        n+=1

def worker2(q):
    while True:
        #qsize()返回隊列長度
        if q.qsize() != 0:
            q.get().storagebook()
        else:
            break

if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=worker1, args=(q,))
    p2 = Process(target=worker2, args=(q,))
    p1.start() #這里先啟動生產書的進程
    p2.start()
    p1.join()
    p2.join()

 

管道Pipe

通過multiprocessing.Pipe([duplex])會返回(conn1, conn2)一對Connection對象,代表一個管道的兩個端。

Pipe()有duplex參數,如果duplex參數為True(默認值),那么這個管道是全雙工模式,也就是說conn1和conn2均可收發。duplex為False,conn1只負責接受消息,conn2只負責發送消息。

send和recv方法分別是Connection對象的發送和接受消息的方法。例如,在全雙工模式下,可以調用conn1.send發送消息,conn1.recv接收消息。如果沒有消息可接收,recv方法會一直阻塞。如果管道已經被關閉,那么recv方法會拋出EOFError。

Connection對象方法主要有:

  • send(obj) 將一個對象發送到連接的另一端,可以用 recv() 讀取。發送的對象必須是可以序列化的,過大的對象 ( 接近 32MiB+ ,這個值取決於操作系統 ) 有可能引發 ValueError 異常。
  • recv() 返回一個由另一端使用 send()發送的對象。該方法會一直阻塞直到接收到對象。 如果對端關閉了連接或者沒有東西可接收,將拋出 EOFError 異常。
  • close() 關閉連接對象。當連接對象被垃圾回收時會自動調用。
  • poll([timeout]) 返回連接對象中是否有可以讀取的數據。如果未指定 timeout ,此方法會馬上返回。如果 timeout 是一個數字,則指定了最大阻塞的秒數。如果 timeout 是 None  ,那么將一直等待,不會超時。
from multiprocessing import Process,Pipe

class book():
    Storage = False

    def __init__(self,name):
        self.name = name
    
    def storagebook(self):
        Storage = True
        print("%s has storaged"%self.name)

def worker1(p):
    n = 1
    while n < 10:
        p.send(book("book-%d"%n))
        n+=1
    p.close()

def worker2(p):
    while True:
        if p.poll():  #判斷還有沒有數據
            p.recv().storagebook()
        else:
            break

if __name__ == "__main__":
    conn1, conn2 = Pipe(duplex=False)
    p1 = Process(target=worker1, args=(conn2,))
    p2 = Process(target=worker2, args=(conn1,))
    p1.start() #這里先啟動生產書的進程
    p2.start()
    p1.join()
    p2.join()

 

進程池Pool

可以使用multiprocessing.Pool實現簡單的多進程任務,進程池事先划分系統資源,並將資源分配給池中的進程,這些進程是創建Pool對象時已經創建及初始化好了的。當我們想創建新的進程任務時,新建的任務就可以直接取得Pool中的進程資源,而不用動態的從系統獲取新的資源。如果進程池中沒有可用的進程資源時,程序就會等待。

Pool類主要方法有:

  • apply(): 直到得到結果之前一直阻塞。同步操作
  • apply_async(): 這是 apply() 方法的一個變體,返回的是一個result對象。這是一個異步的操作,在所有的子類執行之前不會鎖住主進程。
  • map(): 這是內置的 map() 函數的並行版本。在得到結果之前一直阻塞,此方法將可迭代的數據的每一個元素作為進程池的一個任務來執行。
  • map_async(): 這是 map() 方法的一個變體,返回一個result對象。如果指定了回調函數,回調函數應該是callable的,並且只接受一個參數。當result准備好時會自動調用回調函數(除非調用失敗)。回調函數應該立即完成,否則,持有result的進程將被阻塞。

 

簡單使用進程池

import multiprocessing
from multiprocessing import Process, Pool
import time


def func(sec):
    time.sleep(sec)
    print('當前進程:%s pid:%d' % (multiprocessing.current_process().name,
                              multiprocessing.current_process().pid))


if __name__ == '__main__':
    print('主進程開始:%s' % multiprocessing.current_process().name)
    s_time = time.time()
    p = Pool(5)      # 創建pool對象,5表示池中創建5個進程
    for i in range(10):
        p.apply_async(func, args=(2,))

    p.close()  # 關閉進程池,防止將任何其他任務提交到池中。需要在join之前調用,否則會報ValueError: Pool is still running錯誤
    p.join()    # 等待進程池中的所有進程執行完畢

    print('主進程結束:%s' % multiprocessing.current_process().name)
    print('一共用時: ', time.time()-s_time)

打印結果如下

可以看到我們創建的任務實際是用進程池里面的進程資源

 

使用callback

進程池中回調函數callback作用是:進程池中任何一個任務一旦處理完了,就立即告知主進程,主進程則調用一個函數去處理該結果,該函數即回調函數

使用callback的好處是可以將耗時的任務放在子進程中,等子進程有結果時再去通知主進程處理,實際就是異步操作的實現

把上面的例子改一下

import multiprocessing
from multiprocessing import Process, Pool
import time


def func(sec):
    time.sleep(1)  # sleep一秒是為了模擬阻塞的情況
    print('當前進程:%s pid:%d' % (multiprocessing.current_process().name,
                              multiprocessing.current_process().pid))
    return {multiprocessing.current_process().name: sec}


def func2(res):
    print('當前進程:%s pid:%d' % (multiprocessing.current_process().name,
                              multiprocessing.current_process().pid))
    print(res)


if __name__ == '__main__':
    print('主進程開始:%s' % multiprocessing.current_process().name)
    s_time = time.time()
    p = Pool(5)      # 創建pool對象,5表示池中創建5個進程
    for i in range(10):
        p.apply_async(func, args=(i,), callback=func2)  # 使用callback

    p.close()  # 關閉進程池,防止將任何其他任務提交到池中。需要在join之前調用,否則會報ValueError: Pool is still running錯誤
    p.join()    # 等待進程池中的所有進程執行完畢

    print('主進程結束:%s' % multiprocessing.current_process().name)
    print('一共用時: ', time.time()-s_time)

打印結果如下

 

AsyncResult對象

由Pool.apply_async()和Pool.map_async()返回的result實例對象的類,主要方法有

  • get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發multiprocessing.TimeoutError。
  • ready():如果調用完成,返回True
  • successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發AssertionError異常
  • wait([timeout]):等待結果變為可用。

示例如下

import multiprocessing
from multiprocessing import Process, Pool
import time


def func(sec):
    time.sleep(sec)
    print('當前進程:%s pid:%d' % (multiprocessing.current_process().name,
                              multiprocessing.current_process().pid))
    return {multiprocessing.current_process().name: sec}

if __name__ == '__main__':
    print('主進程開始:%s' % multiprocessing.current_process().name)
    s_time = time.time()
    p = Pool(5)      # 創建pool對象,5表示池中創建5個進程
    result = []
    for i in range(10):
        result.append(p.apply_async(func, args=(i,)))

    p.close()  # 關閉進程池,防止將任何其他任務提交到池中。需要在join之前調用,否則會報ValueError: Pool is still running錯誤
    p.join()    # 等待進程池中的所有進程執行完畢
    for res in result:
        print(res.get()) #get()獲取返回值

    print('主進程結束:%s' % multiprocessing.current_process().name)
    print('一共用時: ', time.time()-s_time)

與使用callback不同的是,這個需要子進程都結束后,才能在主進程中處理

 

map用法

進程池中map()方法等價於內置函數map(func, *iterables),也是一種創建進程任務的簡化方法

內置函數map()是對iterables依次執行func(item),將執行結果組成一個 List 返回(python2是list,python3是map對象)

進程池中map()則是返回list,而map_async()返回的是AsyncResult的變體,要通過get()得到list

import multiprocessing
from multiprocessing import Process, Pool
import time


def func(sec):
    time.sleep(sec)
    print('當前進程:%s pid:%d' % (multiprocessing.current_process().name,
                              multiprocessing.current_process().pid))
    return {multiprocessing.current_process().name: sec}


if __name__ == '__main__':
    print('主進程開始:%s' % multiprocessing.current_process().name)
    s_time = time.time()
    p = Pool(5)
    result = p.map_async(func, range(10))  # map創建任務

    p.close()
    p.join()
    for res in result.get():  # map對象使用get方法返回list
        print(res)
    print(result.get())
    print('主進程結束:%s' % multiprocessing.current_process().name)
    print('一共用時: ', time.time()-s_time)

 

 

參考:https://docs.python.org/zh-cn/3/library/multiprocessing.html 以及部分網上資料

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM