SDN實驗---Ryu的應用開發(五)網絡拓撲發現


一:實驗簡介

(一)網絡拓撲信息:

其中1,2,3表示該交換機對應的端口號!!!

(二)用鄰接矩陣展示

其中左側列S1,S2,S3,S4表示出節點,----->,上面S1,S2,S3,S4表示入節點。

(m,0),m表示出節點的端口--->入節點,0暫時表示兩個節點之間的時延信息!

(三)主機信息展示

 

二:代碼實現 

(一)導入模塊

from ryu.base import app_manager

from ryu.ofproto import ofproto_v1_3

from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER,CONFIG_DISPATCHER,DEAD_DISPATCHER #只是表示datapath數據路徑的狀態
from ryu.controller.handler import set_ev_cls

from ryu.lib import hub
from ryu.lib.packet import packet,ethernet

from ryu.topology import event,switches
from ryu.topology.api import get_switch,get_link,get_host

import threading #需要設置線程鎖

(二)數據結構

DELAY_MONITOR_PERIOD = 5
LOCK = threading.RLock() #實現線程鎖


class
TopoDetect(app_manager.RyuApp): OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION] def __init__(self,*args,**kwargs): super(TopoDetect,self).__init__(*args,**kwargs) self.topology_api_app = self #用於保持對象本身,后面get_switch等方法需要(我們也可以直接傳入self) self.link_list = None #保存所有的link信息,由get_link獲得 self.switch_list = None #保存所有的switch信息,由get_switch獲得 self.host_list = None #保存所有的host信息,由get_host獲得 self.dpid2id = {} #獲取交換機dpid,以及自定義id--->{dpid:id} self.id2dpid = {} #對應上面的self.dpid2id,翻轉即可,因為我們使用id進行建立鄰接矩陣,這兩個結構方便查找 self.dpid2switch = {} #保存dpid和對應的交換機全部信息---->通過矩陣獲得id,然后獲得dpid,最后獲得交換機對象信息 self.ip2host = {} #根據ip,保存主機對象信息--->{ip:host} self.ip2switch = {} #根據ip,獲取當前主機是連接到哪個交換機--->{ip:dpid} self.net_size = 0 #記錄交換機個數(網絡拓撲大小) self.net_topo = [] #用於保存鄰接矩陣 self.net_flag = False #標識:用於表示拓撲網絡拓撲self.net_topo是否已經更新完成
self.net_arrived = 0 #標識:用於表示網絡中交換機消息到達,每當一個交換機到達以后,我們設置+1
self.monitor_thread
= hub.spawn(self._monitor) #協程實現定時檢測網絡拓撲

(三)實現基本openflow消息處理

    @set_ev_cls(ofp_event.EventOFPSwitchFeatures,CONFIG_DISPATCHER)
    def switch_feature_handle(self,ev):
        """
 datapath中有配置消息到達 """
LOCK.acquire()
self.net_arrived += 1 #表示有1個交換機到達
LOCK.release()
#print("------XXXXXXXXXXX------%d------XXXXXXXXXXX------------switch_feature_handle"%self.net_arrived) msg = ev.msg datapath = msg.datapath ofproto = datapath.ofproto ofp_parser = datapath.ofproto_parser match = ofp_parser.OFPMatch() actions = [ofp_parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,ofproto.OFPCML_NO_BUFFER)] self.add_flow(datapath=datapath,priority=0,match=match,actions=actions,extra_info="config infomation arrived!!") def add_flow(self,datapath,priority,match,actions,idle_timeout=0,hard_timeout=0,extra_info=None): #print("------------------add_flow:") if extra_info != None: print(extra_info) ofproto = datapath.ofproto ofp_parser = datapath.ofproto_parser inst = [ofp_parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,actions)] mod = ofp_parser.OFPFlowMod(datapath=datapath,priority=priority, idle_timeout=idle_timeout, hard_timeout=hard_timeout, match=match,instructions=inst) datapath.send_msg(mod); @set_ev_cls(ofp_event.EventOFPPacketIn,MAIN_DISPATCHER) def packet_in_handler(self,ev): #print("------------------packet_in_handler") msg = ev.msg datapath = msg.datapath ofproto = datapath.ofproto ofp_parser = datapath.ofproto_parser dpid = datapath.id in_port = msg.match['in_port'] pkt = packet.Packet(msg.data) eth_pkt = pkt.get_protocol(ethernet.ethernet) dst = eth_pkt.dst src = eth_pkt.src #self.logger.info("------------------Controller %s get packet, Mac address from: %s send to: %s , send from datapath: %s,in port is: %s" # ,dpid,src,dst,dpid,in_port) #self.get_topology(None)

注意:對於packet_in消息,我們沒有處理,所以整個網絡(交換機之間的鏈路)是無法工作通信的,

但是各個交換機可以與控制器通信(switch_feature_handle實現),主機也可以和邊緣交換機通信,

所以控制器可以獲取網絡拓撲信息!!! 

(四)實現拓撲發現功能

    def _monitor(self):
        """
        協程實現偽並發,探測拓撲狀態
        """
        while True:
            #print("------------------_monitor")
            self._host_add_handler(None) #主機單獨提取處理
            self.get_topology(None)
            if self.net_flag:
                try:
                    self.show_topology()
                except Exception as err:
                    print("Please use cmd: pingall to detect topology and wait a moment")
            hub.sleep(DELAY_MONITOR_PERIOD) #5秒一次
    @set_ev_cls([event.EventHostAdd])
    def _host_add_handler(self,ev):    #主機信息單獨處理,不屬於網絡拓撲 self.host_list = get_host(self.topology_api_app) #3.需要使用pingall,主機通過與邊緣交換機連接,才能告訴控制器
        #獲取主機信息字典ip2host{ipv4:host object}  ip2switch{ipv4:dpid}
        for i,host in enumerate(self.host_list):
            self.ip2switch["%s"%host.ipv4] = host.port.dpid
            self.ip2host["%s"%host.ipv4] = host


    events = [event.EventSwitchEnter, event.EventSwitchLeave,
               event.EventSwitchReconnected,
               event.EventPortAdd, event.EventPortDelete,
               event.EventPortModify,
               event.EventLinkAdd, event.EventLinkDelete]
    @set_ev_cls(events)
    def get_topology(self,ev):
        if not self.net_arrived:
            return

LOCK.acquire()
self.net_arrived -= 1
if self.net_arrived < 0:
self.net_arrived = 0
LOCK.release()
self.net_flag
= False self.net_topo = [] print("-----------------get_topology") #獲取所有的交換機、鏈路 self.switch_list = get_switch(self.topology_api_app) #1.只要交換機與控制器聯通,就可以獲取 self.link_list = get_link(self.topology_api_app) #2.在ryu啟動時,加上--observe-links即可用於拓撲發現 #獲取交換機字典id2dpid{id:dpid} dpid2switch{dpid:switch object} for i,switch in enumerate(self.switch_list): self.id2dpid[i] = switch.dp.id self.dpid2id[switch.dp.id] = i self.dpid2switch[switch.dp.id] = switch #根據鏈路信息,開始獲取拓撲信息 self.net_size = len(self.id2dpid) #表示網絡中交換機個數 for i in range(self.net_size): self.net_topo.append([0]*self.net_size) for link in self.link_list: src_dpid = link.src.dpid src_port = link.src.port_no dst_dpid = link.dst.dpid dst_port = link.dst.port_no try: sid = self.dpid2id[src_dpid] did = self.dpid2id[dst_dpid] except KeyError as e: print("--------------Error:get KeyError with link infomation(%s)"%e) return self.net_topo[sid][did] = [src_port,0] #注意:這里0表示存在鏈路,后面可以修改為時延 self.net_topo[did][sid] = [dst_port,0] #注意:修改為列表,不要用元組,元組無法修改,我們后面要修改時延 self.net_flag = True #表示網絡拓撲創建成功
    def show_topology(self):
        print("-----------------show_topology")
        print("----------switch network----------")
        line_info = "         "
        for i in range(self.net_size):
            line_info+="        s%-5d        "%self.id2dpid[i]
        print(line_info)
        for i in range(self.net_size):
            line_info = "s%d      "%self.id2dpid[i]
            for j in range(self.net_size):
                if self.net_topo[i][j] == 0:
                    line_info+="%-22d"%0
                else:
                    line_info+="(%d,%.12f)    "%tuple(self.net_topo[i][j])
            print(line_info)

        print("----------host 2 switch----------")
        for key,val in self.ip2switch.items():
            print("%s---s%d"%(key,val))

(五)全部代碼

from ryu.base import app_manager

from ryu.ofproto import ofproto_v1_3

from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER,CONFIG_DISPATCHER,DEAD_DISPATCHER #只是表示datapath數據路徑的狀態
from ryu.controller.handler import set_ev_cls

from ryu.lib import hub
from ryu.lib.packet import packet,ethernet

from ryu.topology import event,switches
from ryu.topology.api import get_switch,get_link,get_host

import threading,time,random

DELAY_MONITOR_PERIOD = 5

class TopoDetect(app_manager.RyuApp):
    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def __init__(self,*args,**kwargs):
        super(TopoDetect,self).__init__(*args,**kwargs)
        self.topology_api_app = self
        self.name = "topology"
        self.link_list = None
        self.switch_list = None
        self.host_list = None

        self.dpid2id = {}
        self.id2dpid = {}
        self.dpid2switch = {}

        self.ip2host = {}
        self.ip2switch = {}

        self.net_size = 0
        self.net_topo = []

        self.net_flag = False
        self.net_arrived = 0
        
        self.monitor_thread = hub.spawn(self._monitor)

    def _monitor(self):
        """
        協程實現偽並發,探測拓撲狀態
        """
        while True:
            #print("------------------_monitor")
            self._host_add_handler(None) #主機單獨提取處理
            self.get_topology(None)
            if self.net_flag:
                try:
                    self.show_topology()
                except Exception as err:
                    print("Please use cmd: pingall to detect topology and wait a moment")
            hub.sleep(DELAY_MONITOR_PERIOD) #5秒一次


    @set_ev_cls(ofp_event.EventOFPSwitchFeatures,CONFIG_DISPATCHER)
    def switch_feature_handle(self,ev):
        """
        datapath中有配置消息到達
        """
        #print("------XXXXXXXXXXX------%d------XXXXXXXXXXX------------switch_feature_handle"%self.net_arrived)
        #print("----%s----------",ev.msg)
        msg = ev.msg
        datapath = msg.datapath
        ofproto = datapath.ofproto
        ofp_parser = datapath.ofproto_parser

        match = ofp_parser.OFPMatch()

        actions = [ofp_parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,ofproto.OFPCML_NO_BUFFER)]

        self.add_flow(datapath=datapath,priority=0,match=match,actions=actions,extra_info="config infomation arrived!!")


    def add_flow(self,datapath,priority,match,actions,idle_timeout=0,hard_timeout=0,extra_info=None):
        #print("------------------add_flow:")
        if extra_info != None:
            print(extra_info)
        ofproto = datapath.ofproto
        ofp_parser = datapath.ofproto_parser

        inst = [ofp_parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,actions)]

        mod = ofp_parser.OFPFlowMod(datapath=datapath,priority=priority,
                                    idle_timeout=idle_timeout,
                                    hard_timeout=hard_timeout,
                                    match=match,instructions=inst)
        datapath.send_msg(mod);

    @set_ev_cls(ofp_event.EventOFPPacketIn,MAIN_DISPATCHER)
    def packet_in_handler(self,ev):
        #print("------------------packet_in_handler")
        msg = ev.msg
        datapath = msg.datapath
        ofproto = datapath.ofproto
        ofp_parser = datapath.ofproto_parser

        dpid = datapath.id
        in_port = msg.match['in_port']

        pkt = packet.Packet(msg.data)
        eth_pkt = pkt.get_protocol(ethernet.ethernet)
        dst = eth_pkt.dst
        src = eth_pkt.src

        #self.logger.info("------------------Controller %s get packet, Mac address from: %s send to: %s , send from datapath: %s,in port is: %s"
        #                    ,dpid,src,dst,dpid,in_port)
        self.get_topology(None)


    @set_ev_cls([event.EventHostAdd])
    def _host_add_handler(self,ev):    #主機信息單獨處理,不屬於網絡拓撲
        self.host_list = get_host(self.topology_api_app) #3.需要使用pingall,主機通過與邊緣交換機連接,才能告訴控制器
        #獲取主機信息字典ip2host{ipv4:host object}  ip2switch{ipv4:dpid}
        for i,host in enumerate(self.host_list):
            self.ip2switch["%s"%host.ipv4] = host.port.dpid
            self.ip2host["%s"%host.ipv4] = host


    events = [event.EventSwitchEnter, event.EventSwitchLeave,
               event.EventSwitchReconnected,
               event.EventPortAdd, event.EventPortDelete,
               event.EventPortModify,
               event.EventLinkAdd, event.EventLinkDelete]
    @set_ev_cls(events)
    def get_topology(self,ev):
        #print("------+++++++++++------%d------+++++++++++------------get_topology"%self.net_arrived)

        self.net_flag = False
        self.net_topo = []

        #print("-----------------get_topology")
        #獲取所有的交換機、鏈路
        self.switch_list = get_switch(self.topology_api_app) #1.只要交換機與控制器聯通,就可以獲取
        self.link_list = get_link(self.topology_api_app) #2.在ryu啟動時,加上--observe-links即可用於拓撲發現
        
        #獲取交換機字典id2dpid{id:dpid} dpid2switch{dpid:switch object}
        for i,switch in enumerate(self.switch_list):
            self.id2dpid[i] = switch.dp.id
            self.dpid2id[switch.dp.id] = i
            self.dpid2switch[switch.dp.id] = switch


        #根據鏈路信息,開始獲取拓撲信息
        self.net_size = len(self.id2dpid) #表示網絡中交換機個數
        for i in range(self.net_size):
            self.net_topo.append([0]*self.net_size)

        for link in self.link_list:
            src_dpid = link.src.dpid
            src_port = link.src.port_no

            dst_dpid = link.dst.dpid
            dst_port = link.dst.port_no

            try:
                sid = self.dpid2id[src_dpid]
                did = self.dpid2id[dst_dpid]
            except KeyError as e:
                #print("--------------Error:get KeyError with link infomation(%s)"%e)
                return
            self.net_topo[sid][did] = [src_port,0] #注意:這里0表示存在鏈路,后面可以修改為時延
            self.net_topo[did][sid] = [dst_port,0] #注意:修改為列表,不要用元組,元組無法修改,我們后面要修改時延


        self.net_flag = True #表示網絡拓撲創建成功

    def show_topology(self):
        print("-----------------show_topology")
        print("----------switch network----------")
        line_info = "         "
        for i in range(self.net_size):
            line_info+="        s%-5d        "%self.id2dpid[i]
        print(line_info)
        for i in range(self.net_size):
            line_info = "s%d      "%self.id2dpid[i]
            for j in range(self.net_size):
                if self.net_topo[i][j] == 0:
                    line_info+="%-22d"%0
                else:
                    line_info+="(%d,%.12f)    "%tuple(self.net_topo[i][j])
            print(line_info)

        print("----------host 2 switch----------")
        for key,val in self.ip2switch.items():
            print("%s---s%d"%(key,val))
全部代碼

三:實驗驗證

(一)啟動Ryu控制器

ryu-manager TopoDetect.py --verbose --observe-links

其中--observe-links用於拓撲發現,添加之后用於鏈路的信息獲取!!

(二)啟動mininet

sudo mn --topo=linear,4 --switch=ovsk --controller=remote --link=tc

注意:需要在mininet中使用pingall,才能使得交換機獲得host存在,從而使得控制器獲取host消息!!

(三)結果顯示

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM