補充知識點:關於查看父進程子進程pid
from multiprocessing import Process
import os
import time
def task():
print("父進程pid:%s,自己的pid:%s" %(os.getppid(),os.getpid()))
time.sleep(30)
if __name__ == '__main__':
p = Process(target=task)
p.start()
print('主進程的pid:%s 主進程的父進程pid:%s' %(os.getpid(),os.getppid()))
"""
Windows系統上查看及殺死父進程的命令:
C:\Users\Administrator>tasklist | findstr 12416
python.exe 12416 Console 1 9,792 K
C:\Users\Administrator>taskkill /F /PID 12416
成功: 已終止 PID 為 12416 的進程。
"""
"""
Linux系統上查看及殺死父進程的命令:
[root@localhost ~]# ps aux | grep 14769
[root@localhost ~]# kill -9 14769
"""
僵屍進程
一個進程使用fork創建子進程,如果子進程退出,而父進程並沒有調用wait或waitpid獲取子進程的狀態信息,那么子進程的進程描述符仍然保存在系統中。這種進程稱之為僵屍進程。詳解如下
** 我們知道在正常情況下子進程是通過父進程創建的,子進程在創建新的進程。子進程的結束和父進程的運行是一個異步過程,即父進程永遠無法預測子進程到底什么時候結束,子進程結束,父進程沒有明確的答復操作系統內核:已收到子進程結束的消息。此時操作系統內核會一直保存該子進程的部分PCB信息,即為僵屍進程**
僵屍進程的危害:占用PCB資源(主要是PID資源),系統將會因為產生大量的僵屍進程,而沒有可用的進程號來產生新進程,導致系統資源不足
因此,UNⅨ提供了一種機制可以保證父進程可以在任意時刻獲取子進程結束時的狀態信息:
-
1、在每個進程退出的時候,內核釋放該進程所有的資源,包括打開的文件,占用的內存等。但是仍然為其保留一定的信息(包括進程號the process ID,退出狀態the termination status of the process,運行時間the amount of CPU time taken by the process等)
-
2、直到父進程通過wait / waitpid來取時才釋放. 但這樣就導致了問題,如果進程不調用wait / waitpid的話,那么保留的那段信息就不會釋放,其進程號就會一直被占用,但是系統所能使用的進程號是有限的,如果大量的產生僵死進程,將因為沒有可用的進程號而導致系統不能產生新的進程. 此即為僵屍進程的危害,應當避免。
任何一個子進程(init除外)在exit()之后,並非馬上就消失掉,而是留下一個稱為僵屍進程(Zombie)的數據結構,等待父進程處理。這是每個子進程在結束時都要經過的階段。如果子進程在exit()之后,父進程沒有來得及處理,這時用ps命令就能看到子進程的狀態是“Z”。如果父進程能及時 處理,可能用ps命令就來不及看到子進程的僵屍狀態,但這並不等於子進程不經過僵屍狀態。 如果父進程在子進程結束之前退出,則子進程將由init接管。init將會以父進程的身份對僵屍狀態的子進程進行處理。
"""
僵屍進程危害場景:
例如有個進程,它定期的產 生一個子進程,這個子進程需要做的事情很少,做完它該做的事情之后就退出了,因此這個子進程的生命周期很短,但是,父進程只管生成新的子進程,至於子進程 退出之后的事情,則一概不聞不問,這樣,系統運行上一段時間之后,系統中就會存在很多的僵死進程,倘若用ps命令查看的話,就會看到很多狀態為Z的進程。 嚴格地來說,僵死進程並不是問題的根源,罪魁禍首是產生出大量僵死進程的那個父進程。因此,當我們尋求如何消滅系統中大量的僵死進程時,答案就是把產生大 量僵死進程的那個元凶槍斃掉(也就是通過kill發送SIGTERM或者SIGKILL信號啦)。槍斃了元凶進程之后,它產生的僵死進程就變成了孤兒進 程,這些孤兒進程會被init進程接管,init進程會wait()這些孤兒進程,釋放它們占用的系統進程表中的資源,這樣,這些已經僵死的孤兒進程 就能瞑目而去了。
"""
# 測試:
# 手動制造一個產生僵屍進程的程序test.py內容如下
# coding:utf-8
from multiprocessing import Process
import time,os
def run():
print('子',os.getpid())
if __name__ == '__main__':
p=Process(target=run)
p.start()
print('主',os.getpid())
time.sleep(1000)
清除僵屍進程的三種解決方法
核心思想:父進程的知道子進程的結束,並且明確的回復操作系統,此時操作系統才能回收資源,避免僵屍進程的產生
-
第一種方法就是結束父進程(一般是主進程)。當父進程退出的時候僵屍進程隨后也會被清除。 當然這個是個暴力的手段,因為我們一般肯定是希望父進程繼續運行的。
-
第二種方法就是通過
wait
調用來讀取子進程退出狀態。比如通過multiprocessing.Process
產出的進程可以通過子進程的join()
方法來wait
,也可以在父進程中處理 SIGCHLD 信號,在處理程序中調用 wait 系統調用或者直接設置為 SIG_IGN 來清除僵屍進程。 -
第三種辦法就說把進程變成孤兒進程,這樣進程就會自動交由 init 進程(pid 為 1 的進程)來處理,一般 init 進程都包含對僵屍進程進行處理的邏輯。(這里有個坑,那就是 docker 容器中一般 pid 為 1 進程就是主程序的進程,而不是我們預期的 init 進程。如果要使用這種方法的話,需要注意一下類似的場景)
孤兒進程
當父進程退出,而它的一個或多個子進程還在運行,那么那些子進程將成為孤兒進程,由於進程不可能脫離進程樹而獨立存在,孤兒進程將被PID為1的init進程所收養,並由init進程對它們完成狀態收集工作。孤兒進程被收養后進行正常的釋放,沒有危害
創建完子進程后,主進程所在的這個腳本就退出了,當父進程先於子進程結束時,子進程會被init收養,成為孤兒進程,而非僵屍進程。演示如下:
import os,sys
import time
pid = os.getpid()
ppid = os.getppid()
print 'im father', 'pid', pid, 'ppid', ppid
pid = os.fork()
# 執行pid=os.fork()則會生成一個子進程
# 返回值pid有兩種值:
# 如果返回的pid值為0,表示在子進程當中
# 如果返回的pid值>0,表示在父進程當中
if pid > 0:
print 'father died..'
sys.exit(0)
# 保證主線程退出完畢
time.sleep(1)
print 'im child', os.getpid(), os.getppid()
# 執行文件,輸出結果:
im father pid 32515 ppid 32015
father died..
im child 32516 1
# 看,子進程已經被pid為1的init進程接收了,所以僵屍進程在這種情況下是不存在的,
# 存在只有孤兒進程而已,孤兒進程聲明周期結束自然會被init來銷毀。
守護進程
守護一個服務,長期駐留在內存中提供服務,不能夠受制於終端;
如何讓一個進程成為守護進程?
主進程創建守護進程
其一:守護進程會在主進程代碼執行結束后就終止
其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
守護進程就是Daemon程序,也稱為精靈進程,是一種在系統后台執行的程序,它獨立於控制終端並且執行一些周期任務或觸發事件,通常被命名為"d"字母結尾,如常見的httpd、syslogd、systemd和dockerd等
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止
from multiprocessing import Process
import os,time
def task():
print("進程%s開啟" %os.getpid()) # 2.接着會打印進程開啟,可以看到打印結果
time.sleep(10)
print("進程%s結束" %os.getpid()) # 看不到,在time.sleep(10)的時候主進程代碼結束
if __name__ == '__main__':
p = Process(target=task)
p.daemon = True # 調用守護進程
p.start()
print("主:%s" %os.getpid()) # 1.先打印"主",主進程睡三秒的時間足夠守護進程啟動起來
time.sleep(3)
# 主進程代碼運行完畢,守護進程就會結束
from multiprocessing import Process
import time
def foo():
print(123)
time.sleep(1)
print("end123")
def bar():
print(456)
time.sleep(3)
print("end456")
if __name__ == '__main__':
p1=Process(target=foo)
p2=Process(target=bar)
p1.daemon=True
p1.start()
p2.start()
print("main-------")
# 打印可能出現的三種結果:
"""
main-------
456
end456
"""
"""
main-------
123
456
end456
"""
"""
123
main-------
456
end456
"""
互斥鎖(同步鎖)
進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的,而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理
part1:多個進程共享同一打印終端
# 並發運行,效率高,但競爭同一打印終端,帶來了打印錯亂
from multiprocessing import Process
import os,time
def work():
print("進程%s開啟" %os.getpid())
time.sleep(2)
print("進程%s結束" %os.getpid())
if __name__ == '__main__':
for i in range(3):
p=Process(target=work)
p.start()
# 加鎖:由並發變成了串行,犧牲了運行效率,但避免了競爭
from multiprocessing import Process,Lock
import os,time
def work(lock):
lock.acquire() # 鎖定
print("進程%s開啟" %os.getpid())
time.sleep(2) # 模擬網絡延遲
print("進程%s結束" %os.getpid())
lock.release() # 釋放
if __name__ == '__main__':
lock=Lock() # 創建鎖
for i in range(3):
p=Process(target=work,args=(lock,))
p.start()
加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然可以用文件共享數據實現進程間通信,但問題是:
-
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
-
2.需要自己加鎖處理
隊列
進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
創建隊列的類(底層就是以管道和鎖定的方式實現):
Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
參數介紹:
maxsize是隊列中允許最大項數,省略則無大小限制。
方法介紹:
q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。blocked為True(默認值)如果
隊列滿了就鎖住了並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會
拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。
q.get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked
為True(默認值),並且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果
blocked為False有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出
Queue.Empty異常.
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。
q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。
q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣
其它方法(了解)
q.cancel_join_thread():不會在進程退出時自動連接后台線程。可以防止join_thread()方法阻塞
q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,后台線程將繼續寫入那些已經入隊列但尚未寫入的
數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中
產生任何類型的數據結束信號或異常。例如,如果某個使用者正在被阻塞在get()操作上,關閉生產者中
的隊列不會導致get()方法返回錯誤。
q.join_thread():連接隊列的后台線程。此方法用於在調用q.close()方法之后,等待所有隊列項被消耗。默認
情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread方法可
以禁 止這種行為
隊列應用
'''
共享內存:
1.管道
tasklist | findstr xxx
ps aux | grep xxx
2.隊列
multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列
都是基於消息傳遞實現的,但是隊列接口
'''
from multiprocessing import Queue
q = Queue(3) # 創建共享的進程隊列,指定隊列的長度為3,最多放三個值
q.put([1,2,3]) # 放入值到隊列中
q.put({"a":1})
q.put("xxx")
# q.put(1000) # 超出值無法放入
print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 超值取不到q.get()默認為 q.get(block=True,timeout=None)
# print(q.get(block=True,timeout=3)) # 取不到三秒拋出異常
print(q.get(block=False)) # 取不到值立馬拋異常
生產者消費者模型
在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
-
為什么要使用生產者和消費者模式
- 在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
-
什么是生產者消費者模式
- 生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。
生產者消費者模型:
該模型有兩種角色,一種是生產者,另外一種是消費者
生產者負責產生數據,消費者負責取走數據進行處理
生產者與消費者通過隊列通信
優點:解耦合,平衡了生產者的生產力與消費者的處理能力
基於隊列實現生產者消費者模型
import random
import time
from multiprocessing import Process, Queue
def producer(q, name, food):
for i in range(3):
res = "%s%s" % (food, i)
time.sleep(random.randint(1, 3))
q.put(res)
print("%s 生產了 %s" % (name, res))
def consumer(q, name):
while True:
res = q.get()
if res is None:
break
time.sleep(random.randint(1, 3))
print("%s 吃了 %s" % (name, res))
if __name__ == '__main__':
q = Queue() # 創建隊列
p1 = Process(target=producer, args=(q,"廚師1", "包子"))
p2 = Process(target=producer, args=(q,"廚師2", "燒麥"))
p3 = Process(target=producer, args=(q,"廚師3", "饅頭"))
c1 = Process(target=consumer, args=(q, "lxx"))
c2 = Process(target=consumer, args=(q, "hxx"))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
q.put(None)
q.put(None)
print("主")
基於隊列實現生產者消費者模型加入守護進程
import random
import time
from multiprocessing import Process, JoinableQueue
def producer(q, name, food):
for i in range(3):
res = "%s%s" % (food, i)
time.sleep(random.randint(1, 3))
q.put(res)
print("%s 生產了 %s" % (name, res))
q.join()
def consumer(q, name):
while True:
res = q.get()
if res is None:
break
time.sleep(random.randint(1, 3))
print("%s 吃了 %s" % (name, res))
q.task_done()
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=producer, args=(q,"廚師1", "包子"))
p2 = Process(target=producer, args=(q,"廚師2", "燒麥"))
p3 = Process(target=producer, args=(q,"廚師3", "饅頭"))
c1 = Process(target=consumer, args=(q, "lxx"))
c2 = Process(target=consumer, args=(q, "hxx"))
c1.daemon = True # 主進程結束順便帶走了守護進程
c2.daemon = True
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join() # p1、p2、p3都結束,代表隊列一定被取空
print("主")
生產者消費者模型總結
程序中有兩類角色
一類負責生產數據(生產者)
一類負責處理數據(消費者)
引入生產者消費者模型為了解決的問題是:
平衡生產者與消費者之間的工作能力,從而提高程序整體處理數據的速度
如何實現:
生產者<-->隊列<——>消費者
生產者消費者模型實現類程序的解耦和