P4Runtime--基於mininet和bmv2環境的解析


博客園protobuf文件的高亮好像有些問題。。。

實驗環境

P4 tutorials的Ubuntu 20.04虛擬機

所有運行的文件都在/home/p4/tutorials/及其子目錄下

實驗參考材料

p4 tutorials倉庫 https://github.com/p4lang/tutorials

p4 runtime的protobuf文件 https://github.com/p4lang/p4runtime/tree/main/proto

PI(p4runtime的服務端實現)的protobuf文件 https://github.com/p4lang/PI/blob/main/proto/p4/tmp/p4config.proto

p4 runtime文檔 https://p4.org/p4-spec/p4runtime/main/P4Runtime-Spec.html

交換機的創建(使用grpc)

首先在topology.json中聲明交換機,在執行make run以后,會執行run_exercise.py(具體函數調用鏈略),之后進入P4RuntimeSwitch類的__init__()方法.

   # p4runtime_switch.py
    # 函數體部分只留下關注的部分,其余部分有所刪減,后同
    # 該類為P4RuntimeSwitch,繼承了P4Switch類,P4Switch類又繼承了Switch類,Switch類繼承Node類(代表mininet的節點)
    "BMv2 switch with gRPC support"
    next_grpc_port = 50051


    def __init__(self, name, sw_path = None, json_path = None,
                 grpc_port = None,
                 thrift_port = None,
                 pcap_dump = False,
                 log_console = False,
                 verbose = False,
                 device_id = None,
                 enable_debugger = False,
                 log_file = None,
                 **kwargs):
		# name是指定的交換機名字,會在Node類的構造方法中被初始化
        Switch.__init__(self, name, **kwargs)
        
		# 這個json文件是指向p4c后p4的編譯器編譯出來的json文件,位於build目錄下
        if json_path is not None:
            # make sure that the provided JSON file exists
            if not os.path.isfile(json_path):
                error("Invalid JSON file: {}\n".format(json_path))
                exit(1)
            self.json_path = json_path
        else:
            self.json_path = None
		
        # 如果傳遞了port,就用傳遞的,否則是用靜態變量next_grpc_port(初始值50051),並讓否則是用靜態變量next_grpc_port++,
        if grpc_port is not None:
            self.grpc_port = grpc_port
        else:
            self.grpc_port = P4RuntimeSwitch.next_grpc_port
            P4RuntimeSwitch.next_grpc_port += 1
		
        # 如果傳遞了device_id,就用傳遞的,否則是用靜態變量P4Switch類中的靜態變量device_id(初始值0),並讓否則是用靜態變量device_id++
        if device_id is not None:
            self.device_id = device_id
            P4Switch.device_id = max(P4Switch.device_id, device_id)
        else:
            self.device_id = P4Switch.device_id
            P4Switch.device_id += 1
		# 幾個比較重要的參數就是name用於標識名字,port標識連接端口,device_id用於標識設備

建立與交換機的連接

從教程中的p4runtime這一節的mycontroller.py中,可以看到官方封裝了一個Bmv2SwitchConnection類方法來幫助建立連接,這里的前三個參數和建立交換機時的要一致,最后一個參數為輸出的日志文件位置

#/home/p4/tutorials/exercises/p4runtime/mycontroller.py     	  
    	s1 = p4runtime_lib.bmv2.Bmv2SwitchConnection(
            name='s1',
            address='127.0.0.1:50051',
            device_id=0,
            proto_dump_file='logs/s1-p4runtime-requests.txt')

追蹤Bmv2SwitchConnection類,發現其繼承了SwitchConnection類並重寫了buildDeviceConfig方法。

def buildDeviceConfig(bmv2_json_file_path=None):
    "Builds the device config for BMv2"
    device_config = p4config_pb2.P4DeviceConfig()
    device_config.reassign = True
    with open(bmv2_json_file_path) as f:
        device_config.device_data = f.read().encode('utf-8')
    return device_config


class Bmv2SwitchConnection(SwitchConnection):
    def buildDeviceConfig(self, **kwargs):
        return buildDeviceConfig(**kwargs)

進入SwitchConnection類,之前調用的方法正是SwitchConnection類的構造方法

    def __init__(self, name=None, address='127.0.0.1:50051', device_id=0,
                 proto_dump_file=None):
        self.name = name
        # 交換機的ip及端口號
        self.address = address
        self.device_id = device_id
        self.p4info = None
        # 以下大部分代碼為grpc的常規寫法,不做解析
        self.channel = grpc.insecure_channel(self.address)
        if proto_dump_file is not None:
            interceptor = GrpcRequestLogger(proto_dump_file)
            self.channel = grpc.intercept_channel(self.channel, interceptor)
        # grpc使用的stub,可以認為是一個api的接口
        self.client_stub = p4runtime_pb2_grpc.P4RuntimeStub(self.channel)
        self.requests_stream = IterableQueue()
        self.stream_msg_resp = self.client_stub.StreamChannel(iter(self.requests_stream))
        self.proto_dump_file = proto_dump_file
        # connections為全局變量,維護所有連接的引用
        connections.append(self)

注意self.client_stub.StreamChannel(iter(self.requests_stream))這個方法,StreamChannel是一個rpc服務,按照p4runtime.proto文件中的說法,他有如下作用:

  1. 初始化交換機和控制器的連接
  2. 探測交換機的存活(心跳檢測)
  3. 發送/接收包
  // Represents the bidirectional stream between the controller and the
  // switch (initiated by the controller), and is managed for the following
  // purposes:
  // - connection initiation through client arbitration
  // - indicating switch session liveness: the session is live when switch
  //   sends a positive client arbitration update to the controller, and is
  //   considered dead when either the stream breaks or the switch sends a
  //   negative update for client arbitration
  // - the controller sending/receiving packets to/from the switch
  // - streaming of notifications from the switch
  rpc StreamChannel(stream StreamMessageRequest)
      returns (stream StreamMessageResponse) {
  }

到目前為止,控制平面與交換機就成功的建立連接了。

這里統一說明一點,如果有參數不正確或其他錯誤,GRPC會返回相應的錯誤類型,具體錯誤類型可以查看p4runtime文檔,以后不再贅述。

設置主節點

接下來mycontroller.py調用了每個交換機的MasterArbitrationUpdate()方法,該方法會將這個控制器設置為master控制器。之所以要這么做,個人猜測是因為官方文檔中表明p4交換機支持多個controller組成高可用集群,這時候就需要制定一個主控制器來進行控制(即便只有一個控制器也得手動設置主控制器)

image-20211202194800427

進入MasterArbitrationUpdate()方法

    def MasterArbitrationUpdate(self, dry_run=False, **kwargs):
        # 聲明StreamMessageRequest對象
        request = p4runtime_pb2.StreamMessageRequest()
        request.arbitration.device_id = self.device_id
        request.arbitration.election_id.high = 0
        request.arbitration.election_id.low = 1

        if dry_run:
            print("P4Runtime MasterArbitrationUpdate: ", request)
        else:
            self.requests_stream.put(request)
            for item in self.stream_msg_resp:
                return item # just one

查看相關的protobuf文件

message StreamMessageRequest {
  oneof update {
  	// 在該請求中,用的是MasterArbitrationUpdate
    MasterArbitrationUpdate arbitration = 1;
    PacketOut packet = 2;
    DigestListAck digest_ack = 3;
    .google.protobuf.Any other = 4;
  }
}

message MasterArbitrationUpdate {
  uint64 device_id = 1;
  // 如果不指定role_id,則默認具有所有權限
  // The role for which the primary client is being arbitrated. For use-cases
  // where multiple roles are not needed, the controller can leave this unset,
  // implying default role and full pipeline access.
  Role role = 2;
  // The stream RPC with the highest election_id is the primary. The 'primary'
  // controller instance populates this with its latest election_id. Switch
  // populates with the highest election ID it has received from all connected
  // controllers.
  Uint128 election_id = 3;
  // Switch populates this with OK for the client that is the primary, and
  // with an error status for all other connected clients (at every primary
  // client change). The controller does not populate this field.
  .google.rpc.Status status = 4;
}

對於單機使用而言,無需太關注這里的細節,只需要按照官方代碼即可。詳情可以查看p4runtime文檔的5. Client Arbitration and Controller Replication

將p4代碼和配置信息安裝到交換機中

#/home/p4/tutorials/exercises/p4runtime/mycontroller.py

# P4InfoHelper工具類會讀取並解析p4編譯器編譯得到的xxx.p4.p4info.txt
p4info_helper = p4runtime_lib.helper.P4InfoHelper(p4info_file_path)
# bmv2_json_file_path即p4編譯器編譯得到的xxx.json
# txt文件中主要記錄了p4程序的一些元信息,比如id等,json文件中則記載了具體的p4程序,比如stage,操作等,用於下發給交換機
s1.SetForwardingPipelineConfig(p4info=p4info_helper.p4info,
                               bmv2_json_file_path=bmv2_file_path)

以下是xxx.p4.p4info.txt文件內容,可以看到包含了要使用的交換機模型,表的id,名字,字段的id等信息

pkg_info {
  arch: "v1model"
}
tables {
  preamble {
    id: 37375156
    name: "MyIngress.ipv4_lpm"
    alias: "ipv4_lpm"
  }
  match_fields {
    id: 1
    name: "hdr.ipv4.dstAddr"
    bitwidth: 32
    match_type: LPM
  }
  action_refs {
    id: 28792405
  }
  action_refs {
    id: 25652968
  }
  action_refs {
    id: 21257015
  }
  size: 1024
}
tables {
  preamble {
    id: 38067515
    name: "MyIngress.icmp_lpm"
    alias: "icmp_lpm"
  }
  match_fields {
    id: 1
    name: "hdr.ipv4.dstAddr"
    bitwidth: 32
    match_type: LPM
  }
  action_refs {
    id: 25652968
  }
  action_refs {
    id: 21257015
  }
  size: 1024
}
actions {
  preamble {
    id: 21257015
    name: "NoAction"
    alias: "NoAction"
    annotations: "@noWarn(\"unused\")"
  }
}
actions {
  preamble {
    id: 25652968
    name: "MyIngress.drop"
    alias: "drop"
  }
}
actions {
  preamble {
    id: 28792405
    name: "MyIngress.ipv4_forward"
    alias: "ipv4_forward"
  }
  params {
    id: 1
    name: "dstAddr"
    bitwidth: 48
  }
  params {
    id: 2
    name: "port"
    bitwidth: 9
  }
}
type_info {
}

查看SwitchConnection類的SetForwardingPipelineConfig()方法源碼

    def SetForwardingPipelineConfig(self, p4info, dry_run=False, **kwargs):
        # 這里的buildDeviceConfig方法被Bmv2SwitchConnection類重寫過
        device_config = self.buildDeviceConfig(**kwargs)
        # 構造請求,參數詳見下方解釋
        request = p4runtime_pb2.SetForwardingPipelineConfigRequest()
        # 和設置主節點有關,單機情況視為固定參數
        request.election_id.low = 1
        request.device_id = self.device_id
        config = request.config
		# CopyFrom()和SerializeToString()都是Protobuf提供的函數,前者執行深拷貝,后者對該對象進行序列化
        config.p4info.CopyFrom(p4info)
        config.p4_device_config = device_config.SerializeToString()
		# 設置action,詳見下方解釋
        request.action = p4runtime_pb2.SetForwardingPipelineConfigRequest.VERIFY_AND_COMMIT
        # 干跑的概念一般用不上,更多的是用於測試參數是否構造正確,大部分情況是直接發送請求,這里只需關注else分支
        if dry_run:
            print("P4Runtime SetForwardingPipelineConfig:", request)
        else:
            # 發送請求
            self.client_stub.SetForwardingPipelineConfig(request)

查看protobuf文件

  // Sets the P4 forwarding-pipeline config.
  rpc SetForwardingPipelineConfig(SetForwardingPipelineConfigRequest)
      returns (SetForwardingPipelineConfigResponse) {
  }

message SetForwardingPipelineConfigRequest {
  enum Action {
    UNSPECIFIED = 0;

    VERIFY = 1;

    VERIFY_AND_SAVE = 2;

    VERIFY_AND_COMMIT = 3;

    COMMIT = 4;

    RECONCILE_AND_COMMIT = 5;
  }
  uint64 device_id = 1;
  uint64 role_id = 2 [deprecated=true];
  // 官方的例子中並未傳遞role字段,估計是單機測試不需要傳遞
  string role = 6;
  Uint128 election_id = 3;
  Action action = 4;
  ForwardingPipelineConfig config = 5;
}


message ForwardingPipelineConfig {
  // 這里存儲着要下發的p4程序,如果請求成功,p4程序就會被安裝到交換機上
  config.v1.P4Info p4info = 1;
  // Target-specific P4 configuration.
  bytes p4_device_config = 2;

  // cookie用於唯一標識一個配置
  message Cookie {
    uint64 cookie = 1;
  }
  Cookie cookie = 3;
}

// 這是一個空對象,也就是說,如果沒有返回錯誤,則SetForwardingPipelineConfig請求會返回一個空對象
message SetForwardingPipelineConfigResponse {
}

在解釋action之前,先來看ForwardingPipelineConfig中的p4_device_config字段,這個字段是一個bytes類型,在代碼中是通過config.p4_device_config = device_config.SerializeToString()來進行賦值的。這是因為交換機的config是與具體交換機有關的(Target-specific),不同型號的交換機可能會有不同的config,因此,這里不能聲明config中具體有哪些字段,必須序列化為字節數組傳遞給交換機。

接着來看action,因為交換機可以接受的配置不同,所以交換機必須要驗證自己能否接收這樣的配置信息,也要決定是暫存這份設置還是立即應用,所以衍生出了不同的action:

  • UNSPECIFIED: 保留字段,不會被使用
  • VERIFY: 僅對配置進行驗證,會返回驗證的結果
  • VERIFY_AND_SAVE:對配置進行驗證,如果驗證通過,則保存在交換機中
  • VERIFY_AND_COMMIT:對配置進行驗證,如果驗證通過,則保存並應用該配置
  • COMMIT:應用交換機保存的最新配置,如果使用該類型,則交換機中必須保存有一份配置且本次請求不能攜帶配置
  • RECONCILE_AND_COMMIT:和VERIFY_AND_COMMIT類似,不同點在於VERIFY_AND_COMMIT在應用是會清空目前的轉發狀態,而RECONCILE_AND_COMMIT則會保留(這里的轉發狀態可能指的是正在經過交換機處理的包?)

接下來查看與SetForwardingPipelineConfig對應的GetForwardingPipelineConfig方法,官方的工具類並未對這個請求作封裝,因此需要自行調用grpc服務。查看protobuf文件中對應的類型。

  rpc GetForwardingPipelineConfig(GetForwardingPipelineConfigRequest)
      returns (GetForwardingPipelineConfigResponse) {
  }
  
  message GetForwardingPipelineConfigRequest {
  enum ResponseType {
    //返回所有信息
    ALL = 0;
    //只返回用於唯一標識一個配置的cookie
    COOKIE_ONLY = 1;
    //返回p4程序的信息和cookie
    P4INFO_AND_COOKIE = 2;
    //返回交換機的配置信息和cookie
    DEVICE_CONFIG_AND_COOKIE = 3;
  }
  uint64 device_id = 1;
  ResponseType response_type = 2;
}

message GetForwardingPipelineConfigResponse {
  ForwardingPipelineConfig config = 1;
}

操作表項(TableEntry)

經過前面的步驟,我們已經成功地與p4交換機建立了連接,並且下發了p4代碼,接下來,就是通過控制平面下發對應的表項來控制p4交換機了。

寫請求

對於下發表項,switch.py中有封裝好的方法,而更新和刪除表項都只能自己寫grpc請求

 	# mycontroller.py
    # buildTableEntry方法中會讀取之前傳入的p4配置文件來獲取table_id
    table_entry = p4info_helper.buildTableEntry(
            table_name="MyIngress.ipv4_lpm",
            match_fields={
                "hdr.ipv4.dstAddr": ("10.0.2.2", 32)
            },
            action_name="MyIngress.ipv4_forward",
            action_params={
                "dstAddr": "08:00:00:00:02:22",
                "port": 2
            })
        s1.WriteTableEntry(table_entry)
        
        
        
     # switch.py
    def WriteTableEntry(self, table_entry, dry_run=False):
        request = p4runtime_pb2.WriteRequest()
        request.device_id = self.device_id
        request.election_id.low = 1
        # 添加一條表項,如果使用官方封裝的方法,一次只能下發一條表項,事實上,底層的接口可以一次性下發多條表項
        update = request.updates.add()
        if table_entry.is_default_action:
            update.type = p4runtime_pb2.Update.MODIFY
        else:
            update.type = p4runtime_pb2.Update.INSERT
        update.entity.table_entry.CopyFrom(table_entry)
        if dry_run:
            print("P4Runtime Write:", request)
        else:
            self.client_stub.Write(request)

查看protobuf文件

rpc Write(WriteRequest) returns (WriteResponse) {
  }
 
 
 message WriteRequest {
  uint64 device_id = 1;
  uint64 role_id = 2 [deprecated=true];
  string role = 6;
  Uint128 election_id = 3;
  // 這是一個數組,代表一次可以更新多個表項
  repeated Update updates = 4;
  //這個字段是用來處理並發更新問題的。(當一次性更新多條表項時可能會進行亂序更新)
  // CONTINUE_ON_ERROR 當出現錯誤時繼續執行下一條(可能會亂序更新)
  // ROLLBACK_ON_ERROR 當出現問題時回滾(可能會亂序更新)
  // DATAPLANE_ATOMIC 保證更新的原子性
  enum Atomicity {

    CONTINUE_ON_ERROR = 0;

    ROLLBACK_ON_ERROR = 1;

    DATAPLANE_ATOMIC = 2;
  }
  Atomicity atomicity = 5;
}


message Update {
  // 更新類型分為插入,修改和刪除
  enum Type {
    UNSPECIFIED = 0;
    INSERT = 1;
    MODIFY = 2;
    DELETE = 3;
  }
  Type type = 1;
  Entity entity = 2;
}


message Entity {
  oneof entity {
    ExternEntry extern_entry = 1;
    // TableEntry封裝具體的p4表項,此處不再繼續展開
    // 值得注意的是,TableEntry中有一個域是uint32 table_id,這個字段唯一標識一張表(可以在編譯后的配置文件查詢),在執行刪除	  // 請求時TableEntry只需要包含這個參數
    TableEntry table_entry = 2;
    ActionProfileMember action_profile_member = 3;
    ActionProfileGroup action_profile_group = 4;
    MeterEntry meter_entry = 5;
    DirectMeterEntry direct_meter_entry = 6;
    CounterEntry counter_entry = 7;
    DirectCounterEntry direct_counter_entry = 8;
    PacketReplicationEngineEntry packet_replication_engine_entry = 9;
    ValueSetEntry value_set_entry = 10;
    RegisterEntry register_entry = 11;
    DigestEntry digest_entry = 12;
  }
}


message WriteResponse {
}

Entity字段中還可以是其他類別的請求,但是在寫請求中比較少用,具體可以查看官方文檔9. P4 Entity Messages

讀請求

對於讀請求,switch.py中也封裝了對應的方法,可以看到,只需要傳入table_id即可進行讀取

    def ReadTableEntries(self, table_id=None, dry_run=False):
        request = p4runtime_pb2.ReadRequest()
        request.device_id = self.device_id
        entity = request.entities.add()
        # 這里可以看到,底層的接口也是可以一次性讀取多條表項的,但是封裝后的方法一次性只能讀一條
        table_entry = entity.table_entry
        if table_id is not None:
            table_entry.table_id = table_id
        else:
            table_entry.table_id = 0
        if dry_run:
            print("P4Runtime Read:", request)
        else:
            for response in self.client_stub.Read(request):
                yield response

同樣的,查看protobuf文件

message ReadRequest {
  uint64 device_id = 1;
  string role = 3;
  // 只需要傳入對應的key即可,比如對於TableEntity,key就是table_id
  // 這里也可以傳入
  repeated Entity entities = 2;
}

message ReadResponse {
  repeated Entity entities = 1;
}

解釋一下為什么要使用yield返回,下面是來自官方文檔的一段話,大意是說為了防止一次返回的數據量過大,一次批量讀請求可能會被拆分成多個ReadResponse通過流來返回,因此,在上面的python代碼中,使用yield來接收響應。

There is no requirement that each request in the batch will correspond to one ReadResponse message in the stream. The stream-based design for response message is to avoid memory pressure on the P4Runtime server when the Read results in a very large number of entities to be returned. The P4Runtime server is free to break them apart across multiple response messages as it sees fit.

其他

  1. StreamChannel請求。在建立連接時我們已經使用過這個請求。事實上,這個請求還具有一些其他功能,比如說下面展示的PacketOut和PacketIn,可以在數據平面和控制平面之間交換自定義的數據包,再比如IdleTimeoutNotification,當交換機中的表過期時,會得到這樣一個通知。
message StreamMessageRequest {
  oneof update {
    MasterArbitrationUpdate arbitration = 1;
    PacketOut packet = 2;
    DigestListAck digest_ack = 3;
    .google.protobuf.Any other = 4;
  }
}

// Packet sent from the controller to the switch.
message PacketOut {
  bytes payload = 1;
  // This will be based on P4 header annotated as
  // @controller_header("packet_out").
  // At most one P4 header can have this annotation.
  repeated PacketMetadata metadata = 2;
}

message StreamMessageResponse {
  oneof update {
    MasterArbitrationUpdate arbitration = 1;
    PacketIn packet = 2;
    DigestList digest = 3;
    IdleTimeoutNotification idle_timeout_notification = 4;
    .google.protobuf.Any other = 5;
    StreamError error = 6;
  }
}


message PacketIn {
  bytes payload = 1;

  repeated PacketMetadata metadata = 2;
}

  1. 可以通過Capabilities查看交換機支持的P4Runtime版本
rpc Capabilities(CapabilitiesRequest) returns (CapabilitiesResponse) {
  }
  
message CapabilitiesRequest {
}

message CapabilitiesResponse {
  // The full semantic version string (e.g. "1.1.0-rc.1") corresponding to the
  // version of the P4Runtime API currently implemented by the server.
  string p4runtime_api_version = 1;
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM