知識點一:守護進程
守護進程:p1.daemon=True
守護進程其實就是一個“子進程“,守護=》伴隨
守護進程會伴隨主進程的代碼運行完畢后而死掉
進程:當父進程需要將一個任務並發出去執行,需要將該任務放到以個子進程里
守護:當該子進程內的代碼在父進程代碼運行完畢后就沒有存在的意義了,就應該
將該子進程設置為守護進程,會在父進程代碼結束后死掉
from multiprocessing import Process import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__ == '__main__': p1=Process(target=foo) p2=Process(target=bar) # 將子進程p1設置為守護進程, # 所以p1會在print("main-------")打印完成后死掉,所以p1進程不會打印 p1.daemon=True p1.start() p2.start() # time.sleep(1) print("main-------")
主進程代碼運行完,但是主進程不會死,還要回收子進程的僵屍進程,
所有守護進程只是守護主進程代碼運行完就會死掉
p.daemon=True #守護進程要放在進程start之前
p.start()
print('主')
(1):正常情況子進程p不會打印,而會直接打印‘主’
(2):個別請況電腦速度快的時候,子類p里面的代碼可能會運行(可能不是全部),然后等
主進程打印輸出后,p會死掉(代碼自上而下運行,操作系統產生並造出子類的速度快的前提下)
知識點二:進程的互斥鎖
互斥鎖:就是將要執行任務的部門代碼(只涉及到修改共享數據的代碼)變成串行
第一步:導入multiprocessing方法下面的Lock類
第二步:在if __name__ == '__main__':方法下面調用Lock類mutex=Lock(),拿到一個對象
第三步:在子類中需要共享的數據前后加入 加鎖:mutex.acquire()——》需要共享修改數據的代碼體《解鎖:——mutex.release()
實驗:實現簡單的搶票功能
分析:1)查看變為並發(實現大家查詢到的車票信息是一致的)
2)購買變為串行(因為購買涉及到了修改數據信息,所以一定要遵循先到先得)
(理解為用鎖來限制,同一時間只能讓一個人拿着鎖去改數據,先搶到鎖的人
就有優先購買的權限)
join:是將子類里面的所有代碼都整體串行(就會將查詢、購買2部分全部都變為串行,延遲更高)
互斥鎖:可以挑選一段代碼(修改數據即買票的那部分)單獨抽出來加鎖
#模擬搶票軟件的原理:
from multiprocessing import Lock,Process import json,os,time,random def check(): #查票功能實現並行訪問 time.sleep(1) with open('db.txt','rt',encoding='utf-8') as f: dic=json.load(f) print('%s 查看余票數為 %s'%(os.getpid(),dic['count'])) def get(): #購票因為牽涉到對后台數據的修改,所以加互斥鎖目的是逐一進行訪問修改,以免數據錯亂 with open('db.txt', 'rt',encoding='utf-8') as f: dic = json.load(f) time.sleep(2) if dic['count'] >0: #有票 dic['count']-=1 time.sleep(random.randint(1, 3)) #在購票時,模擬網絡延遲... with open('db.txt', 'wt',encoding='utf-8') as f: json.dump(dic,f) print('%s 購票成功'%os.getpid()) else: print('%s 沒有余票'%os.getpid()) def task(mutex): #查看(並行訪問) check() #搶票(加入互斥鎖,實現串行訪問,先到先得原則) mutex.acquire() get() mutex.release() #第一個購買完成后,解鎖,后續進入繼續購買 if __name__ == '__main__': mutex=Lock() #調用Lock類拿到一個對象 for i in range(10): p=Process(target=task,args=(mutex,)) p.start()
知識點三:進程間的通信
需求的演變思路:就是為了用一塊兒共享的內存==》實現進程間的共享
必須有2個特點:
1.一定是內存空間
2.能夠自動幫忙處理鎖的問題
IPC機制:
PIPE:管道
Queue:PIPE+鎖(隊列)
注意:
1.隊列占用的是內存空間
2.不應該往隊列中放大數據,應該只存放數據量較小的精簡的內容
重點講隊列:Queue 同樣也是在multiprocessing方法里面導入Queue模塊: from multiprocessing import Queue # q.put()括號里面可以傳多種數據類型 #定義隊列里面存在數據的個數,先進先出原則 # 1.block、time是搭配使用的 # 2.False:一般單獨使用,會立即報錯(打印提示信息) q=Queue(3) # #存數據 # q.put('first') q.put({'k':'second'}) q.put(['two',]) q.put(['three',]) # q.put(4,block=True,timeout=3) #*****如果不加block=True,timeout=3備注,一直是收取的狀態 # #block=True timeout:意思是管道滿了無法再往里面放,並且會等待3秒 # 如果還是滿的狀態,就會報錯(如果不加程序是一直加載狀態不會報錯) print(q.get()) print(q.get()) print(q.get()) # print(q.get(block=True,timeout=2)) #***如果不跟參數,會打印前3個,后卡在那里.. #block=True,timeout=2,加了如果管道里面沒有數據,等待2秒后會報錯(提示信息) #注意無論是收還是發:當block=False的時候,是會直接報錯,(所以False一般不與time搭配使用) # print(q.get(block=False)
知識點四:生產者消費者模型
1.為什么是生產者消費者模型
生產者:比喻的是程序中負責產生數據的任務
消費者:比喻的是程序中負責處理數據的任務
生產者———>共享的介質(隊列)<————消費者
2.為何用
實現生產者與消費者的解耦和,生產者可以不停的生產,消費者也可以不停的消費
從而平衡了生產者的生產能力與消費者消費能力,提升了而整體運行的效率
什么時候用:
當我們程序中存在明顯的兩類任務,一類是負責產生數據,一類是負責處理數據,此時
就應該考慮使用生產者消費者模型來提升程序的效率
第一個初始版本(分析): from multiprocessing import Queue,Process import time import os import random def producer(name,food,q): for i in range(2): res='第%s食物%s'%(i,food) time.sleep(random.randint(1,3)) q.put(res) #往管道傳數據 print('\033[45m%s 生產了 %s\033[0m'%(name,res)) # q.put(None) def consumer(name,q): while True: time.sleep(random.randint(1, 3)) res=q.get() #從管道取數據 if res is None:break print('%s 吃了 %s'%(name,res)) if __name__ == '__main__': q=Queue() p1=Process(target=producer,args=('生產1','包子',q,)) p2=Process(target=producer,args=('生產2','面條',q,)) p3=Process(target=producer,args=('生產3','米飯',q,)) c1=Process(target=consumer,args=('消費1',q,)) c2=Process(target=consumer,args=('消費2',q,)) #開始造食物 p1.start() p2.start() p3.start() #開始造消費者 c1.start() c2.start() #保證食物完全造好到管道里面 p1.join() #保證全部生產完,放入管道 p1.join() p1.join() q.put(None) #保證所有東西都在管道里面 q.put(None) #2個None是並排放在最后面的,為了保證每個用戶進去取完最后取None不至於一致卡在那里接收 print('主程序') #消費者取走了多少,取決於每個消費者網絡延遲各方面的速度,另外管道里面的東西取走就沒了,搜所以同時取,取得多少是不一定的
終極版本:
#終極版本:消費者給生產者發信號,...等隊列結束JoinableQueue,對隊列被取空了 from multiprocessing import Queue,Process,JoinableQueue import time import os import random def producer(name,food,q): for i in range(2): res='第%s食物%s'%(i,food) time.sleep(random.randint(1,3)) q.put(res) #往管道傳數據 print('\033[45m%s 生產了 %s\033[0m'%(name,res)) def consumer(name,q): while True: time.sleep(random.randint(1, 3)) res=q.get() #從管道取數據 if res is None:break print('%s 吃了 %s'%(name,res)) q.task_done() #最終版新增,執行完一次get 就告訴系統有一個數據被取走了 if __name__ == '__main__': # q=Queue() q=JoinableQueue() # p1=Process(target=producer,args=('生產1','包子',q,)) p2=Process(target=producer,args=('生產2','面條',q,)) p3=Process(target=producer,args=('生產3','米飯',q,)) c1=Process(target=consumer,args=('消費1',q,)) c2=Process(target=consumer,args=('消費2',q,)) c1.daemon=True #開始前將子進程設置為守護進程 c2.daemon=True #開始造食物 p1.start() p2.start() p3.start() #開始造消費者 c1.start() c2.start() #保證食物完全造好到管道里面 # p1.join() #保證全部生產完,放入管道 # p1.join() # p1.join() # q.put(None) # q.put(None) #等所有生產者都生產完了,再執行 p1.join() p2.join() p3.join() #這樣就保證一定是隊列里面就不會有新東西再放了... q.join() #等待隊列被取干凈,執行的同時統計隊列里面有多少個值,碰到一個q.task_done()就減1 # 主進程的代碼運行完畢--->(生產者運行完畢)+隊列中的數據也被取干凈了->消費者沒有存在的意義 #等到隊列取空就執行下一行代碼 # print('主程序') #消費者沒結束: #生產者已經結束了,此時就是消費者也已經取完了數據,那么就用到了守護進程
