Note: the syntax highlighting for protobuf files on cnblogs seems to be somewhat off...
Experimental environment
The Ubuntu 20.04 virtual machine provided by P4 tutorials
All files that are run are located under /home/p4/tutorials/ and its subdirectories
Reference materials
P4 tutorials repository https://github.com/p4lang/tutorials
P4Runtime protobuf files https://github.com/p4lang/p4runtime/tree/main/proto
Protobuf file of PI (the server-side implementation of P4Runtime) https://github.com/p4lang/PI/blob/main/proto/p4/tmp/p4config.proto
P4Runtime specification https://p4.org/p4-spec/p4runtime/main/P4Runtime-Spec.html
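Creating the switch (using gRPC)
First, the switch is declared in topology.json. After running make run, run_exercise.py is executed (the detailed call chain is omitted here), and eventually the __init__() method of the P4RuntimeSwitch class is entered.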
```python
# p4runtime_switch.py
# Only the parts of the method body relevant here are kept; the rest is trimmed (same below)
# P4RuntimeSwitch inherits from P4Switch, which inherits from Switch, which inherits
# from Node (a Mininet node)
class P4RuntimeSwitch(P4Switch):
    "BMv2 switch with gRPC support"
    next_grpc_port = 50051

    def __init__(self, name, sw_path = None, json_path = None,
                 grpc_port = None,
                 thrift_port = None,
                 pcap_dump = False,
                 log_console = False,
                 verbose = False,
                 device_id = None,
                 enable_debugger = False,
                 log_file = None,
                 **kwargs):
        # name is the switch name given in the topology; it is set in the Node constructor
        Switch.__init__(self, name, **kwargs)
        # json_path points to the JSON file produced by the P4 compiler (p4c), located in build/
        if json_path is not None:
            # make sure that the provided JSON file exists
            if not os.path.isfile(json_path):
                error("Invalid JSON file: {}\n".format(json_path))
                exit(1)
            self.json_path = json_path
        else:
            self.json_path = None
        # Use grpc_port if one was passed; otherwise take the class variable
        # next_grpc_port (initially 50051) and increment it
        if grpc_port is not None:
            self.grpc_port = grpc_port
        else:
            self.grpc_port = P4RuntimeSwitch.next_grpc_port
            P4RuntimeSwitch.next_grpc_port += 1
        # Use device_id if one was passed; otherwise take the class variable
        # P4Switch.device_id (initially 0) and increment it
        if device_id is not None:
            self.device_id = device_id
            P4Switch.device_id = max(P4Switch.device_id, device_id)
        else:
            self.device_id = P4Switch.device_id
            P4Switch.device_id += 1
        # The important parameters: name identifies the switch, grpc_port is the port
        # used to connect to it, and device_id identifies the device
```
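The port and device_id fallback logic above boils down to two class-level counters shared by every instance. The stripped-down sketch below keeps only that bookkeeping; the class name ToySwitch and the attribute next_device_id are hypothetical stand-ins (the real code keeps the device counter in P4Switch.device_id and also inherits from the Mininet classes).

```python
class ToySwitch:
    """Hypothetical stand-in for P4RuntimeSwitch: only the port/ID bookkeeping."""
    next_grpc_port = 50051  # shared by all instances, like P4RuntimeSwitch.next_grpc_port
    next_device_id = 0      # plays the role of P4Switch.device_id

    def __init__(self, name, grpc_port=None, device_id=None):
        self.name = name
        if grpc_port is not None:
            self.grpc_port = grpc_port
        else:
            # Take the shared counter, then advance it for the next switch
            self.grpc_port = ToySwitch.next_grpc_port
            ToySwitch.next_grpc_port += 1
        if device_id is not None:
            self.device_id = device_id
            # Same rule as the excerpt: raise the counter to the explicit ID (no +1)
            ToySwitch.next_device_id = max(ToySwitch.next_device_id, device_id)
        else:
            self.device_id = ToySwitch.next_device_id
            ToySwitch.next_device_id += 1


switches = [ToySwitch(f"s{i}") for i in range(1, 4)]
for sw in switches:
    print(sw.name, sw.grpc_port, sw.device_id)
# s1 50051 0
# s2 50052 1
# s3 50053 2
```

Assuming the switches are created in order without explicit values, s1 ends up on port 50051 with device_id 0, which matches the values mycontroller.py uses to connect to s1. One side effect of the max() rule: an explicit device_id raises the counter only to that value, so a later switch without an explicit ID can receive the same ID.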
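Establishing a connection to the switch
In mycontroller.py from the p4runtime exercise of the tutorials, the official code wraps the connection setup in the Bmv2SwitchConnection class. The first three arguments (name, the address including the gRPC port, and device_id) must match the values used when the switch was created; the last argument is the file to which the P4Runtime requests are logged.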
```python
# /home/p4/tutorials/exercises/p4runtime/mycontroller.py
s1 = p4runtime_lib.bmv2.Bmv2SwitchConnection(
    name='s1',
    address='127.0.0.1:50051',
    device_id=0,
    proto_dump_file='logs/s1-p4runtime-requests.txt')
```
Tracing the Bmv2SwitchConnection class shows that it inherits from SwitchConnection and overrides the buildDeviceConfig method.
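```python
# p4runtime_lib/bmv2.py
from p4.tmp import p4config_pb2

from .switch import SwitchConnection


def buildDeviceConfig(bmv2_json_file_path=None):
    "Builds the device config for BMv2"
    # P4DeviceConfig is defined in PI's p4config.proto (see the reference list)
    device_config = p4config_pb2.P4DeviceConfig()
    device_config.reassign = True
    # The compiled BMv2 JSON is stored as raw bytes
    with open(bmv2_json_file_path) as f:
        device_config.device_data = f.read().encode('utf-8')
    return device_config


class Bmv2SwitchConnection(SwitchConnection):
    def buildDeviceConfig(self, **kwargs):
        return buildDeviceConfig(**kwargs)
```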
Moving on to the SwitchConnection class: the call made earlier is in fact the constructor of SwitchConnection.
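```python
# switch.py (excerpt; GrpcRequestLogger, IterableQueue and connections are defined in the same file)
import grpc
from p4.v1 import p4runtime_pb2_grpc


class SwitchConnection(object):

    def __init__(self, name=None, address='127.0.0.1:50051', device_id=0,
                 proto_dump_file=None):
        self.name = name
        # IP address and port of the switch
        self.address = address
        self.device_id = device_id
        self.p4info = None
        # Most of what follows is standard gRPC boilerplate and is not discussed further
        self.channel = grpc.insecure_channel(self.address)
        if proto_dump_file is not None:
            interceptor = GrpcRequestLogger(proto_dump_file)
            self.channel = grpc.intercept_channel(self.channel, interceptor)
        # The gRPC stub: think of it as the client-side interface to the API
        self.client_stub = p4runtime_pb2_grpc.P4RuntimeStub(self.channel)
        self.requests_stream = IterableQueue()
        self.stream_msg_resp = self.client_stub.StreamChannel(iter(self.requests_stream))
        self.proto_dump_file = proto_dump_file
        # connections is a module-level list holding references to all connections
        connections.append(self)
```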
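Note the call self.client_stub.StreamChannel(iter(self.requests_stream)). StreamChannel is an RPC; according to the comments in p4runtime.proto, it serves the following purposes:
- initiating the connection between the controller and the switch
- tracking whether the switch session is alive (similar to a heartbeat)
- sending/receiving packets
- streaming notifications from the switch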
```protobuf
// Represents the bidirectional stream between the controller and the
// switch (initiated by the controller), and is managed for the following
// purposes:
// - connection initiation through client arbitration
// - indicating switch session liveness: the session is live when switch
//   sends a positive client arbitration update to the controller, and is
//   considered dead when either the stream breaks or the switch sends a
//   negative update for client arbitration
// - the controller sending/receiving packets to/from the switch
// - streaming of notifications from the switch
rpc StreamChannel(stream StreamMessageRequest)
    returns (stream StreamMessageResponse) {
}
```
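StreamChannel is a bidirectional streaming RPC: the gRPC Python client takes an iterator of request messages and keeps pulling from it for as long as the stream lives, which is why the constructor passes iter(self.requests_stream). The sketch below reproduces the idea with the standard library. The IterableQueue class is written the way I understand the helper in the tutorials' switch.py (a Queue iterated until a sentinel arrives), so treat its details as an assumption, and fake_grpc_sender is a hypothetical thread standing in for gRPC.

```python
import threading
from queue import Queue


class IterableQueue(Queue):
    """A Queue that can be iterated; iteration blocks until the next item arrives."""
    _sentinel = object()

    def __iter__(self):
        # iter(callable, sentinel) keeps calling self.get() until it returns the sentinel
        return iter(self.get, self._sentinel)

    def close(self):
        # Putting the sentinel ends any loop that is iterating over this queue
        self.put(self._sentinel)


requests_stream = IterableQueue()
sent = []


def fake_grpc_sender(request_iterator):
    # Hypothetical stand-in for gRPC: drain the iterator and "send" each request
    for request in request_iterator:
        sent.append(request)


sender = threading.Thread(target=fake_grpc_sender, args=(iter(requests_stream),))
sender.start()
requests_stream.put("arbitration request")  # e.g. what MasterArbitrationUpdate() enqueues
requests_stream.put("packet-out request")
requests_stream.close()
sender.join()
print(sent)  # ['arbitration request', 'packet-out request']
```

The queue decouples producers (any method that wants to send something on the stream just calls put()) from the single long-lived stream that gRPC consumes.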
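At this point, the control plane has successfully established a connection to the switch.
One general note: if an argument is invalid or another error occurs, gRPC returns the corresponding error status; see the P4Runtime specification for the specific error types. This will not be repeated below.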
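Setting the primary (master) controller
Next, mycontroller.py calls MasterArbitrationUpdate() on each switch, which makes this controller the master controller. My guess as to why this is required: the specification states that a P4 switch can be managed by several controllers forming a high-availability cluster, so one primary controller has to be designated (and even with a single controller, the primary must be set explicitly).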
Stepping into the MasterArbitrationUpdate() method:
```python
def MasterArbitrationUpdate(self, dry_run=False, **kwargs):
    # Build a StreamMessageRequest
    request = p4runtime_pb2.StreamMessageRequest()
    request.arbitration.device_id = self.device_id
    request.arbitration.election_id.high = 0
    request.arbitration.election_id.low = 1
    if dry_run:
        print("P4Runtime MasterArbitrationUpdate: ", request)
    else:
        # Enqueue the request on the queue that feeds the StreamChannel stream,
        # then wait for the first response from the switch
        self.requests_stream.put(request)
        for item in self.stream_msg_resp:
            return item # just one
```
The related protobuf definitions:
```protobuf
message StreamMessageRequest {
  oneof update {
    // This request uses the MasterArbitrationUpdate variant
    MasterArbitrationUpdate arbitration = 1;
    PacketOut packet = 2;
    DigestListAck digest_ack = 3;
    .google.protobuf.Any other = 4;
  }
}
message MasterArbitrationUpdate {
  uint64 device_id = 1;
  // If role is not set, the controller gets the default role with full access
  // The role for which the primary client is being arbitrated. For use-cases
  // where multiple roles are not needed, the controller can leave this unset,
  // implying default role and full pipeline access.
  Role role = 2;
  // The stream RPC with the highest election_id is the primary. The 'primary'
  // controller instance populates this with its latest election_id. Switch
  // populates with the highest election ID it has received from all connected
  // controllers.
  Uint128 election_id = 3;
  // Switch populates this with OK for the client that is the primary, and
  // with an error status for all other connected clients (at every primary
  // client change). The controller does not populate this field.
  .google.rpc.Status status = 4;
}
```
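The comment on election_id says the stream with the highest election_id becomes the primary. Uint128 is split into two 64-bit halves, high and low, so the comparison is effectively on high * 2**64 + low. A small sketch of that rule; the function names and controller labels are illustrative only and not part of any library.

```python
def election_id_value(high, low):
    """Combine the two uint64 halves of a P4Runtime Uint128 into one integer."""
    assert 0 <= high < 2**64 and 0 <= low < 2**64
    return (high << 64) | low


def pick_primary(candidates):
    """Return the controller name with the highest election_id.

    candidates: dict mapping controller name -> (high, low)
    """
    return max(candidates, key=lambda name: election_id_value(*candidates[name]))


controllers = {
    "ctrl-a": (0, 1),  # what mycontroller.py sends: high=0, low=1
    "ctrl-b": (0, 7),
    "ctrl-c": (1, 0),  # a larger high half beats any low value
}
print(pick_primary(controllers))  # ctrl-c
```

In the tutorial's single-controller setup there is only one candidate, so the hard-coded (0, 1) is trivially the highest.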
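With a single controller there is no need to dig into these details; just follow the official code. For details, see section 5, Client Arbitration and Controller Replication, of the P4Runtime specification.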
Installing the P4 program and configuration on the switch
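```python
# /home/p4/tutorials/exercises/p4runtime/mycontroller.py
# The P4InfoHelper utility class reads and parses xxx.p4.p4info.txt produced by the P4 compiler
p4info_helper = p4runtime_lib.helper.P4InfoHelper(p4info_file_path)
# bmv2_json_file_path (here bmv2_file_path) is the xxx.json produced by the P4 compiler
# The .txt file mainly records metadata about the P4 program (such as IDs), while the
# JSON file describes the program itself (stages, operations, ...) and is sent to the switch
s1.SetForwardingPipelineConfig(p4info=p4info_helper.p4info,
                               bmv2_json_file_path=bmv2_file_path)
```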
Below is the content of xxx.p4.p4info.txt. It contains the target architecture, table IDs and names, match-field IDs, and so on.
```text
pkg_info {
  arch: "v1model"
}
tables {
  preamble {
    id: 37375156
    name: "MyIngress.ipv4_lpm"
    alias: "ipv4_lpm"
  }
  match_fields {
    id: 1
    name: "hdr.ipv4.dstAddr"
    bitwidth: 32
    match_type: LPM
  }
  action_refs {
    id: 28792405
  }
  action_refs {
    id: 25652968
  }
  action_refs {
    id: 21257015
  }
  size: 1024
}
tables {
  preamble {
    id: 38067515
    name: "MyIngress.icmp_lpm"
    alias: "icmp_lpm"
  }
  match_fields {
    id: 1
    name: "hdr.ipv4.dstAddr"
    bitwidth: 32
    match_type: LPM
  }
  action_refs {
    id: 25652968
  }
  action_refs {
    id: 21257015
  }
  size: 1024
}
actions {
  preamble {
    id: 21257015
    name: "NoAction"
    alias: "NoAction"
    annotations: "@noWarn(\"unused\")"
  }
}
actions {
  preamble {
    id: 25652968
    name: "MyIngress.drop"
    alias: "drop"
  }
}
actions {
  preamble {
    id: 28792405
    name: "MyIngress.ipv4_forward"
    alias: "ipv4_forward"
  }
  params {
    id: 1
    name: "dstAddr"
    bitwidth: 48
  }
  params {
    id: 2
    name: "port"
    bitwidth: 9
  }
}
type_info {
}
```
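P4InfoHelper's main job is translating names such as MyIngress.ipv4_lpm into the numeric IDs listed above. The sketch below hard-codes a few rows copied from this p4info file into a list and looks them up by full name or alias. The real helper parses the text-format protobuf instead, and the names P4INFO_ROWS, get_id and get_name are my own.

```python
# A few (kind, full name, alias, id) rows copied from the p4info file above
P4INFO_ROWS = [
    ("tables", "MyIngress.ipv4_lpm", "ipv4_lpm", 37375156),
    ("tables", "MyIngress.icmp_lpm", "icmp_lpm", 38067515),
    ("actions", "NoAction", "NoAction", 21257015),
    ("actions", "MyIngress.drop", "drop", 25652968),
    ("actions", "MyIngress.ipv4_forward", "ipv4_forward", 28792405),
]


def get_id(kind, name):
    """Hypothetical lookup: accept either the full name or the alias."""
    for row_kind, full_name, alias, obj_id in P4INFO_ROWS:
        if row_kind == kind and name in (full_name, alias):
            return obj_id
    raise KeyError(f"no {kind} named {name!r}")


def get_name(kind, obj_id):
    """Reverse lookup, e.g. to print table entries read back from the switch."""
    for row_kind, full_name, _alias, row_id in P4INFO_ROWS:
        if row_kind == kind and row_id == obj_id:
            return full_name
    raise KeyError(f"no {kind} with id {obj_id}")


print(get_id("tables", "MyIngress.ipv4_lpm"))  # 37375156
print(get_id("actions", "ipv4_forward"))       # 28792405
print(get_name("actions", 25652968))           # MyIngress.drop
```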
The source of SwitchConnection.SetForwardingPipelineConfig():
```python
def SetForwardingPipelineConfig(self, p4info, dry_run=False, **kwargs):
    # buildDeviceConfig here is the version overridden by Bmv2SwitchConnection
    device_config = self.buildDeviceConfig(**kwargs)
    # Build the request; the fields are explained below
    request = p4runtime_pb2.SetForwardingPipelineConfigRequest()
    # Related to primary-controller arbitration; a fixed value in the single-controller case
    request.election_id.low = 1
    request.device_id = self.device_id
    config = request.config
    # CopyFrom() and SerializeToString() are protobuf methods: the former deep-copies
    # another message into this one, the latter serializes the message to bytes
    config.p4info.CopyFrom(p4info)
    config.p4_device_config = device_config.SerializeToString()
    # Set the action; see the explanation below
    request.action = p4runtime_pb2.SetForwardingPipelineConfigRequest.VERIFY_AND_COMMIT
    # dry_run is rarely needed; it is mainly for checking that the request is built
    # correctly. Normally the request is sent directly, so only the else branch matters
    if dry_run:
        print("P4Runtime SetForwardingPipelineConfig:", request)
    else:
        # Send the request
        self.client_stub.SetForwardingPipelineConfig(request)
```
The protobuf definitions:
```protobuf
// Sets the P4 forwarding-pipeline config.
rpc SetForwardingPipelineConfig(SetForwardingPipelineConfigRequest)
    returns (SetForwardingPipelineConfigResponse) {
}
message SetForwardingPipelineConfigRequest {
  enum Action {
    UNSPECIFIED = 0;
    VERIFY = 1;
    VERIFY_AND_SAVE = 2;
    VERIFY_AND_COMMIT = 3;
    COMMIT = 4;
    RECONCILE_AND_COMMIT = 5;
  }
  uint64 device_id = 1;
  uint64 role_id = 2 [deprecated=true];
  // The official example does not set role; leaving it unset means the default
  // role with full pipeline access, which is enough for a single controller
  string role = 6;
  Uint128 election_id = 3;
  Action action = 4;
  ForwardingPipelineConfig config = 5;
}
message ForwardingPipelineConfig {
  // Metadata of the P4 program (IDs, names, ...); together with p4_device_config
  // it describes the program that is installed on the switch if the request succeeds
  config.v1.P4Info p4info = 1;
  // Target-specific P4 configuration.
  bytes p4_device_config = 2;
  // The cookie uniquely identifies a configuration
  message Cookie {
    uint64 cookie = 1;
  }
  Cookie cookie = 3;
}
// An empty message: if no error is returned, SetForwardingPipelineConfig
// simply returns this empty response
message SetForwardingPipelineConfigResponse {
}
```
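Before explaining action, look at the p4_device_config field of ForwardingPipelineConfig. It is of type bytes, and the code assigns it with config.p4_device_config = device_config.SerializeToString(). The reason is that the device config is target-specific: different switch models may need different configurations, so the proto cannot declare which fields the config contains. Instead, the config has to be serialized into a byte string and handed to the switch as-is (for BMv2 it is the P4DeviceConfig message from PI's p4config.proto, built by buildDeviceConfig).
Now for action. Because switches accept different configurations, a switch must verify whether it can accept a given config, and also decide whether to just store it or apply it immediately. This gives rise to the different actions: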
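- UNSPECIFIED: reserved, not used
- VERIFY: only verifies the config and returns the result
- VERIFY_AND_SAVE: verifies the config and, if verification succeeds, saves it on the switch without applying it
- VERIFY_AND_COMMIT: verifies the config and, if verification succeeds, saves and applies it
- COMMIT: applies the latest config saved on the switch; with this action the switch must already hold a saved config, and the request must not carry one
- RECONCILE_AND_COMMIT: similar to VERIFY_AND_COMMIT; the difference is that VERIFY_AND_COMMIT clears the current forwarding state when applying the config, whereas RECONCILE_AND_COMMIT preserves it (forwarding state here means the runtime state the controller has written, such as table entries)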
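To make the differences between these actions concrete, here is a toy in-memory model of a target handling them. It is only a mental model under simplifying assumptions: a config is a string, "verification" just rejects empty configs, and the forwarding state is a dict standing in for table entries. The class ToyTarget and its method are hypothetical and say nothing about how BMv2 or PI actually implement this.

```python
from enum import IntEnum


class Action(IntEnum):
    # Same numbers as SetForwardingPipelineConfigRequest.Action
    UNSPECIFIED = 0
    VERIFY = 1
    VERIFY_AND_SAVE = 2
    VERIFY_AND_COMMIT = 3
    COMMIT = 4
    RECONCILE_AND_COMMIT = 5


class ToyTarget:
    """Hypothetical in-memory target; not how BMv2/PI implement this."""

    def __init__(self):
        self.saved = None           # verified config waiting for COMMIT
        self.running = None         # config currently in effect
        self.forwarding_state = {}  # stands in for table entries etc.

    def set_pipeline(self, action, config=None):
        if action == Action.UNSPECIFIED:
            raise ValueError("action must be specified")
        if action == Action.COMMIT:
            # Apply the saved config; the request itself must not carry one.
            # (How the spec treats forwarding state on COMMIT is not modeled.)
            if config is not None or self.saved is None:
                raise ValueError("COMMIT needs a saved config and no inline config")
            self.running, self.saved = self.saved, None
            return
        if not config:  # toy "verification": reject empty configs
            raise ValueError("config failed verification")
        if action == Action.VERIFY:
            return  # verify only, nothing changes
        if action == Action.VERIFY_AND_SAVE:
            self.saved = config  # stored, not applied
            return
        # VERIFY_AND_COMMIT or RECONCILE_AND_COMMIT: apply immediately
        self.running = config
        if action == Action.VERIFY_AND_COMMIT:
            self.forwarding_state.clear()
        # RECONCILE_AND_COMMIT leaves forwarding_state as it is


t = ToyTarget()
t.set_pipeline(Action.VERIFY_AND_COMMIT, "prog-v1")
t.forwarding_state["10.0.2.2/32"] = "ipv4_forward(port=2)"
t.set_pipeline(Action.RECONCILE_AND_COMMIT, "prog-v2")
print(t.running, t.forwarding_state)  # prog-v2 {'10.0.2.2/32': 'ipv4_forward(port=2)'}
t.set_pipeline(Action.VERIFY_AND_COMMIT, "prog-v3")
print(t.running, t.forwarding_state)  # prog-v3 {}
```

The tutorial always uses VERIFY_AND_COMMIT, which is why any entries written before a new SetForwardingPipelineConfig call have to be written again afterwards.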
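Next, look at GetForwardingPipelineConfig, the counterpart of SetForwardingPipelineConfig. The official helper classes do not wrap this request, so you have to call the gRPC service yourself. The corresponding protobuf types: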
```protobuf
rpc GetForwardingPipelineConfig(GetForwardingPipelineConfigRequest)
    returns (GetForwardingPipelineConfigResponse) {
}
message GetForwardingPipelineConfigRequest {
  enum ResponseType {
    // Return everything
    ALL = 0;
    // Return only the cookie that uniquely identifies the config
    COOKIE_ONLY = 1;
    // Return the P4 program info (P4Info) and the cookie
    P4INFO_AND_COOKIE = 2;
    // Return the target-specific device config and the cookie
    DEVICE_CONFIG_AND_COOKIE = 3;
  }
  uint64 device_id = 1;
  ResponseType response_type = 2;
}
message GetForwardingPipelineConfigResponse {
  ForwardingPipelineConfig config = 1;
}
```
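Since the tutorial's helper library has no wrapper for this RPC, a call could look like the function below. It follows the pattern of the existing SwitchConnection methods, but it is my own sketch: the function name is hypothetical, and the stub and the p4runtime_pb2 module are passed in as parameters (in the tutorial they would be s1.client_stub and p4runtime_pb2 from p4.v1) so the snippet itself has no dependencies.

```python
def get_forwarding_pipeline_config(client_stub, p4runtime_pb2, device_id,
                                   response_type="ALL"):
    """Hypothetical helper: fetch the pipeline config currently on a switch.

    response_type is one of the ResponseType names: "ALL", "COOKIE_ONLY",
    "P4INFO_AND_COOKIE" or "DEVICE_CONFIG_AND_COOKIE".
    """
    request_cls = p4runtime_pb2.GetForwardingPipelineConfigRequest
    request = request_cls()
    request.device_id = device_id
    # Values of a nested enum are exposed as attributes of the message class
    request.response_type = getattr(request_cls, response_type)
    response = client_stub.GetForwardingPipelineConfig(request)
    return response.config  # a ForwardingPipelineConfig message
```

In the tutorial setting, a call such as get_forwarding_pipeline_config(s1.client_stub, p4runtime_pb2, s1.device_id, "P4INFO_AND_COOKIE") should return the P4Info that was installed by SetForwardingPipelineConfig.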
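Operating on table entries (TableEntry)
After the previous steps we have connected to the P4 switch and installed the P4 program. Next, the control plane controls the switch by writing the corresponding table entries.
Write requests
switch.py provides a ready-made method for inserting table entries, but updating and deleting entries requires building the gRPC request yourself.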
```python
# mycontroller.py
# buildTableEntry looks up table_id (and the other IDs) in the p4info file loaded earlier
table_entry = p4info_helper.buildTableEntry(
    table_name="MyIngress.ipv4_lpm",
    match_fields={
        "hdr.ipv4.dstAddr": ("10.0.2.2", 32)
    },
    action_name="MyIngress.ipv4_forward",
    action_params={
        "dstAddr": "08:00:00:00:02:22",
        "port": 2
    })
s1.WriteTableEntry(table_entry)
```

```python
# switch.py
def WriteTableEntry(self, table_entry, dry_run=False):
    request = p4runtime_pb2.WriteRequest()
    request.device_id = self.device_id
    request.election_id.low = 1
    # Add one update. The official wrapper sends one entry per call, although
    # the underlying Write RPC can carry many updates in a single request
    update = request.updates.add()
    # Setting a table's default action is a MODIFY; a normal entry is an INSERT
    if table_entry.is_default_action:
        update.type = p4runtime_pb2.Update.MODIFY
    else:
        update.type = p4runtime_pb2.Update.INSERT
    update.entity.table_entry.CopyFrom(table_entry)
    if dry_run:
        print("P4Runtime Write:", request)
    else:
        self.client_stub.Write(request)
```
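Updating and deleting follow the same shape as WriteTableEntry: only update.type changes, and several updates can share one WriteRequest. A hedged sketch; the function name is hypothetical, and the stub and p4runtime_pb2 module are passed in as parameters (in the tutorial, s1.client_stub and p4runtime_pb2).

```python
def write_table_entries(client_stub, p4runtime_pb2, device_id, table_entries,
                        update_type, election_low=1):
    """Hypothetical helper: send one WriteRequest carrying several updates.

    update_type is e.g. p4runtime_pb2.Update.MODIFY or p4runtime_pb2.Update.DELETE.
    """
    request = p4runtime_pb2.WriteRequest()
    request.device_id = device_id
    request.election_id.low = election_low  # same value as the arbitration request
    for entry in table_entries:
        update = request.updates.add()
        update.type = update_type
        update.entity.table_entry.CopyFrom(entry)
    client_stub.Write(request)
    return request
```

For a DELETE, each TableEntry still has to identify the entry to remove (table_id plus the match key); the action can be left out. For example: write_table_entries(s1.client_stub, p4runtime_pb2, s1.device_id, [table_entry], p4runtime_pb2.Update.DELETE).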
The protobuf definitions:
```protobuf
rpc Write(WriteRequest) returns (WriteResponse) {
}
message WriteRequest {
  uint64 device_id = 1;
  uint64 role_id = 2 [deprecated=true];
  string role = 6;
  Uint128 election_id = 3;
  // A repeated field: one request can carry several updates
  repeated Update updates = 4;
  // Controls how a batch of updates is applied (the server may process the
  // updates of a batch in any order):
  // CONTINUE_ON_ERROR: on an error, keep processing the remaining updates
  // ROLLBACK_ON_ERROR: on an error, roll back the whole batch
  // DATAPLANE_ATOMIC: the whole batch is applied atomically, as seen by the dataplane
  enum Atomicity {
    CONTINUE_ON_ERROR = 0;
    ROLLBACK_ON_ERROR = 1;
    DATAPLANE_ATOMIC = 2;
  }
  Atomicity atomicity = 5;
}
message Update {
  // The update type: insert, modify or delete
  enum Type {
    UNSPECIFIED = 0;
    INSERT = 1;
    MODIFY = 2;
    DELETE = 3;
  }
  Type type = 1;
  Entity entity = 2;
}
message Entity {
  oneof entity {
    ExternEntry extern_entry = 1;
    // TableEntry carries the actual P4 table entry; it is not expanded here.
    // Note that TableEntry has a field uint32 table_id that uniquely identifies a
    // table (its value can be looked up in the compiled p4info file). For a DELETE,
    // the TableEntry only needs the fields that identify the entry: table_id plus
    // the match key (and priority, if the table uses one); the action can be omitted.
    TableEntry table_entry = 2;
    ActionProfileMember action_profile_member = 3;
    ActionProfileGroup action_profile_group = 4;
    MeterEntry meter_entry = 5;
    DirectMeterEntry direct_meter_entry = 6;
    CounterEntry counter_entry = 7;
    DirectCounterEntry direct_counter_entry = 8;
    PacketReplicationEngineEntry packet_replication_engine_entry = 9;
    ValueSetEntry value_set_entry = 10;
    RegisterEntry register_entry = 11;
    DigestEntry digest_entry = 12;
  }
}
message WriteResponse {
}
```
Entity can also hold other kinds of entities, but they are less common in write requests; see section 9, P4 Entity Messages, of the official specification.
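The difference between the CONTINUE_ON_ERROR and ROLLBACK_ON_ERROR atomicity modes of WriteRequest can be illustrated with a toy model in which a table is a Python dict and each update is a (type, key, value) tuple. This only mimics best-effort vs. all-or-none behavior on the software side: it applies updates in order (a real server may reorder them), does not model DATAPLANE_ATOMIC, and the function apply_batch is hypothetical.

```python
def apply_batch(table, updates, atomicity="CONTINUE_ON_ERROR"):
    """Apply (type, key, value) updates to a dict; return the list of error messages."""
    snapshot = dict(table)  # state before the batch, used for rollback
    errors = []
    for op, key, value in updates:
        try:
            if op == "INSERT":
                if key in table:
                    raise KeyError(f"{key} already exists")
                table[key] = value
            elif op == "MODIFY":
                if key not in table:
                    raise KeyError(f"{key} not found")
                table[key] = value
            elif op == "DELETE":
                if key not in table:
                    raise KeyError(f"{key} not found")
                del table[key]
            else:
                raise ValueError(f"unknown update type {op}")
        except (KeyError, ValueError) as exc:
            errors.append(str(exc))
            if atomicity == "ROLLBACK_ON_ERROR":
                # Restore the pre-batch state and stop processing
                table.clear()
                table.update(snapshot)
                break
    return errors


batch = [
    ("INSERT", "10.0.1.1/32", "port 1"),
    ("DELETE", "10.0.9.9/32", None),  # fails: no such entry
    ("INSERT", "10.0.2.2/32", "port 2"),
]
t1 = {}
errors1 = apply_batch(t1, batch, "CONTINUE_ON_ERROR")
print(len(errors1), sorted(t1))  # 1 ['10.0.1.1/32', '10.0.2.2/32']
t2 = {}
errors2 = apply_batch(t2, batch, "ROLLBACK_ON_ERROR")
print(len(errors2), t2)  # 1 {}
```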
Read requests
switch.py also wraps read requests. As shown below, only a table_id is needed to read entries.
```python
def ReadTableEntries(self, table_id=None, dry_run=False):
    request = p4runtime_pb2.ReadRequest()
    request.device_id = self.device_id
    entity = request.entities.add()
    # The underlying interface accepts several Entity filters in one ReadRequest;
    # this wrapper always adds exactly one
    table_entry = entity.table_entry
    if table_id is not None:
        table_entry.table_id = table_id
    else:
        # table_id = 0 acts as a wildcard: read the entries of all tables
        table_entry.table_id = 0
    if dry_run:
        print("P4Runtime Read:", request)
    else:
        for response in self.client_stub.Read(request):
            yield response
```
Likewise, the protobuf definitions:
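```protobuf
message ReadRequest {
  uint64 device_id = 1;
  string role = 3;
  // Only the key needs to be set, e.g. for a TableEntry the key is table_id
  // Several entities can also be passed here to read them in one request
  repeated Entity entities = 2;
}
message ReadResponse {
  repeated Entity entities = 1;
}
```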
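Why yield is used: the passage from the specification below says, in short, that to avoid returning too much data at once, the results of one (batched) read request may be split across several ReadResponse messages sent over a stream. That is why the Python code above uses yield to pass the responses on as they arrive.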
There is no requirement that each request in the batch will correspond to one ReadResponse message in the stream. The stream-based design for response message is to avoid memory pressure on the P4Runtime server when the Read results in a very large number of entities to be returned. The P4Runtime server is free to break them apart across multiple response messages as it sees fit.
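In practice this means a caller that wants every entry must drain the whole stream rather than look at the first ReadResponse only. The sketch below pairs a toy server that splits a result into chunks (as the quoted passage allows) with a client-side collector; ToyReadResponse and toy_server_read are hypothetical stand-ins, while collect_entities should also work on the generator returned by ReadTableEntries, since each real ReadResponse exposes an iterable entities field.

```python
from dataclasses import dataclass, field


@dataclass
class ToyReadResponse:
    # Mirrors ReadResponse: a repeated field of entities
    entities: list = field(default_factory=list)


def toy_server_read(all_entities, max_per_response):
    """Toy server: split the result into several responses, as the spec allows."""
    for start in range(0, len(all_entities), max_per_response):
        yield ToyReadResponse(all_entities[start:start + max_per_response])


def collect_entities(responses):
    """Client side: drain the stream and flatten every response's entities."""
    collected = []
    for response in responses:
        collected.extend(response.entities)
    return collected


entries = [f"entry-{i}" for i in range(7)]
responses = list(toy_server_read(entries, max_per_response=3))
print(len(responses))                          # 3
print(collect_entities(responses) == entries)  # True
```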
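Other
- The StreamChannel RPC. We already used it when establishing the connection, but it has other functions as well: for example, PacketOut and PacketIn shown below exchange custom packets between the data plane and the control plane, and IdleTimeoutNotification notifies the controller when a table entry on the switch has timed out due to inactivity.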
```protobuf
message StreamMessageRequest {
  oneof update {
    MasterArbitrationUpdate arbitration = 1;
    PacketOut packet = 2;
    DigestListAck digest_ack = 3;
    .google.protobuf.Any other = 4;
  }
}
// Packet sent from the controller to the switch.
message PacketOut {
  bytes payload = 1;
  // This will be based on P4 header annotated as
  // @controller_header("packet_out").
  // At most one P4 header can have this annotation.
  repeated PacketMetadata metadata = 2;
}
message StreamMessageResponse {
  oneof update {
    MasterArbitrationUpdate arbitration = 1;
    PacketIn packet = 2;
    DigestList digest = 3;
    IdleTimeoutNotification idle_timeout_notification = 4;
    .google.protobuf.Any other = 5;
    StreamError error = 6;
  }
}
message PacketIn {
  bytes payload = 1;
  repeated PacketMetadata metadata = 2;
}
```
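Because StreamMessageResponse is a oneof, a controller listening on the stream has to check which field is set before handling a message; protobuf's Python messages provide WhichOneof() for this. The dispatcher below is a sketch with hypothetical names. It relies only on WhichOneof and getattr, so it could be fed the response iterator kept in s1.stream_msg_resp, though that loop blocks while the stream is open and would normally run in its own thread.

```python
def dispatch_stream(responses, handlers):
    """Route each StreamMessageResponse to handlers[<name of the set oneof field>].

    handlers: dict such as {"packet": on_packet_in, "arbitration": on_arbitration}.
    Messages whose field has no handler are skipped. Returns how many were handled.
    """
    handled = 0
    for response in responses:
        kind = response.WhichOneof("update")  # e.g. "packet", "digest", "error" or None
        handler = handlers.get(kind)
        if handler is not None:
            handler(getattr(response, kind))  # pass the inner message, e.g. a PacketIn
            handled += 1
    return handled
```

For example, dispatch_stream(s1.stream_msg_resp, {"packet": lambda pkt: print(pkt.payload)}) would print the payload of every PacketIn, assuming the P4 program actually sends packets to the controller.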
- The Capabilities RPC can be used to query the P4Runtime version supported by the switch
```protobuf
rpc Capabilities(CapabilitiesRequest) returns (CapabilitiesResponse) {
}
message CapabilitiesRequest {
}
message CapabilitiesResponse {
  // The full semantic version string (e.g. "1.1.0-rc.1") corresponding to the
  // version of the P4Runtime API currently implemented by the server.
  string p4runtime_api_version = 1;
}
```
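A controller might use this to check the server's version before relying on newer features. The sketch below wraps the RPC and parses the semantic version string into a comparable tuple; both function names are hypothetical, the stub and p4runtime_pb2 module are passed in as in the earlier sketches, and pre-release suffixes such as "-rc.1" are simply ignored, which is a simplification.

```python
import re


def parse_api_version(version):
    """Turn '1.1.0-rc.1' into (1, 1, 0); any pre-release suffix is ignored."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unexpected version string: {version!r}")
    return tuple(int(part) for part in match.groups())


def get_p4runtime_version(client_stub, p4runtime_pb2):
    """Hypothetical helper: ask the switch which P4Runtime API version it implements."""
    response = client_stub.Capabilities(p4runtime_pb2.CapabilitiesRequest())
    return response.p4runtime_api_version


print(parse_api_version("1.1.0-rc.1"))  # (1, 1, 0)
```

Usage in the tutorial setting could look like version = get_p4runtime_version(s1.client_stub, p4runtime_pb2) followed by a tuple comparison such as parse_api_version(version) >= (1, 1, 0).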