Python3學習之路~10.1 多進程、進程間通信、進程池


一 多進程multiprocessing

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

import multiprocessing,time

def run(name):
    print("hello",name)
    time.sleep(2)

if __name__ == '__main__':

    for i in range(10):
        p = multiprocessing.Process(target=run,args=('Bob %s'%i,))
        p.start()
import multiprocessing,time,threading

def thread_run():
    print(threading.get_ident()) #線程號

def run(name):
    print("hello",name)
    t = threading.Thread(target=thread_run,)
    t.start()
    time.sleep(2)

if __name__ == '__main__':

    for i in range(10):
        p = multiprocessing.Process(target=run,args=('Bob %s'%i,))
        p.start()
可以在進程中起線程
# 在主進程里調用了info,在子進程了又調用了info,我們看看效果?
# 可以看到,每一個進程都是由父進程啟動的。主程序的父進程是pyCharm,子進程的父進程是主進程。

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid()) #得到父進程ID
    print('process id:', os.getpid()) #得到進程ID
    print("\n\n")

def f(name):
    info('\033[31;1mfunction f\033[0m')
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

#####輸出:
####ain process line
####odule name: __main__
####arent process: 8268
####rocess id: 4448

####unction f
####odule name: __mp_main__
####arent process: 4448
####rocess id: 9596

####ello bob
得到進程ID

二 進程間通信

不同進程間內存是不共享的,要想實現兩個進程間的數據交換,可以用以下方法:

1.Queues

首先我們知道,線程之間是數據共享的,子線程放進queue數據,父線程可以取出來。如下示例

import threading,queue

def f():
    q.put([42,None,'hello'])

if __name__ == '__main__':
    q = queue.Queue()
    t = threading.Thread(target=f)
    t.start()
    print(q.get())
    
####輸出:[42, None, 'hello']
線程之間數據共享

把線程改為進程,會發現報錯。

import multiprocessing,queue

def f():
    q.put([42,None,'hello'])

if __name__ == '__main__':
    q = queue.Queue()
    p = multiprocessing.Process(target=f)
    p.start()
    print(q.get())

####輸出報錯:NameError: name 'q' is not defined
將線程改為進程,嘗試數據傳輸,報錯NameError: name 'q' is not defined

報錯的原因是進程之間數據不共享。子進程和父進程分別擁有獨立的內存空間,所以子進程是訪問不了父進程的queue的。那有什么辦法可以使子進程訪問到父進程的queue呢?我們可以嘗試將這個queue當做變量傳給子進程。發現還是報錯。

import multiprocessing,queue

def f(q):
    q.put([42,None,'hello'])

if __name__ == '__main__':
    q = queue.Queue()
    p = multiprocessing.Process(target=f,args=(q,))
    p.start()
    print(q.get())

####輸出報錯:TypeError: can't pickle _thread.lock objects
將線程queue傳遞給子進程是不可以的,報錯TypeError: can't pickle _thread.lock objects

報錯的原因是我們錯將線程queue(通過import queue引入)傳遞給了子進程,實際上我們傳遞給子進程的應該是進程queue(通過from multiprocessing import  Queue引入)。接下來才是正確的示例:

from multiprocessing import Process,Queue #引入進程queue

def f(q):
    q.put([42,None,'hello']) #子進程放入數據

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f,args=(q,)) #將q傳遞給子進程
    p.start()
    print(q.get()) #主進程取出數據

####輸出:[42, None, 'hello']

上面的例子,我們把進程queue傳遞給了子進程,表面上看,子進程和父進程共用一個queue,實際上並不是這樣,而是子進程克隆了一個父進程的queue,子進程將數據放入克隆queue中,克隆queue將其序列化保存,然后進行反序列化后放到父進程的原始queue中,所以嚴格意義上子進程和父進程的queue並不是一個共享queue。

2.Pipes

要想實現兩個進程間的數據傳遞,除了Queues,還可以使用Pipes。

Pipe()返回的兩個連接對象代表管道的兩端。 每個連接對象都有send()和recv()方法(以及其他方法)。

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    print('from parent:',conn.recv())
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print('from son:',parent_conn.recv())
    parent_conn.send('hello')
    p.join()

3.Managers

Queues和Pipes僅能實現兩個進程之間的數據傳遞,而Managers可以實現進程之間數據的共享。

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array. For example,

from multiprocessing import Process, Manager
import os

def f(d,l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
    print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()    #生成一個字典,可在多個進程間共享和傳遞
        l = manager.list(range(5))    #生成一個列表,可在多個進程間共享和傳遞
        p_list = []
        for i in range(10):
            p = Process(target=f,args=(d,l))
            p.start()
            p_list.append(p)
        for res in p_list: #等待結果
            res.join()

        print(d)
[0, 1, 2, 3, 4, 8512]
[0, 1, 2, 3, 4, 8512, 11060]
[0, 1, 2, 3, 4, 8512, 11060, 4820]
[0, 1, 2, 3, 4, 8512, 11060, 4820, 9496]
[0, 1, 2, 3, 4, 8512, 11060, 4820, 9496, 4264]
[0, 1, 2, 3, 4, 8512, 11060, 4820, 9496, 4264, 8420]
[0, 1, 2, 3, 4, 8512, 11060, 4820, 9496, 4264, 8420, 9184]
[0, 1, 2, 3, 4, 8512, 11060, 4820, 9496, 4264, 8420, 9184, 6592]
[0, 1, 2, 3, 4, 8512, 11060, 4820, 9496, 4264, 8420, 9184, 6592, 9808]
[0, 1, 2, 3, 4, 8512, 11060, 4820, 9496, 4264, 8420, 9184, 6592, 9808, 5064]
{8512: 8512, 11060: 11060, 4820: 4820, 9496: 9496, 4264: 4264, 8420: 8420, 9184: 9184, 6592: 6592, 9808: 9808, 5064: 5064}
輸出

進程鎖

雖然進程之間是獨立運行的,但是對於各進程來說,終端屏幕是共享的,為了防止輸出結果時,各個進程爭搶輸出,造成打印結果混亂,可以給進程加一把鎖。

from multiprocessing import Process,Lock

def f(l,i):
    l.acquire() #得到鎖
    print("hello world",i)
    l.release() #釋放鎖

if __name__ == '__main__':
    lock = Lock() #生成鎖的實例

    for num in range(10):
        Process(target=f,args=(lock,num)).start() #將lock傳遞給子進程

三 進程池

我們每起一個進程實際上就是克隆一份父進程數據給子進程使用,起多個進程時就會占用很多內存空間。為了節省開銷,我們使用進程池。進程池就是限制同一時間有多少個進程運行。

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進程,那么程序就會等待,直到進程池中有可用進程為止。

進程池中有兩個方法:

  • apply                #同步執行,即串行
  • apply_async     #異步執行,即並發
from multiprocessing import Pool
import time,os

def Foo(i):
    time.sleep(2)
    print('my processid is ',os.getpid())
    return  i+100


if __name__ == '__main__': #windows上運行進程池必須加這行代碼,否則報錯
    pool = Pool(5)  #運行進程池中同時放入5個進程

    for i in range(10):
        # pool.apply(func=Foo,args=(i,))  #同步執行,即串行
        pool.apply_async(func=Foo, args=(i,))  # 異步執行,即並發,此時有10個進程,同時執行的有5個,其他的掛起

    print('end')
    pool.close() #注意:一定要先關閉進程池再join
    pool.join() #表示等進程池中進程執行完畢后稱程序再關閉,如果注釋,則程序直接關閉。

下面的例子,實現了主進程起了10個子進程,分別執行Foo函數,每次子進程執行完畢后,父進程回調Bar函數(可觀察到執行Bar函數的進程ID與主進程ID相同)。

from multiprocessing import Pool
import time,os

def Foo(i):
    time.sleep(2)
    print('my processid is ',os.getpid())
    return  i+100

def Bar(arg):
    print('--exec done:',arg,'my processid is ',os.getpid())

if __name__ == '__main__': #windows上運行進程池必須加這行代碼,否則報錯
    pool = Pool(5)  #運行進程池中同時放入5個進程

    for i in range(10):
        pool.apply_async(func=Foo, args=(i,),callback=Bar)  # callback=回調

    print('end',os.getpid())
    pool.close() #注意:先close再join
    pool.join() #表示等進程池中進程執行完畢后稱程序再關閉,如果注釋,則程序直接關閉。
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM