python類庫32[多進程通信Queue+Pipe+Value+Array]



多進程通信

queue和pipe的區別: pipe用來在兩個進程間通信。queue用來在多個進程間實現通信。 此兩種方法為所有系統多進程通信的基本方法,幾乎所有的語言都支持此兩種方法。

 

1)Queue & JoinableQueue

queue用來在進程間傳遞消息,任何可以pickle-able的對象都可以在加入到queue。 

multiprocessing.JoinableQueue 是 Queue的子類,增加了task_done()和join()方法。

 

task_done()用來告訴queue一個task完成。一般地在調用get()獲得一個task,在task結束后調用task_done()來通知Queue當前task完成。

join() 阻塞直到queue中的所有的task都被處理(即task_done方法被調用)。


代碼:

import multiprocessing
import time

class Consumer(multiprocessing.Process):
    
     def  __init__(self, task_queue, result_queue):
        multiprocessing.Process. __init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

     def run(self):
        proc_name = self.name
         while True:
            next_task = self.task_queue.get()
             if next_task  is None:
                 #  Poison pill means shutdown
                 print ( ' %s: Exiting ' % proc_name)
                self.task_queue.task_done()
                 break
             print ( ' %s: %s ' % (proc_name, next_task))
            answer = next_task()  #  __call__()
            self.task_queue.task_done()
            self.result_queue.put(answer)
         return


class Task(object):
     def  __init__(self, a, b):
        self.a = a
        self.b = b
     def  __call__(self):
        time.sleep(0.1)  #  pretend to take some time to do the work
         return  ' %s * %s = %s ' % (self.a, self.b, self.a * self.b)
     def  __str__(self):
         return  ' %s * %s ' % (self.a, self.b)


if  __name__ ==  ' __main__ ':
     #  Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    
     #  Start consumers
    num_consumers = multiprocessing.cpu_count()
     print ( ' Creating %d consumers ' % num_consumers)
    consumers = [ Consumer(tasks, results)
                   for i  in range(num_consumers) ]
     for w  in consumers:
        w.start()
    
     #  Enqueue jobs
    num_jobs = 10
     for i  in range(num_jobs):
        tasks.put(Task(i, i))
    
     #  Add a poison pill for each consumer
     for i  in range(num_consumers):
        tasks.put(None)

     #  Wait for all of the tasks to finish
    tasks.join()
    
     #  Start printing results
     while num_jobs:
        result = results.get()
         print ( ' Result: ', result)
        num_jobs -= 1

 注意小技巧: 使用None來表示task處理完畢。

 

運行結果:


 

2) pipe

 

pipe()返回一對連接對象,代表了pipe的兩端。每個對象都有send()和recv()方法。

 

 

代碼:

 

from multiprocessing  import Process, Pipe

def f(conn):
    conn.send([42, None,  ' hello '])
    conn.close()

if  __name__ ==  ' __main__ ':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    p.join()
     print(parent_conn.recv())    #  prints "[42, None, 'hello']"

 

 

3)Value + Array

Value + Array 是python中共享內存 映射文件的方法,速度比較快。

from multiprocessing  import Process, Value, Array

def f(n, a):
    n.value = n.value + 1
     for i  in range(len(a)):
        a[i] = a[i] * 10

if  __name__ ==  ' __main__ ':
    num = Value( ' i ', 1)
    arr = Array( ' i ', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

     print(num.value)
     print(arr[:])
    
    p2 = Process(target=f, args=(num, arr))
    p2.start()
    p2.join()

     print(num.value)
     print(arr[:])

#  the output is :
#
 2
#
 [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
#
 3
#
 [0, 100, 200, 300, 400, 500, 600, 700, 800, 900]


參考:
The Python Standard Library By Example

http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html 

 

完! 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM