SDN實驗---Ryu的應用開發(六)網絡拓撲時延探測


一:預備知識

SDN實驗---Ryu的應用開發(五)網絡拓撲發現

Ryu源碼之模塊功能分析

Ryu源碼之拓撲發現原理分析

二:實驗原理

網絡時延探測應用利用了Ryu自帶的Switches模塊的數據,獲取到了LLDP數據發送時的時間戳,然后和收到的時間戳進行相減,得到了LLDP數據包從控制器下發到交換機A,然后從交換機A到交換機B,再上報給控制器的時延T1,示例見圖1的藍色箭頭。

同理反向的時延T2由綠色的箭頭組成。

此外,控制器到交換機的往返時延由一個藍色箭頭和一個綠色箭頭組成,此部分時延由echo報文測試,分別為Ta,Tb。最后鏈路的前向后向平均時延T=(T1+T2-Ta-Tb)/2。

 

三:時延探測代碼實現

(一)拓撲發現模塊(已修改)

from ryu.base import app_manager

from ryu.ofproto import ofproto_v1_3

from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER,CONFIG_DISPATCHER,DEAD_DISPATCHER #只是表示datapath數據路徑的狀態
from ryu.controller.handler import set_ev_cls

from ryu.lib import hub
from ryu.lib.packet import packet,ethernet

from ryu.topology import event,switches
from ryu.topology.api import get_switch,get_link,get_host

import threading,time,random

DELAY_MONITOR_PERIOD = 5

class TopoDetect(app_manager.RyuApp):
    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def __init__(self,*args,**kwargs):
        super(TopoDetect,self).__init__(*args,**kwargs)
        self.topology_api_app = self
        self.name = "topology"
        self.link_list = None
        self.switch_list = None
        self.host_list = None

        self.dpid2id = {}
        self.id2dpid = {}
        self.dpid2switch = {}

        self.ip2host = {}
        self.ip2switch = {}

        self.net_size = 0
        self.net_topo = []

        self.net_flag = False
        self.net_arrived = 0
        
        self.monitor_thread = hub.spawn(self._monitor)
  def _monitor(self):  #修改,只獲取拓撲,不主動顯示!!! """
        協程實現偽並發,探測拓撲狀態
        """
        while True:
            #print("------------------_monitor")
            self._host_add_handler(None) #主機單獨提取處理
            self.get_topology(None)
            hub.sleep(DELAY_MONITOR_PERIOD) #5秒一次


    @set_ev_cls(ofp_event.EventOFPSwitchFeatures,CONFIG_DISPATCHER)
    def switch_feature_handle(self,ev):
        """
        datapath中有配置消息到達
        """
        #print("------XXXXXXXXXXX------%d------XXXXXXXXXXX------------switch_feature_handle"%self.net_arrived)
        #print("----%s----------",ev.msg)
        msg = ev.msg
        datapath = msg.datapath
        ofproto = datapath.ofproto
        ofp_parser = datapath.ofproto_parser

        match = ofp_parser.OFPMatch()

        actions = [ofp_parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,ofproto.OFPCML_NO_BUFFER)]

        self.add_flow(datapath=datapath,priority=0,match=match,actions=actions,extra_info="config infomation arrived!!")


    def add_flow(self,datapath,priority,match,actions,idle_timeout=0,hard_timeout=0,extra_info=None):
        #print("------------------add_flow:")
        if extra_info != None:
            print(extra_info)
        ofproto = datapath.ofproto
        ofp_parser = datapath.ofproto_parser

        inst = [ofp_parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,actions)]

        mod = ofp_parser.OFPFlowMod(datapath=datapath,priority=priority,
                                    idle_timeout=idle_timeout,
                                    hard_timeout=hard_timeout,
                                    match=match,instructions=inst)
        datapath.send_msg(mod);

    @set_ev_cls(ofp_event.EventOFPPacketIn,MAIN_DISPATCHER)
    def packet_in_handler(self,ev):
        #print("------------------packet_in_handler")
        msg = ev.msg
        datapath = msg.datapath
        ofproto = datapath.ofproto
        ofp_parser = datapath.ofproto_parser

        dpid = datapath.id
        in_port = msg.match['in_port']

        pkt = packet.Packet(msg.data)
        eth_pkt = pkt.get_protocol(ethernet.ethernet)
        dst = eth_pkt.dst
        src = eth_pkt.src

        #self.logger.info("------------------Controller %s get packet, Mac address from: %s send to: %s , send from datapath: %s,in port is: %s"
        #                    ,dpid,src,dst,dpid,in_port)
        self.get_topology(None)


    @set_ev_cls([event.EventHostAdd])
    def _host_add_handler(self,ev):    #主機信息單獨處理,不屬於網絡拓撲
        self.host_list = get_host(self.topology_api_app) #3.需要使用pingall,主機通過與邊緣交換機連接,才能告訴控制器
        #獲取主機信息字典ip2host{ipv4:host object}  ip2switch{ipv4:dpid}
        for i,host in enumerate(self.host_list):
            self.ip2switch["%s"%host.ipv4] = host.port.dpid
            self.ip2host["%s"%host.ipv4] = host


    events = [event.EventSwitchEnter, event.EventSwitchLeave,
               event.EventSwitchReconnected,
               event.EventPortAdd, event.EventPortDelete,
               event.EventPortModify,
               event.EventLinkAdd, event.EventLinkDelete]
    @set_ev_cls(events)
    def get_topology(self,ev):
        #print("------+++++++++++------%d------+++++++++++------------get_topology"%self.net_arrived)

        self.net_flag = False
        self.net_topo = []

        #print("-----------------get_topology")
        #獲取所有的交換機、鏈路
        self.switch_list = get_switch(self.topology_api_app) #1.只要交換機與控制器聯通,就可以獲取
        self.link_list = get_link(self.topology_api_app) #2.在ryu啟動時,加上--observe-links即可用於拓撲發現
        
        #獲取交換機字典id2dpid{id:dpid} dpid2switch{dpid:switch object}
        for i,switch in enumerate(self.switch_list):
            self.id2dpid[i] = switch.dp.id
            self.dpid2id[switch.dp.id] = i
            self.dpid2switch[switch.dp.id] = switch


        #根據鏈路信息,開始獲取拓撲信息
        self.net_size = len(self.id2dpid) #表示網絡中交換機個數
        for i in range(self.net_size):
            self.net_topo.append([0]*self.net_size)

        for link in self.link_list:
            src_dpid = link.src.dpid
            src_port = link.src.port_no

            dst_dpid = link.dst.dpid
            dst_port = link.dst.port_no

            try:
                sid = self.dpid2id[src_dpid]
                did = self.dpid2id[dst_dpid]
            except KeyError as e:
                #print("--------------Error:get KeyError with link infomation(%s)"%e)
                return
            self.net_topo[sid][did] = [src_port,0] #注意:這里0表示存在鏈路,后面可以修改為時延
            self.net_topo[did][sid] = [dst_port,0] #注意:修改為列表,不要用元組,元組無法修改,我們后面要修改時延


        self.net_flag = True #表示網絡拓撲創建成功

    def show_topology(self):
        print("-----------------show_topology")
        print("----------switch network----------")
        line_info = "         "
        for i in range(self.net_size):
            line_info+="        s%-5d        "%self.id2dpid[i]
        print(line_info)
        for i in range(self.net_size):
            line_info = "s%d      "%self.id2dpid[i]
            for j in range(self.net_size):
                if self.net_topo[i][j] == 0:
                    line_info+="%-22d"%0
                else:
                    line_info+="(%d,%.12f)    "%tuple(self.net_topo[i][j])
            print(line_info)

        print("----------host 2 switch----------")
        for key,val in self.ip2switch.items():
            print("%s---s%d"%(key,val))

(二)模塊導入

from ryu.base import app_manager
from ryu.base.app_manager import lookup_service_brick

from ryu.ofproto import ofproto_v1_3

from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER,CONFIG_DISPATCHER,DEAD_DISPATCHER,HANDSHAKE_DISPATCHER #只是表示datapath數據路徑的狀態
from ryu.controller.handler import set_ev_cls

from ryu.lib import hub
from ryu.lib.packet import packet,ethernet

from ryu.topology.switches import Switches
from ryu.topology.switches import LLDPPacket

import time

(三)數據結構

ECHO_REQUEST_INTERVAL = 0.05
DELAY_DETECTING_PERIOD = 5

class DelayDetect(app_manager.RyuApp):
    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def __init__(self,*args,**kwargs):
        super(DelayDetect,self).__init__(*args,**kwargs)
        self.name = "delay" self.topology = lookup_service_brick("topology") #注意:我們使用lookup_service_brick加載模塊實例時,對於我們自己定義的app,我們需要在類中定義self.name。 self.switches = lookup_service_brick("switches") #此外,最重要的是:我們啟動本模塊DelayDetect時,必須同時啟動自定義的模塊!!! 比如:ryu-manager ./TopoDetect.py ./DelayDetect.py --verbose --observe-links self.dpid2switch = {} #或者直接為{},也可以。下面_state_change_handler也會添加進去 self.dpid2echoDelay = {} #記錄echo時延 self.src_sport_dst2Delay = {} #記錄LLDP報文測量的時延。實際上可以直接更新,這里單獨記錄,為了單獨展示 {”src_dpid-srt_port-dst_dpid“:delay} 
        self.detector_thread = hub.spawn(self._detector)

(四)協程獲取鏈路時延

    def _detector(self):
        """
        協程實現偽並發,探測鏈路時延
        """
        while True:
            if self.topology == None:
                self.topology = lookup_service_brick("topology")
            if self.topology.net_flag:
                #print("------------------_detector------------------")
                self._send_echo_request()
                self.get_link_delay()
                if self.topology.net_flag:
                    try:
                        self.show_delay() self.topology.show_topology()  #拓撲顯示
                    except Exception as err:
                        print("------------------Detect delay failure!!!------------------")
            hub.sleep(DELAY_DETECTING_PERIOD) #5秒一次

(五)獲取Echo時延

    def _send_echo_request(self):
        """
        發生echo報文到datapath
        """
        for datapath in self.dpid2switch.values():
            parser = datapath.ofproto_parser
            echo_req = parser.OFPEchoRequest(datapath,data=bytes("%.12f"%time.time(),encoding="utf8")) #獲取當前時間

            datapath.send_msg(echo_req)

            #重要!不要同時發送echo請求,因為它幾乎同時會生成大量echo回復。
            #在echo_reply_處理程序中處理echo reply時,會產生大量隊列等待延遲。
            hub.sleep(ECHO_REQUEST_INTERVAL)

    @set_ev_cls(ofp_event.EventOFPEchoReply,[MAIN_DISPATCHER,CONFIG_DISPATCHER,HANDSHAKE_DISPATCHER])
    def echo_reply_handler(self,ev):
        """
        處理echo響應報文,獲取控制器到交換機的鏈路往返時延

              Controller
                  |    
     echo latency |  
                 `|‘ 
                   Switch        
        """
        now_timestamp = time.time()
        try:
            echo_delay = now_timestamp - eval(ev.msg.data)
            self.dpid2echoDelay[ev.msg.datapath.id] = echo_delay
        except:
            return

(六)獲取LLDP時延

補充:前面我們通過lookup_service_brick("switches"),實例化了switches模塊。詳細見:https://www.cnblogs.com/ssyfj/p/14193150.html。該模塊中通過協程實現了周期0.05s發送LLDP數據包。所以我們下面可以直接獲取LLDP數據報。

    @set_ev_cls(ofp_event.EventOFPPacketIn,MAIN_DISPATCHER)
    def packet_in_handler(self,ev): #處理到達的LLDP報文,從而獲得LLDP時延
        """
                      Controller
                    |        /|\    
                   \|/         |
                Switch----->Switch
        """
        msg = ev.msg
        try:
            src_dpid,src_outport = LLDPPacket.lldp_parse(msg.data) #獲取兩個相鄰交換機的源交換機dpid和port_no(與目的交換機相連的端口)
            dst_dpid = msg.datapath.id #獲取目的交換機(第二個),因為來到控制器的消息是由第二個(目的)交換機上傳過來的
            dst_inport = msg.match['in_port']
            if self.switches is None:
                self.switches = lookup_service_brick("switches") #獲取交換機模塊實例

            #獲得key(Port類實例)和data(PortData類實例)
            for port in self.switches.ports.keys(): #開始獲取對應交換機端口的發送時間戳
                if src_dpid == port.dpid and src_outport == port.port_no: #匹配key
                    port_data = self.switches.ports[port] #獲取滿足key條件的values值PortData實例,內部保存了發送LLDP報文時的timestamp信息
                    timestamp = port_data.timestamp
                    if timestamp:
                        delay = time.time() - timestamp
                        self._save_delay_data(src=src_dpid,dst=dst_dpid,src_port=src_outport,lldpdealy=delay)
        except:
            return

    def _save_delay_data(self,src,dst,src_port,lldpdealy):
        key = "%s-%s-%s"%(src,src_port,dst)
        self.src_sport_dst2Delay[key] = lldpdealy

(七)根據LLDP和Echo時延,更新網絡拓撲圖中的權值信息

    def get_link_delay(self):
        """
        更新圖中的權值信息
        """
        print("--------------get_link_delay-----------")
        for src_sport_dst in self.src_sport_dst2Delay.keys():
                src,sport,dst = tuple(map(eval,src_sport_dst.split("-")))
                if src in self.dpid2echoDelay.keys() and dst in self.dpid2echoDelay.keys():
                    sid,did = self.topology.dpid2id[src],self.topology.dpid2id[dst]
                    if self.topology.net_topo[sid][did] != 0:
                        if self.topology.net_topo[sid][did][0] == sport:
                            s_d_delay = self.src_sport_dst2Delay[src_sport_dst]-(self.dpid2echoDelay[src]+self.dpid2echoDelay[dst])/2;
                            if s_d_delay < 0: #注意:可能出現單向計算時延導致最后小於0,這是不允許的。則不進行更新,使用上一次原始值
                                continue
                            self.topology.net_topo[sid][did][1] = self.src_sport_dst2Delay[src_sport_dst]-(self.dpid2echoDelay[src]+self.dpid2echoDelay[dst])/2

(八)顯示網絡拓撲圖和Echo、LLDP時延信息

    @set_ev_cls(ofp_event.EventOFPStateChange,[MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        datapath = ev.datapath
        if ev.state == MAIN_DISPATCHER:
            if not datapath.id in self.dpid2switch:
                self.logger.debug('Register datapath: %016x', datapath.id)
                self.dpid2switch[datapath.id] = datapath
        elif ev.state == DEAD_DISPATCHER:
            if datapath.id in self.dpid2switch:
                self.logger.debug('Unregister datapath: %016x', datapath.id)
                del self.dpid2switch[datapath.id]

        if self.topology == None:
            self.topology = lookup_service_brick("topology")
        print("-----------------------_state_change_handler-----------------------")
        print(self.topology.show_topology())
        print(self.switches)

    def show_delay(self):
        print("-----------------------show echo delay-----------------------")
        for key,val in self.dpid2echoDelay.items():
            print("s%d----%.12f"%(key,val))
        print("-----------------------show LLDP delay-----------------------")
        for key,val in self.src_sport_dst2Delay.items():
            print("%s----%.12f"%(key,val))

(九)全部代碼

from ryu.base import app_manager
from ryu.base.app_manager import lookup_service_brick

from ryu.ofproto import ofproto_v1_3

from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER,CONFIG_DISPATCHER,DEAD_DISPATCHER,HANDSHAKE_DISPATCHER #只是表示datapath數據路徑的狀態
from ryu.controller.handler import set_ev_cls

from ryu.lib import hub
from ryu.lib.packet import packet,ethernet

from ryu.topology.switches import Switches
from ryu.topology.switches import LLDPPacket

import time

ECHO_REQUEST_INTERVAL = 0.05
DELAY_DETECTING_PERIOD = 5

class DelayDetect(app_manager.RyuApp):
    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def __init__(self,*args,**kwargs):
        super(DelayDetect,self).__init__(*args,**kwargs)
        self.name = "delay"

        self.topology = lookup_service_brick("topology") #注意:我們使用lookup_service_brick加載模塊實例時,對於我們自己定義的app,我們需要在類中定義self.name。
        self.switches = lookup_service_brick("switches") #此外,最重要的是:我們啟動本模塊DelayDetect時,必須同時啟動自定義的模塊!!! 比如:ryu-manager ./TopoDetect.py ./DelayDetect.py --verbose --observe-links

        self.dpid2switch = {} #或者直接為{},也可以。下面_state_change_handler也會添加進去
        self.dpid2echoDelay = {}

        self.src_sport_dst2Delay = {} #記錄LLDP報文測量的時延。實際上可以直接更新,這里單獨記錄,為了單獨展示 {”src_dpid-srt_port-dst_dpid“:delay}

        self.detector_thread = hub.spawn(self._detector)

    def _detector(self):
        """
        協程實現偽並發,探測鏈路時延
        """
        while True:
            if self.topology == None:
                self.topology = lookup_service_brick("topology")
            if self.topology.net_flag:
                #print("------------------_detector------------------")
                self._send_echo_request()
                self.get_link_delay()
                if self.topology.net_flag:
                    try:
                        self.show_delay()
                        self.topology.show_topology()
                    except Exception as err:
                        print("------------------Detect delay failure!!!------------------")
            hub.sleep(DELAY_DETECTING_PERIOD) #5秒一次

    def get_link_delay(self):
        """
        更新圖中的權值信息
        """
        #print("--------------get_link_delay-----------")
        for src_sport_dst in self.src_sport_dst2Delay.keys():
                src,sport,dst = tuple(map(eval,src_sport_dst.split("-")))
                if src in self.dpid2echoDelay.keys() and dst in self.dpid2echoDelay.keys():
                    sid,did = self.topology.dpid2id[src],self.topology.dpid2id[dst]
                    if self.topology.net_topo[sid][did] != 0:
                        if self.topology.net_topo[sid][did][0] == sport:
                            s_d_delay = self.src_sport_dst2Delay[src_sport_dst]-(self.dpid2echoDelay[src]+self.dpid2echoDelay[dst])/2;
                            if s_d_delay < 0: #注意:可能出現單向計算時延導致最后小於0,這是不允許的。則不進行更新,使用上一次原始值
                                continue
                            self.topology.net_topo[sid][did][1] = self.src_sport_dst2Delay[src_sport_dst]-(self.dpid2echoDelay[src]+self.dpid2echoDelay[dst])/2

    def _send_echo_request(self):
        """
        發生echo報文到datapath
        """
        #print("==========_send_echo_request==============")
        #print(self.dpid2switch)
        for datapath in self.dpid2switch.values():
            parser = datapath.ofproto_parser
            echo_req = parser.OFPEchoRequest(datapath,data=bytes("%.12f"%time.time(),encoding="utf8")) #獲取當前時間
            #print("==========_send_echo_request=========2=====")
            datapath.send_msg(echo_req)

            #重要!不要同時發送echo請求,因為它幾乎同時會生成大量echo回復。
            #在echo_reply_處理程序中處理echo reply時,會產生大量隊列等待延遲。
            hub.sleep(ECHO_REQUEST_INTERVAL)

    @set_ev_cls(ofp_event.EventOFPEchoReply,[MAIN_DISPATCHER,CONFIG_DISPATCHER,HANDSHAKE_DISPATCHER])
    def echo_reply_handler(self,ev):
        """
        處理echo響應報文,獲取控制器到交換機的鏈路往返時延

              Controller
                  |    
     echo latency |  
                 `|‘ 
                   Switch        
        """
        #print("================================")
        #print(ev)
        #print("================================")
        now_timestamp = time.time()
        try:
            echo_delay = now_timestamp - eval(ev.msg.data)
            self.dpid2echoDelay[ev.msg.datapath.id] = echo_delay
        except:
            return


    @set_ev_cls(ofp_event.EventOFPPacketIn,MAIN_DISPATCHER)
    def packet_in_handler(self,ev): #處理到達的LLDP報文,從而獲得LLDP時延
        """
                      Controller
                    |        /|\    
                   \|/         |
                Switch----->Switch
        """
        msg = ev.msg
        try:
            src_dpid,src_outport = LLDPPacket.lldp_parse(msg.data) #獲取兩個相鄰交換機的源交換機dpid和port_no(與目的交換機相連的端口)
            dst_dpid = msg.datapath.id #獲取目的交換機(第二個),因為來到控制器的消息是由第二個(目的)交換機上傳過來的
            dst_inport = msg.match['in_port']
            if self.switches is None:
                self.switches = lookup_service_brick("switches") #獲取交換機模塊實例

            #獲得key(Port類實例)和data(PortData類實例)
            for port in self.switches.ports.keys(): #開始獲取對應交換機端口的發送時間戳
                if src_dpid == port.dpid and src_outport == port.port_no: #匹配key
                    port_data = self.switches.ports[port] #獲取滿足key條件的values值PortData實例,內部保存了發送LLDP報文時的timestamp信息
                    timestamp = port_data.timestamp
                    if timestamp:
                        delay = time.time() - timestamp
                        self._save_delay_data(src=src_dpid,dst=dst_dpid,src_port=src_outport,lldpdealy=delay)
        except:
            return

    def _save_delay_data(self,src,dst,src_port,lldpdealy):
        key = "%s-%s-%s"%(src,src_port,dst)
        self.src_sport_dst2Delay[key] = lldpdealy

    @set_ev_cls(ofp_event.EventOFPStateChange,[MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        datapath = ev.datapath
        if ev.state == MAIN_DISPATCHER:
            if not datapath.id in self.dpid2switch:
                self.logger.debug('Register datapath: %016x', datapath.id)
                self.dpid2switch[datapath.id] = datapath
        elif ev.state == DEAD_DISPATCHER:
            if datapath.id in self.dpid2switch:
                self.logger.debug('Unregister datapath: %016x', datapath.id)
                del self.dpid2switch[datapath.id]

        if self.topology == None:
            self.topology = lookup_service_brick("topology")
        #print("-----------------------_state_change_handler-----------------------")
        #print(self.topology.show_topology())
        #print(self.switches)

    def show_delay(self):
        #print("-----------------------show echo delay-----------------------")
        for key,val in self.dpid2echoDelay.items():
            print("s%d----%.12f"%(key,val))
        #print("-----------------------show LLDP delay-----------------------")
        for key,val in self.src_sport_dst2Delay.items():
            print("%s----%.12f"%(key,val))
      
全部代碼

四:實驗測試

回顧:拓撲代碼和時延代碼

from ryu.base import app_manager

from ryu.ofproto import ofproto_v1_3

from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER,CONFIG_DISPATCHER,DEAD_DISPATCHER #只是表示datapath數據路徑的狀態
from ryu.controller.handler import set_ev_cls

from ryu.lib import hub
from ryu.lib.packet import packet,ethernet

from ryu.topology import event,switches
from ryu.topology.api import get_switch,get_link,get_host

import threading,time,random

DELAY_MONITOR_PERIOD = 5

class TopoDetect(app_manager.RyuApp):
    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def __init__(self,*args,**kwargs):
        super(TopoDetect,self).__init__(*args,**kwargs)
        self.topology_api_app = self
        self.name = "topology"
        self.link_list = None
        self.switch_list = None
        self.host_list = None

        self.dpid2id = {}
        self.id2dpid = {}
        self.dpid2switch = {}

        self.ip2host = {}
        self.ip2switch = {}

        self.net_size = 0
        self.net_topo = []

        self.net_flag = False
        self.net_arrived = 0
        
        self.monitor_thread = hub.spawn(self._monitor)

    def _monitor(self):
        """
        協程實現偽並發,探測拓撲狀態
        """
        while True:
            #print("------------------_monitor")
            self._host_add_handler(None) #主機單獨提取處理
            self.get_topology(None)
            hub.sleep(DELAY_MONITOR_PERIOD) #5秒一次


    @set_ev_cls(ofp_event.EventOFPSwitchFeatures,CONFIG_DISPATCHER)
    def switch_feature_handle(self,ev):
        """
        datapath中有配置消息到達
        """
        #print("------XXXXXXXXXXX------%d------XXXXXXXXXXX------------switch_feature_handle"%self.net_arrived)
        #print("----%s----------",ev.msg)
        msg = ev.msg
        datapath = msg.datapath
        ofproto = datapath.ofproto
        ofp_parser = datapath.ofproto_parser

        match = ofp_parser.OFPMatch()

        actions = [ofp_parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,ofproto.OFPCML_NO_BUFFER)]

        self.add_flow(datapath=datapath,priority=0,match=match,actions=actions,extra_info="config infomation arrived!!")


    def add_flow(self,datapath,priority,match,actions,idle_timeout=0,hard_timeout=0,extra_info=None):
        #print("------------------add_flow:")
        if extra_info != None:
            print(extra_info)
        ofproto = datapath.ofproto
        ofp_parser = datapath.ofproto_parser

        inst = [ofp_parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,actions)]

        mod = ofp_parser.OFPFlowMod(datapath=datapath,priority=priority,
                                    idle_timeout=idle_timeout,
                                    hard_timeout=hard_timeout,
                                    match=match,instructions=inst)
        datapath.send_msg(mod);

    @set_ev_cls(ofp_event.EventOFPPacketIn,MAIN_DISPATCHER)
    def packet_in_handler(self,ev):
        #print("------------------packet_in_handler")
        msg = ev.msg
        datapath = msg.datapath
        ofproto = datapath.ofproto
        ofp_parser = datapath.ofproto_parser

        dpid = datapath.id
        in_port = msg.match['in_port']

        pkt = packet.Packet(msg.data)
        eth_pkt = pkt.get_protocol(ethernet.ethernet)
        dst = eth_pkt.dst
        src = eth_pkt.src

        #self.logger.info("------------------Controller %s get packet, Mac address from: %s send to: %s , send from datapath: %s,in port is: %s"
        #                    ,dpid,src,dst,dpid,in_port)
        self.get_topology(None)


    @set_ev_cls([event.EventHostAdd])
    def _host_add_handler(self,ev):    #主機信息單獨處理,不屬於網絡拓撲
        self.host_list = get_host(self.topology_api_app) #3.需要使用pingall,主機通過與邊緣交換機連接,才能告訴控制器
        #獲取主機信息字典ip2host{ipv4:host object}  ip2switch{ipv4:dpid}
        for i,host in enumerate(self.host_list):
            self.ip2switch["%s"%host.ipv4] = host.port.dpid
            self.ip2host["%s"%host.ipv4] = host


    events = [event.EventSwitchEnter, event.EventSwitchLeave,
               event.EventSwitchReconnected,
               event.EventPortAdd, event.EventPortDelete,
               event.EventPortModify,
               event.EventLinkAdd, event.EventLinkDelete]
    @set_ev_cls(events)
    def get_topology(self,ev):
        #print("------+++++++++++------%d------+++++++++++------------get_topology"%self.net_arrived)

        self.net_flag = False
        self.net_topo = []

        #print("-----------------get_topology")
        #獲取所有的交換機、鏈路
        self.switch_list = get_switch(self.topology_api_app) #1.只要交換機與控制器聯通,就可以獲取
        self.link_list = get_link(self.topology_api_app) #2.在ryu啟動時,加上--observe-links即可用於拓撲發現
        
        #獲取交換機字典id2dpid{id:dpid} dpid2switch{dpid:switch object}
        for i,switch in enumerate(self.switch_list):
            self.id2dpid[i] = switch.dp.id
            self.dpid2id[switch.dp.id] = i
            self.dpid2switch[switch.dp.id] = switch


        #根據鏈路信息,開始獲取拓撲信息
        self.net_size = len(self.id2dpid) #表示網絡中交換機個數
        for i in range(self.net_size):
            self.net_topo.append([0]*self.net_size)

        for link in self.link_list:
            src_dpid = link.src.dpid
            src_port = link.src.port_no

            dst_dpid = link.dst.dpid
            dst_port = link.dst.port_no

            try:
                sid = self.dpid2id[src_dpid]
                did = self.dpid2id[dst_dpid]
            except KeyError as e:
                #print("--------------Error:get KeyError with link infomation(%s)"%e)
                return
            self.net_topo[sid][did] = [src_port,0] #注意:這里0表示存在鏈路,后面可以修改為時延
            self.net_topo[did][sid] = [dst_port,0] #注意:修改為列表,不要用元組,元組無法修改,我們后面要修改時延


        self.net_flag = True #表示網絡拓撲創建成功

    def show_topology(self):
        print("-----------------show_topology")
        print("----------switch network----------")
        line_info = "         "
        for i in range(self.net_size):
            line_info+="        s%-5d        "%self.id2dpid[i]
        print(line_info)
        for i in range(self.net_size):
            line_info = "s%d      "%self.id2dpid[i]
            for j in range(self.net_size):
                if self.net_topo[i][j] == 0:
                    line_info+="%-22d"%0
                else:
                    line_info+="(%d,%.12f)    "%tuple(self.net_topo[i][j])
            print(line_info)

        print("----------host 2 switch----------")
        for key,val in self.ip2switch.items():
            print("%s---s%d"%(key,val))
TopoDetect.py
from ryu.base import app_manager
from ryu.base.app_manager import lookup_service_brick

from ryu.ofproto import ofproto_v1_3

from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER,CONFIG_DISPATCHER,DEAD_DISPATCHER,HANDSHAKE_DISPATCHER #只是表示datapath數據路徑的狀態
from ryu.controller.handler import set_ev_cls

from ryu.lib import hub
from ryu.lib.packet import packet,ethernet

from ryu.topology.switches import Switches
from ryu.topology.switches import LLDPPacket

import time

ECHO_REQUEST_INTERVAL = 0.05
DELAY_DETECTING_PERIOD = 5

class DelayDetect(app_manager.RyuApp):
    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def __init__(self,*args,**kwargs):
        super(DelayDetect,self).__init__(*args,**kwargs)
        self.name = "delay"

        self.topology = lookup_service_brick("topology") #注意:我們使用lookup_service_brick加載模塊實例時,對於我們自己定義的app,我們需要在類中定義self.name。
        self.switches = lookup_service_brick("switches") #此外,最重要的是:我們啟動本模塊DelayDetect時,必須同時啟動自定義的模塊!!! 比如:ryu-manager ./TopoDetect.py ./DelayDetect.py --verbose --observe-links

        self.dpid2switch = {} #或者直接為{},也可以。下面_state_change_handler也會添加進去
        self.dpid2echoDelay = {}

        self.src_sport_dst2Delay = {} #記錄LLDP報文測量的時延。實際上可以直接更新,這里單獨記錄,為了單獨展示 {”src_dpid-srt_port-dst_dpid“:delay}

        self.detector_thread = hub.spawn(self._detector)

    def _detector(self):
        """
        協程實現偽並發,探測鏈路時延
        """
        while True:
            if self.topology == None:
                self.topology = lookup_service_brick("topology")
            if self.topology.net_flag:
                #print("------------------_detector------------------")
                self._send_echo_request()
                self.get_link_delay()
                if self.topology.net_flag:
                    try:
                        self.show_delay()
                        self.topology.show_topology()
                    except Exception as err:
                        print("------------------Detect delay failure!!!------------------")
            hub.sleep(DELAY_DETECTING_PERIOD) #5秒一次

    def get_link_delay(self):
        """
        更新圖中的權值信息
        """
        #print("--------------get_link_delay-----------")
        for src_sport_dst in self.src_sport_dst2Delay.keys():
                src,sport,dst = tuple(map(eval,src_sport_dst.split("-")))
                if src in self.dpid2echoDelay.keys() and dst in self.dpid2echoDelay.keys():
                    sid,did = self.topology.dpid2id[src],self.topology.dpid2id[dst]
                    if self.topology.net_topo[sid][did] != 0:
                        if self.topology.net_topo[sid][did][0] == sport:
                            s_d_delay = self.src_sport_dst2Delay[src_sport_dst]-(self.dpid2echoDelay[src]+self.dpid2echoDelay[dst])/2;
                            if s_d_delay < 0: #注意:可能出現單向計算時延導致最后小於0,這是不允許的。則不進行更新,使用上一次原始值
                                continue
                            self.topology.net_topo[sid][did][1] = self.src_sport_dst2Delay[src_sport_dst]-(self.dpid2echoDelay[src]+self.dpid2echoDelay[dst])/2

    def _send_echo_request(self):
        """
        發生echo報文到datapath
        """
        #print("==========_send_echo_request==============")
        #print(self.dpid2switch)
        for datapath in self.dpid2switch.values():
            parser = datapath.ofproto_parser
            echo_req = parser.OFPEchoRequest(datapath,data=bytes("%.12f"%time.time(),encoding="utf8")) #獲取當前時間
            #print("==========_send_echo_request=========2=====")
            datapath.send_msg(echo_req)

            #重要!不要同時發送echo請求,因為它幾乎同時會生成大量echo回復。
            #在echo_reply_處理程序中處理echo reply時,會產生大量隊列等待延遲。
            hub.sleep(ECHO_REQUEST_INTERVAL)

    @set_ev_cls(ofp_event.EventOFPEchoReply,[MAIN_DISPATCHER,CONFIG_DISPATCHER,HANDSHAKE_DISPATCHER])
    def echo_reply_handler(self,ev):
        """
        處理echo響應報文,獲取控制器到交換機的鏈路往返時延

              Controller
                  |    
     echo latency |  
                 `|‘ 
                   Switch        
        """
        #print("================================")
        #print(ev)
        #print("================================")
        now_timestamp = time.time()
        try:
            echo_delay = now_timestamp - eval(ev.msg.data)
            self.dpid2echoDelay[ev.msg.datapath.id] = echo_delay
        except:
            return


    @set_ev_cls(ofp_event.EventOFPPacketIn,MAIN_DISPATCHER)
    def packet_in_handler(self,ev): #處理到達的LLDP報文,從而獲得LLDP時延
        """
                      Controller
                    |        /|\    
                   \|/         |
                Switch----->Switch
        """
        msg = ev.msg
        try:
            src_dpid,src_outport = LLDPPacket.lldp_parse(msg.data) #獲取兩個相鄰交換機的源交換機dpid和port_no(與目的交換機相連的端口)
            dst_dpid = msg.datapath.id #獲取目的交換機(第二個),因為來到控制器的消息是由第二個(目的)交換機上傳過來的
            dst_inport = msg.match['in_port']
            if self.switches is None:
                self.switches = lookup_service_brick("switches") #獲取交換機模塊實例

            #獲得key(Port類實例)和data(PortData類實例)
            for port in self.switches.ports.keys(): #開始獲取對應交換機端口的發送時間戳
                if src_dpid == port.dpid and src_outport == port.port_no: #匹配key
                    port_data = self.switches.ports[port] #獲取滿足key條件的values值PortData實例,內部保存了發送LLDP報文時的timestamp信息
                    timestamp = port_data.timestamp
                    if timestamp:
                        delay = time.time() - timestamp
                        self._save_delay_data(src=src_dpid,dst=dst_dpid,src_port=src_outport,lldpdealy=delay)
        except:
            return

    def _save_delay_data(self,src,dst,src_port,lldpdealy):
        key = "%s-%s-%s"%(src,src_port,dst)
        self.src_sport_dst2Delay[key] = lldpdealy

    @set_ev_cls(ofp_event.EventOFPStateChange,[MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        datapath = ev.datapath
        if ev.state == MAIN_DISPATCHER:
            if not datapath.id in self.dpid2switch:
                self.logger.debug('Register datapath: %016x', datapath.id)
                self.dpid2switch[datapath.id] = datapath
        elif ev.state == DEAD_DISPATCHER:
            if datapath.id in self.dpid2switch:
                self.logger.debug('Unregister datapath: %016x', datapath.id)
                del self.dpid2switch[datapath.id]

        if self.topology == None:
            self.topology = lookup_service_brick("topology")
        #print("-----------------------_state_change_handler-----------------------")
        #print(self.topology.show_topology())
        #print(self.switches)

    def show_delay(self):
        #print("-----------------------show echo delay-----------------------")
        for key,val in self.dpid2echoDelay.items():
            print("s%d----%.12f"%(key,val))
        #print("-----------------------show LLDP delay-----------------------")
        for key,val in self.src_sport_dst2Delay.items():
            print("%s----%.12f"%(key,val))
      
DelayDetect.py

(一)啟動Ryu

ryu-manager ./TopoDetect.py ./DelayDetect.py --verbose --observe-links

(二)啟動mininet

sudo mn --topo=linear,4 --switch=ovsk --controller=remote --link=tc

注意:需要在mininet中使用pingall,才能使得交換機獲得host存在,從而使得控制器獲取host消息!!

 

(三)結果顯示

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM