並發編程(五)


前言

上篇博客的內容是守護進程,對於操作系統來說可以在后台執行一些程序.這篇的內容是互斥鎖,在上上篇博客上說到進程內存空間互相隔離,所以可以通過共享文件來操作同一個文件,那么這樣操作的話會發生什么呢?

互斥鎖

多個進程需要共享數據時,先將其鎖定,此時資源狀態為'鎖定',其他進程不能更改;知道該進程釋放資源,將資源的狀態變成非'鎖定',其他的線程才能再次鎖定該資源.互斥鎖保證了每次只有一個進程進入寫入操作,從而保證了多進程情況下數據的正確性.

我們使用一個demo 來模擬多個進程操作同一個文件:

import json
import time,random
from multiprocessing import Process

def show_tickets(name):
    time.sleep(random.randint(1,3))
    with open('ticket.json', 'rt', encoding='utf-8') as f:
        data = json.load(f)
        print('%s 查看 剩余票數: %s' % (name, data['count']))

def buy_ticket(name):
    with open('ticket.json', 'rt', encoding='utf-8') as f:
        dic = json.load(f)

        if dic['count'] > 0:
            dic['count'] -= 1

            time.sleep(random.randint(1,3))

            with open('ticket.json', 'wt', encoding='utf-8') as f:
                json.dump(dic, f)
                print('%s: 購票成功' % name)

def task(name):
    show_tickets(name)
    buy_ticket(name)

if __name__ == '__main__':
    for i in range(1,11):
        p = Process(target=task, args=(i,))
        p.start()

運行結果:

在 ticket.json 里面只有一張票,結果卻造成多個用戶購買成功,這很顯然是不符合實際情況的.\

那么怎么解決呢?如果多個進程對同一個文件進行讀操作可以不進行限制,但是對同一個文件進行寫操作就必要要進行限制,不可以同時多個人對同一個文件進行寫操作.python 在多進程模塊里提供一個類, Lock 類,當進程獲取到鎖的時候其他的進程就必須要等待鎖釋放才可以進行爭搶,在這個例子里面就可以加上一把鎖來保護數據安全.

from multiprocessing import Process, Lock
import json,time,random


def show_tickets(name):
    time.sleep(random.randint(1,3))
    with open('ticket.json', 'rt', encoding='utf-8') as f:
        data = json.load(f)
        print('%s 查看 剩余票數: %s' % (name, data['count']))

def buy_ticket(name):
    time.sleep(random.randint(1,3))
    with open('ticket.json', 'rt', encoding='utf-8') as f:
        dic = json.load(f)

        if dic['count'] > 0:
            dic['count'] -= 1

            time.sleep(random.randint(1,3))

            with open('ticket.json', 'wt', encoding='utf-8') as f:
                json.dump(dic, f)
                print('%s: 購票成功' % name)

def task(name,lock):
    show_tickets(name)
    lock.acquire()
    buy_ticket(name)
    lock.release()

if __name__ == '__main__':
    mutex = Lock()
    for i in range(1,11):
        p = Process(target=task, args=(i,mutex))
        p.start()

運行結果:

這樣加了鎖(互斥鎖)就可以解決同時操作同一個文件造成的數據混亂問題了.

當使用多進程開發時,如果多個進程同時讀寫同一個資源,可能會造成數據的混亂,為了防止發生問題,使用鎖,或者使用 Process 的方法 join 將並行變為串行.

join 和鎖的區別

  1. join 人為控制進程的執行順序
  2. join 把整個進程全部串行,而鎖可以指定部分代碼串行

一旦串行,效率就會降低,一旦並行,數據就可能會出錯.

進程間通信

進程間通信( internal-process communication),我們在開啟子進程是希望子進程幫助完成任務,很多情況下需要將數據返回給父進程,然而進程間內存是物理隔離的.

解決辦法:

  1. 將共享數據放到文件中
  2. 管道 多進程模塊中的一個類,需要有父子關系
  3. 共享一快內存區域 需要操作系統分配

管道通信

Pipe類返回一個由管道連接的連接對象,默認情況下為雙工:

from multiprocessing import Process,Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()
    
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()

運行結果:

[42, None, 'hello']

實例化 Pipe 類會返回兩個連接對象表示管道的兩端.每個連接對象都有 send() 和 recv() 方法(及其他).請注意,如果兩個進程同時嘗試讀寫管道的同一端,則管道中的數據可能會損壞.當然,同時使用管道的不同端部的過程不存在損壞的風險.

共享內存通信

Queue 通信

Queue類會生成一個先進先出的容器,通過往隊列中存取數據而進行進程間通信.

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])
    
if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())
    p.join()

運行結果:

[42, None, 'hello']

隊列其他特性

# 阻塞操作 必須掌握
q = Queue(3)
# # 存入數據
q.put("hello",block=False)
q.put(["1","2","3"],block=False)
q.put(1,block=False)
# 當容量滿的時候 再執行put 默認會阻塞直到執行力了get為止
# 如果修改block=False 直接報錯 因為沒地方放了
# q.put({},block=False)

# # # 取出數據
print(q.get(block=False))
print(q.get(block=False))
print(q.get(block=False))
# 對於get   當隊列中中沒有數據時默認是阻塞的  直達執行了put
# 如果修改block=False 直接報錯 因為沒數據可取了
print(q.get(block=False))



# 了解
q = Queue(3)
q.put("q",timeout=3)
q.put("q2",timeout=3)
q.put("q3",timeout=3)
# 如果滿了 願意等3秒  如果3秒后還存不進去 就炸
# q.put("q4",timeout=3)

print(q.get(timeout=3))
print(q.get(timeout=3))
print(q.get(timeout=3))
# 如果沒了 願意等3秒  如果3秒后還取不到數據 就炸
print(q.get(timeout=3))

Manager 通信

demo

from multiprocessing import  Process,Manager
import time

def task(dic):
    print("子進程xxxxx")
    # li[0] = 1
    # print(li[0])
    dic["name"] = "xx"

if __name__ == '__main__':
    m = Manager()
    # li = m.list([100])
    dic = m.dict({})
    # 開啟子進程
    p = Process(target=task,args=(dic,))
    p.start()
    time.sleep(3)

可以創建一片共享內存區域用來存取數據.

生產者消費者模型

什么是生產者消費者模型

在軟件開發過程中,經常碰到這樣的場景:

某些模塊負責生產數據,這些數據由其他模塊來負責處理(此處的模塊可能是:函數,線程,進程等).生產數據的模塊稱為生產者,而處理數據的模塊稱為消費者.在生產者與消費者之間的緩沖區稱之為倉庫.生產者負責往倉庫運輸商品,而消費者負責從倉庫里取出商品,這就構成了生產者消費者模型.

結構圖如下:

為了便於理解,我們舉一個寄信的例子。假設你要寄一封信,大致過程如下:

  1. 你把信寫好——相當於生產者生產數據;
  2. 你把信放入郵箱——相當於生產者把數據放入緩沖區;
  3. 郵遞員把信從郵箱取出,做相應處理——相當於消費者把數據取出緩沖區,處理數據.

生產者消費者模型的優點

  • 解耦

假設生產者和消費者分別是兩個線程.如果讓生產者直接調用消費者的某個方法,那么生產者對於消費者就會產生依賴(耦合).如果未來消費者的代碼發生改變,可能會影響到生產者的代碼.而如果兩者都依賴於某個緩沖區,兩者之間不直接依賴,耦合也就相應降低了.

舉個例子,我們去郵局投遞信件,如果不使用郵箱(也就是緩沖區,你必須得把信直接交給郵遞員.有同學會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須 得認識誰是郵遞員,才能把信給他.這就產生了你和郵遞員之間的依賴(相當於生產者和消費者的強耦合).萬一哪天郵遞員換人了,你還要重新認識一下(相當於消費者變化導致修改生產者代碼).而郵箱相對來說比較固定,你依賴它的成本就比較低(相當於和緩沖區之間的弱耦合).

  • 並發

由於生產者與消費者是兩個獨立的並發體,它們之間是使用緩沖區通信的,生產者只需要往緩沖區里丟數據,就可以接着生產下一個數據了,而消費者只需要從緩沖區拿數據即可,這樣就不會因為彼此的處理速度而發生阻塞.

繼續上面的例子,如果沒有郵箱,就得在郵局等郵遞員,知道他回來,把信交給他,這期間我們什么事都干不了(生產者阻塞).或者郵遞員挨家挨戶問,誰要寄信(消費者阻塞).

  • 支持忙閑不均

當生產者制造數據快的時候,消費者來不及處理,為處理的數據可以暫時存在緩沖區中,慢慢處理,而不至於因為消費者的性能過慢造成數據丟失或影響生產者生產數據.

再拿寄信的例子,假設郵遞員一次只能帶走1000封信,萬一碰上情人節或者其他的緊急任務,需要寄出的信超過了1000封,這個時候郵箱作為緩沖區就派上用場了.郵遞員把來不及帶走的信暫存在郵箱中,等下次過來時在拿走.

使用

from multiprocessing import Process, Queue
import time, random

def producer(name, food, q):
    for i in range(10):
        res = '%s %s' % (food, i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print('%s 生產了 %s' % (name, res))
        
def consumer(name, q):
    while True:
        res = q.get()
        time.sleep(random.randint(1,3))
        print('%s 消費了 %s' % (name, res))

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=('musibii', '🍔', q))
    c = Process(target=consumer, args=('thales', q))
    p.start()
    c.start()
    
    p.join()
    c.join()
    print('主進程')

運行結果:

這樣的話該進程並不會結束,因為 get 方法是阻塞的,數據消費完就會一直等待知道生產者生產新的數據,而生產者只能生產9個數據.所以會一直阻塞.

改進使用

我們需要在消費者消費的時候知道隊列里面有多少數據,應該什么時候消費完了,所以可以在生產者里面生產結束后添加一個標志,比如 None.

import time, random
from multiprocessing import Process, Queue


# 制作熱狗
def make_hotdog(queue, name):
    for i in range(1, 4):
        time.sleep(random.randint(1, 2))
        print("%s 制作了一個🌭 %s" % (name, i))
        # 生產得到的數據
        data = "%s 生產的🌭%s" % (name, i)
        # 存到隊列中
        queue.put(data)
    # 裝入一個特別的數據 告訴消費方 沒有了
    # queue.put(None)


# 吃熱狗
def eat_hotdog(queue, name):
    while True:
        data = queue.get()
        if not data: break
        time.sleep(random.randint(1, 2))
        print("%s 吃了 %s" % (name, data))


if __name__ == '__main__':
    # 創建隊列
    q = Queue()
    p1 = Process(target=make_hotdog, args=(q, "musibii的熱狗店"))
    p2 = Process(target=make_hotdog, args=(q, "egon的熱狗店"))
    p3 = Process(target=make_hotdog, args=(q, "eureka的熱狗店"))

    c1 = Process(target=eat_hotdog, args=(q, "thales"))
    c2 = Process(target=eat_hotdog, args=(q, "maffia"))

    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()

    # 讓主進程等三家店全都做完后....
    p1.join()
    p2.join()
    p3.join()

    # 添加結束標志   注意這種方法有幾個消費者就加幾個None 不太合適 不清楚將來有多少消費者
    q.put(None)
    q.put(None)

    # 現在 需要知道什么時候做完熱狗了 生產者不知道  消費者也不知道
    # 只有隊列知道

    print("主進程over")

    # 生產方不生產了 然而消費方不知道 所以已知等待  get函數阻塞
    # 三家店都放了一個空表示沒熱狗了  但是消費者只有兩個 他們只要看見None 就認為沒有了
    # 於是進程也就結束了  造成一些數據沒有被處理
    # 等待做有店都做完熱狗在放None

運行結果:

這樣就解決了最初版本消費之因為沒有數據而阻塞的問題了,但是這里還是有問題,因為不知道到底有多少消費者,因為想讓消費者知道數據已經結束了的話,需要給每個消費者一個標志位,這樣是不現實的.

完美使用

python 多進程模塊提供了一個JoinableQueue類,追根溯源繼承於 Queue,源碼看的頭疼.

import time, random
from multiprocessing import Process, JoinableQueue


# 制作熱狗
def make_hotdog(queue, name):
    for i in range(1,4):
        time.sleep(random.randint(1, 2))
        print("%s 制作的🌭 %s" % (name, i))
        # 生產得到的數據
        data = "%s 生產的🌭 %s" % (name, i)
        # 存到隊列中
        queue.put(data)
    # 裝入一個特別的數據 告訴消費方 沒有了
    # queue.put(None)


# 吃熱狗
def eat_hotdog(queue, name):
    while True:
        data = queue.get()
        time.sleep(random.randint(1, 2))
        print("%s 吃了%s" % (name, data))
        # 該函數就是用來記錄一共給消費方多少數據了 就是get次數
        queue.task_done()


if __name__ == '__main__':
    # 創建隊列
    q = JoinableQueue()
    p1 = Process(target=make_hotdog, args=(q, "musibii的熱狗店"))
    p2 = Process(target=make_hotdog, args=(q, "egon的熱狗店"))
    p3 = Process(target=make_hotdog, args=(q, "eureka的熱狗店"))

    c1 = Process(target=eat_hotdog, args=(q, "thales"))
    c2 = Process(target=eat_hotdog, args=(q, "maffia"))

    p1.start()
    p2.start()
    p3.start()

    # 將消費者作為主進程的守護進程
    c1.daemon = True
    c2.daemon = True

    c1.start()
    c2.start()

    # 讓主進程等三家店全都做完后....
    p1.join()
    p2.join()
    p3.join()

    # 如何知道生產方生產完了 並且 消費方也吃完了
    # 方法一:等待做有店都做完熱狗在放None
    # # 添加結束標志   注意這種方法有幾個消費者就加幾個None 不太合適 不清楚將來有多少消費者
    # q.put(None)
    # q.put(None)

    # 主進程等到隊列結束時再繼續  那隊列什么時候算結束? 生產者已經生產完了 並且消費者把數據全取完了
    q.join()  # 已經明確生產放一共有多少數據

    # 現在 需要知道什么時候做完熱狗了 生產者不知道  消費者也不知道
    # 只有隊列知道

    print("主進程over")
    # 生產方不生產了 然而消費方不知道 所以一直等待  get函數阻塞
    # 三家店都放了一個空表示沒熱狗了  但是消費者只有兩個 他們只要看見None 就認為沒有了
    # 於是進程也就結束了  造成一些數據沒有被處理

運行結果:

查看 JoinableQueue 類方法 task_done 源碼:

看不懂.........


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM