守護進程
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止(主進程和子進程是異步的),當主進程停止,該守護進程不在繼續執行.守護進程也是一種子進程.
主進程創建守護進程
其一:守護進程會在主進程代碼執行結束后就終止.(但本質上是在主進程結束之前結束的,主進程需要負責回收資源)
其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children

from multiprocessing import Process import time import os def func(num): print(f'{num},pid:{os.getpid()},ppid:{os.getppid()}') while True: print('is alive') time.sleep(0.5) def wahaha(): i = 0 while i < 5: i += 1 print(f'第{i}秒') time.sleep(1) if __name__ == '__main__': Process(target=wahaha).start() #子進程在主進程結束后仍然正常執行 p = Process(target=func,args=(1,)) p.daemon = True #主進程結束,該守護進程結束 p.start() time.sleep(3) print(f'pid:{os.getpid()},ppid:{os.getppid()}') print('主進程結束') ============================= 第1秒 1,pid:8200,ppid:2000 is alive is alive 第2秒 is alive is alive 第3秒 is alive is alive pid:2000,ppid:7244 主進程結束 第4秒 第5秒
多進程中的方法
p = Process(target=func,args=(1,)) #創建一個進程對象
p.start() 啟動一個進程
p.daemon = True 設置進程為守護進程,隨主進程結束而結束.
p.is_alive() 判斷進程是否存活,返回bool值
p.terminate() 發送給操作系統指令,關閉進程
p.pid() 查看進程pid

from multiprocessing import Process import time import os def func(num): print(f'{num},pid:{os.getpid()},ppid:{os.getppid()}') while True: print('is alive') time.sleep(0.5) def wahaha(): i = 0 while i < 10: i += 1 print(f'第{i}秒') time.sleep(1) if __name__ == '__main__': p2 = Process(target=wahaha) p2.start() # p = Process(target=func,args=(1,)) p.daemon = True #主進程結束,該子進程結束 p.start() time.sleep(3) print(p.is_alive()) print(p2.is_alive()) p2.terminate() time.sleep(0.1) print(p.is_alive()) print(p2.is_alive()) print(f'pid:{os.getpid()},ppid:{os.getppid()}') print('主進程結束') p2.join()

import socket from multiprocessing import Process def talk(conn,addr): while True: msg_r = conn.recv(1024).decode('utf-8') print(addr,msg_r) msg_s = 'client{}登陸'.format(addr) conn.send(msg_s.encode('utf-8')) conn.close() if __name__ == '__main__': sk = socket.socket() sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) sk.bind(('127.0.0.1',8091)) sk.listen(5) try: while True: conn,addr = sk.accept() Process(target=talk,args=(conn,addr)).start() finally: sk.close()

import socket sk = socket.socket() sk.connect(('127.0.0.1',8091)) while True: msg_s = input('請輸入內容:') sk.send(msg_s.encode('utf-8')) msg_r = sk.recv(1024).decode('utf-8') print(msg_r) sk.close()
進程同步(multiprocessing.Lock、multiprocessing.Semaphore、multiprocessing.Event)
鎖 multiprocessing.Lock (*****)
避免同一段代碼被多個進程同時執行
lock = Lock() 創建鎖對象
lock.acquire() 查詢鑰匙,如果有就拿走,如果沒有就等待
lock.release() 歸還鑰匙
lock可以使用with上下文進行管理(類似於文件讀取)
with lock:
print('hello' )

維護數據的安全
降低了程序的效率
所有的效率都是建立在數據安全的角度上的
但凡涉及到並發編程都要考慮數據的安全性
我們需要在並發部分對數據修改的操作格外小心,如果會涉及到數據的不安全,就需要進行加鎖控制
lock acquire release的另外一種用法
lock 內部實現了進程之間的通信,使得誰acquire了誰release了能夠在多個擁有lock參數的子進程中透明

from multiprocessing import Lock lock = Lock() #創建一個鎖對象 lock.acquire() #想拿鑰匙,如果有就拿,沒有就一直等 print('拿到要鑰匙了1') lock.release() #還鑰匙 lock.acquire() #想拿鑰匙 print('拿到要鑰匙了2') lock.release() #還鑰匙

#db文件內容 {"count": 0} import json import time from multiprocessing import Process,Lock def search(i): f =open('db') ticket_dic =json.load(f) f.close() print(f"{i} 正在查票,剩余票數{ticket_dic['count']}") def buy(i): with open('db') as f: ticket_dic = json.load(f) time.sleep(0.2) if ticket_dic['count'] > 0: ticket_dic['count'] -= 1 print(f'{i} 買到票了') time.sleep(0.2) with open('db','w') as f :json.dump(ticket_dic,f) else: print(f"{i} 太火爆被搶購一空了,剩余票數{ticket_dic['count']}") # def get_ticket(i,lock): # search(i) # lock.acquire() # buy(i) # lock.release() def get_ticket(i,lock): search(i) with lock: buy(i) if __name__ == '__main__': lock = Lock() for i in range(10): p = Process(target=get_ticket,args=(i,lock)) p.start()
信號量(標志True False) multiprocessing.Semaphore(***) (鎖+計數器)
有多個鑰匙的鎖

互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據 。 假設商場里有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。 實現: 信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。 信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念
sem = Semaphore(4) 創建鎖對象,4把鑰匙,可以被連續acquire4次
sem.acquire() 查詢鑰匙,如果有就拿走,如果沒有就等待
sem.release() 歸還鑰匙
sem 可以使用with上下文進行管理(類似於文件讀取)
with sem:
print('hello' )

from multiprocessing import Semaphore sem = Semaphore(4) #4把鑰匙 sem.acquire() print(1) sem.acquire() print(2) sem.release() sem.acquire() print(3) sem.acquire() print(4) sem.acquire() print(5) sem.acquire() print(6)

from multiprocessing import Semaphore,Process import time import random # def ktv(sem,i): # sem.acquire() # print(f'{i}走進ktv') # time.sleep(random.randint(1,3)) # print(f'{i}走出ktv') # sem.release() def ktv(sem,i): with sem: print(f'{i}走進ktv') time.sleep(random.randint(1,3)) print(f'{i}走出ktv') if __name__ == '__main__': sem = Semaphore(4) for i in range(10): p = Process(target=ktv,args=(sem,i)) p.start()
事件 multiprocessing.Event(**)
控制子進程執行還是阻塞的一個機制
e = Event() 創建一個事件對象
Event方法 在事件中有一個信號(標志)
wait() 如果這個標志是True wait的執行效果就是pass ,如果是False,wait方法的效果就是阻塞,直到這個標志變成True
控制標志方法
is_set() 判斷標志的狀態,返回bool值
set() 將標志設置為True
clear() 將標志設置為False

from multiprocessing import Event e = Event() #阻塞,事件的創建之初標志的狀態是False print(e.is_set()) e.set() #將標志改為True print(e.is_set()) e.wait() #當標志為True是pass,不阻塞

from multiprocessing import Event,Process import time def func1(e): print('start func1') print(e.is_set()) #事件創建之初是False e.wait(1) #不修改狀態(網絡測試,發送短信,發送郵件),超時后繼續執行,不繼續阻塞 print(e.is_set()) e.wait() #持續阻塞 print(e.is_set()) #主進程3(異步)s后修改信號標志為True ,繼續執行 print('end func1') if __name__ == '__main__': e = Event() Process(target=func1,args=(e,)).start() time.sleep(3) e.set()

from multiprocessing import Event,Process import time import random def tarffic_light(e): while True: while e.is_set(): print('\033[1;32m綠燈亮\033[0m') time.sleep(2) e.clear() else: print('\033[1;31m紅燈亮\033[0m') time.sleep(2) e.set() def car(i,e): while not e.is_set(): print(f'{i}正在等待通過...') e.wait() else: print(f'{i}通過.') if __name__ == '__main__': e = Event() light = Process(target=tarffic_light,args=(e,)) light.daemon =True light.start() car_list = [] for i in range(1,21): p = Process(target=car,args=(i,e)) car_list.append(p) p.start() time.sleep(random.randint(0,3)) for i2 in car_list:i2.join() #控制子進程先執行完畢 print('執行完啦')

import time import random from multiprocessing import Process,Event def traffic_light(e): print('\033[1;31m紅燈亮\033[0m') while True: time.sleep(2) if e.is_set(): print('\033[1;31m紅燈亮\033[0m') e.clear() else: print('\033[1;32m綠燈亮\033[0m') e.set() def car(i,e): if not e.is_set(): print('car%s正在等在通過'%i) e.wait() print('car%s通過'%i) if __name__ == '__main__': e = Event() light = Process(target=traffic_light,args=(e,)) light.daemon = True light.start() car_lst = [] for i in range(20): p = Process(target=car,args=(i,e)) p.start() time.sleep(random.randint(0,3)) car_lst.append(p) for car in car_lst:car.join()
說明:紅綠燈的的變化和汽車的通行是兩個獨立的進程,汽車通過對紅綠燈的事件信號的查詢判斷等待和放行,每一個汽車都是獨立的進程
進程間通信(進程之間數據共享)
進程間通信 IPC(Inter-Process Communication)
隊列 multiprocessing.Queue (先進先出) 隊列是基於(管道+鎖)實現的
創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
Queue([maxsize]) 創建共享的進程隊列。 參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。 底層隊列使用管道和鎖定實現。
q =Queue() 創建一個隊列 q = Queue(5) 隊列長度為5
q.put(1) 向隊列中放一個數據,可以是int list dict ... 當隊列滿時會阻塞
q.get() 從隊列中獲取一個數據 沒有值會一直阻塞
q.empty() 判斷隊列是否為空 返回bool值 多進程時不准 ,如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
q.full() 判斷隊列是否已滿 返回bool值 多進程時不准 由於線程的存在,結果也可能是不可靠的
q.qsize() 返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。
q.close() 關閉隊列,防止隊列中加入更多數據。調用此方法時,后台線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。
q.cancel_join_thread() 不會再進程退出時自動連接后台線程。這可以防止join_thread()方法阻塞。
q.join_thread() 連接隊列的后台線程。此方法用於在調用q.close()方法后,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。

q = Queue(3) try: q.get_nowait() except: print('隊列中沒有值')

q = Queue(3) q.put(1) q.put('aaa') q.put([1,2,3]) # q.put('alex') #隊列滿會阻塞 try: q.put_nowait('alex') except: print('丟失了一個數據')


from multiprocessing import Process,Queue def func(num,q): q.put({num:num**num}) if __name__ == '__main__': q = Queue() # p = Process(target=func, args=(10,q)) # p.start() # print(q.get()) for i in range(10): p = Process(target=func,args=(i,q)) p.start() for i in range(10): print(q.get()) ============= {0: 1} {1: 1} {3: 27} {4: 256} {2: 4} {5: 3125} {9: 387420489} {8: 16777216} {6: 46656} {7: 823543}
生產者消費者模型
包子的故事
效率問題
生產者 托盤 消費者