Python多進程編程


一 多進程編程

Python實現多進程的方式有兩種:一種方法是os模塊中的fork方法,另一種是使用multiprocessing模塊。

前者僅適用於LINUX/UNIX操作系統,對Windows不支持,后者則是跨平台的實現方式。

第一種方式:使用os模塊中的fork方式實現多進程

import os
if __name__ == '__main__':
    print 'current Process (%s) start ...'%(os.getpid())
    pid = os.fork()
    if pid < 0:
        print 'error in fork'
    elif pid == 0:
        print 'I am child process(%s) and my parent process is (%s)',(os.getpid(),os.getppid())
    else:
        print 'I(%s) created a chlid process (%s).',(os.getpid(),pid)

第二種方式:multiprocessing

由於GIL的存在,python中的多線程其實並不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing,只需要定義一個函數,Python會完成其他所有事情。借助這個包,可以輕松完成從單進程到並發執行的轉換。multiprocessing支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

  • 在UNIX平台上,當某個進程終結之后,該進程需要被其父進程調用wait,否則進程成為僵屍進程(Zombie)。所以,有必要對每個Process對象調用join()方法 (實際上等同於wait)。對於多線程來說,由於只有一個進程,所以不存在此必要性。
  • multiprocessing提供了threading包中沒有的IPC(比如Pipe和Queue),效率上更高。應優先考慮Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因為它們占據的不是用戶進程的資源)。
  • 多進程應該避免共享資源。在多線程中,我們可以比較容易地共享資源,比如使用全局變量或者傳遞參數。在多進程情況下,由於每個進程有自己獨立的內存空間,以上方法並不合適。此時我們可以通過共享內存和Manager的方法來共享資源。但這樣做提高了程序的復雜度,並因為同步的需要而降低了程序的效率。

Process.PID中保存有PID,如果進程還沒有start(),則PID為None。

window系統下,需要注意的是要想啟動一個子進程,必須加上那句if __name__ == "main",進程相關的要寫在這句下面。

創建進程的類:Process([group [, target [, name [, args [, kwargs]]]]]),target表示調用對象,args表示調用對象的位置參數元組。kwargs表示調用對象的字典。name為別名。group實質上不使用。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()啟動某個進程。join()方法實現進程間的同步。

#__author: greg
#date: 2017/9/19 23:52
from multiprocessing import Process
import time

def f(name):
    time.sleep(1)
    print('hello', name,time.ctime())

if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p = Process(target=f, args=('alvin',))
        p_list.append(p)
        p.start()
    for i in p_list:
        i.join()
    print('end')#一個主進程,三個子進程


# output:
# hello alvin Fri Nov 24 19:10:08 2017
# hello alvin Fri Nov 24 19:10:08 2017
# hello alvin Fri Nov 24 19:10:08 2017
# end

類式調用:

#__author: greg
#date: 2017/9/21 20:02
from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self):
        super(MyProcess, self).__init__()
        #self.name = name
    def run(self):
        time.sleep(1)
        print ('hello', self.name,time.ctime())
if __name__ == '__main__': p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print('end') #output: # hello MyProcess-1 Fri Nov 24 19:12:17 2017 # hello MyProcess-2 Fri Nov 24 19:12:17 2017 # hello MyProcess-3 Fri Nov 24 19:12:17 2017 # end

顯示進程ID號:

#__author: greg
#date: 2017/9/21 20:16
from multiprocessing import Process
import os
import time
def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())#父進程號
    print('process id:', os.getpid())#進程號

def f(name):
    info('\033[31;1mfunction f\033[0m')
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')
    time.sleep(10)
    p = Process(target=info, args=('bob',))
    p.start()
    p.join()

#output:
# main process line
# module name: __main__
# parent process: 1548 pycharm的進程號
# process id: 8416  Python進程號
# bob
# module name: __mp_main__
# parent process: 8416  Python進程號
# process id: 5556  info進程號

二 Process類

構造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 線程組,目前還沒有實現,庫引用中提示必須是None; 
  target: 要執行的方法; 
  name: 進程名; 
  args/kwargs: 要傳入方法的參數。

實例方法:

  is_alive():返回進程是否在運行。

  join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。

  start():進程准備就緒,等待CPU調度

  run():strat()調用run方法,如果實例進程時未制定傳入target,這star執行t默認run()方法。

  terminate():不管任務是否完成,立即停止工作進程

屬性:

  authkey

  daemon:和線程的setDeamon功能一樣

  exitcode(進程在運行時為None、如果為–N,表示被信號N結束)

  name:進程名字。

  pid:進程號。

三 進程間通訊

不同進程間內存是不共享的,要想實現兩個進程間的數據交換,可以用以下方法:

Queues 用來在多個進程間通信

1. 阻塞模式

import queue
import time

q = queue.Queue(10) #創建一個隊列
start=time.time()
for i in range(10):
q.put('A')
time.sleep(0.5)
end=time.time()
print(end-start)

