鎖和隊列


1、鎖multiprocessing-Lock

鎖的應用場景:當多個進程需要操作同一個文件/數據的時候;

當多個進程使用同一份數據資源的時候,就會引發數據安全或順序混亂問題。

為保證數據的安全性,多進程中只有去操作一些進程之間可以共享的數據資源的時候才需要進行加鎖;

枷鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務進行修改,即串行的修改,沒錯速度式慢了,但犧牲了速度卻保證了數據的安全;

模擬查票搶票:

 

import json
import time
from multiprocessing import Process,Lock

"""
模擬查票/搶票
查票是不需要排隊的,可以使用多進程,每個人都可以在同一時間查看
買票為避免多個進程操作同一個文件造成數據不安全,使用鎖來避免多個進程同時修改同一個文件
"""
def search_ticket(name):
    with open('piao',encoding='utf-8') as f:
        dic = json.load(f)
        print('%s查看余票為%s張票'%(name,dic['count']))

def buy_ticket(name):
    with open('piao', encoding='utf-8') as f:
        dic = json.load(f)
    time.sleep(2)
    if dic['count'] >= 1:
        print('%s買到票了'%name)
        dic['count'] -= 1
        time.sleep(2)
        with open('piao','w',encoding='utf-8') as f:
            json.dump(dic,f)
    else:
        print('余票為0 %s沒有買到票'%name)

def user(name,lock):
    search_ticket(name)
    print('%s在等待' % name)
    # lock.acquire()    # 拿鑰匙取數據
    # buy_ticket(name)
    # lock.release()    # 歸還鑰匙
    with lock:
        print('%s開始執行了' % name)
        buy_ticket(name)

if __name__ == '__main__':
    lock = Lock()  # 創建鎖
    lis = ['alex','wusir','taibai','eva_j']
    for name in lis:
        Process(target=user,args=(name,lock)).start()
模擬查票/搶票

 

#加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然可以用文件共享數據實現進程間通信,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.需要自己加鎖處理

#因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道。
隊列和管道都是將數據存放於內存中
隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,
我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。 

2、進程之間的通信—隊列multiprocessing.Queue

 

進程之間的通信:IPC(Inter-Process Communication)

multiprocessing.Queue可以完成進程之間通信的特殊隊列

from multiprocessing import Queue
def son(q):
    print(q.get())  # 從隊列里面取值
if __name__ == '__main__':
    q = Queue()   # 創建一個隊列
    Process(target=son,args=(q,)).start()
    q.put('wanhaha')   # 往隊列里放值
使用隊列完成主進程與子進程的通信
1、隊列是進程之間通信的安全保障,自帶了鎖
2、隊列是基於文件家族的socket服務實現的
3、管道Pipe也是基於文件家族的socket服務實現的IPC機制
4、隊列 = 管道 +5、管道沒有鎖,不安全

3、生產者消費模型

在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

為什么要使用生產者和消費者模式

在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什么是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

import time
from multiprocessing import Queue,Process
def producer(q):
    """
    生產者
    :param q:
    :param name:
    :return:
    """
    for i in range(10):
        time.sleep(0.2)
        q.put('泔水%s'%i)
        print('正在生產泔水%s'%i)
def consumer(q,name):
    """
    消費者
    :param q:
    :return:
    """
    while True:
        s = q.get()
        if not s:break
        time.sleep(0.2)
        print('%s正在吃%s'%(name,s))
if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer,args=(q,))
    p1.start()
    c1 = Process(target=consumer,args = (q,'alex'))
    c1.start()
    p1.join()
    q.put(None)
向隊列中添加None來結束消費者中的循環
import time
import random
from multiprocessing import JoinableQueue,Process

def consumer(jq,name):
    while True:
        food = jq.get()
        time.sleep(random.uniform(1,2))
        print('%s吃完%s'%(name,food))
        jq.task_done()

def producer(jq):
    for i in range(10):
        time.sleep(random.random())
        food = '泔水%s'%i
        print('%s生產了%s'%('taibai',food))
        jq.put(food)
    jq.join()

if __name__ == '__main__':
    jq = JoinableQueue(5)
    c1 = Process(target=consumer,args=(jq,'alex'))
    p1 = Process(target=producer,args=(jq,))
    c1.daemon = True  # 守護進程
    c1.start()
    p1.start()
    p1.join()
利用守護進程結束循環

 

邊生產邊吃,供不應求時會get()阻塞住,直到生產者往隊列里放有值才能繼續吃,每吃一次都會向隊列發送一個信號q.task_done()(相當於告訴隊列我吃完一個了)此時隊列中的計數器會減去一,只要隊列中的計數器不為0,程序就會被q.join()阻塞住;


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM