Python並行編程(十一):基於進程的並行


1、基本概念

      多進程主要用multiprocessing和mpi4py這兩個模塊。

      multiprocessing是Python標准庫中的模塊,實現了共享內存機制,可以讓運行在不同處理器核心的進程能讀取共享內存。

      mpi4py庫實現了消息傳遞的編程范例(設計模式)。簡單來說就是進程之間不靠任何共享信息來進行通訊,所有的交流都通過傳遞信息代替。

      這與使用共享內存通訊、加鎖或類似機制實現互斥的技術形成對比。在信息傳遞的代碼中,進程通過send和receive進行交流。

 

2、創建一個進程

      由父進程創建子進程。父進程既可以在產生子進程之后繼續異步執行,也可以暫停等待子進程創建完成之后再繼續執行。創建進程的步驟如下:

      1. 創建進程對象

      2. 調用start()方法,開啟進程的活動

      3. 調用join()方法,在進程結束之前一直等待

 

3、創建進程用例

# coding : utf-8

import multiprocessing

def foo(i):
    print('called function in process: %s' %i)
    return

if __name__ == '__main__':
    Process_jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=foo, args=(i, ))
        Process_jobs.append(p)
        p.start()
        p.join()

      運行結果:

      

      創建進程對象的時候需要分配一個函數,作為進程的執行任務,本例為foo()。最后進程對象調用join()方法,如果沒有join主進程退出之后子進程會留在idle中。

      提示:為了預防無限遞歸調用,可以在不同腳本文件中定義目標函數,然后導入進來使用。

 

4、進程命名

      進程命名和線程命名大同小異。

      使用示例:

# coding:utf-8

import multiprocessing
import time

def foo():
    # get name of process
    name = multiprocessing.current_process().name
    print("Starting %s \n" %name)
    time.sleep(3)
    print("Exiting %s \n" %name)

if __name__ == '__main__':
    # create process with DIY name
    process_with_name = multiprocessing.Process(name='foo_process', target=foo)
    # process_with_name.daemon = True

    # create process with default name
    process_with_default_name = multiprocessing.Process(target=foo)

    process_with_name.start()
    process_with_default_name.start()

 

5、殺死一個進程

      通過terminate方法殺死一個進程,也可以使用is_alive方法判斷一個進程是否存活。

      測試用例:

import multiprocessing, time

def foo():
    print('Starting function')
    time.sleep(0.1)
    print('Finished function')

if __name__ == '__main__':
    p = multiprocessing.Process(target=foo)
    print('Process before execution:', p, p.is_alive())
    p.start()
    print('Process running:', p, p.is_alive())
    p.terminate()
    print('Process terminated:', p, p.is_alive())
    p.join()
    print('Process joined:', p, p.is_alive())
    print('Process exit code:',p.exitcode)

      運行結果:

      

      正常結束返回值為0,且foo會被執行。exitcode為0為正常結束,為負表示信號殺死,大於0進程有錯誤。

 

6、子類中使用進程

      實現一個自定義的進程子類,需要以下三步:

      - 定義Process子類

      - 覆蓋__init__(self [,args])方法來添加額外的參數

      - 覆蓋run方法來實現Process啟動的時候執行的任務

      創建Process子類之后,可以創建它的實例。並且通過start方法啟動它,啟動之后會運行run方法。

      測試用例:

# coding:utf-8

import multiprocessing

class MyProcess(multiprocessing.Process):
    def run(self):
        print('called run method in process: %s' %self.name)
        return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = MyProcess()
        jobs.append(p)
        p.start()
        p.join()

      運行結果:

      

 

7、進程之間交換對象

      並行應用常常需要在進程之間交換數據。Multiprocessing庫有兩個Communication Channel可以交換對象:隊列queue和管道pipe。

      使用隊列交換對象:

            Queue返回一個進程共享的隊列,是線程安全的,也是進程安全的。任何可序列化的對象(Python通過pickable模塊序列化對象)都可以通過它進行交換。

      測試用例:

import multiprocessing
import random
import time

class Producer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        for i in range(10):
            item = random.randint(0,256)
            self.queue.put(item)
            print("Process Producer:item %d appended to queue %s" %(item, self.name))
            time.sleep(1)
            print("The size of queue is %s" % self.queue.qsize())

class Consumer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            if self.queue.empty():
                print("The queue is empty")
                break
            else:
                time.sleep(2)
                item = self.queue.get()
                print("Process Consumer:item %d popped from by %s \n" %(item, self.name))
                time.sleep(1)

if __name__ == "__main__":
    # create Queue in the main process
    queue = multiprocessing.Queue()
    # create 
    process_producer = Producer(queue)
    process_consumer = Consumer(queue)

    process_producer.start()
    process_consumer.start()
    process_producer.join()
    process_consumer.join()

      運行結果:

Process Producer:item 106 appended to queue Producer-1
The size of queue is 1
Process Producer:item 167 appended to queue Producer-1
The size of queue is 2
Process Producer:item 202 appended to queue Producer-1
Process Consumer:item 106 popped from by Consumer-2 

The size of queue is 2
Process Producer:item 124 appended to queue Producer-1
The size of queue is 3
Process Producer:item 19 appended to queue Producer-1
The size of queue is 4
Process Producer:item 5 appended to queue Producer-1
Process Consumer:item 167 popped from by Consumer-2 

The size of queue is 4
Process Producer:item 178 appended to queue Producer-1
The size of queue is 5
Process Producer:item 207 appended to queue Producer-1
The size of queue is 6
Process Producer:item 154 appended to queue Producer-1
Process Consumer:item 202 popped from by Consumer-2 

The size of queue is 6
Process Producer:item 228 appended to queue Producer-1
The size of queue is 7
Process Consumer:item 124 popped from by Consumer-2 

Process Consumer:item 19 popped from by Consumer-2 

Process Consumer:item 5 popped from by Consumer-2 

Process Consumer:item 178 popped from by Consumer-2 

Process Consumer:item 207 popped from by Consumer-2 

Process Consumer:item 154 popped from by Consumer-2 

Process Consumer:item 228 popped from by Consumer-2 

The queue is empty

      隊列補充:

            隊列還有一個JoinaleQueue子類,有以下兩個額外的方法:

            - task_done():此方法意味着之前入隊的一個任務已經完成,比如,get方法從隊列取回item之后調用。所以此方法只能被隊列的消費者調用。

            - join():此方法將進程阻塞,直到隊列中的item全部被取出並執行。

            因為使用隊列進行通信是一個單向的、不確定的過程,所以你不知道什么時候隊列的元素被取出來了,所以使用task_done來表示隊列里的一個任務已經完成,這個方法一般和join一起使用,當隊列的所有任務都處理之后,也就是說put到隊列的每個任務都調用task_done方法后,join才會完成阻塞。

            JoinaleQueue測試用例:

from multiprocessing import Process, JoinableQueue
import time,random
def consumer(name, q):
    while True:
        time.sleep(1)
        get_res = q.get()
        print("%s got %s" %(name, get_res))
        q.task_done()

def producer(seq, q):
    for item in seq:
        # time.sleep(1)
        q.put(item)
        print("Produced %s" %item)
    # block main process and don't run "print("Ended")"
    q.join()

if __name__ == "__main__":
    q = JoinableQueue()
    seq = ("item-%s" %i for i in range(10))

    c1 = Process(target=consumer, args=("c1", q))
    c2 = Process(target=consumer, args=("c2", q))
    c3 = Process(target=consumer, args=("c3", q))

    c1.daemon = True
    c2.daemon = True
    c3.daemon = True

    c1.start()
    c2.start()
    c3.start()

    # start producer
    producer(seq,q)

    # run the command when all the item is consumed
    print("Ended")

      使用管道交換對象:

      一個管道可以做一下事情:

      - 返回一對被管道連接的連接對象

      - 然后對象使用send/receive方法可以在進程之間通信

      簡單示例:

import multiprocessing

def create_items(pipe):
    output_pipe, _ = pipe
    for item in range(10):
        output_pipe.send(item)
    output_pipe.close()

def multiply_items(pipe_1, pipe_2):
    close, input_pipe = pipe_1
    close.close()
    output_pipe, _ = pipe_2
    try:
        while True:
            item = input_pipe.recv()
            output_pipe.send(item * item)
    except EOFError:
        output_pipe.close()


if __name__ == "__main__":
    # The first pipe sends numbers
    pipe_1 = multiprocessing.Pipe(True)
    process_pipe_1 = multiprocessing.Process(target=create_items, args=(pipe_1,))
    process_pipe_1.start()

    # The second pipe receives numbers and Calculations
    pipe_2 = multiprocessing.Pipe(True)
    process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2))
    process_pipe_2.start()

    pipe_1[0].close()
    pipe_2[0].close()

    try:
        while True:
            # print(pipe_2)
            print(pipe_2[1].recv())
    except EOFError:
        print("End")

      上述代碼定義兩個進程,一個發送數字0-9到管道pipe_1,另一個進程通過receive獲取pipe_1的數字,並進行平方,然后將結果輸出到管道pipe_2中。最后通過recv獲取pipe_2的數據。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM