Analysis and Implementation of Neutron's Software Architecture
Neutron's software architecture is not complicated; we can describe it with three pictures.
First: Neutron is a master/slave distributed application composed of multiple service processes (e.g. neutron-server.service, l3_agent.service, etc.) that communicate asynchronously. It is divided into the Neutron Server, which acts as the central controller (accepting northbound API requests, running the control logic, and dispatching tasks), and the Agents, which act as local executors (carrying out tasks and reporting results). The two sides form a producer/consumer model and talk to each other through a message queue (MQ).
Second: to support a wide variety of underlying physical/virtual network elements, Neutron Server implements a plugin mechanism that exposes those elements' capabilities to the upper-layer logic. Neutron Server can therefore be further divided into:
- an API layer that accepts northbound RESTful API requests
- a Plugin layer that integrates network elements from different vendors
Third: to balance stability (a core resource-model feature subset backed by the default network elements) against extensibility (extended resource-model feature sets backed by diverse network elements), Neutron Server further splits the API layer into the Core API and the Extension API, and splits the Plugin layer into Core Plugins and Service Plugins (also called extension plugins). Network vendors can extend Neutron's feature set to fit their own needs, but even without any extensions Neutron provides a complete solution on its own. This is the key idea behind Neutron's Core & Plugin architecture.
In short, Neutron's software architecture is not particularly unconventional; it follows the consistent design style of OpenStack projects and has the following characteristics:
- Distributed — multiple service processes
- RESTful API — a unified northbound interface
- Plugins — compatibility with heterogeneous backends
- Asynchronous messaging — MQ
- Agents — workers
Neutron Server Startup Flow
NOTE: all code below is from the OpenStack Rocky release.
neutron-server — Accepts and routes API requests to the appropriate OpenStack Networking plug-in for action.
The service process behind Neutron Server is neutron-server.service, which contains the Web Server, Plugins (Core Plugins and Extension Plugins), RPC Client/Server, DB ORM, and other functional modules.
neutron-server.service startup command:
neutron-server --config-file /etc/neutron/neutron.conf --config-file /etc/neutron/api-paste.ini
As usual, we start from the startup script of the neutron-server.service process.
# /opt/stack/neutron/setup.cfg
[entry_points]
...
console_scripts =
...
neutron-server = neutron.cmd.eventlet.server:main
Find the program's entry function:
# /opt/stack/neutron/neutron/cmd/eventlet/server/__init__.py
def main():
    server.boot_server(wsgi_eventlet.eventlet_wsgi_server)

# /opt/stack/neutron/neutron/server/wsgi_eventlet.py
def eventlet_wsgi_server():
    # Get the WSGI Application
    neutron_api = service.serve_wsgi(service.NeutronApiService)
    # Start the API and RPC services
    start_api_and_rpc_workers(neutron_api)
A rough outline of the neutron-server.service startup flow is actually very simple:
- Initialize the configuration (load and parse the config files)
- Get the WSGI Application
- Start the API and RPC services
NOTE: the first step, initializing the configuration, simply uses the oslo.config library to load the neutron.conf file and parse its contents. oslo.config was already covered in "OpenStack 實現技術分解 (7) 通用庫 — oslo_config", so it is not repeated here. We will focus on the second and third steps.
Get the WSGI Application
The WSGI Application belongs to the Neutron Web Server module. Python web servers usually follow the WSGI protocol, which divides a web server into the WSGI Server, WSGI Middleware, and WSGI Application.
NOTE: the WSGI protocol was already introduced in "Python Web 開發規范 — WSGI", so it is not repeated here.
The key code for obtaining the WSGI Application is as follows:
# /opt/stack/neutron/neutron/server/wsgi_eventlet.py
neutron_api = service.serve_wsgi(service.NeutronApiService)

# /opt/stack/neutron/neutron/common/config.py
def load_paste_app(app_name):
    """Builds and returns a WSGI app from a paste config file.

    :param app_name: Name of the application to load
    """
    # A wrapper from oslo_service that parses the api-paste.ini file and
    # loads the app instances defined in it
    loader = wsgi.Loader(cfg.CONF)
    # Log the values of registered opts
    if cfg.CONF.debug:
        cfg.CONF.log_opt_values(LOG, logging.DEBUG)
    # The actual argument app_name is `neutron`
    app = loader.load_app(app_name)
    return app
Related log output:
DEBUG oslo.service.wsgi [-] Loading app neutron from /etc/neutron/api-paste.ini
Here api-paste.ini is the configuration file of the Paste library. Paste was already covered in "Openstack Restful API 開發框架 Paste + PasteDeploy + Routes + WebOb", so it is not repeated here. The configuration is as follows:
# /etc/neutron/api-paste.ini
[composite:neutronapi_v2_0]
use = call:neutron.auth:pipeline_factory
noauth = cors http_proxy_to_wsgi request_id catch_errors osprofiler extensions neutronapiapp_v2_0
keystone = cors http_proxy_to_wsgi request_id catch_errors osprofiler authtoken keystonecontext extensions neutronapiapp_v2_0
[filter:extensions]
paste.filter_factory = neutron.api.extensions:plugin_aware_extension_middleware_factory
[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory
After a series of processing steps inside the Paste library, the program flow enters the pipeline_factory function.
# /opt/stack/neutron/neutron/auth.py
def pipeline_factory(loader, global_conf, **local_conf):
    """Create a paste pipeline based on the 'auth_strategy' config option."""
    # The auth_strategy option selects whether Keystone authentication is enabled
    pipeline = local_conf[cfg.CONF.auth_strategy]
    pipeline = pipeline.split()
    # Load the filters of all WSGI middleware
    filters = [loader.get_filter(n) for n in pipeline[:-1]]
    # Load the WSGI application; the actual argument is neutronapiapp_v2_0
    app = loader.get_app(pipeline[-1])
    filters.reverse()
    # Wrap the WSGI application with all middleware filters in reverse order
    for filter in filters:
        app = filter(app)
    return app
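The reverse wrapping above can be illustrated with a minimal, self-contained sketch: because the filters are applied in reverse, the filter listed first in the pipeline ends up outermost and therefore sees the request first. The filter and app names here are made up for illustration and are not Neutron code.

```python
# Toy version of pipeline_factory's wrapping: each "filter" records its
# name before delegating to the app it wraps, so the returned list shows
# the traversal order of a request.
def make_filter(name):
    def filter_factory(app):
        def middleware(request):
            # Record traversal order, then delegate to the wrapped app
            return [name] + app(request)
        return middleware
    return filter_factory

def app(request):
    return ["app"]

pipeline = ["authtoken", "keystonecontext", "app"]
filters = [make_filter(n) for n in pipeline[:-1]]
filters.reverse()
wrapped = app
for f in filters:
    wrapped = f(wrapped)

# Requests traverse the filters in pipeline order: authtoken first
print(wrapped("GET /v2.0/networks"))
```

Because the wrapping happens in reverse, the call order matches the left-to-right order of the `keystone = ...` pipeline in api-paste.ini.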
# /opt/stack/neutron/neutron/api/v2/router.py
def _factory(global_config, **local_config):
    return pecan_app.v2_factory(global_config, **local_config)

# /opt/stack/neutron/neutron/pecan_wsgi/app.py
def v2_factory(global_config, **local_config):
    # Processing Order:
    #   As request enters lower priority called before higher.
    #   Response from controller is passed from higher priority to lower.
    app_hooks = [
        hooks.UserFilterHook(),  # priority 90
        hooks.ContextHook(),  # priority 95
        hooks.ExceptionTranslationHook(),  # priority 100
        hooks.BodyValidationHook(),  # priority 120
        hooks.OwnershipValidationHook(),  # priority 125
        hooks.QuotaEnforcementHook(),  # priority 130
        hooks.NotifierHook(),  # priority 135
        hooks.QueryParametersHook(),  # priority 139
        hooks.PolicyHook(),  # priority 140
    ]
    # The root "/" controller of the REST API is root.V2Controller
    app = pecan.make_app(root.V2Controller(),
                         debug=False,
                         force_canonical=False,
                         hooks=app_hooks,
                         guess_content_type_from_ext=True)
    # Initialize the Neutron Server
    startup.initialize_all()
    return app
At this point we have obtained the final WSGI Application and found the "/" root of API requests. The code shows that the web framework Neutron now uses is Pecan ("A WSGI object-dispatching web framework, designed to be lean and fast with few dependencies."), no longer the older PPRW stack (Paste + PasteDeploy + Routes + WebOb). Pecan's object-dispatch routing makes URL mapping and view functions much simpler to implement, without the large amount of business-irrelevant boilerplate that PPRW required; Pecan is now the preferred web framework for most OpenStack projects.
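Object-dispatch routing can be illustrated without Pecan itself: a URL path is resolved by walking object attributes segment by segment, starting from the root controller. This toy router is only a sketch of the idea; the class and method names are invented, not Pecan's API.

```python
# Toy object-dispatch router: each URL segment selects an attribute of
# the current controller object, mimicking Pecan's routing idea.
class PortsController:
    def index(self):
        return ["port1", "port2"]

class V2Controller:
    ports = PortsController()

def route(root, path):
    node = root
    for segment in path.strip("/").split("/"):
        node = getattr(node, segment)  # walk one attribute per segment
    return node.index()

print(route(V2Controller(), "/ports"))
```

Compared with PPRW, where every URL-to-view mapping had to be declared explicitly, here the object graph itself is the routing table.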
Core API & Extension API
Neutron's root controller, root.V2Controller(), provides interfaces for listing all Core API and Extension API refs (the resource models in use).
Get the Core API refs:
[root@localhost ~]# curl -i "http://172.18.22.200:9696/v2.0/" \
> -X GET \
> -H 'Content-type: application/json' \
> -H 'Accept: application/json' \
> -H "X-Auth-Project-Id: admin" \
> -H 'X-Auth-Token:gAAAAABchg8IRf8aMdYbm-7-vPJsFCoSecCJz9GZcPgrS0UirgSpbxIaF1f5duFsrkwRePBP6duTmVhV3GSIrHLqZ3RT21GQ1oDipTwCe8RktCnkEg5kXrUuQfAXmvjltRm5_0w5XbltJahVY0c3QXlrpP9G-IBdBWI7mpvyoP6h0x94000Ux20'
HTTP/1.1 200 OK
Content-Length: 516
Content-Type: application/json
X-Openstack-Request-Id: req-7c8aa1e6-1a18-433e-8ff5-95e59028cce5
Date: Mon, 11 Mar 2019 07:36:17 GMT
{
"resources": [{
"links": [{
"href": "http://172.18.22.200:9696/v2.0/subnets",
"rel": "self"
}],
"name": "subnet",
"collection": "subnets"
}, {
"links": [{
"href": "http://172.18.22.200:9696/v2.0/subnetpools",
"rel": "self"
}],
"name": "subnetpool",
"collection": "subnetpools"
}, {
"links": [{
"href": "http://172.18.22.200:9696/v2.0/networks",
"rel": "self"
}],
"name": "network",
"collection": "networks"
}, {
"links": [{
"href": "http://172.18.22.200:9696/v2.0/ports",
"rel": "self"
}],
"name": "port",
"collection": "ports"
}]
}
Get the Extension API refs:
[root@localhost ~]# curl -i "http://172.18.22.200:9696/v2.0/extensions/" \
> -X GET \
> -H 'Content-type: application/json' \
> -H 'Accept: application/json' \
> -H "X-Auth-Project-Id: admin" \
> -H 'X-Auth-Token:gAAAAABchg8IRf8aMdYbm-7-vPJsFCoSecCJz9GZcPgrS0UirgSpbxIaF1f5duFsrkwRePBP6duTmVhV3GSIrHLqZ3RT21GQ1oDipTwCe8RktCnkEg5kXrUuQfAXmvjltRm5_0w5XbltJahVY0c3QXlrpP9G-IBdBWI7mpvyoP6h0x94000Ux20'
HTTP/1.1 200 OK
Content-Length: 9909
Content-Type: application/json
X-Openstack-Request-Id: req-4dad9963-57c2-4b3e-a4d5-bc6fea5e78e8
Date: Mon, 11 Mar 2019 07:37:25 GMT
{
"extensions": [{
"alias": "default-subnetpools",
"updated": "2016-02-18T18:00:00-00:00",
"name": "Default Subnetpools",
"links": [],
"description": "Provides ability to mark and use a subnetpool as the default."
}, {
"alias": "availability_zone",
"updated": "2015-01-01T10:00:00-00:00",
"name": "Availability Zone",
"links": [],
"description": "The availability zone extension."
}, {
"alias": "network_availability_zone",
"updated": "2015-01-01T10:00:00-00:00",
"name": "Network Availability Zone",
"links": [],
"description": "Availability zone support for network."
}, {
"alias": "auto-allocated-topology",
"updated": "2016-01-01T00:00:00-00:00",
"name": "Auto Allocated Topology Services",
"links": [],
"description": "Auto Allocated Topology Services."
}, {
"alias": "ext-gw-mode",
"updated": "2013-03-28T10:00:00-00:00",
"name": "Neutron L3 Configurable external gateway mode",
"links": [],
"description": "Extension of the router abstraction for specifying whether SNAT should occur on the external gateway"
}, {
"alias": "binding",
"updated": "2014-02-03T10:00:00-00:00",
"name": "Port Binding",
"links": [],
"description": "Expose port bindings of a virtual port to external application"
}, {
"alias": "agent",
"updated": "2013-02-03T10:00:00-00:00",
"name": "agent",
"links": [],
"description": "The agent management extension."
}, {
"alias": "subnet_allocation",
"updated": "2015-03-30T10:00:00-00:00",
"name": "Subnet Allocation",
"links": [],
"description": "Enables allocation of subnets from a subnet pool"
}, {
"alias": "dhcp_agent_scheduler",
"updated": "2013-02-07T10:00:00-00:00",
"name": "DHCP Agent Scheduler",
"links": [],
"description": "Schedule networks among dhcp agents"
}, {
"alias": "external-net",
"updated": "2013-01-14T10:00:00-00:00",
"name": "Neutron external network",
"links": [],
"description": "Adds external network attribute to network resource."
}, {
"alias": "standard-attr-tag",
"updated": "2017-01-01T00:00:00-00:00",
"name": "Tag support for resources with standard attribute: subnet, trunk, router, network, policy, subnetpool, port, security_group, floatingip",
"links": [],
"description": "Enables to set tag on resources with standard attribute."
}, {
"alias": "flavors",
"updated": "2015-09-17T10:00:00-00:00",
"name": "Neutron Service Flavors",
"links": [],
"description": "Flavor specification for Neutron advanced services."
}, {
"alias": "net-mtu",
"updated": "2015-03-25T10:00:00-00:00",
"name": "Network MTU",
"links": [],
"description": "Provides MTU attribute for a network resource."
}, {
"alias": "network-ip-availability",
"updated": "2015-09-24T00:00:00-00:00",
"name": "Network IP Availability",
"links": [],
"description": "Provides IP availability data for each network and subnet."
}, {
"alias": "quotas",
"updated": "2012-07-29T10:00:00-00:00",
"name": "Quota management support",
"links": [],
"description": "Expose functions for quotas management per tenant"
}, {
"alias": "revision-if-match",
"updated": "2016-12-11T00:00:00-00:00",
"name": "If-Match constraints based on revision_number",
"links": [],
"description": "Extension indicating that If-Match based on revision_number is supported."
}, {
"alias": "l3-port-ip-change-not-allowed",
"updated": "2018-10-09T10:00:00-00:00",
"name": "Prevent L3 router ports IP address change extension",
"links": [],
"description": "Prevent change of IP address for some L3 router ports"
}, {
"alias": "availability_zone_filter",
"updated": "2018-06-22T10:00:00-00:00",
"name": "Availability Zone Filter Extension",
"links": [],
"description": "Add filter parameters to AvailabilityZone resource"
}, {
"alias": "l3-ha",
"updated": "2014-04-26T00:00:00-00:00",
"name": "HA Router extension",
"links": [],
"description": "Adds HA capability to routers."
}, {
"alias": "filter-validation",
"updated": "2018-03-21T10:00:00-00:00",
"name": "Filter parameters validation",
"links": [],
"description": "Provides validation on filter parameters."
}, {
"alias": "multi-provider",
"updated": "2013-06-27T10:00:00-00:00",
"name": "Multi Provider Network",
"links": [],
"description": "Expose mapping of virtual networks to multiple physical networks"
}, {
"alias": "quota_details",
"updated": "2017-02-10T10:00:00-00:00",
"name": "Quota details management support",
"links": [],
"description": "Expose functions for quotas usage statistics per project"
}, {
"alias": "address-scope",
"updated": "2015-07-26T10:00:00-00:00",
"name": "Address scope",
"links": [],
"description": "Address scopes extension."
}, {
"alias": "extraroute",
"updated": "2013-02-01T10:00:00-00:00",
"name": "Neutron Extra Route",
"links": [],
"description": "Extra routes configuration for L3 router"
}, {
"alias": "net-mtu-writable",
"updated": "2017-07-12T00:00:00-00:00",
"name": "Network MTU (writable)",
"links": [],
"description": "Provides a writable MTU attribute for a network resource."
}, {
"alias": "empty-string-filtering",
"updated": "2018-05-01T10:00:00-00:00",
"name": "Empty String Filtering Extension",
"links": [],
"description": "Allow filtering by attributes with empty string value"
}, {
"alias": "subnet-service-types",
"updated": "2016-03-15T18:00:00-00:00",
"name": "Subnet service types",
"links": [],
"description": "Provides ability to set the subnet service_types field"
}, {
"alias": "floatingip-pools",
"updated": "2018-03-21T10:00:00-00:00",
"name": "Floating IP Pools Extension",
"links": [],
"description": "Provides a floating IP pools API."
}, {
"alias": "port-mac-address-regenerate",
"updated": "2018-05-03T10:00:00-00:00",
"name": "Neutron Port MAC address regenerate",
"links": [],
"description": "Network port MAC address regenerate"
}, {
"alias": "standard-attr-timestamp",
"updated": "2016-09-12T10:00:00-00:00",
"name": "Resource timestamps",
"links": [],
"description": "Adds created_at and updated_at fields to all Neutron resources that have Neutron standard attributes."
}, {
"alias": "provider",
"updated": "2012-09-07T10:00:00-00:00",
"name": "Provider Network",
"links": [],
"description": "Expose mapping of virtual networks to physical networks"
}, {
"alias": "service-type",
"updated": "2013-01-20T00:00:00-00:00",
"name": "Neutron Service Type Management",
"links": [],
"description": "API for retrieving service providers for Neutron advanced services"
}, {
"alias": "l3-flavors",
"updated": "2016-05-17T00:00:00-00:00",
"name": "Router Flavor Extension",
"links": [],
"description": "Flavor support for routers."
}, {
"alias": "port-security",
"updated": "2012-07-23T10:00:00-00:00",
"name": "Port Security",
"links": [],
"description": "Provides port security"
}, {
"alias": "extra_dhcp_opt",
"updated": "2013-03-17T12:00:00-00:00",
"name": "Neutron Extra DHCP options",
"links": [],
"description": "Extra options configuration for DHCP. For example PXE boot options to DHCP clients can be specified (e.g. tftp-server, server-ip-address, bootfile-name)"
}, {
"alias": "port-security-groups-filtering",
"updated": "2018-01-09T09:00:00-00:00",
"name": "Port filtering on security groups",
"links": [],
"description": "Provides security groups filtering when listing ports"
}, {
"alias": "standard-attr-revisions",
"updated": "2016-04-11T10:00:00-00:00",
"name": "Resource revision numbers",
"links": [],
"description": "This extension will display the revision number of neutron resources."
}, {
"alias": "pagination",
"updated": "2016-06-12T00:00:00-00:00",
"name": "Pagination support",
"links": [],
"description": "Extension that indicates that pagination is enabled."
}, {
"alias": "sorting",
"updated": "2016-06-12T00:00:00-00:00",
"name": "Sorting support",
"links": [],
"description": "Extension that indicates that sorting is enabled."
}, {
"alias": "security-group",
"updated": "2012-10-05T10:00:00-00:00",
"name": "security-group",
"links": [],
"description": "The security groups extension."
}, {
"alias": "l3_agent_scheduler",
"updated": "2013-02-07T10:00:00-00:00",
"name": "L3 Agent Scheduler",
"links": [],
"description": "Schedule routers among l3 agents"
}, {
"alias": "fip-port-details",
"updated": "2018-04-09T10:00:00-00:00",
"name": "Floating IP Port Details Extension",
"links": [],
"description": "Add port_details attribute to Floating IP resource"
}, {
"alias": "router_availability_zone",
"updated": "2015-01-01T10:00:00-00:00",
"name": "Router Availability Zone",
"links": [],
"description": "Availability zone support for router."
}, {
"alias": "rbac-policies",
"updated": "2015-06-17T12:15:12-00:00",
"name": "RBAC Policies",
"links": [],
"description": "Allows creation and modification of policies that control tenant access to resources."
}, {
"alias": "standard-attr-description",
"updated": "2016-02-10T10:00:00-00:00",
"name": "standard-attr-description",
"links": [],
"description": "Extension to add descriptions to standard attributes"
}, {
"alias": "ip-substring-filtering",
"updated": "2017-11-28T09:00:00-00:00",
"name": "IP address substring filtering",
"links": [],
"description": "Provides IP address substring filtering when listing ports"
}, {
"alias": "router",
"updated": "2012-07-20T10:00:00-00:00",
"name": "Neutron L3 Router",
"links": [],
"description": "Router abstraction for basic L3 forwarding between L2 Neutron networks and access to external networks via a NAT gateway."
}, {
"alias": "allowed-address-pairs",
"updated": "2013-07-23T10:00:00-00:00",
"name": "Allowed Address Pairs",
"links": [],
"description": "Provides allowed address pairs"
}, {
"alias": "binding-extended",
"updated": "2017-07-17T10:00:00-00:00",
"name": "Port Bindings Extended",
"links": [],
"description": "Expose port bindings of a virtual port to external application"
}, {
"alias": "project-id",
"updated": "2016-09-09T09:09:09-09:09",
"name": "project_id field enabled",
"links": [],
"description": "Extension that indicates that project_id field is enabled."
}, {
"alias": "dvr",
"updated": "2014-06-1T10:00:00-00:00",
"name": "Distributed Virtual Router",
"links": [],
"description": "Enables configuration of Distributed Virtual Routers."
}]
}
These two API calls show that Neutron's Core API resources are only the four L2-related ones: networks, subnets, subnetpools, and ports; everything else is an Extension API resource. The Core API is Neutron's foundation, its minimal and stable core feature set. The Extension API, by contrast, is where Neutron "makes its living": a well-designed extensible API is what gives Neutron its healthy open-source ecosystem.
If you read the source of the root controller V2Controller you may be puzzled: it only implements the ExtensionsController child controller, and all ExtensionsController does is print the list of Extension API resources. So where are the controllers for the many resource objects listed in the official Neutron documentation (Networking API v2.0) actually implemented? The answer lies in the startup.initialize_all() function. But before describing how the controllers are implemented, we first need to understand how Neutron plugins are loaded, because API resources, controllers, and plugins are closely intertwined.
Core Plugins & Service Plugins
Mirroring the API split, plugins are divided into the Core Plugin and Service (Extension) Plugins, and their concrete implementations can be selected through configuration options, e.g.:
[DEFAULT]
...
# The core plugin Neutron will use (string value)
core_plugin = ml2
# The service plugins Neutron will use (list value)
service_plugins = neutron.services.l3_router.l3_router_plugin.L3RouterPlugin
As the option types show, there can be only one Core Plugin (ml2 by default), while multiple Service Plugins can be specified at the same time, e.g. L3RouterPlugin, FWaaSPlugin, LBaaSPlugin, VPNaaSPlugin, and so on.
The plugin-loading code is implemented as follows:
# /opt/stack/neutron/neutron/manager.py
def init():
    """Call to load the plugins (core+services) machinery."""
    # Load all plugins (core plugin + service plugins).
    # The directory is where plugins are registered; it is implemented in
    # neutron_lib/plugins/directory.py and mainly maintains and manages the
    # list of loaded plugins (e.g. add_plugin, get_plugins,
    # get_unique_plugins, is_loaded)
    if not directory.is_loaded():
        # This gets the singleton instance of NeutronManager itself
        # (a singleton, so that one unified list of plugins is maintained)
        NeutronManager.get_instance()

class NeutronManager(object):
    """Neutron's Manager class.

    Neutron's Manager class is responsible for parsing a config file and
    instantiating the correct plugin that concretely implements
    neutron_plugin_base class.
    """
    ...

    def __init__(self, options=None, config_file=None):
        ...
        # Read the core plugin from the config file, e.g. ml2
        plugin_provider = cfg.CONF.core_plugin
        LOG.info("Loading core plugin: %s", plugin_provider)
        # NOTE(armax): keep hold of the actual plugin object
        # Instantiate the core plugin class, e.g. Ml2Plugin
        plugin = self._get_plugin_instance(CORE_PLUGINS_NAMESPACE,
                                           plugin_provider)
        # Register the core plugin in the plugins directory
        directory.add_plugin(lib_const.CORE, plugin)
        ...
        # load services from the core plugin first
        # First load the service plugins supported by the core plugin itself
        self._load_services_from_core_plugin(plugin)
        # Then load the service plugins additionally specified in the
        # config file
        self._load_service_plugins()
        ...

    def _load_services_from_core_plugin(self, plugin):
        """Puts core plugin in service_plugins for supported services."""
        LOG.debug("Loading services supported by the core plugin")
        # supported service types are derived from supported extensions
        # Service plugins supported by the core plugin by default
        # (e.g. lbaas, fwaas, vpnaas, router, qos)
        for ext_alias in getattr(plugin, "supported_extension_aliases", []):
            if ext_alias in constants.EXT_TO_SERVICE_MAPPING:
                service_type = constants.EXT_TO_SERVICE_MAPPING[ext_alias]
                # Register the service plugin in the plugins directory
                directory.add_plugin(service_type, plugin)
                LOG.info("Service %s is supported by the core plugin",
                         service_type)

    def _load_service_plugins(self):
        """Loads service plugins.

        Starts from the core plugin and checks if it supports advanced
        services then loads classes provided in configuration.
        """
        # Read the service plugins from the config file
        plugin_providers = cfg.CONF.service_plugins
        # If the core plugin uses Neutron's native datastore, some default
        # service plugins are returned as well (e.g. tag, timestamp,
        # flavors, revisions)
        plugin_providers.extend(self._get_default_service_plugins())
        LOG.debug("Loading service plugins: %s", plugin_providers)
        for provider in plugin_providers:
            if provider == '':
                continue
            LOG.info("Loading Plugin: %s", provider)
            # Instantiate the service plugin class
            plugin_inst = self._get_plugin_instance('neutron.service_plugins',
                                                    provider)
            # only one implementation of svc_type allowed
            # specifying more than one plugin
            # for the same type is a fatal exception
            # TODO(armax): simplify this by moving the conditional into the
            # directory itself.
            plugin_type = plugin_inst.get_plugin_type()
            if directory.get_plugin(plugin_type):
                raise ValueError(_("Multiple plugins for service "
                                   "%s were configured") % plugin_type)
            # Register the service plugin in the plugins directory
            directory.add_plugin(plugin_type, plugin_inst)
            # search for possible agent notifiers declared in service plugin
            # (needed by agent management extension)
            # Get the core plugin instance
            plugin = directory.get_plugin()
            if (hasattr(plugin, 'agent_notifiers') and
                    hasattr(plugin_inst, 'agent_notifiers')):
                # Merge the service plugin's agent notifiers into the core
                # plugin's agent_notifiers dict
                plugin.agent_notifiers.update(plugin_inst.agent_notifiers)
            # disable incompatible extensions in core plugin if any
            utils.disable_extension_by_service_plugin(plugin, plugin_inst)
            LOG.debug("Successfully loaded %(type)s plugin. "
                      "Description: %(desc)s",
                      {"type": plugin_type,
                       "desc": plugin_inst.get_plugin_description()})
At this point, all of the Core and Service plugin classes that Neutron currently supports have been instantiated and registered in the Plugins Directory. The Plugins Directory is an important utility module; you will see it wherever code needs access to the loaded plugins.
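The Plugins Directory boils down to a module-level registry keyed by plugin type, with the core plugin under a well-known key. The sketch below is a simplified stand-in, with function names modeled on neutron_lib's directory API but not its actual implementation:

```python
# Minimal sketch of a plugins directory: a module-level dict keyed by
# plugin type, with the core plugin stored under the CORE key.
CORE = "CORE"
_plugins = {}

def add_plugin(alias, plugin):
    _plugins[alias] = plugin

def get_plugin(alias=CORE):
    # With no alias, return the core plugin (mirroring directory.get_plugin())
    return _plugins.get(alias)

def get_plugins():
    return dict(_plugins)

def is_loaded():
    return bool(_plugins)

add_plugin(CORE, "Ml2Plugin-instance")
add_plugin("L3_ROUTER_NAT", "L3RouterPlugin-instance")
print(get_plugin())           # the core plugin
print(sorted(get_plugins()))  # all registered plugin types
```

Keeping the registry at module level is what makes it behave like a singleton: every caller that imports it sees the same plugin list.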
After the plugins are registered, the workers (processes or green threads) corresponding to them are started as part of bringing up the neutron-server.service process.
# /opt/stack/neutron/neutron/server/wsgi_eventlet.py
# Executed when neutron-server.service starts
def start_api_and_rpc_workers(neutron_api):
    try:
        # Get the launcher instance for all RPC and plugin workers
        worker_launcher = service.start_all_workers()
        # Create a green thread pool
        pool = eventlet.GreenPool()
        # Start the WSGI application as a green thread
        api_thread = pool.spawn(neutron_api.wait)
        # Start the RPC and plugin workers as a green thread
        plugin_workers_thread = pool.spawn(worker_launcher.wait)
        # api and other workers should die together. When one dies,
        # kill the other.
        api_thread.link(lambda gt: plugin_workers_thread.kill())
        plugin_workers_thread.link(lambda gt: api_thread.kill())
        pool.waitall()
    except NotImplementedError:
        LOG.info("RPC was already started in parent process by "
                 "plugin.")
        neutron_api.wait()

# /opt/stack/neutron/neutron/service.py
def _get_rpc_workers():
    # Get the core plugin instance from the plugins directory
    plugin = directory.get_plugin()
    # Get the list of service plugins (which includes the core plugin)
    # from the plugins directory
    service_plugins = directory.get_plugins().values()
    ...
    # passing service plugins only, because core plugin is among them
    # Create an RpcWorker instance covering all plugins (core + service)
    rpc_workers = [RpcWorker(service_plugins,
                             worker_process_count=cfg.CONF.rpc_workers)]
    if (cfg.CONF.rpc_state_report_workers > 0 and
            plugin.rpc_state_report_workers_supported()):
        rpc_workers.append(
            RpcReportsWorker(
                [plugin],
                worker_process_count=cfg.CONF.rpc_state_report_workers
            )
        )
    return rpc_workers

class RpcWorker(neutron_worker.BaseWorker):
    """Wraps a worker to be handled by ProcessLauncher"""
    start_listeners_method = 'start_rpc_listeners'

    def __init__(self, plugins, worker_process_count=1):
        super(RpcWorker, self).__init__(
            worker_process_count=worker_process_count
        )
        self._plugins = plugins
        self._servers = []

    def start(self):
        super(RpcWorker, self).start()
        for plugin in self._plugins:
            if hasattr(plugin, self.start_listeners_method):
                try:
                    # Call each plugin's RPC listeners method
                    servers = getattr(plugin, self.start_listeners_method)()
                except NotImplementedError:
                    continue
                self._servers.extend(servers)
    ...

class RpcReportsWorker(RpcWorker):
    start_listeners_method = 'start_rpc_state_reports_listener'
The difference between RpcWorker and RpcReportsWorker is that the latter only reports RPC state, while the former is the RPC worker that actually carries Neutron's business logic. The rpc_state_report_workers option controls whether RPC state reporting is enabled.
How the RPC workers are started depends on the rpc_workers and rpc_state_report_workers options. If the option's value (an int) is less than 1, the RPC workers are started as green threads inside the neutron-server process itself; if it is greater than 1, new processes are forked and the workers are started as green threads inside them.
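The process-versus-greenthread decision described above can be sketched as a small helper. This function is purely illustrative (it is not Neutron code, and it treats a count of exactly 1 the same as the fork case):

```python
# Illustrative sketch: decide how RPC workers are launched based on the
# configured worker count, mirroring the behavior described above.
def launch_plan(worker_count):
    if worker_count < 1:
        # Run as green threads inside the neutron-server process itself
        return "greenthreads in current process"
    # Fork worker_count child processes, each running green threads
    return "fork %d processes, greenthreads inside each" % worker_count

print(launch_plan(0))
print(launch_plan(4))
```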
Core Controller & Extension Controller
The Controller is a very important concept in Pecan: it encapsulates the routed URL, the view function, the HTTP method, and the mapping between the three within a WSGI application, and can fairly be called the core object of a web framework. For more about controllers, see the official Pecan website. Here we focus on how Neutron implements its controllers.
Besides building and returning the WSGI Application object, the neutronapiapp_v2_0 factory function also calls startup.initialize_all(), which prepares the prerequisites for starting neutron-server.service: loading the plugins, instantiating the API resource controllers, and wiring up the mappings between API resources, controllers, and plugins.
# /opt/stack/neutron/neutron/pecan_wsgi/startup.py
# List of core resources
RESOURCES = {'network': 'networks',
             'subnet': 'subnets',
             'subnetpool': 'subnetpools',
             'port': 'ports'}

def initialize_all():
    # Load the plugins, as described above
    manager.init()
    # PluginAwareExtensionManager loads extensions from the configured
    # extension path and provides common management functions for them
    # (e.g. add_extension, extend_resources, get_resources)
    ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
    # Add the core resources to the extension resources list
    ext_mgr.extend_resources("2.0", attributes.RESOURCES)
    # At this stage we have a fully populated resource attribute map;
    # build Pecan controllers and routes for all core resources
    # Get the core plugin instance
    plugin = directory.get_plugin()
    # Iterate over the core resources
    for resource, collection in RESOURCES.items():
        # Keeps track of Neutron resources for which quota limits are enforced.
        resource_registry.register_resource_by_name(resource)
        # Wrap the core resource and core plugin into a new controller instance
        new_controller = res_ctrl.CollectionsController(collection, resource,
                                                        plugin=plugin)
        # Save new_controller as resource_name:new_controller in the
        # NeutronManager instance attributes
        manager.NeutronManager.set_controller_for_resource(
            collection, new_controller)
        # Save the plugin as resource_name:plugin in the NeutronManager
        # instance attributes
        manager.NeutronManager.set_plugin_for_resource(collection, plugin)

    pecanized_resources = ext_mgr.get_pecan_resources()
    for pec_res in pecanized_resources:
        manager.NeutronManager.set_controller_for_resource(
            pec_res.collection, pec_res.controller)
        manager.NeutronManager.set_plugin_for_resource(
            pec_res.collection, pec_res.plugin)

    # Now build Pecan Controllers and routes for all extensions
    # Get the ResourceExtension objects for the extension resources
    resources = ext_mgr.get_resources()
    # Extensions controller is already defined, we don't need it.
    resources.pop(0)
    # Iterate over the ResourceExtension objects
    for ext_res in resources:
        path_prefix = ext_res.path_prefix.strip('/')
        collection = ext_res.collection
        # Retrieving the parent resource. It is expected the format of
        # the parent resource to be:
        # {'collection_name': 'name-of-collection',
        #  'member_name': 'name-of-resource'}
        # collection_name does not appear to be used in the legacy code
        # inside the controller logic, so we can assume we do not need it.
        parent = ext_res.parent or {}
        parent_resource = parent.get('member_name')
        collection_key = collection
        if parent_resource:
            collection_key = '/'.join([parent_resource, collection])
        collection_actions = ext_res.collection_actions
        member_actions = ext_res.member_actions
        if manager.NeutronManager.get_controller_for_resource(collection_key):
            # This is a collection that already has a pecan controller, we
            # do not need to do anything else
            continue
        legacy_controller = getattr(ext_res.controller, 'controller',
                                    ext_res.controller)
        new_controller = None
        if isinstance(legacy_controller, base.Controller):
            resource = legacy_controller.resource
            plugin = legacy_controller.plugin
            attr_info = legacy_controller.attr_info
            member_actions = legacy_controller.member_actions
            pagination = legacy_controller.allow_pagination
            sorting = legacy_controller.allow_sorting
            # NOTE(blogan): legacy_controller and ext_res both can both have
            # member_actions. the member_actions for ext_res are strictly for
            # routing, while member_actions for legacy_controller are used for
            # handling the request once the routing has found the controller.
            # They're always the same so we will just use the ext_res
            # member_action.
            # Repackage the extension resource, its service plugin, and some
            # attributes of the original extension controller into a new
            # controller instance
            new_controller = res_ctrl.CollectionsController(
                collection, resource, resource_info=attr_info,
                parent_resource=parent_resource, member_actions=member_actions,
                plugin=plugin, allow_pagination=pagination,
                allow_sorting=sorting, collection_actions=collection_actions)
            # new_controller.collection has replaced hyphens with underscores
            manager.NeutronManager.set_plugin_for_resource(
                new_controller.collection, plugin)
            if path_prefix:
                manager.NeutronManager.add_resource_for_path_prefix(
                    collection, path_prefix)
        else:
            new_controller = utils.ShimCollectionsController(
                collection, None, legacy_controller,
                collection_actions=collection_actions,
                member_actions=member_actions,
                action_status=ext_res.controller.action_status,
                collection_methods=ext_res.collection_methods)
        # Save new_controller as resource_name:new_controller in the
        # NeutronManager instance attributes
        manager.NeutronManager.set_controller_for_resource(
            collection_key, new_controller)

    # Certain policy checks require that the extensions are loaded
    # and the RESOURCE_ATTRIBUTE_MAP populated before they can be
    # properly initialized. This can only be claimed with certainty
    # once this point in the code has been reached. In the event
    # that the policies have been initialized before this point,
    # calling reset will cause the next policy check to
    # re-initialize with all of the required data in place.
    policy.reset()
In short, the initialize_all function first loads all of the plugins, then wraps each pairing of Core Plugin + Core Resource and Service Plugin + Extension Resource into a CollectionsController instance, and registers them all in the NeutronManager instance attributes self.resource_plugin_mappings and self.resource_controller_mappings.
From the initial separation of the Core and Extension APIs to the final unification of Core and Extension controllers: it is quite a long journey.
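The registration described above boils down to two dictionaries on a manager singleton. The following simplified sketch uses method names modeled on NeutronManager but is not the real implementation:

```python
# Simplified sketch of NeutronManager's resource mappings: one dict from
# resource collection to controller, one from collection to plugin.
class Manager:
    def __init__(self):
        self.resource_controller_mappings = {}
        self.resource_plugin_mappings = {}

    def set_controller_for_resource(self, collection, controller):
        self.resource_controller_mappings[collection] = controller

    def get_controller_for_resource(self, collection):
        return self.resource_controller_mappings.get(collection)

    def set_plugin_for_resource(self, collection, plugin):
        self.resource_plugin_mappings[collection] = plugin

mgr = Manager()
# Register the four core collections, all backed by the same core plugin
for collection in ("networks", "subnets", "subnetpools", "ports"):
    mgr.set_controller_for_resource(collection, "controller-for-" + collection)
    mgr.set_plugin_for_resource(collection, "ml2")

print(mgr.get_controller_for_resource("networks"))
```

Extension resources are registered into the same two mappings, which is what unifies Core and Extension controllers behind one lookup interface.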
Core API Request Handling
The root controller V2Controller does not explicitly declare controllers for the core resources, yet every Core API request is in fact handled in its method def _lookup(self, collection, *remainder): (note: any URL path without an explicitly defined route is dispatched to the _lookup method).
@utils.expose()
def _lookup(self, collection, *remainder):
    # if collection exists in the extension to service plugins map then
    # we are assuming that collection is the service plugin and
    # needs to be remapped.
    # Example: https://neutron.endpoint/v2.0/lbaas/loadbalancers
    if (remainder and
            manager.NeutronManager.get_resources_for_path_prefix(
                collection)):
        collection = remainder[0]
        remainder = remainder[1:]
    # The actual argument collection is a core resource such as networks,
    # subnets, subnetpools, or ports
    # Get the controller instance for the resource from the NeutronManager
    # instance
    controller = manager.NeutronManager.get_controller_for_resource(
        collection)
    if not controller:
        LOG.warning("No controller found for: %s - returning response "
                    "code 404", collection)
        pecan.abort(404)
    # Store resource and collection names in pecan request context so that
    # hooks can leverage them if necessary. The following code uses
    # attributes from the controller instance to ensure names have been
    # properly sanitized (eg: replacing dashes with underscores)
    request.context['resource'] = controller.resource
    request.context['collection'] = controller.collection
    # NOTE(blogan): initialize a dict to store the ids of the items walked
    # in the path for example: /networks/1234 would cause uri_identifiers
    # to contain: {'network_id': '1234'}
    # This is for backwards compatibility with legacy extensions that
    # defined their own controllers and expected kwargs to be passed in
    # with the uri_identifiers
    request.context['uri_identifiers'] = {}
    return controller, remainder
During initialize_all, all Core Controllers were already registered in the NeutronManager instance attribute self.resource_controller_mappings; here the controller is simply retrieved from that attribute according to the type of the API request (e.g. networks, subnets).
(Pdb) controller
<neutron.pecan_wsgi.controllers.resource.CollectionsController object at 0x7f0fc2b60e10>
(Pdb) controller.resource
'network'
(Pdb) controller.plugin
<weakproxy at 0x7f0fc2b69cb0 to Ml2Plugin at 0x7f0fc3343fd0>
(Pdb) controller.plugin_lister
<bound method Ml2Plugin.get_networks of <neutron.plugins.ml2.plugin.Ml2Plugin object at 0x7f0fc3343fd0>>
As the instance attributes of the NetworkController above show, each Resource (here networks) is associated with the Core Plugin ml2, and the plugin class implements the actual view function for that Resource. For example, the view function for the API request GET /v2.0/networks is Ml2Plugin.get_networks. In fact, all Core Resources are associated with the same Core Plugin, while Extension Resources are associated with different Service Plugins according to their type. This is how Neutron wraps the call path from the Neutron API layer down to the Neutron Plugin layer.
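The resource-to-controller-to-plugin dispatch described above can be sketched as a toy model. All class names here are illustrative stand-ins for Neutron's real CollectionsController, Ml2Plugin, and NeutronManager, not their actual implementations:

```python
# Toy model of Neutron's resource -> controller -> plugin dispatch
# (illustrative stand-ins, not Neutron's real classes).

class FakeMl2Plugin:
    def get_networks(self, context):
        # The "real" view function lives on the plugin.
        return [{"id": "net-1", "name": "demo"}]

class CollectionsController:
    def __init__(self, resource, collection, plugin):
        self.resource = resource          # e.g. 'network'
        self.collection = collection      # e.g. 'networks'
        # plugin_lister mirrors the bound method seen in the Pdb session
        self.plugin_lister = getattr(plugin, "get_%s" % collection)

class NeutronManagerStub:
    def __init__(self):
        self.resource_controller_mappings = {}

    def register(self, controller):
        self.resource_controller_mappings[controller.collection] = controller

    def get_controller_for_resource(self, collection):
        return self.resource_controller_mappings.get(collection)

manager = NeutronManagerStub()
manager.register(CollectionsController("network", "networks", FakeMl2Plugin()))

# Equivalent of _lookup() routing GET /v2.0/networks:
controller = manager.get_controller_for_resource("networks")
print(controller.plugin_lister(context=None))  # delegated to the plugin
```

The point of the indirection is that the API layer never hardcodes a plugin: it only knows the collection name, and the controller registered for that name carries the bound plugin method.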
# /opt/stack/neutron/neutron/plugins/ml2/plugin.py
@db_api.retry_if_session_inactive()
def get_networks(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None, page_reverse=False):
# NOTE(ihrachys) use writer manager to be able to update mtu
# TODO(ihrachys) remove in Queens when mtu is not nullable
with db_api.CONTEXT_WRITER.using(context):
nets_db = super(Ml2Plugin, self)._get_networks(
context, filters, None, sorts, limit, marker, page_reverse)
# NOTE(ihrachys) pre Pike networks may have null mtus; update them
# in database if needed
# TODO(ihrachys) remove in Queens+ when mtu is not nullable
net_data = []
for net in nets_db:
if net.mtu is None:
net.mtu = self._get_network_mtu(net, validate=False)
net_data.append(self._make_network_dict(net, context=context))
self.type_manager.extend_networks_dict_provider(context, net_data)
nets = self._filter_nets_provider(context, net_data, filters)
return [db_utils.resource_fields(net, fields) for net in nets]
Extension API Request Handling
The Extension API is essentially a WSGI Middleware rather than a WSGI Application.
# /opt/stack/neutron/neutron/api/extensions.py
import routes
def plugin_aware_extension_middleware_factory(global_config, **local_config):
"""Paste factory."""
def _factory(app):
ext_mgr = PluginAwareExtensionManager.get_instance()
# ExtensionMiddleware wraps the extensions middleware for WSGI (route mapping and
# view functions); it receives and handles Extension Resources requests
return ExtensionMiddleware(app, ext_mgr=ext_mgr)
return _factory
class ExtensionMiddleware(base.ConfigurableMiddleware):
"""Extensions middleware for WSGI."""
def __init__(self, application,
...
# extended resources
for resource in self.ext_mgr.get_resources():
...
# Custom collection actions
for action, method in resource.collection_actions.items():
conditions = dict(method=[method])
path = "/%s/%s" % (resource.collection, action)
with mapper.submapper(controller=resource.controller,
action=action,
path_prefix=path_prefix,
conditions=conditions) as submap:
submap.connect(path_prefix + path, path)
submap.connect(path_prefix + path + "_format",
"%s.:(format)" % path)
# Custom collection methods
for action, method in resource.collection_methods.items():
conditions = dict(method=[method])
path = "/%s" % resource.collection
with mapper.submapper(controller=resource.controller,
action=action,
path_prefix=path_prefix,
conditions=conditions) as submap:
submap.connect(path_prefix + path, path)
submap.connect(path_prefix + path + "_format",
"%s.:(format)" % path)
# Map ResourceCollection, ResourceController, and ResourceMemberActions together
mapper.resource(resource.collection, resource.collection,
controller=resource.controller,
member=resource.member_actions,
parent_resource=resource.parent,
path_prefix=path_prefix)
...
As the code above shows, although the Core API uses the Pecan framework, the Extension API still relies on the routes library to maintain its Mapper.
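The routes-style mapping of (HTTP method, URL) pairs to controller actions can be illustrated with a minimal stand-in. This toy mapper only mimics the connect()/match() idea; it is not the real routes API, and the RouterController here is hypothetical:

```python
import re

# Minimal stand-in for a routes-style Mapper: (method, URL) -> (controller, action)
class ToyMapper:
    def __init__(self):
        self._routes = []

    def connect(self, pattern, method, controller, action):
        # Convert '/routers/{id}/...' into a named-group regex
        regex = re.sub(r"\{(\w+)\}", r"(?P<\1>[^/]+)", pattern)
        self._routes.append((re.compile("^%s$" % regex), method, controller, action))

    def match(self, url, method):
        for regex, m, controller, action in self._routes:
            found = regex.match(url)
            if found and m == method:
                return controller, action, found.groupdict()
        return None

class RouterController:
    def add_router_interface(self, id, **kwargs):
        return "add interface on router %s" % id

mapper = ToyMapper()
ctrl = RouterController()
# Mirrors resource.member_actions = {'add_router_interface': 'PUT', ...}
mapper.connect("/v2.0/routers/{id}/add_router_interface", "PUT",
               ctrl, "add_router_interface")

controller, action, ids = mapper.match(
    "/v2.0/routers/42/add_router_interface", "PUT")
print(getattr(controller, action)(**ids))
```

This mirrors how member_actions such as add_router_interface end up routed by HTTP method and URL to a method on the resource's Controller.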
(Pdb) resource.collection
'routers'
# Custom method and member action mappings
(Pdb) resource.member_actions
{'remove_router_interface': 'PUT', 'add_router_interface': 'PUT'}
(Pdb) resource.controller.__class__
<class 'webob.dec.wsgify'>
(Pdb) resource.controller.controller
<neutron.api.v2.base.Controller object at 0x7f81fd694ed0>
(Pdb) resource.controller.controller.plugin
<weakproxy at 0x7f81fd625158 to L3RouterPlugin at 0x7f81fd6c09d0>
The plugin for the Extension Resource routers is L3RouterPlugin, and the actual view function for the API request GET /v2.0/routers is neutron.services.l3_router.l3_router_plugin:L3RouterPlugin.get_routers.
# /opt/stack/neutron/neutron/db/l3_db.py
# L3RouterPlugin inherits get_routers from its parent class L3_NAT_dbonly_mixin
@db_api.retry_if_session_inactive()
def get_routers(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
marker_obj = lib_db_utils.get_marker_obj(
self, context, 'router', limit, marker)
return model_query.get_collection(context, l3_models.Router,
self._make_router_dict,
filters=filters, fields=fields,
sorts=sorts,
limit=limit,
marker_obj=marker_obj,
page_reverse=page_reverse)
Neutron Server Summary
The startup process of Neutron Server:
- Load (instantiate) the Core WSGI Application and the Extension WSGI Middleware
- Load (instantiate) the Core & Extension Plugins
- Start the Web Server service
- Start the Plugins RPC service
Developers familiar with OpenStack will notice that, compared with other projects (e.g. Nova, Cinder), Neutron's code is written in a rather unconventional style that is hard even for experienced developers to pick up quickly, which is one reason Neutron has a steep learning curve. This is clearly not an ideal approach, though it becomes easier to accept once you recall who originally open-sourced Neutron (Quantum).
Neutron's API falls into two broad categories, Core API and Extension API, which at the Web Server level (WSGI Server, WSGI Middleware, WSGI Application) correspond to a WSGI Application and a WSGI Middleware respectively. Both Core API and Extension API wrap each Resource in a Controller class; the difference is that the former uses the Pecan framework, while the latter still uses the routes library to map URL routes, view functions, and HTTP methods to one another. Although the code lives in different places and the implementation styles are not uniform, the end result is the same: a request is passed from the API layer to the Plugin layer, and the Plugin layer then forwards it asynchronously over the MQ via RPC to the Agent processes that actually execute the task.
NOTE: Not every request is passed asynchronously to the Agent processes; some requests, such as fetching networks resource information, are completed entirely within the Plugin layer.
Plug-ins and Agents
Neutron Plugins are part of Neutron Server, but they are singled out here because of their close relationship with the Agents. Neutron Plugins act as Neutron's internal "relay" layer, connecting the Neutron API layer above to the Neutron Agents layer below; the bridge between them is, naturally, the RPC protocol together with the MQ.
OpenStack Networking plug-ins and agents — Plug and unplug ports, create networks or subnets, and provide IP addressing. These plug-ins and agents differ depending on the vendor and technologies used in the particular cloud. OpenStack Networking ships with plug-ins and agents for Cisco virtual and physical switches, NEC OpenFlow products, Open vSwitch, Linux bridging, and the VMware NSX product.
The common agents are L3 (layer 3), DHCP (dynamic host IP addressing), and a plug-in agent.
Messaging queue — Used by most OpenStack Networking installations to route information between the neutron-server and various agents. Also acts as a database to store networking state for particular plug-ins.
Plugin RPC
The Plugin layer wraps the RPC protocol, with a Plugin acting both as an RPC Producer and as an RPC Consumer:
- RPC Producer: the Plugin sends messages to the Agents
- RPC Consumer: the Plugin receives messages sent by the Agents
First, for a Plugin to become a Consumer it must register with the RPC Server. I call this registration process "Registered Endpoints": through it, the Plugin registers the endpoints (call interfaces) used to communicate with the corresponding Agents.
The code logic of Registered Endpoints:
# /opt/stack/neutron/neutron/plugins/ml2/plugin.py
# Take the Core Plugin's RPC listener startup method as an example
class Ml2Plugin(...):
...
@log_helpers.log_method_call
def start_rpc_listeners(self):
"""Start the RPC loop to let the plugin communicate with agents."""
# Set up the endpoints for ML2Plugin-to-Agents communication
self._setup_rpc()
self.topic = topics.PLUGIN
self.conn = n_rpc.Connection()
# Register the endpoints with, and create, the RPC Consumer instances
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
self.conn.create_consumer(
topics.SERVER_RESOURCE_VERSIONS,
[resources_rpc.ResourcesPushToServerRpcCallback()],
fanout=True)
# process state reports despite dedicated rpc workers
self.conn.create_consumer(topics.REPORTS,
[agents_db.AgentExtRpcCallback()],
fanout=False)
# Start the RPC server instances for the endpoints in threads
return self.conn.consume_in_threads()
def start_rpc_state_reports_listener(self):
self.conn_reports = n_rpc.Connection()
self.conn_reports.create_consumer(topics.REPORTS,
[agents_db.AgentExtRpcCallback()],
fanout=False)
return self.conn_reports.consume_in_threads()
def _setup_rpc(self):
"""Initialize components to support agent communication."""
# The list of endpoints exposed to the Agents
self.endpoints = [
rpc.RpcCallbacks(self.notifier, self.type_manager),
securitygroups_rpc.SecurityGroupServerRpcCallback(),
dvr_rpc.DVRServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
agents_db.AgentExtRpcCallback(),
metadata_rpc.MetadataRpcCallback(),
resources_rpc.ResourcesPullRpcCallback()
]
The two key methods, start_rpc_listeners and start_rpc_state_reports_listener, are called by the RpcWorker and RpcReportsWorker worker classes mentioned earlier, which is how the RPC workers are loaded and run.
Printing self.endpoints:
(Pdb) pp self.endpoints
[<neutron.plugins.ml2.rpc.RpcCallbacks object at 0x7f17fcd9f350>,
<neutron.api.rpc.handlers.securitygroups_rpc.SecurityGroupServerRpcCallback object at 0x7f17fcd9f390>,
<neutron.api.rpc.handlers.dvr_rpc.DVRServerRpcCallback object at 0x7f17fcd9f3d0>,
<neutron.api.rpc.handlers.dhcp_rpc.DhcpRpcCallback object at 0x7f17fcd9f410>,
<neutron.db.agents_db.AgentExtRpcCallback object at 0x7f17fcd9f450>,
<neutron.api.rpc.handlers.metadata_rpc.MetadataRpcCallback object at 0x7f17fcd9f5d0>,
<neutron.api.rpc.handlers.resources_rpc.ResourcesPullRpcCallback object at 0x7f17fcd9f610>]
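When a message arrives, the RPC server dispatches it by scanning this endpoint list for the first object that exposes the method named in the message. A simplified sketch of that dispatch loop, using hypothetical stand-in endpoint classes rather than the real oslo.messaging machinery:

```python
# Simplified view of how an RPC server dispatches an incoming message
# to one of the registered endpoints (stand-in classes, not Neutron's).

class DhcpRpcCallbackStub:
    def get_active_networks(self, context):
        return ["net-1"]

class MetadataRpcCallbackStub:
    def get_ports(self, context):
        return ["port-1"]

endpoints = [DhcpRpcCallbackStub(), MetadataRpcCallbackStub()]

def dispatch(endpoints, method, context, **kwargs):
    # Scan the endpoint list for the first object implementing the method
    for endpoint in endpoints:
        func = getattr(endpoint, method, None)
        if callable(func):
            return func(context, **kwargs)
    raise RuntimeError("No endpoint handles %r" % method)

print(dispatch(endpoints, "get_active_networks", context=None))
```

This is why a single topic can serve many callback classes: each endpoint only needs to implement the subset of methods it cares about.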
An example of calling RPC functions in the Create Port workflow:
# /opt/stack/neutron/neutron/plugins/ml2/plugin.py
class Ml2Plugin(...):
...
def create_port(self, context, port):
...
return self._after_create_port(context, result, mech_context)
def _after_create_port(self, context, result, mech_context):
...
try:
bound_context = self._bind_port_if_needed(mech_context)
except ml2_exc.MechanismDriverError:
...
return bound_context.current
@db_api.retry_db_errors
def _bind_port_if_needed(self, context, allow_notify=False,
need_notify=False, allow_commit=True):
...
if not try_again:
if allow_notify and need_notify:
self._notify_port_updated(context)
return context
...
return context
def _notify_port_updated(self, mech_context):
port = mech_context.current
segment = mech_context.bottom_bound_segment
if not segment:
# REVISIT(rkukura): This should notify agent to unplug port
network = mech_context.network.current
LOG.debug("In _notify_port_updated(), no bound segment for "
"port %(port_id)s on network %(network_id)s",
{'port_id': port['id'], 'network_id': network['id']})
return
self.notifier.port_update(mech_context._plugin_context, port,
segment[api.NETWORK_TYPE],
segment[api.SEGMENTATION_ID],
segment[api.PHYSICAL_NETWORK])
# /opt/stack/neutron/neutron/plugins/ml2/rpc.py
class AgentNotifierApi(...):
...
def port_update(self, context, port, network_type, segmentation_id,
physical_network):
# Prepare the RPC client context
cctxt = self.client.prepare(topic=self.topic_port_update,
fanout=True)
# Cast (send) the RPC message
cctxt.cast(context, 'port_update', port=port,
network_type=network_type, segmentation_id=segmentation_id,
physical_network=physical_network)
Finally, the RPC message sent by ML2Plugin is received by the Agents (RPC consumers) that subscribe to this Target, and they carry out the actual work.
(Pdb) self.client.target
<Target topic=q-agent-notifier, version=1.0>
For example, the message is received by the OvS Agent:
# /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
def port_update(self, context, **kwargs):
port = kwargs.get('port')
# Put the port identifier in the updated_ports set.
# Even if full port details might be provided to this call,
# they are not used since there is no guarantee the notifications
# are processed in the same order as the relevant API requests
self.updated_ports.add(port['id'])
The Plugins Callback System
In addition to invoking RPC functions (call, cast) directly as in the example above, the Plugins also implement a Callback System; see the official documents "Neutron Messaging Callback System" and "Neutron Callback System".
Like RPC, the Callback System exists to enable communication, but with a different purpose: RPC carries task messages between the neutron-server and agent processes, while the Callback System enables in-process communication between core and service components, propagating resource lifecycle events (e.g. before creation, before deletion, etc.) so that Core and Services, or different Services, can observe state changes of a particular resource. For example, when a Neutron Network resource is associated with multiple services (VPN, Firewall, and Load Balancer), any service operating on that Network needs to know its current, correct state.
To illustrate what the Callback System does: suppose services A, B, and C all need to know about a router creation event. Without an intermediary delivering messages to these services, A would have to call B and C directly while creating the router, telling them "I am creating a router". With an intermediary X (the Callback System), the flow becomes:
- B and C subscribe with X to A's router-creation event
- A finishes creating the router
- A calls X (A holds a handle to X)
- X notifies B and C of A's router-created event (X -> notify)
Throughout the process, A, B, and C never communicate directly, which decouples the services from one another. This is what "callback" means here.
The Callback System is used extensively throughout Neutron, and the code always looks similar to the following example:
# /opt/stack/neutron/neutron/plugins/ml2/plugin.py
class Ml2Plugin(...):
...
def create_network(self, context, network):
self._before_create_network(context, network)
...
def _before_create_network(self, context, network):
net_data = network[net_def.RESOURCE_NAME]
# Notify services subscribed to this event type that a Network resource
# is about to be created (resource NETWORK, event BEFORE_CREATE)
registry.notify(resources.NETWORK, events.BEFORE_CREATE, self,
context=context, network=net_data)
There are two roles in the Callback System implementation: event consumers and event publishers. An event consumer subscribes to an event via the registry.subscribe API, while an event publisher notifies an event via the registry.notify API. The concrete code and module usage are well covered by examples in Neutron's official documentation, so they are not repeated here.
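The subscribe/notify mechanics can be sketched with a minimal in-process registry. This is a toy modeled on the idea behind neutron_lib.callbacks, not its real API; the service names and event strings are illustrative:

```python
from collections import defaultdict

# Toy callback registry: (resource, event) -> list of subscriber callbacks
_callbacks = defaultdict(list)

def subscribe(callback, resource, event):
    _callbacks[(resource, event)].append(callback)

def notify(resource, event, trigger, **kwargs):
    # Invoke every subscriber registered for this (resource, event) pair
    for callback in _callbacks[(resource, event)]:
        callback(resource, event, trigger, **kwargs)

received = []

# Services B and C subscribe to the router BEFORE_CREATE event
subscribe(lambda res, ev, trig, **kw: received.append(("B", res, ev)),
          "router", "before_create")
subscribe(lambda res, ev, trig, **kw: received.append(("C", res, ev)),
          "router", "before_create")

# Service A publishes the event; B and C are notified without A knowing them
notify("router", "before_create", trigger="service-A", router={"name": "r1"})
print(received)
```

A never references B or C directly: the registry is the intermediary X from the description above, which is exactly the decoupling the Callback System provides.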
Agents
As the Neutron deployment architecture shows, Neutron runs a large number of networking Agent processes, distributed across various types of nodes. They configure the physical/virtual network elements deployed on those nodes (e.g. DHCP, Linux Bridge, Open vSwitch, Router), providing Neutron with management and execution services for each kind of network function. By combining Agents in different ways, users can flexibly build the network topology they want.
[root@localhost ~]# openstack network agent list
+--------------------------------------+--------------------+-----------------------+-------------------+-------+-------+---------------------------+
| ID | Agent Type | Host | Availability Zone | Alive | State | Binary |
+--------------------------------------+--------------------+-----------------------+-------------------+-------+-------+---------------------------+
| 2698f558-6b20-407c-acf5-950e707432ed | Metadata agent | localhost.localdomain | None | :-) | UP | neutron-metadata-agent |
| 7804fb5a-fe22-4f02-8e4c-5689744bb0aa | Open vSwitch agent | localhost.localdomain | None | :-) | UP | neutron-openvswitch-agent |
| a7b30a22-0a8a-4d31-bf20-9d96dbe420bc | DHCP agent | localhost.localdomain | nova | :-) | UP | neutron-dhcp-agent |
| eb1da27b-3fa2-4304-965a-f6b15c475419 | L3 agent | localhost.localdomain | nova | :-) | UP | neutron-l3-agent |
+--------------------------------------+--------------------+-----------------------+-------------------+-------+-------+---------------------------+
The abstract architecture of an Agent can be divided into three layers:
- Northbound: exposes RPC interfaces for Neutron Server to call
- Southbound: configures the corresponding Neutron VNF (virtual network function, i.e. virtual network element) through its CLI protocol stack
- Middle: translates between the two models, converting the RPC model into the CLI model
For example, when Neutron creates and binds a Port for a virtual machine, the Linux Bridge Agent and the OvS Agent execute the following commands to realize Neutron's network implementation model on the compute node.
# Port UUID: 15c7b577-89f5-46f6-8111-5f4e0c8ebaa1
# VM UUID: 80996760-0c30-4e2a-847a-b9d882182df
brctl addbr qbr15c7b577-89
brctl setfd qbr15c7b577-89 0
brctl stp qbr15c7b577-89 off
brctl setageing qbr15c7b577-89 0
ip link add qvb15c7b577-89 type veth peer name qvo15c7b577-89
ip link set qvb15c7b577-89 up
ip link set qvb15c7b577-89 promisc on
ip link set qvb15c7b577-89 mtu 1450
ip link set qvo15c7b577-89 up
ip link set qvo15c7b577-89 promisc on
ip link set qvo15c7b577-89 mtu 1450
ip link set qbr15c7b577-89 up
brctl addif qbr15c7b577-89 qvb15c7b577-89
ovs-vsctl -- --may-exist add-br br-int -- set Bridge br-int datapath_type=system
ovs-vsctl --timeout=120 -- --if-exists del-port qvo15c7b577-89 -- add-port br-int qvo15c7b577-89 -- set Interface qvo15c7b577-89 external-ids:iface-id=15c7b577-89f5-46f6-8111-5f4e0c8ebaa1 external-ids:iface-status=active external-ids:attached-mac=fa:16:3e:d0:f6:a4 external-ids:vm-uuid=80996760-0c30-4e2a-847a-b9d882182df
ip link set qvo15c7b577-89 mtu 1450
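The northbound-RPC-to-southbound-CLI translation can be sketched as follows. The function names are hypothetical, only command strings are built (nothing is executed), and the command list is a shortened echo of the transcript above:

```python
# Sketch of an agent translating a northbound RPC payload into southbound
# CLI commands. The commands mirror the transcript above; this builds the
# strings only and does not run them.

def port_plug_commands(port_id, mtu=1450):
    dev = port_id[:11]  # Neutron truncates the port UUID for device names
    qbr, qvb, qvo = "qbr" + dev, "qvb" + dev, "qvo" + dev
    return [
        "brctl addbr %s" % qbr,
        "ip link add %s type veth peer name %s" % (qvb, qvo),
        "ip link set %s mtu %d" % (qvb, mtu),
        "brctl addif %s %s" % (qbr, qvb),
        "ovs-vsctl -- --may-exist add-br br-int",
    ]

# Northbound RPC handler delegating to the southbound translation
def handle_port_update(payload):
    return port_plug_commands(payload["port"]["id"])

cmds = handle_port_update(
    {"port": {"id": "15c7b577-89f5-46f6-8111-5f4e0c8ebaa1"}})
print("\n".join(cmds))
```

The middle layer is just this translation step: the same RPC payload could equally be rendered into ovs-ofctl or iproute2 invocations depending on the agent.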
The entry points of the Neutron Agents are likewise defined in the setup.cfg file; here we take the OvS Agent as an example and follow the startup flow of the neutron-openvswitch-agent.service process.
Starting the OvS Agent
# /opt/stack/neutron/setup.cfg
neutron-openvswitch-agent = neutron.cmd.eventlet.plugins.ovs_neutron_agent:main
Locate the program entry function of the service process:
# /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/main.py
_main_modules = {
'ovs-ofctl': 'neutron.plugins.ml2.drivers.openvswitch.agent.openflow.'
'ovs_ofctl.main',
'native': 'neutron.plugins.ml2.drivers.openvswitch.agent.openflow.'
'native.main',
}
def main():
common_config.init(sys.argv[1:])
driver_name = cfg.CONF.OVS.of_interface
mod_name = _main_modules[driver_name]
mod = importutils.import_module(mod_name)
mod.init_config()
common_config.setup_logging()
profiler.setup("neutron-ovs-agent", cfg.CONF.host)
mod.main()
Here we can see that the OvS Agent has two different startup modes, ovs-ofctl and native, selected through the configuration option of_interface.
# openvswitch_agent.ini
of_interface -- OpenFlow interface to use.
Type: string
Default: native
Valid Values: ovs-ofctl, native
It is commonly set to ovs-ofctl, meaning the agent uses Open vSwitch's ovs-ofctl command to manipulate flow tables.
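The driver-selection pattern above (a config value keying into a module map, followed by a dynamic import) is a common OpenStack idiom. A self-contained sketch using the stdlib importlib, with json/csv as stand-ins for the real driver modules:

```python
import importlib

# Map config values to module paths, as _main_modules does in the agent.
# json/csv are stand-in modules so the sketch is runnable anywhere.
_main_modules = {
    "ovs-ofctl": "json",
    "native": "csv",
}

def load_driver(of_interface):
    # Same shape as the agent's main(): look up the module path by the
    # configured driver name, then import it dynamically.
    mod_name = _main_modules[of_interface]
    return importlib.import_module(mod_name)

mod = load_driver("ovs-ofctl")
print(mod.__name__)  # json
```

The benefit is that adding a new OpenFlow backend only requires a new dictionary entry, not changes to the startup logic.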
# /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/main.py
def main():
# Three OvS bridge class definitions, corresponding to br-int, br-ethX, and br-tun
bridge_classes = {
'br_int': br_int.OVSIntegrationBridge,
'br_phys': br_phys.OVSPhysicalBridge,
'br_tun': br_tun.OVSTunnelBridge,
}
ovs_neutron_agent.main(bridge_classes)
# /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
def main(bridge_classes):
...
try:
# Instantiate the agent application object
agent = OVSNeutronAgent(bridge_classes, ext_mgr, cfg.CONF)
capabilities.notify_init_event(n_const.AGENT_TYPE_OVS, agent)
except (RuntimeError, ValueError) as e:
LOG.error("%s Agent terminated!", e)
sys.exit(1)
# Enter the agent daemon loop
agent.daemon_loop()
class OVSNeutronAgent(...):
...
def __init__(self, bridge_classes, ext_manager, conf=None):
...
# Create the RPC consumers
self.setup_rpc()
...
def setup_rpc(self):
self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
# allow us to receive port_update/delete callbacks from the cache
self.plugin_rpc.register_legacy_notification_callbacks(self)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerAPIShim(
self.plugin_rpc.remote_resource_cache)
self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
# RPC network init
self.context = context.get_admin_context_without_session()
# Made a simple RPC call to Neutron Server.
while True:
try:
self.state_rpc.has_alive_neutron_server(self.context)
except oslo_messaging.MessagingTimeout as e:
LOG.warning('l2-agent cannot contact neutron server. '
'Check connectivity to neutron server. '
'Retrying... '
'Detailed message: %(msg)s.', {'msg': e})
continue
break
# Define the listening consumers for the agent
consumers = [[constants.TUNNEL, topics.UPDATE],
[constants.TUNNEL, topics.DELETE],
[topics.DVR, topics.UPDATE]]
if self.l2_pop:
consumers.append([topics.L2POPULATION, topics.UPDATE])
self.connection = agent_rpc.create_consumers([self],
topics.AGENT,
consumers,
start_listening=False)
Once the RPC consumers have been created, the RPC consumer functions defined by the OvS Agent can consume the messages sent over the MQ by Neutron Server, e.g.:
def port_update(self, context, **kwargs):
port = kwargs.get('port')
# Put the port identifier in the updated_ports set.
# Even if full port details might be provided to this call,
# they are not used since there is no guarantee the notifications
# are processed in the same order as the relevant API requests
self.updated_ports.add(port['id'])
def port_delete(self, context, **kwargs):
port_id = kwargs.get('port_id')
self.deleted_ports.add(port_id)
self.updated_ports.discard(port_id)
def network_update(self, context, **kwargs):
network_id = kwargs['network']['id']
for port_id in self.network_ports[network_id]:
# notifications could arrive out of order, if the port is deleted
# we don't want to update it anymore
if port_id not in self.deleted_ports:
self.updated_ports.add(port_id)
LOG.debug("network_update message processed for network "
"%(network_id)s, with ports: %(ports)s",
{'network_id': network_id,
'ports': self.network_ports[network_id]})
...
NOTE: The OvS Agent only listens for UPDATE and DELETE RPC messages; it does not listen for CREATE, because Neutron Ports are created not by the OvS Agent but by the L3 Agent and DHCP Agent.