這是一段極其簡單的代碼(另有兩個線程也在操作隊列q),我期望每隔0.5秒寫一個'A'到隊列中,但總是不能如願:
間隔時間有時會遠遠超過0.5秒。
原來,Queue.put()默認有 block = True 和 timeout兩個參數。
源碼:def put(self, item, block=True, timeout=None):
當 block = True 時,寫入是阻塞式的,阻塞時間由 timeout確定。
當隊列q被(其他線程)寫滿后,這段代碼就會阻塞,直至其他線程取走數據。
Queue.put()方法加上 block=False 的參數,即可解決這個隱蔽的問題。
但要注意,非阻塞方式寫隊列,當隊列滿時會拋出 exception Queue.Full 的異常。

#__author: greg
#date: 2017/9/21 22:27
from multiprocessing import Process, Queue

def f(q,n):
    q.put([42, n, 'hello'])
    print('subprocess id',id(q))

if __name__ == '__main__':
    q = Queue()
    p_list=[]
    print('process id',id(q))
    for i in range(3):
        p = Process(target=f, args=(q,i))
        p_list.append(p)
        p.start()
    print(q.get())
    print(q.get())
    print(q.get())
    for i in p_list:
        i.join()

# output
# process id 2284856854176
# subprocess id 2607348001872
# [42, 0, 'hello']
# subprocess id 1712786975824
# [42, 2, 'hello']
# subprocess id 2254764977120
# [42, 1, 'hello']

Pipe常用來兩個進程間進行通信,兩個進程分別位於管道的兩端

 

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # prints "[42, None, 'hello']"
    p.join()

 

 

Pipe方法返回(conn1, conn2)代表一個管道的兩個端。Pipe方法有duplex參數,如果duplex參數為True(默認值),那么這個管道是全雙工模式,也就是說conn1和conn2均可收發。duplex為False,conn1只負責接受消息,conn2只負責發送消息。

 

send和recv方法分別是發送和接受消息的方法。例如,在全雙工模式下,可以調用conn1.send發送消息,conn1.recv接收消息。如果沒有消息可接收,recv方法會一直阻塞。如果管道已經被關閉,那么recv方法會拋出EOFError。 
#__author: greg
#date: 2017/9/21 22:57
import multiprocessing
import random
import time,os

def proc_send(pipe,urls):
    for url in urls:
        print("Process(%s) send: %s" %(os.getpid(),url))
        pipe.send(url)
        time.sleep(random.random())

def proc_recv(pipe):
    while True:
        print("Process(%s) rev:%s" %(os.getpid(),pipe.recv()))
        time.sleep(random.random())

if __name__=="__main__":
    pipe=multiprocessing.Pipe()
    p1=multiprocessing.Process(target=proc_send,args=(pipe[0],['url_'+str(i)
                                                      for i in range(10)]))
    p2=multiprocessing.Process(target=proc_recv,args=(pipe[1],))
    p1.start()
    p2.start()
    p1.join()
    p2.terminate()

Manager()返回的管理器對象控制一個服務器進程,該進程持有Python對象,並允許其他進程使用代理來操縱它們。

#__author: greg
#date: 2017/9/21 23:10
from multiprocessing import Process, Manager
def f(d, l,n):
    d[n] = '1'
    d['2'] = 2
    d[0.25] = None
    l.append(n)
    # print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(5))
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l,i))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
        print(d)
        print(l)

四 進程同步

當多個進程需要訪問共享資源的時候,Lock可以用來避免訪問的沖突。

#__author: greg
#date: 2017/9/21 23:25
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

五 進程池 Pool類

Pool可以提供指定數量的進程供用戶使用,默認大小是CPU的核數。當有新的請求提交到Pool中時,如果池還沒有滿,那么就會創建一個新的進程來執行該請求

但如果池中的進程數已經達到規定的最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來處理它。

# -*- coding: utf-8 -*-
# 2017/11/24 20:15
from multiprocessing import Pool
import os, time, random

def run_task(name):
    print('Task %s (pid = %s) is running...' % (name, os.getpid()))
    time.sleep(random.random() * 3)
    print('Task %s end.' % name)

if __name__=='__main__':
    print('Current process %s.' % os.getpid())
    p = Pool(processes=3)
    for i in range(5):
        p.apply_async(run_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')


"""
Current process 9788.
Waiting for all subprocesses done...
Task 0 (pid = 5916) is running...
Task 1 (pid = 3740) is running...
Task 2 (pid = 6964) is running...
Task 2 end.
Task 3 (pid = 6964) is running...
Task 1 end.
Task 4 (pid = 3740) is running...
Task 0 end.
Task 3 end.
Task 4 end.
All subprocesses done.
"""
  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞
  • close()    關閉pool,使其不在接受新的任務。
  • terminate()    結束工作進程,不在處理未完成的任務。
  • join()    主進程阻塞,等待子進程的退出, join方法要在close或terminate之后使用。

每次最多運行3個進程,當一個任務結束了,新的任務依次添加進來,任務執行使用的進程依然是原來的進程,這一點通過進程的pid可以看出來。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM