RPC 的定義這里就不再說,看文章的同學都是成熟的開發。gRPC 是 Google 開源的高性能跨語言的 RPC 方案,該框架的作者 Louis Ryan 闡述了設計這款框架的動機,有興趣的同學可以看看: gRPC的動機和設計原則 。
另一個值得一提的問題是,眾所周知 RPC 框架基本都是直接基於 TCP 協議自研數據結構和編解碼方式,但是 gRPC 卻完全不是這樣,它使用 HTTP/2 協議來傳輸數據。基於這一點來說, yRPC 肯定就不是性能最佳的那一款 RPC 框架。但是在不追求頂格 QPS 的前提下,通用性和兼容性也是不可忽略的要素。
如果要探究為什么 gRPC 要選擇使用 HTTP/2 作為底層協議,這個其實也很好解釋。HTTP 協議作為 Web 端標准協議在 Google 內被大規模廣泛使用。為了解決 1.x 的問題,Google 將自研的 SPDY 協議公開並推動基於 SPDY 的 HTTP/2 協議。所以 gRPC 從兼容性和推廣 HTTP/2 兩個角度來說有充足理由選擇 HTTP/2,何況基於 HTTP/2 的一些新特性也會讓實現方案上少寫很多代碼。
這里衍生出另一個基礎問題:既然底層使用 HTTP/2,那為啥還要用 RPC,不直接用 Restful 的方式更直接嗎。RPC 通常使用二進制編碼來壓縮消息的內容,Restful 更多的使用 JSON 格式,消息體中的冗余數據比較多,性能不如 RPC。
說了這么多題外話下面我們還是看一下作為業內使用率比較高的一款 RPC 框架是如何跑起來的。
開始前的准備
因為 gRPC 使用 Protocol Buffers 做為協議傳輸編碼格式,我們先安裝 Protocol Buffers 。具體安裝大家可以自行搜索教程,我這里使用 mac 的 brew 來安裝。
因為原生的 Protobuf 並不支持將 .proto 文件編譯為 Go 源碼,后面 Go 官方單獨開發了一款編譯插件:
go get -u github.com/golang/protobuf/protoc-gen-go
無論你是通過 go get 的方式安裝還是通過別的方式,確保它在 $GOPATH/bin
中以便編譯器protoc
能夠找到它。通過這個插件你可以將 .proto 文件編譯為 Go 文件。並且在 protoc-gen-go 插件中還提供了專門的 gRPC 編譯相關的支持,可以將 pb 文件中定義的 rpc service 方法渲染成 Go 對象。
這里對於安裝過程簡單介紹過去,因為它並不是本文介紹的要點,但是對於大家來說肯定不是那么好繞過去的🐶,安裝過程出現問題搜索一下即可,已經有人替你們踩過坑!
聲明:以下代碼都可以在 Github 倉庫中找到。
Hello World 入門
我們先定義一個用於本次測試的 pb 格式:
syntax = "proto3"; // 版本聲明,使用Protocol Buffers v3版本
option go_package = "/";
package models.pb; // 包名
service Calculate {
rpc Sum (stream SumRequest) returns (SumResponse) {}
}
message SumRequest {
int64 num = 1;
}
message SumResponse {
int64 result = 1;
}
執行如下命令生成對應的 pb 文件:
protoc --go_out=plugins=grpc:. *.proto
執行完成之后就會在當前目錄下生成 HelloWorld.pb.go 文件。
接下來開始編寫 服務端和客戶端相關的代碼。首先引入 gRPC 的包:
go get -u google.golang.org/grpc
服務端代碼如下:
package grpcTest
import (
"fmt"
"net"
"testing"
pb "gorm-demo/models/pb"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
type server struct{}
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}
func TestGrpcServer(t *testing.T) {
// 監聽本地的8972端口
lis, err := net.Listen("tcp", ":8972")
if err != nil {
fmt.Printf("failed to listen: %v", err)
return
}
s := grpc.NewServer() // 創建gRPC服務器
pb.RegisterGreeterServer(s, &server{}) // 在gRPC服務端注冊服務
reflection.Register(s) //在給定的gRPC服務器上注冊服務器反射服務
// Serve方法在lis上接受傳入連接,為每個連接創建一個ServerTransport和server的goroutine。
// 該goroutine讀取gRPC請求,然后調用已注冊的處理程序來響應它們。
err = s.Serve(lis)
if err != nil {
fmt.Printf("failed to serve: %v", err)
return
}
}
接下來是客戶端:
package grpcTest
import (
"fmt"
"testing"
pb "gorm-demo/models/pb"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
func TestGrpcClient(t *testing.T) {
// 連接服務器
conn, err := grpc.Dial(":8972", grpc.WithInsecure())
if err != nil {
fmt.Printf("faild to connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
// 調用服務端的SayHello
r, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: "CN"})
if err != nil {
fmt.Printf("could not greet: %v", err)
}
fmt.Printf("Greeting: %s !\n", r.Message)
}
分別啟動服務端和客戶端程序可以看到能夠正常的收發消息。
更高端的技能
gRPC 主要有 4 種請求和響應模式,分別是簡單模式(Simple RPC)
、服務端流式(Server-side streaming RPC)
、客戶端流式(Client-side streaming RPC)
、和雙向流式(Bidirectional streaming RPC)
。
簡單模式(Simple RPC)
:客戶端發起請求並等待服務端響應,就是普通的 Ping-Pong 模式。服務端流式(Server-side streaming RPC)
:服務端發送數據,客戶端接收數據。客戶端發送請求到服務器,拿到一個流去讀取返回的消息序列。 客戶端讀取返回的流,直到里面沒有任何消息。客戶端流式(Client-side streaming RPC)
:與服務端數據流模式相反,這次是客戶端源源不斷的向服務端發送數據流,而在發送結束后,由服務端返回一個響應。雙向流式(Bidirectional streaming RPC)
:雙方使用讀寫流去發送一個消息序列,兩個流獨立操作,雙方可以同時發送和同時接收。
上面演示的代碼就是簡單模式,客戶端發起請求等待服務器回應。下面的三種數據流模式第一眼看上去的時候感覺很難理解,HTTP 協議在我們的認識中就是 Ping-Pong 模式,請求和應答。流式處理是怎么發生的。
建立在 HTTP 基本原理的基礎上, gRPC 對請求處理做了一些包裝。當一次請求的數據量過大會有兩個問題,第一是超時,第二可能超過網絡請求最大包長度限制。對於這種情況下的問題要么業務側做分解將數據拆分成多次請求返回,要么就是通過關鍵字的方式返回關鍵字由業務側根據關鍵字二次查詢詳細數據。
流式處理這個概念相當於逆天改命,跳過上面兩種基本的上層處理方案,從底層提供一次請求,多次返回的功能。客戶端發起一次流式處理請求,服務端分多次將數據分包返回給客戶端。
流式處理不是空中花園還是需要有底層支持,因為 gRPC 是基於 HTTP/2 來傳輸,HTTP/2 本身就有二進制分幀的概念。通常一個請求或響應會被分為一個或多個幀傳輸,流則表示已建立連接的虛擬通道,可以傳輸多次請求或響應。每個幀中包含 Stream Identifier,標志所屬流。HTTP/2 通過流與幀實現多路復用,對於相同域名的請求,通過 Stream Identifier 標識可在同一個流中進行,從而減少連接開銷。
了解了這些理論之后我們先來實踐一下,先看接口定義:
syntax = "proto3"; // 版本聲明,使用Protocol Buffers v3版本
option go_package = "/";
package models.pb; // 包名
service BaseService {
//計算求和的方式來測試服務端流
rpc Sum (stream SumRequest) returns (SumResponse) {}
// 服務端流式響應
rpc ServerStream (StreamRequest) returns (stream StreamResponse){}
// 客戶端流式請求
rpc ClientStream (stream StreamRequest) returns (StreamResponse){}
// 雙向流式
rpc Streaming (stream StreamRequest) returns (stream StreamResponse){}
}
message StreamRequest{
string input = 1;
}
message StreamResponse{
string output = 1;
}
message SumRequest {
int64 num = 1;
}
message SumResponse {
int64 result = 1;
}
上面這個 pb 里面定義了 4 個接口,其中第一個求和的接口就是讓你更好理解流式請求的概念,只有讀完整個流數據之后才會結束計算。我們來看一下實現。
客戶端流式響應測試 - Sum 求和:
客戶端代碼:
package normal
import (
"fmt"
"testing"
"golang.org/x/net/context"
"google.golang.org/grpc"
pb "gorm-demo/models/pb"
)
func TestGrpcClient(t *testing.T) {
// 連接服務器
conn, err := grpc.Dial(":8972", grpc.WithInsecure())
if err != nil {
fmt.Printf("faild to connect: %v", err)
}
defer conn.Close()
c := pb.NewBaseServiceClient(conn)
// 調用Sum
sumCli, err := c.Sum(context.Background())
if err != nil {
panic("sum cli err")
}
sumCli.Send(&pb.SumRequest{Num: int64(1)})
sumCli.Send(&pb.SumRequest{Num: int64(2)})
sumCli.Send(&pb.SumRequest{Num: int64(3)})
sumCli.Send(&pb.SumRequest{Num: int64(4)})
recv, err := sumCli.CloseAndRecv()
if err != nil {
fmt.Printf("send sum request err: %v", err)
}
fmt.Printf("sum = : %v !\n", recv.Result)
}
這里是客戶端調用 Sum 方法后分批多次發送請求數據給服務端,等發送完成服務端會返回一個最終的結果。
服務端代碼:
package normal
import (
"fmt"
"io"
"net"
"strconv"
"testing"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
pb "gorm-demo/models/pb"
)
type server struct{}
func TestGrpcServer(t *testing.T) {
// 監聽本地的8972端口
lis, err := net.Listen("tcp", ":8972")
if err != nil {
fmt.Printf("failed to listen: %v", err)
return
}
s := grpc.NewServer() // 創建gRPC服務器
pb.RegisterBaseServiceServer(s, &server{}) // 在gRPC服務端注冊服務
reflection.Register(s) //在給定的gRPC服務器上注冊服務器反射服務
// Serve方法在lis上接受傳入連接,為每個連接創建一個ServerTransport和server的goroutine。
// 該goroutine讀取gRPC請求,然后調用已注冊的處理程序來響應它們。
err = s.Serve(lis)
if err != nil {
fmt.Printf("failed to serve: %v", err)
return
}
}
//sum案例--客戶端流式處理
func (*server) Sum(req pb.BaseService_SumServer) (err error) {
var sum int64 = 0
for {
reqObj, err := req.Recv()
if err == io.EOF {
fmt.Printf("Recv Sum err: %v", err)
req.SendAndClose(&pb.SumResponse{Result: sum})
return nil
} else if err == nil {
fmt.Printf("get client request param = %v", reqObj.Num)
sum += reqObj.Num
} else {
return err
}
}
}
// 服務端流式處理
func (s *server) ServerStream(in *pb.StreamRequest, stream pb.BaseService_ServerStreamServer) error {
input := in.Input
for _, s := range input {
stream.Send(&pb.StreamResponse{Output: string(s)})
}
return nil
}
// 客戶端流式響應 - 服務端邏輯
func (s *server) ClientStream(stream pb.BaseService_ClientStreamServer) error {
output := ""
for {
r, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.StreamResponse{Output: output})
}
if err != nil {
fmt.Println(err)
}
output += r.Input
}
}
// 雙向流式處理
func (s *server) Streaming(stream pb.BaseService_StreamingServer) error {
for n := 0; ; {
res, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
v, _ := strconv.Atoi(res.Input)
n += v
stream.Send(&pb.StreamResponse{Output: strconv.Itoa(n)})
}
}
Sum 方法中服務端等待獲取客戶端的請求數據,直到遇到最后一個 EOF 之后將計算的結果返回給客戶端,本次請求結束。
服務端流式響應測試
與客戶端流式響應相反,服務端流式響應就是服務端持續發送數據流,客戶端接收並最終結束流。
客戶端邏輯:
package normal
import (
"fmt"
"testing"
"golang.org/x/net/context"
"google.golang.org/grpc"
pb "gorm-demo/models/pb"
)
func TestGrpcClient(t *testing.T) {
// 連接服務器
conn, err := grpc.Dial(":8972", grpc.WithInsecure())
if err != nil {
fmt.Printf("faild to connect: %v", err)
}
defer conn.Close()
c := pb.NewBaseServiceClient(conn)
clientStream(c, "我收到了服務端的請求數據拉")
}
// 客戶端
func clientStream(client pb.BaseServiceClient, input string) error {
stream, _ := client.ClientStream(context.Background())
for _, s := range input {
fmt.Println("Client Stream Send:", string(s))
err := stream.Send(&pb.StreamRequest{Input: string(s)})
if err != nil {
return err
}
}
res, err := stream.CloseAndRecv()
if err != nil {
fmt.Println(err)
}
fmt.Println("Client Stream Recv:", res.Output)
return nil
}
客戶端現在變為接收數據方。
服務端現在從客戶端收到請求之后只管去處理,處理的結果分多次發給客戶端即可:
//服務端流式處理
serverStream(c, &pb.StreamRequest{Input: "我是一只小老虎"})
// 服務端流式處理
func serverStream(client pb.BaseServiceClient, r *pb.StreamRequest) error {
fmt.Println("Server Stream Send:", r.Input)
stream, _ := client.ServerStream(context.Background(), r)
for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
fmt.Println("Server Stream Recv:", res.Output)
}
return nil
}
大家可以運行一下看看效果。
雙向流處理
顧名思義就是服務端和客戶端都可以發送和接收消息。
服務端代碼:
// 雙向流式處理
func (s *server) Streaming(stream pb.BaseService_StreamingServer) error {
for n := 0; ; {
res, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
v, _ := strconv.Atoi(res.Input)
n += v
stream.Send(&pb.StreamResponse{Output: strconv.Itoa(n)})
}
}
客戶端代碼:
// 雙向流式處理
func streaming(client pb.BaseServiceClient) error {
stream, _ := client.Streaming(context.Background())
for n := 0; n < 10; n++ {
fmt.Println("Streaming Send:", n)
err := stream.Send(&pb.StreamRequest{Input: strconv.Itoa(n)})
if err != nil {
return err
}
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
fmt.Println("Streaming Recv:", res.Output)
}
stream.CloseSend()
return nil
}
可以看到雙方都能接收和發送消息。
入門篇就先說這么多,都是實操案例。大家先上手看看如何玩起來再去慢慢了解更深層次的東西吧。