multiprocess模塊


仔細說來,multiprocess不是一個模塊而是python中一個操作、管理進程的包。 之所以叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的所有子模塊。由於提供的子模塊非常多,為了方便大家歸類記憶,我將這部分大致分為四個部分:創建進程部分,進程同步部分,進程池部分,進程之間數據共享。



multiprocess.process模塊

process模塊介紹

process模塊是一個創建進程的模塊,借助這個模塊,就可以完成進程的創建。

復制代碼
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動)

強調:
1. 需要使用關鍵字的方式來指定參數
2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號

參數介紹:
1 group參數未使用,值始終為None
2 target表示調用對象,即子進程要執行的任務
3 args表示調用對象的位置參數元組,args=(1,2,'egon',)
4 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
5 name為子進程的名稱
復制代碼
1 p.start():啟動進程,並調用該子進程中的p.run() 
2 p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法  
3 p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖
4 p.is_alive():如果p仍然運行,返回True
5 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程  
復制代碼
1 p.start():啟動進程,並調用該子進程中的p.run() 
2 p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法  
3 p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖
4 p.is_alive():如果p仍然運行,返回True
5 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程  
復制代碼
1 p.daemon:默認值為False,如果設為True,代表p為后台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置
2 p.name:進程的名稱
3 p.pid:進程的pid
4 p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可)
5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)
復制代碼
1 p.daemon:默認值為False,如果設為True,代表p為后台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置
2 p.name:進程的名稱
3 p.pid:進程的pid
4 p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可)
5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)
復制代碼

在Windows操作系統中由於沒有fork(linux操作系統中創建進程的機制),在創建子進程的時候會自動 import 啟動它的這個文件,而在 import 的時候又執行了整個文件。因此如果將process()直接寫在文件中就會無限遞歸創建子進程報錯。所以必須把創建子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候 ,就不會遞歸運行了。
在Windows操作系統中由於沒有fork(linux操作系統中創建進程的機制),在創建子進程的時候會自動 import 啟動它的這個文件,而在 import 的時候又執行了整個文件。因此如果將process()直接寫在文件中就會無限遞歸創建子進程報錯。所以必須把創建子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候  ,就不會遞歸運行了。

 

使用process模塊創建進程

在一個python進程中開啟子進程,start方法和並發效果。

import time
from multiprocessing import Process

def f(name):
    print('hello', name)
    print('我是子進程')

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    time.sleep(1)
    print('執行主進程的內容了')
復制代碼
import time
from multiprocessing import Process

def f(name):
    print('hello', name)
    print('我是子進程')

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    time.sleep(1)
    print('執行主進程的內容了')
    
復制代碼
import time
from multiprocessing import Process

def f(name):
    print('hello', name)
    time.sleep(1)
    print('我是子進程')


if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    #p.join()
    print('我是父進程')
復制代碼
import time
from multiprocessing import Process

def f(name):
    print('hello', name)
    time.sleep(1)
    print('我是子進程')


if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    #p.join()
    print('我是父進程')
復制代碼
import os
from multiprocessing import Process

def f(x):
    print('子進程id :',os.getpid(),'父進程id :',os.getppid())
    return x*x

if __name__ == '__main__':
    print('主進程id :', os.getpid())
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=(i,))
        p.start()
復制代碼
import os
from multiprocessing import Process

def f(x):
    print('子進程id :',os.getpid(),'父進程id :',os.getppid())
    return x*x

if __name__ == '__main__':
    print('主進程id :', os.getpid())
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=(i,))
        p.start()
復制代碼

 

進階,多個進程同時運行(注意,子進程的執行順序不是根據啟動順序決定的)

  多個進程同時運行
import time
from multiprocessing import Process


def f(name):
    print('hello', name)
    time.sleep(1)


if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)
  多個進程同時運行,再談join方法(1)
import time
from multiprocessing import Process


def f(name):
    print('hello', name)
    time.sleep(1)


if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)
        p.join()
    # [p.join() for p in p_lst]
    print('父進程在執行')
  多個進程同時運行,再談join方法(2)
import time
from multiprocessing import Process

def f(name):
    print('hello', name)
    time.sleep(1)

if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)
    # [p.join() for p in p_lst]
    print('父進程在執行')

 

除了上面這些開啟進程的方法,還有一種以繼承Process類的形式開啟進程的方式

  通過繼承Process類開啟進程
import os
from multiprocessing import Process


class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print(os.getpid())
        print('%s 正在和女主播聊天' %self.name)

p1=MyProcess('wupeiqi')
p2=MyProcess('yuanhao')
p3=MyProcess('nezha')

p1.start() #start會自動調用run
p2.start()
# p2.run()
p3.start()


p1.join()
p2.join()
p3.join()

print('主線程')

 

進程之間的數據隔離問題

  進程之間的數據隔離問題
from multiprocessing import Process

def work():
    global n
    n=0
    print('子進程內: ',n)


if __name__ == '__main__':
    n = 100
    p=Process(target=work)
    p.start()
    print('主進程內: ',n)

守護進程

會隨着主進程的結束而結束。

主進程創建守護進程

  其一:守護進程會在主進程代碼執行結束后就終止

  其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

  守護進程的啟動
import os
import time
from multiprocessing import Process

class Myprocess(Process):
    def __init__(self,person):
        super().__init__()
        self.person = person
    def run(self):
        print(os.getpid(),self.name)
        print('%s正在和女主播聊天' %self.person)


p=Myprocess('哪吒')
p.daemon=True #一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,並且父進程代碼執行結束,p即終止運行
p.start()
time.sleep(10) # 在sleep時查看進程id對應的進程ps -ef|grep id
print('主')
  主進程代碼執行結束守護進程立即結束
from multiprocessing import Process

def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


p1=Process(target=foo)
p2=Process(target=bar)

p1.daemon=True
p1.start()
p2.start()
time.sleep(0.1)
print("main-------")#打印該行則主進程代碼結束,則守護進程p1應該被終止.#可能會有p1任務執行的打印信息123,因為主進程打印main----時,p1也執行了,但是隨即被終止.

socket聊天並發實例

from socket import *
from multiprocessing import Process

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn,client_addr):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__': #windows下start進程一定要寫到這下面
    while True:
        conn,client_addr=server.accept()
        p=Process(target=talk,args=(conn,client_addr))
        p.start()
復制代碼
from socket import *
from multiprocessing import Process

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn,client_addr):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__': #windows下start進程一定要寫到這下面
    while True:
        conn,client_addr=server.accept()
        p=Process(target=talk,args=(conn,client_addr))
        p.start()
復制代碼
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
復制代碼
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
復制代碼

多進程中的其他方法

from multiprocessing import Process
import time
import random

class Myprocess(Process):
    def __init__(self,person):
        self.name=person
        super().__init__()

    def run(self):
        print('%s正在和網紅臉聊天' %self.name)
        time.sleep(random.randrange(1,5))
        print('%s還在和網紅臉聊天' %self.name)


p1=Myprocess('哪吒')
p1.start()

p1.terminate()#關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活
print(p1.is_alive()) #結果為True

print('開始')
print(p1.is_alive()) #結果為False
from multiprocessing import Process
import time
import random

class Myprocess(Process):
    def __init__(self,person):
        self.name=person
        super().__init__()

    def run(self):
        print('%s正在和網紅臉聊天' %self.name)
        time.sleep(random.randrange(1,5))
        print('%s還在和網紅臉聊天' %self.name)


p1=Myprocess('哪吒')
p1.start()

p1.terminate()#關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活
print(p1.is_alive()) #結果為True

print('開始')
print(p1.is_alive()) #結果為False
復制代碼
 1 class Myprocess(Process):
 2     def __init__(self,person):
 3         self.name=person   # name屬性是Process中的屬性,標示進程的名字
 4         super().__init__() # 執行父類的初始化方法會覆蓋name屬性
 5         #self.name = person # 在這里設置就可以修改進程名字了
 6         #self.person = person #如果不想覆蓋進程名,就修改屬性名稱就可以了
 7     def run(self):
 8         print('%s正在和網紅臉聊天' %self.name)
 9         # print('%s正在和網紅臉聊天' %self.person)
10         time.sleep(random.randrange(1,5))
11         print('%s正在和網紅臉聊天' %self.name)
12         # print('%s正在和網紅臉聊天' %self.person)
13 
14 
15 p1=Myprocess('哪吒')
16 p1.start()
17 print(p1.pid)    #可以查看子進程的進程id
復制代碼
 1 class Myprocess(Process):
 2     def __init__(self,person):
 3         self.name=person   # name屬性是Process中的屬性,標示進程的名字
 4         super().__init__() # 執行父類的初始化方法會覆蓋name屬性
 5         #self.name = person # 在這里設置就可以修改進程名字了
 6         #self.person = person #如果不想覆蓋進程名,就修改屬性名稱就可以了
 7     def run(self):
 8         print('%s正在和網紅臉聊天' %self.name)
 9         # print('%s正在和網紅臉聊天' %self.person)
10         time.sleep(random.randrange(1,5))
11         print('%s正在和網紅臉聊天' %self.name)
12         # print('%s正在和網紅臉聊天' %self.person)
13 
14 
15 p1=Myprocess('哪吒')
16 p1.start()
17 print(p1.pid)    #可以查看子進程的進程id
復制代碼


進程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event)

鎖 —— multiprocess.Lock

 通過剛剛的學習,我們千方百計實現了程序的異步,讓多個任務可以同時在幾個進程中並發處理,他們之間的運行沒有順序,一旦開啟也不受我們控制。盡管並發編程讓我們能更加充分的利用IO資源,但是也給我們帶來了新的問題。

  當多個進程使用同一份數據資源的時候,就會引發數據安全或順序混亂問題。

  多進程搶占輸出資源
import os
import time
import random
from multiprocessing import Process

def work(n):
    print('%s: %s is running' %(n,os.getpid()))
    time.sleep(random.random())
    print('%s:%s is done' %(n,os.getpid()))

if __name__ == '__main__':
    for i in range(3):
        p=Process(target=work,args=(i,))
        p.start()
  使用鎖維護執行順序
# 由並發變成了串行,犧牲了運行效率,但避免了競爭
import os
import time
import random
from multiprocessing import Process,Lock

def work(lock,n):
    lock.acquire()
    print('%s: %s is running' % (n, os.getpid()))
    time.sleep(random.random())
    print('%s: %s is done' % (n, os.getpid()))
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    for i in range(3):
        p=Process(target=work,args=(lock,i))
        p.start()

  上面這種情況雖然使用加鎖的形式實現了順序的執行,但是程序又重新變成串行了,這樣確實會浪費了時間,卻保證了數據的安全。

  接下來,我們以模擬搶票為例,來看看數據安全的重要性。 

  多進程同時搶購余票
#文件db的內容為:{"count":1}
#注意一定要用雙引號,不然json無法識別
#並發運行,效率高,但競爭寫同一文件,數據寫入錯亂
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('\033[43m剩余票數%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(0.1) #模擬讀數據的網絡延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2) #模擬寫數據的網絡延遲
        json.dump(dic,open('db','w'))
        print('\033[43m購票成功\033[0m')

def task():
    search()
    get()

if __name__ == '__main__':
    for i in range(100): #模擬並發100個客戶端搶票
        p=Process(target=task)
        p.start()
  使用鎖來保證數據安全
#文件db的內容為:{"count":5}
#注意一定要用雙引號,不然json無法識別
#並發運行,效率高,但競爭寫同一文件,數據寫入錯亂
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('\033[43m剩余票數%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(random.random()) #模擬讀數據的網絡延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(random.random()) #模擬寫數據的網絡延遲
        json.dump(dic,open('db','w'))
        print('\033[32m購票成功\033[0m')
    else:
        print('\033[31m購票失敗\033[0m')

def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(100): #模擬並發100個客戶端搶票
        p=Process(target=task,args=(lock,))
        p.start()

 

復制代碼
#加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然可以用文件共享數據實現進程間通信,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.需要自己加鎖處理

#因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道。
隊列和管道都是將數據存放於內存中
隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,
我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。
復制代碼

信號量 —— multiprocess.Semaphore(了解)

  信號量介紹Semaphore
互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據 。
假設商場里有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念
  例子
from multiprocessing import Process,Semaphore
import time,random

def go_ktv(sem,user):
    sem.acquire()
    print('%s 占到一間ktv小屋' %user)
    time.sleep(random.randint(0,3)) #模擬每個人在ktv中待的時間不同
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(4)
    p_l=[]
    for i in range(13):
        p=Process(target=go_ktv,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')
復制代碼

事件 —— multiprocess.Event(了解)

  事件介紹
python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。

    事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。

clear:將“Flag”設置為False
set:將“Flag”設置為True

 

  紅綠燈實例
from multiprocessing import Process, Event
import time, random


def car(e, n):
    while True:
        if not e.is_set():  # 進程剛開啟,is_set()的值是Flase,模擬信號燈為紅色
            print('\033[31m紅燈亮\033[0m,car%s等着' % n)
            e.wait()    # 阻塞,等待is_set()的值變成True,模擬信號燈為綠色
            print('\033[32m車%s 看見綠燈亮了\033[0m' % n)
            time.sleep(random.randint(3, 6))
            if not e.is_set():   #如果is_set()的值是Flase,也就是紅燈,仍然回到while語句開始
                continue
            print('車開遠了,car', n)
            break


def police_car(e, n):
    while True:
        if not e.is_set():# 進程剛開啟,is_set()的值是Flase,模擬信號燈為紅色
            print('\033[31m紅燈亮\033[0m,car%s等着' % n)
            e.wait(0.1) # 阻塞,等待設置等待時間,等待0.1s之后沒有等到綠燈就闖紅燈走了
            if not e.is_set():
                print('\033[33m紅燈,警車先走\033[0m,car %s' % n)
            else:
                print('\033[33;46m綠燈,警車走\033[0m,car %s' % n)
        break



def traffic_lights(e, inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            print('######', e.is_set())
            e.clear()  # ---->將is_set()的值設置為False
        else:
            e.set()    # ---->將is_set()的值設置為True
            print('***********',e.is_set())


if __name__ == '__main__':
    e = Event()
    for i in range(10):
        p=Process(target=car,args=(e,i,))  # 創建是個進程控制10輛車
        p.start()

    for i in range(5):
        p = Process(target=police_car, args=(e, i,))  # 創建5個進程控制5輛警車
        p.start()
    t = Process(target=traffic_lights, args=(e, 10))  # 創建一個進程控制紅綠燈
    t.start()

    print('============》')

 

 



進程間通信——隊列和管道(multiprocess.Queue、multiprocess.Pipe)

進程間通信

IPC(Inter-Process Communication)

隊列 

概念介紹

創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。 

Queue([maxsize]) 
創建共享的進程隊列。
參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。
  方法介紹
Queue([maxsize]) 
創建共享的進程隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。 
Queue的實例q具有以下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用於控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時后將引發Queue.Full異常。

q.qsize() 
返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。


q.empty() 
如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full() 
如果q已滿,返回為True. 由於線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。
  其他方法(了解)復制代碼
q.close() 
關閉隊列,防止隊列中加入更多數據。調用此方法時,后台線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。

q.cancel_join_thread() 
不會再進程退出時自動連接后台線程。這可以防止join_thread()方法阻塞。

q.join_thread() 
連接隊列的后台線程。此方法用於在調用q.close()方法后,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。

代碼實例

  單看隊列用法復制代碼
'''
multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列
都是基於消息傳遞實現的,但是隊列接口
'''

from multiprocessing import Queue
q=Queue(3)

#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3)   # 如果隊列已經滿了,程序就會停在這里,等待數據被別人取走,再將數據放入隊列。
           # 如果隊列中的數據一直不被取走,程序就會永遠停在這里。
try:
    q.put_nowait(3) # 可以使用put_nowait,如果隊列滿了不會阻塞,但是會因為隊列滿了而報錯。
except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,但是會丟掉這個消息。
    print('隊列已經滿了')

# 因此,我們再放入數據之前,可以先看一下隊列的狀態,如果已經滿了,就不繼續put了。
print(q.full()) #滿了

print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 同put方法一樣,如果隊列已經空了,那么繼續取就會出現阻塞。
try:
    q.get_nowait(3) # 可以使用get_nowait,如果隊列滿了不會阻塞,但是會因為沒取到值而報錯。
except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。
    print('隊列已經空了')

print(q.empty()) #空了

上面這個例子還沒有加入進程通信,只是先來看看隊列為我們提供的方法,以及這些方法的使用和現象。

  子進程發送數據給父進程
import time
from multiprocessing import Process, Queue

def f(q):
    q.put([time.asctime(), 'from Eva', 'hello'])  #調用主函數中p進程傳遞過來的進程參數 put函數為向隊列中添加一條數據。

if __name__ == '__main__':
    q = Queue() #創建一個Queue對象
    p = Process(target=f, args=(q,)) #創建一個進程
    p.start()
    print(q.get())
    p.join()

上面是一個queue的簡單應用,使用隊列q對象調用get函數來取得隊列中最先進入的數據。 接下來看一個稍微復雜一些的例子:

  批量生產數據放入隊列再批量獲取結果 x復制代碼
import os
import time
import multiprocessing

# 向queue中輸入數據的函數
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.asctime())
    queue.put(info)

# 向queue中輸出數據的函數
def outputQ(queue):
    info = queue.get()
    print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))

# Main
if __name__ == '__main__':
    multiprocessing.freeze_support()
    record1 = []   # store input processes
    record2 = []   # store output processes
    queue = multiprocessing.Queue(3)

    # 輸入進程
    for i in range(10):
        process = multiprocessing.Process(target=inputQ,args=(queue,))
        process.start()
        record1.append(process)

    # 輸出進程
    for i in range(10):
        process = multiprocessing.Process(target=outputQ,args=(queue,))
        process.start()
        record2.append(process)

    for p in record1:
        p.join()

    for p in record2:
        p.join()

生產者消費者模型

在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

為什么要使用生產者和消費者模式

在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什么是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

基於隊列實現生產者消費者模型
  基於隊列實現生產者消費者模型
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print('主')

此時的問題是主進程永遠不會結束,原因是:生產者p在生產完后就結束了,但是消費者c在取空了q之后,則一直處於死循環中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢后,往隊列中再發一個結束信號,這樣消費者在接收到結束信號后就可以break出死循環。

  改良版——生產者消費者模型
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
    q.put(None) #發送結束信號
if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print('主')

注意:結束信號None,不一定要由生產者發,主進程里同樣可以發,但主進程需要等生產者結束后才應該發送該信號

  主進程在生產者生產完畢后發送結束信號None
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()

    p1.join()
    q.put(None) #發送結束信號
    print('主')

但上述解決方式,在有多個生產者和多個消費者時,我們則需要用一個很low的方式去解決

  多個消費者的例子:有幾個消費者就需要發送幾次結束信號
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(name,q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨頭',q))
    p3=Process(target=producer,args=('泔水',q))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()

    p1.join() #必須保證生產者全部生產完畢,才應該發送結束信號
    p2.join()
    p3.join()
    q.put(None) #有幾個消費者就應該發送幾次結束信號None
    q.put(None) #發送結束信號
    print('主')

 

JoinableQueue([maxsize]) 
創建可連接的共享進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。 

  方法介紹
JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:

q.task_done() 
使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。如果調用此方法的次數大於從隊列中刪除的項目數量,將引發ValueError異常。

q.join() 
生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。 
下面的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
  JoinableQueue隊列實現消費之生產者模型
from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
        q.task_done() #向q.join()發送一次信號,證明一個數據已經被取走了

def producer(name,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
    q.join() #生產完畢,使用此方法進行阻塞,直到隊列中所有項目均被處理。


if __name__ == '__main__':
    q=JoinableQueue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨頭',q))
    p3=Process(target=producer,args=('泔水',q))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))
    c1.daemon=True
    c2.daemon=True

    #開始
    p_l=[p1,p2,p3,c1,c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()
    print('主') 
    
    #主進程等--->p1,p2,p3等---->c1,c2
    #p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到隊列的數據
    #因而c1,c2也沒有存在的價值了,不需要繼續阻塞在進程中影響主進程了。應該隨着主進程的結束而結束,所以設置成守護進程就可以了。

管道(了解)

  介紹
#創建管道的類:
Pipe([duplex]):在進程之間創建一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象,強調一點:必須在產生Process對象之前產生管道
#參數介紹:
dumplex:默認管道是全雙工的,如果將duplex射成False,conn1只能用於接收,conn2只能用於發送。
#主要方法:
    conn1.recv():接收conn2.send(obj)發送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那么recv方法會拋出EOFError。
    conn1.send(obj):通過連接發送對象。obj是與序列化兼容的任意對象
 #其他方法:
conn1.close():關閉連接。如果conn1被垃圾回收,將自動調用此方法
conn1.fileno():返回連接使用的整數文件描述符
conn1.poll([timeout]):如果連接上的數據可用,返回True。timeout指定等待的最長時限。如果省略此參數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待數據到達。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。如果進入的消息,超過了這個最大值,將引發IOError異常,並且在連接上無法進行進一步讀取。如果連接的另外一端已經關閉,再也不存在任何數據,將引發EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):通過連接發送字節數據緩沖區,buffer是支持緩沖區接口的任意對象,offset是緩沖區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,然后調用c.recv_bytes()函數進行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩沖區接口(即bytearray對象或類似的對象)。offset指定緩沖區中放置消息處的字節位移。返回值是收到的字節數。如果消息長度大於可用的緩沖區空間,將引發BufferTooShort異常。
  pipe初使用
from multiprocessing import Process, Pipe


def f(conn):
    conn.send("Hello The_Third_Wave")
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()

應該特別注意管道端點的正確管理問題。如果是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了為何在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。如果忘記執行這些步驟,程序可能在消費者中的recv()操作上掛起。管道是由操作系統進行引用計數的,必須在所有進程中關閉管道后才能生成EOFError異常。因此,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。 

  引發EOFError
from multiprocessing import Process, Pipe

def f(parent_conn,child_conn):
    #parent_conn.close() #不寫close將不會引發EOFError
    while True:
        try:
            print(child_conn.recv())
        except EOFError:
            child_conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(parent_conn,child_conn,))
    p.start()
    child_conn.close()
    parent_conn.send('hello')
    parent_conn.close()
    p.join()
  pipe實現生產者消費者模型
from multiprocessing import Process,Pipe

def consumer(p,name):
    produce, consume=p
    produce.close()
    while True:
        try:
            baozi=consume.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            break

def producer(seq,p):
    produce, consume=p
    consume.close()
    for i in seq:
        produce.send(i)

if __name__ == '__main__':
    produce,consume=Pipe()

    c1=Process(target=consumer,args=((produce,consume),'c1'))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(produce,consume))

    produce.close()
    consume.close()

    c1.join()
    print('主進程')
  多個消費之之間的競爭問題帶來的數據不安全問題
from multiprocessing import Process,Pipe,Lock

def consumer(p,name,lock):
    produce, consume=p
    produce.close()
    while True:
        lock.acquire()
        baozi=consume.recv()
        lock.release()
        if baozi:
            print('%s 收到包子:%s' %(name,baozi))
        else:
            consume.close()
            break


def producer(p,n):
    produce, consume=p
    consume.close()
    for i in range(n):
        produce.send(i)
    produce.send(None)
    produce.send(None)
    produce.close()

if __name__ == '__main__':
    produce,consume=Pipe()
    lock = Lock()
    c1=Process(target=consumer,args=((produce,consume),'c1',lock))
    c2=Process(target=consumer,args=((produce,consume),'c2',lock))
    p1=Process(target=producer,args=((produce,consume),10))
    c1.start()
    c2.start()
    p1.start()

    produce.close()
    consume.close()

    c1.join()
    c2.join()
    p1.join()
    print('主進程')
復制代碼


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM