【python進階】深入理解系統進程2


前言

在上一篇【python進階】深入理解系統進程1中,我們講述了多任務的一些概念,多進程的創建,fork等一些問題,這一節我們繼續接着講述系統進程的一些方法及注意點

multiprocessing

如果你打算編寫多進程的服務程序,Unix/Linux⽆疑是正確的選擇。由於 Windows沒有fork調⽤,難道在Windows上⽆法⽤Python編寫多進程的程 序?
由於Python是跨平台的,⾃然也應該提供⼀個跨平台的多進程⽀持。 multiprocessing模塊就是跨平台版本的多進程模塊。
multiprocessing模塊提供了⼀個Process類來代表⼀個進程對象,下⾯的例⼦ 演示了啟動⼀個⼦進程並等待其結束:

from multiprocessing import Process
import os

# 子進程要執行的代碼
def run_proc(name):
    print('子進程運行中,name= %s ,pid=%d...' % (name, os.getpid()))

if __name__=='__main__':
    print('父進程 %d.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('子進程將要執行')
    p.start()
    p.join()
    print('子進程已結束')

運行結果:

說明

  • 創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個Process實例,用start()方法啟動,這樣創建進程比fork()還要簡單。
  • join()方法可以等待子進程結束后再繼續往下運行,通常用於進程間的同步。

Process語法結構如下:

Process([group [, target [, name [, args [, kwargs]]]]])

  • target:表示這個進程實例所調用對象;

  • args:表示調用對象的位置參數元組;

  • kwargs:表示調用對象的關鍵字參數字典;

  • name:為當前進程實例的別名;

  • group:大多數情況下用不到;

Process類常用方法:

  • is_alive():判斷進程實例是否還在執行;

  • join([timeout]):是否等待進程實例執行結束,或等待多少秒;

  • start():啟動進程實例(創建子進程);

  • run():如果沒有給定target參數,對這個對象調用start()方法時,就將執行對象中的run()方法;

  • terminate():不管任務是否完成,立即終止;

Process類常用屬性:

  • name:當前進程實例別名,默認為Process-N,N為從1開始遞增的整數;

  • pid:當前進程實例的PID值;

實例1

from multiprocessing import Process
import os
from time import sleep

# 子進程要執行的代碼
def run_proc(name, age, **kwargs):
    for i in range(10):
        print('子進程運行中,name= %s,age=%d ,pid=%d...' % (name, age,os.getpid()))
        print(kwargs)
        sleep(0.5)

if __name__=='__main__':
    print('父進程 %d.' % os.getpid())
    p = Process(target=run_proc, args=('test',18), kwargs={"m":20})
    print('子進程將要執行')
    p.start()
    sleep(1)
    p.terminate()
    p.join()
    print('子進程已結束')

運行結果:

實例2

from multiprocessing import Process
import time
import os

#兩個子進程將會調用的兩個方法
def  worker_1(interval):
    print("worker_1,父進程(%s),當前進程(%s)"%(os.getppid(),os.getpid()))
    t_start = time.time()
    time.sleep(interval) #程序將會被掛起interval秒
    t_end = time.time()
    print("worker_1,執行時間為'%0.2f'秒"%(t_end - t_start))

def  worker_2(interval):
    print("worker_2,父進程(%s),當前進程(%s)"%(os.getppid(),os.getpid()))
    t_start = time.time()
    time.sleep(interval)
    t_end = time.time()
    print("worker_2,執行時間為'%0.2f'秒"%(t_end - t_start))

#輸出當前程序的ID
print("進程ID:%s"%os.getpid())

#創建兩個進程對象,target指向這個進程對象要執行的對象名稱,
#args后面的元組中,是要傳遞給worker_1方法的參數,
#因為worker_1方法就一個interval參數,這里傳遞一個整數2給它,
#如果不指定name參數,默認的進程對象名稱為Process-N,N為一個遞增的整數
p1=Process(target=worker_1,args=(2,))
p2=Process(target=worker_2,name="dongGe",args=(1,))

#使用"進程對象名稱.start()"來創建並執行一個子進程,
#這兩個進程對象在start后,就會分別去執行worker_1和worker_2方法中的內容
p1.start()
p2.start()

#同時父進程仍然往下執行,如果p2進程還在執行,將會返回True
print("p2.is_alive=%s"%p2.is_alive())

#輸出p1和p2進程的別名和pid
print("p1.name=%s"%p1.name)
print("p1.pid=%s"%p1.pid)
print("p2.name=%s"%p2.name)
print("p2.pid=%s"%p2.pid)

#join括號中不攜帶參數,表示父進程在這個位置要等待p1進程執行完成后,
#再繼續執行下面的語句,一般用於進程間的數據同步,如果不寫這一句,
#下面的is_alive判斷將會是True,在shell(cmd)里面調用這個程序時
#可以完整的看到這個過程,大家可以嘗試着將下面的這條語句改成p1.join(1),
#因為p2需要2秒以上才可能執行完成,父進程等待1秒很可能不能讓p1完全執行完成,
#所以下面的print會輸出True,即p1仍然在執行
p1.join()
print("p1.is_alive=%s"%p1.is_alive())

執行結果:

進程的創建-Process子類

創建新的進程還能夠使用類的方式,可以自定義一個類,繼承Process類,每次實例化這個類的時候,就等同於實例化一個進程對象,請看下面的實例:

from multiprocessing import Process
import time
import os

#繼承Process類
class Process_Class(Process):
    #因為Process類本身也有__init__方法,這個子類相當於重寫了這個方法,
    #但這樣就會帶來一個問題,我們並沒有完全的初始化一個Process類,所以就不能使用從這個類繼承的一些方法和屬性,
    #最好的方法就是將繼承類本身傳遞給Process.__init__方法,完成這些初始化操作
    def __init__(self,interval):
        Process.__init__(self)
        self.interval = interval

    #重寫了Process類的run()方法
    def run(self):
        print("子進程(%s) 開始執行,父進程為(%s)"%(os.getpid(),os.getppid()))
        t_start = time.time()
        time.sleep(self.interval)
        t_stop = time.time()
        print("(%s)執行結束,耗時%0.2f秒"%(os.getpid(),t_stop-t_start))

if __name__=="__main__":
    t_start = time.time()
    print("當前程序進程(%s)"%os.getpid())        
    p1 = Process_Class(2)
    #對一個不包含target屬性的Process類執行start()方法,就會運行這個類中的run()方法,所以這里會執行p1.run()
    p1.start()
    p1.join()
    t_stop = time.time()
    print("(%s)執行結束,耗時%0.2f"%(os.getpid(),t_stop-t_start))

執行結果:

進程池Pool

當需要創建的子進程數量不多時,可以直接利用multiprocessing中的Process動態成生多個進程,但如果是上百甚至上千個目標,手動的去創建進程的工作量巨大,此時就可以用到multiprocessing模塊提供的Pool方法。

初始化Pool時,可以指定一個最大進程數,當有新的請求提交到Pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到指定的最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來執行,請看下面的實例:

from multiprocessing import Pool
import os,time,random

def worker(msg):
    t_start = time.time()
    print("%s開始執行,進程號為%d"%(msg,os.getpid()))
    #random.random()隨機生成0~1之間的浮點數
    time.sleep(random.random()*2) 
    t_stop = time.time()
    print(msg,"執行完畢,耗時%0.2f"%(t_stop-t_start))

po=Pool(3) #定義一個進程池,最大進程數3
for i in range(0,10):
    #Pool.apply_async(要調用的目標,(傳遞給目標的參數元祖,))
    #每次循環將會用空閑出來的子進程去調用目標
    po.apply_async(worker,(i,))

print("----start----")
po.close() #關閉進程池,關閉后po不再接收新的請求
po.join() #等待po中所有子進程執行完成,必須放在close語句之后
print("-----end-----")

運行結果:

multiprocessing.Pool常用函數解析:

  • apply_async(func[, args[, kwds]]) :使用非阻塞方式調用func(並行執行,堵塞方式必須等待上一個進程退出才能執行下一個進程),args為傳遞給func的參數列表,kwds為傳遞給func的關鍵字參數列表;

  • apply(func[, args[, kwds]]):使用阻塞方式調用func

  • close():關閉Pool,使其不再接受新的任務;

  • terminate():不管任務是否完成,立即終止;

  • join():主進程阻塞,等待子進程的退出, 必須在close或terminate之后使用;

apply堵塞式

from multiprocessing import Pool
import os,time,random

def worker(msg):
    t_start = time.time()
    print("%s開始執行,進程號為%d"%(msg,os.getpid()))
    #random.random()隨機生成0~1之間的浮點數
    time.sleep(random.random()*2) 
    t_stop = time.time()
    print(msg,"執行完畢,耗時%0.2f"%(t_stop-t_start))

po=Pool(3) #定義一個進程池,最大進程數3
for i in range(0,10):
    po.apply(worker,(i,))

print("----start----")
po.close() #關閉進程池,關閉后po不再接收新的請求
po.join() #等待po中所有子進程執行完成,必須放在close語句之后
print("-----end-----")

運行結果:

進程間通信-Queue

Process之間有時需要通信,操作系統提供了很多機制來實現進程間的通信。

1. Queue的使用

可以使用multiprocessing模塊的Queue實現多進程之間的數據傳遞,Queue本身是一個消息列隊程序,首先用一個小實例來演示一下Queue的工作原理:

from multiprocessing import Queue
q=Queue(3) #初始化一個Queue對象,最多可接收三條put消息
q.put("消息1") 
q.put("消息2")
print(q.full())  #False
q.put("消息3")
print(q.full()) #True

#因為消息列隊已滿下面的try都會拋出異常,第一個try會等待2秒后再拋出異常,第二個Try會立刻拋出異常
try:
    q.put("消息4",True,2)
except:
    print("消息列隊已滿,現有消息數量:%s"%q.qsize())

try:
    q.put_nowait("消息4")
except:
    print("消息列隊已滿,現有消息數量:%s"%q.qsize())

#推薦的方式,先判斷消息列隊是否已滿,再寫入
if not q.full():
    q.put_nowait("消息4")

#讀取消息時,先判斷消息列隊是否為空,再讀取
if not q.empty():
    for i in range(q.qsize()):
        print(q.get_nowait())

運行結果:

說明

初始化Queue()對象時(例如:q=Queue()),若括號中沒有指定最大可接收的消息數量,或數量為負值,那么就代表可接受的消息數量沒有上限(直到內存的盡頭);

  • Queue.qsize():返回當前隊列包含的消息數量;

  • Queue.empty():如果隊列為空,返回True,反之False ;

  • Queue.full():如果隊列滿了,返回True,反之False;

  • Queue.get([block[, timeout]]):獲取隊列中的一條消息,然后將其從列隊中移除,block默認值為True;

1)如果block使用默認值,且沒有設置timeout(單位秒),消息列隊如果為空,此時程序將被阻塞(停在讀取狀態),直到從消息列隊讀到消息為止,如果設置了timeout,則會等待timeout秒,若還沒讀取到任何消息,則拋出"Queue.Empty"異常;

2)如果block值為False,消息列隊如果為空,則會立刻拋出"Queue.Empty"異常;

  • Queue.get_nowait():相當Queue.get(False);

  • Queue.put(item,[block[, timeout]]):將item消息寫入隊列,block默認值為True;

1)如果block使用默認值,且沒有設置timeout(單位秒),消息列隊如果已經沒有空間可寫入,此時程序將被阻塞(停在寫入狀態),直到從消息列隊騰出空間為止,如果設置了timeout,則會等待timeout秒,若還沒空間,則拋出"Queue.Full"異常;

2)如果block值為False,消息列隊如果沒有空間可寫入,則會立刻拋出"Queue.Full"異常;

  • Queue.put_nowait(item):相當Queue.put(item, False);

2. Queue實例

我們以Queue為例,在父進程中創建兩個子進程,一個往Queue里寫數據,一個從Queue里讀數據:

from multiprocessing import Process, Queue
import os, time, random

# 寫數據進程執行的代碼:
def write(q):
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 讀數據進程執行的代碼:
def read(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print('Get %s from queue.' % value)
            time.sleep(random.random())
        else:
            break

if __name__=='__main__':
    # 父進程創建Queue,並傳給各個子進程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啟動子進程pw,寫入:
    pw.start()    
    # 等待pw結束:
    pw.join()
    # 啟動子進程pr,讀取:
    pr.start()
    pr.join()
    # pr進程里是死循環,無法等待其結束,只能強行終止:
    print('')
    print('所有數據都寫入並且讀完')

運行結果:

3. 進程池中的Queue

如果要使用Pool創建進程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否則會得到一條如下的錯誤信息:

RuntimeError: Queue objects should only be shared between processes through inheritance.

下面的實例演示了進程池中的進程如何通信:

#修改import中的Queue為Manager
from multiprocessing import Manager,Pool
import os,time,random

def reader(q):
    print("reader啟動(%s),父進程為(%s)"%(os.getpid(),os.getppid()))
    for i in range(q.qsize()):
        print("reader從Queue獲取到消息:%s"%q.get(True))

def writer(q):
    print("writer啟動(%s),父進程為(%s)"%(os.getpid(),os.getppid()))
    for i in "dongGe":
        q.put(i)

if __name__=="__main__":
    print("(%s) start"%os.getpid())
    q=Manager().Queue() #使用Manager中的Queue來初始化
    po=Pool()
    #使用阻塞模式創建進程,這樣就不需要在reader中使用死循環了,可以讓writer完全執行完成后,再用reader去讀取
    po.apply(writer,(q,))
    po.apply(reader,(q,))
    po.close()
    po.join()
    print("(%s) End"%os.getpid())

運行結果:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM