Neutron Analysis (3): neutron-l3-agent


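1. Layer-3 Networking Extension

The neutron l3 extension is an API extension that provides tenants with routing and NAT.

The l3 extension defines two resources:

  • router: forwards packets between internal subnets and performs NAT toward external networks through a designated external gateway. Each subnet is attached to the router through one port, and that port's IP address is the subnet's gateway.
  • floating ip: an IP address on an external network that is mapped to a port on an internal network. A floating ip can only be defined on a network whose router:external attribute is True.

Each resource has its own set of attributes, and both support CRUD operations.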
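As a rough illustration of the floating ip rule above, the sketch below models a network and a floating ip as plain dictionaries and refuses to create a floating ip on a network whose router:external attribute is not True. The helper name create_floating_ip and every field other than router:external are assumptions made for this example; this is not Neutron's actual data model or validation code.

```python
# Hypothetical, simplified model of the floating ip resource; not Neutron code.

def create_floating_ip(network, floating_ip_address, port_id=None):
    """Return a floating IP record, or raise if the network is not external."""
    if not network.get("router:external", False):
        raise ValueError(
            "network %s is not external; cannot allocate a floating IP"
            % network["id"])
    return {
        "floating_network_id": network["id"],
        "floating_ip_address": floating_ip_address,
        "port_id": port_id,  # internal port the address maps to (may be None)
    }


ext_net = {"id": "ext-net", "router:external": True}
private_net = {"id": "private-net", "router:external": False}

fip = create_floating_ip(ext_net, "192.168.39.5", port_id="port-1")
```

Calling create_floating_ip(private_net, ...) raises ValueError in this sketch, which mirrors the constraint described in the list.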

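2. Code Analysis

Given that neutron supports the l3 extension, how does an API call create a router or a floating ip and end up providing routing and NAT?

The main steps are:
1. A tenant issues a router or floating ip operation through horizon, the nova command line, or a custom script.
2. The API request reaches neutron server, where it is matched to the corresponding API extension provided by neutron.
3. The operation behind the API extension, for example create_router, is carried out jointly by the concrete plugin and the database.
4. The plugin uses rpc to have the l3 agent running on the compute/network nodes carry out the l3 forwarding and NAT work.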
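The four steps can be sketched as a toy, in-memory flow. Every class and function below (FakeL3Agent, FakePlugin, handle_api_request) is a hypothetical stand-in invented for this example, and a direct method call replaces the real rpc transport; the sketch only shows who talks to whom.

```python
# Toy end-to-end flow; every class here is a hypothetical stand-in.

class FakeL3Agent(object):
    """Stands in for the l3 agent on a network node."""
    def __init__(self):
        self.updated_routers = set()

    def routers_updated(self, router_ids):
        # Step 4: the agent records which routers need (re)configuration.
        self.updated_routers.update(router_ids)


class FakePlugin(object):
    """Stands in for the plugin plus its database."""
    def __init__(self, agent):
        self.db = {}
        self.agent = agent

    def create_router(self, router):
        # Step 3: in this sketch, creating a router only touches the database.
        self.db[router["id"]] = dict(router)
        return self.db[router["id"]]

    def update_router(self, router_id, changes):
        # Steps 3 and 4: persist the change, then notify the agent.
        self.db[router_id].update(changes)
        self.agent.routers_updated([router_id])
        return self.db[router_id]


def handle_api_request(plugin, action, *args):
    # Step 2: the server maps the API action onto a plugin method.
    return getattr(plugin, action)(*args)


agent = FakeL3Agent()
plugin = FakePlugin(agent)
handle_api_request(plugin, "create_router", {"id": "r1", "name": "demo"})
notified_after_create = set(agent.updated_routers)
handle_api_request(plugin, "update_router", "r1",
                   {"external_gateway_info": {"network_id": "ext-net"}})
```

After the create call the agent has been told nothing; only the update call puts r1 into its updated_routers set.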

l3.py

Source path: neutron/extensions/l3.py

class RouterPluginBase(object):

    @abc.abstractmethod
    def create_router(self, context, router):
        pass

    @abc.abstractmethod
    def update_router(self, context, id, router):
        pass

    @abc.abstractmethod
    def get_router(self, context, id, fields=None):
        pass

    @abc.abstractmethod
    def delete_router(self, context, id):
        pass

    @abc.abstractmethod
    def get_routers(self, context, filters=None, fields=None,
                    sorts=None, limit=None, marker=None, page_reverse=False):
        pass

    @abc.abstractmethod
    def add_router_interface(self, context, router_id, interface_info):
        pass

    @abc.abstractmethod
    def remove_router_interface(self, context, router_id, interface_info):
        pass

    @abc.abstractmethod
    def create_floatingip(self, context, floatingip):
        pass

    @abc.abstractmethod
    def update_floatingip(self, context, id, floatingip):
        pass

    @abc.abstractmethod
    def get_floatingip(self, context, id, fields=None):
        pass

    @abc.abstractmethod
    def delete_floatingip(self, context, id):
        pass

    @abc.abstractmethod
    def get_floatingips(self, context, filters=None, fields=None,
                        sorts=None, limit=None, marker=None,
                        page_reverse=False):
        pass

    def get_routers_count(self, context, filters=None):
        raise NotImplementedError()

    def get_floatingips_count(self, context, filters=None):
        raise NotImplementedError()

 

In this module, class RouterPluginBase defines the methods that a plugin must implement.
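To show what "must implement" means in practice, here is a minimal sketch using Python 3's abc.ABC. RouterBase, InMemoryRouterPlugin and IncompletePlugin are hypothetical names for this example, and the use of abc.ABC is an assumption: the excerpt above does not show how the real class attaches its metaclass.

```python
import abc


class RouterBase(abc.ABC):
    """Hypothetical, trimmed-down analogue of RouterPluginBase."""

    @abc.abstractmethod
    def create_router(self, context, router):
        pass

    def get_routers_count(self, context, filters=None):
        # Optional method: not abstract, but unimplemented by default.
        raise NotImplementedError()


class InMemoryRouterPlugin(RouterBase):
    def __init__(self):
        self.routers = {}

    def create_router(self, context, router):
        self.routers[router["id"]] = router
        return router


class IncompletePlugin(RouterBase):
    pass  # does not implement create_router


plugin = InMemoryRouterPlugin()
created = plugin.create_router(None, {"id": "r1"})

try:
    IncompletePlugin()
    instantiated = True
except TypeError:
    # abc refuses to instantiate a class with unimplemented abstract methods
    instantiated = False
```

The two *_count methods in the excerpt follow the second pattern: they are not abstract, so a plugin may skip them, but calling them without an override raises NotImplementedError.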

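l3_db.py

Source path: /neutron/db/l3_db.py
In this module, class L3_NAT_db_mixin inherits from class RouterPluginBase in the l3 module above, so the abstract methods declared in RouterPluginBase are implemented here.
The class docstring reads: Mixin class to add L3/NAT router methods to db_plugin_base_v2.

The class begins with the following code: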

    @property
    def l3_rpc_notifier(self):
        if not hasattr(self, '_l3_rpc_notifier'):
            self._l3_rpc_notifier = l3_rpc_agent_api.L3AgentNotifyAPI()
        return self._l3_rpc_notifier

 

This shows that l3_rpc_notifier is an instance of class L3AgentNotifyAPI from the l3_rpc_agent_api module.
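The property creates the notifier lazily and caches it on the instance. A minimal sketch of the same pattern follows; Notifier and DbMixin are hypothetical stand-ins, and the instance counter exists only to make the caching visible.

```python
class Notifier(object):
    """Hypothetical stand-in for L3AgentNotifyAPI."""
    instances = 0

    def __init__(self):
        Notifier.instances += 1


class DbMixin(object):
    @property
    def l3_rpc_notifier(self):
        # Build the notifier on first access, then reuse the cached one.
        if not hasattr(self, '_l3_rpc_notifier'):
            self._l3_rpc_notifier = Notifier()
        return self._l3_rpc_notifier


mixin = DbMixin()
first = mixin.l3_rpc_notifier
second = mixin.l3_rpc_notifier
```

Both accesses return the same object, so only one Notifier is ever constructed per mixin instance.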

The source of the l3_rpc_agent_api module is at /neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py.

class L3AgentNotifyAPI(n_rpc.RpcProxy):
    """API for plugin to notify L3 agent."""
    BASE_RPC_API_VERSION = '1.0'

    def __init__(self, topic=topics.L3_AGENT):
        super(L3AgentNotifyAPI, self).__init__(
            topic=topic, default_version=self.BASE_RPC_API_VERSION)

    def _notification_host(self, context, method, payload, host):
        """Notify the agent that is hosting the router."""
        ...

    def _agent_notification(self, context, method, router_ids,
                            operation, data):
        """Notify changed routers to hosting l3 agents."""
        ...

    def _notification(self, context, method, router_ids, operation, data):
        """Notify all the agents that are hosting the routers."""
        ...

    def _notification_fanout(self, context, method, router_id):
        """Fanout the deleted router to all L3 agents."""
        ...

    def agent_updated(self, context, admin_state_up, host):
        self._notification_host(context, 'agent_updated',
                                {'admin_state_up': admin_state_up},
                                host)

    def router_deleted(self, context, router_id):
        self._notification_fanout(context, 'router_deleted', router_id)

    def routers_updated(self, context, router_ids, operation=None, data=None):
        if router_ids:
            self._notification(context, 'routers_updated', router_ids,
                               operation, data)

    def router_removed_from_agent(self, context, router_id, host):
        self._notification_host(context, 'router_removed_from_agent',
                                {'router_id': router_id}, host)

    def router_added_to_agent(self, context, router_ids, host):
        self._notification_host(context, 'router_added_to_agent',
                                router_ids, host)

The plugin uses this class to send rpc notifications to the l3 agent.
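The public methods of the class are thin wrappers that pick a delivery style: routers_updated targets the agents hosting the routers, while router_deleted fans out to every agent. The sketch below imitates that split with a hypothetical RecordingNotifier that appends to a list instead of sending rpc messages; the tuple layout and the "hosting-agents" label are assumptions for illustration only.

```python
class RecordingNotifier(object):
    """Hypothetical notifier that records messages instead of sending RPC."""

    def __init__(self):
        self.sent = []

    def _notification(self, method, router_ids):
        # Pretend to notify only the agents hosting these routers.
        self.sent.append(("hosting-agents", method, list(router_ids)))

    def _notification_fanout(self, method, router_id):
        # Pretend to broadcast to every l3 agent.
        self.sent.append(("fanout", method, router_id))

    def routers_updated(self, router_ids):
        # Mirrors the guard in the excerpt: nothing is sent for an empty list.
        if router_ids:
            self._notification("routers_updated", router_ids)

    def router_deleted(self, router_id):
        self._notification_fanout("router_deleted", router_id)


notifier = RecordingNotifier()
notifier.routers_updated([])          # ignored
notifier.routers_updated(["r1", "r2"])
notifier.router_deleted("r3")
```

Only two messages are recorded: the empty routers_updated call is dropped by the guard.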

rpc handling

The code in l3_db.py above reads and writes router and floating ip data in the database. Some operations need more than that: they must also notify the l3 agent over rpc by calling functions in l3_rpc_agent_api (most of them end up calling routers_updated).

The operations that need this are: update_router, delete_router, add_router_interface, remove_router_interface, create_floatingip, update_floatingip, delete_floatingip, and disassociate_floatingips.

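l3_agent.py

Source path: neutron/agent/l3_agent.py

The l3 agent uses the Linux IP stack and iptables to implement routing and NAT.

If you create a router in horizon and do nothing else, the plugin only writes to the database and the l3 agent does nothing. Only when the router is updated, for example by setting an external gateway, does the l3 agent handle a request.

The l3 agent is started through the service framework. Its manager class is neutron.agent.l3_agent.L3NATAgentWithStateReport, which inherits from L3NATAgent. Its main addition is the rpc-based _report_state, which reports state to PluginReportStateAPI (topic q-plugin); each plugin handles these reports (ml2, for example, registers a consumer for this topic in start_rpc_listeners).

L3NATAgent is the main L3 manager class. It is declared as class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager). FWaaSL3AgentRpcCallback mainly loads the firewall driver and sets up RPC communication with the plugin.

Now look at how L3NATAgent is constructed: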

    def __init__(self, host, conf=None):
        if conf:
            self.conf = conf
        else:
            self.conf = cfg.CONF
        self.root_helper = config.get_root_helper(self.conf)
        self.router_info = {}

        self._check_config_params()

        try:
            # import driver from l3_agent.ini
            # Example: interface_driver =
            #     neutron.agent.linux.interface.OVSInterfaceDriver
            self.driver = importutils.import_object(
                self.conf.interface_driver,
                self.conf
            )
        except Exception:
            msg = _("Error importing interface driver "
                    "'%s'") % self.conf.interface_driver
            LOG.error(msg)
            raise SystemExit(1)

        self.context = context.get_admin_context_without_session()
        # Agent side of the l3 agent RPC API, topic is 'q-l3-plugin'
        self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
        self.fullsync = True
        self.updated_routers = set()
        self.removed_routers = set()
        self.sync_progress = False

        self._clean_stale_namespaces = self.conf.use_namespaces

        # Start RPC Loop
        self.rpc_loop = loopingcall.FixedIntervalLoopingCall(
            self._rpc_loop)
        self.rpc_loop.start(interval=RPC_LOOP_INTERVAL)
        super(L3NATAgent, self).__init__(conf=self.conf)

        self.target_ex_net_id = None
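The constructor loads the interface driver from a dotted class path taken from the configuration. The sketch below reproduces that idea with the standard library's importlib rather than the importutils helper used by the agent; import_object and load_driver here are local, hypothetical functions, and a standard-library class stands in for a real interface driver.

```python
import importlib


def import_object(dotted_path, *args, **kwargs):
    """Import 'package.module.ClassName' and return an instance of it."""
    module_name, _, class_name = dotted_path.rpartition('.')
    module = importlib.import_module(module_name)
    return getattr(module, class_name)(*args, **kwargs)


def load_driver(dotted_path, *args):
    try:
        return import_object(dotted_path, *args)
    except Exception:
        # The agent excerpt exits here; the sketch raises a clear error instead.
        raise RuntimeError("Error importing driver '%s'" % dotted_path)


# A standard-library class is used as a stand-in for an interface driver.
driver = load_driver("collections.OrderedDict")
```

Loading the driver by name keeps the agent independent of any particular driver implementation: switching drivers is a configuration change, not a code change.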

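self.plugin_rpc above is the agent's side of the rpc channel to neutron-server. On the server side these requests are handled by a service plugin, registered under the neutron.service_plugins entry points: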

neutron.service_plugins =
    dummy = neutron.tests.unit.dummy_plugin:DummyServicePlugin
    router = neutron.services.l3_router.l3_router_plugin:L3RouterPlugin
    firewall = neutron.services.firewall.fwaas_plugin:FirewallPlugin
    lbaas = neutron.services.loadbalancer.plugin:LoadBalancerPlugin
    vpnaas = neutron.services.vpn.plugin:VPNDriverPlugin
    metering = neutron.services.metering.metering_plugin:MeteringPlugin

self.rpc_loop runs periodically and checks for router updates that the plugin has sent over rpc:

    @lockutils.synchronized('l3-agent', 'neutron-')
    def _rpc_loop(self):
        # _rpc_loop and _sync_routers_task will not be
        # executed in the same time because of lock.
        # so we can clear the value of updated_routers
        # and removed_routers, but they can be updated by
        # updated_routers and removed_routers rpc call
        try:
            LOG.debug(_("Starting RPC loop for %d updated routers"),
                      len(self.updated_routers))
            # updated_routers holds the routers to process in this pass
            if self.updated_routers:
                # We're capturing and clearing the list, and will
                # process the "captured" updates in this loop,
                # and any updates that happen due to a context switch
                # will be picked up on the next pass.
                updated_routers = set(self.updated_routers)
                self.updated_routers.clear()
                router_ids = list(updated_routers)
                routers = self.plugin_rpc.get_routers(
                    self.context, router_ids)
                # routers with admin_state_up=false will not be in the fetched
                fetched = set([r['id'] for r in routers])
                # in updated_routers but not in fetched: needs to be removed
                self.removed_routers.update(updated_routers - fetched)

                self._process_routers(routers)
            self._process_router_delete()
            LOG.debug(_("RPC loop successfully completed"))
        except Exception:
            LOG.exception(_("Failed synchronizing routers"))
            self.fullsync = True
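The core of the loop is a capture-and-clear step followed by a set difference. The sketch below isolates that logic; rpc_loop_pass and fake_fetch are hypothetical names, and fake_fetch stands in for the plugin_rpc.get_routers call.

```python
def rpc_loop_pass(pending, fetch_routers):
    """One pass: capture pending ids, fetch them, split into update/remove.

    pending       -- set of router ids, mutated in place (cleared)
    fetch_routers -- hypothetical callable returning router dicts for ids
    """
    captured = set(pending)
    pending.clear()  # later notifications land in the next pass
    routers = fetch_routers(sorted(captured))
    fetched = set(r['id'] for r in routers)
    removed = captured - fetched  # requested but not returned by the server
    return routers, removed


# Fake server state: r2 no longer exists (or is administratively down).
server_routers = {'r1': {'id': 'r1'}, 'r3': {'id': 'r3'}}


def fake_fetch(router_ids):
    return [server_routers[i] for i in router_ids if i in server_routers]


pending = {'r1', 'r2'}
to_process, to_remove = rpc_loop_pass(pending, fake_fetch)
```

Here r1 is returned for processing, r2 is scheduled for removal because the server did not return it, and the pending set is empty again for the next pass.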

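_process_routers

Any rpc request that arrives, whether it updates a router, adds a router interface, or creates a floating ip, is handled here. _rpc_loop calls _process_routers, which spawns green threads that each run process_router. Essentially all of the l3 agent's work on network devices happens inside process_router.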

    def process_router(self, ri):
        ri.iptables_manager.defer_apply_on()
        ex_gw_port = self._get_ex_gw_port(ri)
        internal_ports = ri.router.get(l3_constants.INTERFACE_KEY, [])
        existing_port_ids = set([p['id'] for p in ri.internal_ports])
        current_port_ids = set([p['id'] for p in internal_ports
                                if p['admin_state_up']])
        new_ports = [p for p in internal_ports if
                     p['id'] in current_port_ids and
                     p['id'] not in existing_port_ids]
        old_ports = [p for p in ri.internal_ports if
                     p['id'] not in current_port_ids]
        for p in new_ports:
            self._set_subnet_info(p)
            self.internal_network_added(ri, p['network_id'], p['id'],
                                        p['ip_cidr'], p['mac_address'])
            ri.internal_ports.append(p)

        for p in old_ports:
            self.internal_network_removed(ri, p['id'], p['ip_cidr'])
            ri.internal_ports.remove(p)

        existing_devices = self._get_existing_devices(ri)
        current_internal_devs = set([n for n in existing_devices
                                     if n.startswith(INTERNAL_DEV_PREFIX)])
        current_port_devs = set([self.get_internal_device_name(id) for
                                 id in current_port_ids])
        stale_devs = current_internal_devs - current_port_devs
        for stale_dev in stale_devs:
            LOG.debug(_('Deleting stale internal router device: %s'),
                      stale_dev)
            self.driver.unplug(stale_dev,
                               namespace=ri.ns_name,
                               prefix=INTERNAL_DEV_PREFIX)

        # Get IPv4 only internal CIDRs
        internal_cidrs = [p['ip_cidr'] for p in ri.internal_ports
                          if netaddr.IPNetwork(p['ip_cidr']).version == 4]
        # TODO(salv-orlando): RouterInfo would be a better place for
        # this logic too
        ex_gw_port_id = (ex_gw_port and ex_gw_port['id'] or
                         ri.ex_gw_port and ri.ex_gw_port['id'])

        interface_name = None
        if ex_gw_port_id:
            interface_name = self.get_external_device_name(ex_gw_port_id)
        if ex_gw_port and ex_gw_port != ri.ex_gw_port:
            self._set_subnet_info(ex_gw_port)
            self.external_gateway_added(ri, ex_gw_port,
                                        interface_name, internal_cidrs)
        elif not ex_gw_port and ri.ex_gw_port:
            self.external_gateway_removed(ri, ri.ex_gw_port,
                                          interface_name, internal_cidrs)

        stale_devs = [dev for dev in existing_devices
                      if dev.startswith(EXTERNAL_DEV_PREFIX)
                      and dev != interface_name]
        for stale_dev in stale_devs:
            LOG.debug(_('Deleting stale external router device: %s'),
                      stale_dev)
            self.driver.unplug(stale_dev,
                               bridge=self.conf.external_network_bridge,
                               namespace=ri.ns_name,
                               prefix=EXTERNAL_DEV_PREFIX)

        # Process static routes for router
        self.routes_updated(ri)
        # Process SNAT rules for external gateway
        ri.perform_snat_action(self._handle_router_snat_rules,
                               internal_cidrs, interface_name)

        # Process SNAT/DNAT rules for floating IPs
        fip_statuses = {}
        try:
            if ex_gw_port:
                existing_floating_ips = ri.floating_ips
                self.process_router_floating_ip_nat_rules(ri)
                ri.iptables_manager.defer_apply_off()
                # Once NAT rules for floating IPs are safely in place
                # configure their addresses on the external gateway port
                fip_statuses = self.process_router_floating_ip_addresses(
                    ri, ex_gw_port)
        except Exception:
            # TODO(salv-orlando): Less broad catching
            # All floating IPs must be put in error state
            for fip in ri.router.get(l3_constants.FLOATINGIP_KEY, []):
                fip_statuses[fip['id']] = l3_constants.FLOATINGIP_STATUS_ERROR

        if ex_gw_port:
            # Identify floating IPs which were disabled
            ri.floating_ips = set(fip_statuses.keys())
            for fip_id in existing_floating_ips - ri.floating_ips:
                fip_statuses[fip_id] = l3_constants.FLOATINGIP_STATUS_DOWN
            # Update floating IP status on the neutron server
            self.plugin_rpc.update_floatingip_statuses(
                self.context, ri.router_id, fip_statuses)

        # Update ex_gw_port and enable_snat on the router info cache
        ri.ex_gw_port = ex_gw_port
        ri.enable_snat = ri.router.get('enable_snat')

process_router does the following:

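1. Handling internal interfaces

This applies when an interface is added to or removed from the router. It calls the two functions internal_network_added and internal_network_removed.

These two functions call the plug and unplug functions of OVSInterfaceDriver, which ultimately use ip link and ip addr commands to manage the interface and its IP address.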
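Which ports count as added or removed is decided by comparing the ports the agent has cached with the ports the server currently reports. The sketch below lifts that comparison out of process_router into a standalone function; diff_internal_ports is a hypothetical name, and the port dictionaries carry only the two keys the comparison needs.

```python
def diff_internal_ports(cached_ports, reported_ports):
    """Return (new_ports, old_ports) in the way process_router computes them.

    cached_ports   -- ports the agent has already plugged (ri.internal_ports)
    reported_ports -- ports the server currently reports for the router
    """
    existing_ids = set(p['id'] for p in cached_ports)
    current_ids = set(p['id'] for p in reported_ports if p['admin_state_up'])
    new_ports = [p for p in reported_ports
                 if p['id'] in current_ids and p['id'] not in existing_ids]
    old_ports = [p for p in cached_ports if p['id'] not in current_ids]
    return new_ports, old_ports


cached = [{'id': 'a', 'admin_state_up': True},
          {'id': 'b', 'admin_state_up': True}]
reported = [{'id': 'b', 'admin_state_up': True},
            {'id': 'c', 'admin_state_up': True},
            {'id': 'd', 'admin_state_up': False}]

new_ports, old_ports = diff_internal_ports(cached, reported)
```

Port c is new and would be passed to internal_network_added, port a has disappeared and would go to internal_network_removed, and port d is skipped because it is administratively down.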

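2. Handling the external gateway
This applies when an external gateway is added to or removed from the router. It calls external_gateway_added and external_gateway_removed, which likewise call plug and unplug and use ip link and ip addr commands for the final step.

3. SNAT for the external gateway

This calls _handle_router_snat_rules, which uses iptables to add and remove rules in the relevant chains.

In my test network the router has three interfaces: the external gateway address is 192.168.39.2, and the gateways of the two internal subnets are 10.1.0.1 and 10.2.0.1. The iptables rules are: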

iptables -t nat -A POSTROUTING ! -i qg-fcb1a762-1f ! -o qg-fcb1a762-1f -m conntrack ! --ctstate DNAT -j ACCEPT
iptables -t nat -A snat -s 10.2.0.1/24 -j SNAT --to-source 192.168.39.2
iptables -t nat -A snat -s 10.1.0.1/24 -j SNAT --to-source 192.168.39.2

qg-fcb1a762-1f is the name of the external gateway interface; it can be listed with ip netns exec $namespace ip link list.
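The rule set grows by one SNAT rule per internal subnet. The sketch below builds rule strings of the form listed above from the gateway address, the internal CIDRs, and the gateway device name. snat_rules is a hypothetical helper that only formats text; it does not reproduce how the agent hands rules to its iptables manager.

```python
def snat_rules(ex_gw_ip, internal_cidrs, interface_name):
    """Build rule strings of the form shown above for one router."""
    rules = [
        'iptables -t nat -A POSTROUTING ! -i %(dev)s ! -o %(dev)s '
        '-m conntrack ! --ctstate DNAT -j ACCEPT' % {'dev': interface_name}
    ]
    for cidr in internal_cidrs:
        # One SNAT rule per internal subnet, all rewriting to the gateway IP.
        rules.append('iptables -t nat -A snat -s %s -j SNAT --to-source %s'
                     % (cidr, ex_gw_ip))
    return rules


rules = snat_rules('192.168.39.2', ['10.2.0.1/24', '10.1.0.1/24'],
                   'qg-fcb1a762-1f')
```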

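4. SNAT/DNAT for floating ips

This covers floating ip operations such as create, update, delete, associating with an instance's interface, and disassociating.

This part differs between neutron versions. The analysis here is based on Icehouse rc1; in the Havana stable release a single function handles both the iptables rules and the floating ip addresses.

process_router_floating_ip_nat_rules: when a floating ip is associated with an instance, the existing floating_ip rules are cleared first, and then the iptables rules that should be present are added, which also re-adds the cleared rules that are still needed.

For example, suppose an instance at 10.1.0.2 has a floating ip (192.168.39.5) associated with it. iptables rules are then added to several chains; float-snat is a chain defined by neutron.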

iptables -t nat -A PREROUTING -d 192.168.39.5 -j DNAT --to 10.1.0.2
iptables -t nat -A OUTPUT -d 192.168.39.5 -j DNAT --to 10.1.0.2
iptables -t nat -A float-snat -s 10.1.0.2 -j SNAT --to 192.168.39.5
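Each association therefore yields three rules: two DNAT rules for traffic addressed to the floating ip and one SNAT rule for traffic leaving the instance. A minimal sketch that generates the three rules above from the two addresses follows; floating_ip_rules and as_commands are hypothetical helpers that only format strings.

```python
def floating_ip_rules(floating_ip, fixed_ip):
    """Return (chain, rule) pairs for one floating IP association."""
    return [
        ('PREROUTING', '-d %s -j DNAT --to %s' % (floating_ip, fixed_ip)),
        ('OUTPUT', '-d %s -j DNAT --to %s' % (floating_ip, fixed_ip)),
        ('float-snat', '-s %s -j SNAT --to %s' % (fixed_ip, floating_ip)),
    ]


def as_commands(rules):
    # Render each (chain, rule) pair as an iptables command line.
    return ['iptables -t nat -A %s %s' % (chain, rule)
            for chain, rule in rules]


commands = as_commands(floating_ip_rules('192.168.39.5', '10.1.0.2'))
```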

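process_router_floating_ip_addresses:

When a floating ip is associated with an instance, the address is added with the ip addr add command.
When a floating ip is disassociated from an instance, the address is removed with the ip addr del command.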
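Adding and removing addresses can be viewed as reconciling the addresses already on the gateway device with the floating ips that should be there. The sketch below computes the ip addr add and ip addr del argument lists for that reconciliation. floating_ip_addr_commands is a hypothetical helper, the /32 prefix length is an assumption of this example, and nothing is executed.

```python
def floating_ip_addr_commands(device, configured_cidrs, wanted_ips):
    """Return `ip addr` argument lists to reconcile a device's addresses.

    configured_cidrs -- /32 addresses currently on the device (assumed form)
    wanted_ips       -- floating IPs that should be present
    """
    wanted_cidrs = set('%s/32' % ip for ip in wanted_ips)
    configured = set(configured_cidrs)
    commands = []
    for cidr in sorted(wanted_cidrs - configured):
        commands.append(['ip', 'addr', 'add', cidr, 'dev', device])
    for cidr in sorted(configured - wanted_cidrs):
        commands.append(['ip', 'addr', 'del', cidr, 'dev', device])
    return commands


commands = floating_ip_addr_commands(
    'qg-fcb1a762-1f',
    configured_cidrs=['192.168.39.5/32', '192.168.39.7/32'],
    wanted_ips=['192.168.39.5', '192.168.39.6'])
```

In this example 192.168.39.6 is added, 192.168.39.7 is deleted, and 192.168.39.5 is left untouched because it is already configured.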

 

[Figure: class diagram]

This article is reposted from http://squarey.me/cloud-virtualization/neutron-l3-analyse.html, with some edits.

