了解grpc/protobuf
gRPC是一個高性能、通用的開源RPC框架,其由Google主要面向移動應用開發並基於HTTP/2協議標准而設計,基於ProtoBuf(Protocol Buffers)序列化協議開發,且支持眾多開發語言。
gRPC提供了一種簡單的方法來精確地定義服務和為iOS、Android和后台支持服務自動生成可靠性很強的客戶端功能庫。客戶端充分利用高級流和鏈接功能,從而有助於節省帶寬、降低的TCP鏈接次數、節省CPU使用、和電池壽命。
Protobuf(Protocol Buffers),是 Google 開發的一種跨語言、跨平台的可擴展機制,用於序列化結構化數據。
grpc與傳統的 REST 架構相比,REST架構通過 http 傳輸 JSON 或者 XML ,會帶來了一個問題:服務 A 把原始數據編碼成 JSON/XML 格式,發送一長串字符給服務 B,B 通過解碼還原成原始數據,通信的總體數據量很大。
但在兩個微服務的通信間,我們不需要字符串中的所有數據,所以我們采用難理解但更加輕量的二進制數據進行交互。gRPC 采用的是支持二進制數據的 HTTP 2.0 規范,而protobuf負責處理二進制數據, 它更小、更快、更便捷。
protobuf 目前支持 C++、Java、Python、Objective-C,如果使用 proto3,還支持 C#、Ruby、Go、PHP、JavaScript 等語言。
官網地址:https://developers.google.cn/protocol-buffers/
GitHub 地址:https://github.com/protocolbuffers/protobuf
Grpc中文文檔:http://doc.oschina.net/grpc?t=60133
優點:
- 性能好
- 跨語言
缺點:
- 二進制格式可讀性差:為了提高性能,protobuf 采用了二進制格式進行編碼,這直接導致了可讀性差。
- 缺乏自描述:XML 是自描述的,而 protobuf 不是,不配合定義的結構體是看不出來什么作用的。
環境配置
第一步:安裝protobuf:
- 先下載protoc:https://github.com/protocolbuffers/protobuf/releases/
- 把這么文件里面的bin里面的protoc.exe 復制到GOPATH/bin下,GOPATH/bin加入環境變量。也可以放到別的目錄,把那個目錄 配置到環境變量。反正最后的效果就是 在cmd 輸入 protoc 不會報錯。
- 在cmd 中運行
go get -u github.com/golang/protobuf/protoc-gen-go
獲取protobuf的編譯器插件 protoc-gen-go。 - 到這里 protobuf 就配的差不多了,
protoc --go_out=plugins=grpc:. xxxx.proto
這條命令就是 編譯 協議的,具體怎么使用我們以后說。
第二步:安裝grpc:
- 第一種方法:官方的安裝方法是
go get -u google.golang.org/grpc
,但是需要翻牆。 - 第二種方法:從git上 克隆 grpc 的各種 依賴庫 ,然后 移到我們 的 GOPATH 目錄下面。(網上找的代碼,親測有效)。
git clone https://github.com/grpc/grpc-go.git $GOPATH/src/google.golang.org/grpc git clone https://github.com/golang/net.git $GOPATH/src/golang.org/x/net git clone https://github.com/golang/text.git $GOPATH/src/golang.org/x/text git clone https://github.com/google/go-genproto.git $GOPATH/src/google.golang.org/genproto cd $GOPATH/src/
第三步:安裝gRPC運行時接口編解碼支持庫
1
|
go
get -u github.com/golang/protobuf/proto
|
注意:上面這個文件也需要移到src目錄下。
調用過程
1、客戶端(gRPC Stub)調用 A 方法,發起 RPC 調用。
2、對請求信息使用 Protobuf 進行對象序列化壓縮(IDL)。
3、服務端(gRPC Server)接收到請求后,解碼請求體,進行業務邏輯處理並返回。
4、對響應結果使用 Protobuf 進行對象序列化壓縮(IDL)。
5、客戶端接受到服務端響應,解碼請求體。回調被調用的 A 方法,喚醒正在等待響應(阻塞)的客戶端調用並返回響應結果。
調用方式
一、Unary RPC:一元 RPC
Server
type SearchService struct{} func (s *SearchService) Search(ctx context.Context, r *pb.SearchRequest) (*pb.SearchResponse, error) { return &pb.SearchResponse{Response: r.GetRequest() + " Server"}, nil } const PORT = "9001" func main() { server := grpc.NewServer() pb.RegisterSearchServiceServer(server, &SearchService{}) lis, err := net.Listen("tcp", ":"+PORT) ... server.Serve(lis) }
-
創建 gRPC Server 對象,你可以理解為它是 Server 端的抽象對象。
-
將 SearchService(其包含需要被調用的服務端接口)注冊到 gRPC Server。的內部注冊中心。這樣可以在接受到請求時,通過內部的 “服務發現”,發現該服務端接口並轉接進行邏輯處理。
-
創建 Listen,監聽 TCP 端口。
-
gRPC Server 開始 lis.Accept,直到 Stop 或 GracefulStop。
Client
func main() { conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure()) ... defer conn.Close() client := pb.NewSearchServiceClient(conn) resp, err := client.Search(context.Background(), &pb.SearchRequest{ Request: "gRPC", }) ... }
-
創建與給定目標(服務端)的連接句柄。
-
創建 SearchService 的客戶端對象。
-
發送 RPC 請求,等待同步響應,得到回調后返回響應結果。
二、Server-side streaming RPC:服務端流式 RPC
Server
func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error { for n := 0; n <= 6; n++ { stream.Send(&pb.StreamResponse{ Pt: &pb.StreamPoint{ ... }, }) } return nil }
Client
func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error { stream, err := client.List(context.Background(), r) ... for { resp, err := stream.Recv() if err == io.EOF { break } ... } return nil }
三、Client-side streaming RPC:客戶端流式 RPC
Server
func (s *StreamService) Record(stream pb.StreamService_RecordServer) error { for { r, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&pb.StreamResponse{Pt: &pb.StreamPoint{...}}) } ... } return nil }
Client
func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error { stream, err := client.Record(context.Background()) ... for n := 0; n < 6; n++ { stream.Send(r) } resp, err := stream.CloseAndRecv() ... return nil }
四、Bidirectional streaming RPC:雙向流式 RPC
Server
func (s *StreamService) Route(stream pb.StreamService_RouteServer) error { for { stream.Send(&pb.StreamResponse{...}) r, err := stream.Recv() if err == io.EOF { return nil } ... } return nil }
Client
func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error { stream, err := client.Route(context.Background()) ... for n := 0; n <= 6; n++ { stream.Send(r) resp, err := stream.Recv() if err == io.EOF { break } ... } stream.CloseSend() return nil }
淺談理解
服務端
為什么四行代碼,就能夠起一個 gRPC Server,內部做了什么邏輯。你有想過嗎?接下來我們一步步剖析,看看里面到底是何方神聖。
一、初始化
// grpc.NewServer() func NewServer(opt ...ServerOption) *Server { opts := defaultServerOptions for _, o := range opt { o(&opts) } s := &Server{ lis: make(map[net.Listener]bool), opts: opts, conns: make(map[io.Closer]bool), m: make(map[string]*service), quit: make(chan struct{}), done: make(chan struct{}), czData: new(channelzData), } s.cv = sync.NewCond(&s.mu) ... return s }
這塊比較簡單,主要是實例 grpc.Server 並進行初始化動作。涉及如下:
-
lis:監聽地址列表。
-
opts:服務選項,這塊包含 Credentials、Interceptor 以及一些基礎配置。
-
conns:客戶端連接句柄列表。
-
m:服務信息映射。
-
quit:退出信號。
-
done:完成信號。
-
czData:用於存儲 ClientConn,addrConn 和 Server 的channelz 相關數據。
-
cv:當優雅退出時,會等待這個信號量,直到所有 RPC 請求都處理並斷開才會繼續處理。
二、注冊
pb.RegisterSearchServiceServer(server, &SearchService{})
步驟一:Service API interface
// search.pb.go type SearchServiceServer interface { Search(context.Context, *SearchRequest) (*SearchResponse, error) } func RegisterSearchServiceServer(s *grpc.Server, srv SearchServiceServer) { s.RegisterService(&_SearchService_serviceDesc, srv) }
還記得我們平時編寫的 Protobuf 嗎?在生成出來的.pb.go
文件中,會定義出 Service APIs interface 的具體實現約束。而我們在 gRPC Server 進行注冊時,會傳入應用 Service 的功能接口實現,此時生成的RegisterServer
方法就會保證兩者之間的一致性。
步驟二:Service API IDL
你想亂傳糊弄一下?不可能的,請乖乖定義與 Protobuf 一致的接口方法。但是那個&_SearchService_serviceDesc
又有什么作用呢?代碼如下:
// search.pb.go var _SearchService_serviceDesc = grpc.ServiceDesc{ ServiceName: "proto.SearchService", HandlerType: (*SearchServiceServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "Search", Handler: _SearchService_Search_Handler, }, }, Streams: []grpc.StreamDesc{}, Metadata: "search.proto", }
這看上去像服務的描述代碼,用來向內部表述 “我” 都有什么。涉及如下:
-
ServiceName:服務名稱
-
HandlerType:服務接口,用於檢查用戶提供的實現是否滿足接口要求
-
Methods:一元方法集,注意結構內的
Handler
方法,其對應最終的 RPC 處理方法,在執行 RPC 方法的階段會使用。 -
Streams:流式方法集
-
Metadata:元數據,是一個描述數據屬性的東西。在這里主要是描述
SearchServiceServer
服務
步驟三:Register Service
func (s *Server) register(sd *ServiceDesc, ss interface{}) { ... srv := &service{ server: ss, md: make(map[string]*MethodDesc), sd: make(map[string]*StreamDesc), mdata: sd.Metadata, } for i := range sd.Methods { d := &sd.Methods[i] srv.md[d.MethodName] = d } for i := range sd.Streams { ... } s.m[sd.ServiceName] = srv }
在最后一步中,我們會將先前的服務接口信息、服務描述信息給注冊到內部service
去,以便於后續實際調用的使用。涉及如下:
-
server:服務的接口信息
-
md:一元服務的 RPC 方法集
-
sd:流式服務的 RPC 方法集
-
mdata:metadata,元數據
小結
在這一章節中,主要介紹的是 gRPC Server 在啟動前的整理和注冊行為,看上去很簡單,但其實一切都是為了后續的實際運行的預先准備。因此我們整理一下思路,將其串聯起來看看,如下:
三、監聽
接下來到了整個流程中,最重要也是大家最關注的監聽/處理階段,核心代碼如下:
func (s *Server) Serve(lis net.Listener) error { ... var tempDelay time.Duration for { rawConn, err := lis.Accept() if err != nil { if ne, ok := err.(interface { Temporary() bool }); ok && ne.Temporary() { if tempDelay == 0 { tempDelay = 5 * time.Millisecond } else { tempDelay *= 2 } if max := 1 * time.Second; tempDelay > max { tempDelay = max } ... timer := time.NewTimer(tempDelay) select { case <-timer.C: case <-s.quit: timer.Stop() return nil } continue } ... return err } tempDelay = 0 s.serveWG.Add(1) go func() { s.handleRawConn(rawConn) s.serveWG.Done() }() } }
Serve 會根據外部傳入的 Listener 不同而調用不同的監聽模式,這也是net.Listener
的魅力,靈活性和擴展性會比較高。而在 gRPC Server 中最常用的就是TCPConn
,基於 TCP Listener 去做。接下來我們一起看看具體的處理邏輯,如下:
-
循環處理連接,通過
lis.Accept
取出連接,如果隊列中沒有需處理的連接時,會形成阻塞等待。 -
若
lis.Accept
失敗,則觸發休眠機制,若為第一次失敗那么休眠 5ms,否則翻倍,再次失敗則不斷翻倍直至上限休眠時間 1s,而休眠完畢后就會嘗試去取下一個 “它”。 -
若
lis.Accept
成功,則重置休眠的時間計數和啟動一個新的 goroutine 調用handleRawConn
方法去執行/處理新的請求,也就是大家很喜歡說的 “每一個請求都是不同的 goroutine 在處理”。 -
在循環過程中,包含了 “退出” 服務的場景,主要是硬關閉和優雅重啟服務兩種情況。
客戶端
一、創建撥號連接
// grpc.Dial(":"+PORT, grpc.WithInsecure()) func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { cc := &ClientConn{ target: target, csMgr: &connectivityStateManager{}, conns: make(map[*addrConn]struct{}), dopts: defaultDialOptions(), blockingpicker: newPickerWrapper(), czData: new(channelzData), firstResolveEvent: grpcsync.NewEvent(), } ... chainUnaryClientInterceptors(cc) chainStreamClientInterceptors(cc) ... }
grpc.Dial
方法實際上是對於grpc.DialContext
的封裝,區別在於ctx
是直接傳入context.Background
。其主要功能是創建與給定目標的客戶端連接,其承擔了以下職責:
-
初始化 ClientConn
-
初始化(基於進程 LB)負載均衡配置
-
初始化 channelz
-
初始化重試規則和客戶端一元/流式攔截器
-
初始化協議棧上的基礎信息
-
相關 context 的超時控制
-
初始化並解析地址信息
-
創建與服務端之間的連接
連沒連
之前聽到有的人說調用grpc.Dial
后客戶端就已經與服務端建立起了連接,但這對不對呢?我們先鳥瞰全貌,看看正在跑的 goroutine。如下:
我們可以有幾個核心方法一直在等待/處理信號,通過分析底層源碼可得知。涉及如下:
func (ac *addrConn) connect()
func (ac *addrConn) resetTransport()
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time)
func (ac *addrConn) getReadyTransport()
在這里主要分析 goroutine 提示的resetTransport
方法,看看都做了啥。核心代碼如下:
func (ac *addrConn) resetTransport() { for i := 0; ; i++ { if ac.state == connectivity.Shutdown { return } ... connectDeadline := time.Now().Add(dialDuration) ac.updateConnectivityState(connectivity.Connecting) newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline) if err != nil { if ac.state == connectivity.Shutdown { return } ac.updateConnectivityState(connectivity.TransientFailure) timer := time.NewTimer(backoffFor) select { case <-timer.C: ... } continue } if ac.state == connectivity.Shutdown { newTr.Close() return } ... if !healthcheckManagingState { ac.updateConnectivityState(connectivity.Ready) } ... if ac.state == connectivity.Shutdown { return } ac.updateConnectivityState(connectivity.TransientFailure) } }
在該方法中會不斷地去嘗試創建連接,若成功則結束。否則不斷地根據Backoff
算法的重試機制去嘗試創建連接,直到成功為止。從結論上來講,單純調用DialContext
是異步建立連接的,也就是並不是馬上生效,處於Connecting
狀態,而正式下要到達Ready
狀態才可用。
二、實例化 Service API
type SearchServiceClient interface { Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error) } type searchServiceClient struct { cc *grpc.ClientConn } func NewSearchServiceClient(cc *grpc.ClientConn) SearchServiceClient { return &searchServiceClient{cc} }
這塊就是實例 Service API interface,比較簡單。
三、調用
// search.pb.go func (c *searchServiceClient) Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error) { out := new(SearchResponse) err := c.cc.Invoke(ctx, "/proto.SearchService/Search", in, out, opts...) if err != nil { return nil, err } return out, nil }
proto 生成的 RPC 方法更像是一個包裝盒,把需要的東西放進去,而實際上調用的還是grpc.invoke
方法。如下:
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...) if err != nil { return err } if err := cs.SendMsg(req); err != nil { return err } return cs.RecvMsg(reply) }
通過概覽,可以關注到三塊調用。如下:
-
newClientStream:獲取傳輸層 Trasport 並組合封裝到 ClientStream 中返回,在這塊會涉及負載均衡、超時控制、 Encoding、 Stream 的動作,與服務端基本一致的行為。
-
cs.SendMsg:發送 RPC 請求出去,但其並不承擔等待響應的功能。
-
cs.RecvMsg:阻塞等待接受到的 RPC 方法響應結果。
連接
// clientconn.go func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) { t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{ FullMethodName: method, }) if err != nil { return nil, nil, toRPCErr(err) } return t, done, nil }
在newClientStream
方法中,我們通過getTransport
方法獲取了 Transport 層中抽象出來的 ClientTransport 和 ServerTransport,實際上就是獲取一個連接給后續 RPC 調用傳輸使用。
四、關閉連接
// conn.Close() func (cc *ClientConn) Close() error { defer cc.cancel() ... cc.csMgr.updateState(connectivity.Shutdown) ... cc.blockingpicker.close() if rWrapper != nil { rWrapper.close() } if bWrapper != nil { bWrapper.close() } for ac := range conns { ac.tearDown(ErrClientConnClosing) } if channelz.IsOn() { ... channelz.AddTraceEvent(cc.channelzID, ted) channelz.RemoveEntry(cc.channelzID) } return nil }
該方法會取消 ClientConn 上下文,同時關閉所有底層傳輸。涉及如下:
-
Context Cancel
-
清空並關閉客戶端連接
-
清空並關閉解析器連接
-
清空並關閉負載均衡連接
-
添加跟蹤引用
-
移除當前通道信息
總結
-
gRPC 基於 HTTP/2 + Protobuf。
-
gRPC 有四種調用方式,分別是一元、服務端/客戶端流式、雙向流式。
-
gRPC 的附加信息都會體現在 HEADERS 幀,數據在 DATA 幀上。
-
Client 請求若使用 grpc.Dial 默認是異步建立連接,當時狀態為 Connecting。
-
Client 請求若需要同步則調用 WithBlock(),完成狀態為 Ready。
-
Server 監聽是循環等待連接,若沒有則休眠,最大休眠時間 1s;若接收到新請求則起一個新的 goroutine 去處理。
-
grpc.ClientConn 不關閉連接,會導致 goroutine 和 Memory 等泄露。
-
任何內/外調用如果不加超時控制,會出現泄漏和客戶端不斷重試。
-
特定場景下,如果不對 grpc.ClientConn 加以調控,會影響調用。
-
攔截器如果不用 go-grpc-middleware 鏈式處理,會覆蓋。
-
在選擇 gRPC 的負載均衡模式時,需要謹慎。
參考
-
http://doc.oschina.net/grpc
-
https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
-
https://juejin.im/post/5b88a4f56fb9a01a0b31a67e
-
https://www.ibm.com/developerworks/cn/web/wa-http2-under-the-hood/index.html
-
https://github.com/grpc/grpc-go/issues/1953
-
https://www.zhihu.com/question/52670041
可以拷貝的代碼見:
https://github.com/EDDYCJY/blog/blob/master/golang/gRPC/2019-06-28-talking-grpc.md
原文鏈接:https://mp.weixin.qq.com/s/qet7FX26HGnXgLIG-lOSyw