P4Runtime: A Walkthrough Based on the Mininet and BMv2 Environment



Experiment environment

The Ubuntu 20.04 virtual machine provided by P4 tutorials

All files that are run live under /home/p4/tutorials/ and its subdirectories

Reference materials

P4 tutorials repository https://github.com/p4lang/tutorials

P4Runtime protobuf files https://github.com/p4lang/p4runtime/tree/main/proto

Protobuf file of PI (a server-side implementation of P4Runtime) https://github.com/p4lang/PI/blob/main/proto/p4/tmp/p4config.proto

P4Runtime specification https://p4.org/p4-spec/p4runtime/main/P4Runtime-Spec.html

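Creating the switch (with gRPC)

The switch is first declared in topology.json. Running make run executes run_exercise.py (the exact call chain is omitted here), which eventually reaches the __init__() method of the P4RuntimeSwitch class.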

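# p4runtime_switch.py
# Only the parts of interest are kept in the body; the rest is trimmed (the same applies to later listings)
# Inheritance chain: P4RuntimeSwitch -> P4Switch -> Switch -> Node (the Mininet node class)
class P4RuntimeSwitch(P4Switch):
    "BMv2 switch with gRPC support"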
    next_grpc_port = 50051


    def __init__(self, name, sw_path = None, json_path = None,
                 grpc_port = None,
                 thrift_port = None,
                 pcap_dump = False,
                 log_console = False,
                 verbose = False,
                 device_id = None,
                 enable_debugger = False,
                 log_file = None,
                 **kwargs):
        # name is the switch name; it is initialized in the Node constructor
        Switch.__init__(self, name, **kwargs)
        
        # json_path points to the JSON file produced by the p4c compiler, located in the build directory
        if json_path is not None:
            # make sure that the provided JSON file exists
            if not os.path.isfile(json_path):
                error("Invalid JSON file: {}\n".format(json_path))
                exit(1)
            self.json_path = json_path
        else:
            self.json_path = None
		
        # Use grpc_port if it was passed in; otherwise take the class variable next_grpc_port (initially 50051) and then increment it
        if grpc_port is not None:
            self.grpc_port = grpc_port
        else:
            self.grpc_port = P4RuntimeSwitch.next_grpc_port
            P4RuntimeSwitch.next_grpc_port += 1
		
        # Use device_id if it was passed in; otherwise take the class variable P4Switch.device_id (initially 0) and then increment it
        if device_id is not None:
            self.device_id = device_id
            P4Switch.device_id = max(P4Switch.device_id, device_id)
        else:
            self.device_id = P4Switch.device_id
            P4Switch.device_id += 1
        # The parameters that matter most: name identifies the switch, grpc_port is the port to connect to, device_id identifies the device
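
The allocation rule above can be reproduced in isolation. The following is a minimal sketch, not the tutorial's code: ToySwitch and next_device_id are hypothetical names that mimic only the bookkeeping of grpc_port and device_id, without Mininet or BMv2.

```python
class ToySwitch:
    """Hypothetical stand-in that mimics only the ID bookkeeping shown above."""
    next_grpc_port = 50051  # class-level counter shared by all instances
    next_device_id = 0      # plays the role of P4Switch.device_id

    def __init__(self, name, grpc_port=None, device_id=None):
        self.name = name
        # An explicit port wins; otherwise hand out the next free one.
        if grpc_port is not None:
            self.grpc_port = grpc_port
        else:
            self.grpc_port = ToySwitch.next_grpc_port
            ToySwitch.next_grpc_port += 1
        # Same idea for the device ID, keeping the counter at least as
        # large as any explicitly requested ID.
        if device_id is not None:
            self.device_id = device_id
            ToySwitch.next_device_id = max(ToySwitch.next_device_id, device_id)
        else:
            self.device_id = ToySwitch.next_device_id
            ToySwitch.next_device_id += 1


switches = [ToySwitch("s1"), ToySwitch("s2"), ToySwitch("s3")]
summary = [(s.name, s.grpc_port, s.device_id) for s in switches]
print(summary)
```

With no explicit arguments, the first switch gets port 50051 and device_id 0, which matches the values the controller uses for s1 later on.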

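Establishing a connection to the switch

In mycontroller.py from the p4runtime exercise of the tutorial, the official code provides a Bmv2SwitchConnection class to help set up the connection. Its first three arguments must match those used when the switch was created; the last one is the path of the log file to write.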

#/home/p4/tutorials/exercises/p4runtime/mycontroller.py
        s1 = p4runtime_lib.bmv2.Bmv2SwitchConnection(
            name='s1',
            address='127.0.0.1:50051',
            device_id=0,
            proto_dump_file='logs/s1-p4runtime-requests.txt')

Tracing Bmv2SwitchConnection shows that it inherits from SwitchConnection and overrides the buildDeviceConfig method.

def buildDeviceConfig(bmv2_json_file_path=None):
    "Builds the device config for BMv2"
    device_config = p4config_pb2.P4DeviceConfig()
    device_config.reassign = True
    with open(bmv2_json_file_path) as f:
        device_config.device_data = f.read().encode('utf-8')
    return device_config


class Bmv2SwitchConnection(SwitchConnection):
    def buildDeviceConfig(self, **kwargs):
        return buildDeviceConfig(**kwargs)

Stepping into SwitchConnection: the call made earlier is in fact the SwitchConnection constructor

    def __init__(self, name=None, address='127.0.0.1:50051', device_id=0,
                 proto_dump_file=None):
        self.name = name
        # IP address and port of the switch
        self.address = address
        self.device_id = device_id
        self.p4info = None
        # Most of the code below is routine gRPC setup and is not analyzed here
        self.channel = grpc.insecure_channel(self.address)
        if proto_dump_file is not None:
            interceptor = GrpcRequestLogger(proto_dump_file)
            self.channel = grpc.intercept_channel(self.channel, interceptor)
        # The gRPC stub; it can be thought of as the API interface
        self.client_stub = p4runtime_pb2_grpc.P4RuntimeStub(self.channel)
        self.requests_stream = IterableQueue()
        self.stream_msg_resp = self.client_stub.StreamChannel(iter(self.requests_stream))
        self.proto_dump_file = proto_dump_file
        # connections is a global list that keeps a reference to every connection
        connections.append(self)
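
The constructor feeds StreamChannel with iter(self.requests_stream), so the queue has to behave as an iterator that ends only when the connection is closed. Below is a minimal sketch of such a queue using only the standard library. It is modelled on the IterableQueue helper name used above, but the body is an assumption rather than a copy of the tutorial code.

```python
from queue import Queue


class IterableQueue(Queue):
    """A queue that can be iterated until close() is called (sketch)."""
    _sentinel = object()  # unique marker that ends the iteration

    def __iter__(self):
        # iter(callable, sentinel) keeps calling self.get() and stops
        # as soon as it returns the sentinel object.
        return iter(self.get, self._sentinel)

    def close(self):
        self.put(self._sentinel)


requests = IterableQueue()
requests.put("arbitration")   # placeholder strings instead of protobuf messages
requests.put("packet_out")
requests.close()
drained = list(requests)
print(drained)
```

In the real connection gRPC consumes the iterator from a background thread, so self.get() simply blocks until the controller puts the next request into the queue.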

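Note the call self.client_stub.StreamChannel(iter(self.requests_stream)). StreamChannel is an RPC; according to the comments in p4runtime.proto, it serves the following purposes:

  1. Initiating the connection between controller and switch (through client arbitration)
  2. Indicating switch liveness (a heartbeat of sorts)
  3. Sending and receiving packets
  4. Streaming notifications from the switch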
  // Represents the bidirectional stream between the controller and the
  // switch (initiated by the controller), and is managed for the following
  // purposes:
  // - connection initiation through client arbitration
  // - indicating switch session liveness: the session is live when switch
  //   sends a positive client arbitration update to the controller, and is
  //   considered dead when either the stream breaks or the switch sends a
  //   negative update for client arbitration
  // - the controller sending/receiving packets to/from the switch
  // - streaming of notifications from the switch
  rpc StreamChannel(stream StreamMessageRequest)
      returns (stream StreamMessageResponse) {
  }

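At this point the control plane has opened its stream to the switch.

One general note: if an argument is wrong or some other error occurs, gRPC returns a corresponding error status. The specific error codes are listed in the P4Runtime specification and will not be repeated below.

Setting the primary controller

Next, mycontroller.py calls MasterArbitrationUpdate() on every switch, which makes this controller the master (primary) controller. My guess at the reason: the official documentation states that a P4 switch supports several controllers forming a high-availability cluster, so one of them has to be designated as primary to do the controlling (even with a single controller, the primary still has to be set explicitly).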


Stepping into MasterArbitrationUpdate()

    def MasterArbitrationUpdate(self, dry_run=False, **kwargs):
        # Create a StreamMessageRequest object
        request = p4runtime_pb2.StreamMessageRequest()
        request.arbitration.device_id = self.device_id
        request.arbitration.election_id.high = 0
        request.arbitration.election_id.low = 1

        if dry_run:
            print("P4Runtime MasterArbitrationUpdate: ", request)
        else:
            self.requests_stream.put(request)
            for item in self.stream_msg_resp:
                return item # just one

The relevant protobuf definitions

message StreamMessageRequest {
  oneof update {
    // This request uses MasterArbitrationUpdate
    MasterArbitrationUpdate arbitration = 1;
    PacketOut packet = 2;
    DigestListAck digest_ack = 3;
    .google.protobuf.Any other = 4;
  }
}

message MasterArbitrationUpdate {
  uint64 device_id = 1;
  // If no role is specified, the default role with full access applies
  // The role for which the primary client is being arbitrated. For use-cases
  // where multiple roles are not needed, the controller can leave this unset,
  // implying default role and full pipeline access.
  Role role = 2;
  // The stream RPC with the highest election_id is the primary. The 'primary'
  // controller instance populates this with its latest election_id. Switch
  // populates with the highest election ID it has received from all connected
  // controllers.
  Uint128 election_id = 3;
  // Switch populates this with OK for the client that is the primary, and
  // with an error status for all other connected clients (at every primary
  // client change). The controller does not populate this field.
  .google.rpc.Status status = 4;
}

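For single-controller use there is no need to dwell on these details; following the official code is enough. See section 5, Client Arbitration and Controller Replication, of the P4Runtime specification for the full picture.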
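
The proto comment says that the stream with the highest election_id is the primary, and election_id is a Uint128 split into high and low halves. A small sketch of that comparison follows. ElectionId, pick_primary and the controller names are hypothetical and exist only for illustration; a real switch performs this arbitration internally.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ElectionId:
    """Mirrors the high/low split of Uint128 (hypothetical helper type)."""
    high: int
    low: int

    def as_int(self):
        # Combine the two 64-bit halves into a single 128-bit number.
        return (self.high << 64) | self.low


def pick_primary(controllers):
    """Return the name of the controller with the highest election ID."""
    return max(controllers, key=lambda name: controllers[name].as_int())


controllers = {
    "ctl-a": ElectionId(high=0, low=1),  # the value used by mycontroller.py
    "ctl-b": ElectionId(high=0, low=5),
    "ctl-c": ElectionId(high=1, low=0),  # any non-zero high half outranks low-only IDs
}
primary = pick_primary(controllers)
print(primary)
```

With a single controller, as in the tutorial, any fixed ID such as high=0, low=1 wins trivially, which is why the official code hard-codes it.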

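Installing the P4 program and configuration onto the switch

#/home/p4/tutorials/exercises/p4runtime/mycontroller.py

# The P4InfoHelper utility class reads and parses the xxx.p4.p4info.txt produced by the P4 compiler
p4info_helper = p4runtime_lib.helper.P4InfoHelper(p4info_file_path)
# bmv2_json_file_path is the xxx.json produced by the P4 compiler
# The txt file mainly records metadata about the P4 program (IDs and so on); the json file holds the program itself (stages, operations, etc.) and is what gets pushed to the switch
s1.SetForwardingPipelineConfig(p4info=p4info_helper.p4info,
                               bmv2_json_file_path=bmv2_file_path)

Below is the content of xxx.p4.p4info.txt. It contains the switch architecture in use, together with table IDs and names, field IDs, and similar information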

pkg_info {
  arch: "v1model"
}
tables {
  preamble {
    id: 37375156
    name: "MyIngress.ipv4_lpm"
    alias: "ipv4_lpm"
  }
  match_fields {
    id: 1
    name: "hdr.ipv4.dstAddr"
    bitwidth: 32
    match_type: LPM
  }
  action_refs {
    id: 28792405
  }
  action_refs {
    id: 25652968
  }
  action_refs {
    id: 21257015
  }
  size: 1024
}
tables {
  preamble {
    id: 38067515
    name: "MyIngress.icmp_lpm"
    alias: "icmp_lpm"
  }
  match_fields {
    id: 1
    name: "hdr.ipv4.dstAddr"
    bitwidth: 32
    match_type: LPM
  }
  action_refs {
    id: 25652968
  }
  action_refs {
    id: 21257015
  }
  size: 1024
}
actions {
  preamble {
    id: 21257015
    name: "NoAction"
    alias: "NoAction"
    annotations: "@noWarn(\"unused\")"
  }
}
actions {
  preamble {
    id: 25652968
    name: "MyIngress.drop"
    alias: "drop"
  }
}
actions {
  preamble {
    id: 28792405
    name: "MyIngress.ipv4_forward"
    alias: "ipv4_forward"
  }
  params {
    id: 1
    name: "dstAddr"
    bitwidth: 48
  }
  params {
    id: 2
    name: "port"
    bitwidth: 9
  }
}
type_info {
}
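
The main job of this file on the controller side is to translate names into numeric IDs. The sketch below does that lookup over a hand-copied subset of the listing above, stored as plain dictionaries. P4INFO, get_id and allowed_actions are hypothetical names; the tutorial's P4InfoHelper works on the parsed protobuf message instead.

```python
# Subset of the P4Info listing above, copied by hand into plain dicts.
P4INFO = {
    "tables": [
        {"id": 37375156, "name": "MyIngress.ipv4_lpm", "alias": "ipv4_lpm",
         "action_refs": [28792405, 25652968, 21257015]},
        {"id": 38067515, "name": "MyIngress.icmp_lpm", "alias": "icmp_lpm",
         "action_refs": [25652968, 21257015]},
    ],
    "actions": [
        {"id": 21257015, "name": "NoAction", "alias": "NoAction"},
        {"id": 25652968, "name": "MyIngress.drop", "alias": "drop"},
        {"id": 28792405, "name": "MyIngress.ipv4_forward", "alias": "ipv4_forward"},
    ],
}


def get_id(kind, name):
    """Look up an entity ID by its full name or by its alias."""
    for entity in P4INFO[kind]:
        if name in (entity["name"], entity["alias"]):
            return entity["id"]
    raise KeyError(f"no {kind} entry named {name!r}")


def allowed_actions(table_name):
    """List the action names that a table may reference."""
    table_id = get_id("tables", table_name)
    table = next(t for t in P4INFO["tables"] if t["id"] == table_id)
    names = {a["id"]: a["name"] for a in P4INFO["actions"]}
    return [names[ref] for ref in table["action_refs"]]


print(get_id("tables", "ipv4_lpm"))
print(allowed_actions("MyIngress.icmp_lpm"))
```

Note that icmp_lpm does not reference ipv4_forward, so a controller that tried to install a forwarding entry there would be rejected.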

The source of SetForwardingPipelineConfig() in the SwitchConnection class

    def SetForwardingPipelineConfig(self, p4info, dry_run=False, **kwargs):
        # buildDeviceConfig here is the version overridden by Bmv2SwitchConnection
        device_config = self.buildDeviceConfig(**kwargs)
        # Build the request; the fields are explained further down
        request = p4runtime_pb2.SetForwardingPipelineConfigRequest()
        # Related to primary arbitration; treat it as a fixed value in the single-controller case
        request.election_id.low = 1
        request.device_id = self.device_id
        config = request.config
        # CopyFrom() and SerializeToString() are protobuf functions: the former makes a deep copy, the latter serializes the object
        config.p4info.CopyFrom(p4info)
        config.p4_device_config = device_config.SerializeToString()
        # Set the action; explained further down
        request.action = p4runtime_pb2.SetForwardingPipelineConfigRequest.VERIFY_AND_COMMIT
        # A dry run is rarely needed; it mostly serves to check that the request was built correctly. Normally the request is sent directly, so only the else branch matters
        if dry_run:
            print("P4Runtime SetForwardingPipelineConfig:", request)
        else:
            # Send the request
            self.client_stub.SetForwardingPipelineConfig(request)

The protobuf definitions

  // Sets the P4 forwarding-pipeline config.
  rpc SetForwardingPipelineConfig(SetForwardingPipelineConfigRequest)
      returns (SetForwardingPipelineConfigResponse) {
  }

message SetForwardingPipelineConfigRequest {
  enum Action {
    UNSPECIFIED = 0;

    VERIFY = 1;

    VERIFY_AND_SAVE = 2;

    VERIFY_AND_COMMIT = 3;

    COMMIT = 4;

    RECONCILE_AND_COMMIT = 5;
  }
  uint64 device_id = 1;
  uint64 role_id = 2 [deprecated=true];
  // The official example does not set the role field; presumably it is not needed for single-controller tests
  string role = 6;
  Uint128 election_id = 3;
  Action action = 4;
  ForwardingPipelineConfig config = 5;
}


message ForwardingPipelineConfig {
  // P4Info describes the program being pushed (tables, actions, IDs); if the request succeeds, the P4 program is installed on the switch
  config.v1.P4Info p4info = 1;
  // Target-specific P4 configuration.
  bytes p4_device_config = 2;

  // The cookie uniquely identifies a configuration
  message Cookie {
    uint64 cookie = 1;
  }
  Cookie cookie = 3;
}

// An empty message: if no error is returned, SetForwardingPipelineConfig simply returns an empty object
message SetForwardingPipelineConfigResponse {
}

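Before explaining action, look at the p4_device_config field of ForwardingPipelineConfig. It is of type bytes, and the code assigns it with config.p4_device_config = device_config.SerializeToString(). The reason is that the device config is target-specific: different switch models may need different configs, so the message cannot declare which fields the config contains and must pass it to the switch as a serialized byte array.

Now for action. Because switches differ in which configs they can accept, a switch has to verify that it can realize a given config, and it also has to decide whether to store the config for later or apply it right away. This gives rise to the different actions:

  • UNSPECIFIED: the default placeholder value; not used in practice
  • VERIFY: only verifies the config and returns the verification result
  • VERIFY_AND_SAVE: verifies the config and, if verification passes, saves it on the switch
  • VERIFY_AND_COMMIT: verifies the config and, if verification passes, saves and applies it
  • COMMIT: applies the most recently saved config; with this action the switch must already hold a saved config and the request must not carry one
  • RECONCILE_AND_COMMIT: similar to VERIFY_AND_COMMIT, except that VERIFY_AND_COMMIT clears the current forwarding state when the config is applied, whereas RECONCILE_AND_COMMIT preserves it (the forwarding state here most likely means the table entries and other runtime state installed on the switch, rather than packets in flight)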
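
The differences between the actions are easier to see in a toy model. The sketch below is only an illustration of the list above: ToyTarget is a hypothetical class, verification is skipped entirely, forwarding state is modelled as a plain list of entries, and COMMIT is assumed to clear that state in the same way VERIFY_AND_COMMIT does.

```python
class ToyTarget:
    """Toy model of how each action changes a switch (hypothetical)."""

    def __init__(self):
        self.saved = None    # config stored but not necessarily running
        self.active = None   # config currently realized
        self.entries = []    # stand-in for the forwarding state

    def set_config(self, action, config=None):
        if action == "VERIFY":
            return  # checked only, nothing changes
        if action == "VERIFY_AND_SAVE":
            self.saved = config
        elif action == "VERIFY_AND_COMMIT":
            self.saved = self.active = config
            self.entries.clear()  # forwarding state is wiped
        elif action == "COMMIT":
            # Needs a previously saved config and no config in the request.
            if config is not None or self.saved is None:
                raise ValueError("COMMIT needs a saved config and no new one")
            self.active = self.saved
            self.entries.clear()  # assumption: same clearing as VERIFY_AND_COMMIT
        elif action == "RECONCILE_AND_COMMIT":
            self.saved = self.active = config  # entries are preserved
        else:
            raise ValueError(f"unsupported action {action!r}")


target = ToyTarget()
target.set_config("VERIFY_AND_COMMIT", "v1")
target.entries.append("10.0.2.2/32 -> port 2")
target.set_config("RECONCILE_AND_COMMIT", "v2")
kept = list(target.entries)
target.set_config("VERIFY_AND_COMMIT", "v3")
cleared = list(target.entries)
print(kept, cleared, target.active)
```

In the tutorial only VERIFY_AND_COMMIT is used, which is why table entries have to be written again every time the controller pushes the pipeline.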

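Next comes GetForwardingPipelineConfig, the counterpart of SetForwardingPipelineConfig. The official utility classes do not wrap this request, so the gRPC service has to be called directly. The corresponding protobuf types are: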

  rpc GetForwardingPipelineConfig(GetForwardingPipelineConfigRequest)
      returns (GetForwardingPipelineConfigResponse) {
  }
  
  message GetForwardingPipelineConfigRequest {
  enum ResponseType {
    // Return everything
    ALL = 0;
    // Return only the cookie that uniquely identifies a config
    COOKIE_ONLY = 1;
    // Return the P4Info and the cookie
    P4INFO_AND_COOKIE = 2;
    // Return the device config and the cookie
    DEVICE_CONFIG_AND_COOKIE = 3;
  }
  uint64 device_id = 1;
  ResponseType response_type = 2;
}

message GetForwardingPipelineConfigResponse {
  ForwardingPipelineConfig config = 1;
}

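Working with table entries (TableEntry)

After the steps above, the connection to the P4 switch is established and the P4 program has been pushed. What remains is to control the switch by installing table entries from the control plane.

Write requests

For installing entries, switch.py already provides a wrapper method; updating and deleting entries require writing the gRPC requests by hand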

    # mycontroller.py
    # buildTableEntry looks up the table ID in the P4Info that was loaded earlier
    table_entry = p4info_helper.buildTableEntry(
        table_name="MyIngress.ipv4_lpm",
        match_fields={
            "hdr.ipv4.dstAddr": ("10.0.2.2", 32)
        },
        action_name="MyIngress.ipv4_forward",
        action_params={
            "dstAddr": "08:00:00:00:02:22",
            "port": 2
        })
    s1.WriteTableEntry(table_entry)

    # switch.py
    def WriteTableEntry(self, table_entry, dry_run=False):
        request = p4runtime_pb2.WriteRequest()
        request.device_id = self.device_id
        request.election_id.low = 1
        # Add one update. The official wrapper installs a single entry per call, although the underlying interface can carry several in one request
        update = request.updates.add()
        if table_entry.is_default_action:
            update.type = p4runtime_pb2.Update.MODIFY
        else:
            update.type = p4runtime_pb2.Update.INSERT
        update.entity.table_entry.CopyFrom(table_entry)
        if dry_run:
            print("P4Runtime Write:", request)
        else:
            self.client_stub.Write(request)
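
In the protobuf messages, match values and action parameters travel as byte strings, so the human-readable values passed to buildTableEntry have to be converted first. The following sketch shows one plausible conversion for the three value kinds used above. encode_ipv4, encode_mac and encode_num are hypothetical names; the tutorial ships its own conversion helpers, whose exact behaviour may differ.

```python
import socket


def encode_ipv4(addr):
    """Dotted-quad IPv4 string to 4 network-order bytes."""
    return socket.inet_aton(addr)


def encode_mac(mac):
    """Colon-separated MAC string to 6 bytes."""
    return bytes.fromhex(mac.replace(":", ""))


def encode_num(value, bitwidth):
    """Unsigned integer to big-endian bytes wide enough for bitwidth bits."""
    if not 0 <= value < 2 ** bitwidth:
        raise ValueError(f"{value} does not fit in {bitwidth} bits")
    nbytes = (bitwidth + 7) // 8  # round up to whole bytes
    return value.to_bytes(nbytes, "big")


dst_ip = encode_ipv4("10.0.2.2")             # hdr.ipv4.dstAddr, bitwidth 32
dst_mac = encode_mac("08:00:00:00:02:22")    # dstAddr parameter, bitwidth 48
port = encode_num(2, 9)                      # port parameter, bitwidth 9
print(dst_ip.hex(), dst_mac.hex(), port.hex())
```

The bit widths come from the P4Info file shown earlier, which is one more reason the helper needs that file before it can build an entry.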

The protobuf definitions

rpc Write(WriteRequest) returns (WriteResponse) {
  }
 
 
 message WriteRequest {
  uint64 device_id = 1;
  uint64 role_id = 2 [deprecated=true];
  string role = 6;
  Uint128 election_id = 3;
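  // A repeated field: one request can carry several updates
  repeated Update updates = 4;
  // This field governs how a batch is handled (updates within one batch may be applied out of order)
  // CONTINUE_ON_ERROR: keep going with the remaining updates after an error (order not guaranteed)
  // ROLLBACK_ON_ERROR: roll the batch back when an error occurs (order not guaranteed)
  // DATAPLANE_ATOMIC: the whole batch is applied atomically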
  enum Atomicity {

    CONTINUE_ON_ERROR = 0;

    ROLLBACK_ON_ERROR = 1;

    DATAPLANE_ATOMIC = 2;
  }
  Atomicity atomicity = 5;
}


message Update {
  // Update types: insert, modify, and delete
  enum Type {
    UNSPECIFIED = 0;
    INSERT = 1;
    MODIFY = 2;
    DELETE = 3;
  }
  Type type = 1;
  Entity entity = 2;
}


message Entity {
  oneof entity {
    ExternEntry extern_entry = 1;
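    // TableEntry wraps a concrete P4 table entry; it is not expanded further here
    // Note that TableEntry has a field uint32 table_id that uniquely identifies a table (it can be looked up in the compiled P4Info file).
    // For a DELETE, the TableEntry only needs the fields that identify the entry (table_id plus its match key), not the action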
    TableEntry table_entry = 2;
    ActionProfileMember action_profile_member = 3;
    ActionProfileGroup action_profile_group = 4;
    MeterEntry meter_entry = 5;
    DirectMeterEntry direct_meter_entry = 6;
    CounterEntry counter_entry = 7;
    DirectCounterEntry direct_counter_entry = 8;
    PacketReplicationEngineEntry packet_replication_engine_entry = 9;
    ValueSetEntry value_set_entry = 10;
    RegisterEntry register_entry = 11;
    DigestEntry digest_entry = 12;
  }
}


message WriteResponse {
}

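The Entity field can also carry other kinds of requests, but those are less common in writes; see section 9, P4 Entity Messages, of the official specification for details

Read requests

For reads, switch.py also provides a wrapper method; as shown, passing a table_id is all that is needed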

    def ReadTableEntries(self, table_id=None, dry_run=False):
        request = p4runtime_pb2.ReadRequest()
        request.device_id = self.device_id
        entity = request.entities.add()
        # The underlying interface can carry several entities in one read request, but the wrapper adds only one
        table_entry = entity.table_entry
        if table_id is not None:
            table_entry.table_id = table_id
        else:
            table_entry.table_id = 0
        if dry_run:
            print("P4Runtime Read:", request)
        else:
            for response in self.client_stub.Read(request):
                yield response

Again, the protobuf definitions

message ReadRequest {
  uint64 device_id = 1;
  string role = 3;
  // Only the corresponding key needs to be set; for TableEntry, for example, the key is table_id
  // Other Entity types can presumably be passed here as well
  repeated Entity entities = 2;
}

message ReadResponse {
  repeated Entity entities = 1;
}

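A word on why the method returns through yield. The passage below comes from the official specification. In short, to avoid returning too much data at once, a single batched read may be split into several ReadResponse messages delivered over a stream, so the Python code above uses yield to hand the responses on as they arrive.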

There is no requirement that each request in the batch will correspond to one ReadResponse message in the stream. The stream-based design for response message is to avoid memory pressure on the P4Runtime server when the Read results in a very large number of entities to be returned. The P4Runtime server is free to break them apart across multiple response messages as it sees fit.
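
The consequence for the caller is that one read may produce several responses, each holding part of the result. The sketch below imitates that behaviour with plain generators. read_stream, read_all and the chunk size are hypothetical, and dictionaries stand in for ReadResponse messages.

```python
def read_stream(entities, max_per_response=2):
    """Pretend server: split the result across several responses."""
    for start in range(0, len(entities), max_per_response):
        yield {"entities": entities[start:start + max_per_response]}


def read_all(stream):
    """Client side: flatten whatever chunking the server chose."""
    for response in stream:
        for entity in response["entities"]:
            yield entity


table = ["entry-1", "entry-2", "entry-3", "entry-4", "entry-5"]
responses = list(read_stream(table))
flat = list(read_all(read_stream(table)))
print(len(responses), flat)
```

Code that consumes ReadTableEntries should therefore loop over both levels, responses and then the entities inside each response, instead of assuming a single reply.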

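Miscellaneous

  1. The StreamChannel RPC. It was already used when establishing the connection, but it has further uses. The PacketOut and PacketIn messages shown below let the data plane and control plane exchange custom packets, and IdleTimeoutNotification informs the controller when a table entry on the switch expires.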
message StreamMessageRequest {
  oneof update {
    MasterArbitrationUpdate arbitration = 1;
    PacketOut packet = 2;
    DigestListAck digest_ack = 3;
    .google.protobuf.Any other = 4;
  }
}

// Packet sent from the controller to the switch.
message PacketOut {
  bytes payload = 1;
  // This will be based on P4 header annotated as
  // @controller_header("packet_out").
  // At most one P4 header can have this annotation.
  repeated PacketMetadata metadata = 2;
}

message StreamMessageResponse {
  oneof update {
    MasterArbitrationUpdate arbitration = 1;
    PacketIn packet = 2;
    DigestList digest = 3;
    IdleTimeoutNotification idle_timeout_notification = 4;
    .google.protobuf.Any other = 5;
    StreamError error = 6;
  }
}


message PacketIn {
  bytes payload = 1;

  repeated PacketMetadata metadata = 2;
}

  2. Capabilities can be used to query the P4Runtime version that the switch supports
rpc Capabilities(CapabilitiesRequest) returns (CapabilitiesResponse) {
  }
  
message CapabilitiesRequest {
}

message CapabilitiesResponse {
  // The full semantic version string (e.g. "1.1.0-rc.1") corresponding to the
  // version of the P4Runtime API currently implemented by the server.
  string p4runtime_api_version = 1;
}
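
A controller that depends on newer features can compare the returned string against a minimum version. The sketch below parses the semantic version format mentioned in the proto comment. parse_version and supports are hypothetical helpers, and the comparison deliberately ignores the pre-release suffix and any build metadata.

```python
def parse_version(version):
    """Split 'MAJOR.MINOR.PATCH[-prerelease]' into its parts."""
    core, _, prerelease = version.partition("-")
    major, minor, patch = (int(part) for part in core.split("."))
    return major, minor, patch, prerelease or None


def supports(version, minimum):
    """True if version is at least minimum, a (major, minor, patch) tuple.

    Simplification: the pre-release suffix is not taken into account.
    """
    return parse_version(version)[:3] >= minimum


parsed = parse_version("1.1.0-rc.1")   # the example string from the proto comment
print(parsed, supports("1.3.0", (1, 1, 0)), supports("1.0.0", (1, 1, 0)))
```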

