gRPC介紹
gRPC是Google公司基於Protobuf開發的跨語言的開源RPC框架。gRPC基於HTTP/2協議設計,可以基於一個HTTP/2鏈接提供多個服務,對於移動設備更加友好。本節將講述gRPC的簡單用法。
gRPC的技術棧:
最底層為TCP或Unix Socket協議,在此之上是HTTP/2協議的實現,然后在HTTP/2協議之上又構建了針對Go語言的gRPC核心庫。應用程序通過gRPC插件生產的Stub代碼和gRPC核心庫通信,也可以直接和gRPC核心庫通信。
如果從Protobuf的角度看,gRPC只不過是一個針對service接口生成代碼的生成器。我們在本章的第二節(《Go 語言高級編程》)中手工實現了一個簡單的Protobuf代碼生成器插件,只不過當時生成的代碼是適配標准庫的RPC框架的。現在我們將學習gRPC的用法。
創建在項目的proto/hello.proto文件,定義HelloService接口:
syntax = "proto3"; package proto; // 服務傳遞的參數 message String { string value = 1; } // 區別於RPC服務,gRPC可以在proto文件中定義服務方法接口,從而生成給客戶端和服務端兩個用的接口 service HelloService { rpc Hello (String) returns (String); }
進入proto目錄下使用如下命令生成相應的接口go文件:protoc --go_out=plugins=grpc:. hello.proto, 這條命令會調用protoc-gen-go 內置的gRPC插件生成相應的文件。我們可以簡單分析哈生成的go文件中有什么?
.... // 用於數據傳輸的結構體 type String struct { Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } // 獲取參數的方法 func (m *String) GetValue() string { if m != nil { return m.Value } return "" } // 注冊類型 func init() { proto.RegisterType((*String)(nil), "proto.String") } // 上下文 // Reference imports to suppress errors if they are not otherwise used. var _ context.Context var _ grpc.ClientConn ... // 客戶端接口約束 type HelloServiceClient interface { Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error) } type helloServiceClient struct { cc *grpc.ClientConn } func NewHelloServiceClient(cc *grpc.ClientConn) HelloServiceClient { return &helloServiceClient{cc} } func (c *helloServiceClient) Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error) { out := new(String) err := c.cc.Invoke(ctx, "/proto.HelloService/Hello", in, out, opts...) if err != nil { return nil, err } return out, nil } ... // HelloServiceServer is the server API for HelloService service.服務端 type HelloServiceServer interface { Hello(context.Context, *String) (*String, error) } // UnimplementedHelloServiceServer can be embedded to have forward compatible implementations. type UnimplementedHelloServiceServer struct { } func (*UnimplementedHelloServiceServer) Hello(ctx context.Context, req *String) (*String, error) { return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented") } // 注冊服務端函數 func RegisterHelloServiceServer(s *grpc.Server, srv HelloServiceServer) { s.RegisterService(&_HelloService_serviceDesc, srv) }
接着我們可以編寫服務端代碼:service.go
package main import ( "context" "fmt" "gRPC_demo/proto" "google.golang.org/grpc" // 要么 go get google.golang.org/grpc, 要么go mod tidy "log" "net" ) type HelloServiceImp struct { } func (p *HelloServiceImp) Hello(ctx context.Context, arg *proto.String) (*proto.String, error) { reply := &proto.String{Value: "hello: " + arg.GetValue()} return reply, nil } /* 和啟動標准RPC服務流程類似 首先是通過grpc.NewServer()構造一個gRPC服務對象,然后通過gRPC插件生成的RegisterHelloServiceServer函數注冊我們實現的HelloServiceImpl服務。 然后通過grpcServer.Serve(lis)在一個監聽端口上提供gRPC服務。*/ func main() { // 創建服務初始化 grpcServer := grpc.NewServer() // 調用接口文件生成的服務端要實現的函數,完成服務注冊 proto.RegisterHelloServiceServer(grpcServer, new(HelloServiceImp)) fmt.Println("service starting....") lis, err := net.Listen("tcp",":1234") if err != nil { log.Fatal(err) } grpcServer.Serve(lis) }
gRPC通過context.Context參數,為每個方法調用提供了上下文支持。客戶端在調用方法的時候,可以通過可選的grpc.CallOption類型的參數提供額外的上下文信息。緊接着就是客戶端代碼:client.go
package main import ( "context" "fmt" "gRPC_demo/proto" "google.golang.org/grpc" "log" ) func main() { conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure()) if err != nil { log.Fatal(err) } defer conn.Close() client := proto.NewHelloServiceClient(conn) reply, err := client.Hello(context.Background(), &proto.String{Value:"Wang"}) if err != nil { log.Fatal(err) } fmt.Println(reply.GetValue()) }
其中grpc.Dial負責和gRPC服務建立鏈接,然后NewHelloServiceClient函數基於已經建立的鏈接構造HelloServiceClient對象。返回的client其實是一個HelloServiceClient接口對象,通過接口定義的方法就可以調用服務端對應的gRPC服務提供的方法。
gRPC和標准庫的RPC框架有一個區別,gRPC生成的接口並不支持異步調用。不過我們可以在多個Goroutine之間安全地共享gRPC底層的HTTP/2鏈接,因此可以通過在另一個Goroutine阻塞調用的方式模擬異步調用。
gRPC流
RPC是遠程函數調用,因此每次調用的函數參數和返回值不能太大,否則將嚴重影響每次調用的響應時間。因此傳統的RPC方法調用對於上傳和下載較大數據量場景並不適合。同時傳統RPC模式也不適用於對時間不確定的訂閱和發布模式。為此,gRPC框架針對服務器端和客戶端分別提供了流特性。
服務端或客戶端的單向流是雙向流的特例,我們在HelloService增加一個支持雙向流的Channel方法,hello_str.proto:
syntax = "proto3"; package proto; // 服務傳遞的參數 message String { string value = 1; } // 區別於RPC服務,gRPC可以在proto文件中定義服務方法接口,從而生成給客戶端和服務端兩個用的接口 // 關鍵字stream指定啟用流特性,參數部分是接收客戶端參數的流,返回值是返回給客戶端的流。 service HelloService { rpc Hello (String) returns (String); rpc Channel (stream String) returns (stream String); }
完成好proto文件后,我們使用如下命令生成go文件:protoc --go_out=plugins=grpc:. hello_str.proto, 緊接着我們分析生成的go文件:
// 服務傳遞的參數 type String struct { Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } ... // 為客戶端接口添加了Channel方法 type HelloServiceClient interface { Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error) Channel(ctx context.Context, opts ...grpc.CallOption) (HelloService_ChannelClient, error) } type helloServiceClient struct { cc *grpc.ClientConn } // 返回一個grpc連接對象 func NewHelloServiceClient(cc *grpc.ClientConn) HelloServiceClient { return &helloServiceClient{cc} } // 客戶端實現hello func (c *helloServiceClient) Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error) { out := new(String) err := c.cc.Invoke(ctx, "/proto.HelloService/Hello", in, out, opts...) if err != nil { return nil, err } return out, nil } // 客戶端實現channel func (c *helloServiceClient) Channel(ctx context.Context, opts ...grpc.CallOption) (HelloService_ChannelClient, error) { stream, err := c.cc.NewStream(ctx, &_HelloService_serviceDesc.Streams[0], "/proto.HelloService/Channel", opts...) if err != nil { return nil, err } x := &helloServiceChannelClient{stream} return x, nil } // 新增的結構體,用以標識流 type HelloService_ChannelClient interface { Send(*String) error Recv() (*String, error) grpc.ClientStream } type helloServiceChannelClient struct { grpc.ClientStream } // 新結構體的收發函數實現 func (x *helloServiceChannelClient) Send(m *String) error { return x.ClientStream.SendMsg(m) } func (x *helloServiceChannelClient) Recv() (*String, error) { m := new(String) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } return m, nil } // HelloServiceServer is the server API for HelloService service. type HelloServiceServer interface { Hello(context.Context, *String) (*String, error) // 傳遞的HelloService_ChannelServer是一個新的結構體,可以用於和客戶端和向通信,而客戶端調用Channel方法后返回的HelloService_ChannelServer用於和服務端通信 Channel(HelloService_ChannelServer) error } // UnimplementedHelloServiceServer can be embedded to have forward compatible implementations. type UnimplementedHelloServiceServer struct { } func (*UnimplementedHelloServiceServer) Hello(ctx context.Context, req *String) (*String, error) { return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented") } func (*UnimplementedHelloServiceServer) Channel(srv HelloService_ChannelServer) error { return status.Errorf(codes.Unimplemented, "method Channel not implemented") } func RegisterHelloServiceServer(s *grpc.Server, srv HelloServiceServer) { s.RegisterService(&_HelloService_serviceDesc, srv) }
下面我們實現服務端代碼:str_service.go文件:
package main import ( "context" "fmt" "gRPC_demo/proto" "google.golang.org/grpc" // 要么 go get google.golang.org/grpc, 要么go mod tidy "io" "log" "net" ) type HelloServiceImps struct { } func (p *HelloServiceImps) Hello(ctx context.Context, arg *proto.StringX) (*proto.StringX, error) { reply := &proto.StringX{Value: "hello: " + arg.GetValue()} return reply, nil } // 服務端使用Channel對來進行收發消息,如果遇到io.EOF表示客戶端發送完畢 func (p *HelloServiceImps) Channel(stream proto.HelloServiceStr_ChannelServer) error { for { args, err := stream.Recv() if err != nil { if err == io.EOF { return nil } return err } reply := &proto.StringX{Value: "hello:" + args.GetValue()} err = stream.Send(reply) if err != nil { return err } } } /* 和啟動標准RPC服務流程類似 首先是通過grpc.NewServer()構造一個gRPC服務對象,然后通過gRPC插件生成的RegisterHelloServiceServer函數注冊我們實現的HelloServiceImpl服務。 然后通過grpcServer.Serve(lis)在一個監聽端口上提供gRPC服務。*/ func main() { // 創建服務初始化 grpcServer := grpc.NewServer() // 調用接口文件生成的服務端要實現的函數,完成服務注冊 proto.RegisterHelloServiceStrServer(grpcServer, new(HelloServiceImps)) fmt.Println("service starting....") lis, err := net.Listen("tcp",":1234") if err != nil { log.Fatal(err) } grpcServer.Serve(lis) }
客戶端代碼:str_client.go文件:
package main import ( "context" "fmt" "gRPC_demo/proto" "google.golang.org/grpc" "io" "log" "time" ) func main() { conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure()) if err != nil { log.Fatal(err) } defer conn.Close() client := proto.NewHelloServiceStrClient(conn) stream, err := client.Channel(context.Background()) if err != nil { log.Fatal(err) } go func() { for { if err := stream.Send(&proto.StringX{Value:"hi li"}); err != nil { log.Fatal(err) } time.Sleep(time.Second) } }() for { reply, err := stream.Recv() if err != nil { if err == io.EOF { break } log.Fatal(err) } fmt.Println(reply.GetValue()) } }