【Go】Golang實現gRPC的Proxy的原理


背景

gRPC是Google開始的一個RPC服務框架, 是英文全名為Google Remote Procedure Call的簡稱。

廣泛的應用在有RPC場景的業務系統中,一些架構中將gRPC請求都經過一個gRPC服務代理節點或網關,進行服務的權限限制,限流,服務調用監控,增加請求統計等等諸多功能。

如下以Golang和gRPC為例,簡要分析gRPC的轉發原理。

 

gRPC Proxy原理

基本原理如下

  • 基於TCP啟動一個gRPC代理服務端
  • 攔截gRPC框架的服務,能將gRPC請求的服務攔截到轉發代理的一個函數中執行。
  • 接收客戶端的請求,處理業務指標后轉發給服務端。
  • 接收服務端的響應,處理業務指標后轉發給客戶端。

基於如上原理描述,如下圖所示,gRPC的客戶端將所有的請求都發給gRPC Server Proxy,這個代理網關實現請求轉發。

將gRPC Client的請求流轉發到gRPC 服務實現的節點上。並將服務處理結果響應返回給客戶端。

 

 在這個圖中的轉發需要回答如下幾個問題

  • Proxy怎么知道哪些請求轉發到哪些服務節點上,轉發的依據是什么?
  • Proxy是否需要解析gRPC協議?
  • Proxy上沒有服務的實現,該如何轉發?

 

簡化的gRPC服務處理流程

在回答如下問題之前,我們先簡單的分析一下gRPC服務器的實現原理和流程。

  • 編寫自己的服務實現,例子中以HelloWorld為例。
  • 把自己的服務實現HelloWorldServer注冊到gRPC框架中
  • 創建一個TCP的服務端監聽
  • 基於TCP監聽啟動一個gRPC服務
  • gRPC服務接收gRPC客戶端的TCP請求
  • 解析gRPC的頭部信息,找出服務名
  • 根據服務名找到第一步注冊的服務和方法實現處理器handler
  • 處理函數執行
  • 返回處理結果

簡化的注冊服務處理器函數,啟動gRPC服務,調用請求和執行數據流圖如下所示:

 

 

 

詳細的gRPC服務運行原理

第一步,定義和編寫HelloWorld的IDL文件

syntax = "proto3";

package demoapi;


// HelloWorld Service
service HelloWorldService {
   rpc HelloWorld(HelloWorldRequest) returns (HelloWorldResponse){};
}

// Request message
message HelloWorldRequest {
   string  request = 1;
}

// Response message
message HelloWorldResponse {
   string respose = 1;
}

在這個簡單的IDL中,定義了一個HelloWorldService的gRPC服務Service,這個服務中有一個HelloWorld方法Method。

 

第二步,編譯IDL文件

將IDL的proto文件編譯成helloworld.pb.go的gRPC代碼文件。

生成的代碼文件中,我們可以看到如下信息

// Hello World的客戶端接口
type HelloWorldServiceClient interface {
    HelloWorld(ctx context.Context, in *HelloWorldRequest, opts ...grpc.CallOption) (*HelloWorldResponse, error)
}

// Hello World的服務端接口
type HelloWorldServiceServer interface {
    HelloWorld(context.Context, *HelloWorldRequest) (*HelloWorldResponse, error)
}

// HelloWorld的服務注冊處理器函數Handler
func _HelloWorldService_HelloWorld_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(HelloWorldRequest)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(HelloWorldServiceServer).HelloWorld(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/demoapi.HelloWorldService/HelloWorld",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(HelloWorldServiceServer).HelloWorld(ctx, req.(*HelloWorldRequest))
    }
    return interceptor(ctx, in, info, handler)
}

// gRPC服務注冊的服務描述信息
// gRPC服務注冊時,會建立以ServiceName為Key,Methods為Value的一個Map映射
// Methods中的Handler就是如上的服務處理Handler
var _HelloWorldService_serviceDesc = grpc.ServiceDesc{
    ServiceName: "demoapi.HelloWorldService",
    HandlerType: (*HelloWorldServiceServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "HelloWorld",
            Handler:    _HelloWorldService_HelloWorld_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "demoapi/HelloWorld.proto",
}

如上代碼中有如下幾個關鍵信息需要解釋

  • 服務Service名稱 demoapi.HelloWorldService,對應IDL文件的package包名.service服務名稱
  • 方法Method名稱 HelloWorld,對應IDL文件的rpc方法

 

第三步,注冊HelloWorld服務到gRPC的服務映射中

  • grpc.ServiceDesc是 gRPC服務注冊的服務描述信息。
  • gRPC服務注冊時,會建立以ServiceName為Key,包裝Methods為Value的一個Map映射m。
  • Methods中的Handler就是如上的服務處理Handler。

對應的注冊代碼如下

// 注冊gRPC服務
func RegisterHelloWorldServiceServer(s *grpc.Server, srv HelloWorldServiceServer) {
    s.RegisterService(&_HelloWorldService_serviceDesc, srv)
}

// Server is a gRPC server to serve RPC requests.
type Server struct {
       // ...
    m      map[string]*service // service name -> service info
}

// gRPC service.go的服務注冊
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    srv := &service{
        server: ss,
        md:     make(map[string]*MethodDesc),
        sd:     make(map[string]*StreamDesc),
        mdata:  sd.Metadata,
    }
    for i := range sd.Methods {
        d := &sd.Methods[i]
        srv.md[d.MethodName] = d
    }
    for i := range sd.Streams {
        d := &sd.Streams[i]
        srv.sd[d.StreamName] = d
    }
    s.m[sd.ServiceName] = srv
}

第四步,接收客戶端gRPC請求並處理

在這一步中,會進行如下幾個步驟和函數的調用,也會回答前面的第一個問題。

  • gRPC客戶端通過TCP鏈接,連接到gRPC服務端
  • gRPC的Serve函數觸發TCP的Accept函數調用,生成一個和客戶端的網絡連接
  • grpc框架代碼執行handleRawConn方法,將這個網絡連接設置打破gRPC的傳輸層,做為網絡的讀和寫實現
  • 依次調用grpc流的handlerStream方法,用於處理gRPC數據流
  • 這個函數中會接收gRPC請求的頭信息,並解析得到服務名 如第二步中的服務名 demoapi.HelloWorldService
  • 通過如下的服務名中的方法名HelloWorld,並在Method的map中找到這個方法的處理器函數Handler,並執行這個Handler函數,實現gRPC服務的調用
  • 最后將處理結果返回

 整體的數據流整理如下: 

我們發現在gRPC框架代碼中的handleStream存在兩類服務,一類是已知服務 knownService, 第二類是unknownService

這兩個有什么區別呢?

已知服務 knownService就是gRPC服務端代碼注冊到gRPC框架中的服務,叫做已知服務,其他沒有注冊的服務叫做未知服務。

為什么我們要提到這個未知服務unknownService呢?着就是我們實現gRPC服務代碼的關鍵所在,是前面問題三的答案,

 

要實現gRPC服務代理,我們在創建grpc服務grpc.NewServer時,傳遞一個未知服務的handler,將未知服務的處理進行接管,然后通過注冊的這個Handler實現gRPC代理轉發的邏輯。

基於如下描述,gRPC代理的原理如下圖所示:

  • 創建grpc服務時,注冊一個未知服務處理器Handler和一個自定義的編碼Codec編碼和解碼,此處使用proto標准的Codec(回答前面第二個問題)
  • 這個handle給業務方預留一個director的接口,用於代理重定向轉發的grpc連接獲取,這樣proxy就可以通過redirector得到gRPCServer的grpc連接。
  • proxy接收gRPC客戶端的連接,並使用gRPC的RecvMsg方法,接收客戶端的消息請求
  • proxy將接收到的gRPC客戶端消息請求,通過SendHeader和SendMsg方法發送給gRPC服務端。
  • 同樣的方法,RecvMsg接收gRPC服務端的響應消息,使用SendMsg發送給gRPC客戶端。
  • 至此gRPC代碼服務就完成了消息的轉發功能,企業的限流,權限等功能可以通過轉發的功能進行攔截處理。

gRPC Proxy的實現邏輯如下圖所示:

 gRPC 代理服務的關鍵代碼如下所示:

服務端到客戶端的轉發

// 轉發服務端的數據流到客戶端
func (s *handler) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error {
    ret := make(chan error, 1)
    go func() {
        f := &frame{}
        for i := 0; ; i++ {
            if err := src.RecvMsg(f); err != nil {
                ret <- err // this can be io.EOF which is happy case
                break
            }
            if err := dst.SendMsg(f); err != nil {
                ret <- err
                break
            }
        }
    }()
    return ret
}

客戶端到服務端的轉發

// 轉發客戶端的數據流到服務端
func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {
    ret := make(chan error, 1)
    go func() {
        f := &frame{}
        for i := 0; ; i++ {
            if err := src.RecvMsg(f); err != nil {
                ret <- err // this can be io.EOF which is happy case
                break
            }
            if i == 0 {
                // This is a bit of a hack, but client to server headers are only readable after first client msg is
                // received but must be written to server stream before the first msg is flushed.
                // This is the only place to do it nicely.
                md, err := src.Header()
                if err != nil {
                    ret <- err
                    break
                }
                if err := dst.SendHeader(md); err != nil {
                    ret <- err
                    break
                }
            }
            if err := dst.SendMsg(f); err != nil {
                ret <- err
                break
            }
        }
    }()
    return ret
}

 

參考材料

https://github.com/grpc/grpc

https://github.com/mwitkow/grpc-proxy

 

done。

祝玩的開心~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM