Python multiprocess模塊(下)


主要內容:(參考資料)

一. 管道

二. 數據共享

  數據共享是不安全的

三. 進程池

  進程池的map傳參

  進程池的同步方法

  進程池的異步方法

  詳解apply和apply_async

  apply_async的其他方法

 

一. 管道

管道(不推薦使用,了解即可)是進程間通信(IPC)的第二種方式,它會導致數據不安全的情況出現.

#創建管道的類:
Pipe([duplex]): 在進程之間創建一條管道, 並返回元組(conn1, conn2), 其中conn1, conn2表示管道兩端的連接對象. 強調一點: 必須在產生Process對象之前產生管道.

#參數介紹:
dumplex: 默認管道是全雙工的, 如果將duplex設置成False, conn1只能用於接收, conn2只能用於發送.

#主要方法:
conn1.recv(): 接收conn2.send(obj)發送的對象. 如果沒有消息可接收, recv()方法會一直阻塞. 如果連接的另外一端已經關閉, 那么recv()方法會拋出EOFError.
conn1.send(obj):通過連接發送對象。obj是與序列化兼容的任意對象
#其他方法:
conn1.close(): 關閉連接. 如果conn1被垃圾回收, 將自動調用此方法
conn1.fileno(): 返回連接使用的整數文件描述符
conn1.poll([timeout]): 如果連接上的數據可用, 返回True. timeout指定等待的最長時限. 如果省略此參數, 方法將立即返回結果. 如果將timeout設置成None, 操作將無限期地等待數據到達.
 
conn1.recv_bytes([maxlength]): 接收c.send_bytes()方法發送的一條完整的字節消息. maxlength指定要接收的最大字節數. 如果進入的消息, 超過了這個最大值, 將引發IOError異常, 並且在連接上無法進行進一步讀取. 如果連接的另外一端已經關閉, 再也不存在任何數據, 將引發EOFError異常. 
conn.send_bytes(buffer[,offset[,size]]): 通過連接發送字節數據緩沖區, buffer是支持緩沖區接口的任意對象, offset是緩沖區中的字節偏移量, 而size是要發送的字節數. 數據結果以單條消息的形式發出, 然后調用c.recv_bytes()函數進行接收
 
conn1.recv_bytes_into(buffer[,offset]): 接收一條完整的字節消息, 並把它保存在buffer對象中, 該對象支持可寫入的緩沖區接口(即bytearray對象或類似的對象). offset指定緩沖區中放置消息處的字節位移. 返回值是收到的字節數. 如果消息長度大於可用的緩沖區空間, 將引發BufferTooShort異常.
管道介紹

 

管道的簡單使用:

from multiprocessing import Process, Pipe   # 引入Pipe模塊

def func(conn):
    conn.send("HelloWorld!")    # 子進程發送了消息
    conn.close()                # 子進程關閉通道的一端

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()    # 建立管道,拿到管道的兩端,雙工通信方式,兩端都可以收發消息
    p = Process(target=func, args=(child_conn,))    # 將管道的一端給子進程
    p.start()   # 開啟子進程
    print("主進程接收>>>", parent_conn.recv())   # 主進程接收了消息
    p.join()
    print("主進程執行結束!")
例1:子進程給主進程發送消息
from multiprocessing import Process, Pipe   # 引入Pipe模塊

def func(conn):
    msg = conn.recv()  # (5)子進程通過管道的另一端接收信息
    print("The massage from parent_process is>>>", msg)

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()   # (1)創建管道,拿到管道的兩端
    p = Process(target=func, args=(child_conn,))    # (2)創建子進程func, 把child_conn給func
    p.start()   # (3)啟動子進程
    parent_conn.send("Hello,child_process!")    # (4)主進程通過parent_conn給子進程發送信息

# 執行結果:
# The massage from parent_process is>>> Hello,child_process!
例2:主進程給子進程發送消息
from multiprocessing import Process, Pipe

def func(parent_conn, child_conn):
    msg = parent_conn.recv()    # (5)子進程使用parent_conn接收主進程的消息
    print("子進程使用parent_conn接收>>>", msg)  # (6)打印接收到的消息
    child_conn.send("子進程使用child_conn給主進程發送了一條消息")  # (7)子進程發送消息
    print("子進程執行完畢")


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()    # (1)創建管道,拿到管道兩端
    child_conn.send("主進程使用child_conn給子進程發送了一條消息")  # (2)主進程發消息
    p = Process(target=func, args=(parent_conn, child_conn))    # (3)創建子進程,把管道兩端都給子進程
    p.start()   # (4)開啟子進程
    p.join()    # (8)等待子進程執行完畢
    msg = parent_conn.recv()  # (9)主進程使用parent_conn接收子進程的消息
    print("主進程使用parent_conn接收>>>", msg)  # (10)打印接收到的消息
    print("主進程執行完畢!")

# 執行結果:
# 子進程使用parent_conn接收>>> 主進程使用child_conn給子進程發送了一條消息
# 子進程執行完畢
# 主進程使用parent_conn接收>>> 子進程使用child_conn給主進程發送了一條消息
# 主進程執行完畢!
主進程和子進程互相收發消息

 

應該特別注意管道端點的正確管理問題. 如果生產者或消費者中都沒有使用管道的某個端點, 就應將它關閉,否則就會拋出異常. 例如: 當生產者關閉了管道的輸出端時, 消費者也要同時關閉管道的輸入端. 如果忘記執行這些步驟, 程序可能在消費者中的recv()操作上掛起(就是阻塞). 管道是由操作系統進行引用計數的, 在所有進程中關閉管道的相同一端就會生成EOFError異常. 因此, 在生產者中關閉管道不會有任何效果, 除非消費者也關閉了相同的管道端點.

from multiprocessing import Process, Pipe

def f(parent_conn,child_conn):
    #parent_conn.close() #不寫close將不會引發EOFError
    while True:
        try:
            print(child_conn.recv())
        except EOFError:
            child_conn.close()
            break

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(parent_conn,child_conn,))
    p.start()
    child_conn.close()
    parent_conn.send('hello')
    parent_conn.close()
    p.join()   
引發EOFError

管道可以用於雙工通信, 通常利用在客戶端/服務端中使用的請求/響應模型, 或者遠程過程調用, 就可以使用管道編寫與進程交互的程序, 像前面將網絡通信的時候, 我們使用了一個叫subprocess的模塊, 里面有個參數是pipe管道, 執行系統指令, 並通過管道獲取結果.

 

 

二. 數據共享

展望未來, 基於消息傳遞的並發編程是大勢所趨. 即便是使用線程, 推薦做法也是將程序設計為大量獨立的線程集合, 通過消息隊列交換數據. 這樣極大地減少了對使用鎖定和其他同步手段的需求, 還可以擴展到分布式系統中.

進程間應該盡量避免通信, 即便需要通信, 也應該選擇進程安全的工具來避免加鎖帶來的問題, 應該盡量避免使用本節所講的共享數據的方式, 以后我們會嘗試使用數據庫來解決進程之間的數據共享問題.

進程之間數據共享的模塊之一Manager模塊:

進程間數據是獨立的, 可以借助於隊列或管道實現通信, 二者都是基於消息傳遞的. 雖然進程間數據獨立, 但可以通過Manager實現數據共享.

from multiprocessing import Process, Manager    # 引入Manager模塊

def func(m_list):
    m_list.pop()    # 對manager列表進行改變

if __name__ == '__main__':
    m = Manager()   # 創建Manager對象
    m_list = m.list(["王力宏", "王乃卉", "王少軒"])  # 創建manager列表
    print("主進程>>>", m_list)
    p = Process(target=func, args=(m_list,))    # 創建子進程
    p.start()
    p.join()
    print("主進程>>>", m_list)

# 執行結果:
# 主進程>>> ['王力宏', '王乃卉', '王少軒']
# 主進程>>> ['王力宏', '王乃卉']
子進程修改共享列表
from multiprocessing import Process, Manager    # 引入Manager模塊

def func(m_dic):
    m_dic["name"] = "王力宏"   # 修改manager字典

if __name__ == '__main__':
    m = Manager()   # 創建Manager對象
    m_dic = m.dict({"name": "王乃卉"}) # 創建manager字典
    print("主進程>>>", m_dic)
    p = Process(target=func, args=(m_dic,)) # 創建子進程
    p.start()
    p.join()
    print("主進程>>>", m_dic)

# 執行結果:
# 主進程>>> {'name': '王乃卉'}
# 主進程>>> {'name': '王力宏'}
子進程修改共享字典

 

多進程共同去處理共享數據的時候, 就和我們多進程同時去操作一個文件中的數據是一樣的, 不加鎖就會出現錯誤的結果, 進程不安全的, 所以也需要加鎖.

from multiprocessing import Process, Manager

def func(m_dic):
    m_dic["count"] -= 1

if __name__ == '__main__':
    m = Manager()
    m_dic = m.dict({"count": 100})
    p_list = []
    # 開啟20個進程來對共享數據進行修改
    for i in range(20):
        p = Process(target=func, args=(m_dic, ))
        p.start()
        p_list.append(p)
    [p.join() for p in p_list]
    print("主進程>>>", m_dic)

# 執行結果:
# 主進程>>> {'count': 80}
# 但是偶爾會出現  主進程>>> {'count': 81}  的情況, 這是因為共享數據不變, 但是當多個子進程同時訪問共享數據並對其進行修改時, 由於修改的過程是要重寫對共享數據進行賦值的, 在這個賦值的過程中, 可能一個子進程還沒來得及賦值成功, 就有另外的一個子進程拿到原先的值, 這樣一來, 就會出現多個子進程修改同一個共享數據, 於是就出現了上面代碼結果偶爾會少減了一次的現象. 綜上所述,共享數據是不夠安全的, 而"加鎖"是一個很好的解決辦法.
不加鎖對共享數據進行修改,是不安全的
from multiprocessing import Process, Manager, Lock

def func(m_dic, m_lock):
    with m_lock:
        m_dic["count"] -= 1
    # 等同於:
    # m_lock.acquire()
    # m_dic["count"] -= 1
    # m_lock.release()

if __name__ == '__main__':
    m = Manager()
    m_lock = Lock()
    m_dic = m.dict({"count": 100})
    p_list = []
    # 開啟20個進程來對共享數據進行修改
    for i in range(20):
        p = Process(target=func, args=(m_dic, m_lock))
        p.start()
        p_list.append(p)
    [p.join() for p in p_list]
    print("主進程", m_dic)

# 執行結果:
# 主進程 {'count': 80}

# 加鎖后, 多次嘗試運行程序, 執行結果也沒有發生改變. 不難看出, 加鎖后 共享數據是安全的.
加鎖后的共享數據是安全的

 

 

三. 進程池

為什么要有進程池?

在程序實際處理問題過程中, 繁忙時會有成千上萬的任務需要被執行, 空閑時卻可能只有零星任務. 那么在成千上萬個任務需要被執行的時候, 我們就需要去創建成千上萬個進程么? 首先, 創建進程需要消耗時間, 銷毀進程(空間, 變量, 文件信息等等的內容)也需要消耗時間. 第二, 即便開啟了成千上萬的進程, 操作系統也不能讓他們同時執行, 維護一個很大的進程列表的同時, 調度的時候, 還需要進行切換並且記錄每個進程的執行節點, 也就是記錄上下文(各種變量等等), 這樣反而會影響程序的效率. 因此我們不能無限制的根據任務數量頻繁開啟或者結束進程. 就看我們上面的一些代碼例子, 可以發現有些程序執行后需要較長的時間才能得出結果, 這就是問題的原因, 那么我們需要如何做才能避免這種情況呢?

進程池的概念:

  在這里, 介紹一個進程池的概念: 定義一個池子, 在里面放上固定數量的進程, 有需求來了, 就拿這個池中的進程來處理任務, 等到處理完畢, 進程並不關閉, 而是將進程再放回進程池中繼續等待任務. 如果有很多任務需要執行, 池中的進程數量不夠, 任務就要等待之前的進程執行任務完畢歸來, 拿到空閑進程才能繼續執行. 也就是說, 池中進程的數量是固定的, 那么同一時間最多有固定數量的進程在運行. 這樣不僅降低了操作系統的調度難度, 還節省了開閉進程的時間, 也在一定程度上能夠實現並發效果.

 

multiprocess中的Pool模塊

創建進程池的類: 如果指定numprocess為3, 則進程池會從無到有創建三個進程, 然后自始至終使用這三個進程去執行所有任務(高級一些的進程池可以根據並發量, 設置成動態增加或減少進程池中的進程數量的操作), 這種方式不會開啟其他進程, 它提高操作系統效率, 減少了空間的占用.

#語法:
Pool([numprocess  [, initializer [, initargs]]]): 創建進程池

#參數:
numprocess: 要創建的進程數, 如果省略, 將默認使用os.cpu_count()(os模塊中查看電腦CPU數量的一個方法)的值
initializer: 是每個工作進程啟動時要執行的可調用對象, 默認為None
initargs: 是要傳給initializer的參數組
p.apply(func [, args [, kwargs]]): 在一個池工作進程中執行func(*args,**kwargs), 然后返回結果.
#需要強調的是: 此操作並不會在進程池的工作過程中並發執行func函數. 如果要通過不同參數並發地執行func函數, 必須從不同線程調用p.apply()函數或者使用p.apply_async()

p.apply_async(func [, args [, kwargs]]): 在一個進程池工作過程中執行func(*args,**kwargs), 然后返回結果.
#此方法的結果是AsyncResult類的實例, callback是可調用對象, 接收輸入參數. 當func的結果變為可用時, 將結果傳遞給callback. callback禁止執行任何阻塞操作, 否則將接收其他異步操作中的結果.

p.close(): 不允許再有其他的任務來使用進程池. 如果所有操作持續掛起, 它們將在工作進程終止前完成.

P.join(): 等待所有工作進程退出. 此方法只能在close()或teminate()之后調用.
主要方法介紹
方法apply_async()和map_async()的返回值是AsyncResul的實例obj. 實例具有以下方法:

obj.get(): 返回結果, 如果有必要則等待結果到達. timeout是可選的. 如果在指定時間內還沒有到達, 將引發異常. 如果遠程操作中引發了異常, 它將在調用此方法時再次被引發.
obj.ready(): 如果調用完成, 返回True
obj.successful(): 如果調用完成且沒有引發異常, 返回True, 如果在結果就緒之前調用此方法, 引發異常
obj.wait([timeout]): 等待結果變為可用.
obj.terminate(): 立即終止所有工作進程, 同時不執行任何清理或結束任何掛起工作. 如果p被垃圾回收, 將自動調用此函數.
其他方法(了解)

 

1. 進程池的map傳參

map(func, iterables)是異步執行的, 並且自帶close和join.

import time
from multiprocessing import Pool

def func(n):
    time.sleep(0.5)
    print(n)

if __name__ == '__main__':
    pool = Pool(4)  # 創建進程池對象,進程池中放置了4個進程,一般來說,這個數量是電腦的CPU數量
    pool.map(func, range(100)) #參數必須是可迭代的
進程池的map傳參
import time
from multiprocessing import Process, Pool

def func(n):
    for i in range(5):
        n = n + i

if __name__ == '__main__':
    pool_start_time = time.time()   # 進程池開始執行時間
    pool = Pool(4)  #創建進程池對象,進程池中設置了4個進程
    pool.map(func, range(100))  # map是異步執行的,
    pool_end_time = time.time()     # 進程池執行完畢時間
    pool_different_time = pool_end_time - pool_start_time   # 進程池執行時間差

    p_start_time = time.time()      # 多進程開始執行時間
    p_list = []
    for i in range(100):
        p1 = Process(target=func, args=(i,))
        p1.start()
        p_list.append(p1)
    [p.join() for p in p_list]
    p_end_time = time.time()        # 多進程執行完畢時間
    p_different_time = p_end_time - p_start_time    # 多進程執行時間差

    print("進程池的執行時間>>>", pool_different_time)
    print("多進程的執行時間>>>", p_different_time)

# 執行結果:
# 進程池的執行時間>>> 0.16112160682678223
# 多進程的執行時間>>> 3.6605968475341797

# 可以明顯地看出,進程池的執行效率遠遠高於多進程.
進程池與多進程的效率對比

 

2. 進程池的同步調用

import time
from multiprocessing import Pool

def func(i):
    time.sleep(0.5)
    return i**2

if __name__ == '__main__':
    p = Pool(4)
    for i in range(10):
        res = p.apply(func, args=(i,))
        """p.apply() --> 同步執行的方法,它會等待子進程的返回結果,所以最后的執行結果是勻速打印出來的"""
        print(res)
進程池的同步調用

 

3. 進程池的異步調用

import os
import time
import random
from multiprocessing import Pool

def work(n):
    print('%s run' % os.getpid())   # 進程ID號
    time.sleep(random.random())
    return n**2

if __name__ == '__main__':
    p = Pool(4) # 進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務
    res_l = []
    for i in range(10):
        res = p.apply_async(work, args=(i,))
        """異步運行,根據進程池中的進程數,每次最多4個子進程在異步執行,並且可以執行不同的任務,傳送任意的參數了.
        返回結果之后,將結果放入列表,歸還進程,之后再執行新的任務.需要注意的是,進程池中的三個進程不會同時開啟或
        者同時結束而是執行完一個就釋放一個進程,這個進程就去接收新的任務."""
        res_l.append(res)

    """異步apply_async用法:如果使用異步提交的任務,主進程需要使用join,等待進程池內任務都處理完,然后可以用get收集結果.
        否則,主進程結束,進程池可能還沒來得及執行,也就跟着一起結束了."""
    p.close()   # 不是關閉進程池,而是結束進程池接收任務,確保沒有新任務再提交過來.
    p.join()    # 感知進程池中的任務已經執行結束,只有當沒有新的任務添加進來的時候,才能感知到任務結束了,所以在join之前必須加上close方法.
    for res in res_l:
        print(res.get())    # 使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get.
進程池的異步調用

 

4. 詳解apply和apply_async

#一:使用進程池(異步調用,apply_async)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply_async(func, (msg, ))   #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去
        res_l.append(res)
        # s = res.get() #如果直接用res這個結果對象調用get方法獲取結果的話,這個程序就變成了同步,因為get方法直接就在這里等着你創建的進程的結果,第一個進程創建了,並且去執行了,那么get就會等着第一個進程的結果,沒有結果就一直等着,那么主進程的for循環是無法繼續的,所以你會發現變成了同步的效果
    print("==============================>") #沒有后面的join,或get,則程序整體結束,進程池中的任務還沒來得及全部執行完也都跟着主進程一起結束了

    pool.close() #關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成
    pool.join()   #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束

    print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join后執行的,證明結果已經計算完畢,剩下的事情就是調用每個對象下的get方法去獲取結果
    for i in res_l:
        print(i.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get

#二:使用進程池(同步調用,apply)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(0.1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply(func, (msg, ))   #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去
        res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另外一個
    print("==============================>")
    pool.close()
    pool.join()   #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束

    print(res_l) #看到的就是最終的結果組成的列表
    for i in res_l: #apply是同步的,所以直接得到結果,沒有get()方法
        print(i)
詳解apply和apply_async

 

5. 回調函數

需要回調函數的場景: 進程池中任何一個任務一旦處理完了, 就立即告知主進程自己已處理完畢了. 主進程則調用一個函數去處理該任務的執行結果, 該函數即回調函數, 這是進程池特有的, 普通進程沒有這個機制, 但是我們也可以通過進程通信來拿到返回值, 進程池的這個回調也是進程通信的機制完成的.

我們可以把比較消耗時間(阻塞)的任務放到進程池中, 然后指定回調函數(主進程負責執行), 這樣主進程在執行回調函數時就省去了I/O的過程, 直接拿到的是任務的結果.
回調函數應用場景
import os
from multiprocessing import Pool

def func1(n):
    print('func1>>',os.getpid())
    print('func1')
    return n*n

def func2(nn):
    print('func2>>',os.getpid())
    print('func2')
    print(nn)
    # import time
    # time.sleep(0.5)
if __name__ == '__main__':
    print('主進程:',os.getpid())
    p = Pool(5)
    #args里面的10給了func1,func1的返回值作為回調函數的參數給了callback對應的函數,不能直接給回調函數直接傳參數,他只能是你任務函數func1的函數的返回值
    # for i in range(10,20): #如果是多個進程來執行任務,那么當所有子進程將結果給了回調函數之后,回調函數又是在主進程上執行的,那么就會出現打印結果是同步的效果。我們上面func2里面注銷的時間模塊打開看看
    #     p.apply_async(func1,args=(i,),callback=func2)
    p.apply_async(func1,args=(10,),callback=func2)

    p.close()
    p.join()

#結果
# 主進程: 11852  #發現回調函數是在主進程中完成的,其實如果是在子進程中完成的,那我們直接將代碼寫在子進程的任務函數func1里面就行了,對不對,這也是為什么稱為回調函數的原因。
# func1>> 17332
# func1
# func2>> 11852
# func2
# 100
回調函數簡單使用

回調函數在寫的時候注意一點, 回調函數的形參只有一個, 如果你的執行函數有多個返回值, 那么也可以被回調函數的這一個形參接收, 接收的是一個元組, 包含着你執行函數的所有返回值.

使用進程池來進行爬蟲操作的時候, 最耗時間的是請求地址的網絡請求延遲, 那么如果我們在將處理數據的操作加到每個子進程中, 於是所有在進程池后面排隊的進程就需要等更長的時間才能獲取進程池里面的進程來執行自己, 所以一般我們就將請求作成一個執行函數, 通過進程池去異步執行, 剩下的數據處理的內容放到另外一個進程或者主進程中去執行, 將網絡延遲的時間也利用起來, 效率就會更高了.

#進程池和信號量的區別:

進程池是多個需要被執行的任務在進程池外面排隊等待獲取進程對象去執行自己, 而信號量是一堆進程等待着去執行一段邏輯代碼.

信號量不能控制創建多少個進程, 但是可以控制同時多少個進程能夠執行.
進程池能控制可以創建多少個進程.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM