OpenStack源碼分析 Neutron源碼分析(一)-----------Restful API篇


原文:https://blog.csdn.net/happyanger6/article/details/54586463

 

首先,先分析WSGI應用的實現。

    由前面的文章http://blog.csdn.net/happyanger6/article/details/54518491可知,WSGI應用的構建過程主要就是通過paste庫的loadapp加載,因此核心就是分析這個過程。我們從neutron-server的起始代碼開始逐步分析。

neutron-server的入口是:

neutron/cmd/eventlet/server/__init__.py:main

 

def main():
    server.boot_server(_main_neutron_server)

 

boot_server在neutron/server/__init__.py中,它主要的功能就是解析命令行指定的配置文件,一般是"--config-file=/etc/neutron/neutron.conf",然后就執行_main_neutron_server。

neutron/cmd/eventlet/server/__init__.py::_main_neutron_serve

 

def _main_neutron_server():
    if cfg.CONF.web_framework == 'legacy':
        wsgi_eventlet.eventlet_wsgi_server()
    else:
        wsgi_pecan.pecan_wsgi_server()
可以看到,接下來根據配置文件中配置的web框架方式,決定如何啟動wsgi_server,傳統的方式是通過eventlet,現在又新加入了pecan方式。默認情況下,還是使用的eventlet方式,因此接着分析eventlet_wsig_server。這並不響應我們分析WSGI應用的代碼,因為這屬於WSGI服務器的部分。

 

neutron/server/wsgi_eventlet.py:

 

def eventlet_wsgi_server():
    neutron_api = service.serve_wsgi(service.NeutronApiService)
    start_api_and_rpc_workers(neutron_api)
這里也能看到,核心功能一部分是WSGI,另一部分就是rpc部分。這里將Netron提供的API功能封裝成了NeutronApiService類。我們來看看serve_wsgi:

 

neutron/service.py:

 

def serve_wsgi(cls):

    try:
        service = cls.create()
        service.start()
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE('Unrecoverable error: please check log '
                              'for details.'))

    return service
很明顯,這是用NeutronApiService的類方法"create"來創建實例,然后"start"啟動服務。接着分析下NeutronApiService的代碼:

 

neutron/service.py:

 

class NeutronApiService(WsgiService):
    """Class for neutron-api service."""

    @classmethod
    def create(cls, app_name='neutron'):

        # Setup logging early, supplying both the CLI options and the
        # configuration mapping from the config file
        # We only update the conf dict for the verbose and debug
        # flags. Everything else must be set up in the conf file...
        # Log the options used when starting if we're in debug mode...

        config.setup_logging()
        service = cls(app_name)
        return service
可以看到NeutronApiService繼承自"WsgiService",表明其是一個WSGI服務。然后類方法"create"構造了其實例並返回。

 

 

class WsgiService(object):
    """Base class for WSGI based services.

    For each api you define, you must also define these flags:
    :<api>_listen: The address on which to listen
    :<api>_listen_port: The port on which to listen

    """

    def __init__(self, app_name):
        self.app_name = app_name
        self.wsgi_app = None

    def start(self):
        self.wsgi_app = _run_wsgi(self.app_name)

    def wait(self):
        self.wsgi_app.wait()
構造過程很簡單,只是簡單的記錄app_name,這里是"neutron",然后在start函數里真正加載WSGI APP,並運行服務,因此這才是我們分析的開始。

 

 

def _run_wsgi(app_name):
    app = config.load_paste_app(app_name)
    if not app:
        LOG.error(_LE('No known API applications configured.'))
        return
    return run_wsgi_app(app)
load_paste_app從函數名,也可以明白它的作用就是加載paste定義的WSGI應用。

 

neutron/commom/config.py:

 

def load_paste_app(app_name):
    """Builds and returns a WSGI app from a paste config file.

    :param app_name: Name of the application to load
    """
    loader = wsgi.Loader(cfg.CONF)
    app = loader.load_app(app_name)
    return app
wsgi.Loader是從neutron.conf中讀取deploy配置文件的路徑,然后根據指定的配置文件來加載app,默認是"/etc/neutron/api-paste.ini"。然后通過deploy.loadapp來加載app,這個deploy就是PasteDeploy。

 

 

 
          
oslo_service/wsgi.py:
def load_app(self, name):
    """Return the paste URLMap wrapped WSGI application.

    :param name: Name of the application to load.
    :returns: Paste URLMap object wrapping the requested application.
    :raises: PasteAppNotFound

    """
    try:
        LOG.debug("Loading app %(name)s from %(path)s",
                  {'name': name, 'path': self.config_path})
        return deploy.loadapp("config:%s" % self.config_path, name=name)
    except LookupError:
        LOG.exception(_LE("Couldn't lookup app: %s"), name)
        raise PasteAppNotFound(name=name, path=self.config_path)

分析到這里可知,后面app的加載過程就是PasteDeploy的加載過程,有了上篇 http://blog.csdn.net/happyanger6/article/details/54518491文章中的基礎,我們對着源碼來理解:

 

先來看下配置文件"/etc/neutron/api-paste.ini":

[composite:neutron]
use = egg:Paste#urlmap
/: neutronversions
/v2.0: neutronapi_v2_0

[composite:neutronapi_v2_0]
use = call:neutron.auth:pipeline_factory
noauth = cors request_id catch_errors extensions neutronapiapp_v2_0
keystone = cors request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0

[filter:request_id]
paste.filter_factory = oslo_middleware:RequestId.factory

[filter:catch_errors]
paste.filter_factory = oslo_middleware:CatchErrors.factory

[filter:cors]
paste.filter_factory = oslo_middleware.cors:filter_factory
oslo_config_project = neutron

[filter:keystonecontext]
paste.filter_factory = neutron.auth:NeutronKeystoneContext.factory

[filter:authtoken]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory

[filter:extensions]
paste.filter_factory = neutron.api.extensions:plugin_aware_extension_middleware_factory

[app:neutronversions]
paste.app_factory = neutron.api.versions:Versions.factory

[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory

首先是一個組合類型的section,這個section表明用Paste.urlmap來構造應用,因此會將對"/"的訪問交給另外一個app[app:nuetronversion],而將對"/v2.0"的訪問交給另外一個組合[composite:neutronapi_v2_0]生成的app。

通過這2個就構造了所有的WSGI應用,其中對"/"的訪問,而通過neutron.api,version:Versions.factory類方法來構造一個對象,然后將請求交於這個對象處理,

具體而言就是交於對象的__call__方法。我們來看下是如何構造的:

neutron/api/versinos.py:

 

class Versions(object):

    @classmethod
    def factory(cls, global_config, **local_config):
        return cls(app=None)
通過factory方法構造一個對象,這個對象就是一個WSGI應用。它就處理對"/"的方法,而根據WSGI規范,會調用這個對象的__call__方法:

 

 

@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
    """Respond to a request for all Neutron API versions."""
    version_objs = [
        {
            "id": "v2.0",
            "status": "CURRENT",
        },
    ]

    if req.path != '/':
        if self.app:
            return req.get_response(self.app)
        language = req.best_match_language()
        msg = _('Unknown API version specified')
        msg = oslo_i18n.translate(msg, language)
        return webob.exc.HTTPNotFound(explanation=msg)
       ..............

 

可以看到,通過@webob.dec.wsgify裝飾器將__call__封裝成符合WSGI規范的函數,這樣"/"請求最終就是由"__call__"處理的。

這個"/"還比較簡單,復雜的是對"/v2.0"的訪問,這是大部分API的接口,我們看到這個組合段的app是用一個函數來構造的:

[composite:neutronapi_v2_0]
use = call:neutron.auth:pipeline_factory

use = call:...表示后面的是一個可調用對象,用它來構造最終的app.剩余的參數noauth,keystone等會作為參數傳給pipeline_factory。

neutron/auth.py:

 

def pipeline_factory(loader, global_conf, **local_conf):
    """Create a paste pipeline based on the 'auth_strategy' config option."""
    pipeline = local_conf[cfg.CONF.auth_strategy]
    pipeline = pipeline.split()
    filters = [loader.get_filter(n) for n in pipeline[:-1]]
    app = loader.get_app(pipeline[-1])
    filters.reverse()
    for filter in filters:
        app = filter(app)
    return app
先從配置文件neutron.conf中讀取auth策略,默認是"auth_strategy = keystone",因此從api-paste.ini中取到的pipeline為"cors request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0"它們都定義在其它的"filter"或"app" section段中。

 

首先,從pipeline中獲取最后一個app,即為"neutronapiapp_v2_0",從中加載app,然后依次用各個filter處理構造的app,並最終返回最后構造出的WSGI APP.

因此,我們按下面的順序分析即可:

通過app_factory工廠方法來構造app,然后通過不同的filter_factory方法構造不同的filter對象,並將app依次通過filter對象處理。
[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory

 

neutron/api/v2/router.py:

 

class APIRouter(base_wsgi.Router):

    @classmethod
    def factory(cls, global_config, **local_config):
        return cls(**local_config)

 

工廠方法構造了一個APIRouter對象作為app返回,因此分析其__init__方法:

 

def __init__(self, **local_config):
    mapper = routes_mapper.Mapper()
    plugin = manager.NeutronManager.get_plugin()
    ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
    ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)

    col_kwargs = dict(collection_actions=COLLECTION_ACTIONS,
                      member_actions=MEMBER_ACTIONS)
    def _map_resource(collection, resource, params, parent=None):
        allow_bulk = cfg.CONF.allow_bulk
        allow_pagination = cfg.CONF.allow_pagination
        allow_sorting = cfg.CONF.allow_sorting
        controller = base.create_resource(
            collection, resource, plugin, params, allow_bulk=allow_bulk,
            parent=parent, allow_pagination=allow_pagination,
            allow_sorting=allow_sorting)
        path_prefix = None
        if parent:
            path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],
                                              parent['member_name'],
                                              collection)
        mapper_kwargs = dict(controller=controller,
                             requirements=REQUIREMENTS,
                             path_prefix=path_prefix,
                             **col_kwargs)
        return mapper.collection(collection, resource,
                                 **mapper_kwargs)

    mapper.connect('index', '/', controller=Index(RESOURCES))
    for resource in RESOURCES:
        _map_resource(RESOURCES[resource], resource,
                      attributes.RESOURCE_ATTRIBUTE_MAP.get(
                          RESOURCES[resource], dict()))
        resource_registry.register_resource_by_name(resource)

    for resource in SUB_RESOURCES:
        _map_resource(SUB_RESOURCES[resource]['collection_name'], resource,
                      attributes.RESOURCE_ATTRIBUTE_MAP.get(
                          SUB_RESOURCES[resource]['collection_name'],
                          dict()),
                      SUB_RESOURCES[resource]['parent'])

    # Certain policy checks require that the extensions are loaded
    # and the RESOURCE_ATTRIBUTE_MAP populated before they can be
    # properly initialized. This can only be claimed with certainty
    # once this point in the code has been reached. In the event
    # that the policies have been initialized before this point,
    # calling reset will cause the next policy check to
    # re-initialize with all of the required data in place.
    policy.reset()
    super(APIRouter, self).__init__(mapper)
這個屬於核心API的構造,因此詳細分析一下。

 

 

mapper = routes_mapper.Mapper()

 

首先,是聲明一個routes.Mapper,這個上篇routes分析時講過,用來構造URL和對應controller的映射,方便根據不同的URL路由給不同的controller處理。

 

plugin = manager.NeutronManager.get_plugin()
然后,先構造了一個NeutronManger的單例,這個對象構造的過程中會根據配置加載核心插件,一般就是"Ml2Plugin",然后會加載以下幾個默認的服務插件:

 

 

neutron/plugings/common/constants.py:

DEFAULT_SERVICE_PLUGINS = {
    'auto_allocate': 'auto-allocated-topology',
    'tag': 'tag',
    'timestamp_core': 'timestamp_core',
    'network_ip_availability': 'network-ip-availability'
}
然后是加載擴展插件:

 

 

extensions.PluginAwareExtensionManager.get_instance()
擴展插件的加載會從neutron/extensions目錄下加載所有插件。

 

通過上面2步就加載完了核心插件,服務插件和擴展插件,然后就是構造不同URL的controller。

 

for resource in RESOURCES:
    _map_resource(RESOURCES[resource], resource,
                  attributes.RESOURCE_ATTRIBUTE_MAP.get(
                      RESOURCES[resource], dict()))
依次構造以下幾個URL的controller."/networks","/subnets","/subnetpools","/ports"。
RESOURCES = {'network': 'networks',
             'subnet': 'subnets',
             'subnetpool': 'subnetpools',
             'port': 'ports'}
這個構造過程是通過_map_resource函數完成的,構造時會從配置文件中獲取一些允許進行的操作,如"allow_bulk"
等。

在構造具體的mapper時,會傳遞以下參數:
col_kwargs = dict(collection_actions=COLLECTION_ACTIONS,
                  member_actions=MEMBER_ACTIONS)
COLLECTION_ACTIONS = ['index', 'create']
MEMBER_ACTIONS = ['show', 'update', 'delete']

這些就是可以對URL發起的操作類型,這些操作最終會根據訪問的URL(/networks,ports)轉換為create_network,update_port這些函數交給對應的controller處理。這些后面還會分析。

 

具體的controller是通過base.create_resource生成的,來看下代碼:

neutron/api/v2/base.py:

 

class Controller(object):
    LIST = 'list'
    SHOW = 'show'
    CREATE = 'create'
    UPDATE = 'update'
    DELETE = 'delete'
..........

 

..........

 

def create_resource(collection, resource, plugin, params, allow_bulk=False,
                    member_actions=None, parent=None, allow_pagination=False,
                    allow_sorting=False):
    controller = Controller(plugin, collection, resource, params, allow_bulk,
                            member_actions=member_actions, parent=parent,
                            allow_pagination=allow_pagination,
                            allow_sorting=allow_sorting)
    return wsgi_resource.Resource(controller, FAULT_MAP)
可以看到,所有的Controller都是這個文件中定義的Controller類的實例對象,然后還會再將其調用wsgi_resouce.Resource.

 

neutron/api/v2/resouce.py:

 

def Resource(controller, faults=None, deserializers=None, serializers=None,
             action_status=None):
    """Represents an API entity resource and the associated serialization and
    deserialization logic
    """
    default_deserializers = {'application/json': wsgi.JSONDeserializer()}
    default_serializers = {'application/json': wsgi.JSONDictSerializer()}
    format_types = {'json': 'application/json'}
    action_status = action_status or dict(create=201, delete=204)

    default_deserializers.update(deserializers or {})
    default_serializers.update(serializers or {})

    deserializers = default_deserializers
    serializers = default_serializers
    faults = faults or {}

    @webob.dec.wsgify(RequestClass=Request)
    def resource(request):
        route_args = request.environ.get('wsgiorg.routing_args')
        if route_args:
            args = route_args[1].copy()
        else:
            args = {}

        # NOTE(jkoelker) by now the controller is already found, remove
        #                it from the args if it is in the matchdict
        args.pop('controller', None)
        fmt = args.pop('format', None)
        action = args.pop('action', None)
        content_type = format_types.get(fmt,
                                        request.best_match_content_type())
        language = request.best_match_language()
        deserializer = deserializers.get(content_type)
        serializer = serializers.get(content_type)

        try:
            if request.body:
                args['body'] = deserializer.deserialize(request.body)['body']

            method = getattr(controller, action)

            result = method(request=request, **args)
        except (exceptions.NeutronException,
                netaddr.AddrFormatError,
                oslo_policy.PolicyNotAuthorized) as e:
            for fault in faults:
                if isinstance(e, fault):
                    mapped_exc = faults[fault]
                    break
            else:
                mapped_exc = webob.exc.HTTPInternalServerError
            if 400 <= mapped_exc.code < 500:
                LOG.info(_LI('%(action)s failed (client error): %(exc)s'),
                         {'action': action, 'exc': e})
            else:
                LOG.exception(_LE('%s failed'), action)
            e = translate(e, language)
            body = serializer.serialize(
                {'NeutronError': get_exception_data(e)})
            kwargs = {'body': body, 'content_type': content_type}
            raise mapped_exc(**kwargs)
        except webob.exc.HTTPException as e:
            type_, value, tb = sys.exc_info()
            if hasattr(e, 'code') and 400 <= e.code < 500:
                LOG.info(_LI('%(action)s failed (client error): %(exc)s'),
                         {'action': action, 'exc': e})
            else:
                LOG.exception(_LE('%s failed'), action)
            translate(e, language)
            value.body = serializer.serialize(
                {'NeutronError': get_exception_data(e)})
            value.content_type = content_type
            six.reraise(type_, value, tb)
        except NotImplementedError as e:
            e = translate(e, language)
            # NOTE(armando-migliaccio): from a client standpoint
            # it makes sense to receive these errors, because
            # extensions may or may not be implemented by
            # the underlying plugin. So if something goes south,
            # because a plugin does not implement a feature,
            # returning 500 is definitely confusing.
            body = serializer.serialize(
                {'NotImplementedError': get_exception_data(e)})
            kwargs = {'body': body, 'content_type': content_type}
            raise webob.exc.HTTPNotImplemented(**kwargs)
        except Exception:
            # NOTE(jkoelker) Everything else is 500
            LOG.exception(_LE('%s failed'), action)
            # Do not expose details of 500 error to clients.
            msg = _('Request Failed: internal server error while '
                    'processing your request.')
            msg = translate(msg, language)
            body = serializer.serialize(
                {'NeutronError': get_exception_data(
                    webob.exc.HTTPInternalServerError(msg))})
            kwargs = {'body': body, 'content_type': content_type}
            raise webob.exc.HTTPInternalServerError(**kwargs)

        status = action_status.get(action, 200)
        body = serializer.serialize(result)
        # NOTE(jkoelker) Comply with RFC2616 section 9.7
        if status == 204:
            content_type = ''
            body = None

        return webob.Response(request=request, status=status,
                              content_type=content_type,
                              body=body)
    # NOTE(blogan): this is something that is needed for the transition to
    # pecan.  This will allow the pecan code to have a handle on the controller
    # for an extension so it can reuse the code instead of forcing every
    # extension to rewrite the code for use with pecan.
    setattr(resource, 'controller', controller)
    return resource
可以看到,所有的請求都會先交於resouce函數處理,進行反序列化和請求參數的獲取,最終再交給controller處理。

 

 

action = args.pop('action', None)
method = getattr(controller, action)

result = method(request=request, **args)

 

這樣對於"/networks","/subnets","/subnetpools","/ports"都會最終交於controller對應的action函數,以create_network為例:

 

def create(self, request, body=None, **kwargs):
    self._notifier.info(request.context,
                        self._resource + '.create.start',
                        body)
    return self._create(request, body, **kwargs)

@db_api.retry_db_errors
def _create(self, request, body, **kwargs):
       action = self._plugin_handlers[self.CREATE]
_create中會從selc._plugin_handlers里取對應操作映射的action,這個映射是在controller的構造函數里創建的:
self._plugin_handlers = {
    self.LIST: 'get%s_%s' % (parent_part, self._collection),
    self.SHOW: 'get%s_%s' % (parent_part, self._resource)
}
for action in [self.CREATE, self.UPDATE, self.DELETE]:
    self._plugin_handlers[action] = '%s%s_%s' % (action, parent_part,
                                                 self._resource)
self._resource為"network","port"這些RESOUCES,因此create對應的為create_network,create_port。
在_create中最終調用do_create:
obj_creator = getattr(self._plugin, action)
try:
    if emulated:
        return self._emulate_bulk_create(obj_creator, request,
                                         body, parent_id)
    else:
        if self._collection in body:
            # This is weird but fixing it requires changes to the
            # plugin interface
            kwargs.update({self._collection: body})
        else:
            kwargs.update({self._resource: body})
        return obj_creator(request.context, **kwargs)

可以看到會從self._plugin里獲取對應的action,這個_plugin就是核心插件Ml2Plugin,因此所有的核心操作最終都
會交給Ml2Plugin的對應create_network,create_port等方法執行。這樣就明白了所有核心資源的創建刪除等
操作最終都會將給Ml2Plugin的對應方法處理。

 

那么Ml2Plugin插件的處理過程又是如何呢?我們先來看下其構造函數:

 

def __init__(self):
    # First load drivers, then initialize DB, then initialize drivers
    self.type_manager = managers.TypeManager()
    self.extension_manager = managers.ExtensionManager()
    self.mechanism_manager = managers.MechanismManager()
    super(Ml2Plugin, self).__init__()
可以看到它初始化了type_manager,mechanism_manager這2個管理器分別用來管理type和mechanism.其中不同的網絡拓撲類型對應着Type Driver,而網絡實現機制對應着Mechanism Driver。這兩個管理器都是通過stevedor來管理的,這樣就可以向查找標准庫一樣管理Type,Mechanism Driver了。

 

其中Type插件的加載會以'neutron.ml2.type_drivers'作為命名空間,Mechanism插件的加載會以'neutron.ml2.mechanism_drivers"作為命名空間。

這樣實際上Ml2Plugin的不同操作會交給不同的type,mechanism插件處理,這樣的架構十分靈活,比如:

 

def create_network(self, context, network):
    result, mech_context = self._create_network_db(context, network)
    try:
        self.mechanism_manager.create_network_postcommit(mech_context)
創建網絡會交由mechanism_manager處理。

 

 

這樣就是APIRouter構造出的app的全部內容了,對於核心URL會交由resource->Controller-->Ml2Plugin--->Type,Mechanism層層處理。也很方便我們根據需要自己實現不同的Type,Mechanism Driver.

然后就是將這個app交由不同的filter處理,我們繼續看這些filter干了些什么。第一個filter是:

[filter:extensions]
paste.filter_factory = neutron.api.extensions:plugin_aware_extension_middleware_factory

neutron/api/extensions.py:

 

def plugin_aware_extension_middleware_factory(global_config, **local_config):
    """Paste factory."""
    def _factory(app):
        ext_mgr = PluginAwareExtensionManager.get_instance()
        return ExtensionMiddleware(app, ext_mgr=ext_mgr)
    return _factory
可以看到會用ExtensionMiddleware對象對app進行處理,這個處理和APIRouter的__init__函數處理類似,只不過這次是為擴展插件構造URL和Controller.這些擴展插件的Controller是ExtensionController。由於過程類似,就不再詳細展開了,可以自行分析下。這樣通過第一個filter就構造出了擴展插件的WSGI應用。

 

 

第二個filter:

[filter:authtoken]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory

keystonemiddleware/auth_token/__init__.py:

 

def filter_factory(global_conf, **local_conf):
    """Returns a WSGI filter app for use with paste.deploy."""
    conf = global_conf.copy()
    conf.update(local_conf)

    def auth_filter(app):
        return AuthProtocol(app, conf)
    return auth_filter

 

可以看到對app封裝了一個AuthProtocol對象。分析其代碼不難看出其作用是對請求是否通過了認證進行檢查,即是否攜帶合法token。這樣后面的filter的作用也類似,就是對請求進行一些預處理,所有預處理都完成后再交由實際的Controller處理。

 

這樣我們就分析完了整個WSGI應用的構造和處理過程,不難得出下面的處理流程:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM