1. RYU結構,源碼
1.1 RYU文件目錄
下面介紹ryu/ryu目錄下的主要目錄內容。

base
base中有一個非常重要的文件:app_manager.py,其作用是RYU應用的管理中心。用於加載RYU應用程序,接受從APP發送過來的信息,同時也完成消息的路由。
其主要的函數有app注冊、注銷、查找、並定義了RYUAPP基類,定義了RYUAPP的基本屬性。包含name, threads, events, event_handlers和observers等成員,以及對應的許多基本函數。如:start(), stop()等。
這個文件中還定義了AppManager基類,用於管理APP。定義了加載APP等函數。不過如果僅僅是開發APP的話,這個類可以不必關心。
controller——實現controller和交換機之間的互聯和事件處理
controller文件夾中許多非常重要的文件,如events.py, ofp_handler.py, controller.py等。其中controller.py中定義了OpenFlowController基類。用於定義OpenFlow的控制器,用於處理交換機和控制器的連接等事件,同時還可以產生事件和路由事件。其事件系統的定義,可以查看events.py和ofp_events.py。
在ofp_handler.py中定義了基本的handler(應該怎么稱呼呢?句柄?處理函數?),完成了基本的如:握手,錯誤信息處理和keep alive 等功能。更多的如packet_in_handler應該在app中定義。
在dpset.py文件中,定義了交換機端的一些消息,如端口狀態信息等,用於描述和操作交換機。如添加端口,刪除端口等操作。
lib——網絡基本協議的實現和使用
lib中定義了我們需要使用到的基本的數據結構,如dpid, mac和ip等數據結構。在lib/packet目錄下,還定義了許多網絡協議,如ICMP, DHCP, MPLS和IGMP等協議內容。而每一個數據包的類中都有parser和serialize兩個函數。用於解析和序列化數據包。
lib目錄下,還有ovs, netconf目錄,對應的目錄下有一些定義好的數據類型,不再贅述。
ofproto
在這個目錄下,基本分為兩類文件,一類是協議的數據結構定義,另一類是協議解析,也即數據包處理函數文件。如ofproto_v1_0.py是1.0版本的OpenFlow協議數據結構的定義,而ofproto_v1_0_parser.py則定義了1.0版本的協議編碼和解碼。
topology——交換機和鏈路的查詢模塊
包含了switches.py等文件,基本定義了一套交換機的數據結構。event.py定義了交換上的事件。dumper.py定義了獲取網絡拓撲的內容。最后api.py向上提供了一套調用topology目錄中定義函數的接口。
contrib——第三方庫
這個文件夾主要存放的是開源社區貢獻者的代碼。
cmd——入口函數
定義了RYU的命令系統,為controller的執行創建環境,接收和處理相關命令
services
完成了BGP和vrrp的實現。
tests
tests目錄下存放了單元測試以及整合測試的代碼。
1.2 RYU 架構
RYU SDN 架構:



組件功能:

1.3 應用程序編程模型

Ryu 事件處理、進程與線程:
1) Applications:該類繼承自ryu.base.app_manager.RyuApp,用戶邏輯被描述為一個APP。
2) Event : 繼承自ryu.controller.event.EventBase , 應用程序之間的通信由transmitting and receiving events 完成。
3) Event Queue:每一個application 都有一個隊列用於接收事件。
4) Threads:Ryu 使用第三方庫eventlets 運行多線程。因為線程是非搶占式的,因此,當執行耗時的處理程序時要非常小心。
5) Event loops: 創建一個application 時,會自動生成一個線程,該線程運行一個事件循環。當隊列事件不為空時,這個事件循環會加載該事件並且調用相應的事件處理函數(注冊之后)。
6) Additional threads:可以使用hub.spawn()添加其它線程,用來處理特殊的應用
7) Eventlets:這是一個第三方庫,里面的庫函數被封裝到hub 模塊中被開發人員加載使用。【提供線程和事件隊列的實現】
8) Event handlers:使用ryu.controller.handler.set_ev_cls 修飾一個事件處理函數。當該類型的事件觸發后,事件處理函數就會被應用程序的事件循環調用。
1.4 OpenFlow的解析和封裝
Ofp_handler
負責底層數據通信的模塊是ofp\_handler模塊。ofp\_handler啟動之后,start函數實例化了一個controller.OpenFlowController實例。OpenFlowController實例化之后,立即調用\__call\__()函數,call函數啟動了server\_loop去創建server socket,其handler為domain\_connection\_factory函數。每當收到一個switch連接,domain\_connection\_factory就會實例化一個datapath對象。這個對象用於描述交換機的所有行為。其中定義了接收循環和發送循環。
Datapath
datapath.serve函數是socket通信收發邏輯的入口。該函數啟動了一個綠色線程去處理發送循環,然后本線程負責接收循環的處理。self.\_send\_loop是發送主循環。其主要邏輯為:不斷獲取發送隊列是否有數據,若有,則發送;底層調用的是socket.send\_all()函數。
def serve(self):
send_thr = hub.spawn(self._send_loop)
# send hello message immediately
hello = self.ofproto_parser.OFPHello(self)
self.send_msg(hello)
try:
self._recv_loop()
finally:
hub.kill(send_thr)
hub.joinall([send_thr])
接收函數\_reck\_loop中實現了數據的接收和解析。
@_deactivate
def _recv_loop(self):
buf = bytearray() #初始化一個字節數組
required_len = ofproto_common.OFP_HEADER_SIZE # ofproto_common模塊定義了OpenFlow常用的公共屬性
# 如報頭長度=8
count = 0
while self.is_active:
ret = self.socket.recv(required_len)
if len(ret) == 0:
self.is_active = False
break
buf += ret
while len(buf) >= required_len:
# ofproto_parser是在Datapath實例的父類ProtocolDesc的屬性。
# 用於尋找對應協議版本的解析文件,如ofproto_v1_0_parser.py
# header函數是解析報頭的函數。定義在ofproto_parser.py。
(version, msg_type, msg_len, xid) = ofproto_parser.header(buf)
required_len = msg_len
if len(buf) < required_len:
break
# ofproto_parser.msg的定義並沒有在對應的ofproto_parser中
# msg函數的位置和header函數位置一樣,都在ofproto_parser.py中。
# msg返回的是解析完成的消息。
# msg函數返回了msg_parser函數的返回值
# ofproto_parser.py中的_MSG_PARSERS記錄了不同版本對應的msg_parser。其注冊手法是通過@ofproto_parser.register_msg_parser(ofproto.OFP_VERSION)裝飾器。
# 在對應版本的ofproto_parser,如ofproto_v1_0_parser.py中,都有定義一個同名的_MSG_PARSERS字典,這個字典用於記錄報文類型和解析函數的關系。此處命名不恰當,引入混淆。
# parser函數通過@register_parser來將函數注冊/記錄到_MSG_PARSERS字典中。
msg = ofproto_parser.msg(self,
version, msg_type, msg_len, xid, buf)
# LOG.debug('queue msg %s cls %s', msg, msg.__class__)
if msg:
# Ryu定義的Event system很簡單,在報文名前加上前綴“Event”,即是事件的名稱。
# 同時此類系帶msg信息。
# 使用send_event_to_obserevrs()函數將事件分發給監聽事件的handler,完成事件的分發。
ev = ofp_event.ofp_msg_to_ev(msg)
self.ofp_brick.send_event_to_observers(ev, self.state)
dispatchers = lambda x: x.callers[ev.__class__].dispatchers
# handler的注冊是通過使用controller.handler.py文件下定義的set_ev_handler作為裝飾器去注冊。
# self.ofp_brick在初始化時,由注冊在服務列表中查找名為"ofp_event"的模塊賦值。
# ofp_handler模塊的名字為"ofp_event",所以對應的模塊是ofp_handler
handlers = [handler for handler in
self.ofp_brick.get_handlers(ev) if
self.state in dispatchers(handler)]
for handler in handlers:
handler(ev)
buf = buf[required_len:]
required_len = ofproto_common.OFP_HEADER_SIZE
# We need to schedule other greenlets. Otherwise, ryu
# can't accept new switches or handle the existing
# switches. The limit is arbitrary. We need the better
# approach in the future.
count += 1
if count > 2048:
count = 0
hub.sleep(0)
OpenFlow協議實現
OpenFlow協議解析部分代碼大部分在ofproto目錄下,少部分在controller目錄下。首先介紹ofproto目錄下的源碼內容,再介紹controller目錄下的ofp_event文件。
__init__
首先,__init__.py並不為空。該文件定義了兩個功能類似的函數get_ofp_module()和get_ofp_modules(),前者用於取得協議版本對應的協議定義文件和協議解析模塊,后者則取出整個字典。對應的字典在ofproto_protocol模塊中定義。
ofproto\_protocol
在ofproto\_protocol定義了\_versions字典,具體如下:
_versions = {
ofproto_v1_0.OFP_VERSION: (ofproto_v1_0, ofproto_v1_0_parser),
ofproto_v1_2.OFP_VERSION: (ofproto_v1_2, ofproto_v1_2_parser),
ofproto_v1_3.OFP_VERSION: (ofproto_v1_3, ofproto_v1_3_parser),
ofproto_v1_4.OFP_VERSION: (ofproto_v1_4, ofproto_v1_4_parser),
}
除此之外,該文件還定義了Datapath的父類ProtocolDesc,此類基本上只完成了與協議版本相關的內容。該類最重要的兩個成員是self.ofproto和self.ofproto\_parser,其值指明所本次通信所使用的OpenFlow協議的版本以及對應的解析模塊。
ofproto\_common
ofproto\_common文件比較簡單,主要定義了OpenFlow需要使用的公共屬性,如監聽端口,報頭長度,報頭封裝格式等內容。
ofproto\_parser
ofproto\_parser文件定義了所有版本都需要的解析相關的公共屬性。如定義了最重要的基類MsgBase(StringifyMixin)。
StringifyMixin類的定義在lib.stringify文件,有興趣的讀者可自行查看。MsgBase基類定義了最基礎的屬性信息,具體如下所示:
@create_list_of_base_attributes
def __init__(self, datapath):
super(MsgBase, self).__init__()
self.datapath = datapath
self.version = None
self.msg_type = None
self.msg_len = None
self.xid = None
self.buf = None
此外,該類還定義了基礎的parser函數和serialize函數。基礎的parser函數基本什么都沒有做,僅返回一個賦值后的消息體。
@classmethod
def parser(cls, datapath, version, msg_type, msg_len, xid, buf):
msg_ = cls(datapath)
msg_.set_headers(version, msg_type, msg_len, xid)
msg_.set_buf(buf)
return msg_
serialize函數分為3部分,self.\_serialize\_pre(), self.\_serialize\_body()和self.\_serialize\_header()。本質上完成了header的序列化。關於body的序列化,將在對應的派生類中得到重寫。
ofproto_v1_0
以1.0版本為例介紹ofproto\_v1\_x.py文件的作用。由於Ryu支持多版本的OpenFlow,所以在ofproto目錄下,定義了從1.0到1.5版本的所有代碼實現。所以其文件命名為ofproto\_v1_x.py,x從[1,2,3,4,5]中獲得,分別對應相應的協議版本。
此類文件最重要的一個目的是定義了所有需要的靜態內容,包括某字段的所有選項以及消息封裝的格式以及長度。與OpenFlow消息內容相關的有協議的類型,動作的類型,port的類型等。此外對應每一個報文,都需要定義其封裝的格式,以及封裝的長度。Ryu采用了Python的Struct庫去完成數據的解封裝工作,關於Struct的介紹將在后續內容介紹。具體定義內容舉例如下:
# enum ofp_port
OFPP_MAX = 0xff00
OFPP_IN_PORT = 0xfff8 # Send the packet out the input port. This
# virtual port must be explicitly used
# in order to send back out of the input
# port.
OFPP_TABLE = 0xfff9 # Perform actions in flow table.
# NB: This can only be the destination
# port for packet-out messages.
OFPP_NORMAL = 0xfffa # Process with normal L2/L3 switching.
OFPP_FLOOD = 0xfffb # All physical ports except input port and
# those disabled by STP.
OFPP_ALL = 0xfffc # All physical ports except input port.
OFPP_CONTROLLER = 0xfffd # Send to controller.
OFPP_LOCAL = 0xfffe # Local openflow "port".
OFPP_NONE = 0xffff # Not associated with a physical port.
# enum ofp_type
OFPT_HELLO = 0 # Symmetric message
OFPT_ERROR = 1 # Symmetric message
OFPT_ECHO_REQUEST = 2 # Symmetric message
OFPT_ECHO_REPLY = 3 # Symmetric message
OFPT_VENDOR = 4 # Symmetric message
OFPT_FEATURES_REQUEST = 5 # Controller/switch message
OFPT_FEATURES_REPLY = 6 # Controller/switch message
OFPT_GET_CONFIG_REQUEST = 7 # Controller/switch message
OFPT_GET_CONFIG_REPLY = 8 # Controller/switch message
OFPT_SET_CONFIG = 9 # Controller/switch message
OFPT_PACKET_IN = 10 # Async message
OFPT_FLOW_REMOVED = 11 # Async message
OFPT_PORT_STATUS = 12 # Async message
OFPT_PACKET_OUT = 13 # Controller/switch message
OFPT_FLOW_MOD = 14 # Controller/switch message
OFPT_PORT_MOD = 15 # Controller/switch message
OFPT_STATS_REQUEST = 16 # Controller/switch message
OFPT_STATS_REPLY = 17 # Controller/switch message
OFPT_BARRIER_REQUEST = 18 # Controller/switch message
OFPT_BARRIER_REPLY = 19 # Controller/switch message
OFPT_QUEUE_GET_CONFIG_REQUEST = 20 # Controller/switch message
OFPT_QUEUE_GET_CONFIG_REPLY = 21 # Controller/switch message
OFP_HEADER_PACK_STR = '!BBHI'
OFP_HEADER_SIZE = 8
OFP_MSG_SIZE_MAX = 65535
assert calcsize(OFP_HEADER_PACK_STR) == OFP_HEADER_SIZE
OFP\_HEADER\_PACK\_STR = '!BBHI'的意思是將header按照8|8|16|32的長度封裝成對應的數值。在Python中分別對應的是1個字節的integer|一個字節的integer|2個字節的integer|4個字節的integer。
calcsize函數用於計算對應的format的長度。
ofproto_v1_0_parser
本模塊用於定義報文的解析等動態內容。模塊中定義了與OpenFlow協議對應的Common\_struct及message type對應的類。每一個message對應的類都是有MsgBase派生的,其繼承了父類的parser函數和serialize函數。若報文無消息體,如Hello報文,則無需重寫parser和serialize函數。
本模塊定義了幾個重要的全局函數:\_set\_msg\_type,\_register\_parser,msg\_parser和\_set\_msg\_reply。其作用介紹如下:
- _set_msg_type: 完成類與ofproto模塊中定義的報文名字的映射,原因在於ofproto模塊定義的名字並不是類名,而解析時需要使用ofproto中的名字。
- _register_parser:完成對應的類與類中的parser函數的映射,當解析函數從ofproto模塊的名字映射到類之后,若需要解析,則需從類對應到對應的解析函數。parser函數是一個類函數,所以在使用時必須傳入對應的類的對象作為參數。
- msg_parser:從_MSG_PARSERS中獲取對msg_type的parser,並返回解析之后的內容。
- _set_msg_reply:完成該類與對應的回應報文的映射。
def _set_msg_type(msg_type):
'''Annotate corresponding OFP message type'''
def _set_cls_msg_type(cls): cls.cls_msg_type = msg_type return cls return _set_cls_msg_type def _register_parser(cls): '''class decorator to register msg parser''' assert cls.cls_msg_type is not None assert cls.cls_msg_type not in _MSG_PARSERS _MSG_PARSERS[cls.cls_msg_type] = cls.parser return cls @ofproto_parser.register_msg_parser(ofproto.OFP_VERSION) def msg_parser(datapath, version, msg_type, msg_len, xid, buf): parser = _MSG_PARSERS.get(msg_type) return parser(datapath, version, msg_type, msg_len, xid, buf) def _set_msg_reply(msg_reply): '''Annotate OFP reply message class''' def _set_cls_msg_reply(cls): cls.cls_msg_reply = msg_reply return cls return _set_cls_msg_reply
報文如果有消息體,則需要重寫parser函數或者serialize函數,具體根據報文內容而不一樣。此處,分別以Packet\_in和Flow\_mod作為parser的案例和serialize的案例,示例如下:
@_register_parser
@_set_msg_type(ofproto.OFPT_PACKET_IN)
class OFPPacketIn(MsgBase): def __init__(self, datapath, buffer_id=None, total_len=None, in_port=None, reason=None, data=None): super(OFPPacketIn, self).__init__(datapath) self.buffer_id = buffer_id self.total_len = total_len self.in_port = in_port self.reason = reason self.data = data @classmethod def parser(cls, datapath, version, msg_type, msg_len, xid, buf): # 解析頭部,獲取msg msg = super(OFPPacketIn, cls).parser(datapath, version, msg_type, msg_len, xid, buf) # 解析body,獲取packet_in相關字段。 (msg.buffer_id, msg.total_len, msg.in_port, msg.reason) = struct.unpack_from( ofproto.OFP_PACKET_IN_PACK_STR, msg.buf, ofproto.OFP_HEADER_SIZE) # 將ofproto.OFP_PACKET_IN_SIZE長度之外的buf內容,賦值給data msg.data = msg.buf[ofproto.OFP_PACKET_IN_SIZE:] if msg.total_len < len(msg.data): # discard padding for 8-byte alignment of OFP packet msg.data = msg.data[:msg.total_len] return msg @_set_msg_type(ofproto.OFPT_FLOW_MOD) class OFPFlowMod(MsgBase): def __init__(self, datapath, match, cookie, command, idle_timeout=0, hard_timeout=0, priority=ofproto.OFP_DEFAULT_PRIORITY, buffer_id=0xffffffff, out_port=ofproto.OFPP_NONE, flags=0, actions=None): if actions is None: actions = [] super(OFPFlowMod, self).__init__(datapath) self.match = match self.cookie = cookie self.command = command self.idle_timeout = idle_timeout self.hard_timeout = hard_timeout self.priority = priority self.buffer_id = buffer_id self.out_port = out_port self.flags = flags self.actions = actions def _serialize_body(self): offset = ofproto.OFP_HEADER_SIZE self.match.serialize(self.buf, offset) # 封裝的起點是offset offset += ofproto.OFP_MATCH_SIZE # 按照ofproto.OFP_FLOW_MOD_PACK_STR0的格式,將對應的字段封裝到self.buf中 msg_pack_into(ofproto.OFP_FLOW_MOD_PACK_STR0, self.buf, offset, self.cookie, self.command, self.idle_timeout, self.hard_timeout, self.priority, self.buffer_id, self.out_port, self.flags) offset = ofproto.OFP_FLOW_MOD_SIZE if self.actions is not None: for a in self.actions: # 序列化action a.serialize(self.buf, offset) offset += a.len
此模塊代碼量大,包括OpenFlow協議對應版本內容的完全描述。分類上可分為解析和序列化封裝兩個重點內容。讀者在閱讀源碼時可根據需求閱讀片段即可。
Inet & ether
這兩個模塊非常簡單,ether定義了常用的以太網的協議類型及其對應的代碼;inet定義了IP協議族中不同協議的端口號,如TCP=6。
oxm_field
在1.3等高版本OpenFlow中,使用到了oxm\_field的概念。oxm全稱為OpenFlow Extensible Match。當OpenFlow逐漸發展成熟,flow的match域越來越多。然而很多通信場景下使用到的匹配字段很少,甚至只有一個。OXM是一種TLV格式,使用OXM可以在下發流表時僅攜帶使用到的match域內容,而放棄剩余的大量的match域。當使用的match域較少時,統計概率上會減少報文傳輸的字節數。
nx_match
該文件定義了nicira extensible match的相關內容。
ofp_event
這個模塊的位置並不再ofproto,而位於controller目錄下。controller模塊下的event定義了基礎的事件基類。ofp\_event模塊完成了OpenFlow報文到event的生成過程。模塊中定義了EventOFPMsgBase(event.EventBase)類和\_ofp\_msg\_name\_to\_ev\_name(msg\_name)等函數的定義。相關函數都非常的簡單,可從函數名了解到其功能。示例代碼如下:
def _ofp_msg_name_to_ev_name(msg_name):
return 'Event' + msg_name
Struct lib
Python的struct庫是一個簡單的,高效的數據封裝\解封裝的庫。該庫主要包含5個函數,介紹如下:
- struct.pack(fmt, v1, v2, ...): 將V1,V2等值按照對應的fmt(format)進行封裝。
- struct.pack_into(fmt, buffer, offset, v1, v2, ...):將V1,V2等值按照對應的fmt(format)封裝到buffer中,從初始位置offset開始。
- struct.unpack(fmt, string): 將string按照fmt的格式解封
- struct.unpack_from(fmt, buffer[offset=0,]): 按照fmt的格式,從offset開始將buffer解封。
- struct.calcsize(fmt): 計算對應的fmt的長度。
更詳細的封裝語法,請查看struct對應的鏈接。此處僅對常用語法進行介紹:
- !:大端存儲
- c: char
- B: 一個字節長度,unsigned char.
- H:兩個字節,16位
- I: 4個字節,int型
- Q: 64bits
- x: padding
- 3x:3個字節的padding
- 5s: 5字節的字符串
1.5 Ryu的處理流程
-
入口函數執行流程

-
事件處理流程





-
補充說明

1.6 ryu運行
從main函數入手,講述RYU的ryuapp基類細節、app_manager類如何load apps,注冊並運行application,Event的產生以及分發,還有最重要的應用ofp_handler。
main()
RYU的main函數在ryu/cmd/manager.py文件中,部分內容如下:
def main(args=None, prog=None):
try: CONF(args=args, prog=prog, project='ryu', version='ryu-manager %s' % version, default_config_files=['/usr/local/etc/ryu/ryu.conf']) except cfg.ConfigFilesNotFoundError: CONF(args=args, prog=prog, project='ryu', version='ryu-manager %s' % version) log.init_log() #初始化打印log if CONF.pid_file: import os with open(CONF.pid_file, 'w') as pid_file: pid_file.write(str(os.getpid())) app_lists = CONF.app_lists + CONF.app # keep old behaivor, run ofp if no application is specified. if not app_lists: app_lists = ['ryu.controller.ofp_handler'] app_mgr = AppManager.get_instance() #在AppManager類中獲取實例 app_mgr.load_apps(app_lists) #加載App contexts = app_mgr.create_contexts() #創建運行環境,"dpset"/"wsgi" services = [] services.extend(app_mgr.instantiate_apps(**contexts))
#啟動App線程,App實例化
#ryu.controller.dpset.DPSet / rest_firewall.RestFirewallAPI / ryu.controller.ofp_handler.OFPHandler webapp = wsgi.start_service(app_mgr) #webapp啟動 if webapp: thr = hub.spawn(webapp) services.append(thr) try: hub.joinall(services) #調用t.wait(),執行等待,wait()方法使當前線程暫停執行並釋放對象鎖標志
#循環join,直到有異常或者外部中斷推遲
finally: app_mgr.close()
首先從CONF文件中讀取出app list。如果ryu-manager 命令任何參數,則默認應用為ofp_handler應用。緊接着實例化一個AppManager對象,調用load_apps函數將應用加載。調用create_contexts函數創建對應的contexts, 然后調用instantiate_apps函數將app_list和context中的app均實例化。啟動wsgi架構,提供web應用。最后將所有的應用作為任務,作為coroutine的task去執行,joinall使得程序必須等待所有的task都執行完成才可以退出程序。最后調用close函數,關閉程序,釋放資源。以下的部分將以主函數中出現的調用順序為依據,展開講解。
OFPHandler
上文說到,如果沒有捕獲Application輸入,那么默認啟動的應用是OFPHandler應用。該應用主要用於處理OpenFlow消息。在start函數初始化運行了一個OpenFlowController實例。OpenFlowController類將在后續介紹。
def start(self): super(OFPHandler, self).start() return hub.spawn(OpenFlowController())
OFPHandler應用完成了基本的消息處理,如hello_handler:用於處理hello報文,協議版本的協商。其處理並不復雜,但是值得注意的一點是裝飾器:Decorator的使用。
Decorator
Python修飾器的函數式編程 Python Decorator可以看作是一種聲明,一種修飾。以下舉例參考自Coolshell。舉例如下:
@decorator
def foo():
pass
實際上等同於foo = decorator(foo), 而且它還被執行了。舉個例子:
def keyword(fn):
print "you %s me!" % fn.__name__.upper()
@keyword
def evol():
pass
運行之后,就會輸出 you EVOL me
多個decorator:
@decorator_a
@decorator_b
def foo():
pass
這相當於:
foo = decorator_a(decorator_b(foo))
而帶參數的decorator:
@decorator(arg1, arg2)
def foo():
pass
相當於
foo = decorator(arg1,arg2)(foo)
decorator(arg1,arg2)將生成一個decorator。
class式的 Decorator
class myDecorator(object): def __init__(self, fn): print "inside myDecorator.__init__()" self.fn = fn def __call__(self): self.fn() print "inside myDecorator.__call__()" @myDecorator def aFunction(): print "inside aFunction()" print "Finished decorating aFunction()" aFunction()
#結果:
>>>
inside myDecorator.__init__()
Finished decorating aFunction()
inside aFunction()
inside myDecorator.__call__()
>>>
@decorator使用時,__init__被調用,當function被調用是,執行__call__函數,而不執行function,所以在__call__函數中需要寫出self.fn = fn,更多內容可以直接訪問Python Decorator Library。
OpenFlowController
前一部分提到OFPHandle的start函數會將OpenFlowController啟動。本小節介紹OpenFlowController類。該類的定義在ryu/cmd/controller.py文件中。OpenFlowController.__call__()函數啟動了server_loop()函數,該函數實例化了hub.py中的StreamServer類,並將handler函數初始化為datapath_connection_factory函數,並調用serve_forever(),不斷進行socket的監聽。StreamServer定義如下:
class StreamServer(object): def __init__(self, listen_info, handle=None, backlog=None, spawn='default', **ssl_args): assert backlog is None assert spawn == 'default' if ':' in listen_info[0]: self.server = eventlet.listen(listen_info, family=socket.AF_INET6) else: self.server = eventlet.listen(listen_info) if ssl_args: def wrap_and_handle(sock, addr): ssl_args.setdefault('server_side', True) handle(ssl.wrap_socket(sock, **ssl_args), addr) self.handle = wrap_and_handle else: self.handle = handle def serve_forever(self): while True: sock, addr = self.server.accept() spawn(self.handle, sock, addr)
Datapath
Datapath類在RYU中極為重要,每當一個datapath實體與控制器建立連接時,就會實例化一個Datapath的對象。 該類中不僅定義了許多的成員變量用於描述一個datapath,還管理控制器與該datapath通信的數據收發。其中_recv_loop函數完成數據的接收與解析,事件的產生與分發。
@_deactivate def _recv_loop(self): buf = bytearray() required_len = ofproto_common.OFP_HEADER_SIZE count = 0 while self.is_active: ret = self.socket.recv(required_len) if len(ret) == 0: self.is_active = False break buf += ret while len(buf) >= required_len: (version, msg_type, msg_len, xid) = ofproto_parser.header(buf) required_len = msg_len if len(buf) < required_len: break msg = ofproto_parser.msg(self, version, msg_type, msg_len, xid, buf) # 解析報文 # LOG.debug('queue msg %s cls %s', msg, msg.__class__) if msg: ev = ofp_event.ofp_msg_to_ev(msg) # 產生事件 self.ofp_brick.send_event_to_observers(ev, self.state) # 事件分發 dispatchers = lambda x: x.callers[ev.__class__].dispatchers handlers = [handler for handler in self.ofp_brick.get_handlers(ev) if self.state in dispatchers(handler)] for handler in handlers: handler(ev) buf = buf[required_len:] required_len = ofproto_common.OFP_HEADER_SIZE # We need to schedule other greenlets. Otherwise, ryu # can't accept new switches or handle the existing # switches. The limit is arbitrary. We need the better # approach in the future. count += 1 if count > 2048: count = 0 hub.sleep(0)
@_deactivate修飾符作用在於在Datapath斷開連接之后,將其狀態is_active置為False。self.ofp_brick.send_event_to_observers(ev, self.state) 語句完成了事件的分發。self.brick的初始化語句可以在self.__init__函數中找到:
self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
由上可知,self.ofp_brick實際上是由service_brick(中文可以成為:服務鏈表?)中的“ofp_event”服務賦值的。在每一個app中,使用@set_ev_cls(ev_cls,dispatchers)時,就會將實例化ofp_event模塊,執行文件中最后一句:
handler.register_service('ryu.controller.ofp_handler')
register_service函數實體如下:
def register_service(service):
"""
Register the ryu application specified by 'service' as
a provider of events defined in the calling module.
If an application being loaded consumes events (in the sense of
set_ev_cls) provided by the 'service' application, the latter
application will be automatically loaded.
This mechanism is used to e.g. automatically start ofp_handler if
there are applications consuming OFP events.
"""
frm = inspect.stack()[1]
m = inspect.getmodule(frm[0])
m._SERVICE_NAME = service
其中inspect.stack()[1]返回了調用此函數的caller, inspect.getmodule(frm[0])返回了該caller的模塊,當前例子下,module=ofp_event。
我們可以通過ryu-manager --verbose來查看到輸出信息,從而印證這一點。
:~/ryu/ryu/app$ ryu-manager --verbose
loading app ryu.controller.ofp_handler
instantiating app ryu.controller.ofp_handler of OFPHandler
BRICK ofp_event
CONSUMES EventOFPErrorMsg
CONSUMES EventOFPEchoRequest
CONSUMES EventOFPPortDescStatsReply
CONSUMES EventOFPHello
CONSUMES EventOFPSwitchFeatures
所以當運行ofp_handler應用時,就會注冊ofp_event service,為后續的應用提供服務。分發事件之后,還要處理自身訂閱的事件,所以首先找到符合當前state的caller,然后調用handler。_caller類可以在handler.py文件中找到,包含dispatchers和ev_source兩個成員變量。前者用於描述caller需要的state,后者是event產生者的模塊名稱。
對應的發送循環由_send_loop完成。self.send_p是一個深度為16的發送queue。
@_deactivate
def _send_loop(self):
try:
while self.is_active:
buf = self.send_q.get()
self.socket.sendall(buf)
finally:
q = self.send_q
# first, clear self.send_q to prevent new references.
self.send_q = None
# there might be threads currently blocking in send_q.put().
# unblock them by draining the queue.
try:
while q.get(block=False):
pass
except hub.QueueEmpty:
pass
serve函數完成了發送循環的啟動和接受循環的啟動。啟動一個coroutine去執行self._send_loop(), 然后馬上主動發送hello報文到datapath(可以理解為交換網橋:Bridge),最后執行self._recv_loop()。
def serve(self):
send_thr = hub.spawn(self._send_loop)
# send hello message immediately
hello = self.ofproto_parser.OFPHello(self)
self.send_msg(hello)
try:
self._recv_loop()
finally:
hub.kill(send_thr)
hub.joinall([send_thr])
而serve函數又在datapath_connection_factory函數中被調用。當然向外提供完整功能的API就是這個。所以在OpenFlowController類中可以看到在初始化server實例的時候,handler賦值為datapath_connection_factory。其中使用到的contextlib module具體內容不作介紹,讀者可自行學習。
def datapath_connection_factory(socket, address):
LOG.debug('connected socket:%s address:%s', socket, address)
with contextlib.closing(Datapath(socket, address)) as datapath:
try:
datapath.serve()
except:
# Something went wrong.
# Especially malicious switch can send malformed packet,
# the parser raise exception.
# Can we do anything more graceful?
if datapath.id is None:
dpid_str = "%s" % datapath.id
else:
dpid_str = dpid_to_str(datapath.id)
LOG.error("Error in the datapath %s from %s", dpid_str, address)
raise
到此為止,OFPHandler應用的功能實現介紹完畢。RYU啟動時,需要啟動OFPHandler,才能完成數據的收發和解析。更多的上層應用邏輯都是在此基礎之上進行的。若要開發APP則需要繼承RyuApp類,並完成observer監聽事件,以及注冊handler去完成事件處理。
RyuApp
RyuApp類是RYU封裝好的APP基類,用戶只需要繼承該類,就可以方便地開發應用。而注冊對應的observer和handler都使用@derocator的形式,使得開發非常的簡單高效,這也是Python的優點之一吧。RyuApp類的定義在ryu/base/app_manager.py文件中。該文件實現了兩個類RyuApp和AppManager。前者用於定義APP基類,為應用開發提供基本的模板,后者用於Application的管理,加載應用,運行應用,消息路由等功能。
app_manager.py文件中import了instpect和itertools module,從而使得開發更方便簡潔。inspect模塊提供了一些有用的方法,用於類型檢測,獲取內容,檢測是否可迭代等功能。itertools則是一個關於迭代器的模塊,可以提供豐富的迭代器類型,在數據處理上尤其有用。
_CONTEXT
這是一個極其難理解的概念。博主的理解是,_CONTEXT內存儲着name:class的key value pairs。為什么需要存儲這個內容?實際上這個_CONTEXT攜帶的信息是所有本APP需要依賴的APP。需要在啟動本應用之前去啟動,以滿足依賴的,比如一個simple_switch.py的應用,如果沒有OFPHandler應用作為數據收發和解析的基礎的話,是無法運行的。具體文檔如下:
_CONTEXTS = {} """ A dictionary to specify contexts which this Ryu application wants to use. Its key is a name of context and its value is an ordinary class which implements the context. The class is instantiated by app_manager and the instance is shared among RyuApp subclasses which has _CONTEXTS member with the same key. A RyuApp subclass can obtain a reference to the instance via its __init__'s kwargs as the following. Example:: _CONTEXTS = { 'network': network.Network } def __init__(self, *args, *kwargs): self.network = kwargs['network'] """
_EVENTS
用於記錄本應用會產生的event。但是當且僅當定義該event的語句在其他模塊時才會被使用到。
self.__init__
__init__函數中初始化了許多重要的成員變量,如self.event_handler用於記錄向外提供的事件處理句柄,而self.observer則剛好相反,用於通知app_manager本應用監聽何種類型的事件。self.event是事件隊列。
def __init__(self, *_args, **_kwargs):
super(RyuApp, self).__init__()
self.name = self.__class__.__name__
self.event_handlers = {} # ev_cls -> handlers:list
self.observers = {} # ev_cls -> observer-name -> states:set
self.threads = []
self.events = hub.Queue(128)
if hasattr(self.__class__, 'LOGGER_NAME'):
self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
else:
self.logger = logging.getLogger(self.name)
self.CONF = cfg.CONF
# prevent accidental creation of instances of this class outside RyuApp
class _EventThreadStop(event.EventBase):
pass
self._event_stop = _EventThreadStop()
self.is_active = True
self.start
start函數將啟動coroutine去處理_event_loop,並將其加入threads字典中。
self._event_loop
_event_loop函數用於啟動事件處理循環,通過調用self.get_handlers(ev, state)函數來找到事件對應的handler,然后處理事件。
def get_handlers(self, ev, state=None): """Returns a list of handlers for the specific event. :param ev: The event to handle. :param state: The current state. ("dispatcher") If None is given, returns all handlers for the event. Otherwise, returns only handlers that are interested in the specified state. The default is None. """ ev_cls = ev.__class__ handlers = self.event_handlers.get(ev_cls, []) if state is None: return handlers def _event_loop(self): while self.is_active or not self.events.empty(): ev, state = self.events.get() if ev == self._event_stop: continue handlers = self.get_handlers(ev, state) for handler in handlers: handler(ev)
event dispatch
應用中可以通過@set_ev_cls修飾符去監聽某些事件。當產生event時,通過event去get observer,得到對應的觀察者,然后再使用self.send_event函數去發送事件。在這里,實際上就是直接往self.event隊列中put event。
def _send_event(self, ev, state):
self.events.put((ev, state))
def send_event(self, name, ev, state=None):
"""
Send the specified event to the RyuApp instance specified by name.
"""
if name in SERVICE_BRICKS:
if isinstance(ev, EventRequestBase):
ev.src = self.name
LOG.debug("EVENT %s->%s %s" %
(self.name, name, ev.__class__.__name__))
SERVICE_BRICKS[name]._send_event(ev, state)
else:
LOG.debug("EVENT LOST %s->%s %s" %
(self.name, name, ev.__class__.__name__))
def send_event_to_observers(self, ev, state=None):
"""
Send the specified event to all observers of this RyuApp.
"""
for observer in self.get_observers(ev, state):
self.send_event(observer, ev, state)
其他函數如注冊handler函數:register_handler,注冊監聽函數:register_observer等都是非常簡單直白的代碼,不再贅述。
AppManager
AppManager類是RYU應用的調度中心。用於管理應用的添加刪除,消息路由等等功能。
首先從啟動函數開始介紹,我們可以看到run_apps函數中的代碼和前文提到的main函數語句基本一樣。首先獲取一個對象,然后加載對應的apps,然后獲取contexts,context中其實包含的是本應用所需要的依賴應用。所以在調用instantiate_apps函數時,將app_lists內的application和contexts中的services都實例化,然后啟動協程去運行這些服務。
@staticmethod
def run_apps(app_lists):
"""Run a set of Ryu applications
A convenient method to load and instantiate apps.
This blocks until all relevant apps stop.
"""
app_mgr = AppManager.get_instance()
app_mgr.load_apps(app_lists)
contexts = app_mgr.create_contexts()
services = app_mgr.instantiate_apps(**contexts)
webapp = wsgi.start_service(app_mgr)
if webapp:
services.append(hub.spawn(webapp))
try:
hub.joinall(services)
finally:
app_mgr.close()
load_apps
首先從創建一個apps_lists的生成器(個人理解應該是生成器而非迭代器)。在while循環中,每次pop一個應用進行處理,然后將其本身和其context中的內容添加到services中,再去調用get_dependent_services函數獲取其依賴應用,最后將所有的依賴services添加到app_lists中,循環至最終app_lists內元素全都pop出去,完成application的加載。
def load_apps(self, app_lists): app_lists = [app for app in itertools.chain.from_iterable(app.split(',') for app in app_lists)] while len(app_lists) > 0: app_cls_name = app_lists.pop(0) context_modules = map(lambda x: x.__module__, self.contexts_cls.values()) if app_cls_name in context_modules: continue LOG.info('loading app %s', app_cls_name) cls = self.load_app(app_cls_name) if cls is None: continue self.applications_cls[app_cls_name] = cls services = [] for key, context_cls in cls.context_iteritems(): v = self.contexts_cls.setdefault(key, context_cls) assert v == context_cls context_modules.append(context_cls.__module__) if issubclass(context_cls, RyuApp): services.extend(get_dependent_services(context_cls)) # we can't load an app that will be initiataed for # contexts. for i in get_dependent_services(cls): if i not in context_modules: services.append(i) if services: app_lists.extend([s for s in set(services) if s not in app_lists])
create_contexts
context實例化函數將context中name:service class鍵值對的內容實例化成對應的對象,以便加入到services 列表中,從而得到加載。首先從列表中取出對應數據,然后判斷是否時RyuApp的子類,是則實例化,否則直接賦值service class。load_app函數在讀取的時候還會再次判斷是否是RyuApp子類。
def create_contexts(self):
for key, cls in self.contexts_cls.items():
if issubclass(cls, RyuApp):
# hack for dpset
context = self._instantiate(None, cls)
else:
context = cls()
LOG.info('creating context %s', key)
assert key not in self.contexts
self.contexts[key] = context
return self.contexts
instantiate_apps
此函數調用了self._instantiate函數,在_instantiate函數中又調用了register_app()函數,此函數將app添加到SERVICE_BRICKS字典之中,然后繼續調用了ryu.controller.handler 中的 register_instance函數,最終完成了應用的注冊。此后繼續調用self._update_bricks函數完成了服務鏈表的更新,最后啟動了所有的應用。
def instantiate_apps(self, *args, **kwargs): for app_name, cls in self.applications_cls.items(): self._instantiate(app_name, cls, *args, **kwargs) self._update_bricks() self.report_bricks() threads = [] for app in self.applications.values(): t = app.start() if t is not None: threads.append(t) return threads def _instantiate(self, app_name, cls, *args, **kwargs): # for now, only single instance of a given module # Do we need to support multiple instances? # Yes, maybe for slicing. #LOG.info('instantiating app %s of %s', app_name, cls.__name__) if hasattr(cls, 'OFP_VERSIONS') and cls.OFP_VERSIONS is not None: ofproto_protocol.set_app_supported_versions(cls.OFP_VERSIONS) if app_name is not None: assert app_name not in self.applications app = cls(*args, **kwargs) register_app(app) assert app.name not in self.applications self.applications[app.name] = app return app
_update_bricks
此函數完成了更新service_bricks的功能。首先從獲取到service實例,然后再獲取到service中的方法,若方法有callers屬性,即使用了@set_ev_cls的裝飾符,擁有了calls屬性。(caller類中的ev_source和dispatcher成員變量描述了產生該event的source module, dispatcher描述了event需要在什么狀態下才可以被分發。如:HANDSHAKE_DISPATCHER,CONFIG_DISPATCHER等。)最后調用register_observer函數注冊了observer。
def _update_bricks(self):
for i in SERVICE_BRICKS.values():
for _k, m in inspect.getmembers(i, inspect.ismethod):
if not hasattr(m, 'callers'):
continue
for ev_cls, c in m.callers.iteritems():
if not c.ev_source:
continue
brick = _lookup_service_brick_by_mod_name(c.ev_source)
if brick:
brick.register_observer(ev_cls, i.name,
c.dispatchers)
# allow RyuApp and Event class are in different module
for brick in SERVICE_BRICKS.itervalues():
if ev_cls in brick._EVENTS:
brick.register_observer(ev_cls, i.name,
c.dispatchers)
ryu.controller.handler.register_instance
以上的部分介紹了App的注冊,observer的注冊,handler的查找和使用,但是,始終沒有提到handler在何處注冊。實際上,handler的注冊在register_instance部分完成了。為什么他的位置在handler文件,而不在app_manager文件呢?個人認為可能是為了給其他非Ryu APP的模塊使用吧。
def register_instance(i):
for _k, m in inspect.getmembers(i, inspect.ismethod):
# LOG.debug('instance %s k %s m %s', i, _k, m)
if _has_caller(m):
for ev_cls, c in m.callers.iteritems():
i.register_handler(ev_cls, m)
2. RYU實踐
2.1 二層交換機
http://ryu.readthedocs.org/en/latest/writing_ryu_app.html
第一步:
ryu.base import app_manager:該文件中定義了RyuApp基類,開發APP需要繼承該基類;
保存為L2Switch.py 運行: ryu-manager L2Switch.py
from ryu.base import app_manager
class L2Switch(app_manager.RyuApp):
def __init__(self, *args, **kwargs):
super(L2Switch, self).__init__(*args, **kwargs)
第二步:
ofp_event完成了事件的定義,從而我們可以在函數中注冊handler,監聽事件,並作出回應。
packet_in_handler方法用於處理packet_in事件。
@set_ev_cls修飾符用於告知RYU,被修飾的函數應該被調用。第一個參數表示事件發生時應該調用的函數,第二個參數告訴交換機只有在交換機握手完成之后,才可以被調用。
數據操作:
- ev.msg:每一個事件類ev中都有msg成員,用於攜帶觸發事件的數據包。
- msg.datapath:已經格式化的msg其實就是一個packet_in報文,msg.datapath直接可以獲得packet_in報文的datapath結構。datapath用於描述一個交換網橋。也是和控制器通信的實體單元。datapath.send_msg()函數用於發送數據到指定datapath。通過datapath.id可獲得dpid數據,在后續的教程中會有使用。
- datapath.ofproto對象是一個OpenFlow協議數據結構的對象,成員包含OpenFlow協議的數據結構,如動作類型OFPP_FLOOD。
- datapath.ofp_parser則是一個按照OpenFlow解析的數據結構。
- actions是一個列表,用於存放action list,可在其中添加動作。
- 通過ofp_parser類,可以構造構造packet_out數據結構。括弧中填寫對應字段的賦值即可。
如果datapath.send_msg()函數發送的是一個OpenFlow的數據結構,RYU將把這個數據發送到對應的datapath。
from ryu.base import app_manager
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER
from ryu.controller.handler import set_ev_cls
class L2Switch(app_manager.RyuApp):
def __init__(self, *args, **kwargs):
super(L2Switch, self).__init__(*args, **kwargs)
@set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
def packet_in_handler(self, ev):
msg = ev.msg
datapath = msg.datapath
ofp = datapath.ofproto
ofp_parser = datapath.ofproto_parser
actions = [ofp_parser.OFPActionOutput(ofp.OFPP_FLOOD)]
out = ofp_parser.OFPPacketOut(
datapath=datapath, buffer_id=msg.buffer_id, in_port=msg.in_port,
actions=actions)
datapath.send_msg(out)
第三步:
import struct
import logging
from ryu.base import app_manager
from ryu.controller import mac_to_port
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.ofproto import ofproto_v1_0
from ryu.lib.mac import haddr_to_bin
from ryu.lib.packet import packet
from ryu.lib.packet import ethernet
class L2Switch(app_manager.RyuApp):
OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION]#define the version of OpenFlow
def __init__(self, *args, **kwargs):
super(L2Switch, self).__init__(*args, **kwargs)
self.mac_to_port = {}
def add_flow(self, datapath, in_port, dst, actions):
ofproto = datapath.ofproto
match = datapath.ofproto_parser.OFPMatch(
in_port = in_port, dl_dst = haddr_to_bin(dst))
mod = datapath.ofproto_parser.OFPFlowMod(
datapath = datapath, match = match, cookie = 0,
command = ofproto.OFPFC_ADD, idle_timeout = 10,hard_timeout = 30,
priority = ofproto.OFP_DEFAULT_PRIORITY,
flags =ofproto.OFPFF_SEND_FLOW_REM, actions = actions)
datapath.send_msg(mod)
@set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
def packet_in_handler(self, ev):
msg = ev.msg
datapath = msg.datapath
ofproto = datapath.ofproto
pkt = packet.Packet(msg.data)
eth = pkt.get_protocol(ethernet.ethernet)
dst = eth.dst
src = eth.src
dpid = datapath.id #get the dpid
self.mac_to_port.setdefault(dpid, {})
self.logger.info("packet in %s %s %s %s", dpid, src, dst , msg.in_port)
#To learn a mac address to avoid FLOOD next time.
self.mac_to_port[dpid][src] = msg.in_port
out_port = ofproto.OFPP_FLOOD
#Look up the out_port
if dst in self.mac_to_port[dpid]:
out_port = self.mac_to_port[dpid][dst]
ofp_parser = datapath.ofproto_parser
actions = [ofp_parser.OFPActionOutput(out_port)]
if out_port != ofproto.OFPP_FLOOD:
self.add_flow(datapath, msg.in_port, dst, actions)
#We always send the packet_out to handle the first packet.
packet_out = ofp_parser.OFPPacketOut(datapath = datapath, buffer_id = msg.buffer_id,
in_port = msg.in_port, actions = actions)
datapath.send_msg(packet_out)
#To show the message of ports' status.
@set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER)
def _port_status_handler(self, ev):
msg = ev.msg
reason = msg.reason
port_no = msg.desc.port_no
ofproto = msg.datapath.ofproto
if reason == ofproto.OFPPR_ADD:
self.logger.info("port added %s", port_no)
elif reason == ofproto.OFPPR_DELETE:
self.logger.info("port deleted %s", port_no)
elif reason == ofproto.OFPPR_MODIFY:
self.logger.info("port modified %s", port_no)
else:
self.logger.info("Illeagal port state %s %s", port_no, reason)
2.2 simple-switch.py 的APP測試
在mininet上模擬一台交換機(s1)三台主機(h1,h2,h3),然后遠端連接RYU控制器,使用127.0.0.1,和6633端口建立連接
第一,在RYU控制器開啟simple-switch.py的APP,輸入命令:ryu-manager simple-switch.py:
第二,在另外一個終端上建立mininet模擬拓撲,輸入命令:mn --topo single,3 --mac --switch ovsk --controller remote
然后在RYU的那個終端就會顯示連接的建立,同時,也會同步一些交換機和控制器建立連接的信息,如圖:

此時,在交換機的轉發流表是空的,因此此時主機之間是不可以通信的,在使用h1去ping h2的時候,就會自動建立流表

注意是先進行廣播,然后建立反方向的流表,然后建立正方向的流表。流表如圖:

資料出處:
http://ryu.readthedocs.org/en/latest/api_ref.html
http://www.sdnlab.com/6395.html
http://www.sdnlab.com/12838.html
