~~並發編程(五):重要方法~~


進擊のpython

*****

並發編程——重要方法


前面提到的方法都是比較基本的,比較基礎

這個小節我們着重了解一下以下幾個方法:

1.守護進程

2.互斥鎖

3.隊列


守護進程

守護進程,就有點像古時候的封建奴隸制下的皇上與太監,皇上死了,太監就得陪着殉葬,守護着皇上

守護進程就是這樣的關系,當主進程結束,守護進程隨之消失

守護進程一般是怎么用呢?如果我們有兩個進程並發執行,那我們就可以開一個主進程,再開一個子進程

但是呢,如果我們開的這個子進程在主進程結束之后就沒什么用了,那這個時候就建議用守護進程了

p.daemon:默認值為False,如果設為True,代表p為后台運行的守護進程

當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程

必須在p.start()之前設置

from multiprocessing import Process


def func(name, *args, **kwargs):
    print(f'{name}執行了')
    pass


if __name__ == '__main__':
    p = Process(target=func, args=('子進程',))
    # p.daemon = True
    p.start()
    print('我是主進程... ...')

正常開啟子進程,都不是守護進程,由於聲明之后還要有一段時間才能執行子進程

所以先打印 “我是主進程... ...” ,再打印 “子進程被執行了”

但是當我把注釋的語句打開,p就變成了守護進程(注意,是在start上面)

由於還是主進程會先打印,就會導致,子進程還沒打印就陪葬了

就只會打印“我是主進程... ...”


互斥鎖

首先啊,原則上進程之間是不能夠相互通信的,但是捏,進程之間還是共享一套文件資源的

所以訪問同一個文件或者同一個打印終端,是沒有問題的

但是計算機是貪婪地,有共享,就會有競爭

有競爭,如果沒有限制,沒有秩序,就會混亂

import time
from multiprocessing import Process


def func(name, *args, **kwargs):
    print(f'{name}正在執行.. ..')
    time.sleep(2)
    print(f'{name}執行完畢!')

    pass


if __name__ == '__main__':
    for i in range(3):
        p = Process(target=func, args=(f'進程{i}',))
        p.start()

我想達到的效果是不是應該是

進程0正在執行.. ..
進程0執行完畢!
進程1正在執行.. ..
進程1執行完畢!
進程2正在執行.. ..
進程2執行完畢!

而實際情況打印的是:

進程1正在執行.. ..
進程0正在執行.. ..
進程2正在執行.. ..
進程1執行完畢!
進程0執行完畢!
進程2執行完畢!

這就出現了競爭(多打印幾次,還會出現別的結果)

就好像你去上廁所,但是大家都不排隊,你擠進去了,別人還往里擠

還有的時候開門看看你,所以為了解決這個問題,我們一般都在衛生間加把鎖

上廁所的時候鎖一下門,我不完事,誰也別想進來

是對於程序也是,加一個鎖,就讓程序變得有序了

其實也就相當於把並發變成了串行,誠然降低了效率,但是保證了數據的安全

import time
from multiprocessing import Process, Lock


def func(name, lock, *args, **kwargs):
    lock.acquire()
    print(f'{name}正在執行.. ..')
    time.sleep(2)
    print(f'{name}執行完畢!')
    lock.release()
    pass


if __name__ == '__main__':
    lock = Lock()
    for i in range(3):
        p = Process(target=func, args=(f'進程{i}', lock))
        p.start()

lock = Lock(),對象的實例化,實例化出一個鎖

然后鎖,有關鎖(acquire)就有開鎖(release)

這樣就保證某個子進程在用打印的時候,別的子進程只能在外面等待,就達到了我們的目的

搶票系統

這個經典的例子也可以通過這個搶票系統來幫助理解

首先所有人都可以查票,如果有票就可以買

我們可以先寫一下

import json

from multiprocessing import Process


def search(name, *args, **kwargs):
    msg = json.load(open('db.json'))
    print(f'{name}查看票數,當前有{msg["count"]}張')
    pass


def get(name, *args, **kwargs):
    msg = json.load(open('db.json'))
    if msg["count"] > 0:
        msg["count"] -= 1
        json.dump(msg, open('db.json', 'w'))
        print(f'{name}買到票了,當前有{msg["count"]}張')


def main(name):
    search(name)
    get(name)


if __name__ == '__main__':
    for i in range(1, 5):
        p = Process(target=main, args=(f"第{i}個人",))
        p.start()

json.dump({'count': 1}, open("db.json", 'w'))

執行一下

第2個人查看票數,當前有1張
第2個人買到票了,當前有0張
第4個人查看票數,當前有1張
第3個人查看票數,當前有1張
第4個人買到票了,當前有0張
第3個人買到票了,當前有0張
第1個人查看票數,當前有1張
第1個人買到票了,當前有0張

問題出現了,只有一張票,但是我們卻賣出去四張

這是不對的,應該是查詢大家一起查,買票只能一個一個買

所以可以這樣:

import json
import time

from multiprocessing import Process, Lock


def search(name, *args, **kwargs):
    msg = json.load(open('db.json'))
    print(f'{name}查看票數,當前有{msg["count"]}張')
    pass


def get(name,lock, *args, **kwargs):
    lock.acquire()
    msg = json.load(open('db.json'))
    if msg["count"] > 0:
        msg["count"] -= 1
        json.dump(msg, open('db.json', 'w'))
        print(f'{name}買到票了,當前有{msg["count"]}張')
    else:
        print(f'{name}沒買到票!')
    lock.release()
def main(name,lock):
    time.sleep(1) # 模擬網絡延遲
    search(name)
    get(name,lock)


if __name__ == '__main__':
    lock = Lock()

    for i in range(1, 5):
        p = Process(target=main, args=(f"第{i}個人",lock))
        p.start()

json.dump({'count': 1}, open("db.json", 'w'))

輸出結果就是我們想的那樣了,只有一個人買到了

第2個人查看票數,當前有1張
第2個人買到票了,當前有0張
第4個人查看票數,當前有0張
第4個人沒買到票!
第3個人查看票數,當前有0張
第3個人沒買到票!
第1個人查看票數,當前有0張
第1個人沒買到票!

橫向對比join方法,在剛開始的時候我們也利用join方法進行了等待的操作

我們看看用join怎么做

import json
import time

from multiprocessing import Process, Lock


def search(name, *args, **kwargs):
    msg = json.load(open('db.json'))
    print(f'{name}查看票數,當前有{msg["count"]}張')
    pass


def get(name, lock, *args, **kwargs):
    lock.acquire()
    msg = json.load(open('db.json'))
    if msg["count"] > 0:
        msg["count"] -= 1
        json.dump(msg, open('db.json', 'w'))
        print(f'{name}買到票了,當前有{msg["count"]}張')
    else:
        print(f'{name}沒買到票!')
    lock.release()


def main(name, lock):
    time.sleep(1)  # 模擬網絡延遲
    search(name)
    get(name, lock)


if __name__ == '__main__':
    lock = Lock()

    for i in range(1, 5):
        p = Process(target=main, args=(f"第{i}個人", lock))
        p.start()
        p.join()

發現也可以達到我哦們想要的效果,但是查票的過程也變成串行了

很明顯大家查票時應該是並發地去查詢而無需考慮數據准確與否

此時join與互斥鎖的區別就顯而易見了:

join是將一個任務整體串行,而互斥鎖的好處則是可以將一個任務中的某一段代碼串行

比如只讓main函數中的get任務串行


隊列

進程彼此之間互相隔離,要實現進程間通信(IPC)

multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

是消息傳遞,不是大數據!

看一下隊列中有什么

class Queue(object):
    def __init__(self, maxsize=-1):
        self._maxsize = maxsize

maxsize:可放入的允許的最大項數,如果不設置,默認無限

​ 但是隊列是基於內存的,真正的無限其實取決於內存

 def qsize(self):
        return 0

    def empty(self):
        return False

    def full(self):
        return False

    def put(self, obj, block=True, timeout=None):
        pass

    def put_nowait(self, obj):
        pass

    def get(self, block=True, timeout=None):
        pass

    def get_nowait(self):
        pass

    def close(self):
        pass

    def join_thread(self):
        pass

    def cancel_join_thread(self):
        pass

隊列的方法大概是上述這些

put():往隊列里放信息

get():從隊列里拿數據

empty():判斷隊列是否為空

full():判斷隊列滿沒滿

同時也要記住,隊列是“先進先出”

當隊列滿了,再往里放,就會阻塞

當隊列空了,再往外取,也會阻塞

from multiprocessing import Queue

q = Queue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.full())

print(q.get())
print(q.get())
print(q.get())
print(q.empty())

可以看到打印出來的是1 2 3

因為我們放的順序就是1 2 3


*****
*****


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM