python multiprocessing深度解析


在寫python多線程代碼的時候,會用到multiprocessing這個包,這篇文章總結了一些這個包在多進程管理方面的一些原理和代碼分析。

1. 問題一:是否需要顯式調用pool的close和join方法,不調用的話,子進程是否無法退出?

首先初始化Pool的時候,指定processes的個數,就是pool中worker的個數,pool初始化的時候,會把worker以daemon=True的子進程方式啟動起來。

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=worker,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild)
                            )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

推薦在使用完pool之后,用thread pool的時候調用close()和join()方法,這樣可以把pool中的worker都釋放掉(等待子任務結束)。但是如果不顯式的調用,在主進程退出的時候,這些子進程也會退出(原因是設置了daemon這個flag)。

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=active_children,
                   current_process=current_process):
    # NB: we hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')
    _run_finalizers(0)

    if current_process() is not None:
        # NB: we check if the current process is None here because if
        # it's None, any call to ``active_children()`` will throw an
        # AttributeError (active_children winds up trying to get
        # attributes from util._current_process).  This happens in a
        # variety of shutdown circumstances that are not well-understood
        # because module-scope variables are not apparently supposed to
        # be destroyed until after this function is called.  However,
        # they are indeed destroyed before this function is called.  See
        # issues 9775 and 15881.  Also related: 4106, 9205, and 9207.

        for p in active_children():
            if p._daemonic:
                info('calling terminate() for daemon %s', p.name)
                p._popen.terminate()

        for p in active_children():
            info('calling join() for process %s', p.name)
            p.join()

    debug('running the remaining "atexit" finalizers')
    _run_finalizers()

主進程退出的時候,會調用_exit_function, 如果看到active的children是_daemonic的就會調用其terninate方法,讓子進程退出。exit是通過這個調用注冊的,atexit.register(_exit_function),本質是利用系統的退出hook方法,在退出的時候觸發對應的函數。

2. 問題二:如果啟動之后,kill -9主進程,子進程會不會無法退出?

如下代碼是pool中worker的主代碼邏輯,如果kill -9主進程,子進程如果沒有在處理作業,因為主進程退出了,get()方法從queue中拿task的時候,就會發生exception,這樣worker會退出。如果子進程正在處理任務,任務結束的時候,需要往queue中扔回結果,因為主進程退出了,所以也會exception,worker一樣會退出。

def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)
    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        try:
            put((job, i, result))
        except Exception as e:
            wrapped = MaybeEncodingError(e, result[1])
            debug("Possible encoding error while sending result: %s" % (
                wrapped))
            put((job, i, (False, wrapped)))
        completed += 1
    debug('worker exiting after %d tasks' % completed)

worker退出的時候,看如下代碼

## process.py
def _bootstrap(self):
        from . import util
        global _current_process

        try:
            self._children = set()
            self._counter = itertools.count(1)
            try:
                sys.stdin.close()
                sys.stdin = open(os.devnull)
            except (OSError, ValueError):
                pass
            _current_process = self
            util._finalizer_registry.clear()
            util._run_after_forkers()
            util.info('child process calling self.run()')
            try:
                self.run()
                exitcode = 0
            finally:
                util._exit_function()

子進程run()會結束,然后調用_exit_function()清理一些子進程,調用_run_finalizers()結束進程。

但是如果子進程在pool的worker中跑的是長時間不退出的task,那這個子進程就會無法退出,一直在運行。如果task都是短作業,即使主進程被kill -9,子進程也會在作業跑完之后都退出。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM