python多進程詳解


仔細說來,multiprocess不是一個模塊而是python中一個操作、管理進程的包。 之所以叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的所有子模塊。由於提供的子模塊非常多,為了方便大家歸類記憶,我將這部分大致分為四個部分:創建進程部分進程同步部分進程池部分進程之間數據共享。重點強調:進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內,但是通過一些特殊的方法,可以實現進程之間數據的共享。

一、process模塊介紹

process模塊是一個創建進程的模塊,借助這個模塊,就可以完成進程的創建。

使用:

Process([group [, target [, name [, args [, kwargs]]]]]),

由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動)

強調:
1. 需要使用關鍵字的方式來指定參數
2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號

1.1 進程創建的第一種方法

程序示例:

from multiprocessing import Process

def func():
    print(12345)

if __name__ == '__main__':
    p = Process(target=func,)
    p.start()
    print('*'*10)

 

功能講解

if __name__ == '__main__':
# windows 下才需要寫這個,這和系統創建進程的機制有關系,不用深究,記着windows下要寫就好啦
     #首先我運行當前這個test.py文件,運行這個文件的程序,那么就產生了進程,這個進程我們稱為主進程

    p = Process(target=func,) #將函數注冊到一個進程中,p是一個進程對象,此時還沒有啟動進程,只是創建了一個進程對象。並且func是不加括號的,因為加上括號這個函數就直接運行了對吧。

    p.start() #告訴操作系統,給我開啟一個進程,func這個函數就被我們新開的這個進程執行了,而這個進程是我主進程運行過程中創建出來的,所以稱這個新創建的進程為主進程的子進程,而主進程又可以稱為這個新進程的父進程。
          #而這個子進程中執行的程序,相當於將現在這個test.py文件中的程序copy到一個你看不到的python文件中去執行了,就相當於當前這個文件,被另外一個py文件import過去並執行了。
          #start並不是直接就去執行了,我們知道進程有三個狀態,進程會進入進程的三個狀態,就緒,(被調度,也就是時間片切換到它的時候)執行,阻塞,並且在這個三個狀態之間不斷的轉換,等待cpu執行時間片到了。

    print('*' * 10) #這是主進程的程序,上面開啟的子進程的程序是和主進程的程序同時運行的,我們稱為異步

 

上面我們說過,我們通過主進程創建的子進程是異步執行的,那么我們就驗證一下,並且看一下子進程和主進程(也就是父進程)的ID號(講一下pidppid,使用pycharm舉例),來看看是否是父子關系。

from multiprocessing import Process
import time
import os

def func():
    print('aaaaa')
    time.sleep(1)
    print('該子進程ID:', os.getpid())  #獲取自己的進程ID號
    print('該子進程的父進程ID:', os.getppid())  #獲取自己進程的父進程ID
    print(12345)

if __name__ == '__main__':
    p = Process(target=func,)
    p.start()
    print('*'*10)
    print('父進程ID>>>', os.getpid())
    print('父進程的父進程ID>>>', os.getpid())
打印結果:

**********
父進程ID>>> 57235
父進程的父進程ID>>> 57235
aaaaa
該子進程ID: 57236
該子進程的父進程ID: 57235
12345

# 首先打印出來了主進程的程序,然后打印的是子進程的,也就是子進程是異步執行的,相當於主進程和子進程同時運行着,如果是同步的話,我們先執行的是func(),然后再打印主進程最后的10個*號。

 

一個進程的生命周期:

如果子進程的運行時間長,那么等到子進程執行結束程序才結束,如果主進程的執行時間長,那么主進程執行結束程序才結束,實際上我們在子進程中打印的內容是在主進程的執行結果中看不出來的,但是pycharm幫我們做了優化,因為它會識別到你這是開的子進程,幫你把子進程中打印的內容打印到了顯示台上。

​ 如果說一個主進程運行完了之后,我們把pycharm關了,但是子進程還沒有執行結束,那么子進程還存在嗎?這要看你的進程是如何配置的,如果說我們沒有配置說我主進程結束,子進程要跟着結束,那么主進程結束的時候,子進程是不會跟着結束的,他會自己執行完,如果我設定的是主進程結束,子進程必須跟着結束,那么就不會出現單獨的子進程(孤兒進程)了,具體如何設置,看下面的守護進程的講解。比如說,我們將來啟動項目的時候,可能通過cmd來啟動,那么我cmd關閉了你的項目就會關閉嗎,不會的,因為你的項目不能停止對外的服務,對吧。

1.2 Process類中參數的介紹:

參數介紹:
1 group參數未使用,值始終為None
2 target表示調用對象,即子進程要執行的任務
3 args表示調用對象的位置參數元組,args=(1,2,'egon',)
4 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
5 name為子進程的名稱
給要執行的函數傳參數:

def func(x, y):
    print(x)
    time.sleep(1)
    print(y)

if __name__ == '__main__':
    p = Process(target=func, args=('hello', 'world'))  # 這是func需要接收的參數的傳送方式。
    p.start()
    print('父進程執行結束!')

# 執行結果:
父進程執行結束!
hello
world

 

1.3 Process類中各方法的介紹

  1. p.start():啟動進程,並調用該子進程中的p.run()

  2. p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法

  3. p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖

  4. p.is_alive():如果p仍然運行,返回True

  5. p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能joinstart開啟的進程,而不能join住run開啟的進程

join方法的例子

讓主進程加上join的地方等待(也就是阻塞住),等待子進程執行完之后,再繼續往下執行我的主進程,好多時候,我們主進程需要子進程的執行結果,所以必須要等待。join感覺就像是將子進程和主進程拼接起來一樣,將異步改為同步執行。

from multiprocessing import Process
import time

def func(x, y):
    print(x)
    time.sleep(1)
    print(y)

if __name__ == '__main__':
    p = Process(target=func, args=('hello', 'world'))  # 這是func需要接收的參數的傳送方式。
    p.start()
    print("我這里是異步的!")
    p.join()  # 只有在join的地方才會阻塞住,將子進程和主進程之間的異步改為同步
    print('父進程執行結束!')

# 執行結果
我這里是異步的!
hello
world
父進程執行結束!

 

開啟多個進程

for循環。並且我有個需求就是說,所有的子進程異步執行,然后所有的子進程全部執行完之后,我再執行主進程,怎么搞?看代碼


1.4 Process類中自帶封裝的各屬性的介紹

  1. p.daemon:默認值為False,如果設為True,代表p為后台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置

  2. p.name:進程的名稱

  3. p.pid:進程的pid

  4. p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可)

  5. p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)

二、process類的使用

注意:

Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module.
If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources).
This is the reason for hiding calls to Process() inside
if __name__ == "__main__"
since statements inside this if-statement will not get called upon import.

由於Windows沒有fork,多處理模塊啟動一個新的Python進程並導入調用模塊。
如果在導入時調用Process(),那么這將啟動無限繼承的新進程(或直到機器耗盡資源)。
這是隱藏對Process()內部調用的原,使用if __name__ == “__main __”,這個if語句中的語句將不會在導入時被調用。

 

在windows中Process()必須放到 if __name__ == '__main__':

2.1 進程的創建第二種方法(繼承)

from multiprocessing import Process
import os

class MyProcess(Process):  # 自己寫一個類,繼承Process類
    # 我們通過init方法可以傳參數,如果只寫一個run方法,那么沒法傳參數,因為創建對象的是傳參就是在init方法里面
    def __init__(self, person):
        super().__init__()
        self.person = person

    def run(self):
        print(os.getpid())
        print(self.pid)
        print(self.pid)
        print('%s 正在和女主播聊天' % self.person)
    # def start(self):
    #     #如果你非要寫一個start方法,可以這樣寫,並且在run方法前后,可以寫一些其他的邏輯
    #     self.run()

if __name__ == '__main__':
    p1 = MyProcess('Jedan')
    p2 = MyProcess('太白')
    p3 = MyProcess('alexDSB')

    p1.start()  # start內部會自動調用run方法
    p2.start()
    # p2.run()
    p3.start()

    p1.join()
    p2.join()
    p3.join()
運行結果:

57683
57683
57683
Jedan 正在和女主播聊天
57684
57684
57684
太白 正在和女主播聊天
57685
57685
57685
alexDSB 正在和女主播聊天

 

2.2 進程之間的數據隔離

進程之間的數據是隔離的,也就是數據不共享,下面的是驗證:

from multiprocessing import Process

n = 100
# 首先定義一個全局變量,在windows系統中應該把全局變量定義在if __name__ == '__main__'之上

def work():
    global n
    n = 0
    print('子進程內: ', n)

if __name__ == '__main__':
    p = Process(target=work,)
    p.start()
    p.join()  # 等待子進程執行完畢,如果數據共享的話,我子進程是不是通過global將n改為0了,但是你看打印結果,主進程在子進程執行結束之后,仍然是n=100,子進程n=0,說明子進程對n的修改沒有在主進程中生效,說明什么?說明他們之間的數據是隔離的,互相不影響的
    print('主進程內: ', n)

# 打印結果
子進程內:  0
主進程內:  100

 

2.3 練習:多進程實現socket多客戶端通訊

我們之前學socket的時候,知道tcp協議的socket是不能同時和多個客戶端進行連接的,(這里先不考慮socketserver那個模塊),那我們自己通過多進程來實現一下同時和多個客戶端進行連接通信:

服務端代碼示例:

from socket import *
from multiprocessing import Process

def talk(conn, client_addr):
    while True:
        try:
            msg = conn.recv(1024)
            print('客戶端消息>>', msg)
            if not msg: break
            conn.send(msg.upper())
            # 在這里有同學可能會想,我能不能在這里寫input來自己輸入內容和客戶端進行對話?朋友,是這樣的,按說是可以的,但是需要什么呢?需要你像我們用pycharm的是一樣下面有一個輸入內容的控制台,當我們的子進程去執行的時候,我們是沒有地方可以顯示能夠讓你輸入內容的控制台的,所以你沒辦法輸入,就會給你報錯。
        except Exception:
            break

if __name__ == '__main__':  # windows下start進程一定要寫到這下面
    server = socket(AF_INET, SOCK_STREAM)
    # server.setsockopt(SOL_SOCKET, SO_REUSEADDR,1)  # 如果你將如果你將bind這些代碼寫到if __name__ == '__main__'這行代碼的上面,那么地址重用必須要有,因為我們知道windows創建的子進程是對整個當前文件的內容進行的copy,前面說了就像import,如果你開啟了子進程,那么子進程是會執行bind的,那么你的主進程bind了這個ip和端口,子進程在進行bind的時候就會報錯。
    server.bind(('127.0.0.1', 8080))
    # 有同學可能還會想,我為什么多個進程就可以連接一個server段的一個ip和端口了呢,我記得當時說tcp的socket的時候,我是不能在你這個ip和端口被連接的情況下再連接你的啊,這里是因為當時我們就是一個進程,一個進程里面是只能一個連接的,多進程是可以多連接的,這和進程之間是單獨的內存空間有關系,先這樣記住他,好嗎?
    server.listen(5)
    while True:
        conn, client_addr = server.accept()
        p = Process(target=talk, args=(conn, client_addr))
        p.start()

 

(注意:通過這個是不能做qq聊天的,因為qq聊天是qq的客戶端把信息發給另外一個qq的客戶端,中間有一個服務端幫你轉發消息,而不是我們這樣的單純的客戶端和服務端對話,並且子進程開啟之后咱們是沒法操作的,並且沒有為子進程input輸入提供控制台,所有你再在子進程中寫上了input會報錯,EOFError錯誤,這個錯誤的意思就是你的input需要輸入,但是你輸入不了,就會報這個錯誤。而子進程的輸出打印之類的,是pycharm做了優化,將所有子進程中的輸出結果幫你打印出來了,但實質還是不同進程的。)

客戶端代碼示例:

from socket import *

client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 8080))

while True:
    msg = input('>>: ').strip()
    if not msg: continue

    client.send(msg.encode('utf-8'))
    msg = client.recv(1024)
    print(msg.decode('utf-8'))

 

2.4 Process對象的其他方法或屬性

進程對象的其他方法一: terminate, is_alive

from multiprocessing import Process
import time
import random

class Student(Process):
    def __init__(self, name):
        self.name = name
        super().__init__()

    def run(self):
        print('%s is 做作業' % self.name)
        # s = input('???') #別忘了再pycharm下子進程中不能input輸入,會報錯EOFError: EOF when reading a line,因為子進程中沒有像我們主進程這樣的在pycharm下的控制台可以輸入東西的地方
        time.sleep(2)
        print('%s is 做作業結束' % self.name)

if __name__ == '__main__':
    p1 = Student('太白')
    p1.start()
    time.sleep(2)
    p1.terminate()  # 關閉進程,不會立即關閉,有個等着操作系統去關閉這個進程的時間,所以is_alive立刻查看的結果可能還是存活,但是稍微等一會,就被關掉了
    print(p1.is_alive())  # 結果為True
    print('等會。。。。')
    time.sleep(1)
    print(p1.is_alive())  # 結果為False

# 打印結果:
Student-1 is 做作業
Student-1 is 做作業結束
True
等會。。。。
False

 

進程對象的其他方法二:namepid

from multiprocessing import Process
import time
import random

class Study(Process):
    def __init__(self, name):
        # self.name=name
        # super().__init__() #Process的__init__方法會執行self.name=Study-1,所以加到這里,會覆蓋我們的self.name=name

        # 為我們開啟的進程設置名字的做法
        super().__init__()
        self.name = name

    def run(self):
        print('%s is studying' % self.name)
        time.sleep(random.randrange(1, 3))
        print('%s is study end' % self.name)

p = Study('young')
p.start()
print('開始')
print(p.pid)  # 查看pid

# 打印結果
開始
58745
young is studying
young is study end

 

注意:Process__init__方法會執行self.name=Study-1,所以我們的self.name=name應該加在super().__init__()之后,這樣進程的名字就是我們設置的了

2.5 僵屍進程和孤兒進程

詳見:

三、守護進程

​ 之前我們講的子進程是不會隨着主進程的結束而結束,子進程全部執行完之后,程序才結束,那么如果有一天我們的需求是我的主進程結束了,由我主進程創建的那些子進程必須跟着結束,怎么辦?守護進程就來了!

主進程創建守護進程

  • 其一:守護進程會在主進程代碼執行結束后就終止

  • 其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

import os
import time
from multiprocessing import Process

class Myprocess(Process):
    def __init__(self, person):
        super().__init__()
        self.person = person

    def run(self):
        print(os.getpid(), self.person)
        print('%s正在和女主播聊天' % self.person)
        time.sleep(3)

if __name__ == '__main__':
    p = Myprocess('太白')
    p.daemon = True  # 一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,並且父進程代碼執行結束,p即終止運行
    p.start()
    # time.sleep(1)  # 在sleep時linux下查看進程id對應的進程ps -ef|grep id
    print('')

 

四、進程同步(鎖)

​ 通過剛剛的學習,我們千方百計實現了程序的異步,讓多個任務可以同時在幾個進程中並發處理,他們之間的運行沒有順序,一旦開啟也不受我們控制。盡管並發編程讓我們能更加充分的利用IO資源,但是也給我們帶來了新的問題:進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的,而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理。

多進程搶占輸出資源,導致打印混亂的示例:

import os
import time
import random
from multiprocessing import Process

def work(n):
    print('%s: %s is running' % (n, os.getpid()))
    time.sleep(random.random())
    print('%s:%s is done' % (n, os.getpid()))

if __name__ == '__main__':
    for i in range(5):
        p = Process(target=work, args=(i,))
        p.start()

# 打印結果:
0: 59118 is running
1: 59119 is running
2: 59120 is running
3: 59121 is running
4: 59122 is running
0:59118 is done
2:59120 is done
1:59119 is done
3:59121 is done
4:59122 is done

 

看結果,可以看出兩個問題:

問題一:每個進程中work函數的第一個打印就不是按照我們for循環的0-4的順序來打印的

問題二:我們發現,每個work進程中有兩個打印,但是我們看到所有進程中第一個打印的順序為0-1-2-3-4,但是第二個打印沒有按照這個順序,變成了0-2-1-3-4,說明我們一個進程中的程序的執行順序都混亂了。

問題的解決方法: 第二個問題可以加鎖來解決,第一個問題是沒有辦法解決的,因為進程開到了內核,有操作系統來決定進程的調度,我們自己控制不了

加鎖:由並發改成了串行,犧牲了運行效率,但避免了競爭

from multiprocessing import Process, Lock
import os, time

def work(n, lock):
    # 加鎖,保證每次只有一個進程在執行鎖里面的程序,這一段程序對於所有寫上這個鎖的進程,大家都變成了串行
    lock.acquire()
    print('%s: %s is running' % (n, os.getpid()))
    time.sleep(1)
    print('%s:%s is done' % (n, os.getpid()))
    # 解鎖,解鎖之后其他進程才能去執行自己的程序
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(5):
        p = Process(target=work, args=(i, lock))
        p.start()
#打印結果:
0: 59138 is running
0:59138 is done
1: 59139 is running
1:59139 is done
2: 59140 is running
2:59140 is done
3: 59141 is running
3:59141 is done
4: 59142 is running
4:59142 is done

 

結果分析:通過結果我們可以看出,多進程剛開始去執行的時候,每次運行,首先打印出來哪個進程的程序是不固定的,但是我們解決了上面打印混亂示例代碼的第二個問題,那就是同一個進程中的兩次打印都是先完成的,然后才切換到下一個進程去,打印下一個進程中的兩個打印結果,說明我們控制住了同一進程中的代碼執行順序,如果涉及到多個進程去操作同一個數據或者文件的時候,就不擔心數據算錯或者文件中的內容寫入混亂了。

4.1 模擬搶票

​ 上面這種情況雖然使用加鎖的形式實現了順序的執行,但是程序又重新變成串行了,這樣確實會浪費了時間,卻保證了數據的安全。接下來,我們以模擬搶票為例,來看看數據安全的重要性:

  • 並發運行,效率高,但是競爭同一個文件,導致數據混亂
from multiprocessing import Process, Lock
import json, random, time

# 查看剩余票數
def search():
    dic = json.load(open('db'))  # 打開文件,直接load文件中的內容,拿到文件中的包含剩余票數的字典
    print('剩余票數%s' % dic['count'])

# 搶票
def get():
    dic = json.load(open('db'))
    time.sleep(0.1)  # 模擬讀數據的網絡延遲,那么進程之間的切換,導致所有人拿到的字典都是{"count": 1},也就是每個人都拿到了這一票。
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(0.2)  # 模擬寫數據的網絡延遲
        json.dump(dic, open('db', 'w'))
        # 最終結果導致,每個人顯示都搶到了票,這就出現了問題~
        print('購票成功')

def task():
    search()
    get()

if __name__ == '__main__':
    for i in range(3):  # 模擬並發100個客戶端搶票
        p = Process(target=task)
        p.start()

 

注意:首先在當前文件目錄下創建一個名為db的文件,文件db的內容為:{"count":1},只有這一行數據,並且注意,每次運行完了之后,文件中的1變成了0,你需要手動將0改為1,然后在去運行代碼。注意一定要用雙引號,不然json無法識別

運行結果:

剩余票數1
剩余票數1
剩余票數1
購票成功
購票成功
購票成功

 

結果分析:由於網絡延遲等原因使得進程切換,導致每個人都搶到了這最后一張票

  • 加鎖:購票行為由並發變成了串行,犧牲了效率,但是保證了數據安全
from multiprocessing import Process, Lock
import time, json

# 查看剩余票數
def search():
    dic = json.load(open('db'))  # 打開文件,直接load文件中的內容,拿到文件中的包含剩余票數的字典
    print('剩余票數%s' % dic['count'])

# 搶票
def get():
    dic = json.load(open('db'))
    time.sleep(0.1)  # 模擬讀數據的網絡延遲,那么進程之間的切換,導致所有人拿到的字典都是{"count": 1},也就是每個人都拿到了這一票。
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(0.2)  # 模擬寫數據的網絡延遲
        json.dump(dic, open('db', 'w'))
        # 最終結果導致,每個人顯示都搶到了票,這就出現了問題~
        print('購票成功')
    else:
        print('sorry,沒票了親!')

def task(lock):
    search()
    # 因為搶票的時候是發生數據變化的時候,所有我們將鎖加加到這里
    lock.acquire()
    get()
    lock.release()

if __name__ == '__main__':
    lock = Lock()  # 創建一個鎖
    for i in range(3):  # 模擬並發100個客戶端搶票
        p = Process(target=task, args=(lock,))  # 將鎖作為參數傳給task函數
        p.start()

# 打印結果:
剩余票數1
剩余票數1
剩余票數1
購票成功
sorry,沒票了親!
sorry,沒票了親!

 

4.2 進程鎖總結

加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,犧牲了速度卻保證了數據安全。進程鎖的問題:

  1. 效率低(共享數據基於文件,而文件是硬盤上的數據)
  2. 需要自己加鎖處理

因此我們最好找尋一種解決方案能夠兼顧:

1、效率高(多個進程共享一塊內存的數據)

2、幫我們處理好鎖問題。

這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道:

隊列和管道都是將數據存放於內存中

隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,
我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。

IPC通信機制(了解):IPC是intent-Process Communication的縮寫,含義為進程間通信或者跨進程通信,是指兩個進程之間進行數據交換的過程。IPC不是某個系統所獨有的,任何一個操作系統都需要有相應的IPC機制,
比如Windows上可以通過剪貼板、管道和郵槽等來進行進程間通信,而Linux上可以通過命名共享內容、信號量等來進行進程間通信。Android它也有自己的進程間通信方式,Android建構在Linux基礎上,繼承了一部分Linux的通信方式。

五、隊列

​ 進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的。

​ 進程間通信(IPC)方式一:隊列。隊列就像一個特殊的列表,但是可以設置固定長度,並且從前面插入數據,從后面取出數據,先進先出。

Queue([maxsize]) - 創建共享的進程隊列。
參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。
注意:底層隊列使用管道和鎖實現。

5.1 Queue的方法介紹

q = Queue([maxsize])
創建共享的進程隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。
Queue的實例q具有以下方法:

q.get( [ block [ ,timeout ] ] )
返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用於控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。

q.get_nowait( )
同q.get(False)方法。

q.put(item [, block [,timeout ] ] )
將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時后將引發Queue.Full異常。

q.qsize()
返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。

q.empty()
如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full()
如果q已滿,返回為True. 由於線程的存在,結果也可能是不可靠的(參考q.empty()方法)

q.close()
關閉隊列,防止隊列中加入更多數據。調用此方法時,后台線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。

q.cancel_join_thread()
不會再進程退出時自動連接后台線程。這可以防止join_thread()方法阻塞。

q.join_thread()
連接隊列的后台線程。此方法用於在調用q.close()方法后,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。

 

5.2 隊列的簡單用法

from multiprocessing import Queue

q = Queue(3)  # 創建一個隊列對象,隊列長度為3

# put ,get ,put_nowait,get_nowait,full,empty
q.put(3)  # 往隊列中添加數據
q.put(2)
q.put(1)
# q.put(4)   # 如果隊列已經滿了,程序就會停在這里,等待數據被別人取走,再將數據放入隊列。
# 如果隊列中的數據一直不被取走,程序就會永遠停在這里。
try:
    q.put_nowait(4)  # 可以使用put_nowait,如果隊列滿了不會阻塞,但是會因為隊列滿了而報錯。
except:  # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,但是會丟掉這個消息。
    print('隊列已經滿了')

# 因此,我們再放入數據之前,可以先看一下隊列的狀態,如果已經滿了,就不繼續put了。
print(q.full())  # 查看是否滿了,滿了返回True,不滿返回False

print(q.get())  # 取出數據
print(q.get())
print(q.get())
# print(q.get()) # 同put方法一樣,如果隊列已經空了,那么繼續取就會出現阻塞。
try:
    q.get_nowait(3)  # 可以使用get_nowait,如果隊列滿了不會阻塞,但是會因為沒取到值而報錯。
except:  # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。
    print('隊列已經空了')

print(q.empty())  # 空了

# 打印結果:
隊列已經滿了
True
3
2
1
隊列已經空了
True

 

子進程與父進程通過隊列進行通信

from multiprocessing import Process, Queue
import time

# 8. q = Queue(2) #創建一個Queue對象,如果寫在這里,那么在windows還子進程去執行的時候,我們知道子進程中還會執行這個代碼,但是子進程中不能夠再次創建了,也就是這個q就是你主進程中創建的那個q,通過我們下面在主進程中先添加了一個字符串之后,在去開啟子進程,你會發現,小鬼這個字符串還在隊列中,也就是說,我們使用的還是主進程中創建的這個隊列。
def f(q):
    # q = Queue() #9. 我們在主進程中開啟了一個q,如果我們在子進程中的函數里面再開一個q,那么你下面q.put('姑娘,多少錢~')添加到了新創建的這q里里面了
    q.put('姑娘,多少錢~')  # 4.調用主函數中p進程傳遞過來的進程參數 put函數為向隊列中添加一條數據。
    # print(q.qsize())  #6.查看隊列中有多少條數據了

def f2(q):
    print('>>>>>: ')
    print(q.get())  # 5.取數據

if __name__ == '__main__':
    q = Queue()  # 1.創建一個Queue對象
    q.put('小鬼')

    p = Process(target=f, args=(q,))  # 2.創建一個進程
    p2 = Process(target=f2, args=(q,))  # 3.創建一個進程
    p.start()
    p2.start()
    time.sleep(1)  # 7.如果阻塞一點時間,就會出現主進程運行太快,導致我們在子進程中查看qsize為1個。
    print(q.get())  #結果:小鬼
    # print(q.get())  # 結果:姑娘,多少錢~
    p.join()

 

批量的生產輸入放入隊列,再批量的獲取結果

接下來看一個稍微復雜的例子:

import os
import time
import multiprocessing

# 向queue中輸入數據的函數
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.asctime())
    queue.put(info)

# 向queue中輸出數據的函數
def outputQ(queue):
    info = queue.get()
    print('%s%s\033[32m%s\033[0m' % (str(os.getpid()), '(get):', info))

# Main
if __name__ == '__main__':
    # windows下,如果開啟的進程比較多的話,程序會崩潰,為了防止這個問題,使用freeze_support()方法來解決。知道就行啦
    multiprocessing.freeze_support()
    record1 = []  # store input processes
    record2 = []  # store output processes
    queue = multiprocessing.Queue(3)

    # 輸入進程
    for i in range(10):
        process = multiprocessing.Process(target=inputQ, args=(queue,))
        process.start()
        record1.append(process)

    # 輸出進程
    for i in range(10):
        process = multiprocessing.Process(target=outputQ, args=(queue,))
        process.start()
        record2.append(process)

    for p in record1:
        p.join()

    for p in record2:
        p.join()
輸出結果:

63226(get):63216(put):Wed Apr 10 11:08:57 2019
63227(get):63217(put):Wed Apr 10 11:08:57 2019
63228(get):63218(put):Wed Apr 10 11:08:57 2019
63229(get):63219(put):Wed Apr 10 11:08:57 2019
63230(get):63220(put):Wed Apr 10 11:08:57 2019
63231(get):63221(put):Wed Apr 10 11:08:57 2019
63232(get):63222(put):Wed Apr 10 11:08:57 2019
63233(get):63223(put):Wed Apr 10 11:08:57 2019
63234(get):63224(put):Wed Apr 10 11:08:57 2019
63235(get):63225(put):Wed Apr 10 11:08:57 2019

 

隊列是進程安全的:同一時間只能一個進程拿到隊列中的一個數據,你拿到了一個數據,這個數據別人就拿不到了。

5.3 生產者消費者模型

​ 在並發編程中使用生產者和消費者模式能夠解決絕大數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

為什么要使用生產者和消費者模式:

​ 在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題引入了生產者和消費者模式。

什么是生產者和消費者模式?

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力,並且我可以根據生產速度和消費速度來均衡一下多少個生產者可以為多少個消費者提供足夠的服務,就可以開多進程等,而這些進程都是到阻塞隊列或者說是緩沖區中去獲取或者添加數據。

基於隊列實現一個生產者消費者模型

from multiprocessing import Process, Queue
import time, random, os

def consumer(q):
    while True:
        res = q.get()
        time.sleep(random.randint(1, 3))
        print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = '包子%s' % i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))

if __name__ == '__main__':
    q = Queue()
    # 生產者們:即廚師們
    p1 = Process(target=producer, args=(q,))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q,))

    # 開始
    p1.start()
    c1.start()
    print('')

 

通過上面基於隊列的生產者消費者代碼示例,我們發現一個問題:主進程永遠不會結束,原因是:生產者p在生產完后就結束了,但是消費者c在取空了q之后,則一直處於死循環中且卡在了q.get()這一步。解決方式則是讓生產者在生產完畢后,往隊列中再發一個結束信號,這樣消費者在接收到結束信號后就可以break出死循環。

1.子進程生產者在生產完畢后發送結束信號None:

# 子進程生產者在生產完畢后發送結束信號
from multiprocessing import Process, Queue
import time, random, os

def consumer(q):
    while True:
        res = q.get()
        if res is None:  #收到結束信號則結束
            break
        time.sleep(random.randint(1, 3))
        print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = '包子%s' % i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))
    q.put(None)  # 在自己的子進程的最后加入一個結束信號

if __name__ == '__main__':
    q = Queue()
    # 生產者們:即廚師們
    p1 = Process(target=producer, args=(q,))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q,))

    # 開始
    p1.start()
    c1.start()
    print('')

 

​ 注意:結束信號None,不一定由生產者發,主進程同樣可以發,但主進程需要等生產者結束后才應該發送該信號

2.主進程在生產者完畢后發送結束信號None:

# 主進程在生產者完畢后發送結束信號None
from multiprocessing import Process, Queue
import time, random, os

def consumer(q):
    while True:
        res = q.get()
        if res is None:
            break
        time.sleep(random.randint(1, 3))
        print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = '包子%s' % i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))

if __name__ == '__main__':
    q = Queue()
    # 生產者們:即廚師們
    p1 = Process(target=producer, args=(q,))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q,))

    # 開始
    p1.start()
    c1.start()

    p1.join()  #等待生產者進程結束
    q.put(None)  #發送結束信號
    print('')

 

但上述解決方式,在有多個生產者和消費者時,由於隊列我們說了是進程安全的,我一個進程拿走了結束信號,另一個進程就拿不到了,還需要多發送一個結束信號,有幾個取數據 的進程就要發送幾個結束信號,我們則需要用一個比較low的方式去解決:

3.有多個消費者和生產者的時候需要發送多次結束信號

# 有多個消費者和生產者的時候需要發送多次結束信號
from multiprocessing import Process, Queue
import time, random, os

def consumer(q):
    while True:
        res = q.get()
        if res is None:
            break  # 收到結束信號則結束
        time.sleep(random.randint(1, 3))
        print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))

def producer(name, q):
    for i in range(2):
        time.sleep(random.randint(1, 3))
        res = '%s%s' % (name, i)
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))

if __name__ == '__main__':
    q = Queue()
    # 生產者們:即廚師們
    p1 = Process(target=producer, args=('包子', q))
    p2 = Process(target=producer, args=('骨頭', q))
    p3 = Process(target=producer, args=('泔水', q))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))

    # 開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()  # 必須保證生產者全部生產完畢,才應該發送結束信號
    p2.join()
    p3.join()
    q.put(None)  # 有幾個消費者就應該發送幾次結束信號None
    q.put(None)  # 發送結束信號
    print('')

 

其實我們的思路無非就是發送結束信號而已,有另外一種隊列提供了這種機制。

5.4 JoinableQueue

JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

參數介紹:
maxsize是隊列中允許最大項數,省略則無大小限制

方法介紹:
JoinableQueue實例p除了與Queue對象相同的方法之外還具有:

  1. q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常
  2. q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止,也就是隊列中的數據全部被get拿走了。

JoinableQueue隊列實現生產者消費者模型

# JoinableQueue隊列實現生產者消費者模型
from multiprocessing import Process, JoinableQueue
import time, random, os

def consumer(q):
    while True:
        res = q.get()
        # time.sleep(random.randint(1,3))
        time.sleep(random.random())
        print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))

        q.task_done()  # 向q.join()發送一次信號,證明一個數據已經被取走並執行完了

def producer(name, q):
    for i in range(10):
        # time.sleep(random.randint(1,3))
        time.sleep(random.random())
        res = '%s%s' % (name, i)
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))
    print('%s生產結束' % name)

    q.join()  # 生產完畢,使用此方法進行阻塞,直到隊列中所有項目均被處理。

    print('%s生產結束~~~~~~' % name)

if __name__ == '__main__':
    q = JoinableQueue()
    # 生產者們:即廚師們
    p1 = Process(target=producer, args=('包子', q))
    p2 = Process(target=producer, args=('骨頭', q))
    p3 = Process(target=producer, args=('泔水', q))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.daemon = True  # 如果不加守護,那么主進程結束不了,但是加了守護之后,必須確保生產者的內容生產完並且被處理完了,所有必須還要在主進程給生產者設置join,才能確保生產者生產的任務被執行完了,並且能夠確保守護進程在所有任務執行完成之后才隨着主進程的結束而結束。
    c2.daemon = True

    # 開始
    p_l = [p1, p2, p3, c1, c2]
    for p in p_l:
        p.start()

    p1.join()  # 我要確保你的生產者進程結束了,生產者進程的結束標志着你生產的所有的人任務都已經被處理完了
    p2.join()
    p3.join()
    print('')

    # 主進程等--->p1,p2,p3等---->c1,c2
    # p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到隊列的數據
    # 因而c1,c2也沒有存在的價值了,不需要繼續阻塞在進程中影響主進程了。應該隨着主進程的結束而結束,所以設置成守護進程就可以了。

 

5.5 生產者消費者模型總結

#程序中有兩類角色
    一類負責生產數據(生產者)
    一類負責處理數據(消費者)

#引入生產者消費者模型為了解決的問題是:
    平衡生產者與消費者之間的工作能力,從而提高程序整體處理數據的速度

#如何實現:
    生產者<-->隊列<——>消費者

    生產者消費者模型實現 類程序的解耦和

6.管道(了解)

進程間通信(IPC)方式二:管道(不推薦使用,了解即可),會導致數據不安全的情況出現,后面我們會說到為什么會帶來數據 不安全的問題。

6.1 管道介紹

創建管道的類:
Pipe([duplex]):在進程之間創建一條管道,並返回元組(conn1,conn2),其中conn1conn2表示管道兩端的連接對象,強調一點:必須在產生Process對象之前產生管道

參數介紹:
dumplex:默認管道是全雙工的,如果將duplex設成Falseconn1只能用於接收,conn2只能用於發送。

主要方法:

conn1.recv():接收conn2.send(obj)發送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那么recv方法會拋出EOFError。

conn1.send(obj):通過連接發送對象。obj是與序列化兼容的任意對象

其他方法:

conn1.close():關閉連接。如果conn1被垃圾回收,將自動調用此方法

conn1.fileno():返回連接使用的整數文件描述符

conn1.poll([timeout]):如果連接上的數據可用,返回True。timeout指定等待的最長時限。如果省略此參數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待數據到達。

conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。如果進入的消息,超過了這個最大值,將引發IOError異常,並且在連接上無法進行進一步讀取。如果連接的另外一端已經關閉,再也不存在任何數據,將引發EOFError異常。

conn.send_bytes(buffer [, offset [, size]]):通過連接發送字節數據緩沖區,buffer是支持緩沖區接口的任意對象,offset是緩沖區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,然后調用c.recv_bytes()函數進行接收    

conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩沖區接口(即bytearray對象或類似的對象)。offset指定緩沖區中放置消息處的字節位移。返回值是收到的字節數。如果消息長度大於可用的緩沖區空間,將引發BufferTooShort異常。

6.2 管道初使用

from multiprocessing import Process, Pipe

def f(conn):
    conn.send("Hello World")  # 子進程發送了消息
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 建立管道,拿到管道的兩端,雙工通信方式,兩端都可以收發消息
    p = Process(target=f, args=(child_conn,))  # 將管道的一段給子進程
    p.start()  # 開啟子進程
    print(parent_conn.recv())  # 主進程接受了消息
    p.join()

 

應該特別注意管道端點的正確管理問題。如果是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了為何在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。如果忘記執行這些步驟,程序可能在消費者中的recv()操作上掛起(就是阻塞)。管道是由操作系統進行引用計數的,必須在所有進程中關閉管道的相同一端就會能生成EOFError異常。因此,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。

引發EOFError

from multiprocessing import Process, Pipe

def f(parent_conn, child_conn):
    # parent_conn.close() #不寫close將不會引發EOFError
    while True:
        try:
            print(child_conn.recv())
        except EOFError:
            child_conn.close()
            break

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(parent_conn, child_conn,))
    p.start()
    child_conn.close()
    parent_conn.send('hello')
    parent_conn.close()
    p.join()

 

主進程將管道的兩端都傳送給子進程,子進程和主進程共用管道的兩種報錯情況,都是在recv接收的時候報錯的:
1.主進程和子進程中的管道的相同一端都關閉了,出現EOFError
2.如果你管道的一端在主進程和子進程中都關閉了,但是你還用這個關閉的一端去接收消息,那么就會出現OSError

所以你關閉管道的時候,就容易出現問題,需要將所有只用這個管道的進程中的兩端全部關閉才行。當然也可以通過異常捕獲(try:except EOFerror)來處理。

雖然我們在主進程和子進程中都打印了一下conn1一端的對象,發現兩個不再同一個地址,但是子進程中的管道和主進程中的管道還是可以通信的,因為管道是同一套,系統能夠記錄。    

我們的目的就是關閉所有的管道,那么主進程和子進程進行通信的時候,可以給子進程傳管道的一端就夠了,並且用我們之前學到的,信息發送完之后,再發送一個結束信號None,那么你收到的消息為None的時候直接結束接收或者說結束循環,就不用每次都關閉各個進程中的管道了。

通過結束信號None來結束程序:

from multiprocessing import Pipe, Process

def func(conn):
    while True:
        msg = conn.recv()
        if msg is None: break
        print(msg)

if __name__ == '__main__':
    conn1, conn2 = Pipe()
    p = Process(target=func, args=(conn1,))
    p.start()
    for i in range(10):
        conn2.send('約吧')
    conn2.send(None)

 

6.3 通過管道實現生產者消費者模型

# 管道實現消費者生產者模型
from multiprocessing import Process, Pipe

def consumer(p, name):
    produce, consume = p
    produce.close()
    while True:
        try:
            baozi = consume.recv()
            print('%s 收到包子:%s' % (name, baozi))
        except EOFError:
            break

def producer(seq, p):
    produce, consume = p
    consume.close()
    for i in seq:
        produce.send(i)

if __name__ == '__main__':
    produce, consume = Pipe()

    c1 = Process(target=consumer, args=((produce, consume), 'c1'))
    c1.start()

    seq = (i for i in range(10))
    producer(seq, (produce, consume))

    produce.close()
    consume.close()

    c1.join()
    print('主進程')

 

關於管道會造成數據不安全問題的官方解釋:

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a 
pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from
processes using different ends of the pipe at the same time.

由Pipe方法返回的兩個連接對象表示管道的兩端。每個連接對象都有send和recv方法(除其他之外)。注意,如果兩個進程(或線程)試圖同時從管道的同一端讀取或寫入數據,那么管道中的數據可能會損壞。當然,在使用管道的不同端部的過程中不存在損壞風險。

多個消費者競爭會出現數據不安全的問題的解決方案- 加鎖:

from multiprocessing import Process, Pipe, Lock

def consumer(p, name, lock):
    produce, consume = p
    produce.close()
    while True:
        lock.acquire()
        baozi = consume.recv()
        lock.release()
        if baozi:
            print('%s 收到包子:%s' % (name, baozi))
        else:
            consume.close()
            break

def producer(p, n):
    produce, consume = p
    consume.close()
    for i in range(n):
        produce.send(i)
    produce.send(None)
    produce.send(None)
    produce.close()

if __name__ == '__main__':
    produce, consume = Pipe()
    lock = Lock()
    c1 = Process(target=consumer, args=((produce, consume), 'c1', lock))
    c2 = Process(target=consumer, args=((produce, consume), 'c2', lock))
    p1 = Process(target=producer, args=((produce, consume), 10))
    c1.start()
    c2.start()
    p1.start()

    produce.close()
    consume.close()

    c1.join()
    c2.join()
    p1.join()
    print('主進程')

 

管道可以用於雙工通信,通常利用在客戶端/服務端中使用的請求/響應模型,或者遠程過程調用,就可以使用管道編寫與進程交互的程序,像前面將網絡通信的時候,我們使用了一個叫subprocess的模塊,里面有個參數是pipe管道,執行系統指令,並通過管道獲取結果。

7.數據共享(了解)

展望未來,基於消息傳遞的並發編程是大勢所趨

即便是使用線程,推薦做法也是將程序設計為大量獨立的線程集合

通過消息隊列交換數據。這樣極大地減少了對使用鎖定和其他同步手段的需求,還可以擴展到分布式系統中

進程間應該盡量避免通信,即便需要通信,也應該選擇進程安全的工具來避免加鎖帶來的問題,應該盡量避免使用本節所講的共享數據的方式,以后我們會嘗試使用數據庫來解決進程之間的數據共享問題。

7.1 Manager模塊介紹

Manager模塊介紹:

進程間數據是獨立的,可以借助於隊列或管道實現通信,二者都是基於消息傳遞的
雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止於此
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.

 

多進程共同去處理共享數據的時候,就和我們多進程同時去操作一個文件中的數據是一樣的,不加鎖就會出現錯誤的結果,進程不安全的,所以也需要加鎖

7.2 Manager模塊使用

from multiprocessing import Manager, Process, Lock

def work(d, lock):
    with lock:  # 不加鎖而操作共享的數據,肯定會出現數據錯亂
        d['count'] -= 1

if __name__ == '__main__':
    lock = Lock()
    with Manager() as m:
        dic = m.dict({'count': 100})
        p_l = []
        for i in range(100):
            p = Process(target=work, args=(dic, lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)

 

7.3 進程間通信總結

總結一下,進程之間的通信:隊列、管道、數據共享也算

下面要講的信號量和事件也相當於鎖,也是全局的,所有進程都能拿到這些鎖的狀態,進程之間這些鎖啊信號量啊事件啊等等的通信,其實底層還是socekt,只不過是基於文件的socket通信,而不是跟上面的數據共享啊空間共享啊之類的機制,我們之前學的是基於網絡的socket通信,還記得socket的兩個家族嗎,一個文件的一個網絡的,所以將來如果說這些鎖之類的報錯,可能你看到的就是類似於socket的錯誤,簡單知道一下就可以啦~~~

工作中常用的是鎖,信號量和事件不常用,但是信號量和事件面試的時候會問到

八、信號量(了解)

8.1 信號量介紹

互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據 。

假設商場里有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。

實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。

信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念

8.2 信號量使用

from multiprocessing import Process, Semaphore
import time, random

def go_ktv(sem, user):
    sem.acquire()
    print('%s 占到一間ktv小屋' % user)
    time.sleep(random.randint(0, 3))  # 模擬每個人在ktv中待的時間不同
    sem.release()

if __name__ == '__main__':
    sem = Semaphore(4)
    p_l = []
    for i in range(13):
        p = Process(target=go_ktv, args=(sem, 'user%s' % i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')

 

九、事件(了解)

9.1 事件介紹

python線程的事件(Event)用於主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。

事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。

clear:將“Flag”設置為False
set:將“Flag”設置為True

9.2 事件方法的使用

# 事件方法的使用
from multiprocessing import Event

e = Event()  # 創建一個事件對象
print(e.is_set())  # is_set()查看一個事件的狀態,默認為False,可通過set方法改為True
print('look here!')
# e.set()          #將is_set()的狀態改為True。
# print(e.is_set())#is_set()查看一個事件的狀態,默認為False,可通過set方法改為Tr
# e.clear()        #將is_set()的狀態改為False
# print(e.is_set())#is_set()查看一個事件的狀態,默認為False,可通過set方法改為Tr
e.wait()  # 根據is_set()的狀態結果來決定是否在這阻塞住,is_set()=False那么就阻塞,is_set()=True就不阻塞
print('give me!!')

# set和clear  修改事件的狀態 set-->True   clear-->False
# is_set     用來查看一個事件的狀態
# wait       依據事件的狀態來決定是否阻塞 False-->阻塞  True-->不阻塞

 

9.3 通過事件來模擬紅綠燈

# 通過事件來模擬紅綠燈示例
from multiprocessing import Process, Event
import time, random

def car(e, n):
    while True:
        if not e.is_set():  # 進程剛開啟,is_set()的值是Flase,模擬信號燈為紅色
            print('\033[31m紅燈亮\033[0m,car%s等着' % n)
            e.wait()  # 阻塞,等待is_set()的值變成True,模擬信號燈為綠色
            print('\033[32m車%s 看見綠燈亮了\033[0m' % n)
            time.sleep(random.randint(2, 4))
            if not e.is_set():  # 如果is_set()的值是Flase,也就是紅燈,仍然回到while語句開始
                continue
            print('車開遠了,car', n)
            break

# def police_car(e, n):
#     while True:
#         if not e.is_set():# 進程剛開啟,is_set()的值是Flase,模擬信號燈為紅色
#             print('\033[31m紅燈亮\033[0m,car%s等着' % n)
#             e.wait(0.1) # 阻塞,等待設置等待時間,等待0.1s之后沒有等到綠燈就闖紅燈走了
#             if not e.is_set():
#                 print('\033[33m紅燈,警車先走\033[0m,car %s' % n)
#             else:
#                 print('\033[33;46m綠燈,警車走\033[0m,car %s' % n)
#         break

def traffic_lights(e, inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            print('######', e.is_set())
            e.clear()  # ---->將is_set()的值設置為False
        else:
            e.set()  # ---->將is_set()的值設置為True
            print('***********', e.is_set())

if __name__ == '__main__':
    e = Event()
    for i in range(10):
        p = Process(target=car, args=(e, i,))  # 創建10個進程控制10輛車
        time.sleep(random.randint(1, 3))  # 車不是一下子全過來
        p.start()

    # for i in range(5):
    #     p = Process(target=police_car, args=(e, i,))  # 創建5個進程控制5輛警車
    #     p.start()

    # 信號燈必須是單獨的進程,因為它不管你車開到哪了,我就按照我紅綠燈的規律來閃爍變換,對吧
    t = Process(target=traffic_lights, args=(e, 5))  # 創建一個進程控制紅綠燈
    t.start()

    print('預備~~~~開始!!!')

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM