Golang的序列化-RPC和GRPC
作者:尹正傑
版權聲明:原創作品,謝絕轉載!否則將追究法律責任。
一.RPC概述
1>.什么是RPC
RPC(Remote Procedure Call Protocol),是遠程過程調用的縮寫,通俗的說就是調用遠處(一般指不同的主機)的一個函數。
2>.為什么微服務需要RPC
我們使用微服務化的一個好處就是,不限定服務的提供方使用什么技術選型,能夠實現公司跨團隊的技術解耦。
這樣的話,如果沒有統一的服務框架,RPC框架,各個團隊的服務提供方就需要各自實現一套序列化、反序列化、網絡框架、連接池、收發線程、超時處理、狀態機等“業務之外”的重復技術勞動,造成整體的低效。
所以,統一RPC框架把上述“業務之外”的技術勞動統一處理,是服務化首要解決的問題。
二.RPC入門案例
在互聯網時代,RPC已經和IPC(進程間通信)一樣成為一個不可或缺的基礎構件。因此Go語言的標准庫也提供了一個簡單的RPC實現,我們將以此為入口學習RPC的常見用法。
1>.RPC的服務端
package main import ( "fmt" "net" "net/rpc" ) type Zabbix struct{} /** 定義成員方法: 第一個參數是傳入參數. 第二個參數必須是傳出參數(引用類型). Go語言的RPC規則: 方法只能有兩個可序列化的參數,其中第二個參數是指針類型,並且返回一個error類型,同時必須是公開的方法。 溫馨提示: 當調用遠程函數之后,如果返回的錯誤不為空,那么傳出參數為空。 */ func (Zabbix) MonitorHosts(name string, response *string) error { *response = name + "主機監控中..." return nil } func main() { /** 進程間交互有很多種,比如基於信號,共享內存,管道,套接字等方式. 1>.rpc基於是TCP的,因此我們需要先開啟監聽端口 */ listener, err := net.Listen("tcp", ":8888") if err != nil { fmt.Println("開啟監聽器失敗,錯誤原因: ", err) return } defer listener.Close() fmt.Println("服務啟動成功...") /** 2>.接受鏈接,即接受傳輸的數據 */ conn, err := listener.Accept() if err != nil { fmt.Println("建立鏈接失敗...") return } defer conn.Close() fmt.Println("建立連接: ", conn.RemoteAddr()) /** 3>.注冊rpc服務,維護一個hash表,key值是服務名稱,value值是服務的地址。服務器有很多函數,希望被調用的函數需要注冊到RPC上。 以下是RegisterName的函數簽名: func RegisterName(name string, rcvr interface{}) error 以下是對函數簽名相關參數的說明: name: 指的是服務名稱。 rcvr: 指的是結構體對象(這個結構體必須含有成員方法)。 */ rpc.RegisterName("zabbix", new(Zabbix)) /** 4>.鏈接的處理交給RCP框架處理,即rpc調用,並返回執行后的數據,其工作原理大致分為以下3個步驟: (1)read,獲取服務名稱和方法名,獲取請求數據; (2)調用對應服務里面的方法,獲取傳出數據; (3)write,把數據返回給client; */ rpc.ServeConn(conn) }
2>.RPC的客戶端
package main import ( "fmt" "net" "net/rpc" ) func main() { /** 1>.首選是通過rpc.Dial撥號RPC服務 溫馨提示: 默認數據傳輸過程中編碼方式是gob,可以選擇json */ conn, err := net.Dial("tcp", "127.0.0.1:8888") if err != nil { fmt.Println("鏈接服務器失敗") return } defer conn.Close() /** 2>.把conn和rpc進行綁定 */ client := rpc.NewClient(conn) /** 3>.然后通過client.Call調用具體的RPC方法。其中Call函數的簽名如下所示: func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error 以下是對函數簽名的相關參數進行補充說明: serviceMethod: 用點號(.)鏈接的RPC服務名字和方法名字 args: 指定輸入參數 reply: 指定輸出參數接收的 */ var data string err = client.Call("zabbix.MonitorHosts", "Nginx", &data) if err != nil { fmt.Println("遠程接口調用失敗,錯誤原因: ", err) return } fmt.Println(data) }
三.跨語言的RPC
標准庫的RPC默認采用Go語言特有的gob編碼,因此從其它語言調用Go語言實現的RPC服務將比較困難。跨語言是互聯網時代RPC的一個首要條件,這里我們再來實現一個跨語言的RPC。
得益於RPC的框架設計,Go語言的RPC其實也是很容易實現跨語言支持的。這里我們將嘗試通過官方自帶的net/rpc/jsonrpc擴展實現一個跨語言RPC。
1>.RPC的服務端
package main import ( "fmt" "net" "net/rpc" "net/rpc/jsonrpc" ) type OpenFalcon struct{} /** 定義成員方法: 第一個參數是傳入參數. 第二個參數必須是傳出參數(引用類型). Go語言的RPC規則: 方法只能有兩個可序列化的參數,其中第二個參數是指針類型,並且返回一個error類型,同時必須是公開的方法。 溫馨提示: 當調用遠程函數之后,如果返回的錯誤不為空,那么傳出參數為空。 */ func (OpenFalcon) MonitorHosts(name string, response *string) error { *response = name + "主機監控中..." return nil } func main() { /** 進程間交互有很多種,比如基於信號,共享內存,管道,套接字等方式. 1>.rpc基於是TCP的,因此我們需要先開啟監聽端口 */ listener, err := net.Listen("tcp", ":8888") if err != nil { fmt.Println("開啟監聽器失敗,錯誤原因: ", err) return } defer listener.Close() fmt.Println("服務啟動成功...") /** 2>.接受鏈接,即接受傳輸的數據 */ conn, err := listener.Accept() if err != nil { fmt.Println("建立鏈接失敗...") return } defer conn.Close() fmt.Println("建立連接: ", conn.RemoteAddr()) /** 3>.注冊rpc服務,維護一個hash表,key值是服務名稱,value值是服務的地址。服務器有很多函數,希望被調用的函數需要注冊到RPC上。 以下是RegisterName的函數簽名: func RegisterName(name string, rcvr interface{}) error 以下是對函數簽名相關參數的說明: name: 指的是服務名稱。 rcvr: 指的是結構體對象(這個結構體必須含有成員方法)。 */ rpc.RegisterName("open_falcon", new(OpenFalcon)) /** 4>.鏈接的處理交給RCP框架處理,即rpc調用,並返回執行后的數據,其工作原理大致分為以下3個步驟: (1)read,獲取服務名稱和方法名,獲取請求數據; (2)調用對應服務里面的方法,獲取傳出數據; (3)write,把數據返回給client; */ jsonrpc.ServeConn(conn) }
2>.RPC的客戶端
package main import ( "fmt" "net/rpc/jsonrpc" ) func main() { /** 首選是通過rpc.Dial撥號RPC服務 溫馨提示: 默認數據傳輸過程中編碼方式是gob,可以選擇json,需要導入"net/rpc/jsonrpc"包。 */ conn, err := jsonrpc.Dial("tcp", "127.0.0.1:8888") if err != nil { fmt.Println("鏈接服務器失敗") return } defer conn.Close() var data string /** 其中Call函數的簽名如下所示: func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error 以下是對函數簽名的相關參數進行補充說明: serviceMethod: 用點號(.)鏈接的RPC服務名字和方法名字 args: 指定輸入參數 reply: 指定輸出參數接收的 */ err = conn.Call("open_falcon.MonitorHosts", "Httpd", &data) if err != nil { fmt.Println("遠程接口調用失敗,錯誤原因: ", err) return } fmt.Println(data) }
四.ProtoBuf
博主推薦閱讀: https://www.cnblogs.com/yinzhengjie2020/p/12741943.html
五.GRPC框架
1>.什么是GRPC
GRPC是Google公司基於Protobuf開發的跨語言的開源RPC框架。 GRPC是一個高性能、開源和通用的 RPC 框架,面向移動和 HTTP/2 設計。目前提供C,Java和Go語言版本,分別是:grpc, grpc-java, grpc-go. 其中C版本支持 C, C++, Node.js, Python, Ruby, Objective-C, PHP 和 C# 支持. GRPC基於HTTP/2標准設計,帶來諸如雙向流、流控、頭部壓縮、單 TCP 連接上的多復用請求等特。這些特性使得其在移動設備上表現更好,更省電和節省空間占用。 博主推薦閱讀: GRPC 官方文檔中文版: http://doc.oschina.net/grpc?t=60133 GRPC官網: https://grpc.io
2>.安裝grpc環境
C:\Users\yinzhengjie>go get -u -v google.golang.org/grpc
3>.基於protobuf編寫Grpc服務

////protobuf默認支持的版本是2.0,現在一般使用3.0版本,所以需要手動指定版本號 //c語言的編程風格 syntax = "proto3"; //指定包名 package pb; //定義傳輸數據的格式 message People{ string name = 1; //1表示表示字段是1 數據庫中表的主鍵id等於1,主鍵不能重復,標示位數據不能重復 //標示位不能使用19000 -19999 系統預留位 int32 age = 2; //結構體嵌套 student s = 3; //使用數組/切片 repeated string phone = 4; //oneof的作用是多選一 oneof data{ int32 score = 5; string city = 6; bool good = 7; } } //oneof c語言中的聯合體 message student{ string name = 1; int32 age = 6; } //通過定義服務,然后借助框架,幫助實現部分的rpc代碼 service Hello{ rpc World(student)returns(student); }

////protobuf默認支持的版本是2.0,現在一般使用3.0版本,所以需要手動指定版本號 //c語言的編程風格 // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.21.0 // protoc v3.11.4 // source: grpc.proto //指定包名 package pb import ( context "context" proto "github.com/golang/protobuf/proto" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" ) const ( // Verify that this generated code is sufficiently up-to-date. _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) // Verify that runtime/protoimpl is sufficiently up-to-date. _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) // This is a compile-time assertion that a sufficiently up-to-date version // of the legacy proto package is being used. const _ = proto.ProtoPackageIsVersion4 //定義傳輸數據的格式 type People struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` //1表示表示字段是1 數據庫中表的主鍵id等於1,主鍵不能重復,標示位數據不能重復 //標示位不能使用19000 -19999 系統預留位 Age int32 `protobuf:"varint,2,opt,name=age,proto3" json:"age,omitempty"` //結構體嵌套 S *Student `protobuf:"bytes,3,opt,name=s,proto3" json:"s,omitempty"` //使用數組/切片 Phone []string `protobuf:"bytes,4,rep,name=phone,proto3" json:"phone,omitempty"` //oneof的作用是多選一 // // Types that are assignable to Data: // *People_Score // *People_City // *People_Good Data isPeople_Data `protobuf_oneof:"data"` } func (x *People) Reset() { *x = People{} if protoimpl.UnsafeEnabled { mi := &file_grpc_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *People) String() string { return protoimpl.X.MessageStringOf(x) } func (*People) ProtoMessage() {} func (x *People) ProtoReflect() protoreflect.Message { mi := &file_grpc_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use People.ProtoReflect.Descriptor instead. func (*People) Descriptor() ([]byte, []int) { return file_grpc_proto_rawDescGZIP(), []int{0} } func (x *People) GetName() string { if x != nil { return x.Name } return "" } func (x *People) GetAge() int32 { if x != nil { return x.Age } return 0 } func (x *People) GetS() *Student { if x != nil { return x.S } return nil } func (x *People) GetPhone() []string { if x != nil { return x.Phone } return nil } func (m *People) GetData() isPeople_Data { if m != nil { return m.Data } return nil } func (x *People) GetScore() int32 { if x, ok := x.GetData().(*People_Score); ok { return x.Score } return 0 } func (x *People) GetCity() string { if x, ok := x.GetData().(*People_City); ok { return x.City } return "" } func (x *People) GetGood() bool { if x, ok := x.GetData().(*People_Good); ok { return x.Good } return false } type isPeople_Data interface { isPeople_Data() } type People_Score struct { Score int32 `protobuf:"varint,5,opt,name=score,proto3,oneof"` } type People_City struct { City string `protobuf:"bytes,6,opt,name=city,proto3,oneof"` } type People_Good struct { Good bool `protobuf:"varint,7,opt,name=good,proto3,oneof"` } func (*People_Score) isPeople_Data() {} func (*People_City) isPeople_Data() {} func (*People_Good) isPeople_Data() {} type Student struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` Age int32 `protobuf:"varint,6,opt,name=age,proto3" json:"age,omitempty"` } func (x *Student) Reset() { *x = Student{} if protoimpl.UnsafeEnabled { mi := &file_grpc_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *Student) String() string { return protoimpl.X.MessageStringOf(x) } func (*Student) ProtoMessage() {} func (x *Student) ProtoReflect() protoreflect.Message { mi := &file_grpc_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use Student.ProtoReflect.Descriptor instead. func (*Student) Descriptor() ([]byte, []int) { return file_grpc_proto_rawDescGZIP(), []int{1} } func (x *Student) GetName() string { if x != nil { return x.Name } return "" } func (x *Student) GetAge() int32 { if x != nil { return x.Age } return 0 } var File_grpc_proto protoreflect.FileDescriptor var file_grpc_proto_rawDesc = []byte{ 0x0a, 0x0a, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0xab, 0x01, 0x0a, 0x06, 0x50, 0x65, 0x6f, 0x70, 0x6c, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x61, 0x67, 0x65, 0x12, 0x19, 0x0a, 0x01, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x70, 0x62, 0x2e, 0x73, 0x74, 0x75, 0x64, 0x65, 0x6e, 0x74, 0x52, 0x01, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x12, 0x16, 0x0a, 0x05, 0x73, 0x63, 0x6f, 0x72, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x48, 0x00, 0x52, 0x05, 0x73, 0x63, 0x6f, 0x72, 0x65, 0x12, 0x14, 0x0a, 0x04, 0x63, 0x69, 0x74, 0x79, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x04, 0x63, 0x69, 0x74, 0x79, 0x12, 0x14, 0x0a, 0x04, 0x67, 0x6f, 0x6f, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x04, 0x67, 0x6f, 0x6f, 0x64, 0x42, 0x06, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x2f, 0x0a, 0x07, 0x73, 0x74, 0x75, 0x64, 0x65, 0x6e, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x61, 0x67, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x61, 0x67, 0x65, 0x32, 0x2a, 0x0a, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x12, 0x21, 0x0a, 0x05, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x12, 0x0b, 0x2e, 0x70, 0x62, 0x2e, 0x73, 0x74, 0x75, 0x64, 0x65, 0x6e, 0x74, 0x1a, 0x0b, 0x2e, 0x70, 0x62, 0x2e, 0x73, 0x74, 0x75, 0x64, 0x65, 0x6e, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( file_grpc_proto_rawDescOnce sync.Once file_grpc_proto_rawDescData = file_grpc_proto_rawDesc ) func file_grpc_proto_rawDescGZIP() []byte { file_grpc_proto_rawDescOnce.Do(func() { file_grpc_proto_rawDescData = protoimpl.X.CompressGZIP(file_grpc_proto_rawDescData) }) return file_grpc_proto_rawDescData } var file_grpc_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_grpc_proto_goTypes = []interface{}{ (*People)(nil), // 0: pb.People (*Student)(nil), // 1: pb.student } var file_grpc_proto_depIdxs = []int32{ 1, // 0: pb.People.s:type_name -> pb.student 1, // 1: pb.Hello.World:input_type -> pb.student 1, // 2: pb.Hello.World:output_type -> pb.student 2, // [2:3] is the sub-list for method output_type 1, // [1:2] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name 1, // [1:1] is the sub-list for extension extendee 0, // [0:1] is the sub-list for field type_name } func init() { file_grpc_proto_init() } func file_grpc_proto_init() { if File_grpc_proto != nil { return } if !protoimpl.UnsafeEnabled { file_grpc_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*People); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_grpc_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Student); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } } file_grpc_proto_msgTypes[0].OneofWrappers = []interface{}{ (*People_Score)(nil), (*People_City)(nil), (*People_Good)(nil), } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_grpc_proto_rawDesc, NumEnums: 0, NumMessages: 2, NumExtensions: 0, NumServices: 1, }, GoTypes: file_grpc_proto_goTypes, DependencyIndexes: file_grpc_proto_depIdxs, MessageInfos: file_grpc_proto_msgTypes, }.Build() File_grpc_proto = out.File file_grpc_proto_rawDesc = nil file_grpc_proto_goTypes = nil file_grpc_proto_depIdxs = nil } // Reference imports to suppress errors if they are not otherwise used. var _ context.Context var _ grpc.ClientConnInterface // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion6 // HelloClient is the client API for Hello service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type HelloClient interface { World(ctx context.Context, in *Student, opts ...grpc.CallOption) (*Student, error) } type helloClient struct { cc grpc.ClientConnInterface } func NewHelloClient(cc grpc.ClientConnInterface) HelloClient { return &helloClient{cc} } func (c *helloClient) World(ctx context.Context, in *Student, opts ...grpc.CallOption) (*Student, error) { out := new(Student) err := c.cc.Invoke(ctx, "/pb.Hello/World", in, out, opts...) if err != nil { return nil, err } return out, nil } // HelloServer is the server API for Hello service. type HelloServer interface { World(context.Context, *Student) (*Student, error) } // UnimplementedHelloServer can be embedded to have forward compatible implementations. type UnimplementedHelloServer struct { } func (*UnimplementedHelloServer) World(context.Context, *Student) (*Student, error) { return nil, status.Errorf(codes.Unimplemented, "method World not implemented") } func RegisterHelloServer(s *grpc.Server, srv HelloServer) { s.RegisterService(&_Hello_serviceDesc, srv) } func _Hello_World_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(Student) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(HelloServer).World(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: "/pb.Hello/World", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(HelloServer).World(ctx, req.(*Student)) } return interceptor(ctx, in, info, handler) } var _Hello_serviceDesc = grpc.ServiceDesc{ ServiceName: "pb.Hello", HandlerType: (*HelloServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "World", Handler: _Hello_World_Handler, }, }, Streams: []grpc.StreamDesc{}, Metadata: "grpc.proto", }
4>.grpcServer.go
package main import ( "context" "google.golang.org/grpc" "net" "yinzhengjie/pb" ) //定義一個結構體,繼承自HelloServer接口(該接口是我們通過protobuf代碼生成的) type HelloService struct {} func (HelloService)World(ctx context.Context, req*pb.Student) (*pb.Student, error){ req.Name += " nihao" req.Age += 10 return req,nil } func main() { //先獲取grpc對象 grpcServer := grpc.NewServer() //注冊服務 pb.RegisterHelloServer(grpcServer,new(HelloService)) //開啟監聽 lis,err := net.Listen("tcp",":8888") if err != nil { return } defer lis.Close() //先獲取grpc服務端對象 grpcServer.Serve(lis) }
5>.grpcClient.go
package main import ( "google.golang.org/grpc" "context" "fmt" "yinzhengjie/pb" ) func main() { //和grpc服務端建立連接 grpcCnn ,err := grpc.Dial("127.0.0.1:8888",grpc.WithInsecure()) if err != nil { fmt.Println(err) return } defer grpcCnn.Close() //得到一個客戶端對象 client :=pb.NewHelloClient(grpcCnn) var s pb.Student s.Name = "Jason Yin" s.Age = 20 resp,err := client.World(context.TODO(),&s) fmt.Println(resp,err) }