《Python》進程之間的通信(IPC)、進程之間的數據共享、進程池


一、進程間通信---隊列和管道(multiprocess.Queue、multiprocess.Pipe)

  進程間通信:IPC(inter-Process Communication)

1、隊列

概念介紹:

  創建共享的進程隊列,Queue是多進程的安全的隊列,可以使用Queue實現多進程之間的數據傳遞。

# Queue([maxsize]) 
創建共享的進程隊列。
參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。
# Queue([maxsize]) 

創建共享的進程隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。 
Queue的實例q具有以下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用於控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時后將引發Queue.Full異常。

q.qsize() 
返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。


q.empty() 
如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full() 
如果q已滿,返回為True. 由於線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。
方法介紹
q.close() 
關閉隊列,防止隊列中加入更多數據。調用此方法時,后台線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。

q.cancel_join_thread() 
不會再進程退出時自動連接后台線程。這可以防止join_thread()方法阻塞。

q.join_thread() 
連接隊列的后台線程。此方法用於在調用q.close()方法后,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。
其他方法(了解)

代碼實現:

'''
multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列
都是基於消息傳遞實現的,但是隊列接口
'''

from multiprocessing import Queue
q = Queue(3)    # 只能往這個隊列放3個值

# put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3)  # 如果隊列已經滿了,程序就會停在這里,等待數據被別人取走,再將數據放入隊列。
            # 如果隊列中的數據一直不被取走,程序就會永遠停在這里。
try:
    q.put_nowait(3)    # 可以使用put_nowait,如果隊列滿了不會阻塞,但是會因為隊列滿了而報錯。
except:     # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,但是會丟掉這個消息。
    print('隊列已經滿了')

# 因此,我們再放入數據之前,可以先看一下隊列的狀態,如果已經滿了,就不繼續put了。
print(q.full())    # 判斷是否滿了

print(q.get())
print(q.get())
print(q.get())
# print(q.get())   # 同put方法一樣,如果隊列已經空了,那么繼續取就會出現阻塞。
try:
    q.get_nowait(3)    # 可以使用get_nowait,如果隊列滿了不會阻塞,但是會因為沒取到值而報錯。
except:    # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。
    print('隊列已經空了')

print(q.empty())   # 判斷是否空了
from multiprocessing import Process, Queue

def consume(q):
    print('son-->', q.get())    # 取走隊列一條數據
    q.put('abc')    # 給隊列增加一條數據

if __name__ == '__main__':
    q = Queue()
    p = Process(target=consume, args=(q,))
    p.start()
    q.put({'haha': 123})    # 給隊列增加一條數據
    p.join()    # 等待子進程執行完畢
    print('Foo-->', q.get())    # 取走隊列一條數據
# son--> {'haha': 123}
# Foo--> abc 

2、生產者消費者模型:

  在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

為什么要使用生產者和消費者模式

  在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什么是生產者消費者模式

  生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

import time
import random
from multiprocessing import Process, Queue

def consumer(q, name):
    # 處理數據
    while 1:
        food = q.get()
        if food is None: break  # 如果不結束的話程序就會一直不結束
        time.sleep(random.uniform(0.5, 1))
        print('%s吃了一個%s' % (name, food))

def producer(q, name, food):
    # 獲取數據
    for i in range(10):
        time.sleep(random.uniform(0.3, 0.8))
        print('%s生產了%s%s' % (name, food, i + 1))
        q.put(food + str(i))

if __name__ == '__main__':
    q = Queue()
    Process(target=consumer, args=(q, 'alex')).start()
    Process(target=consumer, args=(q, 'wusir')).start()
    p1 = Process(target=producer, args=(q, 'yang', '包子'))
    p2 = Process(target=producer, args=(q, 'sihao', '饅頭'))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    q.put(None)    # 有幾個consumer就需要放幾個None
    q.put(None)    # 結束信號 

3、JoinableQueue([maxsize])

  創建可連接的共享進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:

q.task_done() 
使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。如果調用此方法的次數大於從隊列中刪除的項目數量,將引發ValueError異常。

q.join() 
生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。 
下面的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
方法介紹
import time
import random
from multiprocessing import Process, JoinableQueue
 
def consumer(q, name):
    # 處理數據
    while 1:
        food = q.get()
        time.sleep(random.uniform(0.5, 1))
        print('%s吃了一個%s' % (name, food))
        q.task_done()   # 通知隊列已經有一個數據被處理了
        
def producer(q, name, food):
    # 獲取數據
    for i in range(10):
        time.sleep(random.uniform(0.3, 0.8))
        print('%s生產了%s%s' % (name, food, i+1))
        q.put(food + str(i))
        
if __name__ == '__main__':
    q = JoinableQueue()
    c1 = Process(target=consumer, args=(q, 'alex'))
    c2 = Process(target=consumer, args=(q, 'wusir'))
    c1.daemon = True    # 設置守護進程
    c2.daemon = True    # 設置守護進程
    c1.start()
    c2.start()
    p1 = Process(target=producer, args=(q, 'yang', '包子'))
    p2 = Process(target=producer, args=(q, 'sihao', '饅頭'))
    p1.start()
    p2.start()
    p1.join()   # 生產者要先把所有的數據都放到隊列中
    p2.join()   # 生產者要先把所有的數據都放到隊列中
    q.join()    # 阻塞直到放入隊列中所有的數據都被處理掉(有多少個數據就接收到了多少task_done)

4、管道

  隊列是基於管道實現的,管道是基於 socket 實現的

  隊列 + 鎖  簡便的IPC機制,使得進程之間數據安全

  管道  進程之間數據不安全,且存取數據復雜

  socket + pickle

#創建管道的類:
Pipe([duplex]):在進程之間創建一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象,強調一點:必須在產生Process對象之前產生管道
#參數介紹:
dumplex:默認管道是全雙工的,如果將duplex射成False,conn1只能用於接收,conn2只能用於發送。
#主要方法:
    conn1.recv():接收conn2.send(obj)發送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那么recv方法會拋出EOFError。
    conn1.send(obj):通過連接發送對象。obj是與序列化兼容的任意對象
 #其他方法:
conn1.close():關閉連接。如果conn1被垃圾回收,將自動調用此方法
conn1.fileno():返回連接使用的整數文件描述符
conn1.poll([timeout]):如果連接上的數據可用,返回True。timeout指定等待的最長時限。如果省略此參數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待數據到達。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。如果進入的消息,超過了這個最大值,將引發IOError異常,並且在連接上無法進行進一步讀取。如果連接的另外一端已經關閉,再也不存在任何數據,將引發EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):通過連接發送字節數據緩沖區,buffer是支持緩沖區接口的任意對象,offset是緩沖區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,然后調用c.recv_bytes()函數進行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩沖區接口(即bytearray對象或類似的對象)。offset指定緩沖區中放置消息處的字節位移。返回值是收到的字節數。如果消息長度大於可用的緩沖區空間,將引發BufferTooShort異常。
介紹
import time
from multiprocessing import Pipe, Process

def consumer(left, right):
    time.sleep(1)
    print(right.recv())

if __name__ == '__main__':
    left, right = Pipe()
    Process(target=consumer, args=(left, right)).start()
    left.send(1234)

  應該特別注意管道端點的正確管理問題。如果是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了為何在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。如果忘記執行這些步驟,程序可能在消費者中的recv()操作上掛起。管道是由操作系統進行引用計數的,必須在所有進程中關閉管道后才能生成EOFError異常。因此,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。

from multiprocessing import Process, Pipe

def consumer(left, right):
    left.close()    # 不寫close將不會引發EOFError
    while 1:
        try:
            print(right.recv())
        except EOFError:
            break

if __name__ == '__main__':
    left, right = Pipe()
    Process(target=consumer, args=(left, right)).start()
    right.close()
    for i in range(10):
        left.send('包子%s' % i)
    left.close()
# pipe的端口管理不會隨着某一個進程的關閉就關閉
# 操作系統來管理進程對這些端口的使用
# left,right
# left,right
# 操作系統管理4個端口  每關閉一個端口計數-1,直到所有的端口都關閉了
# 剩余1個端口的時候 recv就會報錯

 二、進程之間的數據共享   

  展望未來,基於消息傳遞的並發編程是大勢所趨

  即便是使用線程,推薦做法也是將程序設計為大量獨立的線程集合,通過消息隊列交換數據。

  這樣極大地減少了對使用鎖定和其他同步手段的需求,還可以擴展到分布式系統中。

  但進程間應該盡量避免通信,即便需要通信,也應該選擇進程安全的工具來避免加鎖帶來的問題。

  以后我們會嘗試使用數據庫來解決現在進程之間的數據共享問題。

進程間數據是獨立的,可以借助於隊列或管道實現通信,二者都是基於消息傳遞的
雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止於此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
Manager模塊介紹
from multiprocessing import Manager, Process, Lock

def work(d, lock):
    with lock:  # 不加鎖而操作共享的數據,肯定會出現數據錯亂
        d['count'] -= 1
        
if __name__ == '__main__':
    lock = Lock()
    with Manager() as m:
        dic = m.dict({'count': 100})
        p_l = []
        for i in range(100):
            p = Process(target=work, args=(dic, lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)

三、數據池和multiprocess.Pool模塊

 進程池:

  為什么要有進程池?進程池的概念。

  在程序實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閑時可能只有零星任務。那么在成千上萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程么?首先,創建進程需要消耗時間,銷毀進程也需要消耗時間。第二即便開啟了成千上萬的進程,操作系統也不能讓他們同時執行,這樣反而會影響程序的效率。因此我們不能無限制的根據任務開啟或者結束進程。那么我們要怎么做呢?

  在這里,要給大家介紹一個進程池的概念,定義一個池子,在里面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。如果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。也就是說,池中進程的數量是固定的,那么同一時間最多有固定數量的進程在運行。這樣不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現並發效果。

 multiprocess.Pool 模塊

 概念介紹:

Pool([numprocess  [,initializer [, initargs]]]):創建進程池

# 參數介紹:
1 numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值
2 initializer:是每個工作進程啟動時要執行的可調用對象,默認為None
3 initargs:是要傳給initializer的參數組

# 主要方法:
1 p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。
2 '''需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數並發地執行func函數,
必須從不同線程調用p.apply()函數或者使用p.apply_async()'''
3 p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。
4 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,
將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。'''   
5 p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成
6 P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之后調用

# 其他方法(了解)
1 方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法
2 obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。
如果遠程操作中引發了異常,它將在調用此方法時再次被引發。
3 obj.ready():如果調用完成,返回True 4 obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常 5 obj.wait([timeout]):等待結果變為可用。 6 obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數

 

 代碼實例:

   同步:

# 進程池的 同步調用  apply

import os
import time
from multiprocessing import Pool

def task(num):
    time.sleep(1)
    print('%s : %s' % (num, os.getpid()))
    return num**2

if __name__ == '__main__':
    p = Pool()
    for i in range(20):
        res = p.apply(task, args=(i,))  # 提交任務的方法,同步提交
        print('-->', res)

  異步:

# 進程池的 異步調用  apply_async

import os
import time
from multiprocessing import Pool
# 沒有取返回值
def task(num):
    time.sleep(1)
    print('%s : %s' % (num, os.getpid()))
    return num**2

if __name__ == '__main__':
    p = Pool()
    for i in range(20):
        p.apply_async(task, args=(i,))  # 提交任務的方法,異步提交
    p.close()
    p.join()

# 通過隊列取返回值
def task(num):
    time.sleep(1)
    print('%s : %s' % (num, os.getpid()))
    return num**2

if __name__ == '__main__':
    p = Pool()
    res_lst = []
    for i in range(20):
        res = p.apply_async(task, args=(i,))    # 提交任務的方法,異步提交
        res_lst.append(res)
    for res in res_lst:
        print(res.get())

   map() 方法:

import os
import time
from multiprocessing import Pool

def task(num):
    time.sleep(1)
    print('%s : %s' % (num, os.getpid()))
    return num**2

if __name__ == '__main__':
    p = Pool()
    p.map(task, range(20))

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM