在Python中,由於全局解釋器鎖GIL的存在,使得Python中的多線程並不能大大提高程序的運行效率(這里單指CPU密集型),那么在處理CPU密集型計算時,多用多進程模型來處理,而Python標准庫中提供了multiprocessing庫來支持多進程模型的編程。multiprocessing中提供了的Process類用於開發人員編寫創建子進程,接口類似於標准庫提供的threading.Thread類,還提供了進程池Pool類,減少進程創建和銷毀帶來開銷,用以提高復用(見前文)。
在多線程模型中,默認情況下(sub-Thread.daemon=False)主線程會等待子線程退出后再退出,而如果sub-Thread.setDaemon(True)時,主線程不會等待子線程,直接退出,而此時子線程會隨着主線程的對出而退出,避免這種情況,主線程中需要對子線程進行join,等待子線程執行完畢后再退出。對應的,在多進程模型中,Process類也有daemon屬性,而它表示的含義與Thread.daemon類似,當設置sub-Process.daemon=True時,主進程中需要對子進程進行等待,否則子進程會隨着主進程的退出而退出:
1 import threading 2 import time 3 import multiprocessing 4 5 6 def fun(args): 7 for i in range(100): 8 print args 9 time.sleep(1) 10 11 12 if __name__ == '__main__': 13 threads = [] 14 for i in range(4): 15 # t = threading.Thread(target=fun, args=(str(i),)) 16 # t.setDaemon(True) 17 t = multiprocessing.Process(target=fun, args=(str(i),)) 18 t.daemon = True 19 t.start() 20 threads.append(t) 21 22 for i in threads: 23 i.join()
運行上面的代碼,主進程會等待子進程執行結束后退出,整個程序結束。line15、16為多線程模式,運行效果和多進程相似。這里說的相似表示的是程序運行正常的情況下,而當有人為的干擾時,例如在進程啟動之后,通過kill -9將進程殺死時,情況就不同了,我們知道多線程模型再復雜,也只是在同一個進程中,殺死主進程,所有的線程都會隨着主進程的退出而退出,而多進程模型中,每個進程都是獨立的,在殺死主進程之后,其他子進程並不會受到影響,還會繼續運行,上面的代碼中進程的target函數很簡單,只進行了有限次數的循環輸出,而在真是的場景,子進程可能會始終在loop處理業務,而如果在子進程被殺死后,沒有有效回收子進程,需要人工的殺死,這樣的話就比較麻煩。注意,在python官方文檔中有聲明,對於deamon=True的子進程:When a process exits, it attempts to terminate all of its daemonic child processes,這里的exits表示的是進程正常結束,而如果父進程在運行中非正常退出,比如前面提到的kill -9命令直接殺死,它並沒有機會去回收子進程。
對於這種情況,首先想到的是用信號signal來處理,這樣一來,在殺死主進程時就不能再用kill -9命令了,因為kill -9命令表示向進程發送SIGKILL命令,而在系統中,SIGKILL和SIGSTOP兩種信號,進程是無法捕獲的,收到后會立即退出。在linux下執行kill -l,可以看到全部的信號量,這里使用SIGTERM信號,SIGTERM表示終止信號,是kill命令傳送的系統默認信號,它與SIGKIIL的區別是,SIGTERM更為友好,進程能捕捉SIGTERM信號,進而根據需要來做一些清理工作,明確了這點之后,對上面的代碼進行一些修改:
1 processes = [] 2 def fun(x): 3 print 'current sub-process pid is %s' % os.getpid() 4 while True: 5 print 'args is %s ' % x 6 time.sleep(1) 7 8 9 def term(sig_num, addtion): 10 print 'terminate process %d' % os.getpid() 11 try: 12 print 'the processes is %s' % processes 13 for p in processes: 14 print 'process %d terminate' % p.pid 15 p.terminate() 16 # os.kill(p.pid, signal.SIGKILL) 17 except Exception as e: 18 print str(e) 19 20 21 if __name__ == '__main__': 22 print 'current pid is %s' % os.getpid() 23 for i in range(3): 24 t = Process(target=fun, args=(str(i),)) 25 t.daemon = True 26 t.start() 27 processes.append(t) 28 signal.signal(signal.SIGTERM, term) 29 try: 30 for p in processes: 31 p.join() 32 except Exception as e: 33 print str(e)
運行上面的代碼,輸出主進程id,然后通過kill -15 pid向主進程發送SIGTERM信號,主進程退出之前,會將子進程也terminate掉。但是退出時,line32會捕獲到異常信息OSError: [Errno 4] Interrupted system call,表示主進程在對子進程進行join時,被信號中斷並退出。程序得到了預期的結果,在向主進程發送SIGTERM信號時,首先結束所有子進程,之后主進程退出。接着使用kill -15加上子進程的進程id,向子進程發送SIGTERM信號,看看子進程是否能夠得到同樣的效果,然而在向進程發送信號之后,並未進入term函數,通過ps可以看到,子進程收到SIGTERM信號之后,自行退出,而主進程和其他子進程沒有受到影響,依然正常運行,這里並沒有得到相同的效果。我們知道,子進程會繼承父進程的信號處理機制,但是這里子進程在收到SIGTERM信號后,沒有運行term函數,仔細觀察上面的代碼示例,注意到在注冊信號處理函數時,子進程已經啟動,所以這里子進程並沒有注冊信號處理函數,接着,我們對主進程進行修改,確保子進程啟動前,進行信號處理函數的注冊:
1 if __name__ == '__main__': 2 signal.signal(signal.SIGTERM, term) 3 print 'current main-process pid is %s' % os.getpid() 4 for i in range(3): 5 t = Process(target=fun, args=(str(i),)) 6 t.daemon = True 7 t.start() 8 processes.append(t) 9 10 try: 11 for p in processes: 12 p.join() 13 except Exception as e: 14 print str(e)
再次運行程序,通過kill -15向父進程發送SIGTERM信號,這時進程收到了信號,但是程序仍然繼續運行,觀察下面的輸出信息,主進程收到信號后,執行了term函數,並且通過調用子進程的p.terminate(),注意在linux系統下terminate的實現方式有: Terminate the process. On Unix this is done using the SIGTERM signal,也就是說,當對子進程調用p.terminate()時,實際上仍是向子進程發送SIGTERM信號,之前我們已經將信號處理函數的注冊放置子進程啟動前,使得子進程也能夠執行信號處理函數。從輸出的processes信息可以看到,由於啟動順序,全局的processes變量並沒有對子進程信息進行很好地共享。在收到由p.terminate()發送的信號量之后,子進程執行term函數,會再次通過調用p.terminate()來試圖殺死子進程,這樣就會進入一個無限的循環,kill -15向子進程發送SIGTERM信號,會得到相同的結果。
至此,基本了解了如何向主進程發送信號量來結束主進程和其子進程的方法,那么有沒有什么方式可以通過向子進程發送信號,取得同樣的效果呢?答案是肯定的,當我們在主進程中創建子進程時,主進程與其創建的子進程隸屬於同一個分組里,這個分組的概念在linux中成為進程組,它是一個或多個進程的組成的集合,同一個進程組中的進程,它們的進程組ID是一致的。利用python標准庫中os.getpgid方法,通過進程的ID來獲取進程對應的組ID,接着調用os.killpg方法,向進程的組ID發送信號,現在對上面的代碼進行簡單修改:
1 def fun(x): 2 print 'current pid is %s, group id is %s' % (os.getpid(), os.getpgrp()) 3 while True: 4 print 'args is %s ' % x 5 time.sleep(1) 6 7 8 def term(sig_num, addtion): 9 print 'current pid is %s, group id is %s' % (os.getpid(), os.getpgrp()) 10 os.killpg(os.getpgid(os.getpid()), signal.SIGKILL) 11 12 13 if __name__ == '__main__': 14 signal.signal(signal.SIGTERM, term) 15 print 'current pid is %s' % os.getpid() 16 for i in range(3): 17 t = Process(target=fun, args=(str(i),)) 18 t.daemon = True 19 t.start() 20 processes.append(t) 21 22 try: 23 for p in processes: 24 p.join() 25 except Exception as e: 26 print str(e)
注意在代碼中,為了防止之前出現的無限循環,在term函數中,我們通過os.killpg,直接向進程組發送SIGKILL信號。運行代碼,通過輸出我們可以看出,進程組中,主進程和子進程的進程組id相同,都是主進程的pid。通過kill -15向主進程或者子進程發送SIGTERM信號時,都會將進程組主進程和子進程全部殺死: