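Syntax highlighting for protobuf files on cnblogs seems to be somewhat broken...
Lab environment
The Ubuntu 20.04 virtual machine from the P4 tutorials
All files that are run live under /home/p4/tutorials/ and its subdirectories
Reference material
p4 tutorials repository https://github.com/p4lang/tutorials
p4 runtime protobuf files https://github.com/p4lang/p4runtime/tree/main/proto
PI (a server-side implementation of p4runtime) protobuf file https://github.com/p4lang/PI/blob/main/proto/p4/tmp/p4config.proto
p4 runtime documentation https://p4.org/p4-spec/p4runtime/main/P4Runtime-Spec.html
Creating the switch (using grpc)
First, declare the switches in topology.json. Running make run executes run_exercise.py (the exact call chain is omitted here), which eventually reaches the __init__() method of the P4RuntimeSwitch class.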
# p4runtime_switch.py
# Only the parts of the function body relevant here are kept; the rest is trimmed. The same applies to later listings.
# This is the P4RuntimeSwitch class. It inherits from P4Switch, which inherits from Switch, which in turn inherits from Node (a mininet node).
class P4RuntimeSwitch(P4Switch):
    "BMv2 switch with gRPC support"
    next_grpc_port = 50051

    def __init__(self, name, sw_path = None, json_path = None,
                 grpc_port = None,
                 thrift_port = None,
                 pcap_dump = False,
                 log_console = False,
                 verbose = False,
                 device_id = None,
                 enable_debugger = False,
                 log_file = None,
                 **kwargs):
        # name is the switch name given by the caller; it is initialized in the Node constructor
        Switch.__init__(self, name, **kwargs)
        # json_path points to the JSON file produced by the p4c compiler, located in the build directory
        if json_path is not None:
            # make sure that the provided JSON file exists
            if not os.path.isfile(json_path):
                error("Invalid JSON file: {}\n".format(json_path))
                exit(1)
            self.json_path = json_path
        else:
            self.json_path = None
        # If a port was passed in, use it; otherwise use the class variable next_grpc_port (initially 50051) and increment it
        if grpc_port is not None:
            self.grpc_port = grpc_port
        else:
            self.grpc_port = P4RuntimeSwitch.next_grpc_port
            P4RuntimeSwitch.next_grpc_port += 1
        # If a device_id was passed in, use it; otherwise use the class variable device_id of P4Switch (initially 0) and increment it
        if device_id is not None:
            self.device_id = device_id
            P4Switch.device_id = max(P4Switch.device_id, device_id)
        else:
            self.device_id = P4Switch.device_id
            P4Switch.device_id += 1
        # The key parameters are name (identifies the switch by name), the gRPC port (the connection endpoint), and device_id (identifies the device)
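The allocation rule for grpc_port and device_id can be tried out in isolation. The sketch below is a stand-alone toy: the class name ToySwitch and the attribute next_device_id are hypothetical and not part of the tutorial code. It mirrors only the branch logic shown above, with class-level counters shared by all instances.

```python
class ToySwitch:
    """Hypothetical stand-in that mirrors only the id/port allocation above."""
    next_grpc_port = 50051  # shared by all instances
    next_device_id = 0      # plays the role of P4Switch.device_id

    def __init__(self, name, grpc_port=None, device_id=None):
        self.name = name
        # An explicit port wins; otherwise take the shared counter and advance it.
        if grpc_port is not None:
            self.grpc_port = grpc_port
        else:
            self.grpc_port = ToySwitch.next_grpc_port
            ToySwitch.next_grpc_port += 1
        # An explicit device_id wins and raises the shared counter to at least
        # that value; otherwise take the counter and advance it.
        if device_id is not None:
            self.device_id = device_id
            ToySwitch.next_device_id = max(ToySwitch.next_device_id, device_id)
        else:
            self.device_id = ToySwitch.next_device_id
            ToySwitch.next_device_id += 1


s1 = ToySwitch("s1")
s2 = ToySwitch("s2")
s3 = ToySwitch("s3", grpc_port=60000, device_id=7)
s4 = ToySwitch("s4")
for sw in (s1, s2, s3, s4):
    print(sw.name, sw.grpc_port, sw.device_id)
```

One consequence of this logic: an automatically assigned id issued right after an explicit one repeats the explicit value (s3 and s4 both end up with 7 here), so mixing explicit and automatic device ids in one topology needs some care.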
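Connecting to the switch
mycontroller.py in the p4runtime exercise of the tutorial shows that the tutorial authors wrap connection setup in a Bmv2SwitchConnection class. The first three arguments must match the values used when the switch was created, and the last one is the path of the log file to write.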
# /home/p4/tutorials/exercises/p4runtime/mycontroller.py
s1 = p4runtime_lib.bmv2.Bmv2SwitchConnection(
    name='s1',
    address='127.0.0.1:50051',
    device_id=0,
    proto_dump_file='logs/s1-p4runtime-requests.txt')
Tracing the Bmv2SwitchConnection class shows that it inherits from SwitchConnection and overrides the buildDeviceConfig method.
def buildDeviceConfig(bmv2_json_file_path=None):
    "Builds the device config for BMv2"
    device_config = p4config_pb2.P4DeviceConfig()
    device_config.reassign = True
    with open(bmv2_json_file_path) as f:
        device_config.device_data = f.read().encode('utf-8')
    return device_config

class Bmv2SwitchConnection(SwitchConnection):
    def buildDeviceConfig(self, **kwargs):
        return buildDeviceConfig(**kwargs)
Moving on to the SwitchConnection class: the call shown earlier is in fact its constructor.
def __init__(self, name=None, address='127.0.0.1:50051', device_id=0,
             proto_dump_file=None):
    self.name = name
    # IP address and port of the switch
    self.address = address
    self.device_id = device_id
    self.p4info = None
    # Most of what follows is standard gRPC boilerplate and is not explained here
    self.channel = grpc.insecure_channel(self.address)
    if proto_dump_file is not None:
        interceptor = GrpcRequestLogger(proto_dump_file)
        self.channel = grpc.intercept_channel(self.channel, interceptor)
    # The stub used by gRPC; think of it as the API interface
    self.client_stub = p4runtime_pb2_grpc.P4RuntimeStub(self.channel)
    self.requests_stream = IterableQueue()
    self.stream_msg_resp = self.client_stub.StreamChannel(iter(self.requests_stream))
    self.proto_dump_file = proto_dump_file
    # connections is a global list that keeps a reference to every connection
    connections.append(self)
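Note the call self.client_stub.StreamChannel(iter(self.requests_stream)). StreamChannel is an RPC which, according to p4runtime.proto, serves the following purposes:
- initiating the connection between the switch and the controller
- indicating whether the switch session is alive (similar in purpose to a heartbeat)
- sending/receiving packets
- streaming notifications from the switch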
// Represents the bidirectional stream between the controller and the
// switch (initiated by the controller), and is managed for the following
// purposes:
// - connection initiation through client arbitration
// - indicating switch session liveness: the session is live when switch
//   sends a positive client arbitration update to the controller, and is
//   considered dead when either the stream breaks or the switch sends a
//   negative update for client arbitration
// - the controller sending/receiving packets to/from the switch
// - streaming of notifications from the switch
rpc StreamChannel(stream StreamMessageRequest)
    returns (stream StreamMessageResponse) {
}
At this point the control plane has successfully established a connection to the switch.
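The request side of the stream is an ordinary Python iterator, which is why requests_stream is wrapped in iter() before being handed to StreamChannel. A minimal sketch of how such an iterable queue can be built with the standard library follows. It is an illustration only and not necessarily identical to the IterableQueue in p4runtime_lib; the sentinel and the close() method are part of the sketch.

```python
from queue import Queue


class IterableQueue(Queue):
    """A queue that can be consumed with a for loop until it is closed."""
    _sentinel = object()

    def __iter__(self):
        # iter(callable, sentinel) calls self.get() repeatedly and stops
        # as soon as the sentinel object comes out of the queue.
        return iter(self.get, self._sentinel)

    def close(self):
        # Putting the sentinel ends iteration for the consumer.
        self.put(self._sentinel)


requests = IterableQueue()
requests.put("arbitration request")
requests.put("packet-out request")
requests.close()
sent = list(requests)
print(sent)
```

With this design the consumer blocks in get() while the queue is empty, so a producer can keep putting requests on the queue for as long as the stream is open.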
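One general note: if an argument is wrong or some other error occurs, gRPC returns a corresponding error status. The possible error codes are listed in the p4runtime documentation, so they are not repeated below.
Setting the primary controller
Next, mycontroller.py calls the MasterArbitrationUpdate() method of each switch, which makes this controller the master (primary) controller. My guess at the reason: the official documentation states that a P4 switch supports several controllers forming a high-availability cluster, in which case one primary controller has to be designated to do the controlling (and it has to be set explicitly even when there is only one controller).
Stepping into MasterArbitrationUpdate():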
def MasterArbitrationUpdate(self, dry_run=False, **kwargs):
    # Build a StreamMessageRequest object
    request = p4runtime_pb2.StreamMessageRequest()
    request.arbitration.device_id = self.device_id
    request.arbitration.election_id.high = 0
    request.arbitration.election_id.low = 1

    if dry_run:
        print("P4Runtime MasterArbitrationUpdate: ", request)
    else:
        self.requests_stream.put(request)
        for item in self.stream_msg_resp:
            return item # just one
Look at the relevant protobuf definitions:
message StreamMessageRequest {
  oneof update {
    // This request uses MasterArbitrationUpdate
    MasterArbitrationUpdate arbitration = 1;
    PacketOut packet = 2;
    DigestListAck digest_ack = 3;
    .google.protobuf.Any other = 4;
  }
}
message MasterArbitrationUpdate {
  uint64 device_id = 1;
  // If no role is specified, the controller gets the default role with full access
  // The role for which the primary client is being arbitrated. For use-cases
  // where multiple roles are not needed, the controller can leave this unset,
  // implying default role and full pipeline access.
  Role role = 2;
  // The stream RPC with the highest election_id is the primary. The 'primary'
  // controller instance populates this with its latest election_id. Switch
  // populates with the highest election ID it has received from all connected
  // controllers.
  Uint128 election_id = 3;
  // Switch populates this with OK for the client that is the primary, and
  // with an error status for all other connected clients (at every primary
  // client change). The controller does not populate this field.
  .google.rpc.Status status = 4;
}
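For single-controller use there is no need to dig into these details; following the official code is enough. For more, see section 5, Client Arbitration and Controller Replication, of the p4runtime documentation.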
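The rule quoted in the proto comments, that the stream with the highest election_id is the primary, can be illustrated without any gRPC. The helper names below (election_id_value, pick_primary) are hypothetical; the only assumption taken from the code above is that a Uint128 is split into a high and a low 64-bit half.

```python
def election_id_value(high, low):
    """Combine the two 64-bit halves of a Uint128 into one Python int."""
    return (high << 64) | low


def pick_primary(controllers):
    """Return the name of the controller with the highest election_id.

    controllers maps a controller name to its (high, low) election_id.
    """
    return max(controllers, key=lambda name: election_id_value(*controllers[name]))


controllers = {
    "tutorial": (0, 1),     # the values used by MasterArbitrationUpdate above
    "backup": (0, 0xFFFF),  # larger low half
    "new": (1, 0),          # any non-zero high half outranks every low-only id
}
primary = pick_primary(controllers)
print(primary)
```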
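Installing the P4 program and configuration on the switch
# /home/p4/tutorials/exercises/p4runtime/mycontroller.py
# The P4InfoHelper utility class reads and parses the xxx.p4.p4info.txt file produced by the P4 compiler
p4info_helper = p4runtime_lib.helper.P4InfoHelper(p4info_file_path)
# bmv2_json_file_path is the xxx.json file produced by the P4 compiler
# The txt file mainly records metadata about the P4 program, such as ids; the json file describes the program itself (stages, operations, and so on) and is what gets pushed to the switch
s1.SetForwardingPipelineConfig(p4info=p4info_helper.p4info,
                               bmv2_json_file_path=bmv2_file_path)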
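Below is the content of xxx.p4.p4info.txt. It records the switch architecture in use, the ids and names of the tables, the ids of the fields, and similar information.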
pkg_info {
  arch: "v1model"
}
tables {
  preamble {
    id: 37375156
    name: "MyIngress.ipv4_lpm"
    alias: "ipv4_lpm"
  }
  match_fields {
    id: 1
    name: "hdr.ipv4.dstAddr"
    bitwidth: 32
    match_type: LPM
  }
  action_refs {
    id: 28792405
  }
  action_refs {
    id: 25652968
  }
  action_refs {
    id: 21257015
  }
  size: 1024
}
tables {
  preamble {
    id: 38067515
    name: "MyIngress.icmp_lpm"
    alias: "icmp_lpm"
  }
  match_fields {
    id: 1
    name: "hdr.ipv4.dstAddr"
    bitwidth: 32
    match_type: LPM
  }
  action_refs {
    id: 25652968
  }
  action_refs {
    id: 21257015
  }
  size: 1024
}
actions {
  preamble {
    id: 21257015
    name: "NoAction"
    alias: "NoAction"
    annotations: "@noWarn(\"unused\")"
  }
}
actions {
  preamble {
    id: 25652968
    name: "MyIngress.drop"
    alias: "drop"
  }
}
actions {
  preamble {
    id: 28792405
    name: "MyIngress.ipv4_forward"
    alias: "ipv4_forward"
  }
  params {
    id: 1
    name: "dstAddr"
    bitwidth: 48
  }
  params {
    id: 2
    name: "port"
    bitwidth: 9
  }
}
type_info {
}
Now look at the source of the SetForwardingPipelineConfig() method of the SwitchConnection class:
def SetForwardingPipelineConfig(self, p4info, dry_run=False, **kwargs):
    # buildDeviceConfig here is the version overridden by Bmv2SwitchConnection
    device_config = self.buildDeviceConfig(**kwargs)
    # Build the request; the fields are explained below
    request = p4runtime_pb2.SetForwardingPipelineConfigRequest()
    # Related to primary election; treat it as a fixed value in the single-controller case
    request.election_id.low = 1
    request.device_id = self.device_id
    config = request.config

    # CopyFrom() and SerializeToString() are both provided by Protobuf: the former makes a deep copy, the latter serializes the object
    config.p4info.CopyFrom(p4info)
    config.p4_device_config = device_config.SerializeToString()

    # Set the action; explained below
    request.action = p4runtime_pb2.SetForwardingPipelineConfigRequest.VERIFY_AND_COMMIT
    # dry_run is rarely needed; it mostly serves to check that the request was built correctly. Normally the request is sent directly, so only the else branch matters here
    if dry_run:
        print("P4Runtime SetForwardingPipelineConfig:", request)
    else:
        # Send the request
        self.client_stub.SetForwardingPipelineConfig(request)
Look at the protobuf definitions:
// Sets the P4 forwarding-pipeline config.
rpc SetForwardingPipelineConfig(SetForwardingPipelineConfigRequest)
    returns (SetForwardingPipelineConfigResponse) {
}
message SetForwardingPipelineConfigRequest {
  enum Action {
    UNSPECIFIED = 0;
    VERIFY = 1;
    VERIFY_AND_SAVE = 2;
    VERIFY_AND_COMMIT = 3;
    COMMIT = 4;
    RECONCILE_AND_COMMIT = 5;
  }
  uint64 device_id = 1;
  uint64 role_id = 2 [deprecated=true];
  // The official example does not set the role field; presumably it is not needed for a single-controller test
  string role = 6;
  Uint128 election_id = 3;
  Action action = 4;
  ForwardingPipelineConfig config = 5;
}
message ForwardingPipelineConfig {
  // The P4Info describing the P4 program to push; if the request succeeds, the program is installed on the switch
  config.v1.P4Info p4info = 1;
  // Target-specific P4 configuration.
  bytes p4_device_config = 2;
  // The cookie uniquely identifies a configuration
  message Cookie {
    uint64 cookie = 1;
  }
  Cookie cookie = 3;
}
// This message is empty: if no error is returned, SetForwardingPipelineConfig responds with an empty object
message SetForwardingPipelineConfigResponse {
}
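Before explaining action, look at the p4_device_config field of ForwardingPipelineConfig. It is of type bytes and is assigned in the code via config.p4_device_config = device_config.SerializeToString(). The reason is that the device config is target-specific: different switch models may need different configs, so the message cannot declare which fields the config contains and has to carry it to the switch as a serialized byte string.
Now for action. Because targets differ in which configurations they can accept, a switch has to verify whether it can realize a given config, and also decide whether to merely store it or to apply it right away. This gives rise to the different actions:
- UNSPECIFIED: reserved value, not used
- VERIFY: only verifies the config and returns the result of the verification
- VERIFY_AND_SAVE: verifies the config and, if verification passes, saves it on the switch
- VERIFY_AND_COMMIT: verifies the config and, if verification passes, saves and applies it
- COMMIT: applies the most recent config saved on the switch. With this action the switch must already hold a saved config, and the request must not carry one
- RECONCILE_AND_COMMIT: similar to VERIFY_AND_COMMIT. The difference is that VERIFY_AND_COMMIT clears the current forwarding state when the config is applied, whereas RECONCILE_AND_COMMIT preserves it (forwarding state here most likely means the table entries and other runtime state already installed on the switch, rather than packets in flight)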
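The differences between these actions are easier to see in a toy model. The class below is purely illustrative: ToyTarget, its attributes, and the string action names are hypothetical, verification is reduced to a callback, and the behaviour follows the list above rather than any real switch implementation.

```python
class ToyTarget:
    """Hypothetical in-memory target that follows the action list above."""

    def __init__(self, verify):
        self.verify = verify      # function(config) -> bool
        self.saved = None         # config saved but not yet applied
        self.active = None        # config currently applied
        self.entries = ["entry"]  # stands in for the forwarding state

    def set_config(self, action, config=None):
        if action == "COMMIT":
            # COMMIT carries no config and needs a previously saved one.
            if config is not None or self.saved is None:
                raise ValueError("COMMIT needs a saved config and no new one")
            self.active, self.saved = self.saved, None
            return
        if not self.verify(config):
            raise ValueError("config rejected by target")
        if action == "VERIFY":
            return  # nothing is stored or applied
        if action == "VERIFY_AND_SAVE":
            self.saved = config
        elif action == "VERIFY_AND_COMMIT":
            self.active = config
            self.entries = []     # forwarding state is cleared
        elif action == "RECONCILE_AND_COMMIT":
            self.active = config  # forwarding state is kept
        else:
            raise ValueError("unsupported action: " + action)


target = ToyTarget(verify=lambda cfg: cfg is not None)
target.set_config("VERIFY_AND_SAVE", "cfg-v1")
after_save = (target.saved, target.active)
target.set_config("COMMIT")
after_commit = (target.saved, target.active)
target.set_config("RECONCILE_AND_COMMIT", "cfg-v2")
kept = list(target.entries)
target.set_config("VERIFY_AND_COMMIT", "cfg-v3")
cleared = list(target.entries)
print(after_save, after_commit, kept, cleared)
```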
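Next, consider GetForwardingPipelineConfig, the counterpart of SetForwardingPipelineConfig. The tutorial helper classes do not wrap this request, so the gRPC service has to be called directly. The corresponding protobuf types are: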
rpc GetForwardingPipelineConfig(GetForwardingPipelineConfigRequest)
    returns (GetForwardingPipelineConfigResponse) {
}
message GetForwardingPipelineConfigRequest {
  enum ResponseType {
    // Return everything
    ALL = 0;
    // Return only the cookie that uniquely identifies a configuration
    COOKIE_ONLY = 1;
    // Return the P4Info of the program and the cookie
    P4INFO_AND_COOKIE = 2;
    // Return the device config of the switch and the cookie
    DEVICE_CONFIG_AND_COOKIE = 3;
  }
  uint64 device_id = 1;
  ResponseType response_type = 2;
}
message GetForwardingPipelineConfigResponse {
  ForwardingPipelineConfig config = 1;
}
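Since the helper classes do not wrap this RPC, one possible wrapper is sketched below. The function name get_pipeline_config is hypothetical. It takes the generated p4runtime_pb2 module and the client_stub of a SwitchConnection as arguments, so the sketch itself does not import gRPC; the message, field, and enum names come from the proto definition above.

```python
def get_pipeline_config(client_stub, p4runtime_pb2, device_id, cookie_only=False):
    """Hypothetical helper: fetch the pipeline config installed on a device.

    client_stub   -- a P4RuntimeStub, e.g. the client_stub of SwitchConnection
    p4runtime_pb2 -- the generated protobuf module
    """
    request_cls = p4runtime_pb2.GetForwardingPipelineConfigRequest
    request = request_cls()
    request.device_id = device_id
    # ALL returns p4info, device config and cookie; COOKIE_ONLY just the cookie.
    if cookie_only:
        request.response_type = request_cls.COOKIE_ONLY
    else:
        request.response_type = request_cls.ALL
    response = client_stub.GetForwardingPipelineConfig(request)
    return response.config
```

With the objects from mycontroller.py this would be called roughly as get_pipeline_config(s1.client_stub, p4runtime_pb2, s1.device_id), assuming the generated p4runtime_pb2 module has been imported in the controller script.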
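Working with table entries (TableEntry)
After the steps above we have connected to the P4 switch and pushed the P4 program. What remains is to install table entries from the control plane in order to control the switch.
Write requests
For inserting entries, switch.py provides a ready-made method, whereas updating and deleting entries require writing the gRPC request yourself.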
# mycontroller.py
# buildTableEntry looks up the table_id in the p4info file that was loaded earlier
table_entry = p4info_helper.buildTableEntry(
    table_name="MyIngress.ipv4_lpm",
    match_fields={
        "hdr.ipv4.dstAddr": ("10.0.2.2", 32)
    },
    action_name="MyIngress.ipv4_forward",
    action_params={
        "dstAddr": "08:00:00:00:02:22",
        "port": 2
    })
s1.WriteTableEntry(table_entry)

# switch.py
def WriteTableEntry(self, table_entry, dry_run=False):
    request = p4runtime_pb2.WriteRequest()
    request.device_id = self.device_id
    request.election_id.low = 1
    # Add a single update. The wrapper sends only one entry per call, although the underlying API accepts several updates in one request
    update = request.updates.add()
    if table_entry.is_default_action:
        update.type = p4runtime_pb2.Update.MODIFY
    else:
        update.type = p4runtime_pb2.Update.INSERT
    update.entity.table_entry.CopyFrom(table_entry)
    if dry_run:
        print("P4Runtime Write:", request)
    else:
        self.client_stub.Write(request)
Look at the protobuf definitions:
rpc Write(WriteRequest) returns (WriteResponse) {
}
message WriteRequest {
  uint64 device_id = 1;
  uint64 role_id = 2 [deprecated=true];
  string role = 6;
  Uint128 election_id = 3;
  // A repeated field, so one request can carry several updates
  repeated Update updates = 4;
  // This field controls how a batch of updates is applied (when several entries are updated in one request, they may be applied out of order)
  // CONTINUE_ON_ERROR: on error, continue with the next update (updates may be applied out of order)
  // ROLLBACK_ON_ERROR: on error, roll back (updates may be applied out of order)
  // DATAPLANE_ATOMIC: the batch is applied atomically
  enum Atomicity {
    CONTINUE_ON_ERROR = 0;
    ROLLBACK_ON_ERROR = 1;
    DATAPLANE_ATOMIC = 2;
  }
  Atomicity atomicity = 5;
}
message Update {
  // The update type is one of insert, modify, or delete
  enum Type {
    UNSPECIFIED = 0;
    INSERT = 1;
    MODIFY = 2;
    DELETE = 3;
  }
  Type type = 1;
  Entity entity = 2;
}
message Entity {
  oneof entity {
    ExternEntry extern_entry = 1;
    // TableEntry wraps a concrete P4 table entry and is not expanded further here.
    // Note that TableEntry has a uint32 table_id field that uniquely identifies a table (it can be looked up in the compiled p4info file).
    // A delete request only needs the fields that identify the entry, that is table_id together with its match key; the action can be left out.
    TableEntry table_entry = 2;
    ActionProfileMember action_profile_member = 3;
    ActionProfileGroup action_profile_group = 4;
    MeterEntry meter_entry = 5;
    DirectMeterEntry direct_meter_entry = 6;
    CounterEntry counter_entry = 7;
    DirectCounterEntry direct_counter_entry = 8;
    PacketReplicationEngineEntry packet_replication_engine_entry = 9;
    ValueSetEntry value_set_entry = 10;
    RegisterEntry register_entry = 11;
    DigestEntry digest_entry = 12;
  }
}
message WriteResponse {
}
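The entity field can also hold other kinds of entities, but they are used less often in write requests. See section 9, P4 Entity Messages, of the official documentation.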
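Updating or deleting entries, and sending several updates in one request, can be done with a small helper along the following lines. This is a sketch: the function name write_updates is hypothetical, the p4runtime_pb2 module and client_stub are passed in rather than imported, and election_id.low = 1 is copied from the tutorial's single-controller setup. For MODIFY and DELETE the table_entry has to carry the same key (table_id plus match fields) as the entry it targets.

```python
def write_updates(client_stub, p4runtime_pb2, device_id, updates):
    """Hypothetical helper: send several table updates in one WriteRequest.

    updates is a list of (type_name, table_entry) pairs, where type_name is
    "INSERT", "MODIFY" or "DELETE" and table_entry is a TableEntry message.
    """
    request = p4runtime_pb2.WriteRequest()
    request.device_id = device_id
    request.election_id.low = 1  # same fixed value as the tutorial code
    for type_name, table_entry in updates:
        if type_name not in ("INSERT", "MODIFY", "DELETE"):
            raise ValueError("unknown update type: " + type_name)
        update = request.updates.add()
        # Look the enum value up by name on the Update message class.
        update.type = getattr(p4runtime_pb2.Update, type_name)
        update.entity.table_entry.CopyFrom(table_entry)
    client_stub.Write(request)
    return request
```

A delete of the entry installed earlier could then look like write_updates(s1.client_stub, p4runtime_pb2, s1.device_id, [("DELETE", table_entry)]), where table_entry is built with the same table name and match fields as before.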
Read requests
For read requests, switch.py also provides a wrapper. As shown below, passing a table_id is all it takes to read.
def ReadTableEntries(self, table_id=None, dry_run=False):
    request = p4runtime_pb2.ReadRequest()
    request.device_id = self.device_id
    entity = request.entities.add()
    # As seen here, the underlying API can take several entities in one read, whereas the wrapper adds only one per call
    table_entry = entity.table_entry
    if table_id is not None:
        table_entry.table_id = table_id
    else:
        table_entry.table_id = 0
    if dry_run:
        print("P4Runtime Read:", request)
    else:
        for response in self.client_stub.Read(request):
            yield response
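As before, look at the protobuf definitions:
message ReadRequest {
  uint64 device_id = 1;
  string role = 3;
  // Only the corresponding key needs to be set; for a TableEntry, for example, the key is table_id
  // Several entities can be passed here, since the field is repeated
  repeated Entity entities = 2;
}
message ReadResponse {
  repeated Entity entities = 1;
}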
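As for why the results are returned with yield: the passage below, from the official documentation, says in essence that to avoid returning too much data at once, a batched read may be split into several ReadResponse messages delivered over a stream. That is why the Python code above uses yield to hand back the responses one at a time.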
There is no requirement that each request in the batch will correspond to one ReadResponse message in the stream. The stream-based design for response message is to avoid memory pressure on the P4Runtime server when the Read results in a very large number of entities to be returned. The P4Runtime server is free to break them apart across multiple response messages as it sees fit.
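Because one read can arrive as several ReadResponse messages, a caller usually flattens the stream. The generator below is a small sketch of that (iter_entities is a hypothetical name). It relies only on each response having a repeated entities field, as in the proto above, and is demonstrated with stand-in objects instead of a live switch.

```python
from types import SimpleNamespace


def iter_entities(responses):
    """Yield every entity from a stream of ReadResponse-like messages."""
    for response in responses:
        for entity in response.entities:
            yield entity


# Stand-in for a server that split five results over three messages.
fake_stream = iter([
    SimpleNamespace(entities=["e1", "e2"]),
    SimpleNamespace(entities=[]),
    SimpleNamespace(entities=["e3", "e4", "e5"]),
])
entities = list(iter_entities(fake_stream))
print(entities)
```

With the tutorial wrapper, the same generator could be fed directly from s1.ReadTableEntries().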
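Miscellaneous
- The StreamChannel request. We already used it when establishing the connection. It has other uses as well: the PacketOut and PacketIn messages shown below let the data plane and the control plane exchange custom packets, and IdleTimeoutNotification is delivered when table entries in the switch have been idle past their timeout.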
message StreamMessageRequest {
  oneof update {
    MasterArbitrationUpdate arbitration = 1;
    PacketOut packet = 2;
    DigestListAck digest_ack = 3;
    .google.protobuf.Any other = 4;
  }
}
// Packet sent from the controller to the switch.
message PacketOut {
  bytes payload = 1;
  // This will be based on P4 header annotated as
  // @controller_header("packet_out").
  // At most one P4 header can have this annotation.
  repeated PacketMetadata metadata = 2;
}
message StreamMessageResponse {
  oneof update {
    MasterArbitrationUpdate arbitration = 1;
    PacketIn packet = 2;
    DigestList digest = 3;
    IdleTimeoutNotification idle_timeout_notification = 4;
    .google.protobuf.Any other = 5;
    StreamError error = 6;
  }
}
message PacketIn {
  bytes payload = 1;
  repeated PacketMetadata metadata = 2;
}
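On the receiving side, each StreamMessageResponse carries exactly one of the oneof members listed above. One possible way to dispatch on it is sketched below. classify_stream_message is a hypothetical name, and WhichOneof is the standard protobuf message method that returns the name of the member set in a oneof (or None if none is set). The demo uses a stand-in class so that it runs without a switch.

```python
from types import SimpleNamespace


def classify_stream_message(message):
    """Return (kind, payload) for a StreamMessageResponse-like message."""
    kind = message.WhichOneof("update")  # name of the oneof member that is set
    if kind is None:
        return ("empty", None)
    if kind == "packet":
        # PacketIn: hand back the raw packet bytes.
        return ("packet", message.packet.payload)
    # arbitration, digest, idle_timeout_notification, other, error
    return (kind, getattr(message, kind))


class FakePacketIn:
    """Stand-in for a StreamMessageResponse whose packet member is set."""

    def __init__(self, payload):
        self.packet = SimpleNamespace(payload=payload)

    def WhichOneof(self, oneof_name):
        return "packet" if oneof_name == "update" else None


result = classify_stream_message(FakePacketIn(b"\x00\x01"))
print(result)
```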
- The Capabilities RPC reports the P4Runtime version supported by the switch
rpc Capabilities(CapabilitiesRequest) returns (CapabilitiesResponse) {
}
message CapabilitiesRequest {
}
message CapabilitiesResponse {
  // The full semantic version string (e.g. "1.1.0-rc.1") corresponding to the
  // version of the P4Runtime API currently implemented by the server.
  string p4runtime_api_version = 1;
}
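The returned p4runtime_api_version is a semantic version string, so a client can compare it against the version it needs. The parser below is a sketch (parse_api_version is a hypothetical name) that handles the MAJOR.MINOR.PATCH part plus an optional pre-release suffix, as in the "1.1.0-rc.1" example from the proto comment. Build metadata after "+" is not handled.

```python
def parse_api_version(version):
    """Split a version such as '1.1.0-rc.1' into ((1, 1, 0), 'rc.1')."""
    core, _, prerelease = version.partition("-")
    major, minor, patch = (int(part) for part in core.split("."))
    return (major, minor, patch), prerelease


numbers, prerelease = parse_api_version("1.1.0-rc.1")
supports_1_1 = numbers >= (1, 1, 0)
print(numbers, prerelease, supports_1_1)
```

On a live connection the string would come from client_stub.Capabilities(p4runtime_pb2.CapabilitiesRequest()).p4runtime_api_version, following the proto definition above.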