主進程被殺死時,如何保證子進程同時退出,而不變為孤兒進程(三)


  之前兩篇文章討論了進程意外退出時,如何殺死子進程,這節我們研究下在使用進程池multiprocessing.Pool時,如何保證主進程意外退出,進程池中的worker進程同時退出,不產生孤兒進程。如果對python標准庫進程池不清楚的園友,可以看下之前寫的幾篇文章。我們嘗試下主進程中使用進程池,看看worker進程是否會退出:

 1 import time
 2 import os
 3 import signal
 4 from multiprocessing import Pool
 5 
 6 
 7 def fun(x):
 8     print 'current sub-process pid is %s' % os.getpid()
 9     while True:
10         print 'args is %s ' % x
11         time.sleep(1)
12 
13 def term(sig_num, addtion):
14     print 'current pid is %s, group id is %s' % (os.getpid(), os.getpgrp())
15     os.killpg(os.getpgid(os.getpid()), signal.SIGKILL)
16 
17 if __name__ == '__main__':
18     print 'current pid is %s' % os.getpid()
19     mul_pool = Pool()
20     signal.signal(signal.SIGTERM, term)
21 
22     for i in range(3):
23         mul_pool.apply_async(func=fun, args=(str(i),))

    運行上面的代碼,發現在我還沒來得及通過kill命令發送SIGTERM時,進程竟然退出了,而且主進程和進程池中的worker進程都退出了。結合線程的特征想了下,可能在新建worker進程時,默認啟動方式為daemon。通過查看源碼,發現worker進程啟動之前,被設置為daemon=True,也就是說主進程不會等待worker進程執行完再退出,這種情況下worker進程作為主進程的子進程,會隨着主進程的退出而退出,部分源碼如下:

1 w = self.Process(target=worker,
2                  args=(self._inqueue, self._outqueue,
3                        self._initializer,
4                        self._initargs, self._maxtasksperchild)
5                 )
6 self._pool.append(w)
7 w.name = w.name.replace('Process', 'PoolWorker')
8 w.daemon = True
9 w.start()

     接着我手動改了下源碼,將daemon設置為False,接着啟動進程,發現現象依然如前,程序剛啟動緊接着就全部退出(主進程和子進程)。很奇怪,難道daemon表示的含義在進程和線程中有不同?聯系之前對進程池分析的兩篇文章,發現進程池中的幾個線程在啟動之前也被設置為daemon=True,繼續手動修改下源碼,將線程的daemon設置為False,再次啟動進程,這次進程持續運行,主進程並未退出,通過kill命令發送SIGTERM信號后,整個進程組退出。編碼中,我們當然不能去修改源碼了,標准庫中的Pool提供了一個join方法,它可以對進程池中的線程以及worker進程進行等待,注意在調用join之前調用close方法保證進程池不在接收新任務。我們在對上面的代碼進行一些修改:

 1 if __name__ == '__main__':
 2     print 'current pid is %s' % os.getpid()
 3     mul_pool = Pool()
 4     signal.signal(signal.SIGTERM, term)
 5 
 6     for i in range(3):
 7         mul_pool.apply_async(func=fun, args=(str(i),))
 8         
 9     mul_pool.close()
10     mul_pool.join()

  改過之后程序不會自動退出了,但是又出現了新的問題,向進程發送kill命令,進程並沒有捕獲到信號,仍然繼續運行。在stackoverflow找到了類似的問題,對標准庫中signal有如下描述:A Python signal handler does not get executed inside the low-level (C) signal handler. Instead, the low-level signal handler sets a flag which tells the virtual machine to execute the corresponding Python signal handler at a later point(for example at the next bytecode instruction). This has consequences:

  • It makes little sense to catch synchronous errors like SIGFPE or SIGSEGV that are caused by an invalid operation in C code. Python will return from the signal handler to the C code, which is likely to raise the same signal again, causing Python to apparently hang. From Python 3.3 onwards, you can use the faulthandler module to report on synchronous errors.
  • A long-running calculation implemented purely in C (such as regular expression matching on a large body of text) may run uninterrupted for an arbitrary amount of time, regardless of any signals received. The Python signal handlers will be called when the calculation finishes.

   標准庫對signal handler的解釋大致是說,python中的信號處理函數不會被低級別的信號處理器觸發調用。取而代之的是,低級別的信號處理程序會設置一個標志,用來告訴虛擬機在稍后(例如下一個字節代碼指令)來執行信號處理函數。這樣的結果是:

  • 難以捕獲C代碼無效操作引起的同步異常,例如SIGFPE、SIGSEGV。Python將從信號處理返回到C代碼,這很可能會再次提出同樣的信號,導致python掛起。
  • 用C實現的長時計算程序(比如正則表達式匹配大段文本)可能不被中斷的運行任意長時間,而不管信號的接收。在計算完成時,python的信號處理函數將被執行。

  調用mul_pool.join使得主進程(線程)阻塞在join處,意味着它阻塞在C方法pthread_join調用中。pthread_join並不是一個long-running calculation的程序,而是一個系統調用的阻塞,盡管如此,直到它結束,否則信號處理函數無法被執行。帖子中給出的解決方法時更新python版本至3.3,而我使用的版本是python2.7。這里我並未嘗試使用python3.3版本,而是將join用loop sleep代替,簡單修改下上面的代碼:

 1 if __name__ == '__main__':
 2     print 'current pid is %s' % os.getpid()
 3     mul_pool = Pool()
 4     signal.signal(signal.SIGTERM, term)
 5 
 6     for i in range(3):
 7         mul_pool.apply_async(func=fun, args=(str(i),))
 8         
 9     while True:
10         time.sleep(60)

  這樣整個進程組仍然能夠在收到SIGTERM命令之后退出,而不留下孤兒進程。但是仔細想想我們這樣做是不是有些武斷,如果一些worker進程在運行一些重要的業務邏輯,強制結束可能會使得數據的丟失,或者一些其他難以恢復的后果,那么有沒有更合理的處理方式,使worker進程在處理完本輪數據后,再退出呢?答案同樣是肯定的,python標准庫中提供了一些進程間同步的工具,這里我們使用Event對象來做同步。首先我們需要通過multiprocessing.Manager類來獲取一個Event對象,用Event來控制worker進程的退出,首先修改worker進程的回調函數:

1 def fun(x, event):
2     while not event.is_set():
3         print 'process %s running args is %s' % (os.getpid(), x)
4         time.sleep(3)
5     print 'process %s, call fun finish' % os.getpid()

 

   event對象是用來控制worker進程的,當然代碼中的使用只是一個簡單的示例,現實情況中worker進程並非一個while這么簡單。我們要通過event來控制worker進程的退出,那么可以看到,當event.is_set() == True時,worker會自動退出,那么可以捕獲SIGTERM信號,在signal_handler中將event對象進行set:

1 def terminate(pool, event, sig_num, addtion):
2     print 'terminate process %d' % os.getpid()
3     if not event.is_set():
4         event.set()
5     
6     pool.close()
7     pool.join()
8     
9     print 'exit...'

 

  在主進程中,首先要創建一個Manager對象,有它來產生Event對象,注意在創建Manager對象后,通過后台ps命令可以看到,此時會多了一個進程,實際上創建Manager對象就會創建一個新的進程,用於數據的同步,我們在signal信號處理函數中實現設置event,並且終止進程池,而signal.signal回調函數只能有兩個參數,所以依舊使用partial偏函數進行處理:

 1 if __name__ == '__main__':
 2     print 'current pid is %s' % os.getpid()
 3     mul_pool = Pool()
 4     manager = Manager()
 5     event = manager.Event()
 6 
 7     handler = functools.partial(terminate, mul_pool, event)
 8     signal.signal(signal.SIGTERM, handler)
 9 
10     for i in range(4):
11         mul_pool.apply_async(func=fun, args=(str(i), event))
12 
13     while True:
14         time.sleep(60)

 

  運行程序,通過kill命令發送SIGTERM信號,觀察到的現象是收到signal信號之后,執行了event.set()方法,worker進程退出,進程池關閉,但是ps之后,發現還有兩個進程在運行,通過進程id和strace命令發現一個是主進程,一個是Manager進程同步對象。代碼中,主進程最后進入了loop sleep狀態,所以當我們收到信號之后,雖然通過event將worker進程和進程池結束,但是主進程的仍然在sleep,所以Manager進程同步對象也為退出。這樣我們可以簡單修改下代碼來處理,可以在terminate方法中添加manager參數,在方法中顯示調用manager.shutdown()關閉進程同步對象,然后強制退出,也可以在主進程中同樣使用event來代替whlie True循環。這里我們采用第一種方式,簡單修改下上面的代碼:

 1 def terminate(pool, event, manager, sig_num, addtion):
 2     print 'terminate process %d' % os.getpid()
 3     if not event.is_set():
 4         event.set()
 5 
 6     pool.close()
 7     pool.join()
 8     manager.shutdown()
 9     print 'exit ...'
10     os._exit(0)
11 
12 if __name__ == '__main__':
13     print 'current pid is %s' % os.getpid()
14     mul_pool = Pool()
15     manager = Manager()
16     event = manager.Event()
17 
18     handler = functools.partial(terminate, mul_pool, event, manager)
19     signal.signal(signal.SIGTERM, handler)
20 
21     for i in range(4):
22         mul_pool.apply_async(func=fun, args=(str(i), event))
23 
24     while True:
25         time.sleep(60)

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM