Starting OpenStack Services


Essentially all OpenStack services rely on eventlet for their concurrency. Their processes fall into two categories:
1. WSGIService: receives and handles HTTP requests via the eventlet.wsgi server, e.g. nova-api
2. Service: receives and handles RPC requests, e.g. nova-compute
Whether the process is a WSGIService or a Service, every incoming request (HTTP or RPC) is handled by a green thread (coroutine) allocated from a pool, as the sketch below illustrates.
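
To make this concrete, here is a minimal, hypothetical sketch (plain eventlet, not OpenStack code) of the pattern: a GreenPool acts as the pool, and one green thread is spawned per incoming request:

import eventlet
eventlet.monkey_patch()               # make blocking I/O cooperative

pool = eventlet.GreenPool(size=1000)  # the pool of green threads

def handle(request):
    eventlet.sleep(0.1)               # stand-in for I/O-bound work; yields to other greenthreads
    print('handled %s' % request)

for req in ('GET /servers', 'POST /servers', 'GET /flavors'):
    pool.spawn_n(handle, req)         # one coroutine per request
pool.waitall()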

1. Starting a WSGIService

The following uses nova as the example.
nova-api is started from nova/cmd/api.py, which creates a WSGIService object (defined in nova/service.py).

def main():
    objects.register_all()
    CONF(sys.argv[1:], project='nova',
         version=version.version_string())
    logging.setup(CONF, "nova")

    rpc.init(CONF)
    launcher = service.get_launcher()
    server = service.WSGIService('osapi_nova')
    launcher.launch_service(server, workers=server.workers)
    launcher.wait()

In api.py a launcher object is obtained from the service layer, and the server object is finally passed into the launcher's launch_service(server, workers=server.workers) method. The Launcher class is defined as follows:

class Launcher(object):
    def __init__(self):
        super(Launcher, self).__init__()
        self.launch_service = serve
        self.wait = wait

As the constructor shows, launch_service is simply a reference to the serve function, which is defined as follows:

def serve(server, workers=None):
    global _launcher
    if _launcher:
        raise RuntimeError(_('serve() can only be called once'))

    _launcher = service.launch(CONF, server, workers=workers)

This ultimately calls the launch function in oslo_service/service.py, defined as follows:

def launch(conf, service, workers=1, restart_method='reload'):
    …

    if workers is not None and workers <= 0:
        raise ValueError(_("Number of workers should be positive!"))

    if workers is None or workers == 1:
        launcher = ServiceLauncher(conf, restart_method=restart_method)
    else:
        launcher = ProcessLauncher(conf, restart_method=restart_method)
    launcher.launch_service(service, workers=workers)
    return launcher

Two different launchers are used here. Before going further into the startup process, let's first look at the launchers in OpenStack.

2. Launchers in OpenStack

OpenStack has the concept of a Launcher, a class dedicated to starting services, which lives in the oslo_service package. There are two kinds of Launcher:
one is ServiceLauncher,
the other is ProcessLauncher.
ServiceLauncher starts single-process services, while ProcessLauncher starts services with multiple worker child processes, such as the various API services (nova-api, cinder-api, etc.). A minimal usage sketch follows.
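
For orientation, here is a minimal, assumed example (not nova code) of driving these launchers through oslo_service.service.launch; a workers value of None or 1 selects ServiceLauncher, while a larger value selects ProcessLauncher:

from oslo_config import cfg
from oslo_service import service

CONF = cfg.CONF

class DemoService(service.Service):
    def start(self):
        super(DemoService, self).start()
        # schedule a periodic no-op on the service's thread group
        self.tg.add_timer(10, lambda: None)

if __name__ == '__main__':
    CONF([], project='demo')
    # workers=2 -> ProcessLauncher; workers=1 (or None) -> ServiceLauncher
    launcher = service.launch(CONF, DemoService(), workers=2)
    launcher.wait()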

oslo_service/service.py

1) ServiceLauncher

ServiceLauncher inherits from Launcher. The key member for starting a service is launch_service, which ServiceLauncher inherits from Launcher:

def launch_service(self, service, workers=1):
 	…
    if workers is not None and workers != 1:
        raise ValueError(_("Launcher asked to start multiple workers"))
    _check_service_base(service)
    service.backdoor_port = self.backdoor_port
    self.services.add(service)

launch_service simply adds the service to the self.services member, an instance of the Services class. Here is its add method:

class Services(object):

    def __init__(self):
        self.services = []
        self.tg = threadgroup.ThreadGroup()
        self.done = event.Event()

    def add(self, service):
        """Add a service to a list and create a thread to run it.

        :param service: service to run
        """
        self.services.append(service)
        self.tg.add_thread(self.run_service, service, self.done)

Initialization of the Services class is straightforward: it creates a ThreadGroup, which is essentially eventlet's GreenPool (OpenStack uses eventlet for concurrency). The add method puts self.run_service into the pool, with the service as its argument. run_service itself is trivial: it just calls the service's start method, and with that the service is started. A rough sketch of run_service follows.
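
For completeness, run_service in oslo_service/service.py looks roughly like this (paraphrased; the exact code varies between releases): it starts the service and then blocks on the shared done event until shutdown:

@staticmethod
def run_service(service, done):
    """Start a service and wait on the shared 'done' event.

    :param service: service to run
    :param done: event to wait on until a shutdown is triggered
    """
    try:
        service.start()
    except Exception:
        LOG.exception('Error starting thread.')
        raise SystemExit(1)
    else:
        done.wait()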

2) ProcessLauncher

ProcessLauncher inherits directly from object and likewise has a launch_service method:

def launch_service(self, service, workers=1):
 	…
    _check_service_base(service)
    wrap = ServiceWrapper(service, workers)

    LOG.info('Starting %d workers', wrap.workers)
    while self.running and len(wrap.children) < wrap.workers:
        self._start_child(wrap)

Besides the service, launch_service also takes a workers argument, the number of child processes, and then calls _start_child to start them:

def _start_child(self, wrap):
    if len(wrap.forktimes) > wrap.workers:
        # Limit ourselves to one process a second (over the period of
        # number of workers * 1 second). This will allow workers to
        # start up quickly but ensure we don't fork off children that
        # die instantly too quickly.
        if time.time() - wrap.forktimes[0] < wrap.workers:
            LOG.info('Forking too fast, sleeping')
            time.sleep(1)

        wrap.forktimes.pop(0)

    wrap.forktimes.append(time.time())

    pid = os.fork()
    if pid == 0:
        self.launcher = self._child_process(wrap.service)
        while True:
            self._child_process_handle_signal()
            status, signo = self._child_wait_for_exit_or_signal(
                self.launcher)
            if not _is_sighup_and_daemon(signo):
                self.launcher.wait()
                break
            self.launcher.restart()

        os._exit(status)

    LOG.debug('Started child %d', pid)

    wrap.children.add(pid)
    self.children[pid] = wrap

There is the familiar fork: a plain call to os.fork(). The child process then starts running and calls _child_process:

def _child_process(self, service):
    self._child_process_handle_signal()

    # Reopen the eventlet hub to make sure we don't share an epoll
    # fd with parent and/or siblings, which would be bad
    eventlet.hubs.use_hub()

    # Close write to ensure only parent has it open
    os.close(self.writepipe)
    # Create greenthread to watch for parent to close pipe
    eventlet.spawn_n(self._pipe_watcher)

    # Reseed random number generator
    random.seed()

    launcher = Launcher(self.conf, restart_method=self.restart_method)
    launcher.launch_service(service)
    return launcher

_child_process is actually quite simple: it creates a Launcher and calls Launcher.launch_service. As described earlier, ServiceLauncher inherits from Launcher, so it goes through the same launch_service path to start the service. The remaining steps are the same as before, ultimately calling service.start to start the service. The sketch below summarizes the overall prefork pattern.
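
Putting the pieces together, here is a stripped-down, assumed sketch (not the real ProcessLauncher, which also handles signals, respawning, and a parent-death pipe) of the prefork pattern it implements:

import os
import eventlet
import eventlet.hubs

def run_worker(service):
    eventlet.hubs.use_hub()          # fresh hub so the child does not share an epoll fd with the parent
    pool = eventlet.GreenPool()
    pool.spawn(service.start)        # what Launcher.launch_service ends up doing
    pool.waitall()

def launch_workers(service, workers):
    children = []
    for _ in range(workers):
        pid = os.fork()
        if pid == 0:                 # child process
            run_worker(service)
            os._exit(0)
        children.append(pid)         # parent tracks its children
    for pid in children:
        os.waitpid(pid, 0)           # the real launcher respawns dead children instead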

3. Starting a WSGIService (continued)

Back to the startup flow. From the launcher section we know that starting a service ultimately calls the service's start method, and the service here is the one we created at the very beginning in api.py and passed layer by layer into the launchers. So let's return to the start(self) method of the WSGIService class:

def start(self):
    …
    if self.manager:
        self.manager.init_host()
    self.server.start()
    self.port = self.server.port

This calls the start(self) method in oslo_service/wsgi.py:

def start(self):
    …
    self.dup_socket = self.socket.dup()

    if self._use_ssl:
        self.dup_socket = sslutils.wrap(self.conf, self.dup_socket)

    wsgi_kwargs = {
        'func': eventlet.wsgi.server,
        'sock': self.dup_socket,
        'site': self.app,
        'protocol': self._protocol,
        'custom_pool': self._pool,
        'log': self._logger,
        'log_format': self.conf.wsgi_log_format,
        'debug': False,
        'keepalive': self.conf.wsgi_keep_alive,
        'socket_timeout': self.client_socket_timeout
        }

    if self._max_url_len:
        wsgi_kwargs['url_length_limit'] = self._max_url_len

    self._server = eventlet.spawn(**wsgi_kwargs)

Note the func argument in wsgi_kwargs: its value is eventlet.wsgi.server, defined in eventlet/wsgi.py as follows:

def server(sock, site,
    …
    try:
        serv.log.info("(%s) wsgi starting up on %s" % (
            serv.pid, socket_repr(sock)))
        while is_accepting:
            try:
                client_socket = sock.accept()
                client_socket[0].settimeout(serv.socket_timeout)
                serv.log.debug("(%s) accepted %r" % (
                    serv.pid, client_socket[1]))
                try:
                    pool.spawn_n(serv.process_request, client_socket)
                except AttributeError:
                    warnings.warn("wsgi's pool should be an instance of "
                                  "eventlet.greenpool.GreenPool, is %s. Please convert your"
                                  " call site to use GreenPool instead" % type(pool),
                                  DeprecationWarning, stacklevel=2)
                    pool.execute_async(serv.process_request, client_socket)
            except ACCEPT_EXCEPTIONS as e:
                if support.get_errno(e) not in ACCEPT_ERRNO:
                    raise
            except (KeyboardInterrupt, SystemExit):
                serv.log.info("wsgi exiting")
                break
    finally:
        pool.waitall()
        …

There it is, the familiar pattern: sock.accept() waits for incoming connections, and each time a new request arrives, pool.spawn_n() spawns a green thread to handle it. A standalone sketch of this pattern follows.
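
The same loop can be reproduced with plain eventlet. A minimal, assumed standalone example (not nova code) of eventlet.wsgi.server driven by an explicit GreenPool:

import eventlet
from eventlet import wsgi

def app(environ, start_response):
    # trivial WSGI application
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'hello\n']

pool = eventlet.GreenPool(1000)
sock = eventlet.listen(('127.0.0.1', 8080))
# accept() loop: each connection is handed to pool.spawn_n(), as shown above
wsgi.server(sock, app, custom_pool=pool)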

4. Starting a Service

Service-type processes are likewise created by files under the nova/cmd/* directory:

  • nova-scheduler: nova/cmd/scheduler.py
  • ……

As consumers of the messaging middleware, they each listen on their own queue, and whenever an RPC request arrives they create a new green thread to handle it. Taking nova-scheduler as an example, startup creates a Service object (defined in nova/service.py).
The whole Launcher flow is the same as for WSGIService; only the service's start() differs slightly:
def start(self):
    …
    target = messaging.Target(topic=self.topic, server=self.host)
    endpoints = [self.manager]
    endpoints.extend(self.manager.additional_endpoints)
    serializer = objects_base.NovaObjectSerializer()
    self.rpcserver = rpc.get_server(target, endpoints, serializer)
    self.rpcserver.start()

After several layers of calls, an RPCServer object like this is eventually created:

class RPCServer(msg_server.MessageHandlingServer):
    def __init__(self, transport, target, dispatcher, executor='blocking'):
        super(RPCServer, self).__init__(transport, dispatcher, executor)
        self._target = target

This class inherits from MessageHandlingServer.
Note: the nova components all rely on oslo.messaging to talk to the message broker; oslo/messaging/server.py initializes a MessageHandlingServer object that listens on the message queue.
Eventually this server's start method is called:

def start(self, override_pool_size=None):
    … 
    if self._started:
        LOG.warning(_LW('Restarting a MessageHandlingServer is inherently '
                        'racy. It is deprecated, and will become a noop '
                        'in a future release of oslo.messaging. If you '
                        'need to restart MessageHandlingServer you should '
                        'instantiate a new object.'))
    self._started = True

    executor_opts = {}

    if self.executor_type in ("threading", "eventlet"):
        executor_opts["max_workers"] = (
            override_pool_size or self.conf.executor_thread_pool_size
        )
    self._work_executor = self._executor_cls(**executor_opts)

    try:
        self.listener = self._create_listener()
    except driver_base.TransportDriverError as ex:
        raise ServerListenError(self.target, ex)

    # HACK(sileht): We temporary pass the executor to the rabbit
    # listener to fix a race with the deprecated blocking executor.
    # We do this hack because this is need only for 'synchronous'
    # executor like blocking. And this one is deprecated. Making
    # driver working in an sync and an async way is complicated
    # and blocking have 0% tests coverage.
    if hasattr(self.listener, '_poll_style_listener'):
        l = self.listener._poll_style_listener
        if hasattr(l, "_message_operations_handler"):
            l._message_operations_handler._executor = (
                self.executor_type)

    self.listener.start(self._on_incoming)

This object in turn creates an executor of type EventletExecutor (from oslo/messaging/_executors/impl_eventlet.py), which calls self.listener.poll() to wait for RPC requests; each time a request arrives, a green thread is spawned to handle it.

class EventletExecutor(base.ExecutorBase):
    ......

    def start(self):
        if self._thread is not None:
            return

        @excutils.forever_retry_uncaught_exceptions
        def _executor_thread():
            try:
                while True:
                    incoming = self.listener.poll()
                    spawn_with(ctxt=self.dispatcher(incoming),
                               pool=self._greenpool)
            except greenlet.GreenletExit:
                return

        self._thread = eventlet.spawn(_executor_thread)
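
To tie this together, here is a minimal, assumed sketch (not nova code) of driving the same machinery directly through oslo.messaging: get_rpc_server() builds the MessageHandlingServer shown above, and start() creates the listener that polls the queue for RPC requests. It assumes a valid transport_url (e.g. RabbitMQ) is configured, and note that newer oslo.messaging releases only offer the 'threading' executor:

from oslo_config import cfg
import oslo_messaging as messaging

class DemoEndpoint(object):
    def ping(self, ctxt, arg):
        return 'pong: %s' % arg

CONF = cfg.CONF
transport = messaging.get_rpc_transport(CONF)            # get_transport() on older releases
target = messaging.Target(topic='demo', server='host1')
server = messaging.get_rpc_server(transport, target, [DemoEndpoint()],
                                  executor='eventlet')   # 'threading' in newer releases
server.start()   # create the listener and begin polling for RPC requests
server.wait()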

