一:預備知識
SDN實驗---Ryu的應用開發(五)網絡拓撲發現
Ryu源碼之模塊功能分析
Ryu源碼之拓撲發現原理分析
二:實驗原理
網絡時延探測應用利用了Ryu自帶的Switches模塊的數據,獲取到了LLDP數據發送時的時間戳,然后和收到的時間戳進行相減,得到了LLDP數據包從控制器下發到交換機A,然后從交換機A到交換機B,再上報給控制器的時延T1,示例見圖1的藍色箭頭。
同理反向的時延T2由綠色的箭頭組成。
此外,控制器到交換機的往返時延由一個藍色箭頭和一個綠色箭頭組成,此部分時延由echo報文測試,分別為Ta,Tb。最后鏈路的前向后向平均時延T=(T1+T2-Ta-Tb)/2。
三:時延探測代碼實現
(一)拓撲發現模塊(已修改)
from ryu.base import app_manager from ryu.ofproto import ofproto_v1_3 from ryu.controller import ofp_event from ryu.controller.handler import MAIN_DISPATCHER,CONFIG_DISPATCHER,DEAD_DISPATCHER #只是表示datapath數據路徑的狀態 from ryu.controller.handler import set_ev_cls from ryu.lib import hub from ryu.lib.packet import packet,ethernet from ryu.topology import event,switches from ryu.topology.api import get_switch,get_link,get_host import threading,time,random DELAY_MONITOR_PERIOD = 5 class TopoDetect(app_manager.RyuApp): OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION] def __init__(self,*args,**kwargs): super(TopoDetect,self).__init__(*args,**kwargs) self.topology_api_app = self self.name = "topology" self.link_list = None self.switch_list = None self.host_list = None self.dpid2id = {} self.id2dpid = {} self.dpid2switch = {} self.ip2host = {} self.ip2switch = {} self.net_size = 0 self.net_topo = [] self.net_flag = False self.net_arrived = 0 self.monitor_thread = hub.spawn(self._monitor) def _monitor(self): #修改,只獲取拓撲,不主動顯示!!! """ 協程實現偽並發,探測拓撲狀態 """ while True: #print("------------------_monitor") self._host_add_handler(None) #主機單獨提取處理 self.get_topology(None) hub.sleep(DELAY_MONITOR_PERIOD) #5秒一次 @set_ev_cls(ofp_event.EventOFPSwitchFeatures,CONFIG_DISPATCHER) def switch_feature_handle(self,ev): """ datapath中有配置消息到達 """ #print("------XXXXXXXXXXX------%d------XXXXXXXXXXX------------switch_feature_handle"%self.net_arrived) #print("----%s----------",ev.msg) msg = ev.msg datapath = msg.datapath ofproto = datapath.ofproto ofp_parser = datapath.ofproto_parser match = ofp_parser.OFPMatch() actions = [ofp_parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,ofproto.OFPCML_NO_BUFFER)] self.add_flow(datapath=datapath,priority=0,match=match,actions=actions,extra_info="config infomation arrived!!") def add_flow(self,datapath,priority,match,actions,idle_timeout=0,hard_timeout=0,extra_info=None): #print("------------------add_flow:") if extra_info != None: print(extra_info) ofproto = datapath.ofproto ofp_parser = datapath.ofproto_parser inst = [ofp_parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,actions)] mod = ofp_parser.OFPFlowMod(datapath=datapath,priority=priority, idle_timeout=idle_timeout, hard_timeout=hard_timeout, match=match,instructions=inst) datapath.send_msg(mod); @set_ev_cls(ofp_event.EventOFPPacketIn,MAIN_DISPATCHER) def packet_in_handler(self,ev): #print("------------------packet_in_handler") msg = ev.msg datapath = msg.datapath ofproto = datapath.ofproto ofp_parser = datapath.ofproto_parser dpid = datapath.id in_port = msg.match['in_port'] pkt = packet.Packet(msg.data) eth_pkt = pkt.get_protocol(ethernet.ethernet) dst = eth_pkt.dst src = eth_pkt.src #self.logger.info("------------------Controller %s get packet, Mac address from: %s send to: %s , send from datapath: %s,in port is: %s" # ,dpid,src,dst,dpid,in_port) self.get_topology(None) @set_ev_cls([event.EventHostAdd]) def _host_add_handler(self,ev): #主機信息單獨處理,不屬於網絡拓撲 self.host_list = get_host(self.topology_api_app) #3.需要使用pingall,主機通過與邊緣交換機連接,才能告訴控制器 #獲取主機信息字典ip2host{ipv4:host object} ip2switch{ipv4:dpid} for i,host in enumerate(self.host_list): self.ip2switch["%s"%host.ipv4] = host.port.dpid self.ip2host["%s"%host.ipv4] = host events = [event.EventSwitchEnter, event.EventSwitchLeave, event.EventSwitchReconnected, event.EventPortAdd, event.EventPortDelete, event.EventPortModify, event.EventLinkAdd, event.EventLinkDelete] @set_ev_cls(events) def get_topology(self,ev): #print("------+++++++++++------%d------+++++++++++------------get_topology"%self.net_arrived) self.net_flag = False self.net_topo = [] #print("-----------------get_topology") #獲取所有的交換機、鏈路 self.switch_list = get_switch(self.topology_api_app) #1.只要交換機與控制器聯通,就可以獲取 self.link_list = get_link(self.topology_api_app) #2.在ryu啟動時,加上--observe-links即可用於拓撲發現 #獲取交換機字典id2dpid{id:dpid} dpid2switch{dpid:switch object} for i,switch in enumerate(self.switch_list): self.id2dpid[i] = switch.dp.id self.dpid2id[switch.dp.id] = i self.dpid2switch[switch.dp.id] = switch #根據鏈路信息,開始獲取拓撲信息 self.net_size = len(self.id2dpid) #表示網絡中交換機個數 for i in range(self.net_size): self.net_topo.append([0]*self.net_size) for link in self.link_list: src_dpid = link.src.dpid src_port = link.src.port_no dst_dpid = link.dst.dpid dst_port = link.dst.port_no try: sid = self.dpid2id[src_dpid] did = self.dpid2id[dst_dpid] except KeyError as e: #print("--------------Error:get KeyError with link infomation(%s)"%e) return self.net_topo[sid][did] = [src_port,0] #注意:這里0表示存在鏈路,后面可以修改為時延 self.net_topo[did][sid] = [dst_port,0] #注意:修改為列表,不要用元組,元組無法修改,我們后面要修改時延 self.net_flag = True #表示網絡拓撲創建成功 def show_topology(self): print("-----------------show_topology") print("----------switch network----------") line_info = " " for i in range(self.net_size): line_info+=" s%-5d "%self.id2dpid[i] print(line_info) for i in range(self.net_size): line_info = "s%d "%self.id2dpid[i] for j in range(self.net_size): if self.net_topo[i][j] == 0: line_info+="%-22d"%0 else: line_info+="(%d,%.12f) "%tuple(self.net_topo[i][j]) print(line_info) print("----------host 2 switch----------") for key,val in self.ip2switch.items(): print("%s---s%d"%(key,val))
(二)模塊導入
from ryu.base import app_manager from ryu.base.app_manager import lookup_service_brick from ryu.ofproto import ofproto_v1_3 from ryu.controller import ofp_event from ryu.controller.handler import MAIN_DISPATCHER,CONFIG_DISPATCHER,DEAD_DISPATCHER,HANDSHAKE_DISPATCHER #只是表示datapath數據路徑的狀態 from ryu.controller.handler import set_ev_cls from ryu.lib import hub from ryu.lib.packet import packet,ethernet from ryu.topology.switches import Switches from ryu.topology.switches import LLDPPacket import time
(三)數據結構
ECHO_REQUEST_INTERVAL = 0.05 DELAY_DETECTING_PERIOD = 5 class DelayDetect(app_manager.RyuApp): OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION] def __init__(self,*args,**kwargs): super(DelayDetect,self).__init__(*args,**kwargs) self.name = "delay" self.topology = lookup_service_brick("topology") #注意:我們使用lookup_service_brick加載模塊實例時,對於我們自己定義的app,我們需要在類中定義self.name。 self.switches = lookup_service_brick("switches") #此外,最重要的是:我們啟動本模塊DelayDetect時,必須同時啟動自定義的模塊!!! 比如:ryu-manager ./TopoDetect.py ./DelayDetect.py --verbose --observe-links self.dpid2switch = {} #或者直接為{},也可以。下面_state_change_handler也會添加進去 self.dpid2echoDelay = {} #記錄echo時延 self.src_sport_dst2Delay = {} #記錄LLDP報文測量的時延。實際上可以直接更新,這里單獨記錄,為了單獨展示 {”src_dpid-srt_port-dst_dpid“:delay} self.detector_thread = hub.spawn(self._detector)
(四)協程獲取鏈路時延
def _detector(self): """ 協程實現偽並發,探測鏈路時延 """ while True: if self.topology == None: self.topology = lookup_service_brick("topology") if self.topology.net_flag: #print("------------------_detector------------------") self._send_echo_request() self.get_link_delay() if self.topology.net_flag: try: self.show_delay() self.topology.show_topology() #拓撲顯示 except Exception as err: print("------------------Detect delay failure!!!------------------") hub.sleep(DELAY_DETECTING_PERIOD) #5秒一次
(五)獲取Echo時延
def _send_echo_request(self): """ 發生echo報文到datapath """ for datapath in self.dpid2switch.values(): parser = datapath.ofproto_parser echo_req = parser.OFPEchoRequest(datapath,data=bytes("%.12f"%time.time(),encoding="utf8")) #獲取當前時間 datapath.send_msg(echo_req) #重要!不要同時發送echo請求,因為它幾乎同時會生成大量echo回復。 #在echo_reply_處理程序中處理echo reply時,會產生大量隊列等待延遲。 hub.sleep(ECHO_REQUEST_INTERVAL) @set_ev_cls(ofp_event.EventOFPEchoReply,[MAIN_DISPATCHER,CONFIG_DISPATCHER,HANDSHAKE_DISPATCHER]) def echo_reply_handler(self,ev): """ 處理echo響應報文,獲取控制器到交換機的鏈路往返時延 Controller | echo latency | `|‘ Switch """ now_timestamp = time.time() try: echo_delay = now_timestamp - eval(ev.msg.data) self.dpid2echoDelay[ev.msg.datapath.id] = echo_delay except: return
(六)獲取LLDP時延
補充:前面我們通過lookup_service_brick("switches"),實例化了switches模塊。詳細見:https://www.cnblogs.com/ssyfj/p/14193150.html。該模塊中通過協程實現了周期0.05s發送LLDP數據包。所以我們下面可以直接獲取LLDP數據報。
@set_ev_cls(ofp_event.EventOFPPacketIn,MAIN_DISPATCHER) def packet_in_handler(self,ev): #處理到達的LLDP報文,從而獲得LLDP時延 """ Controller | /|\ \|/ | Switch----->Switch """ msg = ev.msg try: src_dpid,src_outport = LLDPPacket.lldp_parse(msg.data) #獲取兩個相鄰交換機的源交換機dpid和port_no(與目的交換機相連的端口) dst_dpid = msg.datapath.id #獲取目的交換機(第二個),因為來到控制器的消息是由第二個(目的)交換機上傳過來的 dst_inport = msg.match['in_port'] if self.switches is None: self.switches = lookup_service_brick("switches") #獲取交換機模塊實例 #獲得key(Port類實例)和data(PortData類實例) for port in self.switches.ports.keys(): #開始獲取對應交換機端口的發送時間戳 if src_dpid == port.dpid and src_outport == port.port_no: #匹配key port_data = self.switches.ports[port] #獲取滿足key條件的values值PortData實例,內部保存了發送LLDP報文時的timestamp信息 timestamp = port_data.timestamp if timestamp: delay = time.time() - timestamp self._save_delay_data(src=src_dpid,dst=dst_dpid,src_port=src_outport,lldpdealy=delay) except: return def _save_delay_data(self,src,dst,src_port,lldpdealy): key = "%s-%s-%s"%(src,src_port,dst) self.src_sport_dst2Delay[key] = lldpdealy
(七)根據LLDP和Echo時延,更新網絡拓撲圖中的權值信息
def get_link_delay(self): """ 更新圖中的權值信息 """ print("--------------get_link_delay-----------") for src_sport_dst in self.src_sport_dst2Delay.keys(): src,sport,dst = tuple(map(eval,src_sport_dst.split("-"))) if src in self.dpid2echoDelay.keys() and dst in self.dpid2echoDelay.keys(): sid,did = self.topology.dpid2id[src],self.topology.dpid2id[dst] if self.topology.net_topo[sid][did] != 0: if self.topology.net_topo[sid][did][0] == sport: s_d_delay = self.src_sport_dst2Delay[src_sport_dst]-(self.dpid2echoDelay[src]+self.dpid2echoDelay[dst])/2; if s_d_delay < 0: #注意:可能出現單向計算時延導致最后小於0,這是不允許的。則不進行更新,使用上一次原始值 continue self.topology.net_topo[sid][did][1] = self.src_sport_dst2Delay[src_sport_dst]-(self.dpid2echoDelay[src]+self.dpid2echoDelay[dst])/2
(八)顯示網絡拓撲圖和Echo、LLDP時延信息
@set_ev_cls(ofp_event.EventOFPStateChange,[MAIN_DISPATCHER, DEAD_DISPATCHER]) def _state_change_handler(self, ev): datapath = ev.datapath if ev.state == MAIN_DISPATCHER: if not datapath.id in self.dpid2switch: self.logger.debug('Register datapath: %016x', datapath.id) self.dpid2switch[datapath.id] = datapath elif ev.state == DEAD_DISPATCHER: if datapath.id in self.dpid2switch: self.logger.debug('Unregister datapath: %016x', datapath.id) del self.dpid2switch[datapath.id] if self.topology == None: self.topology = lookup_service_brick("topology") print("-----------------------_state_change_handler-----------------------") print(self.topology.show_topology()) print(self.switches) def show_delay(self): print("-----------------------show echo delay-----------------------") for key,val in self.dpid2echoDelay.items(): print("s%d----%.12f"%(key,val)) print("-----------------------show LLDP delay-----------------------") for key,val in self.src_sport_dst2Delay.items(): print("%s----%.12f"%(key,val))
(九)全部代碼

from ryu.base import app_manager from ryu.base.app_manager import lookup_service_brick from ryu.ofproto import ofproto_v1_3 from ryu.controller import ofp_event from ryu.controller.handler import MAIN_DISPATCHER,CONFIG_DISPATCHER,DEAD_DISPATCHER,HANDSHAKE_DISPATCHER #只是表示datapath數據路徑的狀態 from ryu.controller.handler import set_ev_cls from ryu.lib import hub from ryu.lib.packet import packet,ethernet from ryu.topology.switches import Switches from ryu.topology.switches import LLDPPacket import time ECHO_REQUEST_INTERVAL = 0.05 DELAY_DETECTING_PERIOD = 5 class DelayDetect(app_manager.RyuApp): OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION] def __init__(self,*args,**kwargs): super(DelayDetect,self).__init__(*args,**kwargs) self.name = "delay" self.topology = lookup_service_brick("topology") #注意:我們使用lookup_service_brick加載模塊實例時,對於我們自己定義的app,我們需要在類中定義self.name。 self.switches = lookup_service_brick("switches") #此外,最重要的是:我們啟動本模塊DelayDetect時,必須同時啟動自定義的模塊!!! 比如:ryu-manager ./TopoDetect.py ./DelayDetect.py --verbose --observe-links self.dpid2switch = {} #或者直接為{},也可以。下面_state_change_handler也會添加進去 self.dpid2echoDelay = {} self.src_sport_dst2Delay = {} #記錄LLDP報文測量的時延。實際上可以直接更新,這里單獨記錄,為了單獨展示 {”src_dpid-srt_port-dst_dpid“:delay} self.detector_thread = hub.spawn(self._detector) def _detector(self): """ 協程實現偽並發,探測鏈路時延 """ while True: if self.topology == None: self.topology = lookup_service_brick("topology") if self.topology.net_flag: #print("------------------_detector------------------") self._send_echo_request() self.get_link_delay() if self.topology.net_flag: try: self.show_delay() self.topology.show_topology() except Exception as err: print("------------------Detect delay failure!!!------------------") hub.sleep(DELAY_DETECTING_PERIOD) #5秒一次 def get_link_delay(self): """ 更新圖中的權值信息 """ #print("--------------get_link_delay-----------") for src_sport_dst in self.src_sport_dst2Delay.keys(): src,sport,dst = tuple(map(eval,src_sport_dst.split("-"))) if src in self.dpid2echoDelay.keys() and dst in self.dpid2echoDelay.keys(): sid,did = self.topology.dpid2id[src],self.topology.dpid2id[dst] if self.topology.net_topo[sid][did] != 0: if self.topology.net_topo[sid][did][0] == sport: s_d_delay = self.src_sport_dst2Delay[src_sport_dst]-(self.dpid2echoDelay[src]+self.dpid2echoDelay[dst])/2; if s_d_delay < 0: #注意:可能出現單向計算時延導致最后小於0,這是不允許的。則不進行更新,使用上一次原始值 continue self.topology.net_topo[sid][did][1] = self.src_sport_dst2Delay[src_sport_dst]-(self.dpid2echoDelay[src]+self.dpid2echoDelay[dst])/2 def _send_echo_request(self): """ 發生echo報文到datapath """ #print("==========_send_echo_request==============") #print(self.dpid2switch) for datapath in self.dpid2switch.values(): parser = datapath.ofproto_parser echo_req = parser.OFPEchoRequest(datapath,data=bytes("%.12f"%time.time(),encoding="utf8")) #獲取當前時間 #print("==========_send_echo_request=========2=====") datapath.send_msg(echo_req) #重要!不要同時發送echo請求,因為它幾乎同時會生成大量echo回復。 #在echo_reply_處理程序中處理echo reply時,會產生大量隊列等待延遲。 hub.sleep(ECHO_REQUEST_INTERVAL) @set_ev_cls(ofp_event.EventOFPEchoReply,[MAIN_DISPATCHER,CONFIG_DISPATCHER,HANDSHAKE_DISPATCHER]) def echo_reply_handler(self,ev): """ 處理echo響應報文,獲取控制器到交換機的鏈路往返時延 Controller | echo latency | `|‘ Switch """ #print("================================") #print(ev) #print("================================") now_timestamp = time.time() try: echo_delay = now_timestamp - eval(ev.msg.data) self.dpid2echoDelay[ev.msg.datapath.id] = echo_delay except: return @set_ev_cls(ofp_event.EventOFPPacketIn,MAIN_DISPATCHER) def packet_in_handler(self,ev): #處理到達的LLDP報文,從而獲得LLDP時延 """ Controller | /|\ \|/ | Switch----->Switch """ msg = ev.msg try: src_dpid,src_outport = LLDPPacket.lldp_parse(msg.data) #獲取兩個相鄰交換機的源交換機dpid和port_no(與目的交換機相連的端口) dst_dpid = msg.datapath.id #獲取目的交換機(第二個),因為來到控制器的消息是由第二個(目的)交換機上傳過來的 dst_inport = msg.match['in_port'] if self.switches is None: self.switches = lookup_service_brick("switches") #獲取交換機模塊實例 #獲得key(Port類實例)和data(PortData類實例) for port in self.switches.ports.keys(): #開始獲取對應交換機端口的發送時間戳 if src_dpid == port.dpid and src_outport == port.port_no: #匹配key port_data = self.switches.ports[port] #獲取滿足key條件的values值PortData實例,內部保存了發送LLDP報文時的timestamp信息 timestamp = port_data.timestamp if timestamp: delay = time.time() - timestamp self._save_delay_data(src=src_dpid,dst=dst_dpid,src_port=src_outport,lldpdealy=delay) except: return def _save_delay_data(self,src,dst,src_port,lldpdealy): key = "%s-%s-%s"%(src,src_port,dst) self.src_sport_dst2Delay[key] = lldpdealy @set_ev_cls(ofp_event.EventOFPStateChange,[MAIN_DISPATCHER, DEAD_DISPATCHER]) def _state_change_handler(self, ev): datapath = ev.datapath if ev.state == MAIN_DISPATCHER: if not datapath.id in self.dpid2switch: self.logger.debug('Register datapath: %016x', datapath.id) self.dpid2switch[datapath.id] = datapath elif ev.state == DEAD_DISPATCHER: if datapath.id in self.dpid2switch: self.logger.debug('Unregister datapath: %016x', datapath.id) del self.dpid2switch[datapath.id] if self.topology == None: self.topology = lookup_service_brick("topology") #print("-----------------------_state_change_handler-----------------------") #print(self.topology.show_topology()) #print(self.switches) def show_delay(self): #print("-----------------------show echo delay-----------------------") for key,val in self.dpid2echoDelay.items(): print("s%d----%.12f"%(key,val)) #print("-----------------------show LLDP delay-----------------------") for key,val in self.src_sport_dst2Delay.items(): print("%s----%.12f"%(key,val))
四:實驗測試
回顧:拓撲代碼和時延代碼

from ryu.base import app_manager from ryu.ofproto import ofproto_v1_3 from ryu.controller import ofp_event from ryu.controller.handler import MAIN_DISPATCHER,CONFIG_DISPATCHER,DEAD_DISPATCHER #只是表示datapath數據路徑的狀態 from ryu.controller.handler import set_ev_cls from ryu.lib import hub from ryu.lib.packet import packet,ethernet from ryu.topology import event,switches from ryu.topology.api import get_switch,get_link,get_host import threading,time,random DELAY_MONITOR_PERIOD = 5 class TopoDetect(app_manager.RyuApp): OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION] def __init__(self,*args,**kwargs): super(TopoDetect,self).__init__(*args,**kwargs) self.topology_api_app = self self.name = "topology" self.link_list = None self.switch_list = None self.host_list = None self.dpid2id = {} self.id2dpid = {} self.dpid2switch = {} self.ip2host = {} self.ip2switch = {} self.net_size = 0 self.net_topo = [] self.net_flag = False self.net_arrived = 0 self.monitor_thread = hub.spawn(self._monitor) def _monitor(self): """ 協程實現偽並發,探測拓撲狀態 """ while True: #print("------------------_monitor") self._host_add_handler(None) #主機單獨提取處理 self.get_topology(None) hub.sleep(DELAY_MONITOR_PERIOD) #5秒一次 @set_ev_cls(ofp_event.EventOFPSwitchFeatures,CONFIG_DISPATCHER) def switch_feature_handle(self,ev): """ datapath中有配置消息到達 """ #print("------XXXXXXXXXXX------%d------XXXXXXXXXXX------------switch_feature_handle"%self.net_arrived) #print("----%s----------",ev.msg) msg = ev.msg datapath = msg.datapath ofproto = datapath.ofproto ofp_parser = datapath.ofproto_parser match = ofp_parser.OFPMatch() actions = [ofp_parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,ofproto.OFPCML_NO_BUFFER)] self.add_flow(datapath=datapath,priority=0,match=match,actions=actions,extra_info="config infomation arrived!!") def add_flow(self,datapath,priority,match,actions,idle_timeout=0,hard_timeout=0,extra_info=None): #print("------------------add_flow:") if extra_info != None: print(extra_info) ofproto = datapath.ofproto ofp_parser = datapath.ofproto_parser inst = [ofp_parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,actions)] mod = ofp_parser.OFPFlowMod(datapath=datapath,priority=priority, idle_timeout=idle_timeout, hard_timeout=hard_timeout, match=match,instructions=inst) datapath.send_msg(mod); @set_ev_cls(ofp_event.EventOFPPacketIn,MAIN_DISPATCHER) def packet_in_handler(self,ev): #print("------------------packet_in_handler") msg = ev.msg datapath = msg.datapath ofproto = datapath.ofproto ofp_parser = datapath.ofproto_parser dpid = datapath.id in_port = msg.match['in_port'] pkt = packet.Packet(msg.data) eth_pkt = pkt.get_protocol(ethernet.ethernet) dst = eth_pkt.dst src = eth_pkt.src #self.logger.info("------------------Controller %s get packet, Mac address from: %s send to: %s , send from datapath: %s,in port is: %s" # ,dpid,src,dst,dpid,in_port) self.get_topology(None) @set_ev_cls([event.EventHostAdd]) def _host_add_handler(self,ev): #主機信息單獨處理,不屬於網絡拓撲 self.host_list = get_host(self.topology_api_app) #3.需要使用pingall,主機通過與邊緣交換機連接,才能告訴控制器 #獲取主機信息字典ip2host{ipv4:host object} ip2switch{ipv4:dpid} for i,host in enumerate(self.host_list): self.ip2switch["%s"%host.ipv4] = host.port.dpid self.ip2host["%s"%host.ipv4] = host events = [event.EventSwitchEnter, event.EventSwitchLeave, event.EventSwitchReconnected, event.EventPortAdd, event.EventPortDelete, event.EventPortModify, event.EventLinkAdd, event.EventLinkDelete] @set_ev_cls(events) def get_topology(self,ev): #print("------+++++++++++------%d------+++++++++++------------get_topology"%self.net_arrived) self.net_flag = False self.net_topo = [] #print("-----------------get_topology") #獲取所有的交換機、鏈路 self.switch_list = get_switch(self.topology_api_app) #1.只要交換機與控制器聯通,就可以獲取 self.link_list = get_link(self.topology_api_app) #2.在ryu啟動時,加上--observe-links即可用於拓撲發現 #獲取交換機字典id2dpid{id:dpid} dpid2switch{dpid:switch object} for i,switch in enumerate(self.switch_list): self.id2dpid[i] = switch.dp.id self.dpid2id[switch.dp.id] = i self.dpid2switch[switch.dp.id] = switch #根據鏈路信息,開始獲取拓撲信息 self.net_size = len(self.id2dpid) #表示網絡中交換機個數 for i in range(self.net_size): self.net_topo.append([0]*self.net_size) for link in self.link_list: src_dpid = link.src.dpid src_port = link.src.port_no dst_dpid = link.dst.dpid dst_port = link.dst.port_no try: sid = self.dpid2id[src_dpid] did = self.dpid2id[dst_dpid] except KeyError as e: #print("--------------Error:get KeyError with link infomation(%s)"%e) return self.net_topo[sid][did] = [src_port,0] #注意:這里0表示存在鏈路,后面可以修改為時延 self.net_topo[did][sid] = [dst_port,0] #注意:修改為列表,不要用元組,元組無法修改,我們后面要修改時延 self.net_flag = True #表示網絡拓撲創建成功 def show_topology(self): print("-----------------show_topology") print("----------switch network----------") line_info = " " for i in range(self.net_size): line_info+=" s%-5d "%self.id2dpid[i] print(line_info) for i in range(self.net_size): line_info = "s%d "%self.id2dpid[i] for j in range(self.net_size): if self.net_topo[i][j] == 0: line_info+="%-22d"%0 else: line_info+="(%d,%.12f) "%tuple(self.net_topo[i][j]) print(line_info) print("----------host 2 switch----------") for key,val in self.ip2switch.items(): print("%s---s%d"%(key,val))

from ryu.base import app_manager from ryu.base.app_manager import lookup_service_brick from ryu.ofproto import ofproto_v1_3 from ryu.controller import ofp_event from ryu.controller.handler import MAIN_DISPATCHER,CONFIG_DISPATCHER,DEAD_DISPATCHER,HANDSHAKE_DISPATCHER #只是表示datapath數據路徑的狀態 from ryu.controller.handler import set_ev_cls from ryu.lib import hub from ryu.lib.packet import packet,ethernet from ryu.topology.switches import Switches from ryu.topology.switches import LLDPPacket import time ECHO_REQUEST_INTERVAL = 0.05 DELAY_DETECTING_PERIOD = 5 class DelayDetect(app_manager.RyuApp): OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION] def __init__(self,*args,**kwargs): super(DelayDetect,self).__init__(*args,**kwargs) self.name = "delay" self.topology = lookup_service_brick("topology") #注意:我們使用lookup_service_brick加載模塊實例時,對於我們自己定義的app,我們需要在類中定義self.name。 self.switches = lookup_service_brick("switches") #此外,最重要的是:我們啟動本模塊DelayDetect時,必須同時啟動自定義的模塊!!! 比如:ryu-manager ./TopoDetect.py ./DelayDetect.py --verbose --observe-links self.dpid2switch = {} #或者直接為{},也可以。下面_state_change_handler也會添加進去 self.dpid2echoDelay = {} self.src_sport_dst2Delay = {} #記錄LLDP報文測量的時延。實際上可以直接更新,這里單獨記錄,為了單獨展示 {”src_dpid-srt_port-dst_dpid“:delay} self.detector_thread = hub.spawn(self._detector) def _detector(self): """ 協程實現偽並發,探測鏈路時延 """ while True: if self.topology == None: self.topology = lookup_service_brick("topology") if self.topology.net_flag: #print("------------------_detector------------------") self._send_echo_request() self.get_link_delay() if self.topology.net_flag: try: self.show_delay() self.topology.show_topology() except Exception as err: print("------------------Detect delay failure!!!------------------") hub.sleep(DELAY_DETECTING_PERIOD) #5秒一次 def get_link_delay(self): """ 更新圖中的權值信息 """ #print("--------------get_link_delay-----------") for src_sport_dst in self.src_sport_dst2Delay.keys(): src,sport,dst = tuple(map(eval,src_sport_dst.split("-"))) if src in self.dpid2echoDelay.keys() and dst in self.dpid2echoDelay.keys(): sid,did = self.topology.dpid2id[src],self.topology.dpid2id[dst] if self.topology.net_topo[sid][did] != 0: if self.topology.net_topo[sid][did][0] == sport: s_d_delay = self.src_sport_dst2Delay[src_sport_dst]-(self.dpid2echoDelay[src]+self.dpid2echoDelay[dst])/2; if s_d_delay < 0: #注意:可能出現單向計算時延導致最后小於0,這是不允許的。則不進行更新,使用上一次原始值 continue self.topology.net_topo[sid][did][1] = self.src_sport_dst2Delay[src_sport_dst]-(self.dpid2echoDelay[src]+self.dpid2echoDelay[dst])/2 def _send_echo_request(self): """ 發生echo報文到datapath """ #print("==========_send_echo_request==============") #print(self.dpid2switch) for datapath in self.dpid2switch.values(): parser = datapath.ofproto_parser echo_req = parser.OFPEchoRequest(datapath,data=bytes("%.12f"%time.time(),encoding="utf8")) #獲取當前時間 #print("==========_send_echo_request=========2=====") datapath.send_msg(echo_req) #重要!不要同時發送echo請求,因為它幾乎同時會生成大量echo回復。 #在echo_reply_處理程序中處理echo reply時,會產生大量隊列等待延遲。 hub.sleep(ECHO_REQUEST_INTERVAL) @set_ev_cls(ofp_event.EventOFPEchoReply,[MAIN_DISPATCHER,CONFIG_DISPATCHER,HANDSHAKE_DISPATCHER]) def echo_reply_handler(self,ev): """ 處理echo響應報文,獲取控制器到交換機的鏈路往返時延 Controller | echo latency | `|‘ Switch """ #print("================================") #print(ev) #print("================================") now_timestamp = time.time() try: echo_delay = now_timestamp - eval(ev.msg.data) self.dpid2echoDelay[ev.msg.datapath.id] = echo_delay except: return @set_ev_cls(ofp_event.EventOFPPacketIn,MAIN_DISPATCHER) def packet_in_handler(self,ev): #處理到達的LLDP報文,從而獲得LLDP時延 """ Controller | /|\ \|/ | Switch----->Switch """ msg = ev.msg try: src_dpid,src_outport = LLDPPacket.lldp_parse(msg.data) #獲取兩個相鄰交換機的源交換機dpid和port_no(與目的交換機相連的端口) dst_dpid = msg.datapath.id #獲取目的交換機(第二個),因為來到控制器的消息是由第二個(目的)交換機上傳過來的 dst_inport = msg.match['in_port'] if self.switches is None: self.switches = lookup_service_brick("switches") #獲取交換機模塊實例 #獲得key(Port類實例)和data(PortData類實例) for port in self.switches.ports.keys(): #開始獲取對應交換機端口的發送時間戳 if src_dpid == port.dpid and src_outport == port.port_no: #匹配key port_data = self.switches.ports[port] #獲取滿足key條件的values值PortData實例,內部保存了發送LLDP報文時的timestamp信息 timestamp = port_data.timestamp if timestamp: delay = time.time() - timestamp self._save_delay_data(src=src_dpid,dst=dst_dpid,src_port=src_outport,lldpdealy=delay) except: return def _save_delay_data(self,src,dst,src_port,lldpdealy): key = "%s-%s-%s"%(src,src_port,dst) self.src_sport_dst2Delay[key] = lldpdealy @set_ev_cls(ofp_event.EventOFPStateChange,[MAIN_DISPATCHER, DEAD_DISPATCHER]) def _state_change_handler(self, ev): datapath = ev.datapath if ev.state == MAIN_DISPATCHER: if not datapath.id in self.dpid2switch: self.logger.debug('Register datapath: %016x', datapath.id) self.dpid2switch[datapath.id] = datapath elif ev.state == DEAD_DISPATCHER: if datapath.id in self.dpid2switch: self.logger.debug('Unregister datapath: %016x', datapath.id) del self.dpid2switch[datapath.id] if self.topology == None: self.topology = lookup_service_brick("topology") #print("-----------------------_state_change_handler-----------------------") #print(self.topology.show_topology()) #print(self.switches) def show_delay(self): #print("-----------------------show echo delay-----------------------") for key,val in self.dpid2echoDelay.items(): print("s%d----%.12f"%(key,val)) #print("-----------------------show LLDP delay-----------------------") for key,val in self.src_sport_dst2Delay.items(): print("%s----%.12f"%(key,val))
(一)啟動Ryu
ryu-manager ./TopoDetect.py ./DelayDetect.py --verbose --observe-links
(二)啟動mininet
sudo mn --topo=linear,4 --switch=ovsk --controller=remote --link=tc
注意:需要在mininet中使用pingall,才能使得交換機獲得host存在,從而使得控制器獲取host消息!!
(三)結果顯示