[源碼分析] 分布式任務隊列 Celery 多線程模型 之 子進程


[源碼分析] 分布式任務隊列 Celery 多線程模型 之 子進程

0x00 摘要

Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統,專注於實時處理的異步任務隊列,同時也支持任務調度。

在前文中,我們介紹了Celery 多線程模型,但是我們略過了子進程這一個階段,本文看看子進程如何繼續啟動。

我們依然先要提出幾個問題:

  • 在啟動子進程之前,需要做哪些准備?
    • 如何知道子進程要運行什么命令?
    • 如何構建父子進程通訊機制?
    • 如何把父進程信息傳遞給子進程?
  • 目前,Celery 應用是在父進程中。
    • 子進程如何得到 Celery 應用?
    • 如何恢復 Celery 應用?
  • 父進程如何知道子進程已經ready,從而可以給子進程安排工作?
  • 子進程如何接受父進程安排的任務?

為了便於大家理解,我們先給出本文一個最終關系圖。

0x01 前文回顧

1.1 基類作用

前文我們講到,在 AsynPool 的基類 Pool(object) 之中,建立了各種 消息處理函數,並且建立了子進程

代碼位置在:billiard/pool.py

具體代碼如下,這里 _create_worker_process 就建立了子進程。

class Pool(object):
    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, timeout=None, soft_timeout=None,
                 lost_worker_timeout=None,
                 max_restarts=None, max_restart_freq=1,
                 on_process_up=None,
                 on_process_down=None,
                 on_timeout_set=None,
                 on_timeout_cancel=None,
                 threads=True,
                 semaphore=None,
                 putlocks=False,
                 allow_restart=False,
                 synack=False,
                 on_process_exit=None,
                 context=None,
                 max_memory_per_child=None,
                 enable_timeouts=False,
                 **kwargs):
        for i in range(self._processes):
            self._create_worker_process(i)

1.2 子進程抽象

如下代碼建立子進程抽象。

for i in range(self._processes):
    self._create_worker_process(i)

_create_worker_process 主要工作如下:

  • inq, outq, synq = self.get_process_queues() 拿到的是一個讀和寫的管道的抽象對象。這個管道是之前預先創建好的(就是上面 self.create_process_queues() 創建的)。主要是給即將 fork 的子進程用的,子進程會監聽這管道數據結構抽象實例中的讀事件,還可以從寫管道寫數據。

  • w,也就是 self.WorkerProcess 的實例,其實是對 fork 出來的子進程的一個抽象封裝。用來方便快捷的管理子進程,抽象成一個進程池,這個 w 會記錄 fork 出來的子進程的一些 meta 信息,比如 pid,管道的讀寫的 fd 等等,並注冊在主進程中,主進程可以利用它進行任務分發

  • 把 WorkerProcess 的實例記錄在 self._pool;

  • w.start() 中包含具體的 fork 過程;

w.start() 中包含具體的 fork 過程。

def _create_worker_process(self, i):

    w = self.WorkerProcess(self.Worker(
        inq, outq, synq, self._initializer, self._initargs,
        self._maxtasksperchild, sentinel, self._on_process_exit,
        # Need to handle all signals if using the ipc semaphore,
        # to make sure the semaphore is released.
        sigprotection=self.threads,
        wrap_exception=self._wrap_exception,
        max_memory_per_child=self._max_memory_per_child,
        on_ready_counter=on_ready_counter,
    ))

    w.start() # 到了這里

    return w

1.3 Fork過程

Fork 的具體代碼如下:

class BaseProcess(object):
    '''
    Process objects represent activity that is run in a separate process

    The class is analagous to `threading.Thread`
    '''

    def run(self):
        '''
        Method to be run in sub-process; can be overridden in sub-class
        '''
        if self._target:
            self._target(*self._args, **self._kwargs)

    def start(self):
        '''
        Start child process
        '''
        assert self._popen is None, 'cannot start a process twice'
        assert self._parent_pid == os.getpid(), \
            'can only start a process object created by current process'
        _cleanup()
        self._popen = self._Popen(self)
        self._sentinel = self._popen.sentinel
        _children.add(self)

其中主要是 self._popen = self._Popen(self) 比較重要。

代碼位於:billiard/context.py。

其中可以看到,因為操作系統的不同,具體使用也不同。

下面為 *nix 系統的各種 類fork函數。

    class ForkProcess(process.BaseProcess):
        _start_method = 'fork'

        @staticmethod
        def _Popen(process_obj):
            from .popen_fork import Popen
            return Popen(process_obj)

    class SpawnProcess(process.BaseProcess):
        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_posix import Popen
            return Popen(process_obj)

    class ForkServerProcess(process.BaseProcess):
        _start_method = 'forkserver'

        @staticmethod
        def _Popen(process_obj):
            from .popen_forkserver import Popen
            return Popen(process_obj)

下面為 windows系統。

class SpawnProcess(process.BaseProcess):
    _start_method = 'spawn'

    @staticmethod
    def _Popen(process_obj):
        from .popen_spawn_win32 import Popen
        return Popen(process_obj)

於是我們就具體看看子進程如何進行處理。

0x02 預先准備

在子進程啟動之前,Celery 會做很多准備,比如構建子進程運行的命令,設置管道,傳遞父進程信息等等。

2.1 總體准備流程

經過調試我們發現,無論是windows或者*nix系統,調試中都各有不便之處,所以我們下面以windows系統為例分析。

注:
有同學指出,Windows上 Celery 多線程出錯。特此說明下。

我的環境復雜,有mac, linux, windows,而且有的操作系統有多台,各種切換很無奈。

當分析本文部分代碼時候,手上只有Windows,所以就只能貼出來Windows下面的具體調試變量。

其實具體OS不重要,重要的是通過代碼來剖析Celery的設計思路。

另外,mac調試Celery,也是各種容易出錯,在分析代碼這點上,和Windows相比沒什么太大優勢。

前文因為是 *nix 系統,所以子進程抽象是 ForkProcess, 本文因為是 windows,替換為 SpawnProcess

因為是windows 系統,所以我們調用到:

class SpawnProcess(process.BaseProcess):
    _start_method = 'spawn'

    @staticmethod
    def _Popen(process_obj):
        from .popen_spawn_win32 import Popen
        return Popen(process_obj)

因此使用 from .popen_spawn_win32 import Popen

下面代碼位於:billiard/popen_spawn_win32.py

主要功能如下:

  • 首先調用 _winapi.CreatePipe(None, 0) 來得到 之前建立的 pipe 的讀寫管道;

  • 其次調用 get_command_line 來拼湊出子進程執行命令,注意這里傳遞的 pipe_handle 為 讀管道,parent_pid 就為父進程的pid,子進程中,*nix 和 windows 分別依據 pipe_handle 和 parent_pid 得到讀管道;

  • 然后打開讀管道,這個很重要;

  • 再次調用 windows 系統方法 CreateProcess 來執行子進程;

  • 因為已經打開了讀管道,所以通過 reduction.dump(prep_data, to_child) 把父進程的關鍵輔助信息傳遞給子進程,通過這些信息子進程才可以解讀父進程信息

  • 通過 reduction.dump(process_obj, to_child) 把父進程信息傳遞給子進程,父進程信息就為 SpawnProcess

  • 在父進程中,通過 _winapi.CloseHandle(rhandle) 關閉父進程的讀管道。這樣父進程,子進程就通過子進程的讀管道聯系;

具體如下:

class Popen(object):
    '''
    Start a subprocess to run the code of a process object
    '''
    method = 'spawn'
    sentinel = None

    def __init__(self, process_obj):
        os.environ["MULTIPROCESSING_FORKING_DISABLE"] = "1"
        spawn._Django_old_layout_hack__save()
        prep_data = spawn.get_preparation_data(process_obj._name)

        # read end of pipe will be "stolen" by the child process
        # -- see spawn_main() in spawn.py.
        rhandle, whandle = _winapi.CreatePipe(None, 0)
        wfd = msvcrt.open_osfhandle(whandle, 0)
        cmd = spawn.get_command_line(parent_pid=os.getpid(),
                                     pipe_handle=rhandle)
        cmd = ' '.join('"%s"' % x for x in cmd)

        with io.open(wfd, 'wb', closefd=True) as to_child:
            # start process
            try:
                hp, ht, pid, tid = CreateProcess(
                    spawn.get_executable(), cmd,
                    None, None, False, 0, None, None, None)
                close_thread_handle(ht)
            except:
                _winapi.CloseHandle(rhandle)
                raise

            # set attributes of self
            self.pid = pid
            self.returncode = None
            self._handle = hp
            self.sentinel = int(hp)

            # send information to child
            context.set_spawning_popen(self)
            try:
                reduction.dump(prep_data, to_child)
                reduction.dump(process_obj, to_child)
            finally:
                context.set_spawning_popen(None)

我們下面具體看看這個准備流程中的幾個重要點。

2.2 獲取命令

首先,重要點是:調用 get_command_line 來拼湊出子進程執行命令

代碼位於:billiard/spawn.py。

就是拼接出一個celery運行命令。

def get_command_line(**kwds):
    '''
    Returns prefix of command line used for spawning a child process
    '''
    if getattr(sys, 'frozen', False):
        return ([sys.executable, '--billiard-fork'] +
                ['%s=%r' % item for item in kwds.items()])
    else:
        prog = 'from billiard.spawn import spawn_main; spawn_main(%s)'
        prog %= ', '.join('%s=%r' % item for item in kwds.items())
        opts = util._args_from_interpreter_flags()
        return [_python_exe] + opts + ['-c', prog, '--billiard-fork']

命令行結果就是類似於:

python -c 'from billiard.spawn import spawn_main; spawn_main(....)' -billiard+fork ..

2.3 調用 windows 系統方法

然后會調用windows 系統方法啟動子進程。

hp, ht, pid, tid = CreateProcess(
    spawn.get_executable(), cmd,
    None, None, False, 0, None, None, None)

因此,邏輯如下:

                 +-----------------------------+
                 |        SpawnProcess         |
                 |                             |
                 |                             |
                 |               os.getpid() +-----------------+
                 |                             |               |
                 |                   rhandle +---------------+ |
                 |           Popen             |             | |
                 |             +     whandle   |             | |
                 |             |               |             | |
                 +-----------------------------+             | |
                               |                             | |
                               |                             | |
                               |  get_command_line           | |
                               |                             | |                       .
                               |                             | |
                               v                             | |
                                                             v v
python -c 'from billiard.spawn import spawn_main; spawn_main(....)' --billiard+fork ..
                               +
                               |
                               |
                               |
                               |  CreateProcess
                               |
                               |
                               |
                               v
                      +--------+--------+
                      | windows kernel  |
                      +-----------------+

2.4 傳遞父進程信息

因為已經打開了讀管道,所以通過 reduction.dump(prep_data, to_child) 把父進程的關鍵輔助信息傳遞給子進程,通過這些信息才可以解讀父進程信息。這里父進程信息 obj 就為 SpawnProcess 本身

代碼通過 picker 完成,具體如下:

def dump(obj, file, protocol=None):
    '''Replacement for pickle.dump() using ForkingPickler.'''
    ForkingPickler(file, protocol).dump(obj)

以及:

if PY3:
    import copyreg

    class ForkingPickler(pickle.Pickler):
        '''Pickler subclass used by multiprocessing.'''
        _extra_reducers = {}
        _copyreg_dispatch_table = copyreg.dispatch_table

        def __init__(self, *args):
            super(ForkingPickler, self).__init__(*args)
            self.dispatch_table = self._copyreg_dispatch_table.copy()
            self.dispatch_table.update(self._extra_reducers)

        @classmethod
        def register(cls, type, reduce):
            '''Register a reduce function for a type.'''
            cls._extra_reducers[type] = reduce

        @classmethod
        def dumps(cls, obj, protocol=None):
            buf = io.BytesIO()
            cls(buf, protocol).dump(obj)
            return buf.getbuffer()

        @classmethod
        def loadbuf(cls, buf, protocol=None):
            return cls.loads(buf.getbuffer())

        loads = pickle.loads

else:

    class ForkingPickler(pickle.Pickler):  # noqa
        '''Pickler subclass used by multiprocessing.'''
        dispatch = pickle.Pickler.dispatch.copy()

        @classmethod
        def register(cls, type, reduce):
            '''Register a reduce function for a type.'''
            def dispatcher(self, obj):
                rv = reduce(obj)
                self.save_reduce(obj=obj, *rv)
            cls.dispatch[type] = dispatcher

        @classmethod
        def dumps(cls, obj, protocol=None):
            buf = io.BytesIO()
            cls(buf, protocol).dump(obj)
            return buf.getvalue()

        @classmethod
        def loadbuf(cls, buf, protocol=None):
            return cls.loads(buf.getvalue())

        @classmethod
        def loads(cls, buf, loads=pickle.loads):
            if isinstance(buf, io.BytesIO):
                buf = buf.getvalue()
            return loads(buf)

至此,准備工作完畢,將會進入到子進程。

0x03 子進程啟動

既然已經通知了 windows,所以 windows 就進行系統調用。

3.1 從命令行進入

既然前面的命令行結果中明確提到了spawn_main:

python -c 'from billiard.spawn import spawn_main; spawn_main(....)' -billiard+fork ..

於是子進程從spawn_main啟動

代碼位於:billiard/spawn.py

def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
    '''
    Run code specified by data received over pipe
    '''
    assert is_forking(sys.argv)
    if sys.platform == 'win32':
        import msvcrt
        from .reduction import steal_handle
        new_handle = steal_handle(parent_pid, pipe_handle)
        fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
    else:
        from . import semaphore_tracker
        semaphore_tracker._semaphore_tracker._fd = tracker_fd
        fd = pipe_handle
    exitcode = _main(fd) # 將會調用到這里。
    sys.exit(exitcode)

注意:

這里的 pipe_handle 就為 傳遞進來的讀管道。parent_pid為父進程ID。子進程中,*nix 和 windows 分別依據 pipe_handle 和 parent_pid 得到讀管道。

此時邏輯為:

                                                                                          +
                                                                parent process            |                child process
                 +-----------------------------+                                          |
                 |        SpawnProcess         |                                          |
                 |                             |                                          |
                 |                os.getpid()+-----------------+                          |
                 |                             |               |                          |
                 |                   rhandle +---------------+ |                          |                          +---------------+
                 |           Popen             |             | |                          |                          |  spawn_main   |
                 |             +     whandle   |             | |                          |           parent_pid     |               |
                 |             |               |             | |                          |                          |               |
                 +---+-------------------------+             | |                          |       +--------------->  |               |
                     |         |                             | |                          |       |                  |          fd   |
                     |         |                             | |                          |       |   +----------->  |           ^   |
                     |         |  get_command_line           | |                          |       |   | pipe_handle  |           |   |
                     |         |                             | |                          |       |   |              +---------------+
                     |         |                             | |                          |       |   |                          |
                     |         v                             | |                          |       |   |                     ^    |
                     |                                       v v                          |       |   |                     |    |
python -c 'from billi|rd.spawn import spawn_main; spawn_main(....)' --billiard-fork ...   |       |   |                     |    |
                     |         +                              + +                         |       |   |                     |    |
                     |         |                              | |                         |       |   |                     |    |
                     |         |                              | |                         |       |   |                     |    |
                     |         |                              | |                         |       |   |                     |    |
                     |         |  CreateProcess               | |                         |       |   |                     |    |
                     |         |                              | +---------------------------------+   |                     |    |
                     |         |                              +---------------------------------------+                     |    |
                     |         |                                                                                            |    |
                     |         |                 1                               +-----------------+              2         |    |
                     |         +---------------------------------------------->  | windows kernel  |  +---------------------+    |
                     |                                                           +-----------------+                             |
                     |                                                                                                           |
                     |                                                                                                           |
                     +-----------------------------------------------------------------------------------------------------------+
                                   3  reduction.dump(process_obj, to_child)

手機如下:

因此,程序進行調用 _main。

3.2 _main 讀取父進程關鍵信息

前面提到,父進程會寫入關鍵信息。所以子進程這里打開了讀管道,讀取父進程的關鍵信息,這里父進程信息 就為 SpawnProcess 本身,因此子進程可以操作 SpawnProcess

def _main(fd):
    _Django_old_layout_hack__load()
    with io.open(fd, 'rb', closefd=True) as from_parent:
        process.current_process()._inheriting = True
        try:
            preparation_data = pickle.load(from_parent)
            prepare(preparation_data)
            _setup_logging_in_child_hack()
            self = pickle.load(from_parent) #讀取父進程的關鍵信息,就為SpawnProcess
        finally:
            del process.current_process()._inheriting
    return self._bootstrap()

邏輯如下:

                                                                                          +
                                                                parent process            |                child process
                 +-----------------------------+                                          |
                 |        SpawnProcess         |                                          |
                 |                             |                                          |
                 |                os.getpid()+-----------------+                          |
                 |                             |               |                          |
                 |                   rhandle +---------------+ |                          |                          +---------------+
                 |           Popen             |             | |                          |                          |  spawn_main   |
                 |             +     whandle   |             | |                          |           parent_pid     |               |  4    +-------------+
                 |             |               |             | |                          |                          |        self+--------> |SpawnProcess |
                 +---+-------------------------+             | |                          |       +--------------->  |               |       +------+------+
                     |         |                             | |                          |       |                  |          fd   |              |
                     |         |                             | |                          |       |   +----------->  |           ^   |              |
                     |         |  get_command_line           | |                          |       |   | pipe_handle  |           |   |           5  | _bootstrap()
                     |         |                             | |                          |       |   |              +---------------+              |
                     |         |                             | |                          |       |   |                          |                  v
                     |         v                             | |                          |       |   |                     ^    |
                     |                                       v v                          |       |   |                     |    |
python -c 'from billi|rd.spawn import spawn_main; spawn_main(....)' --billiard-fork ...   |       |   |                     |    |
                     |         +                              + +                         |       |   |                     |    |
                     |         |                              | |                         |       |   |                     |    |
                     |         |                              | |                         |       |   |                     |    |
                     |         |                              | |                         |       |   |                     |    |
                     |         |  CreateProcess               | |                         |       |   |                     |    |
                     |         |                              | +---------------------------------+   |                     |    |
                     |         |                              +---------------------------------------+                     |    |
                     |         |                                                                                            |    |
                     |         |                 1                               +-----------------+              2         |    |
                     |         +---------------------------------------------->  | windows kernel  |  +---------------------+    |
                     |                                                           +-----------------+                             |
                     |                                                                                                           |
                     |                                                                                                           |
                     +-----------------------------------------------------------------------------------------------------------+
                                   3  reduction.dump(process_obj, to_child)

手機如下:

3.3 SpawnProcess 啟動

既然子進程已經知道了SpawnProcess,因此調用到了 SpawnProcess 的基類。

代碼位於:billiard/process.py。

class BaseProcess(object):
    '''
    Process objects represent activity that is run in a separate process

    The class is analagous to `threading.Thread`
    '''

3.3.1 _bootstrap 配置必要信息

基類 billiard/process.py 之中,會通過 _bootstrap 來 配置必要信息,比如 stdin ,然后調用 run。

def _bootstrap(self):
    from . import util, context
    global _current_process, _process_counter, _children

    try:
        # 設置 stdin等等
        if self._start_method is not None:
            context._force_start_method(self._start_method)
        _process_counter = itertools.count(1)
        _children = set()
        if sys.stdin is not None:
            try:
                sys.stdin.close()
                sys.stdin = open(os.devnull)
            except (EnvironmentError, OSError, ValueError):
                pass
        old_process = _current_process
        _set_current_process(self)

        # 設置 logger等等
        loggerDict = logging.Logger.manager.loggerDict
        logger_names = list(loggerDict.keys())
        logger_names.append(None)  # for root logger
        for name in logger_names:
            if not name or not isinstance(loggerDict[name],
                                          logging.PlaceHolder):
                for handler in logging.getLogger(name).handlers:
                    handler.createLock()
        logging._lock = threading.RLock()

        try:
            util._finalizer_registry.clear()
            util._run_after_forkers()
        finally:
            # delay finalization of the old process object until after
            # _run_after_forkers() is executed
            del old_process
        util.info('child process %s calling self.run()', self.pid)
        try:
            self.run() # 運行到這里
            exitcode = 0
        finally:
            util._exit_function()

    return exitcode

3.3.2 啟動服務 Worker

SpawnProcess 繼續調用 run。

def run(self):
    '''
    Method to be run in sub-process; can be overridden in sub-class
    '''
    if self._target:
        self._target(*self._args, **self._kwargs)

由前文可知道,

 _target = {Worker} <celery.concurrency.asynpool.Worker object at 0x7f9ad358b240>

因此來到了 celery.concurrency.asynpool.Worker ,這就是子進程工作循環

如下:

                                                                                          +
                                                                parent process            |                child process
                 +-----------------------------+                                          |
                 |        SpawnProcess         |                                          |
                 |                             |                                          |
                 |                os.getpid()+-----------------+                          |
                 |                             |               |                          |
                 |                   rhandle +---------------+ |                          |                          +---------------+
                 |           Popen             |             | |                          |                          |  spawn_main   |
                 |             +     whandle   |             | |                          |           parent_pid     |               |  4    +-------------+
                 |             |               |             | |                          |                          |        self+--------> |SpawnProcess |
                 +---+-------------------------+             | |                          |       +--------------->  |               |       +------+------+
                     |         |                             | |                          |       |                  |          fd   |              |
                     |         |                             | |                          |       |   +----------->  |           ^   |              |
                     |         |  get_command_line           | |                          |       |   | pipe_handle  |           |   |           5  | _bootstrap()
                     |         |                             | |                          |       |   |              +---------------+              |
                     |         |                             | |                          |       |   |                          |                  v
                     |         v                             | |                          |       |   |                     ^    |
                     |                                       v v                          |       |   |                     |    |             +----------+
python -c 'from billi|rd.spawn import spawn_main; spawn_main(....)' --billiard-fork ...   |       |   |                     |    |             | Worker   |
                     |         +                              + +                         |       |   |                     |    |             |          |
                     |         |                              | |                         |       |   |                     |    |             +-----+----+
                     |         |                              | |                         |       |   |                     |    |                   |
                     |         |                              | |                         |       |   |                     |    |                   |
                     |         |  CreateProcess               | |                         |       |   |                     |    |                   |
                     |         |                              | +---------------------------------+   |                     |    |                   |
                     |         |                              +---------------------------------------+                     |    |                   |
                     |         |                                                                                            |    |                   v
                     |         |                 1                               +-----------------+              2         |    |
                     |         +---------------------------------------------->  | windows kernel  |  +---------------------+    |
                     |                                                           +-----------------+                             |
                     |                                                                                                           |
                     |                                                                                                           |
                     +-----------------------------------------------------------------------------------------------------------+
                                   3  reduction.dump(process_obj, to_child)

手機如下:

3.4 Worker 服務

代碼位於 :celery/billiard/pool.py

進入 Worker 之后,就來到了 __call__,主要功能如下:

                                                                                          +
                                                                parent process            |                child process
                 +-----------------------------+                                          |
                 |        SpawnProcess         |                                          |
                 |                             |                                          |
                 |                os.getpid()+-----------------+                          |
                 |                             |               |                          |
                 |                   rhandle +---------------+ |                          |                          +---------------+
                 |           Popen             |             | |                          |                          |  spawn_main   |
                 |             +     whandle   |             | |                          |           parent_pid     |               |  4    +-------------+
                 |             |               |             | |                          |                          |        self+--------> |SpawnProcess |
                 +---+-------------------------+             | |                          |       +--------------->  |               |       +------+------+
                     |         |                             | |                          |       |                  |          fd   |              |
                     |         |                             | |                          |       |   +----------->  |           ^   |              |
                     |         |  get_command_line           | |                          |       |   | pipe_handle  |           |   |           5  | _bootstrap()
                     |         |                             | |                          |       |   |              +---------------+              |
                     |         |                             | |                          |       |   |                          |                  v
                     |         v                             | |                          |       |   |                     ^    |
                     |                                       v v                          |       |   |                     |    |             +----------+
python -c 'from billi|rd.spawn import spawn_main; spawn_main(....)' --billiard-fork ...   |       |   |                     |    |             | Worker   |
                     |         +                              + +                         |       |   |                     |    |             |          |
                     |         |                              | |                         |       |   |                     |    |             +-----+----+
                     |         |                              | |                         |       |   |                     |    |                   |
                     |         |                              | |                         |       |   |                     |    |                   |
                     |         |  CreateProcess               | |                         |       |   |                     |    |                   |
                     |         |                              | +---------------------------------+   |                     |    |                   |
                     |         |                              +---------------------------------------+                     |    |                   |
                     |         |                                                                                            |    |                   v
                     |         |                 1                               +-----------------+              2         |    |
                     |         +---------------------------------------------->  | windows kernel  |  +---------------------+    |               __call__
                     |                                                           +-----------------+                             |
                     |                                                                                                           |
                     |                                                                                                           |
                     +-----------------------------------------------------------------------------------------------------------+
                                   3  reduction.dump(process_obj, to_child)
                                                                                          +
                                                                                          |
                                                                                          |
                                                                                          |
                                                                                          +

手機如下:

__call__,主要功能如下:

  • 使用 _make_child_methods 配置 監聽 任務 和 同步的方法;

  • 使用 after_fork 來恢復應用信息;

  • 使用 on_loop_start 來發送一個 WORKER_UP,以此通知父進程;

  • 使用 sys.exit(self.workloop(pid=pid)) 正式進入循環;

class Worker(object):

    def __call__(self):
        _exit = sys.exit
        _exitcode = [None]

        def exit(status=None):
            _exitcode[0] = status
            return _exit(status)
        sys.exit = exit

        pid = os.getpid()

        self._make_child_methods()
        self.after_fork()
        self.on_loop_start(pid=pid)  # callback on loop start
        try:
            sys.exit(self.workloop(pid=pid))
        except Exception as exc:
            error('Pool process %r error: %r', self, exc, exc_info=1)
            self._do_exit(pid, _exitcode[0], exc)
        finally:
            self._do_exit(pid, _exitcode[0], None)

我們下面詳細分析。

3.4.1 配置 監聽 任務 和 同步的方法

子進程 使用 _make_child_methods 配置 監聽 任務 和 同步的方法

def _make_child_methods(self, loads=pickle_loads):
    self.wait_for_job = self._make_protected_receive(self.inq)
    self.wait_for_syn = (self._make_protected_receive(self.synq)
                         if self.synq else None)

3.4.2 配置應用相關信息

於是我們遇到一個問題:Celery 應用是在父進程中,子進程如何得到

雖然在一些多進程機制中,父進程的變量是會復制到子進程中,但是這並不是一定的,所以必然有一個父進程把 Celery 應用 設置給子進程的機制。

所以,我們需要梳理父進程是如何給子進程配置 Celery應用,以及子進程如何得到這個應用的

3.4.2.1 應用信息來源

之前在父進程中,當啟動進程池時候, class Pool(object): 對應配置如下(路徑在 :billiard/pool.py):

需要注意的是:

  • 這里是回到父進程來探討;
  • 參數 initializer 就是 Celery 變量本身

代碼為:

class Pool(object):
    '''
    Class which supports an async version of applying functions to arguments.
    '''
    _wrap_exception = True
    Worker = Worker
    Supervisor = Supervisor
    TaskHandler = TaskHandler
    TimeoutHandler = TimeoutHandler
    ResultHandler = ResultHandler
    SoftTimeLimitExceeded = SoftTimeLimitExceeded

    def __init__(self, processes=None, initializer=None, initargs=(),
								 ......
                 **kwargs):
        self._ctx = context or get_context()
        self.synack = synack
        self._setup_queues()
        self._taskqueue = Queue()
        self._cache = {}
        self._state = RUN
        self.timeout = timeout
        self.soft_timeout = soft_timeout
        self._maxtasksperchild = maxtasksperchild
        self._max_memory_per_child = max_memory_per_child
        self._initializer = initializer
        self._initargs = initargs

於是 Pool 類的相關變量為如下,這里的 Celery myTest 就是 Celery 應用本身

self._initializer = {function} <function process_initializer at 0x7f90c9387488>
self._initargs = {tuple: 2} (<Celery myTest at 0x7f90c8812f98>, 'celery')
self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7f90c97379b0>

從而父進程中 class Worker(object): 配置如下,可以看到設置了 initializer:

class Worker(object):

    def __init__(self, inq, outq, synq=None, initializer=None, initargs=(),
                 maxtasks=None, sentinel=None, on_exit=None,
                 sigprotection=True, wrap_exception=True,
                 max_memory_per_child=None, on_ready_counter=None):
        assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
        self.initializer = initializer
        self.initargs = initargs
3.4.2.2 調用恢復

前面提到,在子進程啟動之后,會調用 after_fork 來進行恢復應用。

process_initializer, prefork.py:44
after_fork, pool.py:421
__call__, pool.py:289
run, process.py:114
_bootstrap, process.py:327
_main, spawn.py:210
spawn_main, spawn.py:165
<frame not available>

具體看看,發現 after_fork 通過 self.initializer(*self.initargs) 恢復應用信息

def after_fork(self):
    if hasattr(self.inq, '_writer'):
        self.inq._writer.close()
    if hasattr(self.outq, '_reader'):
        self.outq._reader.close()

    if self.initializer is not None:
        self.initializer(*self.initargs)

    # Make sure all exiting signals call finally: blocks.
    # This is important for the semaphore to be released.
    reset_signals(full=self.sigprotection)

    # install signal handler for soft timeouts.
    if SIG_SOFT_TIMEOUT is not None:
        signal.signal(SIG_SOFT_TIMEOUT, soft_timeout_sighandler)

    try:
        signal.signal(signal.SIGINT, signal.SIG_IGN)
    except AttributeError:
        pass
3.4.2.3 恢復應用信息

具體恢復方法在 process_initializer。

代碼位置為 :celery/concurrency/prefork.py

這里重要的是 app.set_current()就是把 傳入的 Celery 配置到 子進程本身之中

具體代碼為:

def process_initializer(app, hostname):
    """Pool child process initializer.

    Initialize the child pool process to ensure the correct
    app instance is used and things like logging works.
    """
    _set_task_join_will_block(True)
    platforms.signals.reset(*WORKER_SIGRESET)
    platforms.signals.ignore(*WORKER_SIGIGNORE)
    platforms.set_mp_process_title('celeryd', hostname=hostname)
    # This is for Windows and other platforms not supporting
    # fork().  Note that init_worker makes sure it's only
    # run once per process.
    app.loader.init_worker()
    app.loader.init_worker_process()

    if os.environ.get('FORKED_BY_MULTIPROCESSING'):
        # pool did execv after fork
        trace.setup_worker_optimizations(app, hostname)
    else:
        app.set_current() # 這里進行配置
        set_default_app(app)
        app.finalize()
        trace._tasks = app._tasks  # enables fast_trace_task optimization.
    # rebuild execution handler for all tasks.
    from celery.app.trace import build_tracer
    for name, task in app.tasks.items():
        task.__trace__ = build_tracer(name, task, app.loader, hostname,
                                      app=app)
    from celery.worker import state as worker_state
    worker_state.reset_state()
    signals.worker_process_init.send(sender=None)
配置 Celery 自己

子進程中,具體配置代碼位於:celery/app/base.py,我們可以看到 TLS 相關信息。

def set_current(self):
    """Make this the current app for this thread."""
    _set_current_app(self)
    
def _set_current_app(app):
    _tls.current_app = app

def _get_current_app():
    if default_app is None:
        #: creates the global fallback app instance.
        from celery.app.base import Celery
        set_default_app(Celery(
            'default', fixups=[], set_as_current=False,
            loader=os.environ.get('CELERY_LOADER') or 'default',
        ))
    return _tls.current_app or default_app
TLS

TLS 定義位於:celery/_state.py

就是各個進程或者線程獨立的變量,區別取決於不同實現方式。

class _TLS(threading.local):
    #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
    #: sets this, so it will always contain the last instantiated app,
    #: and is the default app returned by :func:`app_or_default`.
    current_app = None


_tls = _TLS()
后續使用

這樣后續的使用就可以使用 get_current_app 提出來 Celery 本身,獲取應用信息。

具體后續是在 celery/_state.py 做了進一步封裝,並且使用,如何使用,我們下文講解。

if os.environ.get('C_STRICT_APP'):  # pragma: no cover
    def get_current_app():
        """Return the current app."""
        raise RuntimeError('USES CURRENT APP')
elif os.environ.get('C_WARN_APP'):  # pragma: no cover
    def get_current_app():  # noqa
        import traceback
        print('-- USES CURRENT_APP', file=sys.stderr)  # noqa+
        traceback.print_stack(file=sys.stderr)
        return _get_current_app()

3.4.3 通知父進程

子進程啟動最后,會使用 on_loop_start 來發送一個 WORKER_UP,可以看到是通過管道進行交互。

於是在父進程 ResultHandler . on_process_alive 會響應。

class Worker(_pool.Worker):
    """Pool worker process."""

    def on_loop_start(self, pid):
        # our version sends a WORKER_UP message when the process is ready
        # to accept work, this will tell the parent that the inqueue fd
        # is writable.
        self.outq.put((WORKER_UP, (pid,)))

父進程啟動時候,會設置一個消息響應 函數,這樣父進程就知道子進程已經ready,可以給子進程安排工作

class ResultHandler(_pool.ResultHandler):
    """Handles messages from the pool processes."""

    def __init__(self, *args, **kwargs):
        self.fileno_to_outq = kwargs.pop('fileno_to_outq')
        self.on_process_alive = kwargs.pop('on_process_alive')
        super().__init__(*args, **kwargs)
        # add our custom message handler
        self.state_handlers[WORKER_UP] = self.on_process_alive

3.4.4 正式進入業務邏輯

子進程使用 sys.exit(self.workloop(pid=pid)) 正式進入循環;

代碼位置:billiard/pool.py

可以看到,使用 req = wait_for_job() 來監聽任務信息,然后運行

具體堆棧為:

workloop, pool.py:351
__call__, pool.py:292
run, process.py:114
_bootstrap, process.py:327
_main, spawn.py:210
spawn_main, spawn.py:165
<frame not available>

具體代碼邏輯如下:

def workloop(self, debug=debug, now=monotonic, pid=None):
    pid = pid or os.getpid()
    put = self.outq.put
    inqW_fd = self.inqW_fd
    synqW_fd = self.synqW_fd
    maxtasks = self.maxtasks
    max_memory_per_child = self.max_memory_per_child or 0
    prepare_result = self.prepare_result

    wait_for_job = self.wait_for_job
    _wait_for_syn = self.wait_for_syn

    def wait_for_syn(jid):
        i = 0
        while 1:
            if i > 60:
                error('!!!WAIT FOR ACK TIMEOUT: job:%r fd:%r!!!',
                      jid, self.synq._reader.fileno(), exc_info=1)
            req = _wait_for_syn()
            if req:
                type_, args = req
                if type_ == NACK:
                    return False
                assert type_ == ACK
                return True
            i += 1

    completed = 0
    try:
        while maxtasks is None or (maxtasks and completed < maxtasks):
            req = wait_for_job()
            if req:
                type_, args_ = req
                assert type_ == TASK
                job, i, fun, args, kwargs = args_
                put((ACK, (job, i, now(), pid, synqW_fd)))
                if _wait_for_syn:
                    confirm = wait_for_syn(job)
                    if not confirm:
                        continue  # received NACK
                try:
                    result = (True, prepare_result(fun(*args, **kwargs)))
                except Exception:
                    result = (False, ExceptionInfo())
                try:
                    put((READY, (job, i, result, inqW_fd)))
                except Exception as exc:
                    _, _, tb = sys.exc_info()
                    try:
                        wrapped = MaybeEncodingError(exc, result[1])
                        einfo = ExceptionInfo((
                            MaybeEncodingError, wrapped, tb,
                        ))
                        put((READY, (job, i, (False, einfo), inqW_fd)))
                    finally:
                        del(tb)
                completed += 1
                if max_memory_per_child > 0:
                    used_kb = mem_rss()
                    if used_kb <= 0:
                        error('worker unable to determine memory usage')
                    if used_kb > 0 and used_kb > max_memory_per_child:
                        warning(MAXMEM_USED_FMT.format(
                            used_kb, max_memory_per_child))
                        return EX_RECYCLE

        if maxtasks:
            return EX_RECYCLE if completed == maxtasks else EX_FAILURE
        return EX_OK
    finally:
        # Before exiting the worker, we want to ensure that that all
        # messages produced by the worker have been consumed by the main
        # process. This prevents the worker being terminated prematurely
        # and messages being lost.
        self._ensure_messages_consumed(completed=completed)

邏輯如下:

                                                                                          +
                                                                parent process            |                child process
                 +-----------------------------+                                          |
                 |        SpawnProcess         |                                          |                                                                     +-----------+
                 |                             |                                          |                                                                     |  Celery   |
                 |                os.getpid()+-----------------+                          |                                                                     |           |
                 |                             |               |                          |                                                                     +-----------+
                 |                   rhandle +---------------+ |                          |                          +---------------+
                 |           Popen             |             | |                          |                          |  spawn_main   |                                  ^
                 |             +     whandle   |             | |                          |           parent_pid     |               |  4    +-------------+            |
                 |             |               |             | |                          |                          |        self+--------> |SpawnProcess |            |
                 +---+-------------------------+             | |                          |       +--------------->  |               |       +------+------+            |
                     |         |                             | |                          |       |                  |          fd   |              |                   |
                     |         |                             | |                          |       |   +----------->  |           ^   |              |                   |
                     |         |  get_command_line           | |                          |       |   | pipe_handle  |           |   |           5  | _bootstrap()      |
                     |         |                             | |                          |       |   |              +---------------+              |                   |
                     |         |                             | |                          |       |   |                          |                  v         +---------+
                     |         v                             | |                          |       |   |                     ^    |                            |
                     |                                       v v                          |       |   |                     |    |      +---------------------------+
python -c 'from billi|rd.spawn import spawn_main; spawn_main(....)' --billiard-fork ...   |       |   |                     |    |      |        Worker       |     |
                     |         +                              + +                         |       |   |                     |    |      |                     |     | <---------+
                     |         |                              | |                         |       |   |                     |    |      |                     +     |           |
                     |         |                              | |                         |       |   |                     |    |      |      _tls.current_app     |           |
                     |         |                              | |                         |       |   |                     |    |      |                           |           |
                     |         |  CreateProcess               | |                         |       |   |                     |    |      +------------+--------------+           |
                     |         |                              | +---------------------------------+   |                     |    |                   |                          |
                     |         |                              +---------------------------------------+                     |    |                   |                          |
                     |         |                                                                                            |    |                   |                          |
                     |         |                 1                               +-----------------+              2         |    |                   |                          |
                     |         +---------------------------------------------->  | windows kernel  |  +---------------------+    |                   |                          |
                     |                                                           +-----------------+                             |                   |                          |
                     |                                                                                                           |                   |                          |
                     |                                                                                                           |                   |                          |
                     +-----------------------------------------------------------------------------------------------------------+                   |                          |
                                   3  reduction.dump(process_obj, to_child)                                                                          |           6    __call__  |
                                                                                          +                                                          |                          |
                                                                                          |                                                          +------------------------->+
                                                                                          |
                                                                                          |
                                                                                          +

手機如下:

至此,子進程啟動完畢,具體如何運行父進程傳來的任務,我們下期進行介紹。

0xFF 參考

Celery 源碼學習(二)多進程模型


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM