Python Day 32 並發編程 (守護進程, 進程同步(multiprocessing.Lock、multiprocessing.Semaphore、multiprocessing.Event) 進程間通信 multiprocessing.Queue)


守護進程

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止(主進程和子進程是異步的),當主進程停止,該守護進程不在繼續執行.守護進程也是一種子進程.

主進程創建守護進程

  其一:守護進程會在主進程代碼執行結束后就終止.(但本質上是在主進程結束之前結束的,主進程需要負責回收資源)

  其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children

from multiprocessing import Process
import time
import os

def func(num):
    print(f'{num},pid:{os.getpid()},ppid:{os.getppid()}')
    while True:
        print('is alive')
        time.sleep(0.5)

def wahaha():
    i = 0
    while i < 5:
        i += 1
        print(f'第{i}秒')
        time.sleep(1)


if __name__ == '__main__':
    Process(target=wahaha).start()  #子進程在主進程結束后仍然正常執行
    p = Process(target=func,args=(1,))
    p.daemon = True #主進程結束,該守護進程結束
    p.start()
    time.sleep(3)
    print(f'pid:{os.getpid()},ppid:{os.getppid()}')
    print('主進程結束')
=============================
第1秒
1,pid:8200,ppid:2000
is alive
is alive
第2秒
is alive
is alive
第3秒
is alive
is alive
pid:2000,ppid:7244
主進程結束
第4秒
第5秒
創建守護進程

 

多進程中的方法

  p = Process(target=func,args=(1,)) #創建一個進程對象

       p.start()  啟動一個進程

  p.daemon = True  設置進程為守護進程,隨主進程結束而結束.

  p.is_alive()  判斷進程是否存活,返回bool值

  p.terminate()  發送給操作系統指令,關閉進程

  p.pid()   查看進程pid

from multiprocessing import Process
import time
import os

def func(num):
    print(f'{num},pid:{os.getpid()},ppid:{os.getppid()}')
    while True:
        print('is alive')
        time.sleep(0.5)

def wahaha():
    i = 0
    while i < 10:
        i += 1
        print(f'第{i}秒')
        time.sleep(1)


if __name__ == '__main__':
    p2 = Process(target=wahaha)
    p2.start()
#
    p = Process(target=func,args=(1,))
    p.daemon = True #主進程結束,該子進程結束
    p.start()
    time.sleep(3)
    print(p.is_alive())
    print(p2.is_alive())
    p2.terminate()
    time.sleep(0.1)
    print(p.is_alive())
    print(p2.is_alive())
    print(f'pid:{os.getpid()},ppid:{os.getppid()}')
    print('主進程結束')
    p2.join()
Process方法

 

import socket
from multiprocessing import Process

def talk(conn,addr):
    while True:
        msg_r = conn.recv(1024).decode('utf-8')
        print(addr,msg_r)
        msg_s = 'client{}登陸'.format(addr)
        conn.send(msg_s.encode('utf-8'))
    conn.close()


if __name__ == '__main__':
    sk = socket.socket()
    sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    sk.bind(('127.0.0.1',8091))
    sk.listen(5)
    try:
        while True:
            conn,addr = sk.accept()
            Process(target=talk,args=(conn,addr)).start()
    finally:
        sk.close()
socket多進程server
import socket


sk = socket.socket()
sk.connect(('127.0.0.1',8091))

while True:
    msg_s = input('請輸入內容:')
    sk.send(msg_s.encode('utf-8'))
    msg_r = sk.recv(1024).decode('utf-8')
    print(msg_r)


sk.close()
socket多進程client

 

進程同步(multiprocessing.Lock、multiprocessing.Semaphore、multiprocessing.Event) 

鎖 multiprocessing.Lock (*****) 

  避免同一段代碼被多個進程同時執行

 

  lock = Lock()  創建鎖對象

  lock.acquire()  查詢鑰匙,如果有就拿走,如果沒有就等待

  lock.release()  歸還鑰匙

  lock可以使用with上下文進行管理(類似於文件讀取)

  with lock:

    print('hello' )

維護數據的安全
降低了程序的效率
所有的效率都是建立在數據安全的角度上的
但凡涉及到並發編程都要考慮數據的安全性
我們需要在並發部分對數據修改的操作格外小心,如果會涉及到數據的不安全,就需要進行加鎖控制

lock acquire release的另外一種用法
lock 內部實現了進程之間的通信,使得誰acquire了誰release了能夠在多個擁有lock參數的子進程中透明
Lock
from multiprocessing import Lock
lock = Lock()  #創建一個鎖對象
lock.acquire() #想拿鑰匙,如果有就拿,沒有就一直等
print('拿到要鑰匙了1')
lock.release() #還鑰匙
lock.acquire() #想拿鑰匙
print('拿到要鑰匙了2')
lock.release() #還鑰匙
開啟一個進程鎖
#db文件內容  {"count": 0}

import json
import time
from multiprocessing import Process,Lock
def search(i):
    f =open('db')
    ticket_dic =json.load(f)
    f.close()
    print(f"{i} 正在查票,剩余票數{ticket_dic['count']}")

def buy(i):
    with open('db') as f: ticket_dic = json.load(f)
    time.sleep(0.2)
    if ticket_dic['count'] > 0:
        ticket_dic['count'] -= 1
        print(f'{i} 買到票了')
        time.sleep(0.2)
        with open('db','w') as f :json.dump(ticket_dic,f)
    else:
        print(f"{i} 太火爆被搶購一空了,剩余票數{ticket_dic['count']}")


# def get_ticket(i,lock):
#     search(i)
#     lock.acquire()
#     buy(i)
#     lock.release()

def get_ticket(i,lock):
    search(i)
    with lock:
        buy(i)

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        p = Process(target=get_ticket,args=(i,lock))
        p.start()
火車票查詢購買

 

 

信號量(標志True False) multiprocessing.Semaphore(***)  (鎖+計數器)

  有多個鑰匙的鎖

互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據 。
假設商場里有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念
信號量

 

 

  sem = Semaphore(4)  創建鎖對象,4把鑰匙,可以被連續acquire4次

  sem.acquire()  查詢鑰匙,如果有就拿走,如果沒有就等待

  sem.release()  歸還鑰匙

  sem 可以使用with上下文進行管理(類似於文件讀取)

  with sem:

    print('hello' )

from multiprocessing import Semaphore
sem = Semaphore(4) #4把鑰匙
sem.acquire()
print(1)
sem.acquire()
print(2)
sem.release()
sem.acquire()
print(3)
sem.acquire()
print(4)
sem.acquire()
print(5)
sem.acquire()
print(6)
Semaphore(4)
from multiprocessing import Semaphore,Process
import time
import random

# def ktv(sem,i):
#     sem.acquire()
#     print(f'{i}走進ktv')
#     time.sleep(random.randint(1,3))
#     print(f'{i}走出ktv')
#     sem.release()

def ktv(sem,i):
    with sem:
        print(f'{i}走進ktv')
        time.sleep(random.randint(1,3))
        print(f'{i}走出ktv')


if __name__ == '__main__':
    sem = Semaphore(4)
    for i in range(10):
        p = Process(target=ktv,args=(sem,i))
        p.start()
唱吧 ktv

 

事件 multiprocessing.Event(**)

  控制子進程執行還是阻塞的一個機制

  e = Event()  創建一個事件對象

Event方法  在事件中有一個信號(標志)

  wait()   如果這個標志是True wait的執行效果就是pass ,如果是False,wait方法的效果就是阻塞,直到這個標志變成True

  控制標志方法

    is_set()  判斷標志的狀態,返回bool值

    set()  將標志設置為True

    clear()  將標志設置為False

from multiprocessing import Event
e = Event() #阻塞,事件的創建之初標志的狀態是False
print(e.is_set())
e.set() #將標志改為True
print(e.is_set())
e.wait() #當標志為True是pass,不阻塞
創建一個事件
from multiprocessing import Event,Process
import time

def func1(e):
    print('start func1')
    print(e.is_set())  #事件創建之初是False
    e.wait(1)  #不修改狀態(網絡測試,發送短信,發送郵件),超時后繼續執行,不繼續阻塞
    print(e.is_set())  
    e.wait()  #持續阻塞
    print(e.is_set()) #主進程3(異步)s后修改信號標志為True ,繼續執行
    print('end func1')

if __name__ == '__main__':
    e = Event()
    Process(target=func1,args=(e,)).start()
    time.sleep(3)
    e.set()
事件的控制
from multiprocessing import Event,Process
import time
import random

def tarffic_light(e):
    while True:
        while e.is_set():
            print('\033[1;32m綠燈亮\033[0m')
            time.sleep(2)
            e.clear()
        else:
            print('\033[1;31m紅燈亮\033[0m')
            time.sleep(2)
            e.set()


def car(i,e):
    while not e.is_set():
        print(f'{i}正在等待通過...')
        e.wait()
    else:
        print(f'{i}通過.')

if __name__ == '__main__':
    e = Event()
    light = Process(target=tarffic_light,args=(e,))
    light.daemon =True
    light.start()
    car_list = []
    for i in range(1,21):
        p = Process(target=car,args=(i,e))
        car_list.append(p)
        p.start()
        time.sleep(random.randint(0,3))
    for i2 in car_list:i2.join()  #控制子進程先執行完畢
    print('執行完啦')
while版紅綠燈
import time
import random
from multiprocessing import Process,Event
def traffic_light(e):
    print('\033[1;31m紅燈亮\033[0m')
    while True:
        time.sleep(2)
        if e.is_set():
            print('\033[1;31m紅燈亮\033[0m')
            e.clear()
        else:
            print('\033[1;32m綠燈亮\033[0m')
            e.set()

def car(i,e):
    if not e.is_set():
        print('car%s正在等在通過'%i)
        e.wait()
    print('car%s通過'%i)

if __name__ == '__main__':
    e = Event()
    light = Process(target=traffic_light,args=(e,))
    light.daemon = True
    light.start()
    car_lst = []
    for i in range(20):
        p = Process(target=car,args=(i,e))
        p.start()
        time.sleep(random.randint(0,3))
        car_lst.append(p)
    for car in car_lst:car.join()
if 紅綠燈

說明:紅綠燈的的變化和汽車的通行是兩個獨立的進程,汽車通過對紅綠燈的事件信號的查詢判斷等待和放行,每一個汽車都是獨立的進程

 

進程間通信(進程之間數據共享)

  進程間通信 IPC(Inter-Process Communication)

隊列 multiprocessing.Queue  (先進先出)  隊列是基於(管道+鎖)實現的

創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。 

Queue([maxsize]) 
創建共享的進程隊列。
參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。

  q =Queue()  創建一個隊列   q = Queue(5) 隊列長度為5

  q.put(1)  向隊列中放一個數據,可以是int list dict ... 當隊列滿時會阻塞

  q.get()  從隊列中獲取一個數據  沒有值會一直阻塞

  q.empty()  判斷隊列是否為空 返回bool值   多進程時不准 ,如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

  q.full()  判斷隊列是否已滿 返回bool值  多進程時不准 由於線程的存在,結果也可能是不可靠的

  q.qsize()   返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。

  q.close()   關閉隊列,防止隊列中加入更多數據。調用此方法時,后台線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。

  q.cancel_join_thread()   不會再進程退出時自動連接后台線程。這可以防止join_thread()方法阻塞。  

  q.join_thread()   連接隊列的后台線程。此方法用於在調用q.close()方法后,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。

 

 

q = Queue(3)
try:
    q.get_nowait()
except:
    print('隊列中沒有值')
q.get_nowait()
q = Queue(3)
q.put(1)
q.put('aaa')
q.put([1,2,3])
# q.put('alex')   #隊列滿會阻塞
try:
    q.put_nowait('alex')
except:
    print('丟失了一個數據')
q.put_nowait()
創建一個隊列
from multiprocessing import Process,Queue
def func(num,q):
    q.put({num:num**num})

if __name__ == '__main__':
    q = Queue()
    # p = Process(target=func, args=(10,q))
    # p.start()
    # print(q.get())
    for i in range(10):
        p = Process(target=func,args=(i,q))
        p.start()
    for i in range(10):
        print(q.get())


=============
{0: 1}
{1: 1}
{3: 27}
{4: 256}
{2: 4}
{5: 3125}
{9: 387420489}
{8: 16777216}
{6: 46656}
{7: 823543}
使用Queue隊列特性使用put,get解決返回值問題

 

 

 

生產者消費者模型
包子的故事

效率問題

生產者 托盤 消費者



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM