流模式入門(上)、場景:批量查詢用戶積分
為何要用流模式
前面的例子,我們僅僅是傳輸比較小的數據,基本模式是「客戶端請求----服務端響應」
如果是傳輸較大數據呢?會帶來
1、數據包過大導致壓力陡增
2、需要等待客戶端包全部發送,才能處理以及響應
1,普通查詢積分方式
服務端:

syntax = "proto3";

package services;

import "google/protobuf/timestamp.proto";

// Product model.
message ProdModel {
  int32 prod_id = 1;
  string prod_name = 2;
  float prod_price = 3;
}

// Main order model.
message OrderMain {
  int32 order_id = 1;                        // order ID, auto-incremented
  string order_no = 2;                       // order number
  int32 user_id = 3;                         // buyer ID
  float order_money = 4;                     // order amount
  google.protobuf.Timestamp order_time = 5;  // time the order was placed
  repeated OrderDetail order_details = 6;    // order line items
}

// Order detail (line item) model.
message OrderDetail {
  int32 detail_id = 1;
  string order_no = 2;
  int32 prod_id = 3;
  float prod_price = 4;
  int32 prod_num = 5;
}

// User model.
message UserInfo {
  int32 user_id = 1;
  int32 user_score = 2;
}

syntax = "proto3";

package services;

import "Models.proto";

// Request carrying the users whose scores should be looked up.
message UserScoreRequest {
  repeated UserInfo users = 1;
}

// Response carrying the same users with their scores filled in.
message UserScoreResponse {
  repeated UserInfo users = 1;
}

service UserService {
  // Plain unary call: query scores for a batch of users in one shot.
  rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse);
}
執行腳本,生成pb.go文件
# Generate the Go gRPC stubs (and grpc-gateway / validate stubs) from the proto files.
cd pbfiles && \
protoc --go_out=plugins=grpc:../services Prod.proto && \
protoc --go_out=plugins=grpc:../services Orders.proto && \
protoc --go_out=plugins=grpc:../services Users.proto && \
protoc --go_out=plugins=grpc:../services --validate_out=lang=go:../services Models.proto && \
protoc --grpc-gateway_out=logtostderr=true:../services Prod.proto && \
protoc --grpc-gateway_out=logtostderr=true:../services Orders.proto && \
protoc --grpc-gateway_out=logtostderr=true:../services Users.proto && \
cd ..

package services

import "context"

// UserService implements the UserService gRPC server.
type UserService struct {
}

// GetUserScore is the plain unary handler: it assigns a score to every
// user in the request and returns them all in a single response.
func (*UserService) GetUserScore(ctx context.Context, in *UserScoreRequest) (*UserScoreResponse, error) {
	nextScore := int32(101)
	result := make([]*UserInfo, 0, len(in.Users))
	for _, u := range in.Users {
		u.UserScore = nextScore // stand-in for a real score lookup
		nextScore++
		result = append(result, u)
	}
	return &UserScoreResponse{Users: result}, nil
}

package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
	"grpcpro/services"
)

// main wires up the gRPC server and serves all registered services on :8081.
func main() {
	rpcServer := grpc.NewServer()
	services.RegisterProdServiceServer(rpcServer, new(services.ProdService))    // product service
	services.RegisterOrderSerivceServer(rpcServer, new(services.OrdersService)) // order service
	services.RegisterUserServiceServer(rpcServer, new(services.UserService))    // user service

	lis, err := net.Listen("tcp", ":8081")
	if err != nil {
		// The original discarded this error; a failed listen made
		// Serve operate on a nil listener.
		log.Fatal(err)
	}
	if err := rpcServer.Serve(lis); err != nil {
		log.Fatal(err)
	}
}
# Build the server binary.
go build server.go
客戶端:
拷貝服務端生成的pb.go文件到客戶端
func main(){ conn,err:=grpc.Dial(":8081",grpc.WithInsecure()) if err!=nil{ log.Fatal(err) } defer conn.Close() ctx:=context.Background() userClient:=services.NewUserServiceClient(conn) var i int32 req:=services.UserScoreRequest{} req.Users=make([]*services.UserInfo,0) for i=1;i<20;i++{ req.Users=append(req.Users,&services.UserInfo{UserId:i}) } res,_ := userClient.GetUserScore(ctx,&req) fmt.Println(res.Users) }
# Build the client binary. ("maiin.go" in the original was a typo.)
go build main.go
打印結果:
[user_id:1 user_score:101 user_id:2 user_score:102 user_id:3 user_score:103 user_id:4 user_score:104 user_id:5 user_score:105 user_id:6 user_score:106 user_id:7 user_score:107 user_id:8 user_score:108 user_id:9 user_score:109 user_id:10 user_score:110 user_id:11 user_score:111 user_id:12 user_score:112 user_id:13 user_score:113 user_id:14 user_score:114 user_id:15 user_score:115 user_id:16 user_score:116 user_id:17 user_score:117 user_id:18 user_score:118 user_id:19 user_score:119 ] Process finished with exit code 0
2,服務端流
假設 客戶端一次性發送6個客戶數據給服務端
再假設 服務端查詢用戶積分 有點慢。因此 采用的策略是 服務端每查詢2個就發送給客戶端
服務端:
修改users.proto
syntax = "proto3";

package services;

import "Models.proto";

// Request carrying the users whose scores should be looked up.
message UserScoreRequest {
  repeated UserInfo users = 1;
}

// Response carrying the same users with their scores filled in.
message UserScoreResponse {
  repeated UserInfo users = 1;
}

service UserService {
  // Plain unary call.
  rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse);
  // Server-streaming: one request in, a stream of responses out.
  rpc GetUserScoreByServerStream(UserScoreRequest) returns (stream UserScoreResponse);
}
處理方法:
func(*UserService) GetUserScoreByServerStream(in *UserScoreRequest,stream UserService_GetUserScoreByServerStreamServer) error { var score int32=101 users:=make([]*UserInfo,0) for index,user:=range in.Users{ user.UserScore=score score++ users=append(users,user) if (index+1) % 2==0 && index>0{ err:=stream.Send(&UserScoreResponse{Users:users}) if err!=nil{ return err } users=(users)[0:0] } time.Sleep(time.Second*1) } if len(users)>0{ err:=stream.Send(&UserScoreResponse{Users:users}) if err!=nil{ return err } } return nil }
客戶端調用:
// Server-stream demo: open the stream, then read chunked responses
// until the server closes with io.EOF.
stream, err := userClient.GetUserScoreByServerStream(ctx, &req)
if err != nil {
	// Originally `stream,_ :=` — a failed call left stream nil and
	// the Recv below panicked.
	log.Fatal(err)
}
for {
	resp, err := stream.Recv()
	if err == io.EOF {
		break // server finished streaming: all chunks received
	}
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(resp.Users)
}
打印出:
[user_id:1 user_score:101 user_id:2 user_score:102 ] [user_id:3 user_score:103 user_id:4 user_score:104 ] [user_id:5 user_score:105 ]
3,客戶端流:
客戶端流模式、場景:分批發送請求
場景: 客戶端批量查詢用戶積分 1、客戶端一次性把用戶列表發送過去(不是很多,獲取列表很快) 2、服務端查詢積分比較耗時 。 因此查到一部分 就返回一部分。 而不是 全部查完再返回給客戶端
服務端:
修改users.proto
syntax = "proto3";

package services;

import "Models.proto";

// Request carrying the users whose scores should be looked up.
message UserScoreRequest {
  repeated UserInfo users = 1;
}

// Response carrying the same users with their scores filled in.
message UserScoreResponse {
  repeated UserInfo users = 1;
}

service UserService {
  // Plain unary call.
  rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse);
  // Server-streaming: one request in, a stream of responses out.
  rpc GetUserScoreByServerStream(UserScoreRequest) returns (stream UserScoreResponse);
  // Client-streaming: a stream of requests in, one response out.
  rpc GetUserScoreByClientStream(stream UserScoreRequest) returns (UserScoreResponse);
}
新增service處理方法
func(*UserService) GetUserScoreByClientStream(stream UserService_GetUserScoreByClientStreamServer) error{ var score int32=101 users:=make([]*UserInfo,0) for{ req,err:=stream.Recv() if err==io.EOF{ //接收完了 return stream.SendAndClose(&UserScoreResponse{Users:users}) } if err!=nil{ return err } for _,user:=range req.Users{ user.UserScore=score //這里好比是服務端做的業務處理 score++ users=append(users,user) } } }
客戶端:
//客戶端流 func main(){ conn,err:=grpc.Dial(":8081",grpc.WithInsecure()) if err!=nil{ log.Fatal(err) } defer conn.Close() ctx:=context.Background() userClient:=services.NewUserServiceClient(conn) var i int32 if err!=nil{ log.Fatal(err) } stream,err:=userClient.GetUserScoreByClientStream(ctx) if err!=nil{ log.Fatal(err) } for j:=1;j<=3;j++{ req:=services.UserScoreRequest{} req.Users=make([]*services.UserInfo,0) for i=1;i<=5;i++{ //加了5條用戶信息 假設是一個耗時的過程 req.Users=append(req.Users,&services.UserInfo{UserId:i}) } err:=stream.Send(&req) if err!=nil{ log.Println(err) } } res,_:=stream.CloseAndRecv() fmt.Println(res.Users) }
# Rebuild the server and client binaries after regenerating the stubs.
go build server.go
go build main.go
[user_id:1 user_score:101 user_id:2 user_score:102 user_id:3 user_score:103 user_id:4 user_score:104 user_id:5 user_score:105 user_id:1 user_score:106 user_id:2 user_score:107 user_id:3 user_score:108 user_id:4 user_score:109 user_id:5 user_score:110 user_id:1 user_score:111 user_id:2 user_score:112 user_id:3 user_score:113 user_id:4 user_score:114 user_id:5 user_score:115 ] Process finished with exit code 0
客戶端分批發送,服務端一次返回結果
雙向流模式
場景: 客戶端批量查詢用戶積分 1、客戶端分批把用戶列表發送過去(客戶端獲取列表比較慢) 2、服務端查詢積分也很慢,所以分批發送過去 此時我們可以使用 雙向流模式
服務端:
修改users.proto
// Bidirectional streaming ("two-way stream"): both the request and the response are streams.
rpc GetUserScoreByTWS(stream UserScoreRequest) returns (stream UserScoreResponse);

syntax = "proto3";

package services;

import "Models.proto";

// Request carrying the users whose scores should be looked up.
message UserScoreRequest {
  repeated UserInfo users = 1;
}

// Response carrying the same users with their scores filled in.
message UserScoreResponse {
  repeated UserInfo users = 1;
}

service UserService {
  // Plain unary call.
  rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse);
  // Server-streaming: one request in, a stream of responses out.
  rpc GetUserScoreByServerStream(UserScoreRequest) returns (stream UserScoreResponse);
  // Client-streaming: a stream of requests in, one response out.
  rpc GetUserScoreByClientStream(stream UserScoreRequest) returns (UserScoreResponse);
  // Bidirectional streaming: both sides stream.
  rpc GetUserScoreByTWS(stream UserScoreRequest) returns (stream UserScoreResponse);
}
然后生成
# Regenerate the Go gRPC stubs and grpc-gateway stubs from the proto files.
cd pbfiles && \
protoc --go_out=plugins=grpc:../services Prod.proto && \
protoc --go_out=plugins=grpc:../services Orders.proto && \
protoc --go_out=plugins=grpc:../services Users.proto && \
protoc --go_out=plugins=grpc:../services Models.proto && \
protoc --grpc-gateway_out=logtostderr=true:../services Prod.proto && \
protoc --grpc-gateway_out=logtostderr=true:../services Orders.proto && \
protoc --grpc-gateway_out=logtostderr=true:../services Users.proto && \
cd ..
處理 UserService.go
//雙向流 func(*UserService) GetUserScoreByTWS(stream UserService_GetUserScoreByTWSServer) error { var score int32=101 users:=make([]*UserInfo,0) for{ req,err:=stream.Recv() if err==io.EOF{ //接收完了 return nil } if err!=nil{ return err } for _,user:=range req.Users{ user.UserScore=score //這里好比是服務端做的業務處理 score++ users=append(users,user) } err=stream.Send(&UserScoreResponse{Users:users}) if err!=nil{ log.Println(err) } users=(users)[0:0] } }
客戶端:
//雙向流 func main(){ conn,err:=grpc.Dial(":8081",grpc.WithInsecure()) if err!=nil{ log.Fatal(err) } defer conn.Close() ctx:=context.Background() userClient:=services.NewUserServiceClient(conn) var i int32 if err!=nil{ log.Fatal(err) } stream,err:=userClient.GetUserScoreByTWS(ctx) if err!=nil{ log.Fatal(err) } var uid int32=1 for j:=1;j<=3;j++{ req:=services.UserScoreRequest{} req.Users=make([]*services.UserInfo,0) for i=1;i<=5;i++{ //加5條用戶信息 假設是一個耗時的過程 req.Users=append(req.Users,&services.UserInfo{UserId:uid}) uid++ } err:=stream.Send(&req) if err!=nil{ log.Println(err) } res,err:=stream.Recv() if err==io.EOF{ break } if err!=nil{ log.Println(err) } fmt.Println(res.Users) } }
返回結果:
[user_id:1 user_score:101 user_id:2 user_score:102 user_id:3 user_score:103 user_id:4 user_score:104 user_id:5 user_score:105 ] [user_id:6 user_score:106 user_id:7 user_score:107 user_id:8 user_score:108 user_id:9 user_score:109 user_id:10 user_score:110 ] [user_id:11 user_score:111 user_id:12 user_score:112 user_id:13 user_score:113 user_id:14 user_score:114 user_id:15 user_score:115 ] Process finished with exit code 0
可以看出,當我們生產環境中 客戶端獲取數據耗時並且服務端處理數據耗時,此時運用雙向流模式大大節省任務時間
源碼地址:
https://github.com/sunlongv520/grpc-learn
https://github.com/sunlongv520/grpc-doc